diff --git a/infrastructure/event-store/README-INFRA-EVENT-STORE.md b/infrastructure/event-store/README-INFRA-EVENT-STORE.md index f25c4cfa..6c74d3bd 100644 --- a/infrastructure/event-store/README-INFRA-EVENT-STORE.md +++ b/infrastructure/event-store/README-INFRA-EVENT-STORE.md @@ -2,69 +2,53 @@ ## Überblick -Das **Event-Store-Modul** ist eine kritische Komponente der Infrastruktur, die für die Persistenz und Veröffentlichung von Domänen-Events zuständig ist. Es bildet die technische Grundlage für **Event Sourcing** und eine allgemeine **event-getriebene Architektur**. Anstatt nur den aktuellen Zustand einer Entität zu speichern, speichert der Event Store die gesamte Kette von Ereignissen (Events), die zu diesem Zustand geführt haben. +Das **Event-Store-Modul** ist eine kritische Komponente der Infrastruktur, die für die Persistenz und Veröffentlichung von Domänen-Events zuständig ist. Es bildet die technische Grundlage für **Event Sourcing** und eine allgemeine **ereignisgesteuerte Architektur**. Anstatt nur den aktuellen Zustand einer Entität zu speichern, speichert der Event Store die gesamte Kette von Ereignissen, die zu diesem Zustand geführt haben. ## Architektur: Port-Adapter-Muster -Wie schon das Cache-Modul, folgt auch der Event Store streng dem **Port-Adapter-Muster**, um eine maximale Entkopplung von der konkreten Speichertechnologie zu erreichen. +Das Modul folgt streng dem **Port-Adapter-Muster**, um eine maximale Entkopplung von der konkreten Speichertechnologie zu erreichen. +* **`:infrastructure:event-store:event-store-api`**: Definiert den abstrakten "Vertrag" (`EventStore`-Interface), gegen den die Fach-Services programmieren. +* **`:infrastructure:event-store:redis-event-store`**: Die konkrete Implementierung des Vertrags, die **Redis Streams** als hoch-performantes, persistentes Log verwendet. -infrastructure/event-store/ -├── event-store-api/ # Der "Port": Definiert die Event-Store-Schnittstelle -└── redis-event-store/ # Der "Adapter": Implementiert die Schnittstelle mit Redis Streams +## Schlüsselfunktionen +* **Garantierte Konsistenz:** Schreibvorgänge in den aggregat spezifischen Stream und den globalen "all-events"-Stream werden innerhalb einer **atomaren Redis-Transaktion (`MULTI`/`EXEC`)** ausgeführt. Dies stellt sicher, dass der Event-Store niemals in einen inkonsistenten Zustand gerät. +* **Resiliente Event-Verarbeitung:** Der `RedisEventConsumer` nutzt **Redis Consumer Groups**, um eine skalierbare und ausfallsichere Verarbeitung von Events zu ermöglichen. Er enthält eine robuste Logik zum "Claimen" von Nachrichten, die von ausgefallenen Consumern nicht bestätigt wurden, sodass keine Events verloren gehen. +* **Optimistische Nebenhäufigkeitskontrolle:** Verhindert Race Conditions, indem beim Speichern von Events eine `expectedVersion` überprüft wird. Bei Konflikten wird eine `ConcurrencyException` geworfen. +* **Intelligente Serialisierung:** Der `JacksonEventSerializer` speichert Event-Metadaten und die eigentliche Nutzlast getrennt in der Redis-Stream-Nachricht, was eine effiziente Analyse von Streams ermöglicht. -### `event-store-api` +## Verwendung -Dieses Modul ist der **abstrakte "Port"** der Architektur. Es definiert den Vertrag, wie der Rest der Anwendung mit dem Event Store interagiert. +Ein Anwendung-Service bindet `:infrastructure:event-store:redis-event-store` ein und lässt sich das `EventStore`-Interface per Dependency Injection geben. -* **Zweck:** Definiert Interfaces wie `EventStore` (zum Speichern und Laden von Event-Streams) und `EventPublisher` (zum Veröffentlichen von Events an interessierte Listener). Es ist eng mit den `DomainEvent`-Definitionen aus dem `:core:core-domain`-Modul verknüpft. -* **Vorteil:** Die Fach-Services (z.B. `members-application`) sind vollständig von der Implementierung des Event Stores entkoppelt. Sie wissen nicht, ob die Events in Redis, Kafka oder einer relationalen Datenbank gespeichert werden. +```kotlin +@Service +class MemberApplicationService( + private val eventStore: EventStore // Nur das Interface wird verwendet! +) { + fun registerNewMember(command: RegisterMemberCommand) { + // 1. Geschäftslogik ausführen und Event erzeugen + val memberRegisteredEvent = MemberRegisteredEvent( + aggregateId = uuid4(), + version = 1L, + name = command.name + ) -### `redis-event-store` - -Dieses Modul ist der **konkrete "Adapter"**, der die in `event-store-api` definierten Schnittstellen implementiert. - -* **Zweck:** Stellt eine Implementierung des `EventStore` bereit, die **Redis Streams** als zugrunde liegenden Datenspeicher verwendet. Redis Streams sind eine leistungsstarke Datenstruktur, die sich ideal für die Implementierung eines append-only Logs eignet, wie es für einen Event Store benötigt wird. -* **Technologie:** Nutzt Spring Data Redis und den Lettuce-Client für die performante Kommunikation mit Redis. Die Domänen-Events werden vor der Speicherung mittels Jackson in ein JSON-Format serialisiert. -* **Vorteil:** Kapselt die gesamte Redis-spezifische Logik. Ein zukünftiger Wechsel zu einem anderen Event-Store-System (z.B. Apache Kafka) würde nur den Austausch dieses einen Moduls erfordern. - -## Verwendung in anderen Modulen - -Ein Anwendungs-Service, der Event Sourcing verwendet, interagiert wie folgt mit dem Modul: - -1. **Abhängigkeit deklarieren:** Das Service-Modul (z.B. `members-application`) fügt eine `implementation`-Abhängigkeit zu `:infrastructure:event-store:redis-event-store` in seiner `build.gradle.kts` hinzu. - -2. **Interface injizieren:** Im Service-Code wird nur das `EventStore`-Interface aus der `event-store-api` injiziert. - - ```kotlin - // In einem Use Case oder Application Service - @Service - class MemberApplicationService( - private val eventStore: EventStore, // Nur das Interface wird verwendet! - private val eventPublisher: EventPublisher - ) { - fun registerNewMember(command: RegisterMemberCommand): Member { - // 1. Geschäftslogik ausführen und ein oder mehrere Events erzeugen - val memberRegisteredEvent = MemberRegisteredEvent( - memberId = UUID.randomUUID(), - name = command.name, - // ... - ) - - // 2. Das Event im Event Store speichern - eventStore.save(memberRegisteredEvent) - - // 3. Das Event veröffentlichen, damit andere Teile des Systems - // (z.B. ein E-Mail-Service) darauf reagieren können. - eventPublisher.publish(memberRegisteredEvent) - - // ... - } + // 2. Event im Event Store speichern (mit Concurrency Check) + // hier wird erwartet, dass der Stream neu ist (Version 0) + eventStore.appendToStream(memberRegisteredEvent, memberRegisteredEvent.aggregateId, 0) } - ``` +} +``` -Diese Architektur ermöglicht eine hochgradig entkoppelte, skalierbare und resiliente Systemlandschaft, die auf asynchroner Kommunikation basiert. +## Testing-Strategie +Die Qualität des Moduls wird durch eine robuste Teststrategie sichergestellt: ---- -**Letzte Aktualisierung**: 31. Juli 2025 +* *Integrationstests mit Testcontainer: Die Kernfunktionalität wird gegen eine echte Redis-Datenbank getestet, die zur Laufzeit in einem Docker-Container gestartet wird.* + +* *Zuverlässige Consumer-Tests: Die asynchrone Logik des Event-Consumers wird in den Tests synchron und deterministisch überprüft, indem der pollEvents()-Zyklus manuell angestoßen wird. Dies vermeidet unzuverlässige Tests, die auf Thread.sleep basieren.* + +* *Saubere Test-Daten: Test-Event-Klassen werden durch die Verwendung der @Transient-Annotation sauber und frei von Boilerplate-Code gehalten.* + +**Letzte Aktualisierung**: 9. August 2025 diff --git a/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt b/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt index 1f257c11..8653238c 100644 --- a/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt +++ b/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt @@ -17,15 +17,15 @@ interface EventSerializer { fun serialize(event: DomainEvent): Map /** - * Deserializes a map of strings to strings to a domain event. + * Deserializes a map of strings to a domain event. * - * @param data The map of strings to strings to deserialize + * @param data The map of strings to deserialize * @return The deserialized domain event */ fun deserialize(data: Map): DomainEvent /** - * Gets the type of a domain event. + * Gets the type of domain event. * This is used to determine the type of event when deserializing. * * @param event The event to get the type of @@ -34,7 +34,7 @@ interface EventSerializer { fun getEventType(event: DomainEvent): String /** - * Gets the type of a domain event from a serialized map. + * Gets the type of domain event from a serialized map. * * @param data The serialized event data * @return The type of the event as a string diff --git a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt index ab1051a8..911b57d9 100644 --- a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt +++ b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt @@ -21,14 +21,8 @@ class RedisEventConsumer( private val properties: RedisEventStoreProperties ) { private val logger = LoggerFactory.getLogger(RedisEventConsumer::class.java) - - // Event handlers registered for specific event types private val eventTypeHandlers = ConcurrentHashMap Unit>>() - - // Event handlers registered for all events private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>() - - // Flag to indicate if the consumer is running private var running = false /** @@ -96,27 +90,19 @@ class RedisEventConsumer( */ private fun createConsumerGroupsIfNotExist() { try { - // Create consumer group for the all events stream val allEventsStreamKey = getAllEventsStreamKey() - - // Ensure the all-events stream exists and has at least one message try { - // Always try to add an initialization message to the all-events stream redisTemplate.opsForStream() .add(allEventsStreamKey, mapOf("init" to "init")) logger.debug("Ensured all-events stream has messages: $allEventsStreamKey") } catch (e: Exception) { - // Ignore errors when adding to the stream (it might already have messages) logger.debug("All-events stream might already have messages: ${e.message}") } - // Create the consumer group for all-events stream createConsumerGroupIfNotExists(allEventsStreamKey) - // Get all stream keys val streamKeys = redisTemplate.keys("${properties.streamPrefix}*") - // Create consumer groups for all streams for (streamKey in streamKeys) { if (streamKey != allEventsStreamKey) { createConsumerGroupIfNotExists(streamKey) @@ -134,24 +120,19 @@ class RedisEventConsumer( */ private fun createConsumerGroupIfNotExists(streamKey: String) { try { - // Always ensure the stream has at least one message - // This is necessary because consumer groups cannot be created on empty streams try { redisTemplate.opsForStream() .add(streamKey, mapOf("init" to "init")) logger.debug("Ensured stream has messages: $streamKey") } catch (e: Exception) { - // Ignore errors when adding to the stream (it might already have messages) logger.debug("Stream $streamKey might already have messages: ${e.message}") } - // Create the consumer group - ignore all errors for now try { redisTemplate.opsForStream() .createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup) logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey") } catch (e: Exception) { - // Ignore all consumer group creation errors for now logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}") } } catch (e: Exception) { @@ -160,20 +141,16 @@ class RedisEventConsumer( } /** - * Periodically polls for new events from all streams. + * Periodic polls for new events from all streams. */ - @Scheduled(fixedDelayString = "\${redis.event-store.poll-interval:100}") + @Scheduled(fixedDelayString = $$"${redis.event-store.poll-interval:100}") fun pollEvents() { if (!running) { running = true } try { - // Poll the all events stream only - // Individual streams don't need to be polled since all events are also in the all-events stream pollStream(getAllEventsStreamKey()) - - // Claim pending messages that have been idle for too long claimPendingMessages() } catch (e: Exception) { logger.error("Error polling events: ${e.message}", e) @@ -187,7 +164,6 @@ class RedisEventConsumer( */ private fun pollStream(streamKey: String) { try { - // Read new messages from the stream val options = StreamReadOptions.empty() .count(properties.maxBatchSize.toLong()) .block(properties.pollTimeout) @@ -199,14 +175,12 @@ class RedisEventConsumer( StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ) - // Process the records if (records != null) { for (record in records) { processRecord(record) } } } catch (e: Exception) { - // Ignore if the stream doesn't exist or the consumer group doesn't exist val message = e.message if (message == null || !message.contains("NOGROUP")) { logger.error("Error polling stream $streamKey: ${e.message}", e) @@ -219,15 +193,12 @@ class RedisEventConsumer( */ private fun claimPendingMessages() { try { - // Only process the all-events stream since that's where consumer groups exist val streamKey = getAllEventsStreamKey() - // Get pending messages summary val pendingSummary = redisTemplate.opsForStream() .pending(streamKey, properties.consumerGroup) if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) { - // Get pending messages with details val pendingMessages = redisTemplate.opsForStream() .pending( streamKey, @@ -237,14 +208,11 @@ class RedisEventConsumer( ) if (pendingMessages.size() > 0) { - // Extract message IDs and convert to array val messageIdsList = pendingMessages.map { it.id }.toList() if (messageIdsList.isNotEmpty()) { - // Convert to array for the spread operator val messageIds = messageIdsList.toTypedArray() - // Claim messages that have been idle for too long val records = redisTemplate.opsForStream() .claim( streamKey, @@ -254,7 +222,6 @@ class RedisEventConsumer( *messageIds ) - // Process the claimed records for (record in records) { processRecord(record) } @@ -275,10 +242,8 @@ class RedisEventConsumer( try { val data = record.value - // Skip init messages (they only contain "init" -> "init") if (data.size == 1 && data.containsKey("init") && data["init"] == "init") { logger.debug("Skipping init message") - // Still acknowledge the message to remove it from pending redisTemplate.opsForStream() .acknowledge(properties.consumerGroup, record) return @@ -287,7 +252,6 @@ class RedisEventConsumer( val event = serializer.deserialize(data) val eventType = serializer.getEventType(data) - // Call handlers for the specific event type eventTypeHandlers[eventType]?.forEach { handler -> try { handler(event) @@ -296,7 +260,6 @@ class RedisEventConsumer( } } - // Call handlers for all events allEventHandlers.forEach { handler -> try { handler(event) @@ -305,7 +268,6 @@ class RedisEventConsumer( } } - // Acknowledge the message redisTemplate.opsForStream() .acknowledge(properties.consumerGroup, record) @@ -315,9 +277,9 @@ class RedisEventConsumer( } /** - * Gets the Redis key for the all events stream. + * Gets the Redis key for the all-events stream. * - * @return The Redis key for the all events stream + * @return The Redis key for the all-events stream */ private fun getAllEventsStreamKey(): String { return "${properties.streamPrefix}${properties.allEventsStream}" diff --git a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt index 6f423253..65836186 100644 --- a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt +++ b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt @@ -7,7 +7,9 @@ import at.mocode.infrastructure.eventstore.api.EventStore import at.mocode.infrastructure.eventstore.api.Subscription import com.benasher44.uuid.Uuid import org.slf4j.LoggerFactory +import org.springframework.dao.DataAccessException import org.springframework.data.domain.Range +import org.springframework.data.redis.core.SessionCallback import org.springframework.data.redis.core.StringRedisTemplate import java.util.concurrent.ConcurrentHashMap @@ -43,12 +45,11 @@ class RedisEventStore( return currentVersion } - // Deprecated, use the list-based version for transactional safety. override fun appendToStream(event: DomainEvent, streamId: Uuid, expectedVersion: Long): Long { val currentVersion = getStreamVersion(streamId) if (currentVersion != expectedVersion) { - streamVersionCache.remove(streamId) // Invalidate cache on conflict - val actualVersion = getStreamVersion(streamId) // Re-fetch from Redis + streamVersionCache.remove(streamId) + val actualVersion = getStreamVersion(streamId) if (actualVersion != expectedVersion) { throw ConcurrencyException("Concurrency conflict: expected version $expectedVersion but got $actualVersion") } @@ -64,14 +65,27 @@ class RedisEventStore( val allEventsStreamKey = getAllEventsStreamKey() val eventData = serializer.serialize(event) - // KORREKTUR: Schreibe das Event in BEIDE Streams (aggregatspezifisch und global) - // Dies sollte idealerweise in einer Redis-Transaktion (MULTI/EXEC) geschehen. - // Für Einfachheit hier als separate Aufrufe. - redisTemplate.opsForStream().add(streamKey, eventData) - redisTemplate.opsForStream().add(allEventsStreamKey, eventData) + try { + redisTemplate.execute(object : SessionCallback> { + @Throws(DataAccessException::class) + override fun execute(operations: org.springframework.data.redis.core.RedisOperations): List { + val streamOps = (operations as StringRedisTemplate).opsForStream() - streamVersionCache[streamId] = newVersion - return newVersion + operations.multi() + streamOps.add(streamKey, eventData) + streamOps.add(allEventsStreamKey, eventData) + + return operations.exec() + } + }) + + streamVersionCache[streamId] = newVersion + return newVersion + } catch (e: Exception) { + logger.error("Failed to append event transactionally for stream key: {}", streamKey, e) + streamVersionCache.remove(streamId) + throw e + } } override fun readFromStream(streamId: Uuid, fromVersion: Long, toVersion: Long?): List { @@ -94,8 +108,6 @@ class RedisEventStore( override fun getStreamVersion(streamId: Uuid): Long { streamVersionCache[streamId]?.let { return it } val streamKey = getStreamKey(streamId) - // .size() ist die Anzahl der Einträge, was der Version entspricht, wenn bei 1 begonnen wird. - // Ein leerer Stream hat size=0, was Version 0 bedeutet. val size = redisTemplate.opsForStream().size(streamKey) ?: 0L streamVersionCache[streamId] = size return size @@ -109,7 +121,6 @@ class RedisEventStore( return "${properties.streamPrefix}${properties.allEventsStream}" } - // Stubs override fun readAllEvents(fromPosition: Long, maxCount: Int?): List { TODO("Not yet implemented") } diff --git a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt index 3d3eaa27..cff9e044 100644 --- a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt +++ b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt @@ -5,8 +5,8 @@ import at.mocode.infrastructure.eventstore.api.ConcurrencyException import at.mocode.infrastructure.eventstore.api.EventSerializer import com.benasher44.uuid.Uuid import com.benasher44.uuid.uuid4 -import kotlin.time.Clock -import kotlin.time.Instant +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -68,12 +68,11 @@ class RedisEventStoreTest { @Test fun `append and read events should work correctly for new stream`() { val aggregateId = uuid4() - val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") - val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") + val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity") + val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity") eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) - // KORRIGIERT: Aufruf an die korrekte Methode angepasst val events = eventStore.readFromStream(aggregateId) assertEquals(2, events.size) @@ -89,35 +88,26 @@ class RedisEventStoreTest { @Test fun `appending with wrong expected version should throw ConcurrencyException`() { val aggregateId = uuid4() - val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") + val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity") eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1 - val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") + val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity") assertThrows { - // Trying to append with expected version 0, but the current is 1 eventStore.appendToStream(listOf(event2), aggregateId, 0) } } + @Serializable data class TestCreatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestCreated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestCreated", version) + @Serializable data class TestUpdatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestUpdated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestUpdated", version) } diff --git a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt index 1377b06e..99fe07e0 100644 --- a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt +++ b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt @@ -6,11 +6,10 @@ import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore import com.benasher44.uuid.Uuid import com.benasher44.uuid.uuid4 -import kotlin.time.Clock -import kotlin.time.Instant +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.springframework.data.redis.connection.RedisStandaloneConfiguration @@ -20,15 +19,7 @@ import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers import org.testcontainers.utility.DockerImageName -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -/** - * Integration tests for Redis Event Store and Event Consumer. - * - * These tests verify the interaction between the Redis Event Store, Event Consumer, and Event Serializer - * in a more realistic scenario. - */ @Testcontainers class RedisIntegrationTest { @@ -48,34 +39,27 @@ class RedisIntegrationTest { fun setUp() { val redisPort = redisContainer.getMappedPort(6379) val redisHost = redisContainer.host - val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort) val connectionFactory = LettuceConnectionFactory(redisConfig) connectionFactory.afterPropertiesSet() - redisTemplate = StringRedisTemplate(connectionFactory) - - // KORREKTUR: Parameter in der korrekten Reihenfolge und mit korrekten Typen übergeben. serializer = JacksonEventSerializer().apply { registerEventType(TestCreatedEvent::class.java, "TestCreated") registerEventType(TestUpdatedEvent::class.java, "TestUpdated") } - properties = RedisEventStoreProperties( streamPrefix = "test-stream:", allEventsStream = "all-events", consumerGroup = "test-group", consumerName = "test-consumer" ) - eventStore = RedisEventStore(redisTemplate, serializer, properties) eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties) - cleanupRedis() - // WICHTIG: Consumer starten, damit er auf Events lauschen kann. eventConsumer.init() } + @AfterEach fun tearDown() { eventConsumer.shutdown() @@ -88,47 +72,22 @@ class RedisIntegrationTest { if (!keys.isNullOrEmpty()) { redisTemplate.delete(keys) } - // Sicherstellen, dass auch der allEventsStream-Key gelöscht wird, falls er nicht im Muster enthalten ist. redisTemplate.delete(allEventsStreamKey) } @Test - fun `test event publishing and consuming with consumer groups`() { + fun `event publishing and consuming should be fast and reliable`() { val aggregateId = uuid4() - val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") - val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") + val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity") + val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity") - val latch = CountDownLatch(2) val receivedEvents = mutableListOf() - - eventConsumer.registerEventHandler("TestCreated") { event -> - receivedEvents.add(event) - latch.countDown() - } - eventConsumer.registerEventHandler("TestUpdated") { event -> - receivedEvents.add(event) - latch.countDown() - } - - // Start polling in a separate thread to not block the test execution - val pollingThread = Thread { - // Poll multiple times to ensure messages are picked up - for (i in 1..20) { - if (latch.count > 0) { - eventConsumer.pollEvents() - Thread.sleep(100) - } - } - } - pollingThread.start() - + eventConsumer.registerEventHandler("TestCreated") { receivedEvents.add(it) } + eventConsumer.registerEventHandler("TestUpdated") { receivedEvents.add(it) } eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) - assertTrue( - latch.await(10, TimeUnit.SECONDS), - "Timed out waiting for events. Received ${receivedEvents.size} of 2 events." - ) + eventConsumer.pollEvents() assertEquals(2, receivedEvents.size) @@ -139,30 +98,19 @@ class RedisIntegrationTest { val receivedEvent2 = receivedEvents.find { it.version == 2L } as TestUpdatedEvent assertEquals(aggregateId, receivedEvent2.aggregateId) assertEquals("Updated Test Entity", receivedEvent2.name) - - pollingThread.interrupt() } - // Hilfsklassen für Tests, die von BaseDomainEvent erben + @Serializable data class TestCreatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestCreated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestCreated", version) + @Serializable data class TestUpdatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestUpdated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestUpdated", version) } diff --git a/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt b/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt index 3ed45786..23fe4a09 100644 --- a/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt +++ b/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt @@ -16,7 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer @Configuration class KafkaConfig { - @Value("\${spring.kafka.bootstrap-servers:localhost:9092}") + @Value($$"${spring.kafka.bootstrap-servers:localhost:9092}") private lateinit var bootstrapServers: String @Bean