fixing(infra-messaging)

This commit is contained in:
2025-08-15 01:17:24 +02:00
parent ad91050a2f
commit 846918cf69
14 changed files with 2324 additions and 45 deletions
@@ -1,5 +1,6 @@
package at.mocode.infrastructure.gateway.config
import org.slf4j.LoggerFactory
import org.springframework.cloud.gateway.filter.GatewayFilterChain
import org.springframework.cloud.gateway.filter.GlobalFilter
import org.springframework.core.Ordered
@@ -58,6 +59,8 @@ class CorrelationIdFilter : GlobalFilter, Ordered {
@org.springframework.context.annotation.Profile("!test")
class EnhancedLoggingFilter : GlobalFilter, Ordered {
private val logger = LoggerFactory.getLogger(EnhancedLoggingFilter::class.java)
override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
val startTime = System.currentTimeMillis()
val request = exchange.request
@@ -77,29 +80,44 @@ class EnhancedLoggingFilter : GlobalFilter, Ordered {
}
private fun logRequest(request: ServerHttpRequest, correlationId: String?) {
println("""
[${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}] [REQUEST] [${correlationId}]
Method: ${request.method}
URI: ${request.uri}
RemoteAddress: ${request.remoteAddress}
UserAgent: ${request.headers.getFirst("User-Agent")}
""".trimIndent())
logger.info("""
[REQUEST] [{}]
Method: {}
URI: {}
RemoteAddress: {}
UserAgent: {}
""".trimIndent(),
correlationId,
request.method,
request.uri,
request.remoteAddress,
request.headers.getFirst("User-Agent")
)
}
private fun logResponse(response: ServerHttpResponse, correlationId: String?, responseTime: Long) {
println("""
[${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}] [RESPONSE] [${correlationId}]
Status: ${response.statusCode}
ResponseTime: ${responseTime}ms
""".trimIndent())
logger.info("""
[RESPONSE] [{}]
Status: {}
ResponseTime: {}ms
""".trimIndent(),
correlationId,
response.statusCode,
responseTime
)
}
private fun logError(error: Throwable, correlationId: String?, responseTime: Long) {
println("""
[${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}] [ERROR] [${correlationId}]
Error: ${error.message}
ResponseTime: ${responseTime}ms
""".trimIndent())
logger.error("""
[ERROR] [{}]
Error: {}
ResponseTime: {}ms
""".trimIndent(),
correlationId,
error.message,
responseTime,
error
)
}
override fun getOrder(): Int = Ordered.HIGHEST_PRECEDENCE + 1
@@ -80,6 +80,24 @@ Die Zuverlässigkeit des Moduls wird durch einen umfassenden Integrationstest si
* **Lifecycle Management: Der Test-Lebenszyklus wird sauber über @BeforeEach und @AfterEach verwaltet, um sicherzustellen, dass alle Ressourcen (insbesondere Producer-Threads) nach jedem Test korrekt freigegeben werden.*
## Neue Features und Optimierungen (2025)
### Erweiterte Konfigurationsvalidierung
* **Automatische Validierung**: Alle Konfigurationsparameter werden automatisch bei der Zuweisung validiert
* **Bootstrap-Server-Format**: Unterstützt sowohl einfache (`host:port`) als auch protokoll-präfixierte Formate (`PLAINTEXT://host:port`)
* **Sicherheitsfeatures**: Configurable Sicherheitsfunktionen für Produktionsumgebungen
* **Connection-Pool-Management**: Konfigurierbare Verbindungspool-Größe für bessere Ressourcenverwaltung
### Verbesserte Observability
* **Strukturierte Logs**: Erweiterte Logging-Informationen mit GroupID, Timestamps und Event-Kontext
* **Fehlerkontext**: Detaillierte Fehlerinformationen mit Retry-Status und Event-Type-Details
* **Performance-Tracking**: Bessere Nachvollziehbarkeit von Batch-Operationen und Retry-Versuchen
### Robustheit-Verbesserungen
* **Intelligente Validierung**: Erkennt und verhindert häufige Konfigurationsfehler
* **Testcontainer-Kompatibilität**: Vollständige Kompatibilität mit Docker-basierten Tests
* **Enhanced Error Handling**: Verbesserte Fehlerbehandlung mit strukturierten Kontext-Informationen
---
**Letzte Aktualisierung**: 9. August 2025
**Letzte Aktualisierung**: 14. August 2025
@@ -10,13 +10,13 @@ interface EventPublisher {
/**
* Publishes a single event to the specified topic.
* Returns a Mono that completes when the send operation is finished.
* Returns a Mono that emits Unit when the send operation is finished.
*/
fun publishEvent(topic: String, key: String? = null, event: Any): Mono<Void>
fun publishEvent(topic: String, key: String? = null, event: Any): Mono<Unit>
/**
* Publishes multiple events to the specified topic.
* Returns a Flux that completes when all send operations are finished.
* Returns a Flux that emits one Unit per successfully published event.
*/
fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Void>
fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Unit>
}
@@ -31,6 +31,7 @@ class KafkaEventConsumer(
logger.info("Setting up reactive consumer for topic '{}' with event type '{}'", topic, eventType.simpleName)
val cacheKey = "${topic}-${eventType.name}"
val groupId = "${kafkaConfig.defaultGroupIdPrefix}-${topic}-${eventType.simpleName.lowercase()}"
// Get or create a cached receiver for this topic-eventType combination
@Suppress("UNCHECKED_CAST")
@@ -41,8 +42,9 @@ class KafkaEventConsumer(
return receiver.receive()
.doOnNext { record ->
logger.debug(
"Received message from topic-partition {}-{} with offset {} for event type '{}'",
record.topic(), record.partition(), record.offset(), eventType.simpleName
"Received message from topic-partition {}-{} with offset {} for event type '{}' [groupId={}, timestamp={}]",
record.topic(), record.partition(), record.offset(), eventType.simpleName,
groupId, record.timestamp()
)
}
.map { record ->
@@ -51,8 +53,8 @@ class KafkaEventConsumer(
record.value()
}
.doOnError { exception ->
logger.error("Error receiving events from topic '{}' for event type '{}'",
topic, eventType.simpleName, exception)
logger.error("Error receiving events from topic '{}' for event type '{}' [groupId={}, cacheKey={}]: {}",
topic, eventType.simpleName, groupId, cacheKey, exception.message, exception)
}
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
@@ -5,6 +5,7 @@ import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderResult
import reactor.util.retry.Retry
import java.time.Duration
@@ -26,7 +27,7 @@ class KafkaEventPublisher(
private const val DEFAULT_BATCH_CONCURRENCY = 10
}
override fun publishEvent(topic: String, key: String?, event: Any): Mono<Void> {
override fun publishEvent(topic: String, key: String?, event: Any): Mono<Unit> {
logger.debug("Publishing event to topic '{}' with key '{}', event type: '{}'",
topic, key, event::class.simpleName)
@@ -39,18 +40,18 @@ class KafkaEventPublisher(
)
}
.doOnError { exception ->
logger.warn("Failed to publish event to topic '{}' with key '{}' - will retry if configured",
topic, key, exception)
logger.warn("Failed to publish event to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
}
.retryWhen(createRetrySpec(topic, key))
.doOnError { exception ->
logger.error("Final failure after retries: Failed to publish event to topic '{}' with key '{}'",
topic, key, exception)
}
.then()
.map { Unit }
}
override fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Void> {
override fun publishEvents(topic: String, events: List<Pair<String?, Any>>): Flux<Unit> {
if (events.isEmpty()) {
logger.debug("No events to publish to topic '{}'", topic)
return Flux.empty()
@@ -64,13 +65,22 @@ class KafkaEventPublisher(
val index = indexedEventPair.t1
val eventPair = indexedEventPair.t2
val (key, event) = eventPair
publishEvent(topic, key, event)
.doOnSuccess {
reactiveKafkaTemplate.send(topic, key ?: "", event)
.doOnSuccess { result ->
val record = result.recordMetadata()
logger.debug("Successfully published event to topic-partition {}-{} with offset {} (key: '{}')",
record.topic(), record.partition(), record.offset(), key)
if ((index + 1) % 100 == 0L || index == events.size.toLong() - 1) {
logger.info("Batch progress: {}/{} events published to topic '{}'",
index + 1, events.size, topic)
}
}
.doOnError { exception ->
logger.warn("Failed to publish event {} in batch to topic '{}' with key '{}' [eventType={}, retryable={}] - will retry if configured: {}",
index + 1, topic, key, event::class.simpleName, isRetryableException(exception), exception.message, exception)
}
.retryWhen(createRetrySpec(topic, key))
.map { Unit } // Convert to Mono<Unit> that emits one Unit per successful send
.onErrorContinue { error, _ ->
logger.error("Error publishing event {} in batch to topic '{}': {}",
index + 1, topic, error.message)
@@ -0,0 +1,311 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.client.ReactiveKafkaConfig
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import reactor.test.StepVerifier
import java.util.*
@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaBatchPerformanceTest {
private val logger = LoggerFactory.getLogger(KafkaBatchPerformanceTest::class.java)
companion object {
@Container
private val kafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
}
private lateinit var kafkaEventPublisher: KafkaEventPublisher
private lateinit var producerFactory: DefaultKafkaProducerFactory<String, Any>
private val testTopic = "performance-topic-${UUID.randomUUID()}"
@BeforeEach
fun setUp() {
val kafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
producerFactory = kafkaConfig.producerFactory()
val reactiveKafkaConfig = ReactiveKafkaConfig(kafkaConfig)
val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate()
kafkaEventPublisher = KafkaEventPublisher(reactiveTemplate)
}
@AfterEach
fun tearDown() {
producerFactory.destroy()
}
@Test
fun `should handle small batch efficiently`() {
val batchSize = 50
val smallEventBatch = (1..batchSize).map { i ->
"key$i" to PerformanceTestEvent("Small batch message $i", i)
}
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, smallEventBatch))
.expectNextCount(batchSize.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Small batch should complete quickly (within 10 seconds)
assertThat(duration).isLessThan(10000)
}
@Test
fun `should handle medium batch efficiently`() {
val batchSize = 500
val mediumEventBatch = (1..batchSize).map { i ->
"key$i" to PerformanceTestEvent("Medium batch message $i", i)
}
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, mediumEventBatch))
.expectNextCount(batchSize.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Medium batch should complete within a reasonable time (30 seconds)
assertThat(duration).isLessThan(30000)
// Should be reasonably efficient (less than 60 ms per message on average)
val avgTimePerMessage = duration.toDouble() / batchSize
assertThat(avgTimePerMessage).isLessThan(60.0)
}
@Test
fun `should handle large batch with reasonable performance`() {
val batchSize = 1000
val largeEventBatch = (1..batchSize).map { i ->
"key$i" to PerformanceTestEvent("Large batch message $i", i)
}
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, largeEventBatch))
.expectNextCount(batchSize.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Large batch should complete within 60 seconds
assertThat(duration).isLessThan(60000)
// Should maintain reasonable efficiency (less than 100 ms per message on average)
val avgTimePerMessage = duration.toDouble() / batchSize
assertThat(avgTimePerMessage).isLessThan(100.0)
}
@Test
fun `should handle concurrent batch publishing`() {
val batchSize = 100
val concurrentBatches = 5
val batches = (1..concurrentBatches).map { batchIndex ->
(1..batchSize).map { i ->
"batch${batchIndex}_key$i" to PerformanceTestEvent("Concurrent batch $batchIndex message $i", i)
}
}
val startTime = System.currentTimeMillis()
// Publish all batches concurrently
val publishers = batches.map { batch ->
kafkaEventPublisher.publishEvents(testTopic, batch)
.collectList() // Collect results for each batch
}
StepVerifier.create(reactor.core.publisher.Flux.merge(publishers))
.expectNextCount(concurrentBatches.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Concurrent publishing should be efficient (within 45 seconds for all batches)
assertThat(duration).isLessThan(45000)
// Should benefit from concurrency (less than 80 ms per message across all batches)
val totalMessages = batchSize * concurrentBatches
val avgTimePerMessage = duration.toDouble() / totalMessages
assertThat(avgTimePerMessage).isLessThan(80.0)
}
@Test
fun `should handle single message publishing performance`() {
val messageCount = 100
val messages = (1..messageCount).map { i ->
PerformanceTestEvent("Single message $i", i)
}
val startTime = System.currentTimeMillis()
val publishers = messages.mapIndexed { index, message ->
kafkaEventPublisher.publishEvent(testTopic, "single_key_$index", message)
}
StepVerifier.create(reactor.core.publisher.Flux.merge(publishers))
.expectNextCount(messageCount.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Individual message publishing should complete within 20 seconds
assertThat(duration).isLessThan(20000)
// Should maintain reasonable per-message performance
val avgTimePerMessage = duration.toDouble() / messageCount
assertThat(avgTimePerMessage).isLessThan(200.0)
}
@Test
fun `should handle mixed payload sizes efficiently`() {
val smallPayload = "small"
val mediumPayload = "medium".repeat(100) // ~600 characters
val largePayload = "large".repeat(1000) // ~5000 characters
val mixedEventBatch = listOf(
// Small payloads
*((1..50).map { i -> "small_key_$i" to PerformanceTestEvent(smallPayload, i) }.toTypedArray()),
// Medium payloads
*((1..30).map { i -> "medium_key_$i" to PerformanceTestEvent(mediumPayload, i) }.toTypedArray()),
// Large payloads
*((1..20).map { i -> "large_key_$i" to PerformanceTestEvent(largePayload, i) }.toTypedArray())
)
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, mixedEventBatch))
.expectNextCount(100) // 50 + 30 + 20 = 100
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Mixed payload sizes should be handled efficiently (within 15 seconds)
assertThat(duration).isLessThan(15000)
}
@Test
fun `should demonstrate batch vs individual performance difference`() {
val messageCount = 200
val events = (1..messageCount).map { i ->
"perf_key_$i" to PerformanceTestEvent("Performance test message $i", i)
}
// Test individual publishing
val individualStartTime = System.currentTimeMillis()
val individualPublishers = events.map { (key, event) ->
kafkaEventPublisher.publishEvent(testTopic, key, event)
}
StepVerifier.create(reactor.core.publisher.Flux.merge(individualPublishers))
.expectNextCount(messageCount.toLong())
.verifyComplete()
val individualDuration = System.currentTimeMillis() - individualStartTime
// Test batch publishing
val batchStartTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, events))
.expectNextCount(messageCount.toLong())
.verifyComplete()
val batchDuration = System.currentTimeMillis() - batchStartTime
// Batch publishing should generally be more efficient or at least comparable
// We don't enforce strict performance improvements due to test environment variability,
// but we verify both approaches complete within reasonable time
assertThat(individualDuration).isLessThan(20000)
assertThat(batchDuration).isLessThan(20000)
logger.info("Individual publishing: {}ms for {} messages", individualDuration, messageCount)
logger.info("Batch publishing: {}ms for {} messages", batchDuration, messageCount)
}
@Test
fun `should handle empty batch gracefully`() {
val emptyBatch = emptyList<Pair<String?, Any>>()
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, emptyBatch))
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Empty batch should complete almost instantly (within 100 ms)
assertThat(duration).isLessThan(100)
}
@Test
fun `should maintain performance under memory pressure`() {
// Create a large batch to test memory handling
val largeBatchSize = 2000
val largeEventBatch = (1..largeBatchSize).map { i ->
"memory_key_$i" to PerformanceTestEvent("Memory pressure test message $i".repeat(10), i)
}
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, largeEventBatch))
.expectNextCount(largeBatchSize.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Should handle large batches without excessive memory usage (within 45 seconds)
assertThat(duration).isLessThan(45000)
// Average time per message should remain reasonable even under memory pressure
val avgTimePerMessage = duration.toDouble() / largeBatchSize
assertThat(avgTimePerMessage).isLessThan(25.0)
}
@Test
fun `should respect batch concurrency limits`() {
// Test that batch processing respects configured concurrency
val batchSize = 300
val testBatch = (1..batchSize).map { i ->
"concurrency_key_$i" to PerformanceTestEvent("Concurrency test message $i", i)
}
val startTime = System.currentTimeMillis()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, testBatch))
.expectNextCount(batchSize.toLong())
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
// Should complete efficiently with controlled concurrency (within 20 seconds)
assertThat(duration).isLessThan(20000)
// Verify reasonable throughput
val messagesPerSecond = (batchSize.toDouble() / duration) * 1000
assertThat(messagesPerSecond).isGreaterThan(10.0) // At least 10 messages per second
}
data class PerformanceTestEvent(
val message: String,
val sequenceNumber: Int,
val timestamp: Long = System.currentTimeMillis()
)
}
@@ -0,0 +1,241 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaEventConsumerCacheTest {
private lateinit var kafkaConfig: KafkaConfig
private lateinit var consumer: KafkaEventConsumer
@BeforeEach
fun setUp() {
kafkaConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "test-consumer"
trustedPackages = "at.mocode.*"
}
consumer = KafkaEventConsumer(kafkaConfig)
}
@Test
fun `should create consumer successfully with valid configuration`() {
// Test that consumer can be created with different configurations
val customConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "custom-consumer"
trustedPackages = "at.mocode.*,com.example.*"
connectionPoolSize = 5
}
assertDoesNotThrow {
KafkaEventConsumer(customConfig)
}
}
@Test
fun `should create different consumers with different configurations`() {
val config1 = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "consumer1"
trustedPackages = "at.mocode.*"
}
val config2 = KafkaConfig().apply {
bootstrapServers = "localhost:9093"
defaultGroupIdPrefix = "consumer2"
trustedPackages = "com.example.*"
}
val consumer1 = KafkaEventConsumer(config1)
val consumer2 = KafkaEventConsumer(config2)
// Both consumers should be created successfully
assertThat(consumer1).isNotNull
assertThat(consumer2).isNotNull
assertThat(consumer1).isNotSameAs(consumer2)
}
@Test
fun `should handle cleanup gracefully`() {
// Create consumer and call cleanup
val testConsumer = KafkaEventConsumer(kafkaConfig)
// Cleanup should not throw any exceptions
assertDoesNotThrow {
testConsumer.cleanup()
}
// Multiple cleanup calls should also be safe
assertDoesNotThrow {
testConsumer.cleanup()
testConsumer.cleanup()
}
}
@Test
fun `should create reactive streams for different topics`() {
// Test that receiveEvents creates reactive streams without errors
// Note: These won't actually connect to Kafka but should create the Flux
assertDoesNotThrow {
val flux1 = consumer.receiveEvents<TestEvent>("topic1")
val flux2 = consumer.receiveEvents<TestEvent>("topic2")
// Fluxes should be created (cold streams)
assertThat(flux1).isNotNull
assertThat(flux2).isNotNull
}
}
@Test
fun `should create reactive streams for different event types`() {
// Test that different event types create different streams
assertDoesNotThrow {
val flux1 = consumer.receiveEvents<TestEvent>("test-topic")
val flux2 = consumer.receiveEvents<AnotherTestEvent>("test-topic")
// Both should be created successfully
assertThat(flux1).isNotNull
assertThat(flux2).isNotNull
}
}
@Test
fun `should handle consumer configuration with security features`() {
val secureConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "secure-consumer"
trustedPackages = "at.mocode.*,com.secure.*"
enableSecurityFeatures = true
connectionPoolSize = 15
}
assertDoesNotThrow {
val secureConsumer = KafkaEventConsumer(secureConfig)
assertThat(secureConsumer).isNotNull
// Should be able to create streams
val flux = secureConsumer.receiveEvents<TestEvent>("secure-topic")
assertThat(flux).isNotNull
}
}
@Test
fun `should validate trusted packages configuration`() {
// Test with various trusted package configurations
val configs = listOf(
"at.mocode.*",
"at.mocode.*,com.example.*",
"java.lang.*,java.util.*,at.mocode.*"
)
configs.forEach { trustedPackages ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "validation-consumer"
this.trustedPackages = trustedPackages
}
assertDoesNotThrow {
val testConsumer = KafkaEventConsumer(config)
val flux = testConsumer.receiveEvents<TestEvent>("validation-topic")
assertThat(flux).isNotNull
}
}
}
@Test
fun `should handle different connection pool sizes`() {
val poolSizes = listOf(1, 5, 10, 20, 50)
poolSizes.forEach { poolSize ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "pool-test-consumer"
connectionPoolSize = poolSize
}
assertDoesNotThrow {
val testConsumer = KafkaEventConsumer(config)
assertThat(testConsumer).isNotNull
// Should be able to create reactive streams
val flux = testConsumer.receiveEvents<TestEvent>("pool-test-topic")
assertThat(flux).isNotNull
}
}
}
@Test
fun `should handle different group ID prefixes`() {
val prefixes = listOf(
"test-consumer",
"production-consumer",
"development.consumer",
"consumer_123"
)
prefixes.forEach { prefix ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = prefix
trustedPackages = "at.mocode.*"
}
assertDoesNotThrow {
val testConsumer = KafkaEventConsumer(config)
val flux = testConsumer.receiveEvents<TestEvent>("prefix-test-topic")
assertThat(flux).isNotNull
}
}
}
@Test
fun `should support extension function for reified types`() {
// Test the Kotlin extension function receiveEvents<T>()
assertDoesNotThrow {
val fluxWithReified = consumer.receiveEvents<TestEvent>("reified-topic")
val fluxWithClass = consumer.receiveEvents("reified-topic", TestEvent::class.java)
// Both should work and create valid Flux instances
assertThat(fluxWithReified).isNotNull
assertThat(fluxWithClass).isNotNull
}
}
@Test
fun `should handle concurrent consumer creation`() {
// Test that multiple consumers can be created concurrently
val consumers = (1..10).map { index ->
val config = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "concurrent-consumer-$index"
trustedPackages = "at.mocode.*"
}
KafkaEventConsumer(config)
}
// All consumers should be created successfully
assertThat(consumers).hasSize(10)
consumers.forEach { testConsumer ->
assertThat(testConsumer).isNotNull
// Each should be able to create streams
val flux = testConsumer.receiveEvents<TestEvent>("concurrent-topic")
assertThat(flux).isNotNull
}
// Clean up all consumers
consumers.forEach { testConsumer ->
assertDoesNotThrow { testConsumer.cleanup() }
}
}
data class TestEvent(val message: String)
data class AnotherTestEvent(val data: String)
}
@@ -0,0 +1,252 @@
package at.mocode.infrastructure.messaging.client
import io.mockk.clearMocks
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderResult
import reactor.test.StepVerifier
import java.io.IOException
import java.net.ConnectException
import java.util.concurrent.TimeoutException
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaEventPublisherErrorTest {
private lateinit var mockTemplate: ReactiveKafkaProducerTemplate<String, Any>
private lateinit var publisher: KafkaEventPublisher
@BeforeEach
fun setUp() {
mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
publisher = KafkaEventPublisher(mockTemplate)
}
@Test
fun `should retry on transient timeout errors`() {
val testEvent = TestEvent("data")
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockResult.recordMetadata() } returns mockRecordMetadata
// The first call fails with timeout, the second succeeds
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(TimeoutException("Connection timeout")) andThen
Mono.just(mockResult)
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyComplete()
verify(exactly = 2) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should retry on connection errors`() {
val testEvent = TestEvent("data")
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockResult.recordMetadata() } returns mockRecordMetadata
// First call fails with connection error, second succeeds
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(ConnectException("Connection refused")) andThen
Mono.just(mockResult)
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyComplete()
verify(exactly = 2) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should retry on IO errors`() {
val testEvent = TestEvent("data")
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockResult.recordMetadata() } returns mockRecordMetadata
// First call fails with IOException, second succeeds
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(IOException("Network error")) andThen
Mono.just(mockResult)
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyComplete()
verify(exactly = 2) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should not retry on serialization errors`() {
val testEvent = TestEvent("data")
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(RuntimeException("Serialization failed"))
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyError(RuntimeException::class.java)
// Should only try once, no retries for serialization errors
verify(exactly = 1) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should not retry on authentication errors`() {
val testEvent = TestEvent("data")
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(RuntimeException("Authentication failed"))
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyError(RuntimeException::class.java)
// Should only try once, no retries for auth errors
verify(exactly = 1) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should exhaust retries and fail after maximum attempts`() {
val testEvent = TestEvent("data")
// Always fail with retryable error
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(TimeoutException("Connection timeout"))
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyError(TimeoutException::class.java)
// Should try 1 initial + 3 retries = 4 times total
verify(exactly = 4) { mockTemplate.send("test-topic", "key", testEvent) }
}
@Test
fun `should handle batch publishing with partial failures`() {
val events = listOf(
"key1" to TestEvent("success1"),
"key2" to TestEvent("failure"),
"key3" to TestEvent("success2")
)
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockResult.recordMetadata() } returns mockRecordMetadata
// First and third events succeed, second fails
every { mockTemplate.send("test-topic", "key1", any()) } returns Mono.just(mockResult)
every { mockTemplate.send("test-topic", "key2", any()) } returns
Mono.error(RuntimeException("Serialization failed"))
every { mockTemplate.send("test-topic", "key3", any()) } returns Mono.just(mockResult)
StepVerifier.create(publisher.publishEvents("test-topic", events))
.expectNextCount(2) // Should complete 2 successful sends
.verifyComplete()
// Verify all events were attempted
verify(exactly = 1) { mockTemplate.send("test-topic", "key1", any()) }
verify(exactly = 1) { mockTemplate.send("test-topic", "key2", any()) }
verify(exactly = 1) { mockTemplate.send("test-topic", "key3", any()) }
}
@Test
fun `should handle batch publishing with retryable failures`() {
val events = listOf(
"key1" to TestEvent("success"),
"key2" to TestEvent("retry-then-success")
)
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "test-topic"
every { mockResult.recordMetadata() } returns mockRecordMetadata
// First event succeeds immediately
every { mockTemplate.send("test-topic", "key1", any()) } returns Mono.just(mockResult)
// Second event fails first time, succeeds on retry
every { mockTemplate.send("test-topic", "key2", any()) } returns
Mono.error(TimeoutException("Connection timeout")) andThen
Mono.just(mockResult)
StepVerifier.create(publisher.publishEvents("test-topic", events))
.expectNextCount(2) // Should complete both events
.verifyComplete()
// First event called once, second event called twice (initial + retry)
verify(exactly = 1) { mockTemplate.send("test-topic", "key1", any()) }
verify(exactly = 2) { mockTemplate.send("test-topic", "key2", any()) }
}
@Test
fun `should handle empty batch gracefully`() {
val emptyEvents = emptyList<Pair<String?, Any>>()
StepVerifier.create(publisher.publishEvents("test-topic", emptyEvents))
.verifyComplete()
// Should not call the template at all
verify(exactly = 0) { mockTemplate.send(any(), any(), any()) }
}
@Test
fun `should identify retryable exceptions correctly`() {
// Test the private isRetryableException method through behavior
val testEvent = TestEvent("data")
// Test various error messages that should be retryable
val retryableErrors = listOf(
RuntimeException("timeout occurred"),
RuntimeException("connection refused"),
RuntimeException("network unreachable"),
TimeoutException("Request timeout"),
ConnectException("Connection failed"),
IOException("I/O error")
)
retryableErrors.forEach { error ->
clearMocks(mockTemplate)
every { mockTemplate.send("test-topic", "key", testEvent) } returns
Mono.error(error) andThen Mono.error(error) // Fail twice to test retry
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyError()
// Should retry (at least 2 calls)
verify(atLeast = 2) { mockTemplate.send("test-topic", "key", testEvent) }
}
}
@Test
fun `should identify non-retryable exceptions correctly`() {
val testEvent = TestEvent("data")
// Test various error messages that should NOT be retryable
val nonRetryableErrors = listOf(
RuntimeException("serialization error"),
RuntimeException("deserialization failed"),
RuntimeException("authentication failed"),
RuntimeException("authorization denied")
)
nonRetryableErrors.forEach { error ->
clearMocks(mockTemplate)
every { mockTemplate.send("test-topic", "key", testEvent) } returns Mono.error(error)
StepVerifier.create(publisher.publishEvent("test-topic", "key", testEvent))
.verifyError()
// Should NOT retry (exactly 1 call)
verify(exactly = 1) { mockTemplate.send("test-topic", "key", testEvent) }
}
}
data class TestEvent(val message: String)
}
@@ -1,9 +1,7 @@
// KafkaIntegrationTest.kt
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.client.ReactiveKafkaConfig
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
@@ -72,22 +70,208 @@ class KafkaIntegrationTest {
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
// Der Mono, der das nächste empfangene Ereignis darstellt
// The Mono that represents the next received event
val receivedEvent = KafkaReceiver.create(receiverOptions)
.receive()
.next() // Nimm nur das erste Ereignis
.map { it.value() } // Extrahiere den Wert (unsere TestEvent-Instanz)
.next() // Take only the first event
.map { it.value() } // Extract the value (our TestEvent instance)
// Der Mono, der die Sende-Aktion darstellt
// The Mono that represents the send action
val sendAction = kafkaEventPublisher.publishEvent(testTopic, testKey, testEvent)
// KORREKTUR: Kombiniere die Sende-Aktion und die Empfangs-Erwartung in einem StepVerifier.
// Die `then` Methode stellt sicher, dass erst die Sende-Aktion abgeschlossen wird,
// bevor der `receivedEvent` Mono abonniert und verifiziert wird.
// CORRECTION: Combine the send action and receive expectation in one StepVerifier.
// The `then` method ensures that the send action is completed first,
// before the `receivedEvent` Mono is subscribed and verified.
StepVerifier.create(sendAction.then(receivedEvent))
.expectNext(testEvent) // Erwarte, dass unser Test-Event ankommt
.verifyComplete() // Schließe die Überprüfung ab
.expectNext(testEvent) // Expect that our test event arrives
.verifyComplete() // Complete the verification
}
@Test
fun `publishEvents should send batch messages that can be received`() {
// Arrange
val batchSize = 10
val eventBatch = (1..batchSize).map { i ->
"batch-key-$i" to TestEvent("Batch message $i")
}
// Consumer setup
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("batch-test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
// Collect received events
val receivedEvents = KafkaReceiver.create(receiverOptions)
.receive()
.take(batchSize.toLong())
.map { it.value() }
.collectList()
// Send batch and verify reception
val sendAction = kafkaEventPublisher.publishEvents(testTopic, eventBatch)
StepVerifier.create(sendAction.then(receivedEvents))
.expectNextMatches { events ->
events.size == batchSize && events.all { it.message.startsWith("Batch message") }
}
.verifyComplete()
}
@Test
fun `should handle multiple consumers on same topic`() {
val testEvent = TestEvent("Multi-consumer message")
val testKey = "multi-consumer-key"
// Setup two consumers with different group IDs
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumer1Props = testKafkaConfig.consumerConfigs("consumer-group-1-${UUID.randomUUID()}")
val consumer2Props = testKafkaConfig.consumerConfigs("consumer-group-2-${UUID.randomUUID()}")
val jsonDeserializer1 = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val jsonDeserializer2 = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions1 = ReceiverOptions.create<String, TestEvent>(consumer1Props)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonDeserializer1)
.subscription(listOf(testTopic))
val receiverOptions2 = ReceiverOptions.create<String, TestEvent>(consumer2Props)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonDeserializer2)
.subscription(listOf(testTopic))
val consumer1Event = KafkaReceiver.create(receiverOptions1)
.receive()
.next()
.map { it.value() }
val consumer2Event = KafkaReceiver.create(receiverOptions2)
.receive()
.next()
.map { it.value() }
val sendAction = kafkaEventPublisher.publishEvent(testTopic, testKey, testEvent)
// Both consumers should receive the same message (different groups)
StepVerifier.create(sendAction.then(consumer1Event.zipWith(consumer2Event)))
.expectNextMatches { tuple ->
tuple.t1 == testEvent && tuple.t2 == testEvent
}
.verifyComplete()
}
@Test
fun `should handle different event types in integration scenario`() {
val complexEvent = ComplexTestEvent(
id = 123,
name = "Integration Test",
metadata = mapOf("type" to "complex", "version" to "1.0"),
timestamp = System.currentTimeMillis()
)
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("complex-test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(ComplexTestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, ComplexTestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
val receivedEvent = KafkaReceiver.create(receiverOptions)
.receive()
.next()
.map { it.value() }
val sendAction = kafkaEventPublisher.publishEvent(testTopic, "complex-key", complexEvent)
StepVerifier.create(sendAction.then(receivedEvent))
.expectNext(complexEvent)
.verifyComplete()
}
@Test
fun `should maintain message ordering within partition`() {
val partitionKey = "ordered-messages"
val messageCount = 5
val orderedEvents = (1..messageCount).map { i ->
partitionKey to TestEvent("Ordered message $i")
}
val testKafkaConfig = KafkaConfig().apply {
bootstrapServers = kafkaContainer.bootstrapServers
trustedPackages = "at.mocode.*"
}
val consumerProps = testKafkaConfig.consumerConfigs("ordering-test-group-${UUID.randomUUID()}")
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
addTrustedPackages(testKafkaConfig.trustedPackages)
setUseTypeHeaders(false)
}
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
.withKeyDeserializer(StringDeserializer())
.withValueDeserializer(jsonValueDeserializer)
.subscription(listOf(testTopic))
val receivedEvents = KafkaReceiver.create(receiverOptions)
.receive()
.take(messageCount.toLong())
.map { it.value() }
.collectList()
val sendAction = kafkaEventPublisher.publishEvents(testTopic, orderedEvents)
StepVerifier.create(sendAction.then(receivedEvents))
.expectNextMatches { events ->
events.size == messageCount &&
events.mapIndexed { index, event ->
event.message == "Ordered message ${index + 1}"
}.all { it }
}
.verifyComplete()
}
@Test
fun `should handle empty batch gracefully in integration test`() {
val emptyBatch = emptyList<Pair<String?, Any>>()
StepVerifier.create(kafkaEventPublisher.publishEvents(testTopic, emptyBatch))
.verifyComplete()
}
data class TestEvent(val message: String)
data class ComplexTestEvent(
val id: Int,
val name: String,
val metadata: Map<String, String>,
val timestamp: Long
)
}
@@ -0,0 +1,318 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.client.ReactiveKafkaConfig
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaSecurityTest {
@Test
fun `should configure trusted packages correctly for JSON deserializer`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*,com.example.*"
}
val consumerConfigs = config.consumerConfigs("security-test-group")
// Verify trusted packages configuration
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*,com.example.*")
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should reject empty trusted packages configuration`() {
val config = KafkaConfig()
assertThrows<IllegalArgumentException> {
config.trustedPackages = ""
}
assertThrows<IllegalArgumentException> {
config.trustedPackages = " "
}
}
@Test
fun `should validate trusted packages with various formats`() {
val config = KafkaConfig()
// Valid trusted package formats
val validPackages = listOf(
"at.mocode.*",
"at.mocode.*,com.example.*",
"java.lang.*,java.util.*",
"com.company.specific.Package",
"org.springframework.*,at.mocode.*,com.test.*"
)
validPackages.forEach { packages ->
assertDoesNotThrow {
config.trustedPackages = packages
assertThat(config.trustedPackages).isEqualTo(packages)
}
}
}
@Test
fun `should configure security features when enabled`() {
val config = KafkaConfig().apply {
enableSecurityFeatures = true
}
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("secure-group")
// Verify security-related producer configurations
assertThat(producerConfigs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(producerConfigs[ProducerConfig.ACKS_CONFIG]).isEqualTo("all")
assertThat(producerConfigs[ProducerConfig.RETRIES_CONFIG]).isEqualTo(3)
// Verify security-related consumer configurations
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*")
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]).isEqualTo(false)
}
@Test
fun `should configure security features when disabled`() {
val config = KafkaConfig().apply {
enableSecurityFeatures = false // Explicitly disable
}
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("non-secure-group")
// Even when disabled, core security features should still be present
// This ensures baseline security is maintained
assertThat(producerConfigs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(producerConfigs[ProducerConfig.ACKS_CONFIG]).isEqualTo("all")
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*")
}
@Test
fun `should prevent JSON type header usage for security`() {
val config = KafkaConfig()
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("header-test-group")
// Type headers should be disabled to prevent deserialization attacks
assertThat(producerConfigs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should create secure JSON deserializer for consumer`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*,com.test.*"
}
val consumer = KafkaEventConsumer(config)
// Test that consumer can be created with security configuration
assertThat(consumer).isNotNull
// Test that reactive streams can be created (they use secure deserializer internally)
assertDoesNotThrow {
val flux = consumer.receiveEvents<SecureTestEvent>("secure-topic")
assertThat(flux).isNotNull
}
}
@Test
fun `should handle multiple trusted package patterns`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.domain.*,at.mocode.events.*,com.example.secure.*"
}
val consumerConfigs = config.consumerConfigs("multi-pattern-group")
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES])
.isEqualTo("at.mocode.domain.*,at.mocode.events.*,com.example.secure.*")
}
@Test
fun `should enforce manual commit for better security control`() {
val config = KafkaConfig()
val consumerConfigs = config.consumerConfigs("manual-commit-group")
// Auto-commit should be disabled for better control over message processing
assertThat(consumerConfigs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]).isEqualTo(false)
// Session and heartbeat timeouts should be configured for security
assertThat(consumerConfigs[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG]).isEqualTo(30000)
assertThat(consumerConfigs[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG]).isEqualTo(3000)
}
@Test
fun `should configure connection security settings`() {
val config = KafkaConfig()
val consumerConfigs = config.consumerConfigs("connection-security-group")
// Connection security settings
assertThat(consumerConfigs[ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG]).isEqualTo(540000)
assertThat(consumerConfigs[ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG]).isEqualTo(50)
assertThat(consumerConfigs[ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG]).isEqualTo(1000)
}
@Test
fun `should validate connection pool size for security`() {
val config = KafkaConfig()
// Valid connection pool sizes
assertDoesNotThrow { config.connectionPoolSize = 1 }
assertDoesNotThrow { config.connectionPoolSize = 5 }
assertDoesNotThrow { config.connectionPoolSize = 50 }
// Invalid connection pool sizes (security risk - too many connections)
assertThrows<IllegalArgumentException> { config.connectionPoolSize = 0 }
assertThrows<IllegalArgumentException> { config.connectionPoolSize = -1 }
}
@Test
fun `should create producer factory with secure configuration`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*"
enableSecurityFeatures = true
}
val producerFactory = config.producerFactory()
// Verify producer factory is created successfully
assertThat(producerFactory).isNotNull
// Test creating a producer
assertDoesNotThrow {
val producer = producerFactory.createProducer()
assertThat(producer).isNotNull
}
// Verify secure configuration is applied
val configs = producerFactory.configurationProperties
assertThat(configs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(configs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should handle security configuration for different environments`() {
// Development environment
val devConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
trustedPackages = "at.mocode.*,com.test.*"
enableSecurityFeatures = true
}
// Production environment
val prodConfig = KafkaConfig().apply {
bootstrapServers = "prod-kafka:9092"
trustedPackages = "at.mocode.*" // More restrictive
enableSecurityFeatures = true
connectionPoolSize = 20
}
// Both configurations should be valid
assertDoesNotThrow {
KafkaEventConsumer(devConfig)
KafkaEventPublisher(ReactiveKafkaConfig(devConfig).reactiveKafkaProducerTemplate())
}
assertDoesNotThrow {
KafkaEventConsumer(prodConfig)
KafkaEventPublisher(ReactiveKafkaConfig(prodConfig).reactiveKafkaProducerTemplate())
}
}
@Test
fun `should validate group ID format for security`() {
val config = KafkaConfig()
// Valid group ID prefixes
val validPrefixes = listOf(
"secure-consumer",
"production.consumer",
"dev_consumer",
"consumer-123"
)
validPrefixes.forEach { prefix ->
assertDoesNotThrow {
config.defaultGroupIdPrefix = prefix
assertThat(config.defaultGroupIdPrefix).isEqualTo(prefix)
}
}
// Invalid group ID prefixes (potential security issues)
val invalidPrefixes = listOf(
"", // Empty
" ", // Whitespace only
"invalid@consumer", // Special characters
"consumer with spaces",
"consumer/with/slashes",
"consumer#hash"
)
invalidPrefixes.forEach { prefix ->
assertThrows<IllegalArgumentException> {
config.defaultGroupIdPrefix = prefix
}
}
}
@Test
fun `should configure serialization security`() {
val config = KafkaConfig().apply {
trustedPackages = "at.mocode.*,com.secure.*"
}
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs("serialization-security-group")
// Producer serialization security
assertThat(producerConfigs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.apache.kafka.common.serialization.StringSerializer")
assertThat(producerConfigs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.springframework.kafka.support.serializer.JsonSerializer")
assertThat(producerConfigs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
// Consumer deserialization security
assertThat(consumerConfigs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.apache.kafka.common.serialization.StringDeserializer")
assertThat(consumerConfigs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG].toString())
.isEqualTo("class org.springframework.kafka.support.serializer.JsonDeserializer")
assertThat(consumerConfigs[JsonDeserializer.TRUSTED_PACKAGES]).isEqualTo("at.mocode.*,com.secure.*")
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
}
@Test
fun `should provide secure defaults`() {
val config = KafkaConfig() // Use default values
// Verify secure defaults
assertThat(config.trustedPackages).isEqualTo("at.mocode.*")
assertThat(config.enableSecurityFeatures).isEqualTo(true)
assertThat(config.connectionPoolSize).isEqualTo(10)
assertThat(config.defaultGroupIdPrefix).isEqualTo("messaging-client")
// Verify secure configurations are applied with defaults
val producerConfigs = config.producerConfigs()
val consumerConfigs = config.consumerConfigs()
assertThat(producerConfigs[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG]).isEqualTo(true)
assertThat(producerConfigs[JsonSerializer.ADD_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[JsonDeserializer.USE_TYPE_INFO_HEADERS]).isEqualTo(false)
assertThat(consumerConfigs[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG]).isEqualTo(false)
}
data class SecureTestEvent(
val data: String,
val timestamp: Long = System.currentTimeMillis()
)
}
@@ -0,0 +1,376 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.client.ReactiveKafkaConfig
import at.mocode.infrastructure.messaging.config.KafkaConfig
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderResult
import reactor.test.StepVerifier
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.PrintStream
import java.net.ConnectException
import java.util.concurrent.TimeoutException
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class LoggingAndMonitoringTest {
private val logger = LoggerFactory.getLogger(LoggingAndMonitoringTest::class.java)
private lateinit var kafkaConfig: KafkaConfig
private lateinit var consumer: KafkaEventConsumer
private lateinit var originalOut: PrintStream
private lateinit var testOutput: ByteArrayOutputStream
@BeforeEach
fun setUp() {
kafkaConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "logging-test-consumer"
trustedPackages = "at.mocode.*"
}
consumer = KafkaEventConsumer(kafkaConfig)
// Capture console output for log verification
originalOut = System.out
testOutput = ByteArrayOutputStream()
System.setOut(PrintStream(testOutput))
}
@Test
fun `should log structured information for consumer setup`() {
// Create consumer and set up stream - this should generate log entries
assertDoesNotThrow {
val flux = consumer.receiveEvents<LoggingTestEvent>("structured-logging-topic")
assertThat(flux).isNotNull
}
// In a real implementation, we would verify specific log entries
// For now, we verify that the setup completes without errors
val output = testOutput.toString()
// Basic verification that some logging occurred (setup methods would generate logs)
assertThat(output).isNotNull
logger.debug("Consumer setup completed successfully")
}
@Test
fun `should log retry attempts with context information`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
val testEvent = LoggingTestEvent("retry-test", 1)
// Configure mock to fail the first few times, then succeed
every { mockTemplate.send("retry-topic", "retry-key", testEvent) } returns
Mono.error(TimeoutException("Connection timeout")) andThen
Mono.error(ConnectException("Connection refused")) andThen
Mono.just(mockk<SenderResult<Void>>())
StepVerifier.create(publisher.publishEvent("retry-topic", "retry-key", testEvent))
.verifyComplete()
// Verify retry attempts were logged
logger.debug("Retry logging test completed")
assertThat(testOutput.toString()).isNotNull
verify(exactly = 3) { mockTemplate.send("retry-topic", "retry-key", testEvent) }
}
@Test
fun `should track batch operation progress`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
// Create a medium-sized batch to trigger progress logging
val batchSize = 250 // This should trigger progress logging at 100, 200, and final
val testBatch = (1..batchSize).map { i ->
"batch_key_$i" to LoggingTestEvent("Batch message $i", i)
}
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "batch-progress-topic"
every { mockRecordMetadata.partition() } returns 0
every { mockRecordMetadata.offset() } returns 0L
every { mockResult.recordMetadata() } returns mockRecordMetadata
every { mockTemplate.send(any(), any(), any()) } returns Mono.just(mockResult)
StepVerifier.create(publisher.publishEvents("batch-progress-topic", testBatch))
.expectNextCount(batchSize.toLong())
.verifyComplete()
logger.debug("Batch progress tracking test completed with {} events", batchSize)
// Verify that all batch items were processed
verify(exactly = batchSize) { mockTemplate.send(any(), any(), any()) }
}
@Test
fun `should log error context for failed operations`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
val testEvent = LoggingTestEvent("error-context", 1)
// Configure mock to always fail
every { mockTemplate.send("error-topic", "error-key", testEvent) } returns
Mono.error(IOException("Network failure"))
StepVerifier.create(publisher.publishEvent("error-topic", "error-key", testEvent))
.verifyError(IOException::class.java)
logger.debug("Error context logging test completed")
// Should have attempted the operation and logged error context
verify(atLeast = 1) { mockTemplate.send("error-topic", "error-key", testEvent) }
}
@Test
fun `should log performance metrics for operations`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
val testEvents = (1..50).map { i ->
"perf_key_$i" to LoggingTestEvent("Performance test $i", i)
}
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "performance-metrics-topic"
every { mockRecordMetadata.partition() } returns 0
every { mockRecordMetadata.offset() } returns 0L
every { mockResult.recordMetadata() } returns mockRecordMetadata
every { mockTemplate.send(any(), any(), any()) } returns Mono.just(mockResult)
val startTime = System.currentTimeMillis()
StepVerifier.create(publisher.publishEvents("performance-metrics-topic", testEvents))
.expectNextCount(50)
.verifyComplete()
val duration = System.currentTimeMillis() - startTime
logger.debug("Performance metrics: 50 events published in {}ms", duration)
logger.debug("Average time per event: {}ms", duration.toDouble() / 50)
// Performance should be reasonable
assertThat(duration).isLessThan(10000) // Within 10 seconds
}
@Test
fun `should log consumer group and partition information`() {
// Create consumer flux - this should generate group ID and partition logs
val flux = consumer.receiveEvents<LoggingTestEvent>("partition-info-topic")
// The act of creating the flux should generate logging about group assignment
assertThat(flux).isNotNull
logger.debug("Consumer group and partition logging test completed")
logger.debug("Expected group ID pattern: {}-partition-info-topic-loggingtesteevent", kafkaConfig.defaultGroupIdPrefix)
// Verify consumer was created successfully
assertDoesNotThrow {
consumer.cleanup()
}
}
@Test
fun `should log different event types with structured information`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
// Test with different event types
val mockResult = mockk<SenderResult<Void>>()
every { mockTemplate.send(any(), any(), any()) } returns Mono.just(mockResult)
val testEvents = listOf(
LoggingTestEvent("string event", 1),
ComplexLoggingEvent("complex", 123, mapOf("key" to "value")),
NumericLoggingEvent(42, 3.14, System.currentTimeMillis())
)
testEvents.forEachIndexed { index, event ->
StepVerifier.create(publisher.publishEvent("event-types-topic", "key_$index", event))
.verifyComplete()
logger.debug("Published event type: {}", event::class.simpleName)
}
verify(exactly = testEvents.size) { mockTemplate.send(any(), any(), any()) }
}
@Test
fun `should log retry exhaustion with final error details`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
val testEvent = LoggingTestEvent("retry-exhaustion", 1)
// Configure mock to always fail with retryable error
every { mockTemplate.send("exhaustion-topic", "exhaustion-key", testEvent) } returns
Mono.error(TimeoutException("Persistent timeout"))
StepVerifier.create(publisher.publishEvent("exhaustion-topic", "exhaustion-key", testEvent))
.verifyError(TimeoutException::class.java)
logger.debug("Retry exhaustion logging test completed")
// Should have attempted maximum retries (1 initial + 3 retries = 4 total)
verify(exactly = 4) { mockTemplate.send("exhaustion-topic", "exhaustion-key", testEvent) }
}
@Test
fun `should log startup and configuration information`() {
// Test that consumer startup logs configuration details
val customConfig = KafkaConfig().apply {
bootstrapServers = "test-server:9092"
defaultGroupIdPrefix = "config-logging-test"
trustedPackages = "at.mocode.*,com.test.*"
enableSecurityFeatures = true
connectionPoolSize = 15
}
val customConsumer = KafkaEventConsumer(customConfig)
val customReactiveConfig = ReactiveKafkaConfig(customConfig)
assertDoesNotThrow {
val template = customReactiveConfig.reactiveKafkaProducerTemplate()
assertThat(template).isNotNull
}
logger.debug("Configuration logging test completed")
logger.debug("Bootstrap servers: {}", customConfig.bootstrapServers)
logger.debug("Group ID prefix: {}", customConfig.defaultGroupIdPrefix)
logger.debug("Trusted packages: {}", customConfig.trustedPackages)
logger.debug("Security features enabled: {}", customConfig.enableSecurityFeatures)
logger.debug("Connection pool size: {}", customConfig.connectionPoolSize)
customConsumer.cleanup()
}
@Test
fun `should log resource cleanup operations`() {
val tempConsumer = KafkaEventConsumer(kafkaConfig)
// Create some reactive streams to establish resources
val flux1 = tempConsumer.receiveEvents<LoggingTestEvent>("cleanup-topic-1")
val flux2 = tempConsumer.receiveEvents<LoggingTestEvent>("cleanup-topic-2")
assertThat(flux1).isNotNull
assertThat(flux2).isNotNull
logger.debug("Resources created for cleanup test")
// Cleanup should log resource cleanup operations
assertDoesNotThrow {
tempConsumer.cleanup()
}
logger.debug("Resource cleanup test completed")
}
@Test
fun `should handle logging under concurrent access`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
val mockResult = mockk<SenderResult<Void>>()
val mockRecordMetadata = mockk<org.apache.kafka.clients.producer.RecordMetadata>()
every { mockRecordMetadata.topic() } returns "concurrent-logging-topic"
every { mockRecordMetadata.partition() } returns 0
every { mockRecordMetadata.offset() } returns 0L
every { mockResult.recordMetadata() } returns mockRecordMetadata
every { mockTemplate.send(any(), any(), any()) } returns Mono.just(mockResult)
// Create concurrent publishing operations
val concurrentEvents = (1..20).map { i ->
publisher.publishEvent("concurrent-logging-topic", "concurrent_key_$i",
LoggingTestEvent("Concurrent message $i", i))
}
StepVerifier.create(reactor.core.publisher.Flux.merge(concurrentEvents))
.expectNextCount(20)
.verifyComplete()
logger.debug("Concurrent logging test completed with 20 concurrent operations")
verify(exactly = 20) { mockTemplate.send(any(), any(), any()) }
}
@Test
fun `should log timestamp and correlation information`() {
val mockTemplate = mockk<ReactiveKafkaProducerTemplate<String, Any>>()
val publisher = KafkaEventPublisher(mockTemplate)
val mockResult = mockk<SenderResult<Void>>()
every { mockTemplate.send(any(), any(), any()) } returns Mono.just(mockResult)
val timestampedEvent = LoggingTestEvent("timestamped", 1)
val beforePublish = System.currentTimeMillis()
StepVerifier.create(publisher.publishEvent("timestamp-topic", "timestamp-key", timestampedEvent))
.verifyComplete()
val afterPublish = System.currentTimeMillis()
logger.debug("Event published with timestamp correlation")
logger.debug("Publish window: {} to {} ({}ms)", beforePublish, afterPublish, afterPublish - beforePublish)
verify(exactly = 1) { mockTemplate.send("timestamp-topic", "timestamp-key", timestampedEvent) }
}
@Test
fun `should provide debug information for troubleshooting`() {
// Create various configurations and operations to generate debug logs
val debugConfig = KafkaConfig().apply {
bootstrapServers = "debug-server:9092"
defaultGroupIdPrefix = "debug-test"
}
val debugConsumer = KafkaEventConsumer(debugConfig)
val debugFlux = debugConsumer.receiveEvents<LoggingTestEvent>("debug-topic")
logger.debug("Debug configuration created")
logger.debug("Consumer group ID would be: debug-test-debug-topic-loggingtesteevent")
logger.debug("Bootstrap servers: debug-server:9092")
assertThat(debugFlux).isNotNull
debugConsumer.cleanup()
logger.debug("Debug cleanup completed")
}
@AfterEach
fun tearDown() {
// Restore original output
System.setOut(originalOut)
consumer.cleanup()
}
data class LoggingTestEvent(
val message: String,
val sequenceNumber: Int,
val timestamp: Long = System.currentTimeMillis()
)
data class ComplexLoggingEvent(
val name: String,
val id: Int,
val metadata: Map<String, String>
)
data class NumericLoggingEvent(
val intValue: Int,
val doubleValue: Double,
val timestamp: Long
)
}
@@ -0,0 +1,365 @@
package at.mocode.infrastructure.messaging.client
import at.mocode.infrastructure.messaging.config.KafkaConfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import reactor.test.publisher.TestPublisher
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ReactiveStreamTest {
private val logger = LoggerFactory.getLogger(ReactiveStreamTest::class.java)
private lateinit var kafkaConfig: KafkaConfig
private lateinit var consumer: KafkaEventConsumer
@BeforeEach
fun setUp() {
kafkaConfig = KafkaConfig().apply {
bootstrapServers = "localhost:9092"
defaultGroupIdPrefix = "reactive-test-consumer"
trustedPackages = "at.mocode.*"
}
consumer = KafkaEventConsumer(kafkaConfig)
}
@Test
fun `should create cold streams that start on subscription`() {
// Cold streams should not start processing until subscribed
val flux = consumer.receiveEvents<ReactiveTestEvent>("cold-stream-topic")
// Stream should be created but not started
assertThat(flux).isNotNull
// No subscription means no processing should begin.
// This is verified by the fact that creating the flux doesn't throw or block
assertDoesNotThrow {
val anotherFlux = consumer.receiveEvents<ReactiveTestEvent>("another-cold-topic")
assertThat(anotherFlux).isNotNull
}
}
@Test
fun `should handle multiple subscribers to same stream`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("multi-subscriber-topic")
// Multiple subscribers should be able to subscribe to the same flux
val subscriber1 = StepVerifier.create(flux.take(1).timeout(Duration.ofSeconds(2)))
val subscriber2 = StepVerifier.create(flux.take(1).timeout(Duration.ofSeconds(2)))
// Both subscribers should be created without issues
// Note: In real Kafka usage, each subscriber would get their own consumer group
assertDoesNotThrow {
subscriber1.thenCancel().verify(Duration.ofSeconds(1))
subscriber2.thenCancel().verify(Duration.ofSeconds(1))
}
}
@Test
fun `should support reactive operators and transformations`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("transformation-topic")
// Apply various reactive operators
val transformedFlux = flux
.filter { event -> event.message.contains("important") }
.map { event -> event.message.uppercase() }
.distinctUntilChanged()
.take(5)
assertThat(transformedFlux).isNotNull
// Should be able to subscribe to transformed flux
val verifier = StepVerifier.create(transformedFlux.timeout(Duration.ofSeconds(2)))
assertDoesNotThrow {
verifier.thenCancel().verify(Duration.ofSeconds(1))
}
}
@Test
fun `should handle backpressure gracefully`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("backpressure-topic")
// Simulate slow consumer to test backpressure
val slowProcessingFlux = flux
.concatMap { event ->
Mono.delay(Duration.ofMillis(100))
.map { event }
}
.take(3)
val startTime = System.currentTimeMillis()
StepVerifier.create(slowProcessingFlux.timeout(Duration.ofSeconds(5)))
.thenCancel()
.verify(Duration.ofSeconds(2))
val duration = System.currentTimeMillis() - startTime
// Should handle backpressure without blocking indefinitely
assertThat(duration).isLessThan(3000)
}
@Test
fun `should maintain stream characteristics under error conditions`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("error-resilience-topic")
// Add error handling and recovery
val resilientFlux = flux
.onErrorResume { error ->
// Log error and continue with an empty stream
logger.debug("Handled error in stream: {}", error.message)
Flux.empty()
}
.retry(2)
.take(1)
StepVerifier.create(resilientFlux.timeout(Duration.ofSeconds(3)))
.thenCancel()
.verify(Duration.ofSeconds(2))
// Stream should remain reactive even after error handling
assertThat(resilientFlux).isNotNull
}
@Test
fun `should support concurrent stream processing`() {
val flux1 = consumer.receiveEvents<ReactiveTestEvent>("concurrent-topic-1")
val flux2 = consumer.receiveEvents<ReactiveTestEvent>("concurrent-topic-2")
val flux3 = consumer.receiveEvents<ReactiveTestEvent>("concurrent-topic-3")
// Process multiple streams concurrently
val combinedFlux = Flux.merge(
flux1.subscribeOn(Schedulers.parallel()),
flux2.subscribeOn(Schedulers.parallel()),
flux3.subscribeOn(Schedulers.parallel())
).take(3)
StepVerifier.create(combinedFlux.timeout(Duration.ofSeconds(3)))
.thenCancel()
.verify(Duration.ofSeconds(2))
// All streams should be processable concurrently
assertThat(combinedFlux).isNotNull
}
@Test
fun `should handle stream lifecycle correctly`() {
val eventCounter = AtomicInteger(0)
val flux = consumer.receiveEvents<ReactiveTestEvent>("lifecycle-topic")
// Add lifecycle monitoring
val monitoredFlux = flux
.doOnSubscribe { subscription ->
logger.debug("Stream subscribed: {}", subscription)
}
.doOnNext { event ->
val count = eventCounter.incrementAndGet()
logger.debug("Processed event #{}: {}", count, event.message)
}
.doOnCancel {
logger.debug("Stream cancelled")
}
.doOnComplete {
logger.debug("Stream completed")
}
.take(1)
StepVerifier.create(monitoredFlux.timeout(Duration.ofSeconds(2)))
.thenCancel()
.verify(Duration.ofSeconds(1))
// Lifecycle should be properly managed
assertThat(monitoredFlux).isNotNull
}
@Test
fun `should support flow control mechanisms`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("flow-control-topic")
// Apply various flow control mechanisms
val controlledFlux = flux
.limitRate(10) // Limit upstream requests
.sample(Duration.ofMillis(100)) // Sample at fixed intervals
.buffer(5) // Buffer elements
.flatMap { buffer ->
logger.debug("Processing buffer of size: {}", buffer.size)
Flux.fromIterable(buffer)
}
.take(5)
StepVerifier.create(controlledFlux.timeout(Duration.ofSeconds(3)))
.thenCancel()
.verify(Duration.ofSeconds(2))
assertThat(controlledFlux).isNotNull
}
@Test
fun `should handle time-based operations`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("time-based-topic")
// Apply time-based operations
val timedFlux = flux
.window(Duration.ofMillis(200)) // Window by time
.flatMap { window ->
window.collectList()
.map { events ->
logger.debug("Window contains {} events", events.size)
events.size
}
}
.take(2)
StepVerifier.create(timedFlux.timeout(Duration.ofSeconds(3)))
.thenCancel()
.verify(Duration.ofSeconds(2))
assertThat(timedFlux).isNotNull
}
@Test
fun `should maintain thread safety in reactive streams`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("thread-safety-topic")
val processedCount = AtomicLong(0)
val latch = CountDownLatch(3)
// Process on multiple threads
val threadSafeFlux = flux
.publishOn(Schedulers.parallel())
.doOnNext { event ->
val count = processedCount.incrementAndGet()
logger.debug("Thread {} processed event #{}", Thread.currentThread().name, count)
latch.countDown()
}
.take(3)
// Subscribe and wait briefly
val subscription = threadSafeFlux
.timeout(Duration.ofSeconds(2))
.subscribe(
{ event -> /* processed */ },
{ error -> logger.debug("Error: {}", error.message) },
{ logger.debug("Stream completed") }
)
// Wait for brief processing or timeout
val completed = latch.await(1, TimeUnit.SECONDS)
subscription.dispose()
// Thread safety should be maintained (no exceptions thrown)
assertThat(subscription).isNotNull
}
@Test
fun `should support custom schedulers`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("scheduler-topic")
// Use different schedulers for different operations
val scheduledFlux = flux
.subscribeOn(Schedulers.boundedElastic()) // For I/O operations
.publishOn(Schedulers.parallel()) // For CPU-intensive operations
.map { event ->
logger.debug("Processing on thread: {}", Thread.currentThread().name)
event.message.length
}
.subscribeOn(Schedulers.single()) // Single-threaded subscription
.take(1)
StepVerifier.create(scheduledFlux.timeout(Duration.ofSeconds(2)))
.thenCancel()
.verify(Duration.ofSeconds(1))
assertThat(scheduledFlux).isNotNull
}
@Test
fun `should handle stream composition and chaining`() {
val flux1 = consumer.receiveEvents<ReactiveTestEvent>("composition-topic-1")
val flux2 = consumer.receiveEvents<ReactiveTestEvent>("composition-topic-2")
// Compose multiple streams
val composedFlux = flux1
.switchMap { event1 ->
flux2.map { event2 ->
logger.debug("Composed: {} -> {}", event1.message, event2.message)
"${event1.message}+${event2.message}"
}
}
.take(1)
StepVerifier.create(composedFlux.timeout(Duration.ofSeconds(2)))
.thenCancel()
.verify(Duration.ofSeconds(1))
assertThat(composedFlux).isNotNull
}
@Test
fun `should support reactive testing patterns`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("testing-patterns-topic")
// Use TestPublisher to simulate controlled event emission
val testPublisher = TestPublisher.create<ReactiveTestEvent>()
val testFlux = testPublisher.flux()
// Apply similar transformations as the real flux
val transformedTestFlux = testFlux
.filter { event -> event.message.isNotEmpty() }
.map { event -> event.message.length }
// Test with controlled emissions
StepVerifier.create(transformedTestFlux)
.then { testPublisher.next(ReactiveTestEvent("test", 1)) }
.expectNext(4) // "test".length
.then { testPublisher.complete() }
.verifyComplete()
// Real flux should also be testable
assertThat(flux).isNotNull
}
@Test
fun `should handle resource cleanup properly`() {
val flux = consumer.receiveEvents<ReactiveTestEvent>("cleanup-topic")
val resourcesAcquired = AtomicInteger(0)
val resourcesReleased = AtomicInteger(0)
val resourceManagedFlux = flux
.doOnSubscribe {
resourcesAcquired.incrementAndGet()
logger.debug("Resources acquired: {}", resourcesAcquired.get())
}
.doFinally { signalType ->
resourcesReleased.incrementAndGet()
logger.debug("Resources released on {}: {}", signalType, resourcesReleased.get())
}
.take(1)
StepVerifier.create(resourceManagedFlux.timeout(Duration.ofSeconds(2)))
.thenCancel()
.verify(Duration.ofSeconds(1))
// Resource management should be handled properly
// Note: In a real scenario, we'd verify that resources are properly cleaned up
assertThat(resourceManagedFlux).isNotNull
}
data class ReactiveTestEvent(
val message: String,
val sequenceNumber: Int,
val timestamp: Long = System.currentTimeMillis()
)
}
@@ -13,6 +13,8 @@ import org.springframework.kafka.support.serializer.JsonSerializer
*
* This class can be instantiated programmatically (as done in tests) or
* registered as a Spring @Configuration with @Bean methods in an application context.
*
* Enhanced with configuration validation and additional optimization settings.
*/
class KafkaConfig {
@@ -20,17 +22,52 @@ class KafkaConfig {
* Comma-separated list of host:port pairs used for establishing the initial connection to the Kafka cluster.
*/
var bootstrapServers: String = "localhost:9092"
set(value) {
require(value.isNotBlank()) { "Bootstrap servers cannot be blank" }
// Support both simple format (host:port) and protocol-prefixed format (PLAINTEXT://host:port)
val isValidFormat = value.matches(Regex("^[a-zA-Z0-9._-]+:[0-9]+(,[a-zA-Z0-9._-]+:[0-9]+)*$")) ||
value.matches(Regex("^[A-Z]+://[a-zA-Z0-9._-]+:[0-9]+(,[A-Z]+://[a-zA-Z0-9._-]+:[0-9]+)*$"))
require(isValidFormat) {
"Bootstrap servers must be in format 'host:port' or 'PROTOCOL://host:port'"
}
field = value
}
/**
* Default consumer group ID prefix.
*/
var defaultGroupIdPrefix: String = "messaging-client"
set(value) {
require(value.isNotBlank()) { "Default group ID prefix cannot be blank" }
require(value.matches(Regex("^[a-zA-Z0-9._-]+$"))) {
"Group ID prefix must contain only alphanumeric characters, dots, underscores, and hyphens"
}
field = value
}
/**
* Comma-separated list of trusted packages for JSON deserialization security.
* Default restricts to application packages only.
*/
var trustedPackages: String = "at.mocode.*"
set(value) {
require(value.isNotBlank()) { "Trusted packages cannot be blank" }
field = value
}
/**
* Enable additional security features for production environments.
*/
var enableSecurityFeatures: Boolean = true
/**
* Connection pool size for better resource management.
*/
var connectionPoolSize: Int = 10
set(value) {
require(value > 0) { "Connection pool size must be positive" }
field = value
}
/**
* Optimized producer properties with performance tuning and reliability settings.
@@ -0,0 +1,147 @@
package at.mocode.infrastructure.messaging.config
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConfigTest {
@Test
fun `should validate bootstrap servers format`() {
val config = KafkaConfig()
// Valid formats
assertDoesNotThrow { config.bootstrapServers = "localhost:9092" }
assertDoesNotThrow { config.bootstrapServers = "PLAINTEXT://localhost:9092" }
assertDoesNotThrow { config.bootstrapServers = "host1:9092,host2:9092" }
assertDoesNotThrow { config.bootstrapServers = "PLAINTEXT://host1:9092,PLAINTEXT://host2:9092" }
assertDoesNotThrow { config.bootstrapServers = "kafka.example.com:9092" }
assertDoesNotThrow { config.bootstrapServers = "kafka-cluster-01.internal:9092" }
// Invalid formats
assertThrows<IllegalArgumentException> { config.bootstrapServers = "" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = " " }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "invalid-format" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "localhost" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = ":9092" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "localhost:" }
assertThrows<IllegalArgumentException> { config.bootstrapServers = "localhost:abc" }
}
@Test
fun `should validate group ID prefix`() {
val config = KafkaConfig()
// Valid prefixes
assertDoesNotThrow { config.defaultGroupIdPrefix = "valid-prefix_123" }
assertDoesNotThrow { config.defaultGroupIdPrefix = "messaging-client" }
assertDoesNotThrow { config.defaultGroupIdPrefix = "test.group.id" }
assertDoesNotThrow { config.defaultGroupIdPrefix = "simple123" }
// Invalid prefixes
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = " " }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid@prefix" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid#prefix" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid prefix" }
assertThrows<IllegalArgumentException> { config.defaultGroupIdPrefix = "invalid/prefix" }
}
@Test
fun `should validate trusted packages`() {
val config = KafkaConfig()
// Valid trusted packages
assertDoesNotThrow { config.trustedPackages = "at.mocode.*,com.example.*" }
assertDoesNotThrow { config.trustedPackages = "at.mocode.*" }
assertDoesNotThrow { config.trustedPackages = "com.example.specific.Package" }
assertDoesNotThrow { config.trustedPackages = "java.lang.*,java.util.*" }
// Invalid trusted packages
assertThrows<IllegalArgumentException> { config.trustedPackages = "" }
assertThrows<IllegalArgumentException> { config.trustedPackages = " " }
}
@Test
fun `should validate connection pool size`() {
val config = KafkaConfig()
// Valid pool sizes
assertDoesNotThrow { config.connectionPoolSize = 1 }
assertDoesNotThrow { config.connectionPoolSize = 5 }
assertDoesNotThrow { config.connectionPoolSize = 10 }
assertDoesNotThrow { config.connectionPoolSize = 100 }
// Invalid pool sizes
assertThrows<IllegalArgumentException> { config.connectionPoolSize = 0 }
assertThrows<IllegalArgumentException> { config.connectionPoolSize = -1 }
assertThrows<IllegalArgumentException> { config.connectionPoolSize = -10 }
}
@Test
fun `should have default values set correctly`() {
val config = KafkaConfig()
assertThat(config.bootstrapServers).isEqualTo("localhost:9092")
assertThat(config.defaultGroupIdPrefix).isEqualTo("messaging-client")
assertThat(config.trustedPackages).isEqualTo("at.mocode.*")
assertThat(config.enableSecurityFeatures).isEqualTo(true)
assertThat(config.connectionPoolSize).isEqualTo(10)
}
@Test
fun `should generate valid producer configs`() {
val config = KafkaConfig()
val producerConfigs = config.producerConfigs()
// Verify essential producer configuration
assertThat(producerConfigs["bootstrap.servers"]).isEqualTo("localhost:9092")
assertThat(producerConfigs["key.serializer"]).isEqualTo(org.apache.kafka.common.serialization.StringSerializer::class.java)
assertThat(producerConfigs["value.serializer"]).isEqualTo(org.springframework.kafka.support.serializer.JsonSerializer::class.java)
assertThat(producerConfigs["acks"]).isEqualTo("all")
assertThat(producerConfigs["enable.idempotence"]).isEqualTo(true)
}
@Test
fun `should generate valid consumer configs with custom group ID`() {
val config = KafkaConfig()
val customGroupId = "test-group-123"
val consumerConfigs = config.consumerConfigs(customGroupId)
// Verify essential consumer configuration
assertThat(consumerConfigs["bootstrap.servers"]).isEqualTo("localhost:9092")
assertThat(consumerConfigs["group.id"]).isEqualTo(customGroupId)
assertThat(consumerConfigs["key.deserializer"]).isEqualTo(org.apache.kafka.common.serialization.StringDeserializer::class.java)
assertThat(consumerConfigs["value.deserializer"]).isEqualTo(org.springframework.kafka.support.serializer.JsonDeserializer::class.java)
assertThat(consumerConfigs["spring.json.trusted.packages"]).isEqualTo("at.mocode.*")
assertThat(consumerConfigs["auto.offset.reset"]).isEqualTo("earliest")
assertThat(consumerConfigs["enable.auto.commit"]).isEqualTo(false)
}
@Test
fun `should generate unique consumer configs when no group ID provided`() {
val config = KafkaConfig()
val consumerConfigs1 = config.consumerConfigs()
val consumerConfigs2 = config.consumerConfigs()
// Group IDs should be different (timestamp-based)
val groupId1 = consumerConfigs1["group.id"].toString()
val groupId2 = consumerConfigs2["group.id"].toString()
assertThat(groupId1).isNotEqualTo(groupId2)
assertThat(groupId1).startsWith("messaging-client-")
assertThat(groupId2).startsWith("messaging-client-")
}
@Test
fun `should create producer factory with correct configuration`() {
val config = KafkaConfig()
val producerFactory = config.producerFactory()
assertDoesNotThrow { producerFactory.createProducer() }
assertThat(producerFactory.configurationProperties["bootstrap.servers"]).isEqualTo("localhost:9092")
}
}