7.7 KiB
Redis Event Store Module
Überblick
Dieses Modul stellt eine konkrete Implementierung der event-store-api unter Verwendung von Redis Streams als Event-Store-Backend bereit.
Architektur
Das Modul folgt dem Provider-Pattern:
- event-store-api: Provider-agnostische Interfaces (
EventStore,EventSerializer) - redis-event-store: Redis Streams-spezifische Implementierung
Verwendung
Dependency Hinzufügen
dependencies {
implementation(projects.infrastructure.eventStore.redisEventStore)
}
Konfiguration
Das Modul verwendet Spring Boot Auto-Configuration. Konfigurieren Sie Redis über application.yml:
redis:
event-store:
host: localhost
port: 6379
password: null # Optional
database: 1 # Separate database for event store (default: 1)
connectionTimeout: 2000
readTimeout: 2000
usePooling: true
maxPoolSize: 8
minPoolSize: 2
consumerGroup: event-processors
consumerName: event-consumer
streamPrefix: "event-stream:"
allEventsStream: all-events
claimIdleTimeout: 60s
pollTimeout: 100ms
maxBatchSize: 100
createConsumerGroupIfNotExists: true
Code-Beispiel
@Service
class MyEventService(
private val eventStore: EventStore,
private val eventConsumer: RedisEventConsumer
) {
// Event speichern
suspend fun saveEvent(aggregateId: Uuid, event: DomainEvent) {
eventStore.appendEvent(
aggregateId = aggregateId,
event = event,
expectedVersion = EventVersion.ANY
)
}
// Events abrufen
suspend fun loadEvents(aggregateId: Uuid): List<DomainEvent> {
return eventStore.loadEvents(aggregateId)
}
// Events konsumieren
fun startConsuming() {
eventConsumer.consumeEvents { event ->
println("Received event: $event")
}
}
}
Features
- ✅ Event Sourcing mit Redis Streams
- ✅ Optimistic Locking mit Event Versioning
- ✅ Consumer Groups für parallele Event-Verarbeitung
- ✅ Event Replay-Fähigkeit
- ✅ Pub/Sub für Event-Benachrichtigungen
- ✅ Jackson-basierte Serialisierung
- ✅ Connection Pooling mit Lettuce
- ✅ Kotlin Coroutines Support
Redis Streams
Das Modul nutzt Redis Streams für Event Sourcing:
- Stream pro Aggregate:
event-stream:{aggregateId} - All Events Stream:
event-stream:all-events - Consumer Groups: Für parallele Verarbeitung
- Message IDs: Für Event-Ordering und Replay
Beans
Das Modul registriert folgende Spring Beans:
eventStoreRedisConnectionFactory: Separate Redis ConnectionFactory für Event StoreeventStoreRedisTemplate: StringRedisTemplate für Event-OperationeneventSerializer: Jackson-basierter Event-SerializereventStore: EventStore ImplementierungeventConsumer: RedisEventConsumer für Event-Verarbeitung
Gleichzeitige Verwendung mit redis-cache
⚠️ WICHTIG: Wenn Sie sowohl redis-cache als auch redis-event-store im selben Service verwenden:
Unterschiedliche Databases
Die Module verwenden separate Redis Databases, um Konflikte zu vermeiden:
- redis-cache: Database 0 (Standard)
- redis-event-store: Database 1 (Standard, konfigurierbar)
Konfigurationsbeispiel
# Beide Module in einer application.yml
redis:
# Cache Konfiguration
host: localhost
port: 6379
database: 0 # Cache verwendet Database 0
# Event Store Konfiguration (nested)
event-store:
host: localhost
port: 6379
database: 1 # Event Store verwendet Database 1
consumerGroup: event-processors
Bean-Namen
Die Module verwenden unterschiedliche Bean-Namen zur Vermeidung von Konflikten:
| Komponente | redis-cache | redis-event-store |
|---|---|---|
| ConnectionFactory | redisConnectionFactory |
eventStoreRedisConnectionFactory |
| Template | redisTemplate |
eventStoreRedisTemplate |
| Serializer | cacheSerializer |
eventSerializer |
Keine Konflikte
✅ Die Module sind so designed, dass sie ohne Konflikte gleichzeitig verwendet werden können:
- Separate ConnectionFactories mit
@QualifierAnnotations - Separate Property-Prefixes:
redisvsredis.event-store - Unterschiedliche Database-Nummern: 0 vs 1
- Unterschiedliche Bean-Namen: Explizite Qualifier verhindern Kollisionen
Event Versioning
Das Modul unterstützt Optimistic Locking:
// Erwartete Version spezifizieren
eventStore.appendEvent(
aggregateId = aggregateId,
event = myEvent,
expectedVersion = EventVersion.of(5) // Erwartet Version 5
)
// Beliebige Version akzeptieren
eventStore.appendEvent(
aggregateId = aggregateId,
event = myEvent,
expectedVersion = EventVersion.ANY
)
Bei Version-Konflikten wird eine ConcurrencyException geworfen.
Consumer Groups
Das Modul unterstützt Consumer Groups für parallele Event-Verarbeitung:
// Consumer 1
eventConsumer.consumeEvents(
consumerName = "consumer-1"
) { event ->
// Verarbeite Event
processEvent(event)
}
// Consumer 2 (in der gleichen Consumer Group)
eventConsumer.consumeEvents(
consumerName = "consumer-2"
) { event ->
// Verarbeite Event parallel
processEvent(event)
}
Events werden automatisch auf verfügbare Consumer verteilt.
Event Replay
Sie können Events von einem bestimmten Zeitpunkt oder Message-ID replaying:
// Replay alle Events eines Aggregates
val events = eventStore.loadEvents(aggregateId)
// Replay Events ab einer bestimmten Version
val eventsFromVersion = eventStore.loadEvents(
aggregateId = aggregateId,
fromVersion = EventVersion.of(10)
)
Serialisierung
Das Modul verwendet Jackson für Event-Serialisierung:
- Automatische Kotlin-Modul Integration
- Polymorphe Serialisierung für verschiedene Event-Typen
- Custom Serializer können via
@Beanüberschrieben werden
Performance
- Connection Pooling: Wiederverwendbare Verbindungen via Lettuce
- Batch Processing: Konfigurierbare Batch-Größe für Consumer
- Non-blocking I/O: Reaktive Operations mit Kotlin Coroutines
- Stream-basiert: Effiziente Event-Speicherung mit Redis Streams
Troubleshooting
Redis Verbindungsfehler
RedisConnectionFailureException: Unable to connect to Redis
Lösung: Überprüfen Sie Redis-Server und Netzwerk-Konfiguration. Stellen Sie sicher, dass Redis Streams unterstützt werden (Redis 5.0+).
Concurrency Exception
ConcurrencyException: Expected version X but found Y
Lösung: Dies ist normales Verhalten bei Optimistic Locking. Implementieren Sie Retry-Logik oder verwenden Sie EventVersion.ANY.
Consumer Group Fehler
Consumer group already exists
Lösung: Setzen Sie createConsumerGroupIfNotExists: true in der Konfiguration oder löschen Sie die Consumer Group manuell.
Bean-Konflikte mit Cache
Wenn Sie Fehler wie "Multiple beans of type RedisConnectionFactory" erhalten:
Lösung: Die Module verwenden bereits unterschiedliche Bean-Namen mit @Qualifier. Stellen Sie sicher, dass Sie beide Module korrekt konfiguriert haben (siehe Abschnitt "Gleichzeitige Verwendung").
Best Practices
- Separate Databases: Verwenden Sie immer separate Redis Databases für Cache und Event Store
- Event Versioning: Verwenden Sie Optimistic Locking für kritische Aggregates
- Consumer Groups: Nutzen Sie Consumer Groups für horizontale Skalierung
- Error Handling: Implementieren Sie Retry-Logik für transiente Fehler
- Monitoring: Überwachen Sie Stream-Größen und Consumer Lag
Weitere Informationen
- Siehe auch: cache-api README
- Siehe auch: redis-cache README
- Redis Streams Dokumentation: https://redis.io/docs/data-types/streams/