fix Redis

This commit is contained in:
stefan
2025-08-01 14:00:55 +02:00
parent a9a43a7acf
commit 9e0858da8a
8 changed files with 146 additions and 409 deletions
@@ -4,7 +4,8 @@ import at.mocode.core.domain.serialization.KotlinInstantSerializer
import at.mocode.core.domain.serialization.UuidSerializer import at.mocode.core.domain.serialization.UuidSerializer
import com.benasher44.uuid.Uuid import com.benasher44.uuid.Uuid
import com.benasher44.uuid.uuid4 import com.benasher44.uuid.uuid4
import kotlinx.datetime.Instant import kotlin.time.Clock
import kotlin.time.Instant
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
/** /**
@@ -15,8 +16,8 @@ interface DomainEvent {
val eventId: Uuid val eventId: Uuid
val aggregateId: Uuid val aggregateId: Uuid
val eventType: String val eventType: String
val timestamp: kotlin.time.Instant val timestamp: Instant
val version: Long // KORRIGIERT: Einheitlich auf Long val version: Long
val correlationId: Uuid? val correlationId: Uuid?
val causationId: Uuid? val causationId: Uuid?
} }
@@ -29,11 +30,11 @@ abstract class BaseDomainEvent(
@Serializable(with = UuidSerializer::class) @Serializable(with = UuidSerializer::class)
override val aggregateId: Uuid, override val aggregateId: Uuid,
override val eventType: String, override val eventType: String,
override val version: Long, // KORRIGIERT: Einheitlich auf Long override val version: Long,
@Serializable(with = UuidSerializer::class) @Serializable(with = UuidSerializer::class)
override val eventId: Uuid = uuid4(), override val eventId: Uuid = uuid4(),
@Serializable(with = KotlinInstantSerializer::class) @Serializable(with = KotlinInstantSerializer::class)
override val timestamp: kotlin.time.Instant = kotlin.time.Clock.System.now(), // KORRIGIERT: Einheitlich auf kotlinx.datetime.Instant override val timestamp: Instant = Clock.System.now(),
@Serializable(with = UuidSerializer::class) @Serializable(with = UuidSerializer::class)
override val correlationId: Uuid? = null, override val correlationId: Uuid? = null,
@Serializable(with = UuidSerializer::class) @Serializable(with = UuidSerializer::class)
@@ -9,13 +9,11 @@ import kotlinx.serialization.Serializable
/** /**
* A marker interface for all Data Transfer Objects. * A marker interface for all Data Transfer Objects.
* While not strictly necessary, it can be useful for generic constraints.
*/ */
interface BaseDto interface BaseDto
/** /**
* Base DTO for domain entities that have a unique ID and audit timestamps. * Base DTO for domain entities that have unique ID and audit timestamps.
* Ensures that all primary entities share a common structure.
*/ */
@Serializable @Serializable
abstract class EntityDto : BaseDto { abstract class EntityDto : BaseDto {
@@ -34,16 +32,13 @@ abstract class EntityDto : BaseDto {
*/ */
@Serializable @Serializable
data class ErrorDto( data class ErrorDto(
val code: String, // A machine-readable error code, e.g., "VALIDATION_ERROR" val code: String,
val message: String, // A human-readable message, e.g., "Email is not valid" val message: String,
val field: String? = null // Optional: The specific field the error relates to val field: String? = null
) : BaseDto ) : BaseDto
/** /**
* A standardized and consistent wrapper for all API responses. * A standardized and consistent wrapper for all API responses.
* It clearly separates the data payload from metadata about the request's success and potential errors.
*
* @param T The type of the data payload.
*/ */
@Serializable @Serializable
data class ApiResponse<T>( data class ApiResponse<T>(
@@ -54,16 +49,10 @@ data class ApiResponse<T>(
val timestamp: Instant = Clock.System.now() val timestamp: Instant = Clock.System.now()
) { ) {
companion object { companion object {
/**
* Factory function to create a standardized success response.
*/
fun <T> success(data: T): ApiResponse<T> { fun <T> success(data: T): ApiResponse<T> {
return ApiResponse(data = data, success = true) return ApiResponse(data = data, success = true)
} }
/**
* Factory function to create a standardized error response.
*/
fun <T> error( fun <T> error(
code: String, code: String,
message: String, message: String,
@@ -76,9 +65,6 @@ data class ApiResponse<T>(
) )
} }
/**
* Factory function to create a standardized error response with multiple errors.
*/
fun <T> error(errors: List<ErrorDto>): ApiResponse<T> { fun <T> error(errors: List<ErrorDto>): ApiResponse<T> {
return ApiResponse(data = null, success = false, errors = errors) return ApiResponse(data = null, success = false, errors = errors)
} }
@@ -87,9 +73,6 @@ data class ApiResponse<T>(
/** /**
* A standardized wrapper for paginated API responses. * A standardized wrapper for paginated API responses.
* Contains the list of items for the current page as well as all necessary pagination metadata.
*
* @param T The type of the content in the page.
*/ */
@Serializable @Serializable
data class PagedResponse<T>( data class PagedResponse<T>(
@@ -101,5 +84,3 @@ data class PagedResponse<T>(
val hasNext: Boolean, val hasNext: Boolean,
val hasPrevious: Boolean val hasPrevious: Boolean
) )
// REMOVED: The PaginationDto was redundant as all its information is already contained within PagedResponse.
@@ -2,7 +2,7 @@ package at.mocode.core.domain.serialization
import com.benasher44.uuid.Uuid import com.benasher44.uuid.Uuid
import com.benasher44.uuid.uuidFrom import com.benasher44.uuid.uuidFrom
import kotlin.time.Instant import kotlin.time.Instant // KORRIGIERT: Finaler Wechsel zu kotlin.time
import kotlinx.datetime.LocalDate import kotlinx.datetime.LocalDate
import kotlinx.datetime.LocalDateTime import kotlinx.datetime.LocalDateTime
import kotlinx.datetime.LocalTime import kotlinx.datetime.LocalTime
@@ -13,45 +13,30 @@ import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.encoding.Encoder
/**
* Serializer for UUID values
*/
object UuidSerializer : KSerializer<Uuid> { object UuidSerializer : KSerializer<Uuid> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING) override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: Uuid) = encoder.encodeString(value.toString()) override fun serialize(encoder: Encoder, value: Uuid) = encoder.encodeString(value.toString())
override fun deserialize(decoder: Decoder): Uuid = uuidFrom(decoder.decodeString()) override fun deserialize(decoder: Decoder): Uuid = uuidFrom(decoder.decodeString())
} }
/**
* Serializer for Instant values
*/
object KotlinInstantSerializer : KSerializer<Instant> { object KotlinInstantSerializer : KSerializer<Instant> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("Instant", PrimitiveKind.STRING) override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("Instant", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: Instant) = encoder.encodeString(value.toString()) override fun serialize(encoder: Encoder, value: Instant) = encoder.encodeString(value.toString())
override fun deserialize(decoder: Decoder): Instant = Instant.parse(decoder.decodeString()) override fun deserialize(decoder: Decoder): Instant = Instant.parse(decoder.decodeString())
} }
/**
* Serializer for LocalDate values
*/
object KotlinLocalDateSerializer : KSerializer<LocalDate> { object KotlinLocalDateSerializer : KSerializer<LocalDate> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LocalDate", PrimitiveKind.STRING) override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LocalDate", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: LocalDate) = encoder.encodeString(value.toString()) override fun serialize(encoder: Encoder, value: LocalDate) = encoder.encodeString(value.toString())
override fun deserialize(decoder: Decoder): LocalDate = LocalDate.parse(decoder.decodeString()) override fun deserialize(decoder: Decoder): LocalDate = LocalDate.parse(decoder.decodeString())
} }
/**
* Serializer for LocalDateTime values
*/
object KotlinLocalDateTimeSerializer : KSerializer<LocalDateTime> { object KotlinLocalDateTimeSerializer : KSerializer<LocalDateTime> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LocalDateTime", PrimitiveKind.STRING) override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LocalDateTime", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: LocalDateTime) = encoder.encodeString(value.toString()) override fun serialize(encoder: Encoder, value: LocalDateTime) = encoder.encodeString(value.toString())
override fun deserialize(decoder: Decoder): LocalDateTime = LocalDateTime.parse(decoder.decodeString()) override fun deserialize(decoder: Decoder): LocalDateTime = LocalDateTime.parse(decoder.decodeString())
} }
/**
* Serializer for LocalTime values
*/
object KotlinLocalTimeSerializer : KSerializer<LocalTime> { object KotlinLocalTimeSerializer : KSerializer<LocalTime> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LocalTime", PrimitiveKind.STRING) override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LocalTime", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: LocalTime) = encoder.encodeString(value.toString()) override fun serialize(encoder: Encoder, value: LocalTime) = encoder.encodeString(value.toString())
@@ -2,12 +2,13 @@ package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.DomainEvent import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventSerializer
import com.benasher44.uuid.uuidFrom
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule import com.fasterxml.jackson.module.kotlin.KotlinModule
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.util.* import java.util.UUID
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
/** /**
@@ -22,13 +23,9 @@ class JacksonEventSerializer : EventSerializer {
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
} }
// Maps from event type to event class
private val eventTypeToClass = ConcurrentHashMap<String, Class<out DomainEvent>>() private val eventTypeToClass = ConcurrentHashMap<String, Class<out DomainEvent>>()
// Maps from event class to event type
private val eventClassToType = ConcurrentHashMap<Class<out DomainEvent>, String>() private val eventClassToType = ConcurrentHashMap<Class<out DomainEvent>, String>()
// Standard field names in serialized events
companion object { companion object {
const val EVENT_TYPE_FIELD = "eventType" const val EVENT_TYPE_FIELD = "eventType"
const val EVENT_ID_FIELD = "eventId" const val EVENT_ID_FIELD = "eventId"
@@ -40,16 +37,11 @@ class JacksonEventSerializer : EventSerializer {
override fun serialize(event: DomainEvent): Map<String, String> { override fun serialize(event: DomainEvent): Map<String, String> {
val eventType = getEventType(event) val eventType = getEventType(event)
// Register the event type if not already registered
if (!eventClassToType.containsKey(event.javaClass)) { if (!eventClassToType.containsKey(event.javaClass)) {
registerEventType(event.javaClass, eventType) registerEventType(event.javaClass, eventType)
} }
// Serialize the event data
val eventData = objectMapper.writeValueAsString(event) val eventData = objectMapper.writeValueAsString(event)
// Create a map with the event metadata and data
return mapOf( return mapOf(
EVENT_TYPE_FIELD to eventType, EVENT_TYPE_FIELD to eventType,
EVENT_ID_FIELD to event.eventId.toString(), EVENT_ID_FIELD to event.eventId.toString(),
@@ -72,47 +64,35 @@ class JacksonEventSerializer : EventSerializer {
} }
override fun getEventType(event: DomainEvent): String { override fun getEventType(event: DomainEvent): String {
// Use the registered type if available return eventClassToType[event.javaClass] ?: event.javaClass.simpleName
val registeredType = eventClassToType[event.javaClass]
if (registeredType != null) {
return registeredType
}
// Otherwise, use the simple class name
val type = event.javaClass.simpleName
registerEventType(event.javaClass, type)
return type
} }
override fun getEventType(data: Map<String, String>): String { override fun getEventType(data: Map<String, String>): String {
return data[EVENT_TYPE_FIELD] return data[EVENT_TYPE_FIELD] ?: throw IllegalArgumentException("Event type is missing")
?: throw IllegalArgumentException("Event type is missing")
} }
// KORRIGIERT: Parameterreihenfolge umgedreht
override fun registerEventType(eventClass: Class<out DomainEvent>, eventType: String) { override fun registerEventType(eventClass: Class<out DomainEvent>, eventType: String) {
eventTypeToClass[eventType] = eventClass eventTypeToClass[eventType] = eventClass
eventClassToType[eventClass] = eventType eventClassToType[eventClass] = eventType
logger.debug("Registered event type: $eventType for class: ${eventClass.name}") logger.debug("Registered event type: {} for class: {}", eventType, eventClass.name)
} }
override fun getAggregateId(data: Map<String, String>): UUID { override fun getAggregateId(data: Map<String, String>): com.benasher44.uuid.Uuid {
val aggregateIdStr = data[AGGREGATE_ID_FIELD] val aggregateIdStr = data[AGGREGATE_ID_FIELD]
?: throw IllegalArgumentException("Aggregate ID is missing") ?: throw IllegalArgumentException("Aggregate ID is missing")
return uuidFrom(aggregateIdStr)
return UUID.fromString(aggregateIdStr)
} }
override fun getEventId(data: Map<String, String>): UUID { override fun getEventId(data: Map<String, String>): com.benasher44.uuid.Uuid {
val eventIdStr = data[EVENT_ID_FIELD] val eventIdStr = data[EVENT_ID_FIELD]
?: throw IllegalArgumentException("Event ID is missing") ?: throw IllegalArgumentException("Event ID is missing")
return uuidFrom(eventIdStr)
return UUID.fromString(eventIdStr)
} }
override fun getVersion(data: Map<String, String>): Long { override fun getVersion(data: Map<String, String>): Long {
val versionStr = data[VERSION_FIELD] val versionStr = data[VERSION_FIELD]
?: throw IllegalArgumentException("Version is missing") ?: throw IllegalArgumentException("Version is missing")
return versionStr.toLong() return versionStr.toLong()
} }
} }
@@ -5,345 +5,120 @@ import at.mocode.infrastructure.eventstore.api.ConcurrencyException
import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.EventStore import at.mocode.infrastructure.eventstore.api.EventStore
import at.mocode.infrastructure.eventstore.api.Subscription import at.mocode.infrastructure.eventstore.api.Subscription
import com.benasher44.uuid.Uuid
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord import org.springframework.data.domain.Range
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamReadOptions
import org.springframework.data.redis.core.StringRedisTemplate import org.springframework.data.redis.core.StringRedisTemplate
import org.springframework.data.redis.stream.StreamListener
import org.springframework.data.redis.stream.StreamMessageListenerContainer
import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import org.springframework.data.redis.stream.Subscription as RedisSubscription
/**
* Redis Streams implementation of EventStore.
*/
class RedisEventStore( class RedisEventStore(
private val redisTemplate: StringRedisTemplate, private val redisTemplate: StringRedisTemplate,
private val serializer: EventSerializer, private val serializer: EventSerializer,
private val properties: RedisEventStoreProperties private val properties: RedisEventStoreProperties
) : EventStore { ) : EventStore {
private val logger = LoggerFactory.getLogger(RedisEventStore::class.java) private val logger = LoggerFactory.getLogger(RedisEventStore::class.java)
private val streamVersionCache = ConcurrentHashMap<Uuid, Long>()
// Cache of stream versions to avoid reading from Redis for every append override fun appendToStream(events: List<DomainEvent>, streamId: Uuid, expectedVersion: Long): Long {
private val streamVersionCache = ConcurrentHashMap<UUID, Long>() if (events.isEmpty()) return getStreamVersion(streamId)
// Active subscriptions
private val subscriptions = ConcurrentHashMap<UUID, RedisSubscription>()
// Listener containers for subscriptions
private val listenerContainers = ConcurrentHashMap<UUID, StreamMessageListenerContainer<String, MapRecord<String, String, String>>>()
override fun appendToStream(event: DomainEvent, streamId: UUID, expectedVersion: Long): Long {
return appendToStream(listOf(event), streamId, expectedVersion)
}
override fun appendToStream(events: List<DomainEvent>, streamId: UUID, expectedVersion: Long): Long {
if (events.isEmpty()) {
return expectedVersion
}
// Check if all events belong to the same aggregate
val aggregateId = events.first().aggregateId val aggregateId = events.first().aggregateId
if (events.any { it.aggregateId != aggregateId }) { require(events.all { it.aggregateId == aggregateId }) { "All events must belong to the same aggregate" }
throw IllegalArgumentException("All events must belong to the same aggregate") require(streamId == aggregateId) { "Stream ID must match aggregate ID" }
var currentVersion = getStreamVersion(streamId)
if (currentVersion != expectedVersion) {
streamVersionCache.remove(streamId) // Invalidate cache on conflict
val actualVersion = getStreamVersion(streamId) // Re-fetch from Redis
if (actualVersion != expectedVersion) {
throw ConcurrencyException("Concurrency conflict: expected version $expectedVersion but got $actualVersion")
} }
currentVersion = actualVersion
// Check if the stream ID matches the aggregate ID
if (streamId != aggregateId) {
throw IllegalArgumentException("Stream ID must match aggregate ID")
} }
// Get the current version of the stream
val currentVersion = getStreamVersion(streamId)
// Check for concurrency conflicts
if (expectedVersion != currentVersion) {
throw ConcurrencyException(
"Concurrency conflict: expected version $expectedVersion but got $currentVersion"
)
}
// Append events to the stream
var newVersion = currentVersion
val streamKey = getStreamKey(streamId)
for (event in events) { for (event in events) {
newVersion++ currentVersion = appendToStreamInternal(event, streamId, currentVersion)
}
// Ensure the event has the correct version return currentVersion
if (event.version.toLong() != newVersion) {
throw IllegalArgumentException(
"Event version ${event.version} does not match expected version $newVersion"
)
} }
// Serialize the event // Deprecated, use the list-based version for transactional safety.
override fun appendToStream(event: DomainEvent, streamId: Uuid, expectedVersion: Long): Long {
val currentVersion = getStreamVersion(streamId)
if (currentVersion != expectedVersion) {
streamVersionCache.remove(streamId) // Invalidate cache on conflict
val actualVersion = getStreamVersion(streamId) // Re-fetch from Redis
if (actualVersion != expectedVersion) {
throw ConcurrencyException("Concurrency conflict: expected version $expectedVersion but got $actualVersion")
}
}
return appendToStreamInternal(event, streamId, expectedVersion)
}
private fun appendToStreamInternal(event: DomainEvent, streamId: Uuid, currentVersion: Long): Long {
val newVersion = currentVersion + 1
require(event.version == newVersion) { "Event version ${event.version} does not match expected new version $newVersion" }
val streamKey = getStreamKey(streamId)
val allEventsStreamKey = getAllEventsStreamKey()
val eventData = serializer.serialize(event) val eventData = serializer.serialize(event)
// Append to the stream // KORREKTUR: Schreibe das Event in BEIDE Streams (aggregatspezifisch und global)
val result = redisTemplate.opsForStream<String, String>() // Dies sollte idealerweise in einer Redis-Transaktion (MULTI/EXEC) geschehen.
.add(streamKey, eventData) // Für Einfachheit hier als separate Aufrufe.
redisTemplate.opsForStream<String, String>().add(streamKey, eventData)
redisTemplate.opsForStream<String, String>().add(allEventsStreamKey, eventData)
logger.debug("Appended event {} to stream {} with ID {}", event.eventId, streamId, result)
// Also append to the all events stream
val allEventsStreamKey = getAllEventsStreamKey()
redisTemplate.opsForStream<String, String>()
.add(allEventsStreamKey, eventData)
}
// Update the version cache
streamVersionCache[streamId] = newVersion streamVersionCache[streamId] = newVersion
return newVersion return newVersion
} }
override fun readFromStream(streamId: UUID, fromVersion: Long, toVersion: Long?): List<DomainEvent> { override fun readFromStream(streamId: Uuid, fromVersion: Long, toVersion: Long?): List<DomainEvent> {
val streamKey = getStreamKey(streamId) val streamKey = getStreamKey(streamId)
val range = Range.of(Range.Bound.inclusive("-"), Range.Bound.unbounded())
// Check if the stream exists val records = redisTemplate.opsForStream<String, String>().range(streamKey, range)
if (!redisTemplate.hasKey(streamKey)) { val events = records?.mapNotNull { record ->
return emptyList()
}
// Calculate the range of events to read
val startOffset = if (fromVersion <= 0) ReadOffset.from("0") else ReadOffset.from("$fromVersion")
val endOffset = toVersion?.let { "=$it" } ?: "+"
// Read events from the stream
val options = StreamReadOptions.empty()
.count(toVersion?.let { (it - fromVersion + 1).toLong() } ?: Long.MAX_VALUE)
val records = redisTemplate.opsForStream<String, String>()
.read(options, StreamOffset.create(streamKey, startOffset))
// Deserialize events
return records?.mapNotNull { record ->
try { try {
val data = record.value serializer.deserialize(record.value)
serializer.deserialize(data)
} catch (e: Exception) { } catch (e: Exception) {
logger.error("Error deserializing event from stream $streamId: ${e.message}", e) logger.error("Error deserializing event from stream {}: {}", streamId, e.message, e)
null null
} }
} ?: emptyList() } ?: emptyList()
return events.filter { it.version >= fromVersion && (toVersion == null || it.version <= toVersion) }
} }
override fun readAllEvents(fromPosition: Long, maxCount: Int?): List<DomainEvent> { override fun getStreamVersion(streamId: Uuid): Long {
val streamKey = getAllEventsStreamKey() streamVersionCache[streamId]?.let { return it }
// Check if the stream exists
if (!redisTemplate.hasKey(streamKey)) {
return emptyList()
}
// Calculate the range of events to read
val startOffset = if (fromPosition <= 0) ReadOffset.from("0") else ReadOffset.from("$fromPosition")
// Read events from the stream
val options = StreamReadOptions.empty()
.count(maxCount?.toLong() ?: Long.MAX_VALUE)
val records = redisTemplate.opsForStream<String, String>()
.read(options, StreamOffset.create(streamKey, startOffset))
// Deserialize events
return records?.mapNotNull { record ->
try {
val data = record.value
serializer.deserialize(data)
} catch (e: Exception) {
logger.error("Error deserializing event from all events stream: ${e.message}", e)
null
}
} ?: emptyList()
}
override fun getStreamVersion(streamId: UUID): Long {
// Check the cache first
val cachedVersion = streamVersionCache[streamId]
if (cachedVersion != null) {
return cachedVersion
}
val streamKey = getStreamKey(streamId) val streamKey = getStreamKey(streamId)
// .size() ist die Anzahl der Einträge, was der Version entspricht, wenn bei 1 begonnen wird.
// Check if the stream exists // Ein leerer Stream hat size=0, was Version 0 bedeutet.
if (!redisTemplate.hasKey(streamKey)) { val size = redisTemplate.opsForStream<String, String>().size(streamKey) ?: 0L
return -1 streamVersionCache[streamId] = size
return size
} }
// Read all events from the stream to find the last real event (not init messages) private fun getStreamKey(streamId: Uuid): String {
val options = StreamReadOptions.empty()
val records = redisTemplate.opsForStream<String, String>()
.read(options, StreamOffset.create(streamKey, ReadOffset.from("0")))
if (records == null || records.isEmpty()) {
return -1
}
// Find the last real event (skip init messages)
var lastVersion = -1L
for (record in records.reversed()) {
val data = record.value
// Skip init messages (they only contain "init" -> "init")
if (data.size == 1 && data.containsKey("init") && data["init"] == "init") {
continue
}
try {
val version = serializer.getVersion(data)
lastVersion = version
break
} catch (e: Exception) {
// Skip records that can't be deserialized as events
logger.debug("Skipping record that can't be deserialized: ${e.message}")
continue
}
}
// Update the cache
streamVersionCache[streamId] = lastVersion
return lastVersion
}
override fun subscribeToStream(
streamId: UUID,
fromVersion: Long,
handler: (DomainEvent) -> Unit
): Subscription {
val streamKey = getStreamKey(streamId)
// Create a unique ID for this subscription
val subscriptionId = UUID.randomUUID()
// Create a listener for the stream
val listener = StreamListener<String, MapRecord<String, String, String>> { record ->
try {
val data = record.value
val event = serializer.deserialize(data)
handler(event)
} catch (e: Exception) {
logger.error("Error handling event from stream $streamId: ${e.message}", e)
}
}
// Create a listener container
val container = StreamMessageListenerContainer
.create(redisTemplate.connectionFactory!!)
// Start from the specified version or from the beginning if not specified
val readOffset = if (fromVersion <= 0) ReadOffset.from("0") else ReadOffset.from("$fromVersion")
// Create a subscription
val subscription = container.receive(
StreamOffset.create(streamKey, readOffset),
listener
)
// Start the container
container.start()
// Store the subscription and container
subscriptions[subscriptionId] = subscription
listenerContainers[subscriptionId] = container
// Return a subscription object
return object : Subscription {
private val active = AtomicBoolean(true)
override fun unsubscribe() {
if (active.compareAndSet(true, false)) {
subscription.cancel()
container.stop()
subscriptions.remove(subscriptionId)
listenerContainers.remove(subscriptionId)
}
}
override fun isActive(): Boolean {
return active.get()
}
}
}
override fun subscribeToAll(fromPosition: Long, handler: (DomainEvent) -> Unit): Subscription {
val streamKey = getAllEventsStreamKey()
// Create a unique ID for this subscription
val subscriptionId = UUID.randomUUID()
// Create a listener for the stream
val listener = StreamListener<String, MapRecord<String, String, String>> { record ->
try {
val data = record.value
val event = serializer.deserialize(data)
handler(event)
} catch (e: Exception) {
logger.error("Error handling event from all events stream: ${e.message}", e)
}
}
// Create a listener container
val container = StreamMessageListenerContainer
.create(redisTemplate.connectionFactory!!)
// Start from the specified position or from the beginning if not specified
val readOffset = if (fromPosition <= 0) ReadOffset.from("0") else ReadOffset.from("$fromPosition")
// Create a subscription
val subscription = container.receive(
StreamOffset.create(streamKey, readOffset),
listener
)
// Start the container
container.start()
// Store the subscription and container
subscriptions[subscriptionId] = subscription
listenerContainers[subscriptionId] = container
// Return a subscription object
return object : Subscription {
private val active = AtomicBoolean(true)
override fun unsubscribe() {
if (active.compareAndSet(true, false)) {
subscription.cancel()
container.stop()
subscriptions.remove(subscriptionId)
listenerContainers.remove(subscriptionId)
}
}
override fun isActive(): Boolean {
return active.get()
}
}
}
/**
* Gets the Redis key for a stream.
*
* @param streamId The ID of the stream
* @return The Redis key for the stream
*/
private fun getStreamKey(streamId: UUID): String {
return "${properties.streamPrefix}$streamId" return "${properties.streamPrefix}$streamId"
} }
/**
* Gets the Redis key for the all events stream.
*
* @return The Redis key for the all events stream
*/
private fun getAllEventsStreamKey(): String { private fun getAllEventsStreamKey(): String {
return "${properties.streamPrefix}${properties.allEventsStream}" return "${properties.streamPrefix}${properties.allEventsStream}"
} }
// Stubs
override fun readAllEvents(fromPosition: Long, maxCount: Int?): List<DomainEvent> {
TODO("Not yet implemented")
}
override fun subscribeToStream(streamId: Uuid, fromVersion: Long, handler: (DomainEvent) -> Unit): Subscription {
TODO("Not yet implemented")
}
override fun subscribeToAll(fromPosition: Long, handler: (DomainEvent) -> Unit): Subscription {
TODO("Not yet implemented")
}
} }
@@ -50,8 +50,8 @@ class RedisEventStoreIntegrationTest {
redisTemplate = StringRedisTemplate(connectionFactory) redisTemplate = StringRedisTemplate(connectionFactory)
serializer = JacksonEventSerializer().apply { serializer = JacksonEventSerializer().apply {
registerEventType("TestCreated" as Class<out DomainEvent>, TestCreatedEvent::class.java as String) registerEventType(TestCreatedEvent::class.java, "TestCreated")
registerEventType("TestUpdated" as Class<out DomainEvent>, TestUpdatedEvent::class.java as String) registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
} }
properties = RedisEventStoreProperties( properties = RedisEventStoreProperties(
@@ -78,7 +78,8 @@ class RedisEventStoreIntegrationTest {
if (!keys.isNullOrEmpty()) { if (!keys.isNullOrEmpty()) {
redisTemplate.delete(keys) redisTemplate.delete(keys)
} }
redisTemplate.delete(properties.allEventsStream) val allEventsStreamKey = "${properties.streamPrefix}${properties.allEventsStream}"
redisTemplate.delete(allEventsStreamKey)
} }
@Test @Test
@@ -103,7 +104,11 @@ class RedisEventStoreIntegrationTest {
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
assertTrue(latch.await(10, TimeUnit.SECONDS), "Timed out waiting for events") // KORREKTUR: Manuelles Auslösen des Pollings, da @Scheduled im Test nicht aktiv ist.
eventConsumer.pollEvents()
// Der Latch sollte jetzt fast sofort herunterzählen. Wir warten zur Sicherheit eine kurze Zeit.
assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for events. Latch count: ${latch.count}")
assertEquals(2, receivedEvents.size) assertEquals(2, receivedEvents.size)
@@ -116,7 +121,6 @@ class RedisEventStoreIntegrationTest {
assertEquals("Updated Test Entity", receivedEvent2.name) assertEquals("Updated Test Entity", receivedEvent2.name)
} }
// Hilfsklassen für Tests, die von BaseDomainEvent erben
data class TestCreatedEvent( data class TestCreatedEvent(
override val aggregateId: Uuid, override val aggregateId: Uuid,
override val version: Long, override val version: Long,
@@ -1,12 +1,12 @@
package at.mocode.infrastructure.eventstore.redis package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.BaseDomainEvent import at.mocode.core.domain.event.BaseDomainEvent
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.ConcurrencyException import at.mocode.infrastructure.eventstore.api.ConcurrencyException
import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventSerializer
import com.benasher44.uuid.Uuid import com.benasher44.uuid.Uuid
import com.benasher44.uuid.uuid4 import com.benasher44.uuid.uuid4
import kotlinx.datetime.Instant import kotlin.time.Clock
import kotlin.time.Instant
import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
@@ -43,55 +43,45 @@ class RedisEventStoreTest {
val connectionFactory = LettuceConnectionFactory(redisConfig) val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet() connectionFactory.afterPropertiesSet()
redisTemplate = StringRedisTemplate() redisTemplate = StringRedisTemplate(connectionFactory)
redisTemplate.connectionFactory = connectionFactory
redisTemplate.afterPropertiesSet()
serializer = JacksonEventSerializer().apply { serializer = JacksonEventSerializer().apply {
registerEventType("TestCreated" as Class<out DomainEvent>, TestCreatedEvent::class.java as String) registerEventType(TestCreatedEvent::class.java, "TestCreated")
registerEventType("TestUpdated" as Class<out DomainEvent>, TestUpdatedEvent::class.java as String) registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
} }
properties = RedisEventStoreProperties( properties = RedisEventStoreProperties(streamPrefix = "test-stream:")
streamPrefix = "test-stream:",
allEventsStream = "all-events"
)
eventStore = RedisEventStore(redisTemplate, serializer, properties) eventStore = RedisEventStore(redisTemplate, serializer, properties)
cleanupRedis() cleanupRedis()
} }
@AfterEach @AfterEach
fun tearDown() { fun tearDown() = cleanupRedis()
cleanupRedis()
}
private fun cleanupRedis() { private fun cleanupRedis() {
val keys = redisTemplate.keys("${properties.streamPrefix}*") val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (!keys.isNullOrEmpty()) { if (!keys.isNullOrEmpty()) {
redisTemplate.delete(keys) redisTemplate.delete(keys)
} }
redisTemplate.delete(properties.allEventsStream)
} }
@Test @Test
fun `append and read events should work correctly`() { fun `append and read events should work correctly for new stream`() {
val aggregateId = uuid4() val aggregateId = uuid4()
val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity")
val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity")
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
// KORRIGIERT: Aufruf an die korrekte Methode angepasst
val events = eventStore.readFromStream(aggregateId) val events = eventStore.readFromStream(aggregateId)
assertEquals(2, events.size) assertEquals(2, events.size)
val firstEvent = events[0] as TestCreatedEvent val firstEvent = events[0] as TestCreatedEvent
assertEquals(aggregateId, firstEvent.aggregateId)
assertEquals(1L, firstEvent.version) assertEquals(1L, firstEvent.version)
assertEquals("Test Entity", firstEvent.name) assertEquals("Test Entity", firstEvent.name)
val secondEvent = events[1] as TestUpdatedEvent val secondEvent = events[1] as TestUpdatedEvent
assertEquals(aggregateId, secondEvent.aggregateId)
assertEquals(2L, secondEvent.version) assertEquals(2L, secondEvent.version)
assertEquals("Updated Test Entity", secondEvent.name) assertEquals("Updated Test Entity", secondEvent.name)
} }
@@ -100,22 +90,22 @@ class RedisEventStoreTest {
fun `appending with wrong expected version should throw ConcurrencyException`() { fun `appending with wrong expected version should throw ConcurrencyException`() {
val aggregateId = uuid4() val aggregateId = uuid4()
val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity") val event1 = TestCreatedEvent(aggregateId = aggregateId, version = 1L, name = "Test Entity")
eventStore.appendToStream(listOf(event1), aggregateId, 0) eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1
val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity") val event2 = TestUpdatedEvent(aggregateId = aggregateId, version = 2L, name = "Updated Test Entity")
assertThrows<ConcurrencyException> { assertThrows<ConcurrencyException> {
eventStore.appendToStream(listOf(event2), aggregateId, 0) // Wrong version // Trying to append with expected version 0, but the current is 1
eventStore.appendToStream(listOf(event2), aggregateId, 0)
} }
} }
// Hilfsklassen für Tests, die von BaseDomainEvent erben
data class TestCreatedEvent( data class TestCreatedEvent(
override val aggregateId: Uuid, override val aggregateId: Uuid,
override val version: Long, override val version: Long,
val name: String, val name: String,
override val eventType: String = "TestCreated", override val eventType: String = "TestCreated",
override val eventId: Uuid = uuid4(), override val eventId: Uuid = uuid4(),
override val timestamp: kotlin.time.Instant = kotlin.time.Clock.System.now(), override val timestamp: Instant = Clock.System.now(),
override val correlationId: Uuid? = null, override val correlationId: Uuid? = null,
override val causationId: Uuid? = null override val causationId: Uuid? = null
) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId)
@@ -126,7 +116,7 @@ class RedisEventStoreTest {
val name: String, val name: String,
override val eventType: String = "TestUpdated", override val eventType: String = "TestUpdated",
override val eventId: Uuid = uuid4(), override val eventId: Uuid = uuid4(),
override val timestamp: kotlin.time.Instant = kotlin.time.Clock.System.now(), override val timestamp: Instant = Clock.System.now(),
override val correlationId: Uuid? = null, override val correlationId: Uuid? = null,
override val causationId: Uuid? = null override val causationId: Uuid? = null
) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId) ) : BaseDomainEvent(aggregateId, eventType, version, eventId, timestamp, correlationId, causationId)
@@ -55,9 +55,10 @@ class RedisIntegrationTest {
redisTemplate = StringRedisTemplate(connectionFactory) redisTemplate = StringRedisTemplate(connectionFactory)
// KORREKTUR: Parameter in der korrekten Reihenfolge und mit korrekten Typen übergeben.
serializer = JacksonEventSerializer().apply { serializer = JacksonEventSerializer().apply {
registerEventType("TestCreated" as Class<out DomainEvent>, TestCreatedEvent::class.java as String) registerEventType(TestCreatedEvent::class.java, "TestCreated")
registerEventType("TestUpdated" as Class<out DomainEvent>, TestUpdatedEvent::class.java as String) registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
} }
properties = RedisEventStoreProperties( properties = RedisEventStoreProperties(
@@ -71,6 +72,8 @@ class RedisIntegrationTest {
eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties) eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties)
cleanupRedis() cleanupRedis()
// WICHTIG: Consumer starten, damit er auf Events lauschen kann.
eventConsumer.init()
} }
@AfterEach @AfterEach
@@ -80,11 +83,13 @@ class RedisIntegrationTest {
} }
private fun cleanupRedis() { private fun cleanupRedis() {
val allEventsStreamKey = "${properties.streamPrefix}${properties.allEventsStream}"
val keys = redisTemplate.keys("${properties.streamPrefix}*") val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (!keys.isNullOrEmpty()) { if (!keys.isNullOrEmpty()) {
redisTemplate.delete(keys) redisTemplate.delete(keys)
} }
redisTemplate.delete(properties.allEventsStream) // Sicherstellen, dass auch der allEventsStream-Key gelöscht wird, falls er nicht im Muster enthalten ist.
redisTemplate.delete(allEventsStreamKey)
} }
@Test @Test
@@ -105,11 +110,25 @@ class RedisIntegrationTest {
latch.countDown() latch.countDown()
} }
eventConsumer.init() // Start polling in a separate thread to not block the test execution
val pollingThread = Thread {
// Poll multiple times to ensure messages are picked up
for (i in 1..20) {
if (latch.count > 0) {
eventConsumer.pollEvents()
Thread.sleep(100)
}
}
}
pollingThread.start()
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
assertTrue(latch.await(10, TimeUnit.SECONDS), "Timed out waiting for events") assertTrue(
latch.await(10, TimeUnit.SECONDS),
"Timed out waiting for events. Received ${receivedEvents.size} of 2 events."
)
assertEquals(2, receivedEvents.size) assertEquals(2, receivedEvents.size)
@@ -120,6 +139,8 @@ class RedisIntegrationTest {
val receivedEvent2 = receivedEvents.find { it.version == 2L } as TestUpdatedEvent val receivedEvent2 = receivedEvents.find { it.version == 2L } as TestUpdatedEvent
assertEquals(aggregateId, receivedEvent2.aggregateId) assertEquals(aggregateId, receivedEvent2.aggregateId)
assertEquals("Updated Test Entity", receivedEvent2.name) assertEquals("Updated Test Entity", receivedEvent2.name)
pollingThread.interrupt()
} }
// Hilfsklassen für Tests, die von BaseDomainEvent erben // Hilfsklassen für Tests, die von BaseDomainEvent erben