refactoring(infra-messaging)

This commit is contained in:
2025-08-15 22:35:13 +02:00
parent d2172229b1
commit 69ca3faf91
2 changed files with 584 additions and 12 deletions
@@ -33,9 +33,12 @@ class KafkaEventConsumer(
return receiveEvents(topic, eventType)
.map<Result<T>> { event -> Result.success(event) }
.onErrorContinue { error, _ ->
.onErrorResume { exception ->
logger.warn("Error occurred while consuming events from topic '{}' for event type '{}': {}",
topic, eventType.simpleName, error.message)
topic, eventType.simpleName, exception.message)
// Map exception to appropriate MessagingError and return as Result.failure
val messagingError = mapToMessagingError(exception)
reactor.core.publisher.Mono.just(Result.failure<T>(messagingError))
}
.doOnError { exception ->
logger.error("Fatal error in consumer stream for topic '{}' and event type '{}': {}",
@@ -121,6 +124,32 @@ class KafkaEventConsumer(
return KafkaReceiver.create(receiverOptions)
}
/**
* Maps generic exceptions to domain-specific MessagingError types.
* Consumer-focused error mapping with emphasis on deserialization errors.
*/
private fun mapToMessagingError(exception: Throwable): MessagingError {
return when {
exception.message?.contains("deserializ", ignoreCase = true) == true ||
exception.message?.contains("parse", ignoreCase = true) == true ||
exception.message?.contains("json", ignoreCase = true) == true ->
MessagingError.DeserializationError("Deserialization failed: ${exception.message}", exception)
exception.message?.contains("timeout", ignoreCase = true) == true ||
exception is java.util.concurrent.TimeoutException ->
MessagingError.TimeoutError("Operation timed out: ${exception.message}", exception)
exception.message?.contains("connection", ignoreCase = true) == true ||
exception.message?.contains("network", ignoreCase = true) == true ||
exception is java.net.ConnectException ||
exception is java.io.IOException ->
MessagingError.ConnectionError("Connection failed: ${exception.message}", exception)
exception.message?.contains("auth", ignoreCase = true) == true ->
MessagingError.AuthenticationError("Authentication failed: ${exception.message}", exception)
exception.message?.contains("topic", ignoreCase = true) == true ->
MessagingError.TopicConfigurationError("Topic configuration error: ${exception.message}", exception)
else -> MessagingError.UnexpectedError("Unexpected error: ${exception.message}", exception)
}
}
/**
* Cleanup method to clear cached receivers on application shutdown.
* Reactive receivers will be automatically cleaned up when their streams complete.