feat(infra-messaging): Implement fully reactive Kafka producer and consumer

This commit introduces a comprehensive refactoring of the messaging module to establish a fully reactive, non-blocking, and robust infrastructure for Kafka-based communication.

Features & Refinements
Reactive Publisher:

The KafkaEventPublisher has been refactored from a blocking implementation (KafkaTemplate) to a fully non-blocking, reactive one using Spring's ReactiveKafkaProducerTemplate.

The EventPublisher interface now returns reactive types (Mono, Flux) to reflect the asynchronous nature of the operations.

Reactive Consumer:

A new KafkaEventConsumer has been implemented, providing a standardized, reusable, and reactive way for services to consume events.

It encapsulates the complexity of reactor-kafka and exposes a simple receiveEvents<T>(topic) method that returns a Flux<T>.

Architectural Cleanup:

The Spring configuration has been split. The basic ProducerFactory and consumer properties reside in messaging-config, while the reactive-specific ReactiveKafkaProducerTemplate bean is now correctly located in messaging-client.

Testing
Added Kafka Integration Test: A new KafkaIntegrationTest has been created to ensure the reliability of the messaging infrastructure.

The test uses Testcontainers to spin up a real Apache Kafka broker for end-to-end validation.

Project Reactor's StepVerifier is used to test the reactive streams deterministically, avoiding flaky tests.

The test correctly manages the lifecycle of Kafka producers to ensure clean shutdown without hanging threads.

Bug Fixes
Resolved UninitializedPropertyAccessException in tests by making the KafkaConfig test-friendly.

Fixed IllegalStateException related to Testcontainers lifecycle by making the container a static resource.

