feat(Tracer Bullet)

This commit is contained in:
2025-08-11 23:47:05 +02:00
parent 582678e226
commit a50b1b3822
43 changed files with 1665 additions and 292 deletions
@@ -1,5 +1,6 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
@@ -7,42 +8,109 @@ import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.util.retry.Retry
import java.time.Duration
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
/**
* A reactive, non-blocking Kafka implementation of the EventConsumer interface.
* A reactive, non-blocking Kafka implementation of the EventConsumer interface
* with optimized connection pooling, security, and error handling.
*/
@Component
class KafkaEventConsumer(
// Wir injizieren die Basis-Konfigurationseigenschaften aus messaging-config
private val consumerConfig: Map<String, Any>
private val kafkaConfig: KafkaConfig
) : EventConsumer {
private val logger = LoggerFactory.getLogger(KafkaEventConsumer::class.java)
override fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T> {
// Für jeden Aufruf wird eine neue, spezifische Konfiguration für diesen Topic erstellt.
val receiverOptions = ReceiverOptions.create<String, T>(consumerConfig)
.subscription(Collections.singleton(topic))
.withValueDeserializer(JsonDeserializer(eventType).trustedPackages("*"))
.addAssignListener { partitions ->
logger.info("Partitions assigned for topic '{}': {}", topic, partitions)
}
.addRevokeListener { partitions ->
logger.warn("Partitions revoked for topic '{}': {}", topic, partitions)
}
// Connection pool to reuse KafkaReceiver instances per topic-eventType combination
private val receiverCache = ConcurrentHashMap<String, KafkaReceiver<String, Any>>()
return KafkaReceiver.create(receiverOptions)
.receive()
override fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T> {
logger.info("Setting up reactive consumer for topic '{}' with event type '{}'", topic, eventType.simpleName)
val cacheKey = "${topic}-${eventType.name}"
// Get or create a cached receiver for this topic-eventType combination
@Suppress("UNCHECKED_CAST")
val receiver = receiverCache.computeIfAbsent(cacheKey) {
createOptimizedReceiver<T>(topic, eventType) as KafkaReceiver<String, Any>
} as KafkaReceiver<String, T>
return receiver.receive()
.doOnNext { record ->
logger.debug(
"Received message from topic-partition {}-{} with offset {}",
record.topic(), record.partition(), record.offset()
"Received message from topic-partition {}-{} with offset {} for event type '{}'",
record.topic(), record.partition(), record.offset(), eventType.simpleName
)
}
.map { it.value() } // Extrahiere nur die deserialisierte Nachricht
.doOnError { exception ->
logger.error("Error receiving events from topic '{}'", topic, exception)
.map { record ->
// Manual commit acknowledgment for better control
record.receiverOffset().acknowledge()
record.value()
}
.doOnError { exception ->
logger.error("Error receiving events from topic '{}' for event type '{}'",
topic, eventType.simpleName, exception)
}
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry { retrySignal ->
logger.warn("Retrying consumer for topic '{}', attempt: {}, error: {}",
topic, retrySignal.totalRetries() + 1, retrySignal.failure().message)
}
.onRetryExhaustedThrow { _, retrySignal ->
logger.error("Consumer retry exhausted for topic '{}' after {} attempts",
topic, retrySignal.totalRetries())
retrySignal.failure()
}
)
}
/**
* Creates an optimized KafkaReceiver with secure configuration and performance tuning.
*/
private fun <T : Any> createOptimizedReceiver(topic: String, eventType: Class<T>): KafkaReceiver<String, T> {
// Generate unique group ID for this consumer instance
val groupId = "${kafkaConfig.defaultGroupIdPrefix}-${topic}-${eventType.simpleName.lowercase()}"
val consumerConfig = kafkaConfig.consumerConfigs(groupId)
// Create type-safe JSON deserializer with restricted trusted packages
val jsonDeserializer = JsonDeserializer(eventType).apply {
// Use restricted trusted packages instead of wildcard for security
addTrustedPackages(kafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, T>(consumerConfig)
.subscription(Collections.singleton(topic))
.withValueDeserializer(jsonDeserializer)
.addAssignListener { partitions ->
logger.info("Consumer '{}' assigned partitions for topic '{}': {}",
groupId, topic, partitions.map { "${it.topicPartition().topic()}-${it.topicPartition().partition()}" })
}
.addRevokeListener { partitions ->
logger.warn("Consumer '{}' revoked partitions for topic '{}': {}",
groupId, topic, partitions.map { "${it.topicPartition().topic()}-${it.topicPartition().partition()}" })
}
// Enable commit interval for manual acknowledgment control
.commitInterval(Duration.ofSeconds(5))
.commitBatchSize(100)
return KafkaReceiver.create(receiverOptions)
}
/**
* Cleanup method to clear cached receivers on application shutdown.
* Reactive receivers will be automatically cleaned up when their streams complete.
*/
@jakarta.annotation.PreDestroy
fun cleanup() {
logger.info("Cleaning up Kafka consumer cache...")
val cacheSize = receiverCache.size
receiverCache.clear()
logger.info("Kafka consumer cleanup completed. Cleared {} cached receivers", cacheSize)
}
}
@@ -5,42 +5,121 @@ import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.retry.Retry
import java.time.Duration
/**
* A reactive, non-blocking Kafka implementation of EventPublisher.
* A reactive, non-blocking Kafka implementation of EventPublisher with enhanced
* error handling, retry mechanisms, and optimized batch processing.
*/
@Component
class KafkaEventPublisher(
// KORREKTUR: Verwendung des reaktiven Templates
private val reactiveKafkaTemplate: ReactiveKafkaProducerTemplate<String, Any>
) : EventPublisher {
private val logger = LoggerFactory.getLogger(KafkaEventPublisher::class.java)
companion object {
private const val DEFAULT_RETRY_ATTEMPTS = 3L
private const val DEFAULT_RETRY_DELAY_SECONDS = 1L
private const val DEFAULT_MAX_BACKOFF_SECONDS = 10L
private const val DEFAULT_BATCH_CONCURRENCY = 10
}
override fun publishEvent(topic: String, key: String?, event: Any): Mono<Void> {
logger.debug("Publishing event to topic '{}' with key '{}'", topic, key)
logger.debug("Publishing event to topic '{}' with key '{}', event type: '{}'",
topic, key, event::class.simpleName)
return reactiveKafkaTemplate.send(topic, key, event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.info(
"Successfully published event to topic-partition {}-{} with offset {}",
record.topic(), record.partition(), record.offset()
logger.debug(
"Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
record.topic(), record.partition(), record.offset(), key
)
}
.doOnError { exception ->
logger.error("Failed to publish event to topic '{}' with key '{}'", topic, key, exception)
logger.warn("Failed to publish event to topic '{}' with key '{}' - will retry if configured",
topic, key, exception)
}
.then() // Wandelt das Ergebnis in ein Mono<Void> um
.retryWhen(createRetrySpec(topic, key))
.doOnError { exception ->
logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'",
topic, key, exception)
}
.then()
}
override fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Void> {
logger.debug("Publishing {} events to topic '{}'", events.size, topic)
// Verwendet Flux.fromIterable, um eine Sequenz von Sende-Operationen zu erstellen
if (events.isEmpty()) {
logger.debug("No events to publish to topic '{}'", topic)
return Flux.empty()
}
logger.info("Publishing {} events to topic '{}' using optimized batch processing", events.size, topic)
return Flux.fromIterable(events)
// .flatMap stellt sicher, dass die Sende-Operationen parallelisiert,
// aber dennoch reaktiv (nicht-blockierend) ausgeführt werden.
.flatMap { (key, event) ->
.index() // Add index for progress tracking
.flatMap({ indexedEventPair ->
val index = indexedEventPair.t1
val eventPair = indexedEventPair.t2
val (key, event) = eventPair
publishEvent(topic, key, event)
.doOnSuccess {
if ((index + 1) % 100 == 0L || index == events.size.toLong() - 1) {
logger.info("Batch progress: {}/{} events published to topic '{}'",
index + 1, events.size, topic)
}
}
.onErrorContinue { error, _ ->
logger.error("Error publishing event {} in batch to topic '{}': {}",
index + 1, topic, error.message)
}
}, DEFAULT_BATCH_CONCURRENCY) // Controlled concurrency for better resource management
.doOnComplete {
logger.info("Completed publishing batch of {} events to topic '{}'", events.size, topic)
}
.doOnError { error ->
logger.error("Batch publishing to topic '{}' failed with error: {}", topic, error.message)
}
}
/**
* Creates a retry specification with exponential backoff for robust error handling.
*/
private fun createRetrySpec(topic: String, key: String?): Retry =
Retry.backoff(DEFAULT_RETRY_ATTEMPTS, Duration.ofSeconds(DEFAULT_RETRY_DELAY_SECONDS))
.maxBackoff(Duration.ofSeconds(DEFAULT_MAX_BACKOFF_SECONDS))
.filter { exception ->
// Only retry on transient errors (not serialization errors, etc.)
isRetryableException(exception)
}
.doBeforeRetry { retrySignal ->
logger.info("Retrying publish to topic '{}' with key '{}', attempt: {}, error: {}",
topic, key, retrySignal.totalRetries() + 1,
retrySignal.failure().message?.take(100))
}
.onRetryExhaustedThrow { _, retrySignal ->
logger.error("Retry exhausted for topic '{}' with key '{}' after {} attempts",
topic, key, retrySignal.totalRetries())
retrySignal.failure()
}
/**
* Determines if an exception is retryable based on its type and characteristics.
*/
private fun isRetryableException(exception: Throwable): Boolean {
return when {
exception.message?.contains("timeout", ignoreCase = true) == true -> true
exception.message?.contains("connection", ignoreCase = true) == true -> true
exception.message?.contains("network", ignoreCase = true) == true -> true
exception is java.util.concurrent.TimeoutException -> true
exception is java.net.ConnectException -> true
exception is java.io.IOException -> true
// Don't retry serialization errors or authentication failures
exception.message?.contains("serializ", ignoreCase = true) == true -> false
exception.message?.contains("auth", ignoreCase = true) == true -> false
else -> true // Default to retryable for unknown exceptions
}
}
}
@@ -1,22 +1,57 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions
import java.time.Duration
/**
* Reactive Kafka configuration utilities for creating a ReactiveKafkaProducerTemplate.
* Spring Configuration for reactive Kafka components with optimized settings.
*/
class ReactiveKafkaConfig {
@Configuration
class ReactiveKafkaConfig(
private val kafkaConfig: KafkaConfig
) {
private val logger = LoggerFactory.getLogger(ReactiveKafkaConfig::class.java)
/**
* Create a ReactiveKafkaProducerTemplate using the configuration from the given ProducerFactory.
* Creates a Spring Bean for the optimized ReactiveKafkaProducerTemplate.
* This template includes enhanced error handling, monitoring, and performance tuning.
*/
fun reactiveKafkaProducerTemplate(
producerFactory: DefaultKafkaProducerFactory<String, Any>
): ReactiveKafkaProducerTemplate<String, Any> {
@Bean
fun reactiveKafkaProducerTemplate(): ReactiveKafkaProducerTemplate<String, Any> {
logger.info("Creating optimized ReactiveKafkaProducerTemplate with enhanced configuration")
val producerFactory = kafkaConfig.producerFactory()
val props: Map<String, Any> = producerFactory.configurationProperties
val senderOptions: SenderOptions<String, Any> = SenderOptions.create(props)
return ReactiveKafkaProducerTemplate(senderOptions)
val senderOptions = SenderOptions.create<String, Any>(props)
// Enhanced sender options for better performance and reliability
.maxInFlight(1024) // Increase in-flight requests for better throughput
.scheduler(reactor.core.scheduler.Schedulers.boundedElastic()) // Use bounded elastic scheduler
.closeTimeout(Duration.ofSeconds(30)) // Give enough time for graceful shutdown
.stopOnError(false) // Continue processing even if some messages fail
return ReactiveKafkaProducerTemplate(senderOptions).apply {
// Configure additional properties if needed
logger.info("ReactiveKafkaProducerTemplate configured successfully with bootstrap servers: {}",
kafkaConfig.bootstrapServers)
}
}
/**
* Creates a KafkaConfig bean if not already provided.
* This allows for external configuration override while providing sensible defaults.
*/
@Bean
fun kafkaConfig(): KafkaConfig {
return KafkaConfig().apply {
logger.info("Initializing KafkaConfig with bootstrap servers: {}", bootstrapServers)
}
}
}
@@ -38,8 +38,8 @@ class KafkaIntegrationTest {
}
producerFactory = kafkaConfig.producerFactory()
val reactiveKafkaConfig = ReactiveKafkaConfig()
val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate(producerFactory)
val reactiveKafkaConfig = ReactiveKafkaConfig(kafkaConfig)
val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate()
kafkaEventPublisher = KafkaEventPublisher(reactiveTemplate)
}
@@ -54,19 +54,18 @@ class KafkaIntegrationTest {
val testKey = "test-key"
val testEvent = TestEvent("Test Message")
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaContainer.bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to "test-group-${UUID.randomUUID()}",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
JsonDeserializer.TRUSTED_PACKAGES to "*",
JsonDeserializer.USE_TYPE_INFO_HEADERS to false,
JsonDeserializer.VALUE_DEFAULT_TYPE to TestEvent::class.java.name
)
// Use the same KafkaConfig for consistent and secure configuration
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
// For tests, we need to trust the test package
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages("*")
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())