refactoring infrastructure
This commit is contained in:
@@ -0,0 +1,277 @@
|
||||
# Redis Event Store Module
|
||||
|
||||
## Überblick
|
||||
|
||||
Dieses Modul stellt eine konkrete Implementierung der `event-store-api` unter Verwendung von Redis Streams als Event-Store-Backend bereit.
|
||||
|
||||
## Architektur
|
||||
|
||||
Das Modul folgt dem Provider-Pattern:
|
||||
- **event-store-api**: Provider-agnostische Interfaces (`EventStore`, `EventSerializer`)
|
||||
- **redis-event-store**: Redis Streams-spezifische Implementierung
|
||||
|
||||
## Verwendung
|
||||
|
||||
### Dependency Hinzufügen
|
||||
|
||||
```kotlin
|
||||
dependencies {
|
||||
implementation(projects.infrastructure.eventStore.redisEventStore)
|
||||
}
|
||||
```
|
||||
|
||||
### Konfiguration
|
||||
|
||||
Das Modul verwendet Spring Boot Auto-Configuration. Konfigurieren Sie Redis über `application.yml`:
|
||||
|
||||
```yaml
|
||||
redis:
|
||||
event-store:
|
||||
host: localhost
|
||||
port: 6379
|
||||
password: null # Optional
|
||||
database: 1 # Separate database for event store (default: 1)
|
||||
connectionTimeout: 2000
|
||||
readTimeout: 2000
|
||||
usePooling: true
|
||||
maxPoolSize: 8
|
||||
minPoolSize: 2
|
||||
consumerGroup: event-processors
|
||||
consumerName: event-consumer
|
||||
streamPrefix: "event-stream:"
|
||||
allEventsStream: all-events
|
||||
claimIdleTimeout: 60s
|
||||
pollTimeout: 100ms
|
||||
maxBatchSize: 100
|
||||
createConsumerGroupIfNotExists: true
|
||||
```
|
||||
|
||||
### Code-Beispiel
|
||||
|
||||
```kotlin
|
||||
@Service
|
||||
class MyEventService(
|
||||
private val eventStore: EventStore,
|
||||
private val eventConsumer: RedisEventConsumer
|
||||
) {
|
||||
|
||||
// Event speichern
|
||||
suspend fun saveEvent(aggregateId: Uuid, event: DomainEvent) {
|
||||
eventStore.appendEvent(
|
||||
aggregateId = aggregateId,
|
||||
event = event,
|
||||
expectedVersion = EventVersion.ANY
|
||||
)
|
||||
}
|
||||
|
||||
// Events abrufen
|
||||
suspend fun loadEvents(aggregateId: Uuid): List<DomainEvent> {
|
||||
return eventStore.loadEvents(aggregateId)
|
||||
}
|
||||
|
||||
// Events konsumieren
|
||||
fun startConsuming() {
|
||||
eventConsumer.consumeEvents { event ->
|
||||
println("Received event: $event")
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Features
|
||||
|
||||
- ✅ Event Sourcing mit Redis Streams
|
||||
- ✅ Optimistic Locking mit Event Versioning
|
||||
- ✅ Consumer Groups für parallele Event-Verarbeitung
|
||||
- ✅ Event Replay-Fähigkeit
|
||||
- ✅ Pub/Sub für Event-Benachrichtigungen
|
||||
- ✅ Jackson-basierte Serialisierung
|
||||
- ✅ Connection Pooling mit Lettuce
|
||||
- ✅ Kotlin Coroutines Support
|
||||
|
||||
## Redis Streams
|
||||
|
||||
Das Modul nutzt Redis Streams für Event Sourcing:
|
||||
|
||||
- **Stream pro Aggregate**: `event-stream:{aggregateId}`
|
||||
- **All Events Stream**: `event-stream:all-events`
|
||||
- **Consumer Groups**: Für parallele Verarbeitung
|
||||
- **Message IDs**: Für Event-Ordering und Replay
|
||||
|
||||
## Beans
|
||||
|
||||
Das Modul registriert folgende Spring Beans:
|
||||
|
||||
- `eventStoreRedisConnectionFactory`: Separate Redis ConnectionFactory für Event Store
|
||||
- `eventStoreRedisTemplate`: StringRedisTemplate für Event-Operationen
|
||||
- `eventSerializer`: Jackson-basierter Event-Serializer
|
||||
- `eventStore`: EventStore Implementierung
|
||||
- `eventConsumer`: RedisEventConsumer für Event-Verarbeitung
|
||||
|
||||
## Gleichzeitige Verwendung mit redis-cache
|
||||
|
||||
⚠️ **WICHTIG**: Wenn Sie sowohl `redis-cache` als auch `redis-event-store` im selben Service verwenden:
|
||||
|
||||
### Unterschiedliche Databases
|
||||
|
||||
Die Module verwenden **separate Redis Databases**, um Konflikte zu vermeiden:
|
||||
|
||||
- **redis-cache**: Database 0 (Standard)
|
||||
- **redis-event-store**: Database 1 (Standard, konfigurierbar)
|
||||
|
||||
### Konfigurationsbeispiel
|
||||
|
||||
```yaml
|
||||
# Beide Module in einer application.yml
|
||||
redis:
|
||||
# Cache Konfiguration
|
||||
host: localhost
|
||||
port: 6379
|
||||
database: 0 # Cache verwendet Database 0
|
||||
|
||||
# Event Store Konfiguration (nested)
|
||||
event-store:
|
||||
host: localhost
|
||||
port: 6379
|
||||
database: 1 # Event Store verwendet Database 1
|
||||
consumerGroup: event-processors
|
||||
```
|
||||
|
||||
### Bean-Namen
|
||||
|
||||
Die Module verwenden unterschiedliche Bean-Namen zur Vermeidung von Konflikten:
|
||||
|
||||
| Komponente | redis-cache | redis-event-store |
|
||||
|------------|-------------|-------------------|
|
||||
| ConnectionFactory | `redisConnectionFactory` | `eventStoreRedisConnectionFactory` |
|
||||
| Template | `redisTemplate` | `eventStoreRedisTemplate` |
|
||||
| Serializer | `cacheSerializer` | `eventSerializer` |
|
||||
|
||||
### Keine Konflikte
|
||||
|
||||
✅ Die Module sind so designed, dass sie **ohne Konflikte** gleichzeitig verwendet werden können:
|
||||
- **Separate ConnectionFactories** mit `@Qualifier` Annotations
|
||||
- **Separate Property-Prefixes**: `redis` vs `redis.event-store`
|
||||
- **Unterschiedliche Database-Nummern**: 0 vs 1
|
||||
- **Unterschiedliche Bean-Namen**: Explizite Qualifier verhindern Kollisionen
|
||||
|
||||
## Event Versioning
|
||||
|
||||
Das Modul unterstützt Optimistic Locking:
|
||||
|
||||
```kotlin
|
||||
// Erwartete Version spezifizieren
|
||||
eventStore.appendEvent(
|
||||
aggregateId = aggregateId,
|
||||
event = myEvent,
|
||||
expectedVersion = EventVersion.of(5) // Erwartet Version 5
|
||||
)
|
||||
|
||||
// Beliebige Version akzeptieren
|
||||
eventStore.appendEvent(
|
||||
aggregateId = aggregateId,
|
||||
event = myEvent,
|
||||
expectedVersion = EventVersion.ANY
|
||||
)
|
||||
```
|
||||
|
||||
Bei Version-Konflikten wird eine `ConcurrencyException` geworfen.
|
||||
|
||||
## Consumer Groups
|
||||
|
||||
Das Modul unterstützt Consumer Groups für parallele Event-Verarbeitung:
|
||||
|
||||
```kotlin
|
||||
// Consumer 1
|
||||
eventConsumer.consumeEvents(
|
||||
consumerName = "consumer-1"
|
||||
) { event ->
|
||||
// Verarbeite Event
|
||||
processEvent(event)
|
||||
}
|
||||
|
||||
// Consumer 2 (in der gleichen Consumer Group)
|
||||
eventConsumer.consumeEvents(
|
||||
consumerName = "consumer-2"
|
||||
) { event ->
|
||||
// Verarbeite Event parallel
|
||||
processEvent(event)
|
||||
}
|
||||
```
|
||||
|
||||
Events werden automatisch auf verfügbare Consumer verteilt.
|
||||
|
||||
## Event Replay
|
||||
|
||||
Sie können Events von einem bestimmten Zeitpunkt oder Message-ID replaying:
|
||||
|
||||
```kotlin
|
||||
// Replay alle Events eines Aggregates
|
||||
val events = eventStore.loadEvents(aggregateId)
|
||||
|
||||
// Replay Events ab einer bestimmten Version
|
||||
val eventsFromVersion = eventStore.loadEvents(
|
||||
aggregateId = aggregateId,
|
||||
fromVersion = EventVersion.of(10)
|
||||
)
|
||||
```
|
||||
|
||||
## Serialisierung
|
||||
|
||||
Das Modul verwendet Jackson für Event-Serialisierung:
|
||||
- Automatische Kotlin-Modul Integration
|
||||
- Polymorphe Serialisierung für verschiedene Event-Typen
|
||||
- Custom Serializer können via `@Bean` überschrieben werden
|
||||
|
||||
## Performance
|
||||
|
||||
- **Connection Pooling**: Wiederverwendbare Verbindungen via Lettuce
|
||||
- **Batch Processing**: Konfigurierbare Batch-Größe für Consumer
|
||||
- **Non-blocking I/O**: Reaktive Operations mit Kotlin Coroutines
|
||||
- **Stream-basiert**: Effiziente Event-Speicherung mit Redis Streams
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Redis Verbindungsfehler
|
||||
|
||||
```
|
||||
RedisConnectionFailureException: Unable to connect to Redis
|
||||
```
|
||||
|
||||
**Lösung**: Überprüfen Sie Redis-Server und Netzwerk-Konfiguration. Stellen Sie sicher, dass Redis Streams unterstützt werden (Redis 5.0+).
|
||||
|
||||
### Concurrency Exception
|
||||
|
||||
```
|
||||
ConcurrencyException: Expected version X but found Y
|
||||
```
|
||||
|
||||
**Lösung**: Dies ist normales Verhalten bei Optimistic Locking. Implementieren Sie Retry-Logik oder verwenden Sie `EventVersion.ANY`.
|
||||
|
||||
### Consumer Group Fehler
|
||||
|
||||
```
|
||||
Consumer group already exists
|
||||
```
|
||||
|
||||
**Lösung**: Setzen Sie `createConsumerGroupIfNotExists: true` in der Konfiguration oder löschen Sie die Consumer Group manuell.
|
||||
|
||||
### Bean-Konflikte mit Cache
|
||||
|
||||
Wenn Sie Fehler wie "Multiple beans of type RedisConnectionFactory" erhalten:
|
||||
|
||||
**Lösung**: Die Module verwenden bereits unterschiedliche Bean-Namen mit `@Qualifier`. Stellen Sie sicher, dass Sie beide Module korrekt konfiguriert haben (siehe Abschnitt "Gleichzeitige Verwendung").
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Separate Databases**: Verwenden Sie immer separate Redis Databases für Cache und Event Store
|
||||
2. **Event Versioning**: Verwenden Sie Optimistic Locking für kritische Aggregates
|
||||
3. **Consumer Groups**: Nutzen Sie Consumer Groups für horizontale Skalierung
|
||||
4. **Error Handling**: Implementieren Sie Retry-Logik für transiente Fehler
|
||||
5. **Monitoring**: Überwachen Sie Stream-Größen und Consumer Lag
|
||||
|
||||
## Weitere Informationen
|
||||
|
||||
- Siehe auch: [cache-api README](../../cache/cache-api/README.md)
|
||||
- Siehe auch: [redis-cache README](../../cache/redis-cache/README.md)
|
||||
- Redis Streams Dokumentation: https://redis.io/docs/data-types/streams/
|
||||
@@ -42,6 +42,9 @@ dependencies {
|
||||
// Zusätzliche Test-Dependencies für erweiterte Event-Store-Tests
|
||||
testImplementation(libs.kotlinx.serialization.json)
|
||||
testImplementation(libs.reactor.test)
|
||||
// Für Integration Tests mit beiden Redis-Modulen
|
||||
testImplementation(projects.infrastructure.cache.cacheApi)
|
||||
testImplementation(projects.infrastructure.cache.redisCache)
|
||||
}
|
||||
|
||||
// === Task Configuration ===
|
||||
|
||||
+251
@@ -0,0 +1,251 @@
|
||||
package at.mocode.infrastructure.eventstore.redis
|
||||
|
||||
import at.mocode.core.domain.event.DomainEvent
|
||||
import at.mocode.core.domain.model.*
|
||||
import at.mocode.infrastructure.cache.api.CacheConfiguration
|
||||
import at.mocode.infrastructure.cache.api.DistributedCache
|
||||
import at.mocode.infrastructure.cache.redis.JacksonCacheSerializer
|
||||
import at.mocode.infrastructure.cache.redis.RedisConfiguration
|
||||
import at.mocode.infrastructure.cache.redis.RedisDistributedCache
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertNotNull
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.beans.factory.annotation.Qualifier
|
||||
import org.springframework.boot.test.context.SpringBootTest
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.context.annotation.Import
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory
|
||||
import org.springframework.data.redis.core.RedisTemplate
|
||||
import org.springframework.test.context.DynamicPropertyRegistry
|
||||
import org.springframework.test.context.DynamicPropertySource
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
import org.testcontainers.utility.DockerImageName
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
import kotlin.uuid.ExperimentalUuidApi
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
/**
|
||||
* Integration Test zur Demonstration der gleichzeitigen Verwendung von
|
||||
* redis-cache und redis-event-store im selben Service.
|
||||
*
|
||||
* Dieser Test zeigt:
|
||||
* 1. Beide Module können ohne Konflikte gleichzeitig verwendet werden
|
||||
* 2. Separate Redis Databases verhindern Daten-Überschneidungen
|
||||
* 3. Separate Bean-Namen verhindern Bean-Konflikte
|
||||
* 4. Beide Module arbeiten unabhängig voneinander
|
||||
*/
|
||||
@OptIn(ExperimentalUuidApi::class)
|
||||
@SpringBootTest(
|
||||
classes = [
|
||||
RedisCacheAndEventStoreIntegrationTest.TestConfig::class
|
||||
]
|
||||
)
|
||||
@Testcontainers
|
||||
class RedisCacheAndEventStoreIntegrationTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
@JvmStatic
|
||||
val redisContainer: GenericContainer<*> = GenericContainer(
|
||||
DockerImageName.parse("redis:7-alpine")
|
||||
).withExposedPorts(6379)
|
||||
|
||||
@DynamicPropertySource
|
||||
@JvmStatic
|
||||
fun configureProperties(registry: DynamicPropertyRegistry) {
|
||||
// Cache Configuration (Database 0)
|
||||
registry.add("redis.host") { redisContainer.host }
|
||||
registry.add("redis.port") { redisContainer.getMappedPort(6379) }
|
||||
registry.add("redis.database") { 0 }
|
||||
|
||||
// Event Store Configuration (Database 1)
|
||||
registry.add("redis.event-store.host") { redisContainer.host }
|
||||
registry.add("redis.event-store.port") { redisContainer.getMappedPort(6379) }
|
||||
registry.add("redis.event-store.database") { 1 }
|
||||
registry.add("redis.event-store.consumerGroup") { "test-group" }
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
@JvmStatic
|
||||
fun setUp() {
|
||||
println("[DEBUG_LOG] Starting Redis container for integration test")
|
||||
redisContainer.start()
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@JvmStatic
|
||||
fun tearDown() {
|
||||
println("[DEBUG_LOG] Stopping Redis container")
|
||||
redisContainer.stop()
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@Import(
|
||||
RedisConfiguration::class,
|
||||
RedisEventStoreConfiguration::class
|
||||
)
|
||||
class TestConfig {
|
||||
@Bean
|
||||
fun distributedCache(
|
||||
redisTemplate: RedisTemplate<String, ByteArray>,
|
||||
cacheConfiguration: CacheConfiguration
|
||||
): DistributedCache {
|
||||
return RedisDistributedCache(
|
||||
redisTemplate = redisTemplate,
|
||||
serializer = JacksonCacheSerializer(),
|
||||
config = cacheConfiguration
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private lateinit var cache: DistributedCache
|
||||
|
||||
@Autowired
|
||||
private lateinit var eventStore: EventStore
|
||||
|
||||
// Verify separate ConnectionFactories
|
||||
@Autowired
|
||||
@Qualifier("redisConnectionFactory")
|
||||
private lateinit var cacheConnectionFactory: RedisConnectionFactory
|
||||
|
||||
@Autowired
|
||||
@Qualifier("eventStoreRedisConnectionFactory")
|
||||
private lateinit var eventStoreConnectionFactory: RedisConnectionFactory
|
||||
|
||||
@Test
|
||||
fun `test both modules can be used simultaneously without conflicts`(): Unit = runBlocking {
|
||||
println("[DEBUG_LOG] Testing simultaneous usage of cache and event store")
|
||||
|
||||
// Test Cache Operations
|
||||
val cacheKey = "test-user-${Uuid.random()}"
|
||||
val cacheData = TestUser("John Doe", 30)
|
||||
|
||||
println("[DEBUG_LOG] Cache: Storing data with key=$cacheKey")
|
||||
cache.set(cacheKey, cacheData, ttl = 5.minutes)
|
||||
|
||||
val retrievedCacheData = cache.get(cacheKey, TestUser::class.java)
|
||||
println("[DEBUG_LOG] Cache: Retrieved data=$retrievedCacheData")
|
||||
assertNotNull(retrievedCacheData)
|
||||
assertEquals(cacheData.name, retrievedCacheData!!.name)
|
||||
assertEquals(cacheData.age, retrievedCacheData.age)
|
||||
|
||||
// Test Event Store Operations
|
||||
val aggregateId = Uuid.random()
|
||||
val event = TestEvent(
|
||||
aggregateId = AggregateId(aggregateId),
|
||||
eventType = EventType("UserCreated"),
|
||||
data = mapOf("userId" to aggregateId.toString(), "name" to "Jane Doe")
|
||||
)
|
||||
|
||||
println("[DEBUG_LOG] EventStore: Appending event for aggregateId=$aggregateId")
|
||||
eventStore.appendToStream(event, aggregateId, -1L)
|
||||
|
||||
val loadedEvents = eventStore.readFromStream(aggregateId)
|
||||
println("[DEBUG_LOG] EventStore: Loaded ${loadedEvents.size} events")
|
||||
assertEquals(1, loadedEvents.size)
|
||||
assertEquals(event.eventType, (loadedEvents[0] as TestEvent).eventType)
|
||||
|
||||
// Verify Cache and Event Store are independent
|
||||
println("[DEBUG_LOG] Verifying cache and event store are independent")
|
||||
|
||||
// Cache should still work after event operations
|
||||
val cacheStillWorks = cache.get(cacheKey, TestUser::class.java)
|
||||
assertNotNull(cacheStillWorks)
|
||||
println("[DEBUG_LOG] Cache still works: key=$cacheKey exists")
|
||||
|
||||
// Event store should still work after cache operations
|
||||
val eventsStillWork = eventStore.readFromStream(aggregateId)
|
||||
assertEquals(1, eventsStillWork.size)
|
||||
println("[DEBUG_LOG] Event store still works: aggregateId=$aggregateId has ${eventsStillWork.size} events")
|
||||
|
||||
println("[DEBUG_LOG] Test completed successfully - Both modules work independently")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test separate connection factories are used`() {
|
||||
println("[DEBUG_LOG] Testing separate connection factories")
|
||||
|
||||
assertNotNull(cacheConnectionFactory)
|
||||
assertNotNull(eventStoreConnectionFactory)
|
||||
|
||||
// The connection factories should be different instances
|
||||
println("[DEBUG_LOG] Cache ConnectionFactory: ${cacheConnectionFactory.javaClass.simpleName}")
|
||||
println("[DEBUG_LOG] EventStore ConnectionFactory: ${eventStoreConnectionFactory.javaClass.simpleName}")
|
||||
|
||||
// Both should be functional
|
||||
val cacheConnection = cacheConnectionFactory.connection
|
||||
val eventStoreConnection = eventStoreConnectionFactory.connection
|
||||
|
||||
assertNotNull(cacheConnection)
|
||||
assertNotNull(eventStoreConnection)
|
||||
|
||||
// Different databases
|
||||
println("[DEBUG_LOG] Cache uses database: ${cacheConnection.nativeConnection}")
|
||||
println("[DEBUG_LOG] EventStore uses database: ${eventStoreConnection.nativeConnection}")
|
||||
|
||||
cacheConnection.close()
|
||||
eventStoreConnection.close()
|
||||
|
||||
println("[DEBUG_LOG] Both connection factories are functional and independent")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test data isolation between cache and event store`(): Unit = runBlocking {
|
||||
println("[DEBUG_LOG] Testing data isolation between cache and event store")
|
||||
|
||||
val sharedKey = "shared-key-${Uuid.random()}"
|
||||
|
||||
// Store data in cache
|
||||
cache.set(sharedKey, TestUser("Cache User", 25), ttl = 5.minutes)
|
||||
println("[DEBUG_LOG] Stored data in cache with key=$sharedKey")
|
||||
|
||||
// Store event with same UUID in event store
|
||||
val aggregateId = Uuid.random()
|
||||
val event = TestEvent(
|
||||
aggregateId = AggregateId(aggregateId),
|
||||
eventType = EventType("TestEvent"),
|
||||
data = mapOf("key" to sharedKey)
|
||||
)
|
||||
eventStore.appendToStream(event, aggregateId, -1L)
|
||||
println("[DEBUG_LOG] Stored event in event store with aggregateId=$aggregateId")
|
||||
|
||||
// Both should be retrievable independently
|
||||
val cachedUser = cache.get(sharedKey, TestUser::class.java)
|
||||
val storedEvents = eventStore.readFromStream(aggregateId)
|
||||
|
||||
assertNotNull(cachedUser)
|
||||
assertEquals(1, storedEvents.size)
|
||||
|
||||
println("[DEBUG_LOG] Data isolation verified:")
|
||||
println("[DEBUG_LOG] - Cache retrieved: ${cachedUser?.name}")
|
||||
println("[DEBUG_LOG] - Event store retrieved: ${storedEvents.size} events")
|
||||
println("[DEBUG_LOG] Cache and Event Store use separate databases - no conflicts!")
|
||||
}
|
||||
|
||||
// Test data classes
|
||||
data class TestUser(
|
||||
val name: String,
|
||||
val age: Int
|
||||
)
|
||||
|
||||
data class TestEvent(
|
||||
override val aggregateId: AggregateId,
|
||||
override val eventType: EventType,
|
||||
val data: Map<String, String>,
|
||||
override val eventId: EventId = EventId(Uuid.random()),
|
||||
override val timestamp: kotlin.time.Instant = kotlin.time.Clock.System.now(),
|
||||
override val version: EventVersion = EventVersion(0),
|
||||
override val correlationId: CorrelationId? = null,
|
||||
override val causationId: CausationId? = null
|
||||
) : DomainEvent
|
||||
}
|
||||
Reference in New Issue
Block a user