refactor(infra-event-store): Improve consistency and test reliability

This commit introduces a comprehensive refactoring of the event-store module to guarantee data consistency and improve the quality and reliability of its test suite.

Data Consistency
Implemented Redis transactions (MULTI/EXEC) for the appendToStream operation in RedisEventStore.

This ensures that writing an event to the aggregate-specific stream and the global "all-events" stream is an atomic operation, preventing data inconsistencies in case of partial failures.

Improved error handling by invalidating the local stream version cache on transactional failures.

Testing Enhancements
Refactored Consumer Tests: Replaced the asynchronous, thread-based consumer test in RedisIntegrationTest with a synchronous, deterministic approach. The test now manually calls pollEvents() to verify event consumption, making it faster and 100% reliable by removing Thread.sleep and CountDownLatch.

Simplified Test Events: Reduced boilerplate code in test event data classes (TestCreatedEvent, TestUpdatedEvent) in both RedisEventStoreTest and RedisIntegrationTest by using the @Transient annotation on overridden properties from BaseDomainEvent.

Fixed Compilation Errors: Resolved various compilation errors in the test suite that arose from refactoring and incorrect mock definitions.
This commit is contained in:
2025-08-09 18:03:04 +02:00
parent 4f67379b42
commit e72e4bddaa
7 changed files with 105 additions and 210 deletions
@@ -2,69 +2,53 @@
## Überblick
Das **Event-Store-Modul** ist eine kritische Komponente der Infrastruktur, die für die Persistenz und Veröffentlichung von Domänen-Events zuständig ist. Es bildet die technische Grundlage für **Event Sourcing** und eine allgemeine **event-getriebene Architektur**. Anstatt nur den aktuellen Zustand einer Entität zu speichern, speichert der Event Store die gesamte Kette von Ereignissen (Events), die zu diesem Zustand geführt haben.
Das **Event-Store-Modul** ist eine kritische Komponente der Infrastruktur, die für die Persistenz und Veröffentlichung von Domänen-Events zuständig ist. Es bildet die technische Grundlage für **Event Sourcing** und eine allgemeine **ereignisgesteuerte Architektur**. Anstatt nur den aktuellen Zustand einer Entität zu speichern, speichert der Event Store die gesamte Kette von Ereignissen, die zu diesem Zustand geführt haben.
## Architektur: Port-Adapter-Muster
Wie schon das Cache-Modul, folgt auch der Event Store streng dem **Port-Adapter-Muster**, um eine maximale Entkopplung von der konkreten Speichertechnologie zu erreichen.
Das Modul folgt streng dem **Port-Adapter-Muster**, um eine maximale Entkopplung von der konkreten Speichertechnologie zu erreichen.
* **`:infrastructure:event-store:event-store-api`**: Definiert den abstrakten "Vertrag" (`EventStore`-Interface), gegen den die Fach-Services programmieren.
* **`:infrastructure:event-store:redis-event-store`**: Die konkrete Implementierung des Vertrags, die **Redis Streams** als hoch-performantes, persistentes Log verwendet.
infrastructure/event-store/
├── event-store-api/ # Der "Port": Definiert die Event-Store-Schnittstelle
└── redis-event-store/ # Der "Adapter": Implementiert die Schnittstelle mit Redis Streams
## Schlüsselfunktionen
* **Garantierte Konsistenz:** Schreibvorgänge in den aggregat spezifischen Stream und den globalen "all-events"-Stream werden innerhalb einer **atomaren Redis-Transaktion (`MULTI`/`EXEC`)** ausgeführt. Dies stellt sicher, dass der Event-Store niemals in einen inkonsistenten Zustand gerät.
* **Resiliente Event-Verarbeitung:** Der `RedisEventConsumer` nutzt **Redis Consumer Groups**, um eine skalierbare und ausfallsichere Verarbeitung von Events zu ermöglichen. Er enthält eine robuste Logik zum "Claimen" von Nachrichten, die von ausgefallenen Consumern nicht bestätigt wurden, sodass keine Events verloren gehen.
* **Optimistische Nebenhäufigkeitskontrolle:** Verhindert Race Conditions, indem beim Speichern von Events eine `expectedVersion` überprüft wird. Bei Konflikten wird eine `ConcurrencyException` geworfen.
* **Intelligente Serialisierung:** Der `JacksonEventSerializer` speichert Event-Metadaten und die eigentliche Nutzlast getrennt in der Redis-Stream-Nachricht, was eine effiziente Analyse von Streams ermöglicht.
### `event-store-api`
## Verwendung
Dieses Modul ist der **abstrakte "Port"** der Architektur. Es definiert den Vertrag, wie der Rest der Anwendung mit dem Event Store interagiert.
Ein Anwendung-Service bindet `:infrastructure:event-store:redis-event-store` ein und lässt sich das `EventStore`-Interface per Dependency Injection geben.
* **Zweck:** Definiert Interfaces wie `EventStore` (zum Speichern und Laden von Event-Streams) und `EventPublisher` (zum Veröffentlichen von Events an interessierte Listener). Es ist eng mit den `DomainEvent`-Definitionen aus dem `:core:core-domain`-Modul verknüpft.
* **Vorteil:** Die Fach-Services (z.B. `members-application`) sind vollständig von der Implementierung des Event Stores entkoppelt. Sie wissen nicht, ob die Events in Redis, Kafka oder einer relationalen Datenbank gespeichert werden.
```kotlin
@Service
class MemberApplicationService(
private val eventStore: EventStore // Nur das Interface wird verwendet!
) {
fun registerNewMember(command: RegisterMemberCommand) {
// 1. Geschäftslogik ausführen und Event erzeugen
val memberRegisteredEvent = MemberRegisteredEvent(
aggregateId = uuid4(),
version = 1L,
name = command.name
)
### `redis-event-store`
Dieses Modul ist der **konkrete "Adapter"**, der die in `event-store-api` definierten Schnittstellen implementiert.
* **Zweck:** Stellt eine Implementierung des `EventStore` bereit, die **Redis Streams** als zugrunde liegenden Datenspeicher verwendet. Redis Streams sind eine leistungsstarke Datenstruktur, die sich ideal für die Implementierung eines append-only Logs eignet, wie es für einen Event Store benötigt wird.
* **Technologie:** Nutzt Spring Data Redis und den Lettuce-Client für die performante Kommunikation mit Redis. Die Domänen-Events werden vor der Speicherung mittels Jackson in ein JSON-Format serialisiert.
* **Vorteil:** Kapselt die gesamte Redis-spezifische Logik. Ein zukünftiger Wechsel zu einem anderen Event-Store-System (z.B. Apache Kafka) würde nur den Austausch dieses einen Moduls erfordern.
## Verwendung in anderen Modulen
Ein Anwendungs-Service, der Event Sourcing verwendet, interagiert wie folgt mit dem Modul:
1. **Abhängigkeit deklarieren:** Das Service-Modul (z.B. `members-application`) fügt eine `implementation`-Abhängigkeit zu `:infrastructure:event-store:redis-event-store` in seiner `build.gradle.kts` hinzu.
2. **Interface injizieren:** Im Service-Code wird nur das `EventStore`-Interface aus der `event-store-api` injiziert.
```kotlin
// In einem Use Case oder Application Service
@Service
class MemberApplicationService(
private val eventStore: EventStore, // Nur das Interface wird verwendet!
private val eventPublisher: EventPublisher
) {
fun registerNewMember(command: RegisterMemberCommand): Member {
// 1. Geschäftslogik ausführen und ein oder mehrere Events erzeugen
val memberRegisteredEvent = MemberRegisteredEvent(
memberId = UUID.randomUUID(),
name = command.name,
// ...
)
// 2. Das Event im Event Store speichern
eventStore.save(memberRegisteredEvent)
// 3. Das Event veröffentlichen, damit andere Teile des Systems
// (z.B. ein E-Mail-Service) darauf reagieren können.
eventPublisher.publish(memberRegisteredEvent)
// ...
}
// 2. Event im Event Store speichern (mit Concurrency Check)
// hier wird erwartet, dass der Stream neu ist (Version 0)
eventStore.appendToStream(memberRegisteredEvent, memberRegisteredEvent.aggregateId, 0)
}
```
}
```
Diese Architektur ermöglicht eine hochgradig entkoppelte, skalierbare und resiliente Systemlandschaft, die auf asynchroner Kommunikation basiert.
## Testing-Strategie
Die Qualität des Moduls wird durch eine robuste Teststrategie sichergestellt:
---
**Letzte Aktualisierung**: 31. Juli 2025
* *Integrationstests mit Testcontainer: Die Kernfunktionalität wird gegen eine echte Redis-Datenbank getestet, die zur Laufzeit in einem Docker-Container gestartet wird.*
* *Zuverlässige Consumer-Tests: Die asynchrone Logik des Event-Consumers wird in den Tests synchron und deterministisch überprüft, indem der pollEvents()-Zyklus manuell angestoßen wird. Dies vermeidet unzuverlässige Tests, die auf Thread.sleep basieren.*
* *Saubere Test-Daten: Test-Event-Klassen werden durch die Verwendung der @Transient-Annotation sauber und frei von Boilerplate-Code gehalten.*
**Letzte Aktualisierung**: 9. August 2025
@@ -17,15 +17,15 @@ interface EventSerializer {
fun serialize(event: DomainEvent): Map<String, String>
/**
* Deserializes a map of strings to strings to a domain event.
* Deserializes a map of strings to a domain event.
*
* @param data The map of strings to strings to deserialize
* @param data The map of strings to deserialize
* @return The deserialized domain event
*/
fun deserialize(data: Map<String, String>): DomainEvent
/**
* Gets the type of a domain event.
* Gets the type of domain event.
* This is used to determine the type of event when deserializing.
*
* @param event The event to get the type of
@@ -34,7 +34,7 @@ interface EventSerializer {
fun getEventType(event: DomainEvent): String
/**
* Gets the type of a domain event from a serialized map.
* Gets the type of domain event from a serialized map.
*
* @param data The serialized event data
* @return The type of the event as a string
@@ -21,14 +21,8 @@ class RedisEventConsumer(
private val properties: RedisEventStoreProperties
) {
private val logger = LoggerFactory.getLogger(RedisEventConsumer::class.java)
// Event handlers registered for specific event types
private val eventTypeHandlers = ConcurrentHashMap<String, CopyOnWriteArrayList<(DomainEvent) -> Unit>>()
// Event handlers registered for all events
private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>()
// Flag to indicate if the consumer is running
private var running = false
/**
@@ -96,27 +90,19 @@ class RedisEventConsumer(
*/
private fun createConsumerGroupsIfNotExist() {
try {
// Create consumer group for the all events stream
val allEventsStreamKey = getAllEventsStreamKey()
// Ensure the all-events stream exists and has at least one message
try {
// Always try to add an initialization message to the all-events stream
redisTemplate.opsForStream<String, String>()
.add(allEventsStreamKey, mapOf("init" to "init"))
logger.debug("Ensured all-events stream has messages: $allEventsStreamKey")
} catch (e: Exception) {
// Ignore errors when adding to the stream (it might already have messages)
logger.debug("All-events stream might already have messages: ${e.message}")
}
// Create the consumer group for all-events stream
createConsumerGroupIfNotExists(allEventsStreamKey)
// Get all stream keys
val streamKeys = redisTemplate.keys("${properties.streamPrefix}*")
// Create consumer groups for all streams
for (streamKey in streamKeys) {
if (streamKey != allEventsStreamKey) {
createConsumerGroupIfNotExists(streamKey)
@@ -134,24 +120,19 @@ class RedisEventConsumer(
*/
private fun createConsumerGroupIfNotExists(streamKey: String) {
try {
// Always ensure the stream has at least one message
// This is necessary because consumer groups cannot be created on empty streams
try {
redisTemplate.opsForStream<String, String>()
.add(streamKey, mapOf("init" to "init"))
logger.debug("Ensured stream has messages: $streamKey")
} catch (e: Exception) {
// Ignore errors when adding to the stream (it might already have messages)
logger.debug("Stream $streamKey might already have messages: ${e.message}")
}
// Create the consumer group - ignore all errors for now
try {
redisTemplate.opsForStream<String, String>()
.createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup)
logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey")
} catch (e: Exception) {
// Ignore all consumer group creation errors for now
logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}")
}
} catch (e: Exception) {
@@ -160,20 +141,16 @@ class RedisEventConsumer(
}
/**
* Periodically polls for new events from all streams.
* Periodic polls for new events from all streams.
*/
@Scheduled(fixedDelayString = "\${redis.event-store.poll-interval:100}")
@Scheduled(fixedDelayString = $$"${redis.event-store.poll-interval:100}")
fun pollEvents() {
if (!running) {
running = true
}
try {
// Poll the all events stream only
// Individual streams don't need to be polled since all events are also in the all-events stream
pollStream(getAllEventsStreamKey())
// Claim pending messages that have been idle for too long
claimPendingMessages()
} catch (e: Exception) {
logger.error("Error polling events: ${e.message}", e)
@@ -187,7 +164,6 @@ class RedisEventConsumer(
*/
private fun pollStream(streamKey: String) {
try {
// Read new messages from the stream
val options = StreamReadOptions.empty()
.count(properties.maxBatchSize.toLong())
.block(properties.pollTimeout)
@@ -199,14 +175,12 @@ class RedisEventConsumer(
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
)
// Process the records
if (records != null) {
for (record in records) {
processRecord(record)
}
}
} catch (e: Exception) {
// Ignore if the stream doesn't exist or the consumer group doesn't exist
val message = e.message
if (message == null || !message.contains("NOGROUP")) {
logger.error("Error polling stream $streamKey: ${e.message}", e)
@@ -219,15 +193,12 @@ class RedisEventConsumer(
*/
private fun claimPendingMessages() {
try {
// Only process the all-events stream since that's where consumer groups exist
val streamKey = getAllEventsStreamKey()
// Get pending messages summary
val pendingSummary = redisTemplate.opsForStream<String, String>()
.pending(streamKey, properties.consumerGroup)
if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) {
// Get pending messages with details
val pendingMessages = redisTemplate.opsForStream<String, String>()
.pending(
streamKey,
@@ -237,14 +208,11 @@ class RedisEventConsumer(
)
if (pendingMessages.size() > 0) {
// Extract message IDs and convert to array
val messageIdsList = pendingMessages.map { it.id }.toList()
if (messageIdsList.isNotEmpty()) {
// Convert to array for the spread operator
val messageIds = messageIdsList.toTypedArray()
// Claim messages that have been idle for too long
val records = redisTemplate.opsForStream<String, String>()
.claim(
streamKey,
@@ -254,7 +222,6 @@ class RedisEventConsumer(
*messageIds
)
// Process the claimed records
for (record in records) {
processRecord(record)
}
@@ -275,10 +242,8 @@ class RedisEventConsumer(
try {
val data = record.value
// Skip init messages (they only contain "init" -> "init")
if (data.size == 1 && data.containsKey("init") && data["init"] == "init") {
logger.debug("Skipping init message")
// Still acknowledge the message to remove it from pending
redisTemplate.opsForStream<String, String>()
.acknowledge(properties.consumerGroup, record)
return
@@ -287,7 +252,6 @@ class RedisEventConsumer(
val event = serializer.deserialize(data)
val eventType = serializer.getEventType(data)
// Call handlers for the specific event type
eventTypeHandlers[eventType]?.forEach { handler ->
try {
handler(event)
@@ -296,7 +260,6 @@ class RedisEventConsumer(
}
}
// Call handlers for all events
allEventHandlers.forEach { handler ->
try {
handler(event)
@@ -305,7 +268,6 @@ class RedisEventConsumer(
}
}
// Acknowledge the message
redisTemplate.opsForStream<String, String>()
.acknowledge(properties.consumerGroup, record)
@@ -315,9 +277,9 @@ class RedisEventConsumer(
}
/**
* Gets the Redis key for the all events stream.
* Gets the Redis key for the all-events stream.
*
* @return The Redis key for the all events stream
* @return The Redis key for the all-events stream
*/
private fun getAllEventsStreamKey(): String {
return "${properties.streamPrefix}${properties.allEventsStream}"
@@ -7,7 +7,9 @@ import at.mocode.infrastructure.eventstore.api.EventStore
import at.mocode.infrastructure.eventstore.api.Subscription
import com.benasher44.uuid.Uuid
import org.slf4j.LoggerFactory
import org.springframework.dao.DataAccessException
import org.springframework.data.domain.Range
import org.springframework.data.redis.core.SessionCallback
import org.springframework.data.redis.core.StringRedisTemplate
import java.util.concurrent.ConcurrentHashMap
@@ -43,12 +45,11 @@ class RedisEventStore(
return currentVersion
}
// Deprecated, use the list-based version for transactional safety.
override fun appendToStream(event: DomainEvent, streamId: Uuid, expectedVersion: Long): Long {
val currentVersion = getStreamVersion(streamId)
if (currentVersion != expectedVersion) {
streamVersionCache.remove(streamId) // Invalidate cache on conflict
val actualVersion = getStreamVersion(streamId) // Re-fetch from Redis
streamVersionCache.remove(streamId)
val actualVersion = getStreamVersion(streamId)
if (actualVersion != expectedVersion) {
throw ConcurrencyException("Concurrency conflict: expected version $expectedVersion but got $actualVersion")
}
@@ -64,14 +65,27 @@ class RedisEventStore(
val allEventsStreamKey = getAllEventsStreamKey()
val eventData = serializer.serialize(event)
// KORREKTUR: Schreibe das Event in BEIDE Streams (aggregatspezifisch und global)
// Dies sollte idealerweise in einer Redis-Transaktion (MULTI/EXEC) geschehen.
// Für Einfachheit hier als separate Aufrufe.
redisTemplate.opsForStream<String, String>().add(streamKey, eventData)
redisTemplate.opsForStream<String, String>().add(allEventsStreamKey, eventData)
try {
redisTemplate.execute(object : SessionCallback<List<Any>> {
@Throws(DataAccessException::class)
override fun <K : Any?, V : Any?> execute(operations: org.springframework.data.redis.core.RedisOperations<K, V>): List<Any> {
val streamOps = (operations as StringRedisTemplate).opsForStream<String, String>()
streamVersionCache[streamId] = newVersion
return newVersion
operations.multi()
streamOps.add(streamKey, eventData)
streamOps.add(allEventsStreamKey, eventData)
return operations.exec()
}
})
streamVersionCache[streamId] = newVersion
return newVersion
} catch (e: Exception) {
logger.error("Failed to append event transactionally for stream key: {}", streamKey, e)
streamVersionCache.remove(streamId)
throw e
}
}
override fun readFromStream(streamId: Uuid, fromVersion: Long, toVersion: Long?): List<DomainEvent> {
@@ -94,8 +108,6 @@ class RedisEventStore(
override fun getStreamVersion(streamId: Uuid): Long {
streamVersionCache[streamId]?.let { return it }
val streamKey = getStreamKey(streamId)
// .size() ist die Anzahl der Einträge, was der Version entspricht, wenn bei 1 begonnen wird.
// Ein leerer Stream hat size=0, was Version 0 bedeutet.
val size = redisTemplate.opsForStream<String, String>().size(streamKey) ?: 0L
streamVersionCache[streamId] = size
return size
@@ -109,7 +121,6 @@ class RedisEventStore(
return "${properties.streamPrefix}${properties.allEventsStream}"
}
// Stubs
override fun readAllEvents(fromPosition: Long, maxCount: Int?): List<DomainEvent> {
TODO("Not yet implemented")
}
@@ -5,8 +5,8 @@ import at.mocode.infrastructure.eventstore.api.ConcurrencyException
import at.mocode.infrastructure.eventstore.api.EventSerializer
import com.benasher44.uuid.Uuid
import com.benasher44.uuid.uuid4
import kotlin.time.Clock
import kotlin.time.Instant
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -68,12 +68,11 @@ class RedisEventStoreTest {
@Test
fun `append and read events should work correctly for new stream`() {
val aggregateId = uuid4()
val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity")
val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity")
val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity")
val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity")
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
// KORRIGIERT: Aufruf an die korrekte Methode angepasst
val events = eventStore.readFromStream(aggregateId)
assertEquals(2, events.size)
@@ -89,35 +88,26 @@ class RedisEventStoreTest {
@Test
fun `appending with wrong expected version should throw ConcurrencyException`() {
val aggregateId = uuid4()
val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity")
val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity")
eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1
val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity")
val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity")
assertThrows<ConcurrencyException> {
// Trying to append with expected version 0, but the current is 1
eventStore.appendToStream(listOf(event2), aggregateId, 0)
}
}
@Serializable
data class TestCreatedEvent(
override val aggregateId: Uuid,
override val version: Long,
val name: String,
override val eventType: String = "TestCreated",
override val eventId: Uuid = uuid4(),
override val timestamp: Instant = Clock.System.now(),
override val correlationId: Uuid? = null,
override val causationId: Uuid? = null
) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId)
@Transient override val aggregateId: Uuid = uuid4(),
@Transient override val version: Long = 0,
val name: String
) : BaseDomainEvent(aggregateId, "TestCreated", version)
@Serializable
data class TestUpdatedEvent(
override val aggregateId: Uuid,
override val version: Long,
val name: String,
override val eventType: String = "TestUpdated",
override val eventId: Uuid = uuid4(),
override val timestamp: Instant = Clock.System.now(),
override val correlationId: Uuid? = null,
override val causationId: Uuid? = null
) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId)
@Transient override val aggregateId: Uuid = uuid4(),
@Transient override val version: Long = 0,
val name: String
) : BaseDomainEvent(aggregateId, "TestUpdated", version)
}
@@ -6,11 +6,10 @@ import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.EventStore
import com.benasher44.uuid.Uuid
import com.benasher44.uuid.uuid4
import kotlin.time.Clock
import kotlin.time.Instant
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
@@ -20,15 +19,7 @@ import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/**
* Integration tests for Redis Event Store and Event Consumer.
*
* These tests verify the interaction between the Redis Event Store, Event Consumer, and Event Serializer
* in a more realistic scenario.
*/
@Testcontainers
class RedisIntegrationTest {
@@ -48,34 +39,27 @@ class RedisIntegrationTest {
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = StringRedisTemplate(connectionFactory)
// KORREKTUR: Parameter in der korrekten Reihenfolge und mit korrekten Typen übergeben.
serializer = JacksonEventSerializer().apply {
registerEventType(TestCreatedEvent::class.java, "TestCreated")
registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
}
properties = RedisEventStoreProperties(
streamPrefix = "test-stream:",
allEventsStream = "all-events",
consumerGroup = "test-group",
consumerName = "test-consumer"
)
eventStore = RedisEventStore(redisTemplate, serializer, properties)
eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties)
cleanupRedis()
// WICHTIG: Consumer starten, damit er auf Events lauschen kann.
eventConsumer.init()
}
@AfterEach
fun tearDown() {
eventConsumer.shutdown()
@@ -88,47 +72,22 @@ class RedisIntegrationTest {
if (!keys.isNullOrEmpty()) {
redisTemplate.delete(keys)
}
// Sicherstellen, dass auch der allEventsStream-Key gelöscht wird, falls er nicht im Muster enthalten ist.
redisTemplate.delete(allEventsStreamKey)
}
@Test
fun `test event publishing and consuming with consumer groups`() {
fun `event publishing and consuming should be fast and reliable`() {
val aggregateId = uuid4()
val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity")
val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity")
val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity")
val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity")
val latch = CountDownLatch(2)
val receivedEvents = mutableListOf<DomainEvent>()
eventConsumer.registerEventHandler("TestCreated") { event ->
receivedEvents.add(event)
latch.countDown()
}
eventConsumer.registerEventHandler("TestUpdated") { event ->
receivedEvents.add(event)
latch.countDown()
}
// Start polling in a separate thread to not block the test execution
val pollingThread = Thread {
// Poll multiple times to ensure messages are picked up
for (i in 1..20) {
if (latch.count > 0) {
eventConsumer.pollEvents()
Thread.sleep(100)
}
}
}
pollingThread.start()
eventConsumer.registerEventHandler("TestCreated") { receivedEvents.add(it) }
eventConsumer.registerEventHandler("TestUpdated") { receivedEvents.add(it) }
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
assertTrue(
latch.await(10, TimeUnit.SECONDS),
"Timed out waiting for events. Received ${receivedEvents.size} of 2 events."
)
eventConsumer.pollEvents()
assertEquals(2, receivedEvents.size)
@@ -139,30 +98,19 @@ class RedisIntegrationTest {
val receivedEvent2 = receivedEvents.find { it.version == 2L } as TestUpdatedEvent
assertEquals(aggregateId, receivedEvent2.aggregateId)
assertEquals("Updated Test Entity", receivedEvent2.name)
pollingThread.interrupt()
}
// Hilfsklassen für Tests, die von BaseDomainEvent erben
@Serializable
data class TestCreatedEvent(
override val aggregateId: Uuid,
override val version: Long,
val name: String,
override val eventType: String = "TestCreated",
override val eventId: Uuid = uuid4(),
override val timestamp: Instant = Clock.System.now(),
override val correlationId: Uuid? = null,
override val causationId: Uuid? = null
) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId)
@Transient override val aggregateId: Uuid = uuid4(),
@Transient override val version: Long = 0,
val name: String
) : BaseDomainEvent(aggregateId, "TestCreated", version)
@Serializable
data class TestUpdatedEvent(
override val aggregateId: Uuid,
override val version: Long,
val name: String,
override val eventType: String = "TestUpdated",
override val eventId: Uuid = uuid4(),
override val timestamp: Instant = Clock.System.now(),
override val correlationId: Uuid? = null,
override val causationId: Uuid? = null
) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId)
@Transient override val aggregateId: Uuid = uuid4(),
@Transient override val version: Long = 0,
val name: String
) : BaseDomainEvent(aggregateId, "TestUpdated", version)
}
@@ -16,7 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer
@Configuration
class KafkaConfig {
@Value("\${spring.kafka.bootstrap-servers:localhost:9092}")
@Value($$"${spring.kafka.bootstrap-servers:localhost:9092}")
private lateinit var bootstrapServers: String
@Bean