chore(ci): Align GH Workflows with Docker SSoT, new paths; minimal SSoT guard; staticAnalysis (#23)

* chore(MP-21): snapshot pre-refactor state (Epic 1)

* chore(MP-22): scaffold new repo structure, relocate Docker Compose, move frontend/backend modules, update Makefile; add docs mapping and env template

* MP-22 Epic 2: Erfolgreich umgesetzt und verifiziert

* MP-23 Epic 3: Gradle/Build Governance zentralisieren

* MP-23 Epic 3: Gradle/Build Governance zentralisieren

* chore(devops)!: Docker-SSoT (.env) konsolidiert, Compose-Mounts ergänzt, Makefile entfernt

- ENV Single Source of Truth
  - docker/.env.example neu (inkl. REDIS_PASSWORD, Ports, Build-Overrides)
  - config/.env(.example) als DEPRECATED markiert (Verweis auf docker/.env[.example])

- Docker Compose vereinheitlicht (docker/docker-compose.yaml)
  - Postgres: zentralen postgresql.conf mounten (../config/postgres/postgresql.conf)
    und Start mit -c config_file=/etc/postgresql/postgresql.conf
  - Redis: zentralen redis.conf mounten (../config/redis/redis.conf)
    und Start via "redis-server … ${REDIS_PASSWORD:+--requirepass $REDIS_PASSWORD}"
  - Web-Nginx: ../config/nginx/nginx.prod.conf → /etc/nginx/nginx.conf (ro)
  - Monitoring: Prometheus/Grafana nutzen ../config/monitoring/* als SSoT

- Frontend/DI/Network (MP-23 Grundlage)
  - :frontend:core:network Modul mit Koin `apiClient` (Ktor + JSON/Retry/Timeout/Logging)
  - Plattform-Basis-URL-Auflösung (JVM: ENV API_BASE_URL; JS: globalThis.API_BASE_URL / Same-Origin)
  - Web index.html setzt API_BASE_URL (Query `?apiBaseUrl=…` > Same-Origin > Fallback)

- Build/Gradle & Module-Refs
  - settings.gradle.kts: neue Frontend-/Backend-Pfade bereits inkludiert
  - Features/Shell: Abhängigkeiten auf :frontend:shared / :frontend:core:* angepasst
  - Ping-API-Refs auf :backend:services:ping:ping-api vereinheitlicht

- Dockerfiles angepasst
  - backend/infrastructure/gateway/Dockerfile → Tasks/Pfade auf :backend:gateway
  - backend/services/ping/Dockerfile → Tasks/Pfade auf :backend:services:ping:ping-service

- Static Analysis / Guards
  - config/detekt/detekt.yml hinzugefügt
  - Leichter Arch-Guard (Frontend) gegen manuelle Authorization-Header vorbereitet

- Doku
  - docs/ARCHITECTURE.md (Struktur, Mapping, Next Steps) ergänzt
  - docs/adr/README.md angelegt

BREAKING CHANGES:
- Makefile komplett entfernt (bitte direkt `docker compose` verwenden)
- ENV-Quelle ist jetzt docker/.env (statt config/.env oder Root)
- Compose-Datei unter docker/docker-compose.yaml (nicht mehr compose.yaml im Repo-Root)

Verifikation (lokal):
- ENV anlegen: `cp docker/.env.example docker/.env` (Werte anpassen)
- Compose prüfen: `docker compose --env-file docker/.env -f docker/docker-compose.yaml config`
- Infrastruktur: `docker compose --env-file docker/.env -f docker/docker-compose.yaml -p meldestelle up -d postgres redis keycloak web-app`
- Services bauen: `docker compose --env-file docker/.env -f docker/docker-compose.yaml -p meldestelle build api-gateway ping-service --no-cache --progress=plain`

Refs: MP-22 (Epic 2), MP-23 (Epic 3)

* chore(devops)!: Docker-SSoT (.env) konsolidiert, Compose-Mounts ergänzt, Makefile entfernt

- ENV Single Source of Truth
  - docker/.env.example neu (inkl. REDIS_PASSWORD, Ports, Build-Overrides)
  - config/.env(.example) als DEPRECATED markiert (Verweis auf docker/.env[.example])

- Docker Compose vereinheitlicht (docker/docker-compose.yaml)
  - Postgres: zentralen postgresql.conf mounten (../config/postgres/postgresql.conf)
    und Start mit -c config_file=/etc/postgresql/postgresql.conf
  - Redis: zentralen redis.conf mounten (../config/redis/redis.conf)
    und Start via "redis-server … ${REDIS_PASSWORD:+--requirepass $REDIS_PASSWORD}"
  - Web-Nginx: ../config/nginx/nginx.prod.conf → /etc/nginx/nginx.conf (ro)
  - Monitoring: Prometheus/Grafana nutzen ../config/monitoring/* als SSoT

- Frontend/DI/Network (MP-23 Grundlage)
  - :frontend:core:network Modul mit Koin `apiClient` (Ktor + JSON/Retry/Timeout/Logging)
  - Plattform-Basis-URL-Auflösung (JVM: ENV API_BASE_URL; JS: globalThis.API_BASE_URL / Same-Origin)
  - Web index.html setzt API_BASE_URL (Query `?apiBaseUrl=…` > Same-Origin > Fallback)

- Build/Gradle & Module-Refs
  - settings.gradle.kts: neue Frontend-/Backend-Pfade bereits inkludiert
  - Features/Shell: Abhängigkeiten auf :frontend:shared / :frontend:core:* angepasst
  - Ping-API-Refs auf :backend:services:ping:ping-api vereinheitlicht

- Dockerfiles angepasst
  - backend/infrastructure/gateway/Dockerfile → Tasks/Pfade auf :backend:gateway
  - backend/services/ping/Dockerfile → Tasks/Pfade auf :backend:services:ping:ping-service

- Static Analysis / Guards
  - config/detekt/detekt.yml hinzugefügt
  - Leichter Arch-Guard (Frontend) gegen manuelle Authorization-Header vorbereitet

- Doku
  - docs/ARCHITECTURE.md (Struktur, Mapping, Next Steps) ergänzt
  - docs/adr/README.md angelegt

BREAKING CHANGES:
- Makefile komplett entfernt (bitte direkt `docker compose` verwenden)
- ENV-Quelle ist jetzt docker/.env (statt config/.env oder Root)
- Compose-Datei unter docker/docker-compose.yaml (nicht mehr compose.yaml im Repo-Root)

Verifikation (lokal):
- ENV anlegen: `cp docker/.env.example docker/.env` (Werte anpassen)
- Compose prüfen: `docker compose --env-file docker/.env -f docker/docker-compose.yaml config`
- Infrastruktur: `docker compose --env-file docker/.env -f docker/docker-compose.yaml -p meldestelle up -d postgres redis keycloak web-app`
- Services bauen: `docker compose --env-file docker/.env -f docker/docker-compose.yaml -p meldestelle build api-gateway ping-service --no-cache --progress=plain`

Refs: MP-22 (Epic 2), MP-23 (Epic 3)

* chore(devops)!: Docker-SSoT (.env) konsolidiert, Compose-Mounts ergänzt, Makefile entfernt

- ENV Single Source of Truth
  - docker/.env.example neu (inkl. REDIS_PASSWORD, Ports, Build-Overrides)
  - config/.env(.example) als DEPRECATED markiert (Verweis auf docker/.env[.example])

- Docker Compose vereinheitlicht (docker/docker-compose.yaml)
  - Postgres: zentralen postgresql.conf mounten (../config/postgres/postgresql.conf)
    und Start mit -c config_file=/etc/postgresql/postgresql.conf
  - Redis: zentralen redis.conf mounten (../config/redis/redis.conf)
    und Start via "redis-server … ${REDIS_PASSWORD:+--requirepass $REDIS_PASSWORD}"
  - Web-Nginx: ../config/nginx/nginx.prod.conf → /etc/nginx/nginx.conf (ro)
  - Monitoring: Prometheus/Grafana nutzen ../config/monitoring/* als SSoT

- Frontend/DI/Network (MP-23 Grundlage)
  - :frontend:core:network Modul mit Koin `apiClient` (Ktor + JSON/Retry/Timeout/Logging)
  - Plattform-Basis-URL-Auflösung (JVM: ENV API_BASE_URL; JS: globalThis.API_BASE_URL / Same-Origin)
  - Web index.html setzt API_BASE_URL (Query `?apiBaseUrl=…` > Same-Origin > Fallback)

- Build/Gradle & Module-Refs
  - settings.gradle.kts: neue Frontend-/Backend-Pfade bereits inkludiert
  - Features/Shell: Abhängigkeiten auf :frontend:shared / :frontend:core:* angepasst
  - Ping-API-Refs auf :backend:services:ping:ping-api vereinheitlicht

- Dockerfiles angepasst
  - backend/infrastructure/gateway/Dockerfile → Tasks/Pfade auf :backend:gateway
  - backend/services/ping/Dockerfile → Tasks/Pfade auf :backend:services:ping:ping-service

- Static Analysis / Guards
  - config/detekt/detekt.yml hinzugefügt
  - Leichter Arch-Guard (Frontend) gegen manuelle Authorization-Header vorbereitet

- Doku
  - docs/ARCHITECTURE.md (Struktur, Mapping, Next Steps) ergänzt
  - docs/adr/README.md angelegt

BREAKING CHANGES:
- Makefile komplett entfernt (bitte direkt `docker compose` verwenden)
- ENV-Quelle ist jetzt docker/.env (statt config/.env oder Root)
- Compose-Datei unter docker/docker-compose.yaml (nicht mehr compose.yaml im Repo-Root)

Verifikation (lokal):
- ENV anlegen: `cp docker/.env.example docker/.env` (Werte anpassen)
- Compose prüfen: `docker compose --env-file docker/.env -f docker/docker-compose.yaml config`
- Infrastruktur: `docker compose --env-file docker/.env -f docker/docker-compose.yaml -p meldestelle up -d postgres redis keycloak web-app`
- Services bauen: `docker compose --env-file docker/.env -f docker/docker-compose.yaml -p meldestelle build api-gateway ping-service --no-cache --progress=plain`

Refs: MP-22 (Epic 2), MP-23 (Epic 3)

* chore(ci): Workflows an Docker-SSoT & neue Struktur angepasst, minimaler SSoT-Guard

- ssot-guard.yml: Option B (minimal) → `docker compose -f docker/docker-compose.yaml config` als Lint
- integration-tests.yml: `./gradlew staticAnalysis` vor Integrationstests
- docs-kdoc-sync.yml: Dokka-Task Fallback (dokkaGfmAll || dokkaGfm), YouTrack-Sync nur wenn Script vorhanden
- deploy-proxmox.yml: Compose-Pfade auf docker/docker-compose.yaml + `--env-file docker/.env`; Build/Test Schritte vereinheitlicht
- ci-main.yml: SSoT-Skripte per `if: hashFiles(...)` guarded, Compose-Lint Fallback; OpenAPI‑Pfad → backend/gateway; ADR‑Pfade → docs/adr/**; `staticAnalysis` in Build integriert
- youtrack-sync.yml: unverändert (funktional)

Refs: MP-22, MP-23

* chore(ci): Workflows an Docker-SSoT & neue Struktur angepasst, minimaler SSoT-Guard

- ssot-guard.yml: Option B (minimal) → `docker compose -f docker/docker-compose.yaml config` als Lint
- integration-tests.yml: `./gradlew staticAnalysis` vor Integrationstests
- docs-kdoc-sync.yml: Dokka-Task Fallback (dokkaGfmAll || dokkaGfm), YouTrack-Sync nur wenn Script vorhanden
- deploy-proxmox.yml: Compose-Pfade auf docker/docker-compose.yaml + `--env-file docker/.env`; Build/Test Schritte vereinheitlicht
- ci-main.yml: SSoT-Skripte per `if: hashFiles(...)` guarded, Compose-Lint Fallback; OpenAPI‑Pfad → backend/gateway; ADR‑Pfade → docs/adr/**; `staticAnalysis` in Build integriert
- youtrack-sync.yml: unverändert (funktional)

Refs: MP-22, MP-23

* fix(ci): create .env from example before validating compose config

* fix(ci): update ssot-guard filename (.yaml) and sync workflow state

* fixing

* fix(webpack): correct sql.js fallback configuration for webpack 5
This commit is contained in:
StefanMo
2025-12-03 12:03:40 +01:00
committed by GitHub
parent 034892e890
commit 95fe3e0573
365 changed files with 2283 additions and 15142 deletions
@@ -0,0 +1,31 @@
// Dieses Modul stellt High-Level-Clients (Producer/Consumer) für die
// Interaktion mit Apache Kafka bereit. Es baut auf der `messaging-config` auf.
plugins {
alias(libs.plugins.kotlinJvm)
alias(libs.plugins.kotlinSpring)
alias(libs.plugins.spring.boot)
alias(libs.plugins.spring.dependencyManagement)
}
// Deaktiviert die Erstellung eines ausführbaren Jars für dieses Bibliotheks-Modul.
tasks.bootJar {
enabled = false
}
// Stellt sicher, dass stattdessen ein reguläres Jar gebaut wird
tasks.jar {
enabled = true
}
dependencies {
// Stellt sicher, dass alle Versionen aus der zentralen BOM kommen.
implementation(platform(projects.platform.platformBom))
// Stellt gemeinsame Abhängigkeiten bereit.
implementation(projects.platform.platformDependencies)
// Baut auf der zentralen Kafka-Konfiguration auf und erbt deren Abhängigkeiten.
implementation(projects.backend.infrastructure.messaging.messagingConfig)
// Fügt die reaktive Kafka-Implementierung hinzu (Project Reactor).
implementation(libs.reactor.kafka)
// Stellt alle Test-Abhängigkeiten gebündelt bereit.
testImplementation(projects.platform.platformTesting)
}
@@ -0,0 +1,69 @@
package at.mocode.infrastructure.messaging.client
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.reactive.asPublisher
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
/**
* Generische Schnittstelle zum Konsumieren von Events aus einem Message-Broker.
*
* Folgt DDD-Prinzipien mit expliziter Fehlerbehandlung über domänenspezifische Fehlertypen.
* Bietet sowohl Result-basierte Methoden als auch reaktive Streams für Flexibilität.
*/
interface EventConsumer {
/**
* Empfängt Events vom angegebenen Topic mit expliziter Fehlerbehandlung.
*
* @param T Erwarteter Typ der Event-Payload
* @param topic Das zu abonnierende Topic
* @param eventType Der Klassen-Typ der zu konsumierenden Events
* @return Flow<Result<T>> wobei jedes Result entweder ein erfolgreiches Event oder einen MessagingError enthält
*/
fun <T : Any> receiveEventsWithResult(topic: String, eventType: Class<T>): Flow<Result<T>>
/**
* Legacy reaktive Methode zum Empfangen von Events.
*
* Diese Methode liefert einen "kalten" Flux, d. h. der Consumer beginnt erst
* nach Subscription mit dem Empfang von Nachrichten.
*
* @param T Erwarteter Typ der Event-Payload.
* @param topic Das zu abonnierende Topic.
* @return Ein reaktiver Stream (Flux) von Events des Typs T.
*/
@Deprecated("Use receiveEventsWithResult with Flow<Result<T>> instead", ReplaceWith("receiveEventsWithResult(topic, eventType)"))
fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T>
}
/**
* Kotlin-idiomatische Extension-Funktion für `receiveEventsWithResult` mit reified Typen.
*
* Beispiel: `consumer.receiveEventsWithResult<MyEvent>("my-topic").collect { result -> ... }`
*/
inline fun <reified T : Any> EventConsumer.receiveEventsWithResult(topic: String): Flow<Result<T>> {
return this.receiveEventsWithResult(topic, T::class.java)
}
/**
* Kotlin-idiomatische Extension-Funktion für `receiveEvents` mit reified Typen.
*
* Beispiel: `consumer.receiveEvents<MyEvent>("my-topic").subscribe { ... }`
*/
@Deprecated("Use receiveEventsWithResult with Flow<Result<T>> instead", ReplaceWith("receiveEventsWithResult<T>(topic)"))
inline fun <reified T : Any> EventConsumer.receiveEvents(topic: String): Flux<T> {
// Convert Flow<Result<T>> to Flux<T> for backward compatibility
// New behavior: emit only successful events; log failures instead of throwing to keep the stream alive
val logger = LoggerFactory.getLogger("EventConsumerExtensions")
return this.receiveEventsWithResult<T>(topic)
.mapNotNull { result: Result<T> ->
result.getOrElse {
logger.warn("Dropping failed event in legacy receiveEvents: {}", it.message)
null
}
}
.asPublisher()
.let { Flux.from(it) }
}
@@ -0,0 +1,42 @@
package at.mocode.infrastructure.messaging.client
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
/**
* Schnittstelle zum Publizieren von Domain-Events in den Message-Broker.
*
* Folgt DDD-Prinzipien mit expliziter Fehlerbehandlung über domänenspezifische Fehlertypen.
* Alle Operationen verwenden das Result-Pattern für typsichere Fehlerbehandlung.
*/
interface EventPublisher {
/**
* Veröffentlicht ein einzelnes Event in das angegebene Topic.
*
* @param topic Das Kafka-Topic
* @param key Optionaler Schlüssel für Partitionierung
* @param event Das zu veröffentlichende Domain-Event
* @return Result<Unit> bei Erfolg oder MessagingError bei spezifischem Fehler
*/
suspend fun publishEvent(topic: String, key: String? = null, event: Any): Result<Unit>
/**
* Veröffentlicht mehrere Events als Batch in das angegebene Topic.
*
* @param topic Das Kafka-Topic
* @param events Liste aus (Key, Event)-Paaren
* @return Result<List<Unit>> bei Erfolg oder MessagingError bei Fehlern
*/
suspend fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Result<List<Unit>>
/**
* Legacy reaktive Methoden für Abwärtskompatibilität.
* Diese werden zugunsten der Result-basierten Methoden mittelfristig entfernt.
*/
@Deprecated("Use suspending publishEvent with Result instead", ReplaceWith("publishEvent(topic, key, event)"))
fun publishEventReactive(topic: String, key: String? = null, event: Any): Mono<Unit>
@Deprecated("Use suspending publishEvents with Result instead", ReplaceWith("publishEvents(topic, events)"))
fun publishEventsReactive(topic: String, events: List<Pair<String?, Any>>): Flux<Unit>
}
@@ -0,0 +1,199 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.slf4j.LoggerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.util.retry.Retry
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
/**
* Reaktive, nicht-blockierende Kafka-Implementierung des EventConsumer-Interfaces
* mit optimiertem Connection-Pooling, Sicherheit und Fehlerbehandlung.
*/
@Component
class KafkaEventConsumer(
private val kafkaConfig: KafkaConfig
) : EventConsumer {
private val logger = LoggerFactory.getLogger(KafkaEventConsumer::class.java)
// Connection pool to reuse KafkaReceiver instances per topic-eventType combination
private val receiverCache = ConcurrentHashMap<String, KafkaReceiver<String, Any>>()
override fun <T : Any> receiveEventsWithResult(topic: String, eventType: Class<T>): Flow<Result<T>> {
logger.info("Setting up Result-based consumer for topic '{}' with event type '{}'", topic, eventType.simpleName)
val cacheKey = "${topic}-${eventType.name}"
val groupId = "${kafkaConfig.defaultGroupIdPrefix}-${topic}-${eventType.simpleName.lowercase()}"
// Get or create a cached receiver for this topic-eventType combination
@Suppress("UNCHECKED_CAST")
val receiver = receiverCache.computeIfAbsent(cacheKey) {
createOptimizedReceiver<T>(topic, eventType) as KafkaReceiver<String, Any>
} as KafkaReceiver<String, T>
return receiver.receive()
.doOnNext { record ->
logger.debug(
"Received message from topic-partition {}-{} with offset {} for event type '{}' [groupId={}, timestamp={}]",
record.topic(), record.partition(), record.offset(), eventType.simpleName,
groupId, record.timestamp()
)
}
.map { record ->
// Manual commit acknowledgment for better control
record.receiverOffset().acknowledge()
record.value()
}
.map<Result<T>> { event -> Result.success(event) }
.onErrorResume { exception ->
logger.warn("Error occurred while consuming events from topic '{}' for event type '{}': {}",
topic, eventType.simpleName, exception.message)
// Map exception to appropriate MessagingError and return as Result.failure
val messagingError = mapToMessagingError(exception)
reactor.core.publisher.Mono.just(Result.failure<T>(messagingError))
}
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry { retrySignal ->
logger.warn("Retrying consumer for topic '{}', attempt: {}, error: {}",
topic, retrySignal.totalRetries() + 1, retrySignal.failure().message)
}
.onRetryExhaustedThrow { _, retrySignal ->
logger.error("Consumer retry exhausted for topic '{}' after {} attempts",
topic, retrySignal.totalRetries())
retrySignal.failure()
}
)
.doOnError { exception ->
logger.error("Fatal error in consumer stream for topic '{}' and event type '{}': {}",
topic, eventType.simpleName, exception.message, exception)
}
.asFlow()
}
@Deprecated("Use receiveEventsWithResult with Flow<Result<T>> instead.")
override fun <T : Any> receiveEvents(topic: String, eventType: Class<T>): Flux<T> {
logger.info("Setting up reactive consumer for topic '{}' with event type '{}'", topic, eventType.simpleName)
val cacheKey = "${topic}-${eventType.name}"
val groupId = "${kafkaConfig.defaultGroupIdPrefix}-${topic}-${eventType.simpleName.lowercase()}"
// Get or create a cached receiver for this topic-eventType combination
@Suppress("UNCHECKED_CAST")
val receiver = receiverCache.computeIfAbsent(cacheKey) {
createOptimizedReceiver<T>(topic, eventType) as KafkaReceiver<String, Any>
} as KafkaReceiver<String, T>
return receiver.receive()
.doOnNext { record ->
logger.debug(
"Received message from topic-partition {}-{} with offset {} for event type '{}' [groupId={}, timestamp={}]",
record.topic(), record.partition(), record.offset(), eventType.simpleName,
groupId, record.timestamp()
)
}
.map { record ->
// Manual commit acknowledgment for better control
record.receiverOffset().acknowledge()
record.value()
}
.doOnError { exception ->
logger.error("Error receiving events from topic '{}' for event type '{}' [groupId={}, cacheKey={}]: {}",
topic, eventType.simpleName, groupId, cacheKey, exception.message, exception)
}
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry { retrySignal ->
logger.warn("Retrying consumer for topic '{}', attempt: {}, error: {}",
topic, retrySignal.totalRetries() + 1, retrySignal.failure().message)
}
.onRetryExhaustedThrow { _, retrySignal ->
logger.error("Consumer retry exhausted for topic '{}' after {} attempts",
topic, retrySignal.totalRetries())
retrySignal.failure()
}
)
}
/**
* Creates an optimized KafkaReceiver with secure configuration and performance tuning.
*/
private fun <T : Any> createOptimizedReceiver(topic: String, eventType: Class<T>): KafkaReceiver<String, T> {
// Generate unique group ID for this consumer instance
val groupId = "${kafkaConfig.defaultGroupIdPrefix}-${topic}-${eventType.simpleName.lowercase()}"
val consumerConfig = kafkaConfig.consumerConfigs(groupId)
// Create type-safe JSON deserializer with restricted trusted packages
val jsonDeserializer = JsonDeserializer(eventType).apply {
// Use restricted trusted packages instead of wildcard for security
addTrustedPackages(kafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, T>(consumerConfig)
.subscription(Collections.singleton(topic))
.withValueDeserializer(jsonDeserializer)
.addAssignListener { partitions ->
logger.info("Consumer '{}' assigned partitions for topic '{}': {}",
groupId, topic, partitions.map { "${it.topicPartition().topic()}-${it.topicPartition().partition()}" })
}
.addRevokeListener { partitions ->
logger.warn("Consumer '{}' revoked partitions for topic '{}': {}",
groupId, topic, partitions.map { "${it.topicPartition().topic()}-${it.topicPartition().partition()}" })
}
// Enable commit interval for manual acknowledgment control
.commitInterval(Duration.ofSeconds(5))
.commitBatchSize(100)
return KafkaReceiver.create(receiverOptions)
}
/**
* Maps generic exceptions to domain-specific MessagingError types.
* Consumer-focused error mapping with emphasis on deserialization errors.
*/
private fun mapToMessagingError(exception: Throwable): MessagingError {
return when {
exception.message?.contains("deserializ", ignoreCase = true) == true ||
exception.message?.contains("parse", ignoreCase = true) == true ||
exception.message?.contains("json", ignoreCase = true) == true ->
MessagingError.DeserializationError("Deserialization failed: ${exception.message}", exception)
exception.message?.contains("timeout", ignoreCase = true) == true ||
exception is java.util.concurrent.TimeoutException ->
MessagingError.TimeoutError("Operation timed out: ${exception.message}", exception)
exception.message?.contains("connection", ignoreCase = true) == true ||
exception.message?.contains("network", ignoreCase = true) == true ||
exception is java.net.ConnectException ||
exception is java.io.IOException ->
MessagingError.ConnectionError("Connection failed: ${exception.message}", exception)
exception.message?.contains("auth", ignoreCase = true) == true ->
MessagingError.AuthenticationError("Authentication failed: ${exception.message}", exception)
exception.message?.contains("topic", ignoreCase = true) == true ->
MessagingError.TopicConfigurationError("Topic configuration error: ${exception.message}", exception)
else -> MessagingError.UnexpectedError("Unexpected error: ${exception.message}", exception)
}
}
/**
* Cleanup method to clear cached receivers on application shutdown.
* Reactive receivers will be automatically cleaned up when their streams complete.
*/
@jakarta.annotation.PreDestroy
fun cleanup() {
logger.info("Cleaning up Kafka consumer cache...")
val cacheSize = receiverCache.size
receiverCache.clear()
logger.info("Kafka consumer cleanup completed. Cleared {} cached receivers", cacheSize)
}
}
@@ -0,0 +1,253 @@
package at.mocode.infrastructure.messaging.client
import kotlinx.coroutines.reactor.awaitSingle
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.retry.Retry
import java.time.Duration
/**
* Reaktive, nicht-blockierende Kafka-Implementierung von EventPublisher mit erweiterter
* Fehlerbehandlung, Retry-Mechanismen und optimierter Batch-Verarbeitung.
*
* Implementiert sowohl Result-basierte Methoden (präferiert) als auch reaktive Legacy-Methoden.
* Folgt DDD-Prinzipien mit expliziter Fehlerbehandlung über domänenspezifische Fehlertypen.
*/
@Component
class KafkaEventPublisher(
private val reactiveKafkaTemplate: ReactiveKafkaProducerTemplate<String, Any>
) : EventPublisher {
private val logger = LoggerFactory.getLogger(KafkaEventPublisher::class.java)
companion object {
/** Maximale Anzahl an Retry-Versuchen für fehlgeschlagene Publish-Operationen */
private const val MAX_RETRY_ATTEMPTS = 3L
/** Initiale Verzögerung in Sekunden zwischen Retry-Versuchen */
private const val RETRY_DELAY_SECONDS = 1L
/** Maximale Backoff-Verzögerung in Sekunden für die exponentielle Retry-Strategie */
private const val MAX_BACKOFF_SECONDS = 10L
/** Standard-Parallelität für Batch-Operationen */
private const val BATCH_CONCURRENCY_LEVEL = 10
/** Fortschritts-Logging-Intervall für Batch-Operationen (alle N Events) */
private const val BATCH_PROGRESS_LOG_INTERVAL = 100
}
override suspend fun publishEvent(topic: String, key: String?, event: Any): Result<Unit> {
return try {
logger.debug("Publishing event to topic '{}' with key '{}', event type: '{}'",
topic, key, event::class.simpleName)
reactiveKafkaTemplate.send(topic, key ?: "", event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.debug(
"Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
record.topic(), record.partition(), record.offset(), key
)
}
.doOnError { exception ->
logger.warn("Failed to publish event to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
}
.retryWhen(createRetrySpec(topic, key))
.doOnError { exception ->
logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'",
topic, key, exception)
}
.awaitSingle()
Result.success(Unit)
} catch (exception: Throwable) {
Result.failure(mapToMessagingError(exception))
}
}
override suspend fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Result<List<Unit>> {
return try {
if (events.isEmpty()) {
logger.debug("No events to publish to topic '{}'", topic)
return Result.success(emptyList())
}
logger.info("Publishing {} events to topic '{}' using optimized batch processing", events.size, topic)
val results = Flux.fromIterable(events)
.index() // Add index for progress tracking
.flatMap({ indexedEventPair ->
val index = indexedEventPair.t1
val eventPair = indexedEventPair.t2
val (key, event) = eventPair
reactiveKafkaTemplate.send(topic, key ?: "", event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.debug("Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
record.topic(), record.partition(), record.offset(), key)
if ((index + 1) % BATCH_PROGRESS_LOG_INTERVAL == 0L || index == events.size.toLong() - 1) {
logger.info("Batch progress: {}/{} events published to topic '{}'",
index + 1, events.size, topic)
}
}
.doOnError { exception ->
logger.warn("Failed to publish event {} in batch to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
}
.retryWhen(createRetrySpec(topic, key))
.map { } // Convert to Mono<Unit> that emits one Unit per successful send
.onErrorContinue { error, _ ->
logger.error("Error publishing event {} in batch to topic '{}': {}",
index + 1, topic, error.message)
}
}, BATCH_CONCURRENCY_LEVEL) // Controlled concurrency for better resource management
.doOnComplete {
logger.info("Completed publishing batch of {} events to topic '{}'", events.size, topic)
}
.doOnError { error ->
logger.error("Batch publishing to topic '{}' failed with error: {}", topic, error.message)
}
.collectList()
.awaitSingle()
Result.success(results)
} catch (exception: Throwable) {
Result.failure(mapToMessagingError(exception))
}
}
@Deprecated("Use publishEvent with Result<Unit> instead.")
override fun publishEventReactive(topic: String, key: String?, event: Any): Mono<Unit> {
logger.debug("Publishing event to topic '{}' with key '{}', event type: '{}'",
topic, key, event::class.simpleName)
return reactiveKafkaTemplate.send(topic, key ?: "", event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.debug(
"Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
record.topic(), record.partition(), record.offset(), key
)
}
.doOnError { exception ->
logger.warn("Failed to publish event to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
}
.retryWhen(createRetrySpec(topic, key))
.doOnError { exception ->
logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'",
topic, key, exception)
}
.map { }
}
@Deprecated("Use publishEvents with Result<List<Unit>> instead.")
override fun publishEventsReactive(topic: String, events: List<Pair<String?, Any>>): Flux<Unit> {
if (events.isEmpty()) {
logger.debug("No events to publish to topic '{}'", topic)
return Flux.empty()
}
logger.info("Publishing {} events to topic '{}' using optimized batch processing", events.size, topic)
return Flux.fromIterable(events)
.index() // Add index for progress tracking
.flatMap({ indexedEventPair ->
val index = indexedEventPair.t1
val eventPair = indexedEventPair.t2
val (key, event) = eventPair
reactiveKafkaTemplate.send(topic, key ?: "", event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.debug("Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
record.topic(), record.partition(), record.offset(), key)
if ((index + 1) % BATCH_PROGRESS_LOG_INTERVAL == 0L || index == events.size.toLong() - 1) {
logger.info("Batch progress: {}/{} events published to topic '{}'",
index + 1, events.size, topic)
}
}
.doOnError { exception ->
logger.warn("Failed to publish event {} in batch to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
}
.retryWhen(createRetrySpec(topic, key))
.map { } // Convert to Mono<Unit> that emits one Unit per successful send
.onErrorContinue { error, _ ->
logger.error("Error publishing event {} in batch to topic '{}': {}",
index + 1, topic, error.message)
}
}, BATCH_CONCURRENCY_LEVEL) // Controlled concurrency for better resource management
.doOnComplete {
logger.info("Completed publishing batch of {} events to topic '{}'", events.size, topic)
}
.doOnError { error ->
logger.error("Batch publishing to topic '{}' failed with error: {}", topic, error.message)
}
}
/**
* Creates a retry specification with exponential backoff for robust error handling.
*/
private fun createRetrySpec(topic: String, key: String?): Retry =
Retry.backoff(MAX_RETRY_ATTEMPTS, Duration.ofSeconds(RETRY_DELAY_SECONDS))
.maxBackoff(Duration.ofSeconds(MAX_BACKOFF_SECONDS))
.filter { exception ->
// Only retry on transient errors (not serialization errors, etc.)
isRetryableException(exception)
}
.doBeforeRetry { retrySignal ->
logger.info("Retrying publish to topic '{}' with key '{}', attempt: {}, error: {}",
topic, key, retrySignal.totalRetries() + 1,
retrySignal.failure().message?.take(100))
}
.onRetryExhaustedThrow { _, retrySignal ->
logger.error("Retry exhausted for topic '{}' with key '{}' after {} attempts",
topic, key, retrySignal.totalRetries())
retrySignal.failure()
}
/**
* Maps generic exceptions to domain-specific MessagingError types.
*/
private fun mapToMessagingError(exception: Throwable): MessagingError {
return when {
exception.message?.contains("serializ", ignoreCase = true) == true ->
MessagingError.SerializationError("Serialization failed: ${exception.message}", exception)
exception.message?.contains("timeout", ignoreCase = true) == true ||
exception is java.util.concurrent.TimeoutException ->
MessagingError.TimeoutError("Operation timed out: ${exception.message}", exception)
exception.message?.contains("connection", ignoreCase = true) == true ||
exception.message?.contains("network", ignoreCase = true) == true ||
exception is java.net.ConnectException ||
exception is java.io.IOException ->
MessagingError.ConnectionError("Connection failed: ${exception.message}", exception)
exception.message?.contains("auth", ignoreCase = true) == true ->
MessagingError.AuthenticationError("Authentication failed: ${exception.message}", exception)
exception.message?.contains("topic", ignoreCase = true) == true ->
MessagingError.TopicConfigurationError("Topic configuration error: ${exception.message}", exception)
else -> MessagingError.UnexpectedError("Unexpected error: ${exception.message}", exception)
}
}
/**
* Determines if an exception is retryable based on its type and characteristics.
*/
private fun isRetryableException(exception: Throwable): Boolean {
return when {
exception.message?.contains("timeout", ignoreCase = true) == true -> true
exception.message?.contains("connection", ignoreCase = true) == true -> true
exception.message?.contains("network", ignoreCase = true) == true -> true
exception is java.util.concurrent.TimeoutException -> true
exception is java.net.ConnectException -> true
exception is java.io.IOException -> true
// Don't retry serialization errors or authentication failures
exception.message?.contains("serializ", ignoreCase = true) == true -> false
exception.message?.contains("auth", ignoreCase = true) == true -> false
else -> true // Default to retryable for unknown exceptions
}
}
}
@@ -0,0 +1,68 @@
package at.mocode.infrastructure.messaging.client
/**
* Domänenspezifische Fehlertypen für Messaging-Operationen.
* Folgt den DDD-Richtlinien mit expliziter Fehlerbehandlung über das Result-Pattern.
*/
sealed class MessagingError(
val code: String,
override val message: String,
override val cause: Throwable? = null
) : Exception(message, cause) {
/**
* Fehler beim Veröffentlichen aufgrund von Serialisierungsproblemen.
*/
data class SerializationError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_SERIALIZATION_ERROR", message, cause)
/**
* Fehler beim Veröffentlichen aufgrund von Verbindungsproblemen.
*/
data class ConnectionError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_CONNECTION_ERROR", message, cause)
/**
* Fehler beim Veröffentlichen aufgrund von Zeitüberschreitung.
*/
data class TimeoutError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_TIMEOUT_ERROR", message, cause)
/**
* Fehler aufgrund von Authentifizierungs-/Autorisierungsproblemen.
*/
data class AuthenticationError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_AUTHENTICATION_ERROR", message, cause)
/**
* Fehler aufgrund von Topic-Konfigurationsproblemen.
*/
data class TopicConfigurationError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_TOPIC_CONFIGURATION_ERROR", message, cause)
/**
* Fehler beim Empfangen aufgrund von Deserialisierungsproblemen.
*/
data class DeserializationError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_DESERIALIZATION_ERROR", message, cause)
/**
* Generischer Messaging-Fehler für unerwartete Ausfälle.
*/
data class UnexpectedError(
override val message: String,
override val cause: Throwable? = null
) : MessagingError("MESSAGING_UNEXPECTED_ERROR", message, cause)
}
@@ -0,0 +1,58 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions
import java.time.Duration
/**
* Spring-Konfiguration für reaktive Kafka-Komponenten mit optimierten Einstellungen.
*/
@Configuration
class ReactiveKafkaConfig(
private val kafkaConfig: KafkaConfig
) {
private val logger = LoggerFactory.getLogger(ReactiveKafkaConfig::class.java)
/**
* Erstellt einen Spring-Bean für das optimierte ReactiveKafkaProducerTemplate.
* Dieses Template beinhaltet erweiterte Fehlerbehandlung, Monitoring und Performance-Tuning.
*/
@Bean
fun reactiveKafkaProducerTemplate(): ReactiveKafkaProducerTemplate<String, Any> {
logger.info("Creating optimized ReactiveKafkaProducerTemplate with enhanced configuration")
val producerFactory = kafkaConfig.producerFactory()
val props: Map<String, Any> = producerFactory.configurationProperties
val senderOptions = SenderOptions.create<String, Any>(props)
// Enhanced sender options for better performance and reliability
.maxInFlight(1024) // Increase in-flight requests for better throughput
.scheduler(reactor.core.scheduler.Schedulers.boundedElastic()) // Use bounded elastic scheduler
.closeTimeout(Duration.ofSeconds(30)) // Give enough time for graceful shutdown
.stopOnError(false) // Continue processing even if some messages fail
return ReactiveKafkaProducerTemplate(senderOptions).apply {
// Configure additional properties if needed
logger.info("ReactiveKafkaProducerTemplate configured successfully with bootstrap servers: {}",
kafkaConfig.bootstrapServers)
}
}
/**
* Erstellt einen KafkaConfig-Bean, falls nicht bereits vorhanden.
* Ermöglicht externe Konfigurationsüberschreibung bei gleichzeitigen sinnvollen Defaults.
*/
@Bean
@ConditionalOnMissingBean(KafkaConfig::class)
fun kafkaConfig(): KafkaConfig {
return KafkaConfig().apply {
logger.info("Initializing KafkaConfig with bootstrap servers: {}", bootstrapServers)
}
}
}
@@ -0,0 +1,241 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaEventConsumerCacheTest {
private lateinit var kafkaConfig: KafkaConfig
private lateinit var consumer: KafkaEventConsumer
@BeforeEach
fun setUp() {
kafkaConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "test-consumer"
trustedPackages = "at.mocode.*"
}
consumer = KafkaEventConsumer(kafkaConfig)
}
@Test
fun `should create consumer successfully with valid configuration`() {
// Test that consumer can be created with different configurations
val customConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "custom-consumer"
trustedPackages = "at.mocode.*,com.example.*"
connectionPoolSize = 5
}
assertDoesNotThrow {
KafkaEventConsumer(customConfig)
}
}
@Test
fun `should create different consumers with different configurations`() {
val config1 = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "consumer1"
trustedPackages = "at.mocode.*"
}
val config2 = KafkaConfig().apply {
bootstrapServers = "localhost:9093"
defaultGroupIdPrefix = "consumer2"
trustedPackages = "com.example.*"
}
val consumer1 = KafkaEventConsumer(config1)
val consumer2 = KafkaEventConsumer(config2)
// Both consumers should be created successfully
assertThat(consumer1).isNotNull
assertThat(consumer2).isNotNull
assertThat(consumer1).isNotSameAs(consumer2)
}
@Test
fun `should handle cleanup gracefully`() {
// Create consumer and call cleanup
val testConsumer = KafkaEventConsumer(kafkaConfig)
// Cleanup should not throw any exceptions
assertDoesNotThrow {
testConsumer.cleanup()
}
// Multiple cleanup calls should also be safe
assertDoesNotThrow {
testConsumer.cleanup()
testConsumer.cleanup()
}
}
@Test
fun `should create reactive streams for different topics`() {
// Test that receiveEvents creates reactive streams without errors
// Note: These won't actually connect to Kafka but should create the Flux
assertDoesNotThrow {
val flux1 = consumer.receiveEventsWithResult<TestEvent>("topic1")
val flux2 = consumer.receiveEventsWithResult<TestEvent>("topic2")
// Fluxes should be created (cold streams)
assertThat(flux1).isNotNull
assertThat(flux2).isNotNull
}
}
@Test
fun `should create reactive streams for different event types`() {
// Test that different event types create different streams
assertDoesNotThrow {
val flux1 = consumer.receiveEventsWithResult<TestEvent>("test-topic")
val flux2 = consumer.receiveEventsWithResult<AnotherTestEvent>("test-topic")
// Both should be created successfully
assertThat(flux1).isNotNull
assertThat(flux2).isNotNull
}
}
@Test
fun `should handle consumer configuration with security features`() {
val secureConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "secure-consumer"
trustedPackages = "at.mocode.*,com.secure.*"
enableSecurityFeatures = true
connectionPoolSize = 15
}
assertDoesNotThrow {
val secureConsumer = KafkaEventConsumer(secureConfig)
assertThat(secureConsumer).isNotNull
// Should be able to create streams
val flow = secureConsumer.receiveEventsWithResult<TestEvent>("secure-topic")
assertThat(flow).isNotNull
}
}
@Test
fun `should validate trusted packages configuration`() {
// Test with various trusted package configurations
val configs = listOf(
"at.mocode.*",
"at.mocode.*,com.example.*",
"java.lang.*,java.util.*,at.mocode.*"
)
configs.forEach { trustedPackages ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "validation-consumer"
this.trustedPackages = trustedPackages
}
assertDoesNotThrow {
val testConsumer = KafkaEventConsumer(config)
val flow = testConsumer.receiveEventsWithResult<TestEvent>("validation-topic")
assertThat(flow).isNotNull
}
}
}
@Test
fun `should handle different connection pool sizes`() {
val poolSizes = listOf(1, 5, 10, 20, 50)
poolSizes.forEach { poolSize ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "pool-test-consumer"
connectionPoolSize = poolSize
}
assertDoesNotThrow {
val testConsumer = KafkaEventConsumer(config)
assertThat(testConsumer).isNotNull
// Should be able to create reactive streams
val flow = testConsumer.receiveEventsWithResult<TestEvent>("pool-test-topic")
assertThat(flow).isNotNull
}
}
}
@Test
fun `should handle different group ID prefixes`() {
val prefixes = listOf(
"test-consumer",
"production-consumer",
"development.consumer",
"consumer_123"
)
prefixes.forEach { prefix ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = prefix
trustedPackages = "at.mocode.*"
}
assertDoesNotThrow {
val testConsumer = KafkaEventConsumer(config)
val flow = testConsumer.receiveEventsWithResult<TestEvent>("prefix-test-topic")
assertThat(flow).isNotNull
}
}
}
@Test
fun `should support extension function for reified types`() {
// Test the Kotlin extension function receiveEventsWithResult<T>()
assertDoesNotThrow {
val flowWithReified = consumer.receiveEventsWithResult<TestEvent>("reified-topic")
val flowWithClass = consumer.receiveEventsWithResult("reified-topic", TestEvent::class.java)
// Both should work and create valid Flow instances
assertThat(flowWithReified).isNotNull
assertThat(flowWithClass).isNotNull
}
}
@Test
fun `should handle concurrent consumer creation`() {
// Test that multiple consumers can be created concurrently
val consumers = (1..10).map { index ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "concurrent-consumer-$index"
trustedPackages = "at.mocode.*"
}
KafkaEventConsumer(config)
}
// All consumers should be created successfully
assertThat(consumers).hasSize(10)
consumers.forEach { testConsumer ->
assertThat(testConsumer).isNotNull
// Each should be able to create streams
val flow = testConsumer.receiveEventsWithResult<TestEvent>("concurrent-topic")
assertThat(flow).isNotNull
}
// Clean up all consumers
consumers.forEach { testConsumer ->
assertDoesNotThrow { testConsumer.cleanup() }
}
}
data class TestEvent(val message: String)
data class AnotherTestEvent(val data: String)
}
@@ -0,0 +1,111 @@
package at.mocode.infrastructure.messaging.client
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderResult
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaEventPublisherErrorTest {
private lateinit var mockTemplate: ReactiveKafkaProducerTemplate<String, Any>
private lateinit var publisher: KafkaEventPublisher
@BeforeEach
fun setUp() {
mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
publisher = KafkaEventPublisher(mockTemplate)
}
@Test
fun `should publish single event successfully`() = runTest {
val testEvent = TestEvent("data")
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockRecordMetadata.partition() } returns 0
every { mockRecordMetadata.offset() } returns 0L
every { mockResult.recordMetadata() } returns mockRecordMetadata
every { mockTemplate.send("test-topic", "key", testEvent) } returns Mono.just(mockResult)
val result = publisher.publishEvent("test-topic", "key", testEvent)
assert(result.isSuccess) { "Expected successful result" }
verify(exactly = 1) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should handle serialization errors without retry`() = runTest {
val testEvent = TestEvent("data")
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(RuntimeException("Serialization failed"))
val result = publisher.publishEvent("test-topic", "key", testEvent)
assert(result.isFailure) { "Expected failed result" }
assert(result.exceptionOrNull() is MessagingError.SerializationError) { "Expected MessagingError.SerializationError" }
assert(result.exceptionOrNull()?.message?.contains("Serialization failed") == true) { "Expected specific error message" }
verify(exactly = 1) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should handle authentication errors without retry`() = runTest {
val testEvent = TestEvent("data")
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(RuntimeException("Authentication failed"))
val result = publisher.publishEvent("test-topic", "key", testEvent)
assert(result.isFailure) { "Expected failed result" }
assert(result.exceptionOrNull() is MessagingError.AuthenticationError) { "Expected MessagingError.AuthenticationError" }
assert(result.exceptionOrNull()?.message?.contains("Authentication failed") == true) { "Expected specific error message" }
verify(exactly = 1) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should handle empty batch gracefully`() = runTest {
val emptyEvents = emptyList<Pair<String?, Any>>()
val result = publisher.publishEvents("test-topic", emptyEvents)
assert(result.isSuccess) { "Expected successful result for empty batch" }
assert(result.getOrNull()?.isEmpty() == true) { "Expected empty result list" }
verify(exactly = 0) { mockTemplate.send(any(), any(), any()) }
}
@Test
fun `should publish batch events successfully`() = runTest {
val events = listOf(
"key1" to TestEvent("message1"),
"key2" to TestEvent("message2")
)
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockRecordMetadata.partition() } returns 0
every { mockRecordMetadata.offset() } returns 0L
every { mockResult.recordMetadata() } returns mockRecordMetadata
every { mockTemplate.send("test-topic", "key1", any()) } returns Mono.just(mockResult)
every { mockTemplate.send("test-topic", "key2", any()) } returns Mono.just(mockResult)
val result = publisher.publishEvents("test-topic", events)
assert(result.isSuccess) { "Expected successful batch result" }
assert(result.getOrNull()?.size == 2) { "Expected 2 successful operations" }
verify(exactly = 1) { mockTemplate.send("test-topic", "key1", any()) }
verify(exactly = 1) { mockTemplate.send("test-topic", "key2", any()) }
}
data class TestEvent(val message: String)
}
@@ -0,0 +1,291 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import kotlinx.coroutines.test.runTest
import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.kafka.KafkaContainer
import org.testcontainers.utility.DockerImageName
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.test.StepVerifier
import java.util.*
@Testcontainers
class KafkaIntegrationTest {
companion object {
@Container
private val kafkaContainer = KafkaContainer(
DockerImageName.parse("apache/kafka:3.8.1")
)
}
private lateinit var kafkaEventPublisher: KafkaEventPublisher
private lateinit var producerFactory: DefaultKafkaProducerFactory<String, Any>
private val testTopic = "test-topic-${UUID.randomUUID()}"
@BeforeEach
fun setUp() {
val kafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
}
producerFactory = kafkaConfig.producerFactory()
val reactiveKafkaConfig = ReactiveKafkaConfig(kafkaConfig)
val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate()
kafkaEventPublisher = KafkaEventPublisher(reactiveTemplate)
}
@AfterEach
fun tearDown() {
producerFactory.destroy()
}
@Test
fun `publishEvent should send a message that can be received`() = runTest {
// Arrange
val testKey = "test-key"
val testEvent = TestEvent("Test Message")
// Use the same KafkaConfig for consistent and secure configuration
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
// For tests, we need to trust the test package
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
// The Mono that represents the next received event
val receivedEvent = KafkaReceiver.create(receiverOptions)
.receive()
.next() // Take only the first event
.map { it.value() } // Extract the value (our TestEvent instance)
// Execute the send action and verify success
val publishResult = kafkaEventPublisher.publishEvent(testTopic, testKey, testEvent)
assert(publishResult.isSuccess) { "Expected successful publish result" }
// Verify that the message can be received
StepVerifier.create(receivedEvent)
.expectNext(testEvent) // Expect that our test event arrives
.verifyComplete() // Complete the verification
}
@Test
fun `publishEvents should send batch messages that can be received`() = runTest {
// Arrange
val batchSize = 10
val eventBatch = (1..batchSize).map { i ->
"batch-key-$i" to TestEvent("Batch message $i")
}
// Consumer setup
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("batch-test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
// Collect received events
val receivedEvents = KafkaReceiver.create(receiverOptions)
.receive()
.take(batchSize.toLong())
.map { it.value() }
.collectList()
// Send batch and verify success
val publishResult = kafkaEventPublisher.publishEvents(testTopic, eventBatch)
assert(publishResult.isSuccess) { "Expected successful batch publish result" }
assert(publishResult.getOrNull()?.size == batchSize) { "Expected $batchSize successful operations" }
// Verify reception
StepVerifier.create(receivedEvents)
.expectNextMatches { events ->
events.size == batchSize && events.all { it.message.startsWith("Batch message") }
}
.verifyComplete()
}
@Test
fun `should handle multiple consumers on same topic`() = runTest {
val testEvent = TestEvent("Multi-consumer message")
val testKey = "multi-consumer-key"
// Setup two consumers with different group IDs
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumer1Props = testKafkaConfig.consumerConfigs("consumer-group-1-${UUID.randomUUID()}")
val consumer2Props = testKafkaConfig.consumerConfigs("consumer-group-2-${UUID.randomUUID()}")
val jsonDeserializer1 = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val jsonDeserializer2 = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions1 = ReceiverOptions.create<String, TestEvent>(consumer1Props)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonDeserializer1)
.subscription(listOf(testTopic))
val receiverOptions2 = ReceiverOptions.create<String, TestEvent>(consumer2Props)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonDeserializer2)
.subscription(listOf(testTopic))
val consumer1Event = KafkaReceiver.create(receiverOptions1)
.receive()
.next()
.map { it.value() }
val consumer2Event = KafkaReceiver.create(receiverOptions2)
.receive()
.next()
.map { it.value() }
// Execute the send action and verify success
val publishResult = kafkaEventPublisher.publishEvent(testTopic, testKey, testEvent)
assert(publishResult.isSuccess) { "Expected successful publish result" }
// Both consumers should receive the same message (different groups)
StepVerifier.create(consumer1Event.zipWith(consumer2Event))
.expectNextMatches { tuple ->
tuple.t1 == testEvent && tuple.t2 == testEvent
}
.verifyComplete()
}
@Test
fun `should handle different event types in integration scenario`() = runTest {
val complexEvent = ComplexTestEvent(
id = 123,
name = "Integration Test",
metadata = mapOf("type" to "complex", "version" to "1.0"),
timestamp = System.currentTimeMillis()
)
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("complex-test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(ComplexTestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, ComplexTestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
val receivedEvent = KafkaReceiver.create(receiverOptions)
.receive()
.next()
.map { it.value() }
// Execute the send action and verify success
val publishResult = kafkaEventPublisher.publishEvent(testTopic, "complex-key", complexEvent)
assert(publishResult.isSuccess) { "Expected successful publish result" }
// Verify that the complex event can be received
StepVerifier.create(receivedEvent)
.expectNext(complexEvent)
.verifyComplete()
}
@Test
fun `should maintain message ordering within partition`() = runTest {
val partitionKey = "ordered-messages"
val messageCount = 5
val orderedEvents = (1..messageCount).map { i ->
partitionKey to TestEvent("Ordered message $i")
}
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("ordering-test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
val receivedEvents = KafkaReceiver.create(receiverOptions)
.receive()
.take(messageCount.toLong())
.map { it.value() }
.collectList()
// Send ordered events and verify success
val publishResult = kafkaEventPublisher.publishEvents(testTopic, orderedEvents)
assert(publishResult.isSuccess) { "Expected successful batch publish result" }
assert(publishResult.getOrNull()?.size == messageCount) { "Expected $messageCount successful operations" }
// Verify message ordering is maintained
StepVerifier.create(receivedEvents)
.expectNextMatches { events ->
events.size == messageCount &&
events.mapIndexed { index, event ->
event.message == "Ordered message ${index + 1}"
}.all { it }
}
.verifyComplete()
}
@Test
fun `should handle empty batch gracefully in integration test`() = runTest {
val emptyBatch = emptyList<Pair<String?, Any>>()
val publishResult = kafkaEventPublisher.publishEvents(testTopic, emptyBatch)
assert(publishResult.isSuccess) { "Expected successful result for empty batch" }
assert(publishResult.getOrNull()?.isEmpty() == true) { "Expected empty result list" }
}
data class TestEvent(val message: String)
data class ComplexTestEvent(
val id: Int,
val name: String,
val metadata: Map<String, String>,
val timestamp: Long
)
}
@@ -0,0 +1,317 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaSecurityTest {
@Test
fun `should configure trusted packages correctly for JSON deserializer`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*,com.example.*"
}
val consumerConfigs = config.consumerConfigs("security-test-group")
// Verify trusted packages configuration
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*,com.example.*")
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should reject empty trusted packages configuration`() {
val config = KafkaConfig()
assertThrows<IllegalArgumentException> {
config.trustedPackages = ""
}
assertThrows<IllegalArgumentException> {
config.trustedPackages = " "
}
}
@Test
fun `should validate trusted packages with various formats`() {
val config = KafkaConfig()
// Valid trusted package formats
val validPackages = listOf(
"at.mocode.*",
"at.mocode.*,com.example.*",
"java.lang.*,java.util.*",
"com.company.specific.Package",
"org.springframework.*,at.mocode.*,com.test.*"
)
validPackages.forEach { packages ->
assertDoesNotThrow {
config.trustedPackages = packages
assertThat(config.trustedPackages).isEqualTo(packages)
}
}
}
@Test
fun `should configure security features when enabled`() {
val config = KafkaConfig().apply {
enableSecurityFeatures = true
}
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("secure-group")
// Verify security-related producer configurations
assertThat(producerConfigs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(producerConfigs[ProducerConfig.ACKS_CONFIG]).isEqualTo("all")
assertThat(producerConfigs[ProducerConfig.RETRIES_CONFIG]).isEqualTo(3)
// Verify security-related consumer configurations
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*")
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]).isEqualTo(false)
}
@Test
fun `should configure security features when disabled`() {
val config = KafkaConfig().apply {
enableSecurityFeatures = false // Explicitly disable
}
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("non-secure-group")
// Even when disabled, core security features should still be present
// This ensures baseline security is maintained
assertThat(producerConfigs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(producerConfigs[ProducerConfig.ACKS_CONFIG]).isEqualTo("all")
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*")
}
@Test
fun `should prevent JSON type header usage for security`() {
val config = KafkaConfig()
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("header-test-group")
// Type headers should be disabled to prevent deserialization attacks
assertThat(producerConfigs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should create secure JSON deserializer for consumer`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*,com.test.*"
}
val consumer = KafkaEventConsumer(config)
// Test that consumer can be created with security configuration
assertThat(consumer).isNotNull
// Test that reactive streams can be created (they use secure deserializer internally)
assertDoesNotThrow {
val flux = consumer.receiveEventsWithResult<SecureTestEvent>("secure-topic")
assertThat(flux).isNotNull
}
}
@Test
fun `should handle multiple trusted package patterns`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.domain.*,at.mocode.events.*,com.example.secure.*"
}
val consumerConfigs = config.consumerConfigs("multi-pattern-group")
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES])
.isEqualTo("at.mocode.domain.*,at.mocode.events.*,com.example.secure.*")
}
@Test
fun `should enforce manual commit for better security control`() {
val config = KafkaConfig()
val consumerConfigs = config.consumerConfigs("manual-commit-group")
// Auto-commit should be disabled for better control over message processing
assertThat(consumerConfigs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]).isEqualTo(false)
// Session and heartbeat timeouts should be configured for security
assertThat(consumerConfigs[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG]).isEqualTo(30000)
assertThat(consumerConfigs[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG]).isEqualTo(3000)
}
@Test
fun `should configure connection security settings`() {
val config = KafkaConfig()
val consumerConfigs = config.consumerConfigs("connection-security-group")
// Connection security settings
assertThat(consumerConfigs[ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG]).isEqualTo(540000)
assertThat(consumerConfigs[ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG]).isEqualTo(50)
assertThat(consumerConfigs[ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG]).isEqualTo(1000)
}
@Test
fun `should validate connection pool size for security`() {
val config = KafkaConfig()
// Valid connection pool sizes
assertDoesNotThrow { config.connectionPoolSize = 1 }
assertDoesNotThrow { config.connectionPoolSize = 5 }
assertDoesNotThrow { config.connectionPoolSize = 50 }
// Invalid connection pool sizes (security risk - too many connections)
assertThrows<IllegalArgumentException> { config.connectionPoolSize = 0 }
assertThrows<IllegalArgumentException> { config.connectionPoolSize = -1 }
}
@Test
fun `should create producer factory with secure configuration`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*"
enableSecurityFeatures = true
}
val producerFactory = config.producerFactory()
// Verify producer factory is created successfully
assertThat(producerFactory).isNotNull
// Test creating a producer
assertDoesNotThrow {
val producer = producerFactory.createProducer()
assertThat(producer).isNotNull
}
// Verify secure configuration is applied
val configs = producerFactory.configurationProperties
assertThat(configs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(configs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should handle security configuration for different environments`() {
// Development environment
val devConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
trustedPackages = "at.mocode.*,com.test.*"
enableSecurityFeatures = true
}
// Production environment
val prodConfig = KafkaConfig().apply {
bootstrapServers = "prod-kafka:9092"
trustedPackages = "at.mocode.*" // More restrictive
enableSecurityFeatures = true
connectionPoolSize = 20
}
// Both configurations should be valid
assertDoesNotThrow {
KafkaEventConsumer(devConfig)
KafkaEventPublisher(ReactiveKafkaConfig(devConfig).reactiveKafkaProducerTemplate())
}
assertDoesNotThrow {
KafkaEventConsumer(prodConfig)
KafkaEventPublisher(ReactiveKafkaConfig(prodConfig).reactiveKafkaProducerTemplate())
}
}
@Test
fun `should validate group ID format for security`() {
val config = KafkaConfig()
// Valid group ID prefixes
val validPrefixes = listOf(
"secure-consumer",
"production.consumer",
"dev_consumer",
"consumer-123"
)
validPrefixes.forEach { prefix ->
assertDoesNotThrow {
config.defaultGroupIdPrefix = prefix
assertThat(config.defaultGroupIdPrefix).isEqualTo(prefix)
}
}
// Invalid group ID prefixes (potential security issues)
val invalidPrefixes = listOf(
"", // Empty
" ", // Whitespace only
"invalid@consumer", // Special characters
"consumer with spaces",
"consumer/with/slashes",
"consumer#hash"
)
invalidPrefixes.forEach { prefix ->
assertThrows<IllegalArgumentException> {
config.defaultGroupIdPrefix = prefix
}
}
}
@Test
fun `should configure serialization security`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*,com.secure.*"
}
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("serialization-security-group")
// Producer serialization security
assertThat(producerConfigs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.apache.kafka.common.serialization.StringSerializer")
assertThat(producerConfigs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.springframework.kafka.support.serializer.JsonSerializer")
assertThat(producerConfigs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
// Consumer deserialization security
assertThat(consumerConfigs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.apache.kafka.common.serialization.StringDeserializer")
assertThat(consumerConfigs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.springframework.kafka.support.serializer.JsonDeserializer")
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*,com.secure.*")
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should provide secure defaults`() {
val config = KafkaConfig() // Use default values
// Verify secure defaults
assertThat(config.trustedPackages).isEqualTo("at.mocode.*")
assertThat(config.enableSecurityFeatures).isEqualTo(true)
assertThat(config.connectionPoolSize).isEqualTo(10)
assertThat(config.defaultGroupIdPrefix).isEqualTo("messaging-client")
// Verify secure configurations are applied with defaults
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs()
assertThat(producerConfigs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(producerConfigs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]).isEqualTo(false)
}
data class SecureTestEvent(
val data: String,
val timestamp: Long = System.currentTimeMillis()
)
}
@@ -0,0 +1,31 @@
// Dieses Modul stellt die zentrale, wiederverwendbare Konfiguration
// für die Verbindung mit Apache Kafka bereit (z.B. Bootstrap-Server, Serializer).
plugins {
alias(libs.plugins.kotlinJvm)
alias(libs.plugins.kotlinSpring)
alias(libs.plugins.spring.boot)
alias(libs.plugins.spring.dependencyManagement)
}
// Deaktiviert die Erstellung eines ausführbaren Jars für dieses Bibliotheks-Modul.
tasks.bootJar {
enabled = false
}
// Stellt sicher, dass stattdessen ein reguläres Jar gebaut wird
tasks.jar {
enabled = true
}
dependencies {
// Stellt sicher, dass alle Versionen aus der zentralen BOM kommen.
api(platform(projects.platform.platformBom))
// Stellt gemeinsame Abhängigkeiten bereit.
api(projects.platform.platformDependencies)
// OPTIMIERUNG: Verwendung des `kafka-config`-Bundles.
// `api` wird verwendet, damit der `messaging-client` diese Konfigurationen
// und Abhängigkeiten (wie Jackson) direkt nutzen kann.
api(libs.bundles.kafka.config)
// Stellt alle Test-Abhängigkeiten gebündelt bereit.
testImplementation(projects.platform.platformTesting)
}
@@ -0,0 +1,136 @@
package at.mocode.infrastructure.messaging.config
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
/**
* Zentrale Kafka-Konfiguration mit optimierten Einstellungen für Performance und Zuverlässigkeit.
*
* Diese Klasse kann programmatisch instanziiert werden (z. B. in Tests) oder
* als Spring-@Configuration mit @Bean-Methoden in einem Application Context registriert werden.
*
* Erweitert um Konfigurationsvalidierung und zusätzliche Optimierungen.
*/
class KafkaConfig {
/**
* Kommagetrennte Liste von host:port-Paaren für die initiale Verbindung zum Kafka-Cluster.
*/
var bootstrapServers: String = "localhost:9092"
set(value) {
require(value.isNotBlank()) { "Bootstrap servers cannot be blank" }
// Support both simple format (host:port) and protocol-prefixed format (PLAINTEXT://host:port)
val isValidFormat = value.matches(Regex("^[a-zA-Z0-9._-]+:[0-9]+(,[a-zA-Z0-9._-]+:[0-9]+)*$")) ||
value.matches(Regex("^[A-Z]+://[a-zA-Z0-9._-]+:[0-9]+(,[A-Z]+://[a-zA-Z0-9._-]+:[0-9]+)*$"))
require(isValidFormat) {
"Bootstrap servers must be in format 'host:port' or 'PROTOCOL://host:port'"
}
field = value
}
/**
* Standard-Präfix für Consumer-Group-IDs.
*/
var defaultGroupIdPrefix: String = "messaging-client"
set(value) {
require(value.isNotBlank()) { "Default group ID prefix cannot be blank" }
require(value.matches(Regex("^[a-zA-Z0-9._-]+$"))) {
"Group ID prefix must contain only alphanumeric characters, dots, underscores, and hyphens"
}
field = value
}
/**
* Comma-separated list of trusted packages for JSON deserialization security.
* Default restricts to application packages only.
*/
var trustedPackages: String = "at.mocode.*"
set(value) {
require(value.isNotBlank()) { "Trusted packages cannot be blank" }
field = value
}
/**
* Enable additional security features for production environments.
*/
var enableSecurityFeatures: Boolean = true
/**
* Connection pool size for better resource management.
*/
var connectionPoolSize: Int = 10
set(value) {
require(value > 0) { "Connection pool size must be positive" }
field = value
}
/**
* Optimized producer properties with performance tuning and reliability settings.
*/
fun producerConfigs(): Map<String, Any> = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
// Avoid adding type info headers; keeps payloads simple and interoperable.
JsonSerializer.ADD_TYPE_INFO_HEADERS to false,
// Performance optimizations
ProducerConfig.BATCH_SIZE_CONFIG to 32768, // 32KB batch size for better throughput
ProducerConfig.LINGER_MS_CONFIG to 5, // Wait up to 5ms to batch messages
ProducerConfig.COMPRESSION_TYPE_CONFIG to "snappy", // Fast compression
ProducerConfig.BUFFER_MEMORY_CONFIG to 67108864, // 64MB buffer memory
// Reliability settings
ProducerConfig.ACKS_CONFIG to "all", // Wait for all replicas
ProducerConfig.RETRIES_CONFIG to 3, // Retry failed sends
ProducerConfig.RETRY_BACKOFF_MS_CONFIG to 1000, // 1 second retry backoff
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG to 30000, // 30 second delivery timeout
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG to 10000, // 10 second request timeout
// Idempotence for exactly-once semantics
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 5
)
/**
* Optimized consumer properties with performance tuning and reliability settings.
*/
fun consumerConfigs(groupId: String? = null): Map<String, Any> = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to (groupId ?: "${defaultGroupIdPrefix}-${System.currentTimeMillis()}"),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
// JSON deserialization security
JsonDeserializer.TRUSTED_PACKAGES to trustedPackages,
JsonDeserializer.USE_TYPE_INFO_HEADERS to false,
// Performance optimizations
ConsumerConfig.FETCH_MIN_BYTES_CONFIG to 1024, // 1KB minimum fetch size
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG to 500, // Max 500ms wait for fetch
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 1048576, // 1MB max partition fetch
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500, // Process up to 500 records per poll
// Reliability settings
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, // Manual commit for better control
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 30000, // 30 second session timeout
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 3000, // 3 second heartbeat
// Connection settings
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG to 540000, // 9 minutes idle timeout
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG to 50,
ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG to 1000
)
/**
* Strongly typed producer factory to avoid unchecked casts in consumers/tests.
*/
fun producerFactory(): DefaultKafkaProducerFactory<String, Any> =
DefaultKafkaProducerFactory(producerConfigs())
}
@@ -0,0 +1,147 @@
package at.mocode.infrastructure.messaging.config
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConfigTest {
@Test
fun `should validate bootstrap servers format`() {
val config = KafkaConfig()
// Valid formats
assertDoesNotThrow { config.bootstrapServers = "localhost:9092" }
assertDoesNotThrow { config.bootstrapServers = "PLAINTEXT://localhost:9092" }
assertDoesNotThrow { config.bootstrapServers = "host1:9092,host2:9092" }
assertDoesNotThrow { config.bootstrapServers = "PLAINTEXT://host1:9092,PLAINTEXT://host2:9092" }
assertDoesNotThrow { config.bootstrapServers = "kafka.example.com:9092" }
assertDoesNotThrow { config.bootstrapServers = "kafka-cluster-01.internal:9092" }
// Invalid formats
assertThrows<IllegalArgumentException> { config.bootstrapServers = "" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = " " }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "invalid-format" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "localhost" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = ":9092" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "localhost:" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "localhost:abc" }
}
@Test
fun `should validate group ID prefix`() {
val config = KafkaConfig()
// Valid prefixes
assertDoesNotThrow { config.defaultGroupIdPrefix = "valid-prefix_123" }
assertDoesNotThrow { config.defaultGroupIdPrefix = "messaging-client" }
assertDoesNotThrow { config.defaultGroupIdPrefix = "test.group.id" }
assertDoesNotThrow { config.defaultGroupIdPrefix = "simple123" }
// Invalid prefixes
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = " " }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid@prefix" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid#prefix" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid prefix" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid/prefix" }
}
@Test
fun `should validate trusted packages`() {
val config = KafkaConfig()
// Valid trusted packages
assertDoesNotThrow { config.trustedPackages = "at.mocode.*,com.example.*" }
assertDoesNotThrow { config.trustedPackages = "at.mocode.*" }
assertDoesNotThrow { config.trustedPackages = "com.example.specific.Package" }
assertDoesNotThrow { config.trustedPackages = "java.lang.*,java.util.*" }
// Invalid trusted packages
assertThrows<IllegalArgumentException> { config.trustedPackages = "" }
assertThrows<IllegalArgumentException> { config.trustedPackages = " " }
}
@Test
fun `should validate connection pool size`() {
val config = KafkaConfig()
// Valid pool sizes
assertDoesNotThrow { config.connectionPoolSize = 1 }
assertDoesNotThrow { config.connectionPoolSize = 5 }
assertDoesNotThrow { config.connectionPoolSize = 10 }
assertDoesNotThrow { config.connectionPoolSize = 100 }
// Invalid pool sizes
assertThrows<IllegalArgumentException> { config.connectionPoolSize = 0 }
assertThrows<IllegalArgumentException> { config.connectionPoolSize = -1 }
assertThrows<IllegalArgumentException> { config.connectionPoolSize = -10 }
}
@Test
fun `should have default values set correctly`() {
val config = KafkaConfig()
assertThat(config.bootstrapServers).isEqualTo("localhost:9092")
assertThat(config.defaultGroupIdPrefix).isEqualTo("messaging-client")
assertThat(config.trustedPackages).isEqualTo("at.mocode.*")
assertThat(config.enableSecurityFeatures).isEqualTo(true)
assertThat(config.connectionPoolSize).isEqualTo(10)
}
@Test
fun `should generate valid producer configs`() {
val config = KafkaConfig()
val producerConfigs = config.producerConfigs()
// Verify essential producer configuration
assertThat(producerConfigs["bootstrap.servers"]).isEqualTo("localhost:9092")
assertThat(producerConfigs["key.serializer"]).isEqualTo(org.apache.kafka.common.serialization.StringSerializer::class.java)
assertThat(producerConfigs["value.serializer"]).isEqualTo(org.springframework.kafka.support.serializer.JsonSerializer::class.java)
assertThat(producerConfigs["acks"]).isEqualTo("all")
assertThat(producerConfigs["enable.idempotence"]).isEqualTo(true)
}
@Test
fun `should generate valid consumer configs with custom group ID`() {
val config = KafkaConfig()
val customGroupId = "test-group-123"
val consumerConfigs = config.consumerConfigs(customGroupId)
// Verify essential consumer configuration
assertThat(consumerConfigs["bootstrap.servers"]).isEqualTo("localhost:9092")
assertThat(consumerConfigs["group.id"]).isEqualTo(customGroupId)
assertThat(consumerConfigs["key.deserializer"]).isEqualTo(org.apache.kafka.common.serialization.StringDeserializer::class.java)
assertThat(consumerConfigs["value.deserializer"]).isEqualTo(org.springframework.kafka.support.serializer.JsonDeserializer::class.java)
assertThat(consumerConfigs["spring.json.trusted.packages"]).isEqualTo("at.mocode.*")
assertThat(consumerConfigs["auto.offset.reset"]).isEqualTo("earliest")
assertThat(consumerConfigs["enable.auto.commit"]).isEqualTo(false)
}
@Test
fun `should generate unique consumer configs when no group ID provided`() {
val config = KafkaConfig()
val consumerConfigs1 = config.consumerConfigs()
val consumerConfigs2 = config.consumerConfigs()
// Group IDs should be different (timestamp-based)
val groupId1 = consumerConfigs1["group.id"].toString()
val groupId2 = consumerConfigs2["group.id"].toString()
assertThat(groupId1).isNotEqualTo(groupId2)
assertThat(groupId1).startsWith("messaging-client-")
assertThat(groupId2).startsWith("messaging-client-")
}
@Test
fun `should create producer factory with correct configuration`() {
val config = KafkaConfig()
val producerFactory = config.producerFactory()
assertDoesNotThrow { producerFactory.createProducer() }
assertThat(producerFactory.configurationProperties["bootstrap.servers"]).isEqualTo("localhost:9092")
}
}