optimierungen monitoring-Modul und messaging-Modul

This commit is contained in:
stefan
2025-09-04 11:24:17 +02:00
parent a33dd2c28a
commit 3b40cb9c45
4 changed files with 5 additions and 9 deletions
@@ -625,9 +625,7 @@ suspend fun handleMessagingError(error: MessagingError, topic: String) {
```kotlin ```kotlin
// Umfassendes Monitoring einrichten // Umfassendes Monitoring einrichten
@Component @Component
class MessagingMetrics( class MessagingMetrics( private val meterRegistry: MeterRegistry ) {
private val meterRegistry: MeterRegistry
) {
private val publishedEvents = Counter.builder("messaging.events.published") private val publishedEvents = Counter.builder("messaging.events.published")
.register(meterRegistry) .register(meterRegistry)
@@ -1,11 +1,10 @@
package at.mocode.infrastructure.messaging.client package at.mocode.infrastructure.messaging.client
import reactor.core.publisher.Flux
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.reactive.asPublisher import kotlinx.coroutines.reactive.asPublisher
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
/** /**
* Generische Schnittstelle zum Konsumieren von Events aus einem Message-Broker. * Generische Schnittstelle zum Konsumieren von Events aus einem Message-Broker.
@@ -99,7 +99,7 @@ class KafkaEventPublisher(
index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception) index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
} }
.retryWhen(createRetrySpec(topic, key)) .retryWhen(createRetrySpec(topic, key))
.map { Unit } // Convert to Mono<Unit> that emits one Unit per successful send .map { } // Convert to Mono<Unit> that emits one Unit per successful send
.onErrorContinue { error, _ -> .onErrorContinue { error, _ ->
logger.error("Error publishing event {} in batch to topic '{}': {}", logger.error("Error publishing event {} in batch to topic '{}': {}",
index + 1, topic, error.message) index + 1, topic, error.message)
@@ -142,7 +142,7 @@ class KafkaEventPublisher(
logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'", logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'",
topic, key, exception) topic, key, exception)
} }
.map { Unit } .map { }
} }
@Deprecated("Use publishEvents with Result<List<Unit>> instead.") @Deprecated("Use publishEvents with Result<List<Unit>> instead.")
@@ -175,7 +175,7 @@ class KafkaEventPublisher(
index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception) index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
} }
.retryWhen(createRetrySpec(topic, key)) .retryWhen(createRetrySpec(topic, key))
.map { Unit } // Convert to Mono<Unit> that emits one Unit per successful send .map { } // Convert to Mono<Unit> that emits one Unit per successful send
.onErrorContinue { error, _ -> .onErrorContinue { error, _ ->
logger.error("Error publishing event {} in batch to topic '{}': {}", logger.error("Error publishing event {} in batch to topic '{}': {}",
index + 1, topic, error.message) index + 1, topic, error.message)
@@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions import reactor.kafka.sender.SenderOptions
import java.time.Duration import java.time.Duration