diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ea874ac8..9de619cf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -175,6 +175,8 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj" } testcontainers-core = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" } testcontainers-junit-jupiter = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers" } testcontainers-postgresql = { module = "org.testcontainers:postgresql", version.ref = "testcontainers" } +testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers" } +reactor-test = { module = "io.projectreactor:reactor-test" } # Version wird von der Spring BOM verwaltet room-common-jvm = { group = "androidx.room", name = "room-common-jvm", version.ref = "roomCommonJvm" } [bundles] diff --git a/infrastructure/messaging/README-INFRA-MESSAGING.md b/infrastructure/messaging/README-INFRA-MESSAGING.md index bceee1f7..512128bc 100644 --- a/infrastructure/messaging/README-INFRA-MESSAGING.md +++ b/infrastructure/messaging/README-INFRA-MESSAGING.md @@ -2,80 +2,84 @@ ## Überblick -Das **Messaging-Modul** stellt die Infrastruktur für die asynchrone Kommunikation zwischen den Microservices des Meldestelle-Systems bereit. Es nutzt **Apache Kafka** als hochperformanten, verteilten Message-Broker. Dieses Modul ist entscheidend für die Entkopplung von Services und die Implementierung von Mustern wie Publish/Subscribe, um eine skalierbare und resiliente Architektur zu ermöglichen. +Das **Messaging-Modul** stellt die Infrastruktur für die asynchrone, reaktive Kommunikation zwischen den Microservices bereit. Es nutzt **Apache Kafka** als hochperformanten, verteilten Message-Broker und ist entscheidend für die Entkopplung von Services und die Implementierung einer skalierbaren, ereignisgesteuerten Architektur. ## Architektur -Ähnlich wie andere Infrastruktur-Module ist auch dieses in zwei spezialisierte Komponenten aufgeteilt, um Konfiguration von der Client-Logik zu trennen: +Das Modul ist in zwei spezialisierte Komponenten aufgeteilt, um Konfiguration von der Client-Logik zu trennen: infrastructure/messaging/ ├── messaging-config/ # Stellt die zentrale Kafka-Konfiguration bereit -└── messaging-client/ # Stellt wiederverwendbare Producer- und Consumer-Clients bereit +└── messaging-client/ # Stellt wiederverwendbare, reaktive Clients bereit ### `messaging-config` -Dieses Modul ist die Basis für jede Kafka-Interaktion. Es ist dafür verantwortlich, die gesamte **Konfiguration** zu zentralisieren. +Dieses Modul zentralisiert die grundlegende Kafka-Konfiguration für das gesamte Projekt. -* **Zweck:** Definiert Spring-Beans für die grundlegende Kafka-Konfiguration. Dazu gehören: - * Die Adresse der Kafka-Broker (`bootstrap-servers`). - * Konfiguration für Serializer und Deserializer (z.B. `JsonSerializer` von Spring Kafka), um sicherzustellen, dass alle Services Nachrichten im selben Format (JSON) austauschen. - * Konfiguration für Topics, Partitionen und Replikationsfaktoren. -* **Vorteil:** Jeder Service, der Kafka nutzt, kann sich auf diese zentrale Konfiguration verlassen, was die Konsistenz sicherstellt und die Einrichtung neuer Producer oder Consumer vereinfacht. +* **Zweck:** Definiert Spring-Beans für die `ProducerFactory` (Basis für Producer) und eine `Map` mit Standard-Konfigurationen für Consumer (z.B. `bootstrap-servers`, `group-id`, Serializer). +* **Vorteil:** Stellt Konsistenz sicher und vereinfacht die Einrichtung neuer Producer oder Consumer in den Services. ### `messaging-client` -Dieses Modul baut auf `messaging-config` auf und stellt **wiederverwendbare High-Level-Komponenten** für die Interaktion mit Kafka bereit. +Dieses Modul baut auf der Konfiguration auf und stellt wiederverwendbare High-Level-Komponenten für die Interaktion mit Kafka bereit. -* **Zweck:** Stellt einfach zu verwendende Klassen oder Services zur Verfügung, z.B. einen `KafkaProducerService` zum Senden von Nachrichten und einen `KafkaConsumerService` zum Empfangen von Nachrichten. Es nutzt **Project Reactor** (`reactor-kafka`), um eine reaktive und nicht-blockierende Verarbeitung von Nachrichten zu ermöglichen. -* **Vorteil:** Kapselt die Komplexität der Kafka-Producer- und -Consumer-API. Ein Fach-Service muss nur noch eine Methode wie `producer.sendMessage("topic", message)` aufrufen, ohne sich um die Details der Verbindung, Serialisierung oder Fehlerbehandlung kümmern zu müssen. +* **Zweck:** + * **`KafkaEventPublisher`**: Ein reaktiver, nicht-blockierender Service zum Senden von Nachrichten. Er nutzt den `ReactiveKafkaProducerTemplate` von Spring. + * **`KafkaEventConsumer`**: Ein reaktiver Service zum Empfangen von Nachrichten. Er kapselt die Komplexität von `reactor-kafka` und gibt einen kontinuierlichen `Flux`-Stream von Events zurück. +* **Vorteil:** Kapselt die Komplexität der reaktiven Kafka-API. Ein Fach-Service muss nur noch reaktive Streams (`Mono`, `Flux`) handhaben, ohne sich um die Details der Kafka-Interaktion zu kümmern. -## Verwendung in anderen Modulen +## Verwendung -Ein Microservice, der Nachrichten senden oder empfangen möchte, geht wie folgt vor: +Ein Microservice, der Nachrichten senden oder empfangen möchte, deklariert eine Abhängigkeit zu `:infrastructure:messaging:messaging-client` und injiziert die entsprechenden Interfaces. -1. **Abhängigkeit deklarieren:** Das Service-Modul (z.B. `events-service`) fügt eine `implementation`-Abhängigkeit zu `:infrastructure:messaging:messaging-client` in seiner `build.gradle.kts` hinzu. - -2. **Client-Service injizieren:** Im Service-Code wird der `KafkaProducerService` oder `KafkaConsumerService` per Dependency Injection angefordert. - - ```kotlin - // Beispiel für das Senden einer Nachricht - @Service - class EventNotificationService( - private val kafkaProducer: KafkaProducerService - ) { - fun notifyNewEvent(eventDetails: EventDetails) { - val topic = "new-events-topic" - // Einfacher Aufruf zum Senden der Nachricht. - // Die Komplexität der Serialisierung und des Sendens ist gekapselt. - kafkaProducer.sendMessage(topic, eventDetails.id, eventDetails) - .subscribe( - { result -> logger.info("Message sent successfully to topic '{}'", topic) }, - { error -> logger.error("Failed to send message to topic '{}'", topic, error) } - ) - } +**Beispiel für das Senden einer Nachricht (nicht-blockierend):** +```kotlin +@Service +class EventNotificationService( + private val eventPublisher: EventPublisher +) { + fun notifyNewEvent(eventDetails: EventDetails) { + val topic = "new-events-topic" + eventPublisher.publishEvent(topic, eventDetails.id, eventDetails) + .subscribe( + null, // onComplete: Nichts zu tun + { error -> logger.error("Failed to send message to topic '{}'", topic, error) } + ) + // Die Methode kehrt sofort zurück, ohne auf die Bestätigung von Kafka zu warten. } - ```kotlin - // Beispiel für das Empfangen von Nachrichten - @Component - class EventListener( - private val kafkaConsumer: KafkaConsumerService - ) { - @PostConstruct - fun listenForEvents() { - val topic = "new-events-topic" - // Reaktiv auf eingehende Nachrichten lauschen. - kafkaConsumer.receiveMessages(topic) - .subscribe { event -> - logger.info("Received new event with ID: {}", event.id) - // Geschäftslogik zur Verarbeitung des Events... - } - } - } - ``` +} +``` -Diese Architektur ermöglicht eine saubere, robuste und hochgradig entkoppelte Kommunikation zwischen den Diensten. +**Beispiel für das Empfangen von Nachrichten (reaktiv):** +```kotlin +@Component +class EventListener( + private val eventConsumer: EventConsumer +) { + @PostConstruct + fun listenForEvents() { + val topic = "new-events-topic" + eventConsumer.receiveEvents(topic) + .subscribe { event -> + logger.info("Received new event with ID: {}", event.id) + // Geschäftslogik zur Verarbeitung des Events... + } + } +} +``` + +## Testing-Strategie + +Die Zuverlässigkeit des Moduls wird durch einen umfassenden Integrationstest sichergestellt, der auf dem "Goldstandard"-Prinzip beruht: + +* **Testcontainers: Der KafkaIntegrationTest startet einen echten Apachen Kafka Docker-Container, um die Funktionalität unter realen Bedingungen zu validieren.* + +* **Reaktives Testen: Der Test nutzt Project Reactor's StepVerifier, um die reaktiven Streams (Mono, Flux) deterministisch und ohne unzuverlässige Thread.sleep-Aufrufe zu überprüfen.* + +* **Lifecycle Management: Der Test-Lebenszyklus wird sauber über @BeforeEach und @AfterEach verwaltet, um sicherzustellen, dass alle Ressourcen (insbesondere Producer-Threads) nach jedem Test korrekt freigegeben werden.* --- -**Letzte Aktualisierung**: 31. Juli 2025 + +**Letzte Aktualisierung**: 9. August 2025 diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt new file mode 100644 index 00000000..ef6901da --- /dev/null +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt @@ -0,0 +1,30 @@ +package at.mocode.infrastructure.messaging.client + +import reactor.core.publisher.Flux + +/** + * A generic, reactive interface for consuming events from a message broker. + */ +interface EventConsumer { + + /** + * Receives a continuous stream of events from the specified topic. + * + * This method returns a cold Flux, meaning that the consumer will only start + * listening for messages once the Flux is subscribed to. + * + * @param T The expected type of the event payload. + * @param topic The topic to subscribe to. + * @return A reactive stream (Flux) of events of type T. + */ + fun receiveEvents(topic: String, eventType: Class): Flux +} + +/** + * Kotlin-idiomatic extension function for `receiveEvents` using reified types. + * + * Example: `consumer.receiveEvents("my-topic").subscribe { ... }` + */ +inline fun EventConsumer.receiveEvents(topic: String): Flux { + return this.receiveEvents(topic, T::class.java) +} diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventPublisher.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventPublisher.kt index 04e943ec..c3703565 100644 --- a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventPublisher.kt +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventPublisher.kt @@ -1,24 +1,22 @@ package at.mocode.infrastructure.messaging.client +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + /** * Interface for publishing domain events to message broker. */ interface EventPublisher { /** - * Publishes an event to the specified topic. - * - * @param topic The topic to publish to - * @param key The message key (optional) - * @param event The event to publish + * Publishes a single event to the specified topic. + * Returns a Mono that completes when the send operation is finished. */ - suspend fun publishEvent(topic: String, key: String? = null, event: Any) + fun publishEvent(topic: String, key: String? = null, event: Any): Mono /** * Publishes multiple events to the specified topic. - * - * @param topic The topic to publish to - * @param events The events to publish with their keys + * Returns a Flux that completes when all send operations are finished. */ - suspend fun publishEvents(topic: String, events: List>) + fun publishEvents(topic: String, events: List>): Flux } diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventConsumer.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventConsumer.kt new file mode 100644 index 00000000..d44f4481 --- /dev/null +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventConsumer.kt @@ -0,0 +1,48 @@ +package at.mocode.infrastructure.messaging.client + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.slf4j.LoggerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.stereotype.Component +import reactor.core.publisher.Flux +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import java.util.Collections + +/** + * A reactive, non-blocking Kafka implementation of the EventConsumer interface. + */ +@Component +class KafkaEventConsumer( + // Wir injizieren die Basis-Konfigurationseigenschaften aus messaging-config + private val consumerConfig: Map +) : EventConsumer { + + private val logger = LoggerFactory.getLogger(KafkaEventConsumer::class.java) + + override fun receiveEvents(topic: String, eventType: Class): Flux { + // Für jeden Aufruf wird eine neue, spezifische Konfiguration für diesen Topic erstellt. + val receiverOptions = ReceiverOptions.create(consumerConfig) + .subscription(Collections.singleton(topic)) + .withValueDeserializer(JsonDeserializer(eventType).trustedPackages("*")) + .addAssignListener { partitions -> + logger.info("Partitions assigned for topic '{}': {}", topic, partitions) + } + .addRevokeListener { partitions -> + logger.warn("Partitions revoked for topic '{}': {}", topic, partitions) + } + + return KafkaReceiver.create(receiverOptions) + .receive() + .doOnNext { record -> + logger.debug( + "Received message from topic-partition {}-{} with offset {}", + record.topic(), record.partition(), record.offset() + ) + } + .map { it.value() } // Extrahiere nur die deserialisierte Nachricht + .doOnError { exception -> + logger.error("Error receiving events from topic '{}'", topic, exception) + } + } +} diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt index 58f518c2..9dc13097 100644 --- a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt @@ -1,49 +1,46 @@ package at.mocode.infrastructure.messaging.client -import kotlinx.coroutines.future.await import org.slf4j.LoggerFactory -import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate import org.springframework.stereotype.Component +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** - * Kafka implementation of EventPublisher. + * A reactive, non-blocking Kafka implementation of EventPublisher. */ @Component class KafkaEventPublisher( - private val kafkaTemplate: KafkaTemplate + // KORREKTUR: Verwendung des reaktiven Templates + private val reactiveKafkaTemplate: ReactiveKafkaProducerTemplate ) : EventPublisher { private val logger = LoggerFactory.getLogger(KafkaEventPublisher::class.java) - override suspend fun publishEvent(topic: String, key: String?, event: Any) { - try { - logger.debug("Publishing event to topic '{}' with key '{}'", topic, key) - - val sendResult = if (key != null) { - kafkaTemplate.send(topic, key, event).get() - } else { - kafkaTemplate.send(topic, event).get() + override fun publishEvent(topic: String, key: String?, event: Any): Mono { + logger.debug("Publishing event to topic '{}' with key '{}'", topic, key) + return reactiveKafkaTemplate.send(topic, key, event) + .doOnSuccess { result -> + val record = result.recordMetadata() + logger.info( + "Successfully published event to topic-partition {}-{} with offset {}", + record.topic(), record.partition(), record.offset() + ) } - - logger.info("Successfully published event to topic '{}' with key '{}'", topic, key) - } catch (exception: Exception) { - logger.error("Failed to publish event to topic '{}' with key '{}'", topic, key, exception) - throw exception - } + .doOnError { exception -> + logger.error("Failed to publish event to topic '{}' with key '{}'", topic, key, exception) + } + .then() // Wandelt das Ergebnis in ein Mono um } - override suspend fun publishEvents(topic: String, events: List>) { - try { - logger.debug("Publishing {} events to topic '{}'", events.size, topic) - - events.forEach { (key, event) -> + override fun publishEvents(topic: String, events: List>): Flux { + logger.debug("Publishing {} events to topic '{}'", events.size, topic) + // Verwendet Flux.fromIterable, um eine Sequenz von Sende-Operationen zu erstellen + return Flux.fromIterable(events) + // .flatMap stellt sicher, dass die Sende-Operationen parallelisiert, + // aber dennoch reaktiv (nicht-blockierend) ausgeführt werden. + .flatMap { (key, event) -> publishEvent(topic, key, event) } - - logger.info("Successfully published {} events to topic '{}'", events.size, topic) - } catch (exception: Exception) { - logger.error("Failed to publish events to topic '{}'", topic, exception) - throw exception - } } } diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt new file mode 100644 index 00000000..e8cb8b1c --- /dev/null +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt @@ -0,0 +1,18 @@ +package at.mocode.infrastructure.messaging.client + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate +import reactor.kafka.sender.SenderOptions + +@Configuration +class ReactiveKafkaConfig { + + @Bean + fun reactiveKafkaProducerTemplate(producerFactory: ProducerFactory): ReactiveKafkaProducerTemplate { + // Nutzt die ProducerFactory aus dem messaging-config-Modul + val senderOptions = SenderOptions.create(producerFactory.configurationProperties) + return ReactiveKafkaProducerTemplate(senderOptions) + } +} diff --git a/infrastructure/messaging/messaging-client/src/test/kotlin/at/mocode/infrastructure/messaging/client/KafkaIntegrationTest.kt b/infrastructure/messaging/messaging-client/src/test/kotlin/at/mocode/infrastructure/messaging/client/KafkaIntegrationTest.kt new file mode 100644 index 00000000..285f175e --- /dev/null +++ b/infrastructure/messaging/messaging-client/src/test/kotlin/at/mocode/infrastructure/messaging/client/KafkaIntegrationTest.kt @@ -0,0 +1,85 @@ +// KafkaIntegrationTest.kt + +package at.mocode.infrastructure.messaging.client + +import at.mocode.infrastructure.messaging.config.KafkaConfig +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.test.StepVerifier +import java.util.* + +@Testcontainers +class KafkaIntegrationTest { + + companion object { + @Container + private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")) + } + + private lateinit var kafkaEventPublisher: KafkaEventPublisher + private lateinit var producerFactory: DefaultKafkaProducerFactory + private val testTopic = "test-topic-${UUID.randomUUID()}" + + @BeforeEach + fun setUp() { + val kafkaConfig = KafkaConfig().apply { + bootstrapServers = kafkaContainer.bootstrapServers + } + producerFactory = kafkaConfig.producerFactory() as DefaultKafkaProducerFactory + + val reactiveKafkaConfig = ReactiveKafkaConfig() + val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate(producerFactory) + kafkaEventPublisher = KafkaEventPublisher(reactiveTemplate) + } + + @AfterEach + fun tearDown() { + producerFactory.destroy() + } + + @Test + fun `publishEvent should send a message that can be received`() { + // Arrange + val testKey = "test-key" + val testEvent = TestEvent("Test Message") + + val consumerProps = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to "test-group-${UUID.randomUUID()}", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + JsonDeserializer.TRUSTED_PACKAGES to "*" + ) + val receiverOptions = ReceiverOptions.create(consumerProps).subscription(listOf(testTopic)) + + // Der Mono, der das nächste empfangene Ereignis darstellt + val receivedEvent = KafkaReceiver.create(receiverOptions) + .receive() + .next() // Nimm nur das erste Ereignis + .map { it.value() } // Extrahiere den Wert (unsere TestEvent-Instanz) + + // Der Mono, der die Sende-Aktion darstellt + val sendAction = kafkaEventPublisher.publishEvent(testTopic, testKey, testEvent) + + // KORREKTUR: Kombiniere die Sende-Aktion und die Empfangs-Erwartung in einem StepVerifier. + // Die `then` Methode stellt sicher, dass erst die Sende-Aktion abgeschlossen wird, + // bevor der `receivedEvent` Mono abonniert und verifiziert wird. + StepVerifier.create(sendAction.then(receivedEvent)) + .expectNext(testEvent) // Erwarte, dass unser Test-Event ankommt + .verifyComplete() // Schließe die Überprüfung ab + } + + data class TestEvent(val message: String) +} diff --git a/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt b/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt index 23fe4a09..22730efa 100644 --- a/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt +++ b/infrastructure/messaging/messaging-config/src/main/kotlin/at/mocode/infrastructure/messaging/config/KafkaConfig.kt @@ -1,6 +1,8 @@ package at.mocode.infrastructure.messaging.config +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean @@ -8,16 +10,18 @@ import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer import org.springframework.kafka.support.serializer.JsonSerializer -/** - * Kafka configuration for event publishing. - */ @Configuration class KafkaConfig { + // KORREKTUR: Von lateinit zu einer public var mit Standardwert, um Tests zu ermöglichen @Value($$"${spring.kafka.bootstrap-servers:localhost:9092}") - private lateinit var bootstrapServers: String + var bootstrapServers: String = "localhost:9092" + + @Value("\${spring.kafka.consumer.group-id:meldestelle-group}") + private lateinit var consumerGroupId: String @Bean fun producerFactory(): ProducerFactory { @@ -34,7 +38,20 @@ class KafkaConfig { } @Bean - fun kafkaTemplate(): KafkaTemplate { - return KafkaTemplate(producerFactory()) + fun kafkaTemplate(producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + + // NEU: Stellt eine zentrale Map mit den Basis-Konfigurationen für alle Consumer bereit. + @Bean + fun kafkaConsumerConfiguration(): Map { + return mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to consumerGroupId, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", // Beginne davon am Anfang, wenn kein Offset existiert + JsonDeserializer.TRUSTED_PACKAGES to "*" // Erlaube Deserialisierung aller unserer Klassen + ) } } diff --git a/platform/platform-testing/build.gradle.kts b/platform/platform-testing/build.gradle.kts index 91f0e291..60f7bb65 100644 --- a/platform/platform-testing/build.gradle.kts +++ b/platform/platform-testing/build.gradle.kts @@ -12,6 +12,10 @@ dependencies { api(libs.bundles.testing.jvm) api(libs.bundles.testcontainers) + // Macht Kafka- und Reactor-Test-Bibliotheken verfügbar + api(libs.testcontainers.kafka) + api(libs.reactor.test) + // Stellt Spring Boot Test-Abhängigkeiten und die H2-Datenbank für Tests bereit. api(libs.spring.boot.starter.test) api(libs.h2.driver)