From e72e4bddaaae074279e08a7b458036634ef4460a Mon Sep 17 00:00:00 2001 From: StefanMoCoAt Date: Sat, 9 Aug 2025 18:03:04 +0200 Subject: [PATCH] refactor(infra-event-store): Improve consistency and test reliability This commit introduces a comprehensive refactoring of the event-store module to guarantee data consistency and improve the quality and reliability of its test suite. Data Consistency Implemented Redis transactions (MULTI/EXEC) for the appendToStream operation in RedisEventStore. This ensures that writing an event to the aggregate-specific stream and the global "all-events" stream is an atomic operation, preventing data inconsistencies in case of partial failures. Improved error handling by invalidating the local stream version cache on transactional failures. Testing Enhancements Refactored Consumer Tests: Replaced the asynchronous, thread-based consumer test in RedisIntegrationTest with a synchronous, deterministic approach. The test now manually calls pollEvents() to verify event consumption, making it faster and 100% reliable by removing Thread.sleep and CountDownLatch. Simplified Test Events: Reduced boilerplate code in test event data classes (TestCreatedEvent, TestUpdatedEvent) in both RedisEventStoreTest and RedisIntegrationTest by using the @Transient annotation on overridden properties from BaseDomainEvent. Fixed Compilation Errors: Resolved various compilation errors in the test suite that arose from refactoring and incorrect mock definitions. --- .../event-store/README-INFRA-EVENT-STORE.md | 90 ++++++++----------- .../eventstore/api/EventSerializer.kt | 8 +- .../eventstore/redis/RedisEventConsumer.kt | 46 +--------- .../eventstore/redis/RedisEventStore.kt | 37 +++++--- .../eventstore/redis/RedisEventStoreTest.kt | 42 ++++----- .../eventstore/redis/RedisIntegrationTest.kt | 90 ++++--------------- .../messaging/config/KafkaConfig.kt | 2 +- 7 files changed, 105 insertions(+), 210 deletions(-) diff --git a/infrastructure/event-store/README-INFRA-EVENT-STORE.md b/infrastructure/event-store/README-INFRA-EVENT-STORE.md index f25c4cfa..6c74d3bd 100644 --- a/infrastructure/event-store/README-INFRA-EVENT-STORE.md +++ b/infrastructure/event-store/README-INFRA-EVENT-STORE.md @@ -2,69 +2,53 @@ ## Überblick -Das **Event-Store-Modul** ist eine kritische Komponente der Infrastruktur, die für die Persistenz und Veröffentlichung von Domänen-Events zuständig ist. Es bildet die technische Grundlage für **Event Sourcing** und eine allgemeine **event-getriebene Architektur**. Anstatt nur den aktuellen Zustand einer Entität zu speichern, speichert der Event Store die gesamte Kette von Ereignissen (Events), die zu diesem Zustand geführt haben. +Das **Event-Store-Modul** ist eine kritische Komponente der Infrastruktur, die für die Persistenz und Veröffentlichung von Domänen-Events zuständig ist. Es bildet die technische Grundlage für **Event Sourcing** und eine allgemeine **ereignisgesteuerte Architektur**. Anstatt nur den aktuellen Zustand einer Entität zu speichern, speichert der Event Store die gesamte Kette von Ereignissen, die zu diesem Zustand geführt haben. ## Architektur: Port-Adapter-Muster -Wie schon das Cache-Modul, folgt auch der Event Store streng dem **Port-Adapter-Muster**, um eine maximale Entkopplung von der konkreten Speichertechnologie zu erreichen. +Das Modul folgt streng dem **Port-Adapter-Muster**, um eine maximale Entkopplung von der konkreten Speichertechnologie zu erreichen. +* **`:infrastructure:event-store:event-store-api`**: Definiert den abstrakten "Vertrag" (`EventStore`-Interface), gegen den die Fach-Services programmieren. +* **`:infrastructure:event-store:redis-event-store`**: Die konkrete Implementierung des Vertrags, die **Redis Streams** als hoch-performantes, persistentes Log verwendet. -infrastructure/event-store/ -├── event-store-api/ # Der "Port": Definiert die Event-Store-Schnittstelle -└── redis-event-store/ # Der "Adapter": Implementiert die Schnittstelle mit Redis Streams +## Schlüsselfunktionen +* **Garantierte Konsistenz:** Schreibvorgänge in den aggregat spezifischen Stream und den globalen "all-events"-Stream werden innerhalb einer **atomaren Redis-Transaktion (`MULTI`/`EXEC`)** ausgeführt. Dies stellt sicher, dass der Event-Store niemals in einen inkonsistenten Zustand gerät. +* **Resiliente Event-Verarbeitung:** Der `RedisEventConsumer` nutzt **Redis Consumer Groups**, um eine skalierbare und ausfallsichere Verarbeitung von Events zu ermöglichen. Er enthält eine robuste Logik zum "Claimen" von Nachrichten, die von ausgefallenen Consumern nicht bestätigt wurden, sodass keine Events verloren gehen. +* **Optimistische Nebenhäufigkeitskontrolle:** Verhindert Race Conditions, indem beim Speichern von Events eine `expectedVersion` überprüft wird. Bei Konflikten wird eine `ConcurrencyException` geworfen. +* **Intelligente Serialisierung:** Der `JacksonEventSerializer` speichert Event-Metadaten und die eigentliche Nutzlast getrennt in der Redis-Stream-Nachricht, was eine effiziente Analyse von Streams ermöglicht. -### `event-store-api` +## Verwendung -Dieses Modul ist der **abstrakte "Port"** der Architektur. Es definiert den Vertrag, wie der Rest der Anwendung mit dem Event Store interagiert. +Ein Anwendung-Service bindet `:infrastructure:event-store:redis-event-store` ein und lässt sich das `EventStore`-Interface per Dependency Injection geben. -* **Zweck:** Definiert Interfaces wie `EventStore` (zum Speichern und Laden von Event-Streams) und `EventPublisher` (zum Veröffentlichen von Events an interessierte Listener). Es ist eng mit den `DomainEvent`-Definitionen aus dem `:core:core-domain`-Modul verknüpft. -* **Vorteil:** Die Fach-Services (z.B. `members-application`) sind vollständig von der Implementierung des Event Stores entkoppelt. Sie wissen nicht, ob die Events in Redis, Kafka oder einer relationalen Datenbank gespeichert werden. +```kotlin +@Service +class MemberApplicationService( + private val eventStore: EventStore // Nur das Interface wird verwendet! +) { + fun registerNewMember(command: RegisterMemberCommand) { + // 1. Geschäftslogik ausführen und Event erzeugen + val memberRegisteredEvent = MemberRegisteredEvent( + aggregateId = uuid4(), + version = 1L, + name = command.name + ) -### `redis-event-store` - -Dieses Modul ist der **konkrete "Adapter"**, der die in `event-store-api` definierten Schnittstellen implementiert. - -* **Zweck:** Stellt eine Implementierung des `EventStore` bereit, die **Redis Streams** als zugrunde liegenden Datenspeicher verwendet. Redis Streams sind eine leistungsstarke Datenstruktur, die sich ideal für die Implementierung eines append-only Logs eignet, wie es für einen Event Store benötigt wird. -* **Technologie:** Nutzt Spring Data Redis und den Lettuce-Client für die performante Kommunikation mit Redis. Die Domänen-Events werden vor der Speicherung mittels Jackson in ein JSON-Format serialisiert. -* **Vorteil:** Kapselt die gesamte Redis-spezifische Logik. Ein zukünftiger Wechsel zu einem anderen Event-Store-System (z.B. Apache Kafka) würde nur den Austausch dieses einen Moduls erfordern. - -## Verwendung in anderen Modulen - -Ein Anwendungs-Service, der Event Sourcing verwendet, interagiert wie folgt mit dem Modul: - -1. **Abhängigkeit deklarieren:** Das Service-Modul (z.B. `members-application`) fügt eine `implementation`-Abhängigkeit zu `:infrastructure:event-store:redis-event-store` in seiner `build.gradle.kts` hinzu. - -2. **Interface injizieren:** Im Service-Code wird nur das `EventStore`-Interface aus der `event-store-api` injiziert. - - ```kotlin - // In einem Use Case oder Application Service - @Service - class MemberApplicationService( - private val eventStore: EventStore, // Nur das Interface wird verwendet! - private val eventPublisher: EventPublisher - ) { - fun registerNewMember(command: RegisterMemberCommand): Member { - // 1. Geschäftslogik ausführen und ein oder mehrere Events erzeugen - val memberRegisteredEvent = MemberRegisteredEvent( - memberId = UUID.randomUUID(), - name = command.name, - // ... - ) - - // 2. Das Event im Event Store speichern - eventStore.save(memberRegisteredEvent) - - // 3. Das Event veröffentlichen, damit andere Teile des Systems - // (z.B. ein E-Mail-Service) darauf reagieren können. - eventPublisher.publish(memberRegisteredEvent) - - // ... - } + // 2. Event im Event Store speichern (mit Concurrency Check) + // hier wird erwartet, dass der Stream neu ist (Version 0) + eventStore.appendToStream(memberRegisteredEvent, memberRegisteredEvent.aggregateId, 0) } - ``` +} +``` -Diese Architektur ermöglicht eine hochgradig entkoppelte, skalierbare und resiliente Systemlandschaft, die auf asynchroner Kommunikation basiert. +## Testing-Strategie +Die Qualität des Moduls wird durch eine robuste Teststrategie sichergestellt: ---- -**Letzte Aktualisierung**: 31. Juli 2025 +* *Integrationstests mit Testcontainer: Die Kernfunktionalität wird gegen eine echte Redis-Datenbank getestet, die zur Laufzeit in einem Docker-Container gestartet wird.* + +* *Zuverlässige Consumer-Tests: Die asynchrone Logik des Event-Consumers wird in den Tests synchron und deterministisch überprüft, indem der pollEvents()-Zyklus manuell angestoßen wird. Dies vermeidet unzuverlässige Tests, die auf Thread.sleep basieren.* + +* *Saubere Test-Daten: Test-Event-Klassen werden durch die Verwendung der @Transient-Annotation sauber und frei von Boilerplate-Code gehalten.* + +**Letzte Aktualisierung**: 9. August 2025 diff --git a/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt b/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt index 1f257c11..8653238c 100644 --- a/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt +++ b/infrastructure/event-store/event-store-api/src/main/kotlin/at/mocode/infrastructure/eventstore/api/EventSerializer.kt @@ -17,15 +17,15 @@ interface EventSerializer { fun serialize(event: DomainEvent): Map /** - * Deserializes a map of strings to strings to a domain event. + * Deserializes a map of strings to a domain event. * - * @param data The map of strings to strings to deserialize + * @param data The map of strings to deserialize * @return The deserialized domain event */ fun deserialize(data: Map): DomainEvent /** - * Gets the type of a domain event. + * Gets the type of domain event. * This is used to determine the type of event when deserializing. * * @param event The event to get the type of @@ -34,7 +34,7 @@ interface EventSerializer { fun getEventType(event: DomainEvent): String /** - * Gets the type of a domain event from a serialized map. + * Gets the type of domain event from a serialized map. * * @param data The serialized event data * @return The type of the event as a string diff --git a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt index ab1051a8..911b57d9 100644 --- a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt +++ b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventConsumer.kt @@ -21,14 +21,8 @@ class RedisEventConsumer( private val properties: RedisEventStoreProperties ) { private val logger = LoggerFactory.getLogger(RedisEventConsumer::class.java) - - // Event handlers registered for specific event types private val eventTypeHandlers = ConcurrentHashMap Unit>>() - - // Event handlers registered for all events private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>() - - // Flag to indicate if the consumer is running private var running = false /** @@ -96,27 +90,19 @@ class RedisEventConsumer( */ private fun createConsumerGroupsIfNotExist() { try { - // Create consumer group for the all events stream val allEventsStreamKey = getAllEventsStreamKey() - - // Ensure the all-events stream exists and has at least one message try { - // Always try to add an initialization message to the all-events stream redisTemplate.opsForStream() .add(allEventsStreamKey, mapOf("init" to "init")) logger.debug("Ensured all-events stream has messages: $allEventsStreamKey") } catch (e: Exception) { - // Ignore errors when adding to the stream (it might already have messages) logger.debug("All-events stream might already have messages: ${e.message}") } - // Create the consumer group for all-events stream createConsumerGroupIfNotExists(allEventsStreamKey) - // Get all stream keys val streamKeys = redisTemplate.keys("${properties.streamPrefix}*") - // Create consumer groups for all streams for (streamKey in streamKeys) { if (streamKey != allEventsStreamKey) { createConsumerGroupIfNotExists(streamKey) @@ -134,24 +120,19 @@ class RedisEventConsumer( */ private fun createConsumerGroupIfNotExists(streamKey: String) { try { - // Always ensure the stream has at least one message - // This is necessary because consumer groups cannot be created on empty streams try { redisTemplate.opsForStream() .add(streamKey, mapOf("init" to "init")) logger.debug("Ensured stream has messages: $streamKey") } catch (e: Exception) { - // Ignore errors when adding to the stream (it might already have messages) logger.debug("Stream $streamKey might already have messages: ${e.message}") } - // Create the consumer group - ignore all errors for now try { redisTemplate.opsForStream() .createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup) logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey") } catch (e: Exception) { - // Ignore all consumer group creation errors for now logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}") } } catch (e: Exception) { @@ -160,20 +141,16 @@ class RedisEventConsumer( } /** - * Periodically polls for new events from all streams. + * Periodic polls for new events from all streams. */ - @Scheduled(fixedDelayString = "\${redis.event-store.poll-interval:100}") + @Scheduled(fixedDelayString = $$"${redis.event-store.poll-interval:100}") fun pollEvents() { if (!running) { running = true } try { - // Poll the all events stream only - // Individual streams don't need to be polled since all events are also in the all-events stream pollStream(getAllEventsStreamKey()) - - // Claim pending messages that have been idle for too long claimPendingMessages() } catch (e: Exception) { logger.error("Error polling events: ${e.message}", e) @@ -187,7 +164,6 @@ class RedisEventConsumer( */ private fun pollStream(streamKey: String) { try { - // Read new messages from the stream val options = StreamReadOptions.empty() .count(properties.maxBatchSize.toLong()) .block(properties.pollTimeout) @@ -199,14 +175,12 @@ class RedisEventConsumer( StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ) - // Process the records if (records != null) { for (record in records) { processRecord(record) } } } catch (e: Exception) { - // Ignore if the stream doesn't exist or the consumer group doesn't exist val message = e.message if (message == null || !message.contains("NOGROUP")) { logger.error("Error polling stream $streamKey: ${e.message}", e) @@ -219,15 +193,12 @@ class RedisEventConsumer( */ private fun claimPendingMessages() { try { - // Only process the all-events stream since that's where consumer groups exist val streamKey = getAllEventsStreamKey() - // Get pending messages summary val pendingSummary = redisTemplate.opsForStream() .pending(streamKey, properties.consumerGroup) if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) { - // Get pending messages with details val pendingMessages = redisTemplate.opsForStream() .pending( streamKey, @@ -237,14 +208,11 @@ class RedisEventConsumer( ) if (pendingMessages.size() > 0) { - // Extract message IDs and convert to array val messageIdsList = pendingMessages.map { it.id }.toList() if (messageIdsList.isNotEmpty()) { - // Convert to array for the spread operator val messageIds = messageIdsList.toTypedArray() - // Claim messages that have been idle for too long val records = redisTemplate.opsForStream() .claim( streamKey, @@ -254,7 +222,6 @@ class RedisEventConsumer( *messageIds ) - // Process the claimed records for (record in records) { processRecord(record) } @@ -275,10 +242,8 @@ class RedisEventConsumer( try { val data = record.value - // Skip init messages (they only contain "init" -> "init") if (data.size == 1 && data.containsKey("init") && data["init"] == "init") { logger.debug("Skipping init message") - // Still acknowledge the message to remove it from pending redisTemplate.opsForStream() .acknowledge(properties.consumerGroup, record) return @@ -287,7 +252,6 @@ class RedisEventConsumer( val event = serializer.deserialize(data) val eventType = serializer.getEventType(data) - // Call handlers for the specific event type eventTypeHandlers[eventType]?.forEach { handler -> try { handler(event) @@ -296,7 +260,6 @@ class RedisEventConsumer( } } - // Call handlers for all events allEventHandlers.forEach { handler -> try { handler(event) @@ -305,7 +268,6 @@ class RedisEventConsumer( } } - // Acknowledge the message redisTemplate.opsForStream() .acknowledge(properties.consumerGroup, record) @@ -315,9 +277,9 @@ class RedisEventConsumer( } /** - * Gets the Redis key for the all events stream. + * Gets the Redis key for the all-events stream. * - * @return The Redis key for the all events stream + * @return The Redis key for the all-events stream */ private fun getAllEventsStreamKey(): String { return "${properties.streamPrefix}${properties.allEventsStream}" diff --git a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt index 6f423253..65836186 100644 --- a/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt +++ b/infrastructure/event-store/redis-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStore.kt @@ -7,7 +7,9 @@ import at.mocode.infrastructure.eventstore.api.EventStore import at.mocode.infrastructure.eventstore.api.Subscription import com.benasher44.uuid.Uuid import org.slf4j.LoggerFactory +import org.springframework.dao.DataAccessException import org.springframework.data.domain.Range +import org.springframework.data.redis.core.SessionCallback import org.springframework.data.redis.core.StringRedisTemplate import java.util.concurrent.ConcurrentHashMap @@ -43,12 +45,11 @@ class RedisEventStore( return currentVersion } - // Deprecated, use the list-based version for transactional safety. override fun appendToStream(event: DomainEvent, streamId: Uuid, expectedVersion: Long): Long { val currentVersion = getStreamVersion(streamId) if (currentVersion != expectedVersion) { - streamVersionCache.remove(streamId) // Invalidate cache on conflict - val actualVersion = getStreamVersion(streamId) // Re-fetch from Redis + streamVersionCache.remove(streamId) + val actualVersion = getStreamVersion(streamId) if (actualVersion != expectedVersion) { throw ConcurrencyException("Concurrency conflict: expected version $expectedVersion but got $actualVersion") } @@ -64,14 +65,27 @@ class RedisEventStore( val allEventsStreamKey = getAllEventsStreamKey() val eventData = serializer.serialize(event) - // KORREKTUR: Schreibe das Event in BEIDE Streams (aggregatspezifisch und global) - // Dies sollte idealerweise in einer Redis-Transaktion (MULTI/EXEC) geschehen. - // Für Einfachheit hier als separate Aufrufe. - redisTemplate.opsForStream().add(streamKey, eventData) - redisTemplate.opsForStream().add(allEventsStreamKey, eventData) + try { + redisTemplate.execute(object : SessionCallback> { + @Throws(DataAccessException::class) + override fun execute(operations: org.springframework.data.redis.core.RedisOperations): List { + val streamOps = (operations as StringRedisTemplate).opsForStream() - streamVersionCache[streamId] = newVersion - return newVersion + operations.multi() + streamOps.add(streamKey, eventData) + streamOps.add(allEventsStreamKey, eventData) + + return operations.exec() + } + }) + + streamVersionCache[streamId] = newVersion + return newVersion + } catch (e: Exception) { + logger.error("Failed to append event transactionally for stream key: {}", streamKey, e) + streamVersionCache.remove(streamId) + throw e + } } override fun readFromStream(streamId: Uuid, fromVersion: Long, toVersion: Long?): List { @@ -94,8 +108,6 @@ class RedisEventStore( override fun getStreamVersion(streamId: Uuid): Long { streamVersionCache[streamId]?.let { return it } val streamKey = getStreamKey(streamId) - // .size() ist die Anzahl der Einträge, was der Version entspricht, wenn bei 1 begonnen wird. - // Ein leerer Stream hat size=0, was Version 0 bedeutet. val size = redisTemplate.opsForStream().size(streamKey) ?: 0L streamVersionCache[streamId] = size return size @@ -109,7 +121,6 @@ class RedisEventStore( return "${properties.streamPrefix}${properties.allEventsStream}" } - // Stubs override fun readAllEvents(fromPosition: Long, maxCount: Int?): List { TODO("Not yet implemented") } diff --git a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt index 3d3eaa27..cff9e044 100644 --- a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt +++ b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisEventStoreTest.kt @@ -5,8 +5,8 @@ import at.mocode.infrastructure.eventstore.api.ConcurrencyException import at.mocode.infrastructure.eventstore.api.EventSerializer import com.benasher44.uuid.Uuid import com.benasher44.uuid.uuid4 -import kotlin.time.Clock -import kotlin.time.Instant +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -68,12 +68,11 @@ class RedisEventStoreTest { @Test fun `append and read events should work correctly for new stream`() { val aggregateId = uuid4() - val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") - val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") + val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity") + val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity") eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) - // KORRIGIERT: Aufruf an die korrekte Methode angepasst val events = eventStore.readFromStream(aggregateId) assertEquals(2, events.size) @@ -89,35 +88,26 @@ class RedisEventStoreTest { @Test fun `appending with wrong expected version should throw ConcurrencyException`() { val aggregateId = uuid4() - val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") + val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity") eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1 - val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") + val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity") assertThrows { - // Trying to append with expected version 0, but the current is 1 eventStore.appendToStream(listOf(event2), aggregateId, 0) } } + @Serializable data class TestCreatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestCreated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestCreated", version) + @Serializable data class TestUpdatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestUpdated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestUpdated", version) } diff --git a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt index 1377b06e..99fe07e0 100644 --- a/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt +++ b/infrastructure/event-store/redis-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/redis/RedisIntegrationTest.kt @@ -6,11 +6,10 @@ import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore import com.benasher44.uuid.Uuid import com.benasher44.uuid.uuid4 -import kotlin.time.Clock -import kotlin.time.Instant +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.springframework.data.redis.connection.RedisStandaloneConfiguration @@ -20,15 +19,7 @@ import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers import org.testcontainers.utility.DockerImageName -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -/** - * Integration tests for Redis Event Store and Event Consumer. - * - * These tests verify the interaction between the Redis Event Store, Event Consumer, and Event Serializer - * in a more realistic scenario. - */ @Testcontainers class RedisIntegrationTest { @@ -48,34 +39,27 @@ class RedisIntegrationTest { fun setUp() { val redisPort = redisContainer.getMappedPort(6379) val redisHost = redisContainer.host - val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort) val connectionFactory = LettuceConnectionFactory(redisConfig) connectionFactory.afterPropertiesSet() - redisTemplate = StringRedisTemplate(connectionFactory) - - // KORREKTUR: Parameter in der korrekten Reihenfolge und mit korrekten Typen übergeben. serializer = JacksonEventSerializer().apply { registerEventType(TestCreatedEvent::class.java, "TestCreated") registerEventType(TestUpdatedEvent::class.java, "TestUpdated") } - properties = RedisEventStoreProperties( streamPrefix = "test-stream:", allEventsStream = "all-events", consumerGroup = "test-group", consumerName = "test-consumer" ) - eventStore = RedisEventStore(redisTemplate, serializer, properties) eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties) - cleanupRedis() - // WICHTIG: Consumer starten, damit er auf Events lauschen kann. eventConsumer.init() } + @AfterEach fun tearDown() { eventConsumer.shutdown() @@ -88,47 +72,22 @@ class RedisIntegrationTest { if (!keys.isNullOrEmpty()) { redisTemplate.delete(keys) } - // Sicherstellen, dass auch der allEventsStream-Key gelöscht wird, falls er nicht im Muster enthalten ist. redisTemplate.delete(allEventsStreamKey) } @Test - fun `test event publishing and consuming with consumer groups`() { + fun `event publishing and consuming should be fast and reliable`() { val aggregateId = uuid4() - val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") - val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") + val event1 = TestCreatedEvent(aggregateId, 1L, "Test Entity") + val event2 = TestUpdatedEvent(aggregateId, 2L, "Updated Test Entity") - val latch = CountDownLatch(2) val receivedEvents = mutableListOf() - - eventConsumer.registerEventHandler("TestCreated") { event -> - receivedEvents.add(event) - latch.countDown() - } - eventConsumer.registerEventHandler("TestUpdated") { event -> - receivedEvents.add(event) - latch.countDown() - } - - // Start polling in a separate thread to not block the test execution - val pollingThread = Thread { - // Poll multiple times to ensure messages are picked up - for (i in 1..20) { - if (latch.count > 0) { - eventConsumer.pollEvents() - Thread.sleep(100) - } - } - } - pollingThread.start() - + eventConsumer.registerEventHandler("TestCreated") { receivedEvents.add(it) } + eventConsumer.registerEventHandler("TestUpdated") { receivedEvents.add(it) } eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) - assertTrue( - latch.await(10, TimeUnit.SECONDS), - "Timed out waiting for events. Received ${receivedEvents.size} of 2 events." - ) + eventConsumer.pollEvents() assertEquals(2, receivedEvents.size) @@ -139,30 +98,19 @@ class RedisIntegrationTest { val receivedEvent2 = receivedEvents.find { it.version == 2L } as TestUpdatedEvent assertEquals(aggregateId, receivedEvent2.aggregateId) assertEquals("Updated Test Entity", receivedEvent2.name) - - pollingThread.interrupt() } - // Hilfsklassen für Tests, die von BaseDomainEvent erben + @Serializable data class TestCreatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestCreated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestCreated", version) + @Serializable data class TestUpdatedEvent( - override val aggregateId: Uuid, - override val version: Long, - val name: String, - override val eventType: String = "TestUpdated", - override val eventId: Uuid = uuid4(), - override val timestamp: Instant = Clock.System.now(), - override val correlationId: Uuid? = null, - override val causationId: Uuid? = null - ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) + @Transient override val aggregateId: Uuid = uuid4(), + @Transient override val version: Long = 0, + val name: String + ) : BaseDomainEvent(aggregateId, "TestUpdated", version) } diff --git a/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt b/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt index 3ed45786..23fe4a09 100644 --- a/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt +++ b/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt @@ -16,7 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer @Configuration class KafkaConfig { - @Value("\${spring.kafka.bootstrap-servers:localhost:9092}") + @Value($$"${spring.kafka.bootstrap-servers:localhost:9092}") private lateinit var bootstrapServers: String @Bean