* MP-8 OTHER Implementiere JWT-Authentifizierungs-Filter im Gateway * Fix(ci): Update upload-artifact action to v4 * Fix(ci): Add start command for Keycloak and failure logs * Fix(ci): Remove invalid 'command' property from Keycloak service * Fix(ci): Use KC_DEV_MODE env var to start Keycloak * Fix(ci): Keycloak service was removed from GitHub Actions services and replaced with a manual docker run step that starts Keycloak with the start-dev command. * dev(ci): vereinheitliche Keycloak auf 26.4.2; aktiviere Health im CI (MP-8) * Fix(ci): Stabilize Keycloak startup in integration tests via matrix - Add `dev-file` Keycloak variant to matrix for stability fallback. - Improve wait logic and health checks for Keycloak and Postgres. - Unify Keycloak version to 26.4.2 across codebase. - Add log dumps on failure. * Fix(ci): Die betroffene Datei docs/Visionen-Ideen/Infrastruktur-Strategie_DSGVO-Konformität.md endet aktuell mit genau einer leeren Zeile (Zeile 87). Das entspricht der Regel MD047 („Files should end with a single newline character“). Damit ist deine Korrektur korrekt. * Fix(ci): Repository-wide auto-fix for Markdown files was implemented with a GitHub Actions workflow and a local helper script. EditorConfig and markdownlint ignore files were added to ensure consistent formatting. Instructions for using the auto-fix both via GitHub Actions and locally were provided. * fix(gradle): build.gradle.kts jsBrowser testTask disabled * fix(gradle): build.gradle.kts jsBrowser testTask disabled * Fix(ci): Stabilize integration tests with Keycloak matrix build (MP-8) Introduces a matrix strategy (`keycloak_db: [postgres, dev-file]`) in the integration-tests workflow to mitigate flaky Keycloak starts when using the Postgres service container. - Adds a `dev-file` Keycloak variant for stability fallback. - Improves wait logic and health checks for Keycloak/Postgres. - Unifies Keycloak version to 26.4.2 across codebase (Dockerfile, Compose, ADR, README, tests). - Adds log dumps on failure in CI. - Ensures `KC_HEALTH_ENABLED=true` is set. - Updates related documentation (README, Schlachtplan). - Includes broader Docker SSoT cleanup (versions.toml as source, script updates, env file cleanup, validator hardening). This resolves recurring CI failures related to Keycloak startup and ensures required checks for PRs (#15) are reliable, while also improving overall Docker build consistency. * feat(docs, ci): Implement YouTrack SSoT strategy with Dokka sync (MP-8) - Add Dokka multi-module Gradle configuration and KDoc style guide. - Add GitHub Actions workflow (docs-kdoc-sync.yml) and Python script (youtrack-sync-kb.py) to sync Dokka GFM output to YouTrack KB. - Extend front-matter schema (bc, doc_type) and update relevant pages/stubs. - Adapt CI scripts (validate-frontmatter, check-docs-drift, ci-docs link ignore). - Update README.md to reference YouTrack KB. * feat(docs, ci): Implement YouTrack SSoT strategy with Dokka sync (MP-8) - Add Dokka multi-module Gradle configuration and KDoc style guide. - Add GitHub Actions workflow (docs-kdoc-sync.yml) and Python script (youtrack-sync-kb.py) to sync Dokka GFM output to YouTrack KB. - Extend front-matter schema (bc, doc_type) and update relevant pages/stubs. - Adapt CI scripts (validate-frontmatter, check-docs-drift, ci-docs link ignore). - Update README.md to reference YouTrack KB. * Fix(ci): Replace OpenAPI validator with Spectral Replaces the deprecated 'char0n/swagger-editor-validate' action, which failed due to sandbox issues in GitHub Actions, with the modern '@stoplight/spectral-cli'. This ensures robust OpenAPI specification validation without requiring a headless browser environment. The 'generate-api-docs' job now depends on the successful completion of the Spectral validation. Part of resolving CI failures for PR #15 (MP-8). * Fix(ci): Specify spectral:oas ruleset for OpenAPI validation (MP-8) * Fix(ci): Remove explicit ruleset argument for Spectral validation (MP-8) * Fix(ci): Added a .spectral.yaml file to fix Spectral linting errors. Corrected markdown lint issues in two documentation files. Updated README.md with a new guidelines section to fix link validation errors. * Fix(ci): Markdownlint errors were fixed by adding required blank lines. The Guidelines Validation error was resolved by updating the README.md link. The API Documentation Generator workflow was stabilized by updating paths, tasks, and validation steps. * Fix(ci): Alle vier fehlerhaften GitHub-Action-Prüfungen wurden behoben. Fehler in der OpenAPI-Spezifikation, Probleme mit der Markdown-Linting-Analyse und Validierungsfehler bei Querverweisen wurden korrigiert. Die README.md enthält nun alle erforderlichen Links zu den Richtlinien. * Fix(ci): Markdown linting errors in docs/api/README.md were fixed by specifying languages in fenced code blocks. OpenAPI specification errors in documentation.yaml were resolved by correcting example property types to strings. Cross-reference validation errors in README.md were fixed by adding the missing link to project-standards/coding-standards.md. * Fix(ci): Duplicate heading errors in docs/api/members-api.md were fixed. Cross-reference validation errors for docker-architecture.md were resolved. All originally reported issues passed validation successfully. * Fix(ci): The markdown heading levels in docs/api/members-api.md were corrected from h5 to h4 to fix linting errors. The missing cross-reference link from technology-guides/docker/docker-development.md to docker-overview.md was added. These fixes resolved the original validation and linting errors causing the process to fail. * Fix(ci): Duplicate heading warnings in docs/api/members-api.md were resolved. Cross-reference validation for docker-development.md to docker-architecture.md was fixed. A new unrelated warning about docker-production.md was identified but not addressed. * refactor(ci,docs): Simplify CI pipeline and migrate docs to YouTrack SSoT BREAKING CHANGE: Documentation structure radically simplified - Consolidate 9 GitHub Actions workflows into 1 main pipeline (ci-main.yml) - Remove redundant workflows: ci-docs, markdownlint-autofix, guidelines-validation, api-docs - Delete documentation migrated to YouTrack: api/, BCs/, Visionen-Ideen/, reference/, now/, overview/ - Keep only ADRs, C4 diagrams, and essential dev guides in repo - Update README.md with YouTrack KB links - Create new docs/README.md as documentation gateway - Relax markdown-lint config for pragmatic developer experience Kept workflows: - ssot-guard.yml (Docker SSoT validation) - docs-kdoc-sync.yml (KDoc → YouTrack sync) - integration-tests.yml (Integration tests) - deploy-proxmox.yml (Deployment) - youtrack-sync.yml (YouTrack integration) Related: MP-DOCS-001 * refactor(ci,docs): Simplify CI pipeline and migrate docs to YouTrack SSoT BREAKING CHANGE: Documentation structure radically simplified - Consolidate 9 GitHub Actions workflows into 1 main pipeline (ci-main.yml) - Remove redundant workflows: ci-docs, markdownlint-autofix, guidelines-validation, api-docs - Delete documentation migrated to YouTrack: api/, BCs/, Visionen-Ideen/, reference/, now/, overview/ - Keep only ADRs, C4 diagrams, and essential dev guides in repo - Update README.md with YouTrack KB links - Create new docs/README.md as documentation gateway - Relax markdown-lint config for pragmatic developer experience Kept workflows: - ssot-guard.yml (Docker SSoT validation) - docs-kdoc-sync.yml (KDoc → YouTrack sync) - integration-tests.yml (Integration tests) - deploy-proxmox.yml (Deployment) - youtrack-sync.yml (YouTrack integration) Related: MP-DOCS-001 * refactor(ci,docs): README.md und einige andere Dokumentationen überarbeitet. ports-and-urls.md hinzugefügt. Related: MP-DOCS-001 * refactor(ci,docs): Die Markdownlint-Fehler in README.md und docs/README.md wurden behoben, indem die Überschriftenebenen angepasst, überflüssige Satzzeichen am Ende entfernt und die notwendigen Leerzeilen um Überschriften, Listen, Tabellen und Codeblöcke eingefügt wurden. Das problematische Leerzeichen am Ende in docs/README.md wurde ebenfalls entfernt. Die Dateien entsprechen nun den vorgegebenen Markdownlint-Regeln und sollten die CI-Validierung bestehen. Related: MP-DOCS-001 * refactor(ci,docs): Docker guideline cross-references were fixed and normalized to lowercase labels. Validation scripts confirmed zero cross-reference warnings and consistent metadata. Documentation was updated with a changelog and enhanced README navigation. Related: MP-DOCS-001 * refactor(ci,docs): Docker guideline cross-references were fixed and normalized to lowercase labels. Validation scripts confirmed zero cross-reference warnings and consistent metadata. Documentation was updated with a changelog and enhanced README navigation. Related: MP-DOCS-001 * refactor(ci,docs): Dead links in docs/architecture/adr were fixed by updating URLs to stable sources and adding an ignore pattern for a placeholder link. Specific ADR files had their broken links replaced with valid ones. The markdown-link-check GitHub Action is expected to pass with zero dead links now. Related: MP-DOCS-001 * refactor(ci,docs): Links in ADR checked Related: MP-DOCS-001 * refactor(ci,docs): Links in ADR checked Related: MP-DOCS-001 * refactor(ci,docs): Markdown Regeln ausgebessert Related: MP-DOCS-001 * refactor(ci,docs): Markdown Regeln ausgebessert Related: MP-DOCS-001 * refactor(ci,docs): Markdown Regeln ausgebessert Related: MP-DOCS-001 * Chore: Rerun CI checks with updated branch protection rules
28 KiB
Infrastructure/Messaging Module
Überblick
Das Messaging-Modul stellt die Infrastruktur für die asynchrone, reaktive Kommunikation zwischen den Microservices bereit. Es nutzt Apache Kafka als hochperformanten, verteilten Message-Broker und ist entscheidend für die Entkopplung von Services und die Implementierung einer skalierbaren, ereignisgesteuerten Architektur.
Das Modul implementiert moderne Domain-Driven Design (DDD) Prinzipien mit expliziter Fehlerbehandlung über das Result Pattern und bietet sowohl suspending Coroutine-APIs als auch reaktive Stream-APIs für maximale Flexibilität.
Kernfeatures
- 🎯 Result Pattern APIs: Typsichere Fehlerbehandlung ohne Exceptions
- ⚡ Reactive Streams: Hochperformante, nicht-blockierende I/O-Operationen
- 🔄 Intelligent Retry Logic: Differenzierte Retry-Strategien basierend auf Fehlertypen
- 📊 Batch Processing: Optimierte Verarbeitung mehrerer Events mit kontrollierbarer Parallelität
- 🔒 Security Features: Sichere Deserialisierung mit Trusted-Package-Validierung
- 📈 Observability: Umfassendes Logging und Monitoring für Production-Ready-Deployment
- 🧪 Comprehensive Testing: Integration Tests mit Testcontainers und fokussierte Unit Tests
Architektur
Das Modul ist in zwei spezialisierte Komponenten aufgeteilt, um Konfiguration von der Client-Logik zu trennen:
infrastructure/messaging/ ├── messaging-config/ # Stellt die zentrale Kafka-Konfiguration bereit └── messaging-client/ # Stellt wiederverwendbare, reaktive Clients bereit
messaging-config
Dieses Modul zentralisiert die grundlegende Kafka-Konfiguration für das gesamte Projekt.
- Zweck: Definiert Spring-Beans für die
ProducerFactory(Basis für Producer) und eineMapmit Standard-Konfigurationen für Consumer (z.B.bootstrap-servers,group-id, Serializer). - Vorteil: Stellt Konsistenz sicher und vereinfacht die Einrichtung neuer Producer oder Consumer in den Services.
messaging-client
Dieses Modul baut auf der Konfiguration auf und stellt wiederverwendbare High-Level-Komponenten für die Interaktion mit Kafka bereit.
Kern-Komponenten
-
EventPublisherInterface: Definiert moderne APIs für das Publizieren von Domain Events- Moderne APIs:
publishEvent()undpublishEvents()mit Result Pattern - Legacy APIs:
publishEventReactive()undpublishEventsReactive()(deprecated)
- Moderne APIs:
-
EventConsumerInterface: Definiert APIs für das Empfangen von Domain Events- Moderne APIs:
receiveEventsWithResult()mit Flow<Result> für typsichere Fehlerbehandlung - Legacy APIs:
receiveEvents()mit Flux (deprecated)
- Moderne APIs:
-
KafkaEventPublisher: Implementierung des EventPublisher mit umfassendem Feature-Set- Reaktive, nicht-blockierende Kafka-Integration mit
ReactiveKafkaProducerTemplate - Intelligente Retry-Logic mit exponential backoff
- Optimierte Batch-Verarbeitung mit kontrollierbarer Parallelität (10 concurrent operations)
- Comprehensive Logging und Progress-Tracking
- Reaktive, nicht-blockierende Kafka-Integration mit
-
KafkaEventConsumer: Implementierung des EventConsumer mit erweiterten Funktionen- Connection-Pooling zur Wiederverwendung von KafkaReceiver-Instanzen
- Sichere Deserialisierung mit Trusted-Package-Validierung
- Manual Acknowledgment Control für bessere Kontrolle über Commit-Verhalten
- Consumer-Cache-Management für Ressourcenoptimierung
-
MessagingErrorHierarchie: Domain-spezifische Fehlertypen für strukturierte FehlerbehandlungSerializationError,DeserializationError: Serialization-/Deserialization-ProblemeConnectionError: Netzwerk- und VerbindungsfehlerTimeoutError: ZeitüberschreitungenAuthenticationError: Authentifizierungs-/AutorisierungsfehlerTopicConfigurationError: Topic-KonfigurationsproblemeUnexpectedError: Allgemeine unerwartete Fehler
Vorteile
- Typsichere Fehlerbehandlung: Result Pattern eliminiert unerwartete Exceptions
- Flexible APIs: Sowohl moderne Coroutine-basierte als auch Legacy reaktive APIs
- Production-Ready: Umfassendes Retry-Management, Observability und Ressourcenoptimierung
- Domain-Driven Design: Explizite Fehlertypen und saubere Abstraktionen
Verwendung
Ein Microservice, der Nachrichten senden oder empfangen möchte, deklariert eine Abhängigkeit zu :infrastructure:messaging:messaging-client und injiziert die entsprechenden Interfaces.
Moderne API (Result Pattern + Coroutines) - Empfohlen
Beispiel für das Senden einer Nachricht mit typsicherer Fehlerbehandlung:
@Service
class EventNotificationService(
private val eventPublisher: EventPublisher
) {
suspend fun notifyNewEvent(eventDetails: EventDetails): Result<Unit> {
val topic = "new-events-topic"
return eventPublisher.publishEvent(topic, eventDetails.id, eventDetails)
.onFailure { error ->
when (error) {
is MessagingError.SerializationError -> logger.error("Serialization failed for event", error)
is MessagingError.ConnectionError -> logger.warn("Connection issue, will retry later", error)
is MessagingError.TimeoutError -> logger.warn("Timeout publishing event", error)
else -> logger.error("Unexpected error publishing event", error)
}
}
}
suspend fun notifyMultipleEvents(events: List<Pair<String, EventDetails>>): Result<List<Unit>> {
val topic = "batch-events-topic"
return eventPublisher.publishEvents(topic, events)
.onSuccess { results ->
logger.info("Successfully published {} events", results.size)
}
.onFailure { error ->
logger.error("Failed to publish batch events: {}", error.message)
}
}
}
Beispiel für das Empfangen von Nachrichten mit typsicherer Fehlerbehandlung:
@Component
class ModernEventListener(
private val eventConsumer: EventConsumer
) {
private val logger = LoggerFactory.getLogger(ModernEventListener::class.java)
@PostConstruct
fun startListening() {
val topic = "new-events-topic"
// Moderne Result-basierte API mit Flow<Result<T>>
eventConsumer.receiveEventsWithResult(topic, EventDetails::class.java)
.asFlow()
.collect { result ->
result
.onSuccess { event ->
logger.info("Successfully received event with ID: {}", event.id)
processEvent(event)
}
.onFailure { error ->
when (error) {
is MessagingError.DeserializationError -> {
logger.error("Failed to deserialize event from topic '{}': {}", topic, error.message)
// Deserialization-Fehler sind meist permanent - keine weiteren Versuche
handlePoisonMessage(topic, error)
}
is MessagingError.ConnectionError -> {
logger.warn("Connection issue while consuming from topic '{}': {}", topic, error.message)
// Connection-Fehler sind oft temporär - Consumer wird automatisch retries
}
is MessagingError.TimeoutError -> {
logger.warn("Timeout while consuming from topic '{}': {}", topic, error.message)
// Timeout-Fehler können retries bekommen
}
else -> {
logger.error("Unexpected error consuming from topic '{}': {}", topic, error.message, error)
handleUnexpectedError(topic, error)
}
}
}
}
}
private suspend fun processEvent(event: EventDetails) {
// Geschäftslogik zur Verarbeitung des Events
logger.debug("Processing event: {}", event)
}
private suspend fun handlePoisonMessage(topic: String, error: MessagingError.DeserializationError) {
// Poison Messages in separates Topic oder Dead Letter Queue verschieben
logger.warn("Moving poison message from topic '{}' to dead letter queue", topic)
}
private suspend fun handleUnexpectedError(topic: String, error: MessagingError) {
// Monitoring/Alerting für unerwartete Fehler
logger.error("Alerting monitoring system for unexpected error in topic '{}'", topic)
}
}
Beispiel für Consumer mit Coroutines und strukturierter Parallelität:
@Service
class BatchEventProcessor(
private val eventConsumer: EventConsumer
) {
private val logger = LoggerFactory.getLogger(BatchEventProcessor::class.java)
suspend fun processBatchEvents(topic: String): Result<Int> = withContext(Dispatchers.IO) {
try {
var processedCount = 0
var errorCount = 0
eventConsumer.receiveEventsWithResult(topic, EventDetails::class.java)
.asFlow()
.take(100) // Verarbeite maximal 100 Events pro Batch
.collect { result ->
result
.onSuccess { event ->
processedCount++
logger.debug("Processed event {}/{}", processedCount, 100)
}
.onFailure { error ->
errorCount++
logger.warn("Error processing event: {}", error.message)
}
}
logger.info("Batch processing completed: {} processed, {} errors", processedCount, errorCount)
Result.success(processedCount)
} catch (exception: Exception) {
logger.error("Batch processing failed", exception)
Result.failure(MessagingError.UnexpectedError("Batch processing failed: ${exception.message}", exception))
}
}
}
Legacy Reactive API - Wird depreciert
Beispiel für das Senden einer Nachricht (reaktiv, nicht-blockierend):
@Service
class LegacyEventNotificationService(
private val eventPublisher: EventPublisher
) {
@Deprecated("Use suspending publishEvent with Result instead")
fun notifyNewEventReactive(eventDetails: EventDetails) {
val topic = "new-events-topic"
eventPublisher.publishEventReactive(topic, eventDetails.id, eventDetails)
.subscribe(
{ /* onNext: Unit received */ },
{ error -> logger.error("Failed to send message to topic '{}'", topic, error) },
{ /* onComplete: Nichts zu tun */ }
)
// Die Methode kehrt sofort zurück, ohne auf die Bestätigung von Kafka zu warten.
}
}
Beispiel für das Empfangen von Nachrichten (reaktiv):
@Component
class EventListener(
private val eventConsumer: EventConsumer
) {
@PostConstruct
fun listenForEvents() {
val topic = "new-events-topic"
eventConsumer.receiveEvents<EventDetails>(topic)
.subscribe { event ->
logger.info("Received new event with ID: {}", event.id)
// Geschäftslogik zur Verarbeitung des Events...
}
}
}
Konfiguration
Das Messaging-Modul bietet umfassende Konfigurationsmöglichkeiten über die KafkaConfig-Klasse mit automatischer Validierung und optimierten Standardwerten für Production-Ready-Deployments.
Basis-Konfiguration
@Configuration
class MessagingConfiguration {
@Bean
fun kafkaConfig(): KafkaConfig {
return KafkaConfig().apply {
// Kafka-Cluster-Verbindung
bootstrapServers = "kafka-cluster:9092" // oder "localhost:9092" für lokale Entwicklung
// Consumer-Gruppierung
defaultGroupIdPrefix = "myapp-messaging"
// Sicherheitseinstellungen
trustedPackages = "com.mycompany.*,at.mocode.*"
enableSecurityFeatures = true
// Performance-Tuning
connectionPoolSize = 20 // Für hochfrequente Anwendungen
}
}
}
Konfigurationsoptionen
| Parameter | Typ | Standard | Beschreibung |
|---|---|---|---|
bootstrapServers |
String | "localhost:9092" | Kafka-Cluster-Endpunkte. Unterstützt host:port und PROTOCOL://host:port Formate |
defaultGroupIdPrefix |
String | "messaging-client" | Präfix für automatisch generierte Consumer-Gruppen |
trustedPackages |
String | "at.mocode.*" | Comma-separated List von Packages für sichere JSON-Deserialisierung |
enableSecurityFeatures |
Boolean | true | Aktiviert erweiterte Sicherheitsfeatures für Production |
connectionPoolSize |
Int | 10 | Anzahl der gleichzeitigen Kafka-Verbindungen im Pool |
Production-Konfiguration
Für Production-Umgebungen empfohlene Konfiguration:
@Configuration
@Profile("production")
class ProductionMessagingConfiguration {
@Bean
fun kafkaConfig(): KafkaConfig {
return KafkaConfig().apply {
// Hochverfügbares Kafka-Cluster
bootstrapServers = "kafka-01.prod:9092,kafka-02.prod:9092,kafka-03.prod:9092"
// Environment-spezifische Gruppierung
defaultGroupIdPrefix = "${System.getenv("APP_NAME")}-${System.getenv("ENVIRONMENT")}"
// Restriktive Sicherheitseinstellungen
trustedPackages = "com.mycompany.events.*,com.mycompany.domain.*"
enableSecurityFeatures = true
// Optimiert für hohe Parallelität
connectionPoolSize = 50
}
}
}
Umgebungsvariablen
Das Modul unterstützt Konfiguration über Umgebungsvariablen für Container-Deployments:
# Docker/Kubernetes Environment Variables
KAFKA_BOOTSTRAP_SERVERS=kafka-cluster:9092
KAFKA_GROUP_ID_PREFIX=myapp-prod
KAFKA_TRUSTED_PACKAGES=com.mycompany.*
KAFKA_CONNECTION_POOL_SIZE=25
KAFKA_ENABLE_SECURITY=true
Erweiterte Producer-Konfiguration
Die KafkaConfig stellt optimierte Producer-Eigenschaften bereit:
// Automatisch konfigurierte Producer-Eigenschaften:
// - Batch-Verarbeitung (32KB Batches, 5ms Linger)
// - Snappy-Komprimierung für bessere Performance
// - Idempotenz für Exactly-Once-Semantics
// - Intelligente Retry-Logik (3 Versuche, 1s Backoff)
// - 30s Delivery-Timeout mit 10s Request-Timeout
Erweiterte Consumer-Konfiguration
Consumer werden automatisch mit optimierten Einstellungen konfiguriert:
// Automatisch konfigurierte Consumer-Eigenschaften:
// - Manual Commit für bessere Kontrolle
// - Optimierte Fetch-Größen (1KB min, 1MB max)
// - 500ms Max-Wait für Fetch-Operationen
// - Session-Timeout: 30s, Heartbeat: 3s
// - Automatic Offset Reset: earliest
// - Max 500 Records pro Poll
Monitoring und Observability
@Component
class MessagingHealthIndicator(
private val kafkaConfig: KafkaConfig
) : HealthIndicator {
override fun health(): Health {
return try {
// Kafka-Cluster-Konnektivität prüfen
val adminClient = AdminClient.create(kafkaConfig.producerConfigs())
val clusterMetadata = adminClient.describeCluster()
val nodeCount = clusterMetadata.nodes().get(5, TimeUnit.SECONDS).size
Health.up()
.withDetail("kafka.cluster.nodes", nodeCount)
.withDetail("kafka.bootstrap.servers", kafkaConfig.bootstrapServers)
.withDetail("kafka.connection.pool.size", kafkaConfig.connectionPoolSize)
.build()
} catch (exception: Exception) {
Health.down()
.withDetail("kafka.error", exception.message)
.withException(exception)
.build()
}
}
}
Dependency Management
Gradle-Konfiguration
Das Messaging-Modul nutzt eine saubere Modularisierung über Gradle Composite Builds:
// In einem Service-Modul
dependencies {
// Hauptabhängigkeit für messaging functionality
implementation(projects.infrastructure.messaging.messagingClient)
// Die messaging-config wird transitiv eingebunden
// Alle benötigten Kafka-, Spring- und Reactive-Dependencies sind enthalten
}
Verfügbare Module
| Modul | Zweck | Transitive Dependencies |
|---|---|---|
messaging-config |
Zentrale Kafka-Konfiguration | Spring Kafka, Jackson, Kafka Clients |
messaging-client |
High-Level Publisher/Consumer APIs | Reactor Kafka, Kotlinx Coroutines, messaging-config |
Version-Management
// platform/platform-bom/build.gradle.kts - Zentrale Versionsverwaltung
dependencies {
constraints {
api("org.springframework.kafka:spring-kafka:3.1.4")
api("io.projectreactor.kafka:reactor-kafka:1.3.22")
api("org.apache.kafka:kafka-clients:3.6.1")
}
}
Testing-Strategie
Die Zuverlässigkeit des Moduls wird durch eine mehrstufige Teststrategie sichergestellt, die sowohl Unit- als auch Integrationstests umfasst:
Integrationstests (Goldstandard)
- Testcontainers: Der
KafkaIntegrationTeststartet einen echten Apache Kafka Docker-Container, um die Funktionalität unter realen Bedingungen zu validieren - Reaktives Testen: Nutzt Project Reactor's
StepVerifierfür deterministische Tests der reaktiven Streams ohne unzuverlässige Thread.sleep-Aufrufe - Lifecycle Management: Saubere Ressourcenverwaltung über @BeforeEach und @AfterEach für korrekte Freigabe von Producer-Threads
- End-to-End Validierung: Vollständige Publish-Subscribe-Zyklen mit echtem Kafka-Cluster
Unit Tests
KafkaEventPublisherErrorTest: Fokussierte Tests für Fehlerbehandlung mit MockK für isolierte Testszenarien- Fehlerszenarien: Systematische Tests für Serialization-, Authentication-, Connection- und Timeout-Fehler
- Batch-Verarbeitung: Validierung von Batch-Operationen und Empty-Batch-Handling
- Retry-Logic: Tests für intelligente Retry-Mechanismen und Retry-Exhaustion
Sicherheits- und Konfigurationstests
KafkaSecurityTest: Validierung der Sicherheitskonfigurationen und Trusted-Package-VerwaltungKafkaEventConsumerCacheTest: Tests für Consumer-Caching und Ressourcenoptimierung- Konfigurationsvalidierung: Automatische Validierung aller Konfigurationsparameter
Neue Features und Optimierungen (2025)
Domain-Driven Design (DDD) Integration
- Result Pattern APIs: Neue suspending Coroutine-basierte APIs mit typsicherer Fehlerbehandlung über das Result Pattern
- Domain-spezifische Fehlertypen: Umfassende
MessagingErrorHierarchie (SerializationError, ConnectionError, TimeoutError, AuthenticationError, etc.) - Explizite Fehlerbehandlung: Eliminiert unerwartete Exceptions durch strukturierte Fehler-Typen
- Backward Compatibility: Legacy-reactive APIs bleiben verfügbar, sind aber als deprecated markiert
Erweiterte Konfigurationsvalidierung
- Automatische Validierung: Alle Konfigurationsparameter werden automatisch bei der Zuweisung validiert
- Bootstrap-Server-Format: Unterstützt sowohl einfache (
host:port) als auch protokoll-präfixierte Formate (PLAINTEXT://host:port) - Sicherheitsfeatures: Konfigurierbare Sicherheitsfunktionen für Produktionsumgebungen
- Connection-Pool-Management: Konfigurierbare Verbindungspool-Größe für bessere Ressourcenverwaltung
Verbesserte Observability
- Strukturierte Logs: Erweiterte Logging-Informationen mit GroupID, Timestamps und Event-Kontext
- Fehlerkontext: Detaillierte Fehlerinformationen mit Retry-Status und Event-Type-Details
- Performance-Tracking: Bessere Nachvollziehbarkeit von Batch-Operationen und Retry-Versuchen
- Batch-Progress-Logging: Automatisches Progress-Logging bei großen Batch-Operationen (alle 100 Events)
Robustheit-Verbesserungen
- Intelligente Retry-Logik: Differenzierte Retry-Strategien basierend auf Fehlertypen (keine Retries für Serialization/Auth-Fehler)
- Exponential Backoff: Konfigurierbare Retry-Delays mit exponential backoff (1s initial, max 10s backoff)
- Controlled Batch Concurrency: Optimierte Batch-Verarbeitung mit konfigurierbarer Parallelität (Standard: 10 concurrent operations)
- Testcontainer-Kompatibilität: Vollständige Kompatibilität mit Docker-basierten Tests
- Enhanced Error Handling: Verbesserte Fehlerbehandlung mit strukturierten Kontext-Informationen
Test-Suite Optimierung
- Fokussierte Unit Tests: Bereinigte Test-Suite mit Fokus auf essentielle Funktionalität
- MockK Integration: Moderne Mocking-Frameworks für isolierte Unit Tests
- StepVerifier Korrekturen: Korrigierte reaktive Test-Assertions für
Mono<Unit>Rückgabetypen - Reduced Test Complexity: Entfernung unnötiger Performance- und Logging-Tests zugunsten fokussierter Funktionstests
Troubleshooting
Häufige Probleme und Lösungen
1. Connection-Fehler zu Kafka
Problem: MessagingError.ConnectionError beim Senden oder Empfangen von Nachrichten
Mögliche Ursachen und Lösungen:
- Kafka-Cluster-Erreichbarkeit prüfen:
# Teste Verbindung zu Kafka-Cluster
telnet kafka-cluster 9092
# Oder mit nc (netcat)
nc -zv kafka-cluster 9092
- Bootstrap-Server-Konfiguration validieren:
// Multiple Broker für High Availability
kafkaConfig.bootstrapServers = "kafka-01:9092,kafka-02:9092,kafka-03:9092"
- Netzwerk-Timeouts erhöhen für langsame Verbindungen:
// Producer-Konfiguration erweitern
override fun producerConfigs(): Map<String, Any> = super.producerConfigs() + mapOf(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG to 30000, // 30 Sekunden
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG to 60000 // 1 Minute
)
2. Deserialization-Fehler
Problem: MessagingError.DeserializationError beim Empfangen von Nachrichten
Lösungsansätze:
// 1. Trusted Packages erweitern
kafkaConfig.trustedPackages = "at.mocode.*,com.mycompany.*,java.util.*"
// 2. Event-Schema-Kompatibilität prüfen
@JsonIgnoreProperties(ignoreUnknown = true)
data class EventDetails(
val id: String,
val version: Int = 1 // Schema-Versionierung
)
// 3. Dead Letter Queue für Poison Messages implementieren
private suspend fun handlePoisonMessage(topic: String, error: MessagingError.DeserializationError) {
val dlqTopic = "${topic}.dlq"
eventPublisher.publishEvent(dlqTopic, "error", error.message)
}
3. Performance-Probleme
Problem: Langsame Message-Verarbeitung oder hohe Latenz
Optimierungsstrategien:
// 1. Connection Pool vergrößern
kafkaConfig.connectionPoolSize = 50
// 2. Batch-Verarbeitung nutzen
suspend fun processEventsBatch(events: List<EventDetails>) {
val batchSize = 100
events.chunked(batchSize).forEach { batch ->
// Parallele Verarbeitung pro Batch
batch.map { event ->
async { processEvent(event) }
}.awaitAll()
}
}
// 3. Consumer-Parallelität erhöhen
// Mehrere Consumer-Instanzen mit unterschiedlichen Group-IDs
4. Memory-Leaks bei Consumers
Problem: Speicherverbrauch steigt kontinuierlich
Lösungen:
// 1. Consumer-Cache korrekt verwalten
@PreDestroy
fun cleanup() {
eventConsumer.cleanup() // Cached receivers freigeben
}
// 2. Flow-Streams korrekt beenden
eventConsumer.receiveEventsWithResult(topic, EventDetails::class.java)
.asFlow()
.take(1000) // Streams begrenzen
.catch { exception ->
logger.error("Stream error", exception)
}
.collect { /* process */ }
// 3. Subscription Management
val subscription = eventConsumer.receiveEvents<EventDetails>(topic)
.take(Duration.ofMinutes(5)) // Auto-Timeout nach 5 Minuten
.subscribe()
Best Practices
1. Error Handling
// Strukturierte Fehlerbehandlung mit spezifischen Aktionen
suspend fun handleMessagingError(error: MessagingError, topic: String) {
when (error) {
is MessagingError.SerializationError,
is MessagingError.DeserializationError -> {
// Keine Retries - permanente Fehler
alertMonitoring("Schema compatibility issue", error)
}
is MessagingError.ConnectionError,
is MessagingError.TimeoutError -> {
// Retries möglich - temporäre Fehler
scheduleRetry(error, topic)
}
is MessagingError.AuthenticationError -> {
// Security-Issue - sofortige Attention erforderlich
alertSecurity("Authentication failed", error)
}
else -> {
// Unbekannte Fehler - Investigation erforderlich
alertDevelopment("Unknown messaging error", error)
}
}
}
2. Monitoring und Alerting
// Umfassendes Monitoring einrichten
@Component
class MessagingMetrics( private val meterRegistry: MeterRegistry ) {
private val publishedEvents = Counter.builder("messaging.events.published")
.register(meterRegistry)
private val consumedEvents = Counter.builder("messaging.events.consumed")
.register(meterRegistry)
private val errorCounter = Counter.builder("messaging.errors")
.tag("type", "unknown")
.register(meterRegistry)
fun recordPublishedEvent(topic: String) {
publishedEvents.increment(Tags.of("topic", topic))
}
fun recordError(error: MessagingError, topic: String) {
errorCounter.increment(
Tags.of(
"error.type", error.javaClass.simpleName,
"topic", topic
)
)
}
}
3. Testing von Messaging-Code
// Integration Test mit Testcontainers
@TestMethodOrder(OrderAnnotation::class)
class MessagingIntegrationTest {
companion object {
@Container
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
}
@Test
@Order(1)
fun `should publish and consume events successfully`() = runTest {
// Given
val topic = "test-topic"
val event = EventDetails("test-id", "test-data")
// When
val publishResult = eventPublisher.publishEvent(topic, event.id, event)
val consumedEvents = mutableListOf<Result<EventDetails>>()
eventConsumer.receiveEventsWithResult(topic, EventDetails::class.java)
.asFlow()
.take(1)
.collect { result -> consumedEvents.add(result) }
// Then
publishResult.shouldBeSuccess()
consumedEvents.shouldHaveSize(1)
consumedEvents.first().getOrNull()?.id shouldBe event.id
}
}
Häufig gestellte Fragen (FAQ)
Q: Wie unterscheidet sich die moderne API von der Legacy-API?
A: Die moderne API nutzt das Result Pattern für explizite Fehlerbehandlung und Kotlin Coroutines für bessere Performance. Legacy APIs verwenden reaktive Streams mit Exception-basierter Fehlerbehandlung.
Q: Wann sollte ich Batch-Verarbeitung verwenden?
A: Batch-Verarbeitung ist empfohlen bei:
- Mehr als 10 Events pro Sekunde
- Hoher Netzwerk-Latenz zum Kafka-Cluster
- Events, die zusammen verarbeitet werden können
Q: Wie handle ich Backpressure bei hohem Event-Durchsatz?
A: Nutzen Sie die eingebauten Flow-Operatoren:
eventConsumer.receiveEventsWithResult(topic, EventType::class.java)
.asFlow()
.buffer(1000) // Puffering für Backpressure-Handling
.flowOn(Dispatchers.IO) // Separater Dispatcher
.collect { /* process */ }
Letzte Aktualisierung: 15. August 2025
Aktualisierungen (September 2025)
- ReactiveKafkaConfig: Der Bean kafkaConfig() ist jetzt mit @ConditionalOnMissingBean annotiert. Dadurch wird kein zweiter KafkaConfig-Bean erzeugt, wenn bereits extern einer bereitgestellt wird. Dies verhindert Bean-Kollisionen und erleichtert Überschreibungen in Services/Tests.
- Legacy Consumer API: Die reifizierte Extension receiveEvents(topic) wirft bei Fehlern nicht mehr, sondern filtert Fehl-Results heraus und protokolliert sie. Das hält den Flux lebendig und ist robuster. Die moderne, empfohlene Methode bleibt receiveEventsWithResult(topic): Flow<Result>.
- Dokumentation: Diese Hinweise wurden ergänzt. Module bleiben ansonsten unverändert und production-ready.