From 3b40cb9c4557a2f746eb95a726281867042266c9 Mon Sep 17 00:00:00 2001 From: stefan Date: Thu, 4 Sep 2025 11:24:17 +0200 Subject: [PATCH] optimierungen monitoring-Modul und messaging-Modul --- infrastructure/messaging/README-INFRA-MESSAGING.md | 4 +--- .../mocode/infrastructure/messaging/client/EventConsumer.kt | 3 +-- .../infrastructure/messaging/client/KafkaEventPublisher.kt | 6 +++--- .../infrastructure/messaging/client/ReactiveKafkaConfig.kt | 1 - 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/infrastructure/messaging/README-INFRA-MESSAGING.md b/infrastructure/messaging/README-INFRA-MESSAGING.md index cc4a7019..b7fc3bb8 100644 --- a/infrastructure/messaging/README-INFRA-MESSAGING.md +++ b/infrastructure/messaging/README-INFRA-MESSAGING.md @@ -625,9 +625,7 @@ suspend fun handleMessagingError(error: MessagingError, topic: String) { ```kotlin // Umfassendes Monitoring einrichten @Component -class MessagingMetrics( - private val meterRegistry: MeterRegistry -) { +class MessagingMetrics( private val meterRegistry: MeterRegistry ) { private val publishedEvents = Counter.builder("messaging.events.published") .register(meterRegistry) diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt index 75d15a37..9ee4a057 100644 --- a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/EventConsumer.kt @@ -1,11 +1,10 @@ package at.mocode.infrastructure.messaging.client -import reactor.core.publisher.Flux import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.reactive.asPublisher import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux /** * Generische Schnittstelle zum Konsumieren von Events aus einem Message-Broker. diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt index 4a241fa9..696d68c7 100644 --- a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/KafkaEventPublisher.kt @@ -99,7 +99,7 @@ class KafkaEventPublisher( index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception) } .retryWhen(createRetrySpec(topic, key)) - .map { Unit } // Convert to Mono that emits one Unit per successful send + .map { } // Convert to Mono that emits one Unit per successful send .onErrorContinue { error, _ -> logger.error("Error publishing event {} in batch to topic '{}': {}", index + 1, topic, error.message) @@ -142,7 +142,7 @@ class KafkaEventPublisher( logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'", topic, key, exception) } - .map { Unit } + .map { } } @Deprecated("Use publishEvents with Result> instead.") @@ -175,7 +175,7 @@ class KafkaEventPublisher( index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception) } .retryWhen(createRetrySpec(topic, key)) - .map { Unit } // Convert to Mono that emits one Unit per successful send + .map { } // Convert to Mono that emits one Unit per successful send .onErrorContinue { error, _ -> logger.error("Error publishing event {} in batch to topic '{}': {}", index + 1, topic, error.message) diff --git a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt index a176746a..d3259295 100644 --- a/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt +++ b/infrastructure/messaging/messaging-client/src/main/kotlin/at/mocode/infrastructure/messaging/client/ReactiveKafkaConfig.kt @@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate import reactor.kafka.sender.SenderOptions import java.time.Duration