optimierungen monitoring-Modul und messaging-Modul

This commit is contained in:
stefan
2025-09-04 11:20:40 +02:00
parent 2eefe24a4f
commit a33dd2c28a
11 changed files with 159 additions and 119 deletions
@@ -3,59 +3,68 @@ package at.mocode.infrastructure.messaging.client
import reactor.core.publisher.Flux
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.reactive.asPublisher
import org.slf4j.LoggerFactory
/**
* A generic interface for consuming events from a message broker.
* Generische Schnittstelle zum Konsumieren von Events aus einem Message-Broker.
*
* Follows DDD principles with explicit error handling using domain-specific error types.
* Provides both Result-based methods and reactive streams for flexibility.
* Folgt DDD-Prinzipien mit expliziter Fehlerbehandlung über domänenspezifische Fehlertypen.
* Bietet sowohl Result-basierte Methoden als auch reaktive Streams für Flexibilität.
*/
interface EventConsumer {
/**
* Receives events from the specified topic with explicit error handling.
* Empfängt Events vom angegebenen Topic mit expliziter Fehlerbehandlung.
*
* @param T The expected type of the event payload
* @param topic The topic to subscribe to
* @param eventType The class type of events to consume
* @return Flow<Result<T>> where each Result contains either a successful event or MessagingError
* @param T Erwarteter Typ der Event-Payload
* @param topic Das zu abonnierende Topic
* @param eventType Der Klassen-Typ der zu konsumierenden Events
* @return Flow<Result<T>> wobei jedes Result entweder ein erfolgreiches Event oder einen MessagingError enthält
*/
fun <T : Any> receiveEventsWithResult(topic: String, eventType: Class<T>): Flow<Result<T>>
/**
* Legacy reactive method for receiving events.
* Legacy reaktive Methode zum Empfangen von Events.
*
* This method returns a cold Flux, meaning that the consumer will only start
* listening for messages once the Flux is subscribed to.
* Diese Methode liefert einen "kalten" Flux, d. h. der Consumer beginnt erst
* nach Subscription mit dem Empfang von Nachrichten.
*
* @param T The expected type of the event payload.
* @param topic The topic to subscribe to.
* @return A reactive stream (Flux) of events of type T.
* @param T Erwarteter Typ der Event-Payload.
* @param topic Das zu abonnierende Topic.
* @return Ein reaktiver Stream (Flux) von Events des Typs T.
*/
@Deprecated("Use receiveEventsWithResult with Flow<Result<T>> instead", ReplaceWith("receiveEventsWithResult(topic, eventType)"))
fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T>
}
/**
* Kotlin-idiomatic extension function for `receiveEventsWithResult` using reified types.
* Kotlin-idiomatische Extension-Funktion für `receiveEventsWithResult` mit reified Typen.
*
* Example: `consumer.receiveEventsWithResult<MyEvent>("my-topic").collect { result -> ... }`
* Beispiel: `consumer.receiveEventsWithResult<MyEvent>("my-topic").collect { result -> ... }`
*/
inline fun <reified T : Any> EventConsumer.receiveEventsWithResult(topic: String): Flow<Result<T>> {
return this.receiveEventsWithResult(topic, T::class.java)
}
/**
* Kotlin-idiomatic extension function for `receiveEvents` using reified types.
* Kotlin-idiomatische Extension-Funktion für `receiveEvents` mit reified Typen.
*
* Example: `consumer.receiveEvents<MyEvent>("my-topic").subscribe { ... }`
* Beispiel: `consumer.receiveEvents<MyEvent>("my-topic").subscribe { ... }`
*/
@Deprecated("Use receiveEventsWithResult with Flow<Result<T>> instead", ReplaceWith("receiveEventsWithResult<T>(topic)"))
inline fun <reified T : Any> EventConsumer.receiveEvents(topic: String): Flux<T> {
// Convert Flow<Result<T>> to Flux<T> for backward compatibility
// New behavior: emit only successful events; log failures instead of throwing to keep the stream alive
val logger = LoggerFactory.getLogger("EventConsumerExtensions")
return this.receiveEventsWithResult<T>(topic)
.map { result: Result<T> -> result.getOrThrow() }
.mapNotNull { result: Result<T> ->
result.getOrElse {
logger.warn("Dropping failed event in legacy receiveEvents: {}", it.message)
null
}
}
.asPublisher()
.let { Flux.from(it) }
}
@@ -4,35 +4,35 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
/**
* Interface for publishing domain events to message broker.
* Schnittstelle zum Publizieren von Domain-Events in den Message-Broker.
*
* Follows DDD principles with explicit error handling using domain-specific error types.
* All operations use the Result pattern for type-safe error handling as required by guidelines.
* Folgt DDD-Prinzipien mit expliziter Fehlerbehandlung über domänenspezifische Fehlertypen.
* Alle Operationen verwenden das Result-Pattern für typsichere Fehlerbehandlung.
*/
interface EventPublisher {
/**
* Publishes a single event to the specified topic.
* Veröffentlicht ein einzelnes Event in das angegebene Topic.
*
* @param topic The Kafka topic to publish to
* @param key Optional message key for partitioning
* @param event The domain event to publish
* @return Result<Unit> indicating success or MessagingError exception for specific failure reason
* @param topic Das Kafka-Topic
* @param key Optionaler Schlüssel für Partitionierung
* @param event Das zu veröffentlichende Domain-Event
* @return Result<Unit> bei Erfolg oder MessagingError bei spezifischem Fehler
*/
suspend fun publishEvent(topic: String, key: String? = null, event: Any): Result<Unit>
/**
* Publishes multiple events to the specified topic in batch.
* Veröffentlicht mehrere Events als Batch in das angegebene Topic.
*
* @param topic The Kafka topic to publish to
* @param events List of key-event pairs to publish
* @return Result<List<Unit>> with success indicators or MessagingError exception for failure reason
* @param topic Das Kafka-Topic
* @param events Liste aus (Key, Event)-Paaren
* @return Result<List<Unit>> bei Erfolg oder MessagingError bei Fehlern
*/
suspend fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Result<List<Unit>>
/**
* Legacy reactive methods for backward compatibility.
* These will be deprecated in favor of the Result-based methods above.
* Legacy reaktive Methoden für Abwärtskompatibilität.
* Diese werden zugunsten der Result-basierten Methoden mittelfristig entfernt.
*/
@Deprecated("Use suspending publishEvent with Result instead", ReplaceWith("publishEvent(topic, key, event)"))
fun publishEventReactive(topic: String, key: String? = null, event: Any): Mono<Unit>
@@ -15,8 +15,8 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
/**
* A reactive, non-blocking Kafka implementation of the EventConsumer interface
* with optimized connection pooling, security, and error handling.
* Reaktive, nicht-blockierende Kafka-Implementierung des EventConsumer-Interfaces
* mit optimiertem Connection-Pooling, Sicherheit und Fehlerbehandlung.
*/
@Component
class KafkaEventConsumer(
@@ -10,11 +10,11 @@ import reactor.util.retry.Retry
import java.time.Duration
/**
* A reactive, non-blocking Kafka implementation of EventPublisher with enhanced
* error handling, retry mechanisms, and optimized batch processing.
* Reaktive, nicht-blockierende Kafka-Implementierung von EventPublisher mit erweiterter
* Fehlerbehandlung, Retry-Mechanismen und optimierter Batch-Verarbeitung.
*
* Implements both Result-based methods (preferred) and reactive methods (legacy).
* Follows DDD principles with explicit error handling using domain-specific error types.
* Implementiert sowohl Result-basierte Methoden (präferiert) als auch reaktive Legacy-Methoden.
* Folgt DDD-Prinzipien mit expliziter Fehlerbehandlung über domänenspezifische Fehlertypen.
*/
@Component
class KafkaEventPublisher(
@@ -24,19 +24,19 @@ class KafkaEventPublisher(
private val logger = LoggerFactory.getLogger(KafkaEventPublisher::class.java)
companion object {
/** Maximum number of retry attempts for failed message publishing operations */
/** Maximale Anzahl an Retry-Versuchen für fehlgeschlagene Publish-Operationen */
private const val MAX_RETRY_ATTEMPTS = 3L
/** Initial delay in seconds between retry attempts */
/** Initiale Verzögerung in Sekunden zwischen Retry-Versuchen */
private const val RETRY_DELAY_SECONDS = 1L
/** Maximum backoff delay in seconds for exponential backoff retry strategy */
/** Maximale Backoff-Verzögerung in Sekunden für die exponentielle Retry-Strategie */
private const val MAX_BACKOFF_SECONDS = 10L
/** Default concurrency level for batch processing operations */
/** Standard-Parallelität für Batch-Operationen */
private const val BATCH_CONCURRENCY_LEVEL = 10
/** Progress logging interval for batch operations (every N events) */
/** Fortschritts-Logging-Intervall für Batch-Operationen (alle N Events) */
private const val BATCH_PROGRESS_LOG_INTERVAL = 100
}
@@ -1,8 +1,8 @@
package at.mocode.infrastructure.messaging.client
/**
* Domain-specific error types for messaging operations.
* Follows the DDD guidelines for explicit error handling using the Result pattern.
* Domänenspezifische Fehlertypen für Messaging-Operationen.
* Folgt den DDD-Richtlinien mit expliziter Fehlerbehandlung über das Result-Pattern.
*/
sealed class MessagingError(
val code: String,
@@ -11,7 +11,7 @@ sealed class MessagingError(
) : Exception(message, cause) {
/**
* Error when event publishing fails due to serialization issues.
* Fehler beim Veröffentlichen aufgrund von Serialisierungsproblemen.
*/
data class SerializationError(
override val message: String,
@@ -19,7 +19,7 @@ sealed class MessagingError(
) : MessagingError("MESSAGING_SERIALIZATION_ERROR", message, cause)
/**
* Error when event publishing fails due to connection issues.
* Fehler beim Veröffentlichen aufgrund von Verbindungsproblemen.
*/
data class ConnectionError(
override val message: String,
@@ -27,7 +27,7 @@ sealed class MessagingError(
) : MessagingError("MESSAGING_CONNECTION_ERROR", message, cause)
/**
* Error when event publishing fails due to timeout.
* Fehler beim Veröffentlichen aufgrund von Zeitüberschreitung.
*/
data class TimeoutError(
override val message: String,
@@ -35,7 +35,7 @@ sealed class MessagingError(
) : MessagingError("MESSAGING_TIMEOUT_ERROR", message, cause)
/**
* Error when event publishing fails due to authentication/authorization issues.
* Fehler aufgrund von Authentifizierungs-/Autorisierungsproblemen.
*/
data class AuthenticationError(
override val message: String,
@@ -43,7 +43,7 @@ sealed class MessagingError(
) : MessagingError("MESSAGING_AUTHENTICATION_ERROR", message, cause)
/**
* Error when event publishing fails due to topic configuration issues.
* Fehler aufgrund von Topic-Konfigurationsproblemen.
*/
data class TopicConfigurationError(
override val message: String,
@@ -51,7 +51,7 @@ sealed class MessagingError(
) : MessagingError("MESSAGING_TOPIC_CONFIGURATION_ERROR", message, cause)
/**
* Error when event consumption fails due to deserialization issues.
* Fehler beim Empfangen aufgrund von Deserialisierungsproblemen.
*/
data class DeserializationError(
override val message: String,
@@ -59,7 +59,7 @@ sealed class MessagingError(
) : MessagingError("MESSAGING_DESERIALIZATION_ERROR", message, cause)
/**
* Generic messaging error for unexpected failures.
* Generischer Messaging-Fehler für unerwartete Ausfälle.
*/
data class UnexpectedError(
override val message: String,
@@ -2,6 +2,7 @@ package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -10,7 +11,7 @@ import reactor.kafka.sender.SenderOptions
import java.time.Duration
/**
* Spring Configuration for reactive Kafka components with optimized settings.
* Spring-Konfiguration für reaktive Kafka-Komponenten mit optimierten Einstellungen.
*/
@Configuration
class ReactiveKafkaConfig(
@@ -20,8 +21,8 @@ class ReactiveKafkaConfig(
private val logger = LoggerFactory.getLogger(ReactiveKafkaConfig::class.java)
/**
* Creates a Spring Bean for the optimized ReactiveKafkaProducerTemplate.
* This template includes enhanced error handling, monitoring, and performance tuning.
* Erstellt einen Spring-Bean für das optimierte ReactiveKafkaProducerTemplate.
* Dieses Template beinhaltet erweiterte Fehlerbehandlung, Monitoring und Performance-Tuning.
*/
@Bean
fun reactiveKafkaProducerTemplate(): ReactiveKafkaProducerTemplate<String, Any> {
@@ -45,10 +46,11 @@ class ReactiveKafkaConfig(
}
/**
* Creates a KafkaConfig bean if not already provided.
* This allows for external configuration override while providing sensible defaults.
* Erstellt einen KafkaConfig-Bean, falls nicht bereits vorhanden.
* Ermöglicht externe Konfigurationsüberschreibung bei gleichzeitigen sinnvollen Defaults.
*/
@Bean
@ConditionalOnMissingBean(KafkaConfig::class)
fun kafkaConfig(): KafkaConfig {
return KafkaConfig().apply {
logger.info("Initializing KafkaConfig with bootstrap servers: {}", bootstrapServers)