refactor: replace Redis references with Valkey in tests and cache modules
Updated test cases in `ValkeyEventStoreTest` and cache implementation in `ValkeyDistributedCache` to fully transition from Redis to Valkey. Adjusted configurations, templates, connection handling, and exception management to reflect Valkey-specific behavior and APIs.
This commit is contained in:
@@ -30,6 +30,11 @@ dependencies {
|
||||
// OPTIMIERUNG: Wiederverwendung des `valkey-cache`-Bundles, da es die
|
||||
// gleichen Technologien (Spring Data Valkey, Lettuce, Jackson) verwendet
|
||||
implementation(libs.bundles.valkey.cache)
|
||||
// Benötigt für Lettuce-basierten Valkey-Client (LettuceConnectionFactory)
|
||||
implementation(libs.lettuce.core)
|
||||
// Für Boot-Autoconfiguration-Annotations (z. B. @ConditionalOnMissingBean,
|
||||
// @ConfigurationProperties, @EnableConfigurationProperties)
|
||||
implementation("org.springframework.boot:spring-boot-autoconfigure")
|
||||
// Stellt Jakarta Annotations bereit (z. B. @PostConstruct), die von Spring verwendet werden
|
||||
implementation(libs.jakarta.annotation.api)
|
||||
// Für Kotlin-spezifische Coroutines-Integration mit Spring
|
||||
|
||||
+240
-236
@@ -2,12 +2,16 @@ package at.mocode.infrastructure.eventstore.valkey
|
||||
|
||||
import at.mocode.core.domain.event.DomainEvent
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import io.valkey.springframework.data.valkey.connection.stream.Consumer
|
||||
import io.valkey.springframework.data.valkey.connection.stream.MapRecord
|
||||
import io.valkey.springframework.data.valkey.connection.stream.ReadOffset
|
||||
import io.valkey.springframework.data.valkey.connection.stream.StreamOffset
|
||||
import io.valkey.springframework.data.valkey.connection.stream.StreamReadOptions
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import jakarta.annotation.PostConstruct
|
||||
import jakarta.annotation.PreDestroy
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.data.domain.Range
|
||||
import org.springframework.data.redis.connection.stream.*
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.springframework.scheduling.annotation.Scheduled
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
@@ -16,272 +20,272 @@ import java.util.concurrent.CopyOnWriteArrayList
|
||||
* Consumer for Valkey Streams that processes events using consumer groups.
|
||||
*/
|
||||
class ValkeyEventConsumer(
|
||||
private val valkeyTemplate: StringRedisTemplate,
|
||||
private val valkeyTemplate: StringValkeyTemplate,
|
||||
private val serializer: EventSerializer,
|
||||
private val properties: ValkeyEventStoreProperties
|
||||
) {
|
||||
private val logger = LoggerFactory.getLogger(ValkeyEventConsumer::class.java)
|
||||
private val eventTypeHandlers = ConcurrentHashMap<String, CopyOnWriteArrayList<(DomainEvent) -> Unit>>()
|
||||
private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>()
|
||||
private var running = false
|
||||
private val logger = LoggerFactory.getLogger(ValkeyEventConsumer::class.java)
|
||||
private val eventTypeHandlers = ConcurrentHashMap<String, CopyOnWriteArrayList<(DomainEvent) -> Unit>>()
|
||||
private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>()
|
||||
private var running = false
|
||||
|
||||
/**
|
||||
* Initializes the consumer.
|
||||
*/
|
||||
@PostConstruct
|
||||
fun init() {
|
||||
if (properties.createConsumerGroupIfNotExists) {
|
||||
createConsumerGroupsIfNotExist()
|
||||
/**
|
||||
* Initializes the consumer.
|
||||
*/
|
||||
@PostConstruct
|
||||
fun init() {
|
||||
if (properties.createConsumerGroupIfNotExists) {
|
||||
createConsumerGroupsIfNotExist()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the consumer.
|
||||
*/
|
||||
@PreDestroy
|
||||
fun shutdown() {
|
||||
running = false
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a handler for a specific event type.
|
||||
*
|
||||
* @param eventType The type of event to handle
|
||||
* @param handler The handler to call when an event of the specified type is received
|
||||
*/
|
||||
fun registerEventHandler(eventType: String, handler: (DomainEvent) -> Unit) {
|
||||
eventTypeHandlers.computeIfAbsent(eventType) { CopyOnWriteArrayList() }.add(handler)
|
||||
logger.debug("Registered handler for event type: $eventType")
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a handler for all events.
|
||||
*
|
||||
* @param handler The handler to call when any event is received
|
||||
*/
|
||||
fun registerAllEventsHandler(handler: (DomainEvent) -> Unit) {
|
||||
allEventHandlers.add(handler)
|
||||
logger.debug("Registered handler for all events")
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters a handler for a specific event type.
|
||||
*
|
||||
* @param eventType The type of event
|
||||
* @param handler The handler to unregister
|
||||
*/
|
||||
fun unregisterEventHandler(eventType: String, handler: (DomainEvent) -> Unit) {
|
||||
eventTypeHandlers[eventType]?.remove(handler)
|
||||
logger.debug("Unregistered handler for event type: $eventType")
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters a handler for all events.
|
||||
*
|
||||
* @param handler The handler to unregister
|
||||
*/
|
||||
fun unregisterAllEventsHandler(handler: (DomainEvent) -> Unit) {
|
||||
allEventHandlers.remove(handler)
|
||||
logger.debug("Unregistered handler for all events")
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates consumer groups for all streams if they don't exist.
|
||||
*/
|
||||
private fun createConsumerGroupsIfNotExist() {
|
||||
try {
|
||||
val allEventsStreamKey = getAllEventsStreamKey()
|
||||
try {
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.add(allEventsStreamKey, mapOf("init" to "init"))
|
||||
logger.debug("Ensured all-events stream has messages: $allEventsStreamKey")
|
||||
} catch (e: Exception) {
|
||||
logger.debug("All-events stream might already have messages: ${e.message}")
|
||||
}
|
||||
|
||||
createConsumerGroupIfNotExists(allEventsStreamKey)
|
||||
|
||||
val streamKeys = valkeyTemplate.keys("${properties.streamPrefix}*")
|
||||
|
||||
for (streamKey in streamKeys) {
|
||||
if (streamKey != allEventsStreamKey) {
|
||||
createConsumerGroupIfNotExists(streamKey)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error creating consumer groups: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a consumer group for a stream if it doesn't exist.
|
||||
*
|
||||
* @param streamKey The key of the stream
|
||||
*/
|
||||
private fun createConsumerGroupIfNotExists(streamKey: String) {
|
||||
try {
|
||||
try {
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.add(streamKey, mapOf("init" to "init"))
|
||||
logger.debug("Ensured stream has messages: $streamKey")
|
||||
} catch (e: Exception) {
|
||||
logger.debug("Stream $streamKey might already have messages: ${e.message}")
|
||||
}
|
||||
|
||||
try {
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup)
|
||||
logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey")
|
||||
} catch (e: Exception) {
|
||||
logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}")
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error creating consumer group for stream $streamKey: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodic polls for new events from all streams.
|
||||
*/
|
||||
@Scheduled(fixedDelayString = "\${valkey.event-store.poll-interval:100}")
|
||||
fun pollEvents() {
|
||||
if (!running) {
|
||||
running = true
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the consumer.
|
||||
*/
|
||||
@PreDestroy
|
||||
fun shutdown() {
|
||||
running = false
|
||||
try {
|
||||
pollStream(getAllEventsStreamKey())
|
||||
claimPendingMessages()
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error polling events: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a handler for a specific event type.
|
||||
*
|
||||
* @param eventType The type of event to handle
|
||||
* @param handler The handler to call when an event of the specified type is received
|
||||
*/
|
||||
fun registerEventHandler(eventType: String, handler: (DomainEvent) -> Unit) {
|
||||
eventTypeHandlers.computeIfAbsent(eventType) { CopyOnWriteArrayList() }.add(handler)
|
||||
logger.debug("Registered handler for event type: $eventType")
|
||||
}
|
||||
/**
|
||||
* Polls a stream for new events.
|
||||
*
|
||||
* @param streamKey The key of the stream to poll
|
||||
*/
|
||||
private fun pollStream(streamKey: String) {
|
||||
try {
|
||||
val options = StreamReadOptions.empty()
|
||||
.count(properties.maxBatchSize.toLong())
|
||||
.block(properties.pollTimeout)
|
||||
|
||||
/**
|
||||
* Registers a handler for all events.
|
||||
*
|
||||
* @param handler The handler to call when any event is received
|
||||
*/
|
||||
fun registerAllEventsHandler(handler: (DomainEvent) -> Unit) {
|
||||
allEventHandlers.add(handler)
|
||||
logger.debug("Registered handler for all events")
|
||||
}
|
||||
val records = valkeyTemplate.opsForStream<String, String>()
|
||||
.read(
|
||||
Consumer.from(properties.consumerGroup, properties.consumerName),
|
||||
options,
|
||||
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
|
||||
)
|
||||
|
||||
/**
|
||||
* Unregisters a handler for a specific event type.
|
||||
*
|
||||
* @param eventType The type of event
|
||||
* @param handler The handler to unregister
|
||||
*/
|
||||
fun unregisterEventHandler(eventType: String, handler: (DomainEvent) -> Unit) {
|
||||
eventTypeHandlers[eventType]?.remove(handler)
|
||||
logger.debug("Unregistered handler for event type: $eventType")
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregisters a handler for all events.
|
||||
*
|
||||
* @param handler The handler to unregister
|
||||
*/
|
||||
fun unregisterAllEventsHandler(handler: (DomainEvent) -> Unit) {
|
||||
allEventHandlers.remove(handler)
|
||||
logger.debug("Unregistered handler for all events")
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates consumer groups for all streams if they don't exist.
|
||||
*/
|
||||
private fun createConsumerGroupsIfNotExist() {
|
||||
try {
|
||||
val allEventsStreamKey = getAllEventsStreamKey()
|
||||
try {
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.add(allEventsStreamKey, mapOf("init" to "init"))
|
||||
logger.debug("Ensured all-events stream has messages: $allEventsStreamKey")
|
||||
} catch (e: Exception) {
|
||||
logger.debug("All-events stream might already have messages: ${e.message}")
|
||||
}
|
||||
|
||||
createConsumerGroupIfNotExists(allEventsStreamKey)
|
||||
|
||||
val streamKeys = valkeyTemplate.keys("${properties.streamPrefix}*")
|
||||
|
||||
for (streamKey in streamKeys) {
|
||||
if (streamKey != allEventsStreamKey) {
|
||||
createConsumerGroupIfNotExists(streamKey)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error creating consumer groups: ${e.message}", e)
|
||||
if (records != null) {
|
||||
for (record in records) {
|
||||
processRecord(record)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
val message = e.message
|
||||
if (message == null || !message.contains("NOGROUP")) {
|
||||
logger.error("Error polling stream $streamKey: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a consumer group for a stream if it doesn't exist.
|
||||
*
|
||||
* @param streamKey The key of the stream
|
||||
*/
|
||||
private fun createConsumerGroupIfNotExists(streamKey: String) {
|
||||
try {
|
||||
try {
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.add(streamKey, mapOf("init" to "init"))
|
||||
logger.debug("Ensured stream has messages: $streamKey")
|
||||
} catch (e: Exception) {
|
||||
logger.debug("Stream $streamKey might already have messages: ${e.message}")
|
||||
}
|
||||
/**
|
||||
* Claims pending messages that have been idle for too long.
|
||||
*/
|
||||
private fun claimPendingMessages() {
|
||||
try {
|
||||
val streamKey = getAllEventsStreamKey()
|
||||
|
||||
try {
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup)
|
||||
logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey")
|
||||
} catch (e: Exception) {
|
||||
logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}")
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error creating consumer group for stream $streamKey: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
val pendingSummary = valkeyTemplate.opsForStream<String, String>()
|
||||
.pending(streamKey, properties.consumerGroup)
|
||||
|
||||
/**
|
||||
* Periodic polls for new events from all streams.
|
||||
*/
|
||||
@Scheduled(fixedDelayString = $$"${valkey.event-store.poll-interval:100}")
|
||||
fun pollEvents() {
|
||||
if (!running) {
|
||||
running = true
|
||||
}
|
||||
if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) {
|
||||
val pendingMessages = valkeyTemplate.opsForStream<String, String>()
|
||||
.pending(
|
||||
streamKey,
|
||||
Consumer.from(properties.consumerGroup, properties.consumerName),
|
||||
Range.unbounded<String>(),
|
||||
properties.maxBatchSize.toLong()
|
||||
)
|
||||
|
||||
try {
|
||||
pollStream(getAllEventsStreamKey())
|
||||
claimPendingMessages()
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error polling events: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
if (pendingMessages.size() > 0) {
|
||||
val messageIdsList = pendingMessages.map { it.id }.toList()
|
||||
|
||||
/**
|
||||
* Polls a stream for new events.
|
||||
*
|
||||
* @param streamKey The key of the stream to poll
|
||||
*/
|
||||
private fun pollStream(streamKey: String) {
|
||||
try {
|
||||
val options = StreamReadOptions.empty()
|
||||
.count(properties.maxBatchSize.toLong())
|
||||
.block(properties.pollTimeout)
|
||||
if (messageIdsList.isNotEmpty()) {
|
||||
val messageIds = messageIdsList.toTypedArray()
|
||||
|
||||
val records = valkeyTemplate.opsForStream<String, String>()
|
||||
.read(
|
||||
Consumer.from(properties.consumerGroup, properties.consumerName),
|
||||
options,
|
||||
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
|
||||
)
|
||||
.claim(
|
||||
streamKey,
|
||||
properties.consumerGroup,
|
||||
properties.consumerName,
|
||||
properties.claimIdleTimeout,
|
||||
*messageIds
|
||||
)
|
||||
|
||||
if (records != null) {
|
||||
for (record in records) {
|
||||
processRecord(record)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
val message = e.message
|
||||
if (message == null || !message.contains("NOGROUP")) {
|
||||
logger.error("Error polling stream $streamKey: ${e.message}", e)
|
||||
for (record in records) {
|
||||
processRecord(record)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error claiming pending messages: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Claims pending messages that have been idle for too long.
|
||||
*/
|
||||
private fun claimPendingMessages() {
|
||||
/**
|
||||
* Processes a record from a stream.
|
||||
*
|
||||
* @param record The record to process
|
||||
*/
|
||||
private fun processRecord(record: MapRecord<String, String, String>) {
|
||||
try {
|
||||
val data = record.value
|
||||
|
||||
if (data.size == 1 && data.containsKey("init") && data["init"] == "init") {
|
||||
logger.debug("Skipping init message")
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.acknowledge(properties.consumerGroup, record)
|
||||
return
|
||||
}
|
||||
|
||||
val event = serializer.deserialize(data)
|
||||
val eventType = serializer.getEventType(data)
|
||||
|
||||
eventTypeHandlers[eventType]?.forEach { handler ->
|
||||
try {
|
||||
val streamKey = getAllEventsStreamKey()
|
||||
|
||||
val pendingSummary = valkeyTemplate.opsForStream<String, String>()
|
||||
.pending(streamKey, properties.consumerGroup)
|
||||
|
||||
if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) {
|
||||
val pendingMessages = valkeyTemplate.opsForStream<String, String>()
|
||||
.pending(
|
||||
streamKey,
|
||||
Consumer.from(properties.consumerGroup, properties.consumerName),
|
||||
Range.unbounded<String>(),
|
||||
properties.maxBatchSize.toLong()
|
||||
)
|
||||
|
||||
if (pendingMessages.size() > 0) {
|
||||
val messageIdsList = pendingMessages.map { it.id }.toList()
|
||||
|
||||
if (messageIdsList.isNotEmpty()) {
|
||||
val messageIds = messageIdsList.toTypedArray()
|
||||
|
||||
val records = valkeyTemplate.opsForStream<String, String>()
|
||||
.claim(
|
||||
streamKey,
|
||||
properties.consumerGroup,
|
||||
properties.consumerName,
|
||||
properties.claimIdleTimeout,
|
||||
*messageIds
|
||||
)
|
||||
|
||||
for (record in records) {
|
||||
processRecord(record)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
handler(event)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error claiming pending messages: ${e.message}", e)
|
||||
logger.error("Error handling event of type $eventType: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a record from a stream.
|
||||
*
|
||||
* @param record The record to process
|
||||
*/
|
||||
private fun processRecord(record: MapRecord<String, String, String>) {
|
||||
allEventHandlers.forEach { handler ->
|
||||
try {
|
||||
val data = record.value
|
||||
|
||||
if (data.size == 1 && data.containsKey("init") && data["init"] == "init") {
|
||||
logger.debug("Skipping init message")
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.acknowledge(properties.consumerGroup, record)
|
||||
return
|
||||
}
|
||||
|
||||
val event = serializer.deserialize(data)
|
||||
val eventType = serializer.getEventType(data)
|
||||
|
||||
eventTypeHandlers[eventType]?.forEach { handler ->
|
||||
try {
|
||||
handler(event)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error handling event of type $eventType: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
allEventHandlers.forEach { handler ->
|
||||
try {
|
||||
handler(event)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error handling event: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.acknowledge(properties.consumerGroup, record)
|
||||
|
||||
handler(event)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error processing record: ${e.message}", e)
|
||||
logger.error("Error handling event: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Valkey key for the all-events stream.
|
||||
*
|
||||
* @return The Valkey key for the all-events stream
|
||||
*/
|
||||
private fun getAllEventsStreamKey(): String {
|
||||
return "${properties.streamPrefix}${properties.allEventsStream}"
|
||||
valkeyTemplate.opsForStream<String, String>()
|
||||
.acknowledge(properties.consumerGroup, record)
|
||||
|
||||
} catch (e: Exception) {
|
||||
logger.error("Error processing record: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Valkey key for the all-events stream.
|
||||
*
|
||||
* @return The Valkey key for the all-events stream
|
||||
*/
|
||||
private fun getAllEventsStreamKey(): String {
|
||||
return "${properties.streamPrefix}${properties.allEventsStream}"
|
||||
}
|
||||
}
|
||||
|
||||
+8
-7
@@ -8,16 +8,17 @@ import at.mocode.infrastructure.eventstore.api.ConcurrencyException
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import at.mocode.infrastructure.eventstore.api.Subscription
|
||||
import io.valkey.springframework.data.valkey.core.SessionCallback
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import io.valkey.springframework.data.valkey.core.ValkeyOperations
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.dao.DataAccessException
|
||||
import org.springframework.data.domain.Range
|
||||
import org.springframework.data.redis.core.SessionCallback
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
class ValkeyEventStore(
|
||||
private val valkeyTemplate: StringRedisTemplate,
|
||||
private val valkeyTemplate: StringValkeyTemplate,
|
||||
private val serializer: EventSerializer,
|
||||
private val properties: ValkeyEventStoreProperties
|
||||
) : EventStore {
|
||||
@@ -131,8 +132,8 @@ class ValkeyEventStore(
|
||||
try {
|
||||
valkeyTemplate.execute(object : SessionCallback<List<Any>> {
|
||||
@Throws(DataAccessException::class)
|
||||
override fun <K, V> execute(operations: org.springframework.data.redis.core.RedisOperations<K, V>): List<Any> {
|
||||
val streamOps = (operations as StringRedisTemplate).opsForStream<String, String>()
|
||||
override fun <K, V> execute(operations: ValkeyOperations<K, V>): List<Any> {
|
||||
val streamOps = (operations as StringValkeyTemplate).opsForStream<String, String>()
|
||||
|
||||
operations.multi()
|
||||
|
||||
@@ -178,8 +179,8 @@ class ValkeyEventStore(
|
||||
try {
|
||||
valkeyTemplate.execute(object : SessionCallback<List<Any>> {
|
||||
@Throws(DataAccessException::class)
|
||||
override fun <K, V> execute(operations: org.springframework.data.redis.core.RedisOperations<K, V>): List<Any> {
|
||||
val streamOps = (operations as StringRedisTemplate).opsForStream<String, String>()
|
||||
override fun <K, V> execute(operations: ValkeyOperations<K, V>): List<Any> {
|
||||
val streamOps = (operations as StringValkeyTemplate).opsForStream<String, String>()
|
||||
|
||||
operations.multi()
|
||||
streamOps.add(streamKey, eventData)
|
||||
|
||||
+18
-17
@@ -2,16 +2,17 @@ package at.mocode.infrastructure.eventstore.valkey
|
||||
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyPassword
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import org.springframework.beans.factory.annotation.Qualifier
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory
|
||||
import org.springframework.data.redis.connection.RedisPassword
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
@@ -53,11 +54,11 @@ class ValkeyEventStoreConfiguration {
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = ["eventStoreValkeyConnectionFactory"])
|
||||
fun eventStoreValkeyConnectionFactory(properties: ValkeyEventStoreProperties): RedisConnectionFactory {
|
||||
val config = RedisStandaloneConfiguration().apply {
|
||||
fun eventStoreValkeyConnectionFactory(properties: ValkeyEventStoreProperties): ValkeyConnectionFactory {
|
||||
val config = ValkeyStandaloneConfiguration().apply {
|
||||
hostName = properties.host
|
||||
port = properties.port
|
||||
properties.password?.let { password = RedisPassword.of(it) }
|
||||
properties.password?.let { password = ValkeyPassword.of(it) }
|
||||
database = properties.database
|
||||
}
|
||||
|
||||
@@ -76,10 +77,10 @@ class ValkeyEventStoreConfiguration {
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = ["eventStoreValkeyTemplate"])
|
||||
fun eventStoreValkeyTemplate(
|
||||
@org.springframework.beans.factory.annotation.Qualifier("eventStoreValkeyConnectionFactory")
|
||||
connectionFactory: RedisConnectionFactory
|
||||
): StringRedisTemplate {
|
||||
return StringRedisTemplate().apply {
|
||||
@Qualifier("eventStoreValkeyConnectionFactory")
|
||||
connectionFactory: ValkeyConnectionFactory
|
||||
): StringValkeyTemplate {
|
||||
return StringValkeyTemplate().apply {
|
||||
setConnectionFactory(connectionFactory)
|
||||
afterPropertiesSet()
|
||||
}
|
||||
@@ -107,8 +108,8 @@ class ValkeyEventStoreConfiguration {
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
fun eventStore(
|
||||
@org.springframework.beans.factory.annotation.Qualifier("eventStoreValkeyTemplate")
|
||||
valkeyTemplate: StringRedisTemplate,
|
||||
@Qualifier("eventStoreValkeyTemplate")
|
||||
valkeyTemplate: StringValkeyTemplate,
|
||||
eventSerializer: EventSerializer,
|
||||
properties: ValkeyEventStoreProperties
|
||||
): EventStore {
|
||||
@@ -126,8 +127,8 @@ class ValkeyEventStoreConfiguration {
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
fun eventConsumer(
|
||||
@org.springframework.beans.factory.annotation.Qualifier("eventStoreValkeyTemplate")
|
||||
valkeyTemplate: StringRedisTemplate,
|
||||
@Qualifier("eventStoreValkeyTemplate")
|
||||
valkeyTemplate: StringValkeyTemplate,
|
||||
eventSerializer: EventSerializer,
|
||||
properties: ValkeyEventStoreProperties
|
||||
): ValkeyEventConsumer {
|
||||
|
||||
+5
-5
@@ -8,6 +8,8 @@ import at.mocode.infrastructure.cache.valkey.JacksonCacheSerializer
|
||||
import at.mocode.infrastructure.cache.valkey.ValkeyConfiguration
|
||||
import at.mocode.infrastructure.cache.valkey.ValkeyDistributedCache
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.ValkeyTemplate
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
@@ -20,8 +22,6 @@ import org.springframework.boot.test.context.SpringBootTest
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.context.annotation.Import
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory
|
||||
import org.springframework.data.redis.core.RedisTemplate
|
||||
import org.springframework.test.context.DynamicPropertyRegistry
|
||||
import org.springframework.test.context.DynamicPropertySource
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
@@ -96,7 +96,7 @@ class ValkeyCacheAndEventStoreIntegrationTest {
|
||||
class TestConfig {
|
||||
@Bean
|
||||
fun distributedCache(
|
||||
@Qualifier("valkeyTemplate") valkeyTemplate: RedisTemplate<String, ByteArray>,
|
||||
@Qualifier("valkeyTemplate") valkeyTemplate: ValkeyTemplate<String, ByteArray>,
|
||||
cacheConfiguration: CacheConfiguration
|
||||
): DistributedCache {
|
||||
return ValkeyDistributedCache(
|
||||
@@ -116,11 +116,11 @@ class ValkeyCacheAndEventStoreIntegrationTest {
|
||||
// Verify separate ConnectionFactories
|
||||
@Autowired
|
||||
@Qualifier("valkeyConnectionFactory")
|
||||
private lateinit var cacheConnectionFactory: RedisConnectionFactory
|
||||
private lateinit var cacheConnectionFactory: ValkeyConnectionFactory
|
||||
|
||||
@Autowired
|
||||
@Qualifier("eventStoreValkeyConnectionFactory")
|
||||
private lateinit var eventStoreConnectionFactory: RedisConnectionFactory
|
||||
private lateinit var eventStoreConnectionFactory: ValkeyConnectionFactory
|
||||
|
||||
@Test
|
||||
fun `test both modules can be used simultaneously without conflicts`(): Unit = runBlocking {
|
||||
|
||||
+7
-7
@@ -6,6 +6,9 @@ import at.mocode.core.domain.model.AggregateId
|
||||
import at.mocode.core.domain.model.EventType
|
||||
import at.mocode.core.domain.model.EventVersion
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.Transient
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
@@ -14,9 +17,6 @@ import org.junit.jupiter.api.Assertions.assertTrue
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
@@ -35,11 +35,11 @@ class ValkeyEventConsumerResilienceTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine"))
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringRedisTemplate
|
||||
private lateinit var valkeyTemplate: StringValkeyTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: ValkeyEventStore
|
||||
@@ -51,11 +51,11 @@ class ValkeyEventConsumerResilienceTest {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
|
||||
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
|
||||
valkeyTemplate = StringRedisTemplate(connectionFactory)
|
||||
valkeyTemplate = StringValkeyTemplate(connectionFactory)
|
||||
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(ResilienceTestEvent::class.java, "ResilienceTestEvent")
|
||||
|
||||
+8
-8
@@ -2,6 +2,8 @@ package at.mocode.infrastructure.eventstore.valkey
|
||||
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import org.junit.jupiter.api.Assertions.*
|
||||
import org.junit.jupiter.api.DisplayName
|
||||
import org.junit.jupiter.api.Test
|
||||
@@ -11,8 +13,6 @@ import org.springframework.boot.autoconfigure.AutoConfigurations
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
@@ -71,8 +71,8 @@ class ValkeyEventStoreConfigurationTest {
|
||||
assertTrue(context.containsBean("eventConsumer"))
|
||||
|
||||
// Verify bean types
|
||||
assertNotNull(context.getBean<RedisConnectionFactory>("eventStoreValkeyConnectionFactory"))
|
||||
assertNotNull(context.getBean<StringRedisTemplate>("eventStoreValkeyTemplate"))
|
||||
assertNotNull(context.getBean<ValkeyConnectionFactory>("eventStoreValkeyConnectionFactory"))
|
||||
assertNotNull(context.getBean<StringValkeyTemplate>("eventStoreValkeyTemplate"))
|
||||
assertNotNull(context.getBean<EventSerializer>("eventSerializer"))
|
||||
assertNotNull(context.getBean<EventStore>("eventStore"))
|
||||
assertNotNull(context.getBean<ValkeyEventConsumer>("eventConsumer"))
|
||||
@@ -160,7 +160,7 @@ class ValkeyEventStoreConfigurationTest {
|
||||
"valkey.event-store.database=1"
|
||||
)
|
||||
.run { context ->
|
||||
val connectionFactory = context.getBean<RedisConnectionFactory>("eventStoreValkeyConnectionFactory")
|
||||
val connectionFactory = context.getBean<ValkeyConnectionFactory>("eventStoreValkeyConnectionFactory")
|
||||
assertNotNull(connectionFactory)
|
||||
|
||||
// Verify the connection factory is properly configured
|
||||
@@ -176,7 +176,7 @@ class ValkeyEventStoreConfigurationTest {
|
||||
fun `should handle Valkey template creation correctly`() {
|
||||
contextRunner
|
||||
.run { context ->
|
||||
val valkeyTemplate = context.getBean<StringRedisTemplate>("eventStoreValkeyTemplate")
|
||||
val valkeyTemplate = context.getBean<StringValkeyTemplate>("eventStoreValkeyTemplate")
|
||||
assertNotNull(valkeyTemplate)
|
||||
|
||||
// Verify the template is properly set up
|
||||
@@ -211,7 +211,7 @@ class ValkeyEventStoreConfigurationTest {
|
||||
assertTrue(eventStore is ValkeyEventStore)
|
||||
|
||||
// Verify dependencies are wired correctly
|
||||
val valkeyTemplate = context.getBean<StringRedisTemplate>("eventStoreValkeyTemplate")
|
||||
val valkeyTemplate = context.getBean<StringValkeyTemplate>("eventStoreValkeyTemplate")
|
||||
val eventSerializer = context.getBean<EventSerializer>("eventSerializer")
|
||||
val properties = context.getBean<ValkeyEventStoreProperties>()
|
||||
|
||||
@@ -231,7 +231,7 @@ class ValkeyEventStoreConfigurationTest {
|
||||
assertNotNull(eventConsumer)
|
||||
|
||||
// Verify dependencies are available
|
||||
val valkeyTemplate = context.getBean<StringRedisTemplate>("eventStoreValkeyTemplate")
|
||||
val valkeyTemplate = context.getBean<StringValkeyTemplate>("eventStoreValkeyTemplate")
|
||||
val eventSerializer = context.getBean<EventSerializer>("eventSerializer")
|
||||
val properties = context.getBean<ValkeyEventStoreProperties>()
|
||||
|
||||
|
||||
+7
-7
@@ -6,6 +6,9 @@ import at.mocode.core.domain.model.EventType
|
||||
import at.mocode.core.domain.model.EventVersion
|
||||
import at.mocode.infrastructure.eventstore.api.ConcurrencyException
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.Transient
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
@@ -13,9 +16,6 @@ import org.junit.jupiter.api.Assertions.*
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertThrows
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
@@ -32,11 +32,11 @@ class ValkeyEventStoreErrorHandlingTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine"))
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringRedisTemplate
|
||||
private lateinit var valkeyTemplate: StringValkeyTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: ValkeyEventStore
|
||||
@@ -46,11 +46,11 @@ class ValkeyEventStoreErrorHandlingTest {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
|
||||
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
|
||||
valkeyTemplate = StringRedisTemplate(connectionFactory)
|
||||
valkeyTemplate = StringValkeyTemplate(connectionFactory)
|
||||
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(TestErrorEvent::class.java, "TestErrorEvent")
|
||||
|
||||
+7
-7
@@ -7,14 +7,14 @@ import at.mocode.core.domain.event.DomainEvent
|
||||
import at.mocode.core.domain.model.*
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertTrue
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
@@ -30,11 +30,11 @@ class ValkeyEventStoreIntegrationTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine"))
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringRedisTemplate
|
||||
private lateinit var valkeyTemplate: StringValkeyTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: EventStore
|
||||
@@ -45,11 +45,11 @@ class ValkeyEventStoreIntegrationTest {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
|
||||
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
|
||||
valkeyTemplate = StringRedisTemplate(connectionFactory)
|
||||
valkeyTemplate = StringValkeyTemplate(connectionFactory)
|
||||
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(TestCreatedEvent::class.java, "TestCreated")
|
||||
|
||||
+7
-7
@@ -5,6 +5,9 @@ import at.mocode.core.domain.model.AggregateId
|
||||
import at.mocode.core.domain.model.EventType
|
||||
import at.mocode.core.domain.model.EventVersion
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.Transient
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
@@ -12,9 +15,6 @@ import org.junit.jupiter.api.Assertions.*
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
@@ -31,11 +31,11 @@ class ValkeyEventStoreStreamTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine"))
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringRedisTemplate
|
||||
private lateinit var valkeyTemplate: StringValkeyTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: ValkeyEventStore
|
||||
@@ -45,11 +45,11 @@ class ValkeyEventStoreStreamTest {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
|
||||
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
|
||||
valkeyTemplate = StringRedisTemplate(connectionFactory)
|
||||
valkeyTemplate = StringValkeyTemplate(connectionFactory)
|
||||
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(StreamTestEvent::class.java, "StreamTestEvent")
|
||||
|
||||
+78
-78
@@ -6,6 +6,9 @@ import at.mocode.core.domain.model.EventType
|
||||
import at.mocode.core.domain.model.EventVersion
|
||||
import at.mocode.infrastructure.eventstore.api.ConcurrencyException
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.Transient
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
@@ -13,9 +16,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertThrows
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
@@ -25,93 +25,93 @@ import kotlin.uuid.Uuid
|
||||
@Testcontainers
|
||||
class ValkeyEventStoreTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringValkeyTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: ValkeyEventStore
|
||||
|
||||
@BeforeEach
|
||||
fun setUp() {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
|
||||
val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
|
||||
valkeyTemplate = StringValkeyTemplate(connectionFactory)
|
||||
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(TestCreatedEvent::class.java, "TestCreated")
|
||||
registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringRedisTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: ValkeyEventStore
|
||||
|
||||
@BeforeEach
|
||||
fun setUp() {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
|
||||
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
|
||||
valkeyTemplate = StringRedisTemplate(connectionFactory)
|
||||
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(TestCreatedEvent::class.java, "TestCreated")
|
||||
registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
|
||||
}
|
||||
|
||||
properties = ValkeyEventStoreProperties().apply {
|
||||
streamPrefix = "test-stream:"
|
||||
}
|
||||
eventStore = ValkeyEventStore(valkeyTemplate, serializer, properties)
|
||||
cleanupValkey()
|
||||
properties = ValkeyEventStoreProperties().apply {
|
||||
streamPrefix = "test-stream:"
|
||||
}
|
||||
eventStore = ValkeyEventStore(valkeyTemplate, serializer, properties)
|
||||
cleanupValkey()
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
fun tearDown() = cleanupValkey()
|
||||
@AfterEach
|
||||
fun tearDown() = cleanupValkey()
|
||||
|
||||
private fun cleanupValkey() {
|
||||
val keys = valkeyTemplate.keys("${properties.streamPrefix}*")
|
||||
if (!keys.isNullOrEmpty()) {
|
||||
valkeyTemplate.delete(keys)
|
||||
}
|
||||
private fun cleanupValkey() {
|
||||
val keys = valkeyTemplate.keys("${properties.streamPrefix}*")
|
||||
if (!keys.isNullOrEmpty()) {
|
||||
valkeyTemplate.delete(keys)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `append and read events should work correctly for new stream`() {
|
||||
val aggregateId = Uuid.random()
|
||||
val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity")
|
||||
val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity")
|
||||
@Test
|
||||
fun `append and read events should work correctly for new stream`() {
|
||||
val aggregateId = Uuid.random()
|
||||
val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity")
|
||||
val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity")
|
||||
|
||||
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
|
||||
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
|
||||
|
||||
val events = eventStore.readFromStream(aggregateId)
|
||||
assertEquals(2, events.size)
|
||||
val events = eventStore.readFromStream(aggregateId)
|
||||
assertEquals(2, events.size)
|
||||
|
||||
val firstEvent = events[0] as TestCreatedEvent
|
||||
assertEquals(EventVersion(1L), firstEvent.version)
|
||||
assertEquals("Test Entity", firstEvent.name)
|
||||
val firstEvent = events[0] as TestCreatedEvent
|
||||
assertEquals(EventVersion(1L), firstEvent.version)
|
||||
assertEquals("Test Entity", firstEvent.name)
|
||||
|
||||
val secondEvent = events[1] as TestUpdatedEvent
|
||||
assertEquals(EventVersion(2L), secondEvent.version)
|
||||
assertEquals("Updated Test Entity", secondEvent.name)
|
||||
val secondEvent = events[1] as TestUpdatedEvent
|
||||
assertEquals(EventVersion(2L), secondEvent.version)
|
||||
assertEquals("Updated Test Entity", secondEvent.name)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `appending with wrong expected version should throw ConcurrencyException`() {
|
||||
val aggregateId = Uuid.random()
|
||||
val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity")
|
||||
eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1
|
||||
|
||||
val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity")
|
||||
assertThrows<ConcurrencyException> {
|
||||
eventStore.appendToStream(listOf(event2), aggregateId, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `appending with wrong expected version should throw ConcurrencyException`() {
|
||||
val aggregateId = Uuid.random()
|
||||
val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity")
|
||||
eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1
|
||||
@Serializable
|
||||
data class TestCreatedEvent(
|
||||
@Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()),
|
||||
@Transient override val version: EventVersion = EventVersion(0),
|
||||
val name: String
|
||||
) : BaseDomainEvent(aggregateId, EventType("TestCreated"), version)
|
||||
|
||||
val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity")
|
||||
assertThrows<ConcurrencyException> {
|
||||
eventStore.appendToStream(listOf(event2), aggregateId, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Serializable
|
||||
data class TestCreatedEvent(
|
||||
@Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()),
|
||||
@Transient override val version: EventVersion = EventVersion(0),
|
||||
val name: String
|
||||
) : BaseDomainEvent(aggregateId, EventType("TestCreated"), version)
|
||||
|
||||
@Serializable
|
||||
data class TestUpdatedEvent(
|
||||
@Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()),
|
||||
@Transient override val version: EventVersion = EventVersion(0),
|
||||
val name: String
|
||||
) : BaseDomainEvent(aggregateId, EventType("TestUpdated"), version)
|
||||
@Serializable
|
||||
data class TestUpdatedEvent(
|
||||
@Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()),
|
||||
@Transient override val version: EventVersion = EventVersion(0),
|
||||
val name: String
|
||||
) : BaseDomainEvent(aggregateId, EventType("TestUpdated"), version)
|
||||
}
|
||||
|
||||
+7
-7
@@ -9,15 +9,15 @@ import at.mocode.core.domain.model.EventType
|
||||
import at.mocode.core.domain.model.EventVersion
|
||||
import at.mocode.infrastructure.eventstore.api.EventSerializer
|
||||
import at.mocode.infrastructure.eventstore.api.EventStore
|
||||
import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration
|
||||
import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory
|
||||
import io.valkey.springframework.data.valkey.core.StringValkeyTemplate
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.Transient
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
|
||||
import org.springframework.data.redis.core.StringRedisTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
@@ -29,11 +29,11 @@ class ValkeyIntegrationTest {
|
||||
|
||||
companion object {
|
||||
@Container
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine"))
|
||||
val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine"))
|
||||
.withExposedPorts(6379)
|
||||
}
|
||||
|
||||
private lateinit var valkeyTemplate: StringRedisTemplate
|
||||
private lateinit var valkeyTemplate: StringValkeyTemplate
|
||||
private lateinit var serializer: EventSerializer
|
||||
private lateinit var properties: ValkeyEventStoreProperties
|
||||
private lateinit var eventStore: EventStore
|
||||
@@ -43,10 +43,10 @@ class ValkeyIntegrationTest {
|
||||
fun setUp() {
|
||||
val valkeyPort = valkeyContainer.getMappedPort(6379)
|
||||
val valkeyHost = valkeyContainer.host
|
||||
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort)
|
||||
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
|
||||
connectionFactory.afterPropertiesSet()
|
||||
valkeyTemplate = StringRedisTemplate(connectionFactory)
|
||||
valkeyTemplate = StringValkeyTemplate(connectionFactory)
|
||||
serializer = JacksonEventSerializer().apply {
|
||||
registerEventType(TestCreatedEvent::class.java, "TestCreated")
|
||||
registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
|
||||
|
||||
Reference in New Issue
Block a user