Corrected compilation errors in tests related to resource cleanup by using the concrete DefaultKafkaProducerFactory type.
This commit is contained in:
2025-08-10 00:02:59 +02:00
parent f9927066a2
commit d87a5a4a93
10 changed files with 302 additions and 99 deletions
@@ -0,0 +1,30 @@
package at.mocode.infrastructure.messaging.client
import reactor.core.publisher.Flux
/**
* A generic, reactive interface for consuming events from a message broker.
*/
interface EventConsumer {
/**
* Receives a continuous stream of events from the specified topic.
*
* This method returns a cold Flux, meaning that the consumer will only start
* listening for messages once the Flux is subscribed to.
*
* @param T The expected type of the event payload.
* @param topic The topic to subscribe to.
* @return A reactive stream (Flux) of events of type T.
*/
fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T>
}
/**
* Kotlin-idiomatic extension function for `receiveEvents` using reified types.
*
* Example: `consumer.receiveEvents<MyEvent>("my-topic").subscribe { ... }`
*/
inline fun <reified T : Any> EventConsumer.receiveEvents(topic: String): Flux<T> {
return this.receiveEvents(topic, T::class.java)
}
@@ -1,24 +1,22 @@
package at.mocode.infrastructure.messaging.client
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
/**
* Interface for publishing domain events to message broker.
*/
interface EventPublisher {
/**
* Publishes an event to the specified topic.
*
* @param topic The topic to publish to
* @param key The message key (optional)
* @param event The event to publish
* Publishes a single event to the specified topic.
* Returns a Mono that completes when the send operation is finished.
*/
suspend fun publishEvent(topic: String, key: String? = null, event: Any)
fun publishEvent(topic: String, key: String? = null, event: Any): Mono<Void>
/**
* Publishes multiple events to the specified topic.
*
* @param topic The topic to publish to
* @param events The events to publish with their keys
* Returns a Flux that completes when all send operations are finished.
*/
suspend fun publishEvents(topic: String, events: List<Pair<String?, Any>>)
fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Void>
}
@@ -0,0 +1,48 @@
package at.mocode.infrastructure.messaging.client
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import java.util.Collections
/**
* A reactive, non-blocking Kafka implementation of the EventConsumer interface.
*/
@Component
class KafkaEventConsumer(
// Wir injizieren die Basis-Konfigurationseigenschaften aus messaging-config
private val consumerConfig: Map<String, Any>
) : EventConsumer {
private val logger = LoggerFactory.getLogger(KafkaEventConsumer::class.java)
override fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T> {
// Für jeden Aufruf wird eine neue, spezifische Konfiguration für diesen Topic erstellt.
val receiverOptions = ReceiverOptions.create<String, T>(consumerConfig)
.subscription(Collections.singleton(topic))
.withValueDeserializer(JsonDeserializer(eventType).trustedPackages("*"))
.addAssignListener { partitions ->
logger.info("Partitions assigned for topic '{}': {}", topic, partitions)
}
.addRevokeListener { partitions ->
logger.warn("Partitions revoked for topic '{}': {}", topic, partitions)
}
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext { record ->
logger.debug(
"Received message from topic-partition {}-{} with offset {}",
record.topic(), record.partition(), record.offset()
)
}
.map { it.value() } // Extrahiere nur die deserialisierte Nachricht
.doOnError { exception ->
logger.error("Error receiving events from topic '{}'", topic, exception)
}
}
}
@@ -1,49 +1,46 @@
package at.mocode.infrastructure.messaging.client
import kotlinx.coroutines.future.await
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
/**
* Kafka implementation of EventPublisher.
* A reactive, non-blocking Kafka implementation of EventPublisher.
*/
@Component
class KafkaEventPublisher(
private val kafkaTemplate: KafkaTemplate<String, Any>
// KORREKTUR: Verwendung des reaktiven Templates
private val reactiveKafkaTemplate: ReactiveKafkaProducerTemplate<String, Any>
) : EventPublisher {
private val logger = LoggerFactory.getLogger(KafkaEventPublisher::class.java)
override suspend fun publishEvent(topic: String, key: String?, event: Any) {
try {
logger.debug("Publishing event to topic '{}' with key '{}'", topic, key)
val sendResult = if (key != null) {
kafkaTemplate.send(topic, key, event).get()
} else {
kafkaTemplate.send(topic, event).get()
override fun publishEvent(topic: String, key: String?, event: Any): Mono<Void> {
logger.debug("Publishing event to topic '{}' with key '{}'", topic, key)
return reactiveKafkaTemplate.send(topic, key, event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.info(
"Successfully published event to topic-partition {}-{} with offset {}",
record.topic(), record.partition(), record.offset()
)
}
logger.info("Successfully published event to topic '{}' with key '{}'", topic, key)
} catch (exception: Exception) {
logger.error("Failed to publish event to topic '{}' with key '{}'", topic, key, exception)
throw exception
}
.doOnError { exception ->
logger.error("Failed to publish event to topic '{}' with key '{}'", topic, key, exception)
}
.then() // Wandelt das Ergebnis in ein Mono<Void> um
}
override suspend fun publishEvents(topic: String, events: List<Pair<String?, Any>>) {
try {
logger.debug("Publishing {} events to topic '{}'", events.size, topic)
events.forEach { (key, event) ->
override fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Void> {
logger.debug("Publishing {} events to topic '{}'", events.size, topic)
// Verwendet Flux.fromIterable, um eine Sequenz von Sende-Operationen zu erstellen
return Flux.fromIterable(events)
// .flatMap stellt sicher, dass die Sende-Operationen parallelisiert,
// aber dennoch reaktiv (nicht-blockierend) ausgeführt werden.
.flatMap { (key, event) ->
publishEvent(topic, key, event)
}
logger.info("Successfully published {} events to topic '{}'", events.size, topic)
} catch (exception: Exception) {
logger.error("Failed to publish events to topic '{}'", topic, exception)
throw exception
}
}
}
@@ -0,0 +1,18 @@
package at.mocode.infrastructure.messaging.client
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions
@Configuration
class ReactiveKafkaConfig {
@Bean
fun reactiveKafkaProducerTemplate(producerFactory: ProducerFactory<String, Any>): ReactiveKafkaProducerTemplate<String, Any> {
// Nutzt die ProducerFactory aus dem messaging-config-Modul
val senderOptions = SenderOptions.create<String, Any>(producerFactory.configurationProperties)
return ReactiveKafkaProducerTemplate(senderOptions)
}
}
@@ -0,0 +1,85 @@
// KafkaIntegrationTest.kt
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.test.StepVerifier
import java.util.*
@Testcontainers
class KafkaIntegrationTest {
companion object {
@Container
private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
}
private lateinit var kafkaEventPublisher: KafkaEventPublisher
private lateinit var producerFactory: DefaultKafkaProducerFactory<String, Any>
private val testTopic = "test-topic-${UUID.randomUUID()}"
@BeforeEach
fun setUp() {
val kafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
}
producerFactory = kafkaConfig.producerFactory() as DefaultKafkaProducerFactory<String, Any>
val reactiveKafkaConfig = ReactiveKafkaConfig()
val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate(producerFactory)
kafkaEventPublisher = KafkaEventPublisher(reactiveTemplate)
}
@AfterEach
fun tearDown() {
producerFactory.destroy()
}
@Test
fun `publishEvent should send a message that can be received`() {
// Arrange
val testKey = "test-key"
val testEvent = TestEvent("Test Message")
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to "test-group-${UUID.randomUUID()}",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
JsonDeserializer.TRUSTED_PACKAGES to "*"
)
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps).subscription(listOf(testTopic))
// Der Mono, der das nächste empfangene Ereignis darstellt
val receivedEvent = KafkaReceiver.create(receiverOptions)
.receive()
.next() // Nimm nur das erste Ereignis
.map { it.value() } // Extrahiere den Wert (unsere TestEvent-Instanz)
// Der Mono, der die Sende-Aktion darstellt
val sendAction = kafkaEventPublisher.publishEvent(testTopic, testKey, testEvent)
// KORREKTUR: Kombiniere die Sende-Aktion und die Empfangs-Erwartung in einem StepVerifier.
// Die `then` Methode stellt sicher, dass erst die Sende-Aktion abgeschlossen wird,
// bevor der `receivedEvent` Mono abonniert und verifiziert wird.
StepVerifier.create(sendAction.then(receivedEvent))
.expectNext(testEvent) // Erwarte, dass unser Test-Event ankommt
.verifyComplete() // Schließe die Überprüfung ab
}
data class TestEvent(val message: String)
}