fixing(gradle)
This commit is contained in:
+7
-1
@@ -2,6 +2,8 @@ package at.mocode.infrastructure.messaging.client
|
||||
|
||||
import reactor.core.publisher.Flux
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.reactive.asPublisher
|
||||
|
||||
/**
|
||||
* A generic interface for consuming events from a message broker.
|
||||
@@ -51,5 +53,9 @@ inline fun <reified T : Any> EventConsumer.receiveEventsWithResult(topic: String
|
||||
*/
|
||||
@Deprecated("Use receiveEventsWithResult with Flow<Result<T>> instead", ReplaceWith("receiveEventsWithResult<T>(topic)"))
|
||||
inline fun <reified T : Any> EventConsumer.receiveEvents(topic: String): Flux<T> {
|
||||
return this.receiveEvents(topic, T::class.java)
|
||||
// Convert Flow<Result<T>> to Flux<T> for backward compatibility
|
||||
return this.receiveEventsWithResult<T>(topic)
|
||||
.map { result: Result<T> -> result.getOrThrow() }
|
||||
.asPublisher()
|
||||
.let { Flux.from(it) }
|
||||
}
|
||||
|
||||
+35
-1
@@ -31,7 +31,28 @@ class KafkaEventConsumer(
|
||||
override fun <T : Any> receiveEventsWithResult(topic: String, eventType: Class<T>): Flow<Result<T>> {
|
||||
logger.info("Setting up Result-based consumer for topic '{}' with event type '{}'", topic, eventType.simpleName)
|
||||
|
||||
return receiveEvents(topic, eventType)
|
||||
val cacheKey = "${topic}-${eventType.name}"
|
||||
val groupId = "${kafkaConfig.defaultGroupIdPrefix}-${topic}-${eventType.simpleName.lowercase()}"
|
||||
|
||||
// Get or create a cached receiver for this topic-eventType combination
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val receiver = receiverCache.computeIfAbsent(cacheKey) {
|
||||
createOptimizedReceiver<T>(topic, eventType) as KafkaReceiver<String, Any>
|
||||
} as KafkaReceiver<String, T>
|
||||
|
||||
return receiver.receive()
|
||||
.doOnNext { record ->
|
||||
logger.debug(
|
||||
"Received message from topic-partition {}-{} with offset {} for event type '{}' [groupId={}, timestamp={}]",
|
||||
record.topic(), record.partition(), record.offset(), eventType.simpleName,
|
||||
groupId, record.timestamp()
|
||||
)
|
||||
}
|
||||
.map { record ->
|
||||
// Manual commit acknowledgment for better control
|
||||
record.receiverOffset().acknowledge()
|
||||
record.value()
|
||||
}
|
||||
.map<Result<T>> { event -> Result.success(event) }
|
||||
.onErrorResume { exception ->
|
||||
logger.warn("Error occurred while consuming events from topic '{}' for event type '{}': {}",
|
||||
@@ -40,6 +61,19 @@ class KafkaEventConsumer(
|
||||
val messagingError = mapToMessagingError(exception)
|
||||
reactor.core.publisher.Mono.just(Result.failure<T>(messagingError))
|
||||
}
|
||||
.retryWhen(
|
||||
Retry.backoff(3, Duration.ofSeconds(1))
|
||||
.maxBackoff(Duration.ofSeconds(10))
|
||||
.doBeforeRetry { retrySignal ->
|
||||
logger.warn("Retrying consumer for topic '{}', attempt: {}, error: {}",
|
||||
topic, retrySignal.totalRetries() + 1, retrySignal.failure().message)
|
||||
}
|
||||
.onRetryExhaustedThrow { _, retrySignal ->
|
||||
logger.error("Consumer retry exhausted for topic '{}' after {} attempts",
|
||||
topic, retrySignal.totalRetries())
|
||||
retrySignal.failure()
|
||||
}
|
||||
)
|
||||
.doOnError { exception ->
|
||||
logger.error("Fatal error in consumer stream for topic '{}' and event type '{}': {}",
|
||||
topic, eventType.simpleName, exception.message, exception)
|
||||
|
||||
+64
-2
@@ -42,7 +42,27 @@ class KafkaEventPublisher(
|
||||
|
||||
override suspend fun publishEvent(topic: String, key: String?, event: Any): Result<Unit> {
|
||||
return try {
|
||||
publishEventReactive(topic, key, event).awaitSingle()
|
||||
logger.debug("Publishing event to topic '{}' with key '{}', event type: '{}'",
|
||||
topic, key, event::class.simpleName)
|
||||
|
||||
reactiveKafkaTemplate.send(topic, key ?: "", event)
|
||||
.doOnSuccess { result ->
|
||||
val record = result.recordMetadata()
|
||||
logger.debug(
|
||||
"Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
|
||||
record.topic(), record.partition(), record.offset(), key
|
||||
)
|
||||
}
|
||||
.doOnError { exception ->
|
||||
logger.warn("Failed to publish event to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
|
||||
topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
|
||||
}
|
||||
.retryWhen(createRetrySpec(topic, key))
|
||||
.doOnError { exception ->
|
||||
logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'",
|
||||
topic, key, exception)
|
||||
}
|
||||
.awaitSingle()
|
||||
Result.success(Unit)
|
||||
} catch (exception: Throwable) {
|
||||
Result.failure(mapToMessagingError(exception))
|
||||
@@ -51,7 +71,49 @@ class KafkaEventPublisher(
|
||||
|
||||
override suspend fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Result<List<Unit>> {
|
||||
return try {
|
||||
val results = publishEventsReactive(topic, events).collectList().awaitSingle()
|
||||
if (events.isEmpty()) {
|
||||
logger.debug("No events to publish to topic '{}'", topic)
|
||||
return Result.success(emptyList())
|
||||
}
|
||||
|
||||
logger.info("Publishing {} events to topic '{}' using optimized batch processing", events.size, topic)
|
||||
|
||||
val results = Flux.fromIterable(events)
|
||||
.index() // Add index for progress tracking
|
||||
.flatMap({ indexedEventPair ->
|
||||
val index = indexedEventPair.t1
|
||||
val eventPair = indexedEventPair.t2
|
||||
val (key, event) = eventPair
|
||||
reactiveKafkaTemplate.send(topic, key ?: "", event)
|
||||
.doOnSuccess { result ->
|
||||
val record = result.recordMetadata()
|
||||
logger.debug("Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
|
||||
record.topic(), record.partition(), record.offset(), key)
|
||||
if ((index + 1) % BATCH_PROGRESS_LOG_INTERVAL == 0L || index == events.size.toLong() - 1) {
|
||||
logger.info("Batch progress: {}/{} events published to topic '{}'",
|
||||
index + 1, events.size, topic)
|
||||
}
|
||||
}
|
||||
.doOnError { exception ->
|
||||
logger.warn("Failed to publish event {} in batch to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
|
||||
index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
|
||||
}
|
||||
.retryWhen(createRetrySpec(topic, key))
|
||||
.map { Unit } // Convert to Mono<Unit> that emits one Unit per successful send
|
||||
.onErrorContinue { error, _ ->
|
||||
logger.error("Error publishing event {} in batch to topic '{}': {}",
|
||||
index + 1, topic, error.message)
|
||||
}
|
||||
}, BATCH_CONCURRENCY_LEVEL) // Controlled concurrency for better resource management
|
||||
.doOnComplete {
|
||||
logger.info("Completed publishing batch of {} events to topic '{}'", events.size, topic)
|
||||
}
|
||||
.doOnError { error ->
|
||||
logger.error("Batch publishing to topic '{}' failed with error: {}", topic, error.message)
|
||||
}
|
||||
.collectList()
|
||||
.awaitSingle()
|
||||
|
||||
Result.success(results)
|
||||
} catch (exception: Throwable) {
|
||||
Result.failure(mapToMessagingError(exception))
|
||||
|
||||
+4
-4
@@ -83,8 +83,8 @@ class KafkaEventConsumerCacheTest {
|
||||
// Test that receiveEvents creates reactive streams without errors
|
||||
// Note: These won't actually connect to Kafka but should create the Flux
|
||||
assertDoesNotThrow {
|
||||
val flux1 = consumer.receiveEvents<TestEvent>("topic1")
|
||||
val flux2 = consumer.receiveEvents<TestEvent>("topic2")
|
||||
val flux1 = consumer.receiveEventsWithResult<TestEvent>("topic1")
|
||||
val flux2 = consumer.receiveEventsWithResult<TestEvent>("topic2")
|
||||
|
||||
// Fluxes should be created (cold streams)
|
||||
assertThat(flux1).isNotNull
|
||||
@@ -96,8 +96,8 @@ class KafkaEventConsumerCacheTest {
|
||||
fun `should create reactive streams for different event types`() {
|
||||
// Test that different event types create different streams
|
||||
assertDoesNotThrow {
|
||||
val flux1 = consumer.receiveEvents<TestEvent>("test-topic")
|
||||
val flux2 = consumer.receiveEvents<AnotherTestEvent>("test-topic")
|
||||
val flux1 = consumer.receiveEventsWithResult<TestEvent>("test-topic")
|
||||
val flux2 = consumer.receiveEventsWithResult<AnotherTestEvent>("test-topic")
|
||||
|
||||
// Both should be created successfully
|
||||
assertThat(flux1).isNotNull
|
||||
|
||||
+1
-1
@@ -121,7 +121,7 @@ class KafkaSecurityTest {
|
||||
|
||||
// Test that reactive streams can be created (they use secure deserializer internally)
|
||||
assertDoesNotThrow {
|
||||
val flux = consumer.receiveEvents<SecureTestEvent>("secure-topic")
|
||||
val flux = consumer.receiveEventsWithResult<SecureTestEvent>("secure-topic")
|
||||
assertThat(flux).isNotNull
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user