refactor(infra-monitoring)
refactor(infra-gateway)
This commit is contained in:
+12
-8
@@ -1,18 +1,22 @@
|
||||
package at.mocode.infrastructure.messaging.client
|
||||
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.kafka.core.ProducerFactory
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
|
||||
import reactor.kafka.sender.SenderOptions
|
||||
|
||||
@Configuration
|
||||
/**
|
||||
* Reactive Kafka configuration utilities for creating a ReactiveKafkaProducerTemplate.
|
||||
*/
|
||||
class ReactiveKafkaConfig {
|
||||
|
||||
@Bean
|
||||
fun reactiveKafkaProducerTemplate(producerFactory: ProducerFactory<String, Any>): ReactiveKafkaProducerTemplate<String, Any> {
|
||||
// Nutzt die ProducerFactory aus dem messaging-config-Modul
|
||||
val senderOptions = SenderOptions.create<String, Any>(producerFactory.configurationProperties)
|
||||
/**
|
||||
* Create a ReactiveKafkaProducerTemplate using the configuration from the given ProducerFactory.
|
||||
*/
|
||||
fun reactiveKafkaProducerTemplate(
|
||||
producerFactory: DefaultKafkaProducerFactory<String, Any>
|
||||
): ReactiveKafkaProducerTemplate<String, Any> {
|
||||
val props: Map<String, Any> = producerFactory.configurationProperties
|
||||
val senderOptions: SenderOptions<String, Any> = SenderOptions.create(props)
|
||||
return ReactiveKafkaProducerTemplate(senderOptions)
|
||||
}
|
||||
}
|
||||
|
||||
+12
-3
@@ -36,7 +36,7 @@ class KafkaIntegrationTest {
|
||||
val kafkaConfig = KafkaConfig().apply {
|
||||
bootstrapServers = kafkaContainer.bootstrapServers
|
||||
}
|
||||
producerFactory = kafkaConfig.producerFactory() as DefaultKafkaProducerFactory<String, Any>
|
||||
producerFactory = kafkaConfig.producerFactory()
|
||||
|
||||
val reactiveKafkaConfig = ReactiveKafkaConfig()
|
||||
val reactiveTemplate = reactiveKafkaConfig.reactiveKafkaProducerTemplate(producerFactory)
|
||||
@@ -60,9 +60,18 @@ class KafkaIntegrationTest {
|
||||
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
|
||||
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
|
||||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
|
||||
JsonDeserializer.TRUSTED_PACKAGES to "*"
|
||||
JsonDeserializer.TRUSTED_PACKAGES to "*",
|
||||
JsonDeserializer.USE_TYPE_INFO_HEADERS to false,
|
||||
JsonDeserializer.VALUE_DEFAULT_TYPE to TestEvent::class.java.name
|
||||
)
|
||||
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps).subscription(listOf(testTopic))
|
||||
|
||||
val jsonValueDeserializer = JsonDeserializer(TestEvent::class.java).apply {
|
||||
addTrustedPackages("*")
|
||||
}
|
||||
val receiverOptions = ReceiverOptions.create<String, TestEvent>(consumerProps)
|
||||
.withKeyDeserializer(StringDeserializer())
|
||||
.withValueDeserializer(jsonValueDeserializer)
|
||||
.subscription(listOf(testTopic))
|
||||
|
||||
// Der Mono, der das nächste empfangene Ereignis darstellt
|
||||
val receivedEvent = KafkaReceiver.create(receiverOptions)
|
||||
|
||||
+24
-43
@@ -1,57 +1,38 @@
|
||||
package at.mocode.infrastructure.messaging.config
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||
import org.apache.kafka.clients.producer.ProducerConfig
|
||||
import org.apache.kafka.common.serialization.StringDeserializer
|
||||
import org.apache.kafka.common.serialization.StringSerializer
|
||||
import org.springframework.beans.factory.annotation.Value
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.core.ProducerFactory
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer
|
||||
|
||||
@Configuration
|
||||
/**
|
||||
* Central Kafka producer configuration used across modules.
|
||||
*
|
||||
* This class can be instantiated programmatically (as done in tests) or
|
||||
* registered as a Spring @Configuration with @Bean methods in an application context.
|
||||
*/
|
||||
class KafkaConfig {
|
||||
|
||||
// KORREKTUR: Von lateinit zu einer public var mit Standardwert, um Tests zu ermöglichen
|
||||
@Value($$"${spring.kafka.bootstrap-servers:localhost:9092}")
|
||||
/**
|
||||
* Comma-separated list of host:port pairs used for establishing the initial connection to the Kafka cluster.
|
||||
*/
|
||||
var bootstrapServers: String = "localhost:9092"
|
||||
|
||||
@Value("\${spring.kafka.consumer.group-id:meldestelle-group}")
|
||||
private lateinit var consumerGroupId: String
|
||||
/**
|
||||
* Common producer properties with sensible defaults (String keys, JSON values).
|
||||
*/
|
||||
fun producerConfigs(): Map<String, Any> = mapOf(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
|
||||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
|
||||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
|
||||
// Avoid adding type info headers; keeps payloads simple and interoperable.
|
||||
JsonSerializer.ADD_TYPE_INFO_HEADERS to false
|
||||
)
|
||||
|
||||
@Bean
|
||||
fun producerFactory(): ProducerFactory<String, Any> {
|
||||
val configProps = mapOf(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
|
||||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
|
||||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
|
||||
ProducerConfig.ACKS_CONFIG to "all",
|
||||
ProducerConfig.RETRIES_CONFIG to 3,
|
||||
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
|
||||
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1
|
||||
)
|
||||
return DefaultKafkaProducerFactory(configProps)
|
||||
}
|
||||
|
||||
@Bean
|
||||
fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> {
|
||||
return KafkaTemplate(producerFactory)
|
||||
}
|
||||
|
||||
// NEU: Stellt eine zentrale Map mit den Basis-Konfigurationen für alle Consumer bereit.
|
||||
@Bean
|
||||
fun kafkaConsumerConfiguration(): Map<String, Any> {
|
||||
return mapOf(
|
||||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
|
||||
ConsumerConfig.GROUP_ID_CONFIG to consumerGroupId,
|
||||
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
|
||||
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
|
||||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", // Beginne davon am Anfang, wenn kein Offset existiert
|
||||
JsonDeserializer.TRUSTED_PACKAGES to "*" // Erlaube Deserialisierung aller unserer Klassen
|
||||
)
|
||||
}
|
||||
/**
|
||||
* Strongly typed producer factory to avoid unchecked casts in consumers/tests.
|
||||
*/
|
||||
fun producerFactory(): DefaultKafkaProducerFactory<String, Any> =
|
||||
DefaultKafkaProducerFactory(producerConfigs())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user