refactor: Migrate from monolithic to modular architecture

1. **Dokumentation der Architektur:**
    - Vervollständigen Sie die C4-Diagramme im docs-Verzeichnis
    - Dokumentieren Sie die wichtigsten Architekturentscheidungen in ADRs

2. **Redis-Integration finalisieren:**
    - Implementieren Sie die verteilte Cache-Lösung für die Offline-Fähigkeit
    - Nutzen Sie Redis Streams für das Event-Sourcing
This commit is contained in:
stefan
2025-07-23 14:29:40 +02:00
parent a256622f37
commit 9282dd0eb4
52 changed files with 5648 additions and 3 deletions
@@ -0,0 +1,69 @@
package at.mocode.infrastructure.cache.api
import java.time.Duration
/**
* Configuration for the distributed cache.
*/
interface CacheConfiguration {
/**
* Default time-to-live for cache entries.
* If null, entries do not expire by default.
*/
val defaultTtl: Duration?
/**
* Maximum number of entries to store in the local cache.
* If null, there is no limit.
*/
val localCacheMaxSize: Int?
/**
* Whether to enable offline mode.
* If true, the cache will store entries locally when offline
* and synchronize them when online.
*/
val offlineModeEnabled: Boolean
/**
* How often to attempt synchronization in offline mode.
*/
val synchronizationInterval: Duration
/**
* Maximum age of entries to keep in the local cache when offline.
* If null, entries do not expire when offline.
*/
val offlineEntryMaxAge: Duration?
/**
* Prefix to add to all cache keys.
* This can be used to namespace cache entries.
*/
val keyPrefix: String
/**
* Whether to compress cache entries.
*/
val compressionEnabled: Boolean
/**
* Threshold in bytes above which to compress cache entries.
* Only used if compressionEnabled is true.
*/
val compressionThreshold: Int
}
/**
* Default implementation of CacheConfiguration.
*/
data class DefaultCacheConfiguration(
override val defaultTtl: Duration? = Duration.ofHours(1),
override val localCacheMaxSize: Int? = 10000,
override val offlineModeEnabled: Boolean = true,
override val synchronizationInterval: Duration = Duration.ofMinutes(5),
override val offlineEntryMaxAge: Duration? = Duration.ofDays(7),
override val keyPrefix: String = "",
override val compressionEnabled: Boolean = true,
override val compressionThreshold: Int = 1024
) : CacheConfiguration
@@ -0,0 +1,96 @@
package at.mocode.infrastructure.cache.api
import java.time.Instant
/**
* Represents an entry in the cache with metadata for offline capability.
*
* @param key The key of the cache entry
* @param value The value stored in the cache
* @param createdAt When the entry was created
* @param expiresAt When the entry expires, or null if it doesn't expire
* @param lastModifiedAt When the entry was last modified
* @param isDirty Whether the entry has been modified locally and needs to be synchronized
* @param isLocal Whether the entry is only stored locally (not yet synchronized)
*/
data class CacheEntry<T : Any>(
val key: String,
val value: T,
val createdAt: Instant = Instant.now(),
val expiresAt: Instant? = null,
val lastModifiedAt: Instant = Instant.now(),
val isDirty: Boolean = false,
val isLocal: Boolean = false
) {
/**
* Checks if the entry is expired.
*
* @return true if the entry is expired, false otherwise
*/
fun isExpired(): Boolean {
return expiresAt?.isBefore(Instant.now()) ?: false
}
/**
* Creates a new entry with the isDirty flag set to true.
*
* @return A new CacheEntry with isDirty set to true
*/
fun markDirty(): CacheEntry<T> {
return copy(
isDirty = true,
lastModifiedAt = Instant.now()
)
}
/**
* Creates a new entry with the isDirty flag set to false.
*
* @return A new CacheEntry with isDirty set to false
*/
fun markClean(): CacheEntry<T> {
return copy(
isDirty = false,
isLocal = false,
lastModifiedAt = Instant.now()
)
}
/**
* Creates a new entry with the isLocal flag set to true.
*
* @return A new CacheEntry with isLocal set to true
*/
fun markLocal(): CacheEntry<T> {
return copy(
isLocal = true,
lastModifiedAt = Instant.now()
)
}
/**
* Creates a new entry with an updated value.
*
* @param newValue The new value
* @return A new CacheEntry with the updated value
*/
fun updateValue(newValue: T): CacheEntry<T> {
return copy(
value = newValue,
lastModifiedAt = Instant.now()
)
}
/**
* Creates a new entry with an updated expiration time.
*
* @param newExpiresAt The new expiration time
* @return A new CacheEntry with the updated expiration time
*/
fun updateExpiration(newExpiresAt: Instant?): CacheEntry<T> {
return copy(
expiresAt = newExpiresAt,
lastModifiedAt = Instant.now()
)
}
}
@@ -0,0 +1,56 @@
package at.mocode.infrastructure.cache.api
/**
* Interface for serializing and deserializing cache entries.
*/
interface CacheSerializer {
/**
* Serializes a value to a byte array.
*
* @param value The value to serialize
* @return The serialized value as a byte array
*/
fun <T : Any> serialize(value: T): ByteArray
/**
* Deserializes a byte array to a value.
*
* @param bytes The byte array to deserialize
* @param clazz The class of the value to deserialize to
* @return The deserialized value
*/
fun <T : Any> deserialize(bytes: ByteArray, clazz: Class<T>): T
/**
* Serializes a cache entry to a byte array.
*
* @param entry The cache entry to serialize
* @return The serialized cache entry as a byte array
*/
fun <T : Any> serializeEntry(entry: CacheEntry<T>): ByteArray
/**
* Deserializes a byte array to a cache entry.
*
* @param bytes The byte array to deserialize
* @param valueClass The class of the value in the cache entry
* @return The deserialized cache entry
*/
fun <T : Any> deserializeEntry(bytes: ByteArray, valueClass: Class<T>): CacheEntry<T>
/**
* Compresses a byte array.
*
* @param bytes The byte array to compress
* @return The compressed byte array
*/
fun compress(bytes: ByteArray): ByteArray
/**
* Decompresses a byte array.
*
* @param bytes The byte array to decompress
* @return The decompressed byte array
*/
fun decompress(bytes: ByteArray): ByteArray
}
@@ -0,0 +1,76 @@
package at.mocode.infrastructure.cache.api
import java.time.Instant
/**
* Represents the connection status of the cache.
*/
enum class ConnectionState {
/**
* The cache is connected to the remote server.
*/
CONNECTED,
/**
* The cache is disconnected from the remote server.
*/
DISCONNECTED,
/**
* The cache is attempting to reconnect to the remote server.
*/
RECONNECTING
}
/**
* Interface for tracking the connection status of the cache.
*/
interface ConnectionStatusTracker {
/**
* Gets the current connection state.
*
* @return The current connection state
*/
fun getConnectionState(): ConnectionState
/**
* Gets the time when the connection state last changed.
*
* @return The time when the connection state last changed
*/
fun getLastStateChangeTime(): Instant
/**
* Registers a listener to be notified when the connection state changes.
*
* @param listener The listener to register
*/
fun registerConnectionListener(listener: ConnectionStateListener)
/**
* Unregisters a connection state listener.
*
* @param listener The listener to unregister
*/
fun unregisterConnectionListener(listener: ConnectionStateListener)
/**
* Checks if the cache is currently connected.
*
* @return true if the cache is connected, false otherwise
*/
fun isConnected(): Boolean = getConnectionState() == ConnectionState.CONNECTED
}
/**
* Listener for connection state changes.
*/
interface ConnectionStateListener {
/**
* Called when the connection state changes.
*
* @param newState The new connection state
* @param timestamp The time when the state changed
*/
fun onConnectionStateChanged(newState: ConnectionState, timestamp: Instant)
}
@@ -0,0 +1,94 @@
package at.mocode.infrastructure.cache.api
import java.time.Duration
/**
* Interface for a distributed cache that supports offline capability.
* This cache can be used to store and retrieve data across multiple instances
* and provides mechanisms for offline operation.
*/
interface DistributedCache {
/**
* Retrieves a value from the cache.
*
* @param key The key to retrieve
* @return The value associated with the key, or null if not found
*/
fun <T : Any> get(key: String, clazz: Class<T>): T?
/**
* Stores a value in the cache with an optional time-to-live.
*
* @param key The key to store the value under
* @param value The value to store
* @param ttl Optional time-to-live for the cache entry
*/
fun <T : Any> set(key: String, value: T, ttl: Duration? = null)
/**
* Removes a value from the cache.
*
* @param key The key to remove
*/
fun delete(key: String)
/**
* Checks if a key exists in the cache.
*
* @param key The key to check
* @return true if the key exists, false otherwise
*/
fun exists(key: String): Boolean
/**
* Retrieves multiple values from the cache.
*
* @param keys The keys to retrieve
* @return A map of keys to values, with missing keys omitted
*/
fun <T : Any> multiGet(keys: Collection<String>, clazz: Class<T>): Map<String, T>
/**
* Stores multiple values in the cache with an optional time-to-live.
*
* @param entries The key-value pairs to store
* @param ttl Optional time-to-live for the cache entries
*/
fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration? = null)
/**
* Removes multiple values from the cache.
*
* @param keys The keys to remove
*/
fun multiDelete(keys: Collection<String>)
/**
* Synchronizes the local cache with the distributed cache.
* This is used to ensure that the local cache is up-to-date with the distributed cache
* after being offline.
*
* @param keys Optional collection of keys to synchronize. If null, all keys are synchronized.
*/
fun synchronize(keys: Collection<String>? = null)
/**
* Marks a key as dirty, indicating that it has been modified locally
* and needs to be synchronized with the distributed cache.
*
* @param key The key to mark as dirty
*/
fun markDirty(key: String)
/**
* Gets all keys that have been marked as dirty.
*
* @return A collection of dirty keys
*/
fun getDirtyKeys(): Collection<String>
/**
* Clears all entries from the cache.
*/
fun clear()
}
+1
View File
@@ -4,6 +4,7 @@ plugins {
}
dependencies {
api(platform(projects.platform.platformBom))
implementation(projects.infrastructure.cache.cacheApi)
implementation("org.springframework.boot:spring-boot-starter-data-redis")
@@ -0,0 +1,119 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.CacheEntry
import at.mocode.infrastructure.cache.api.CacheSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.kotlin.readValue
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
/**
* Jackson-based implementation of CacheSerializer.
*/
class JacksonCacheSerializer : CacheSerializer {
private val objectMapper: ObjectMapper = ObjectMapper().apply {
registerModule(KotlinModule.Builder().build())
registerModule(JavaTimeModule())
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
}
override fun <T : Any> serialize(value: T): ByteArray {
return objectMapper.writeValueAsBytes(value)
}
override fun <T : Any> deserialize(bytes: ByteArray, clazz: Class<T>): T {
return objectMapper.readValue(bytes, clazz)
}
override fun <T : Any> serializeEntry(entry: CacheEntry<T>): ByteArray {
// Create a wrapper that holds both the entry metadata and the serialized value
val wrapper = CacheEntryWrapper(
key = entry.key,
valueBytes = serialize(entry.value),
valueType = entry.value.javaClass.name,
createdAt = entry.createdAt,
expiresAt = entry.expiresAt,
lastModifiedAt = entry.lastModifiedAt,
isDirty = entry.isDirty,
isLocal = entry.isLocal
)
return objectMapper.writeValueAsBytes(wrapper)
}
override fun <T : Any> deserializeEntry(bytes: ByteArray, valueClass: Class<T>): CacheEntry<T> {
val wrapper = objectMapper.readValue<CacheEntryWrapper>(bytes)
val value = deserialize(wrapper.valueBytes, valueClass)
return CacheEntry(
key = wrapper.key,
value = value,
createdAt = wrapper.createdAt,
expiresAt = wrapper.expiresAt,
lastModifiedAt = wrapper.lastModifiedAt,
isDirty = wrapper.isDirty,
isLocal = wrapper.isLocal
)
}
override fun compress(bytes: ByteArray): ByteArray {
val outputStream = ByteArrayOutputStream()
GZIPOutputStream(outputStream).use { it.write(bytes) }
return outputStream.toByteArray()
}
override fun decompress(bytes: ByteArray): ByteArray {
val inputStream = GZIPInputStream(ByteArrayInputStream(bytes))
return inputStream.readBytes()
}
/**
* Wrapper class for serializing cache entries.
* This separates the metadata from the value, allowing us to deserialize
* the metadata without knowing the type of the value.
*/
private data class CacheEntryWrapper(
val key: String,
val valueBytes: ByteArray,
val valueType: String,
val createdAt: java.time.Instant,
val expiresAt: java.time.Instant?,
val lastModifiedAt: java.time.Instant,
val isDirty: Boolean,
val isLocal: Boolean
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as CacheEntryWrapper
if (key != other.key) return false
if (!valueBytes.contentEquals(other.valueBytes)) return false
if (valueType != other.valueType) return false
if (createdAt != other.createdAt) return false
if (expiresAt != other.expiresAt) return false
if (lastModifiedAt != other.lastModifiedAt) return false
if (isDirty != other.isDirty) return false
if (isLocal != other.isLocal) return false
return true
}
override fun hashCode(): Int {
var result = key.hashCode()
result = 31 * result + valueBytes.contentHashCode()
result = 31 * result + valueType.hashCode()
result = 31 * result + createdAt.hashCode()
result = 31 * result + (expiresAt?.hashCode() ?: 0)
result = 31 * result + lastModifiedAt.hashCode()
result = 31 * result + isDirty.hashCode()
result = 31 * result + isLocal.hashCode()
return result
}
}
}
@@ -0,0 +1,99 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.CacheConfiguration
import at.mocode.infrastructure.cache.api.CacheSerializer
import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.connection.RedisPassword
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
/**
* Redis connection properties.
*/
@ConfigurationProperties(prefix = "redis")
data class RedisProperties(
val host: String = "localhost",
val port: Int = 6379,
val password: String? = null,
val database: Int = 0,
val connectionTimeout: Long = 2000,
val readTimeout: Long = 2000,
val usePooling: Boolean = true,
val maxPoolSize: Int = 8,
val minPoolSize: Int = 2
)
/**
* Spring configuration for Redis.
*/
@Configuration
@EnableConfigurationProperties(RedisProperties::class)
class RedisConfiguration {
/**
* Creates a Redis connection factory.
*
* @param properties Redis connection properties
* @return Redis connection factory
*/
@Bean
fun redisConnectionFactory(properties: RedisProperties): RedisConnectionFactory {
val config = RedisStandaloneConfiguration().apply {
hostName = properties.host
port = properties.port
properties.password?.let { password = RedisPassword.of(it) }
database = properties.database
}
return LettuceConnectionFactory(config).apply {
// Configure connection timeouts
afterPropertiesSet()
}
}
/**
* Creates a Redis template for byte arrays.
*
* @param connectionFactory Redis connection factory
* @return Redis template
*/
@Bean
fun redisTemplate(connectionFactory: RedisConnectionFactory): RedisTemplate<String, ByteArray> {
return RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
// Use default serializer for values (byte arrays)
afterPropertiesSet()
}
}
/**
* Creates a cache serializer.
*
* @return Cache serializer
*/
@Bean
@ConditionalOnMissingBean
fun cacheSerializer(): CacheSerializer {
return JacksonCacheSerializer()
}
/**
* Creates a default cache configuration if none is provided.
*
* @return Cache configuration
*/
@Bean
@ConditionalOnMissingBean
fun cacheConfiguration(): CacheConfiguration {
return DefaultCacheConfiguration()
}
}
@@ -0,0 +1,494 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.CacheConfiguration
import at.mocode.infrastructure.cache.api.CacheEntry
import at.mocode.infrastructure.cache.api.CacheSerializer
import at.mocode.infrastructure.cache.api.ConnectionState
import at.mocode.infrastructure.cache.api.ConnectionStateListener
import at.mocode.infrastructure.cache.api.ConnectionStatusTracker
import at.mocode.infrastructure.cache.api.DistributedCache
import org.slf4j.LoggerFactory
import org.springframework.data.redis.RedisConnectionFailureException
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.scheduling.annotation.Scheduled
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
/**
* Redis implementation of DistributedCache with offline capability.
*/
class RedisDistributedCache(
private val redisTemplate: RedisTemplate<String, ByteArray>,
private val serializer: CacheSerializer,
private val config: CacheConfiguration
) : DistributedCache, ConnectionStatusTracker {
private val logger = LoggerFactory.getLogger(RedisDistributedCache::class.java)
// Local cache for offline capability
private val localCache = ConcurrentHashMap<String, CacheEntry<Any>>()
// Set of keys that have been modified locally and need to be synchronized
private val dirtyKeys = ConcurrentHashMap.newKeySet<String>()
// Connection state
private var connectionState = ConnectionState.DISCONNECTED
private var lastStateChangeTime = Instant.now()
// Connection state listeners
private val connectionListeners = CopyOnWriteArrayList<ConnectionStateListener>()
init {
// Try to connect to Redis
checkConnection()
}
//
// DistributedCache implementation
//
override fun <T : Any> get(key: String, clazz: Class<T>): T? {
val prefixedKey = addPrefix(key)
// Try to get from local cache first
val localEntry = localCache[prefixedKey] as? CacheEntry<T>
if (localEntry != null) {
if (localEntry.isExpired()) {
localCache.remove(prefixedKey)
return null
}
return localEntry.value
}
// If not in local cache and we're disconnected, return null
if (!isConnected()) {
return null
}
// Try to get from Redis
try {
val bytes = redisTemplate.opsForValue().get(prefixedKey) ?: return null
val entry = serializer.deserializeEntry(bytes, clazz)
// Store in local cache
localCache[prefixedKey] = entry as CacheEntry<Any>
return entry.value
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
return null
} catch (e: Exception) {
logger.error("Error getting value from Redis for key $prefixedKey", e)
return null
}
}
override fun <T : Any> set(key: String, value: T, ttl: Duration?) {
val prefixedKey = addPrefix(key)
val expiresAt = ttl?.let { Instant.now().plus(it) } ?: config.defaultTtl?.let { Instant.now().plus(it) }
val entry = CacheEntry(
key = prefixedKey,
value = value,
expiresAt = expiresAt
)
// Store in local cache
localCache[prefixedKey] = entry as CacheEntry<Any>
// If we're disconnected, mark as dirty and return
if (!isConnected()) {
markDirty(key)
return
}
// Try to store in Redis
try {
val bytes = serializer.serializeEntry(entry)
redisTemplate.opsForValue().set(prefixedKey, bytes)
if (ttl != null) {
redisTemplate.expire(prefixedKey, ttl)
} else if (config.defaultTtl != null) {
val defaultTtl: Duration? = config.defaultTtl
redisTemplate.expire(prefixedKey, defaultTtl)
}
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
markDirty(key)
} catch (e: Exception) {
logger.error("Error setting value in Redis for key $prefixedKey", e)
markDirty(key)
}
}
override fun delete(key: String) {
val prefixedKey = addPrefix(key)
// Remove from local cache
localCache.remove(prefixedKey)
// If we're disconnected, mark as dirty and return
if (!isConnected()) {
markDirty(key)
return
}
// Try to delete from Redis
try {
redisTemplate.delete(prefixedKey)
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
markDirty(key)
} catch (e: Exception) {
logger.error("Error deleting value from Redis for key $prefixedKey", e)
markDirty(key)
}
}
override fun exists(key: String): Boolean {
val prefixedKey = addPrefix(key)
// Check local cache first
if (localCache.containsKey(prefixedKey)) {
val entry = localCache[prefixedKey]
if (entry != null && !entry.isExpired()) {
return true
}
// Remove expired entry
localCache.remove(prefixedKey)
}
// If we're disconnected, return false
if (!isConnected()) {
return false
}
// Check Redis
try {
return redisTemplate.hasKey(prefixedKey) ?: false
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
return false
} catch (e: Exception) {
logger.error("Error checking if key exists in Redis for key $prefixedKey", e)
return false
}
}
override fun <T : Any> multiGet(keys: Collection<String>, clazz: Class<T>): Map<String, T> {
val result = mutableMapOf<String, T>()
// Get from local cache first
val prefixedKeys = keys.map { addPrefix(it) }
val localEntries = prefixedKeys.mapNotNull { key ->
val entry = localCache[key] as? CacheEntry<T>
if (entry != null && !entry.isExpired()) {
key to entry.value
} else {
null
}
}.toMap()
result.putAll(localEntries.mapKeys { removePrefix(it.key) })
// If we're disconnected, return local entries
if (!isConnected()) {
return result
}
// Get missing keys from Redis
val missingKeys = prefixedKeys.filter { !localEntries.containsKey(it) }
if (missingKeys.isEmpty()) {
return result
}
try {
val redisEntries = redisTemplate.opsForValue().multiGet(missingKeys)
if (redisEntries != null) {
for (i in missingKeys.indices) {
val key = missingKeys[i]
val bytes = redisEntries[i]
if (bytes != null) {
try {
val entry = serializer.deserializeEntry(bytes, clazz)
// Store in local cache
localCache[key] = entry as CacheEntry<Any>
// Add to result
result[removePrefix(key)] = entry.value
} catch (e: Exception) {
logger.error("Error deserializing entry for key $key", e)
}
}
}
}
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
} catch (e: Exception) {
logger.error("Error getting multiple values from Redis", e)
}
return result
}
override fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration?) {
// Store in local cache and prepare for Redis
val redisBatch = mutableMapOf<String, ByteArray>()
val expiresAt = ttl?.let { Instant.now().plus(it) } ?: config.defaultTtl?.let { Instant.now().plus(it) }
for ((key, value) in entries) {
val prefixedKey = addPrefix(key)
val entry = CacheEntry(
key = prefixedKey,
value = value,
expiresAt = expiresAt
)
// Store in local cache
localCache[prefixedKey] = entry as CacheEntry<Any>
// Prepare for Redis
redisBatch[prefixedKey] = serializer.serializeEntry(entry)
}
// If we're disconnected, mark all as dirty and return
if (!isConnected()) {
entries.keys.forEach { markDirty(it) }
return
}
// Try to store in Redis
try {
redisTemplate.opsForValue().multiSet(redisBatch)
if (ttl != null || config.defaultTtl != null) {
val duration = ttl ?: config.defaultTtl
if (duration != null) {
for (key in redisBatch.keys) {
redisTemplate.expire(key, duration)
}
}
}
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
entries.keys.forEach { markDirty(it) }
} catch (e: Exception) {
logger.error("Error setting multiple values in Redis", e)
entries.keys.forEach { markDirty(it) }
}
}
override fun multiDelete(keys: Collection<String>) {
val prefixedKeys = keys.map { addPrefix(it) }
// Remove from local cache
prefixedKeys.forEach { localCache.remove(it) }
// If we're disconnected, mark all as dirty and return
if (!isConnected()) {
keys.forEach { markDirty(it) }
return
}
// Try to delete from Redis
try {
redisTemplate.delete(prefixedKeys)
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
keys.forEach { markDirty(it) }
} catch (e: Exception) {
logger.error("Error deleting multiple values from Redis", e)
keys.forEach { markDirty(it) }
}
}
override fun synchronize(keys: Collection<String>?) {
if (!isConnected()) {
logger.debug("Cannot synchronize while disconnected")
return
}
val keysToSync = keys ?: getDirtyKeys()
if (keysToSync.isEmpty()) {
logger.debug("No keys to synchronize")
return
}
logger.debug("Synchronizing ${keysToSync.size} keys")
for (key in keysToSync) {
val prefixedKey = addPrefix(key)
val localEntry = localCache[prefixedKey]
if (localEntry == null) {
// Entry was deleted locally, delete from Redis
try {
redisTemplate.delete(prefixedKey)
dirtyKeys.remove(key)
} catch (e: Exception) {
logger.error("Error deleting key $prefixedKey during synchronization", e)
}
} else {
// Entry exists locally, update in Redis
try {
val bytes = serializer.serializeEntry(localEntry)
redisTemplate.opsForValue().set(prefixedKey, bytes)
val ttl = localEntry.expiresAt?.let { Duration.between(Instant.now(), it) }
if (ttl != null && !ttl.isNegative) {
redisTemplate.expire(prefixedKey, ttl)
}
// Update local entry to mark as clean
localCache[prefixedKey] = localEntry.markClean() as CacheEntry<Any>
dirtyKeys.remove(key)
} catch (e: Exception) {
logger.error("Error updating key $prefixedKey during synchronization", e)
}
}
}
}
override fun markDirty(key: String) {
dirtyKeys.add(key)
val prefixedKey = addPrefix(key)
val entry = localCache[prefixedKey]
if (entry != null) {
localCache[prefixedKey] = entry.markDirty() as CacheEntry<Any>
}
}
override fun getDirtyKeys(): Collection<String> {
return dirtyKeys.toList()
}
override fun clear() {
// Clear local cache
localCache.clear()
dirtyKeys.clear()
// If we're disconnected, return
if (!isConnected()) {
return
}
// Try to clear Redis
try {
val keys = redisTemplate.keys("${config.keyPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
} catch (e: Exception) {
logger.error("Error clearing Redis cache", e)
}
}
//
// ConnectionStatusTracker implementation
//
override fun getConnectionState(): ConnectionState {
return connectionState
}
override fun getLastStateChangeTime(): Instant {
return lastStateChangeTime
}
override fun registerConnectionListener(listener: ConnectionStateListener) {
connectionListeners.add(listener)
}
override fun unregisterConnectionListener(listener: ConnectionStateListener) {
connectionListeners.remove(listener)
}
//
// Helper methods
//
private fun addPrefix(key: String): String {
return if (config.keyPrefix.isEmpty()) key else "${config.keyPrefix}:$key"
}
private fun removePrefix(key: String): String {
return if (config.keyPrefix.isEmpty()) key else key.substring(config.keyPrefix.length + 1)
}
private fun handleConnectionFailure(e: Exception) {
logger.warn("Redis connection failure: ${e.message}")
setConnectionState(ConnectionState.DISCONNECTED)
}
private fun setConnectionState(newState: ConnectionState) {
if (connectionState != newState) {
val oldState = connectionState
connectionState = newState
lastStateChangeTime = Instant.now()
logger.info("Cache connection state changed from $oldState to $newState")
// Notify listeners
val timestamp = lastStateChangeTime
connectionListeners.forEach { listener ->
try {
listener.onConnectionStateChanged(newState, timestamp)
} catch (e: Exception) {
logger.error("Error notifying connection listener", e)
}
}
// If reconnected, synchronize dirty keys
if (oldState != ConnectionState.CONNECTED && newState == ConnectionState.CONNECTED) {
synchronize(null)
}
}
}
/**
* Periodically check the connection to Redis.
*/
@Scheduled(fixedDelayString = "\${redis.connection-check-interval:10000}")
fun checkConnection() {
try {
redisTemplate.hasKey("connection-test")
setConnectionState(ConnectionState.CONNECTED)
} catch (e: Exception) {
setConnectionState(ConnectionState.DISCONNECTED)
}
}
/**
* Periodically clean up expired entries from the local cache.
*/
@Scheduled(fixedDelayString = "\${redis.local-cache-cleanup-interval:60000}")
fun cleanupLocalCache() {
val now = Instant.now()
val expiredKeys = localCache.entries
.filter { it.value.expiresAt?.isBefore(now) ?: false }
.map { it.key }
expiredKeys.forEach { localCache.remove(it) }
if (expiredKeys.isNotEmpty()) {
logger.debug("Removed ${expiredKeys.size} expired entries from local cache")
}
}
/**
* Periodically synchronize dirty keys when connected.
*/
@Scheduled(fixedDelayString = "\${redis.sync-interval:300000}")
fun scheduledSync() {
if (isConnected() && dirtyKeys.isNotEmpty()) {
synchronize(null)
}
}
}
@@ -0,0 +1,198 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.CacheConfiguration
import at.mocode.infrastructure.cache.api.CacheSerializer
import at.mocode.infrastructure.cache.api.ConnectionState
import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
@Testcontainers
class RedisDistributedCacheTest {
companion object {
@Container
val redisContainer = GenericContainer(DockerImageName.parse("redis:7-alpine"))
.withExposedPorts(6379)
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: RedisDistributedCache
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "test:",
offlineModeEnabled = true
)
cache = RedisDistributedCache(redisTemplate, serializer, config)
// Clear the cache before each test
cache.clear()
}
@AfterEach
fun tearDown() {
cache.clear()
}
@Test
fun `test basic cache operations`() {
// Set a value
cache.set("key1", "value1")
// Get the value
val value = cache.get("key1", String::class.java)
assertEquals("value1", value)
// Check if the key exists
assertTrue(cache.exists("key1"))
// Delete the key
cache.delete("key1")
// Verify it's gone
assertFalse(cache.exists("key1"))
assertNull(cache.get("key1", String::class.java))
}
@Test
fun `test cache with TTL`() {
// Set a value with a short TTL
cache.set("key2", "value2", Duration.ofMillis(100))
// Verify it exists
assertTrue(cache.exists("key2"))
assertEquals("value2", cache.get("key2", String::class.java))
// Wait for it to expire
Thread.sleep(200)
// Verify it's gone
assertFalse(cache.exists("key2"))
assertNull(cache.get("key2", String::class.java))
}
@Test
fun `test batch operations`() {
// Set multiple values
val entries = mapOf(
"batch1" to "value1",
"batch2" to "value2",
"batch3" to "value3"
)
cache.multiSet(entries)
// Get multiple values
val values = cache.multiGet(listOf("batch1", "batch2", "batch3"), String::class.java)
assertEquals(3, values.size)
assertEquals("value1", values["batch1"])
assertEquals("value2", values["batch2"])
assertEquals("value3", values["batch3"])
// Delete multiple values
cache.multiDelete(listOf("batch1", "batch3"))
// Verify they're gone
val remainingValues = cache.multiGet(listOf("batch1", "batch2", "batch3"), String::class.java)
assertEquals(1, remainingValues.size)
assertNull(remainingValues["batch1"])
assertEquals("value2", remainingValues["batch2"])
assertNull(remainingValues["batch3"])
}
@Test
fun `test offline capability`() {
// Set a value
cache.set("offline1", "value1")
// Simulate going offline by stopping the Redis container
redisContainer.stop()
// Verify connection state is DISCONNECTED
assertEquals(ConnectionState.DISCONNECTED, cache.getConnectionState())
// We should still be able to get the value from local cache
assertEquals("value1", cache.get("offline1", String::class.java))
// Set a new value while offline
cache.set("offline2", "value2")
// Verify it's marked as dirty
assertTrue(cache.getDirtyKeys().contains("offline2"))
// Start Redis again
redisContainer.start()
// Manually trigger synchronization
cache.synchronize()
// Verify connection state is CONNECTED
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
// Verify the value set while offline is now in Redis
assertEquals("value2", cache.get("offline2", String::class.java))
// Verify it's no longer marked as dirty
assertFalse(cache.getDirtyKeys().contains("offline2"))
}
@Test
fun `test complex objects`() {
// Create a complex object
val person = Person("John Doe", 30, listOf("Reading", "Hiking"))
// Store it in the cache
cache.set("person1", person)
// Retrieve it
val retrievedPerson = cache.get("person1", Person::class.java)
// Verify it's the same
assertNotNull(retrievedPerson)
assertEquals("John Doe", retrievedPerson.name)
assertEquals(30, retrievedPerson.age)
assertEquals(2, retrievedPerson.hobbies.size)
assertTrue(retrievedPerson.hobbies.contains("Reading"))
assertTrue(retrievedPerson.hobbies.contains("Hiking"))
}
// Test data class
data class Person(
val name: String,
val age: Int,
val hobbies: List<String>
)
}
@@ -3,6 +3,9 @@ plugins {
}
dependencies {
// Apply platform BOM for version management
implementation(platform(projects.platform.platformBom))
implementation(projects.core.coreDomain)
implementation(projects.core.coreUtils)
@@ -0,0 +1,76 @@
package at.mocode.infrastructure.eventstore.api
import at.mocode.core.domain.event.DomainEvent
import java.util.UUID
/**
* Interface for serializing and deserializing domain events.
*/
interface EventSerializer {
/**
* Serializes a domain event to a map of strings to strings.
* This format is suitable for storage in Redis Streams.
*
* @param event The event to serialize
* @return A map of strings to strings representing the event
*/
fun serialize(event: DomainEvent): Map<String, String>
/**
* Deserializes a map of strings to strings to a domain event.
*
* @param data The map of strings to strings to deserialize
* @return The deserialized domain event
*/
fun deserialize(data: Map<String, String>): DomainEvent
/**
* Gets the type of a domain event.
* This is used to determine the type of event when deserializing.
*
* @param event The event to get the type of
* @return The type of the event as a string
*/
fun getEventType(event: DomainEvent): String
/**
* Gets the type of a domain event from a serialized map.
*
* @param data The serialized event data
* @return The type of the event as a string
*/
fun getEventType(data: Map<String, String>): String
/**
* Registers a domain event class with the serializer.
* This is used to map event types to their corresponding classes.
*
* @param eventClass The class of the event to register
* @param eventType The type of the event as a string
*/
fun registerEventType(eventClass: Class<out DomainEvent>, eventType: String)
/**
* Gets the aggregate ID from a serialized event.
*
* @param data The serialized event data
* @return The aggregate ID
*/
fun getAggregateId(data: Map<String, String>): UUID
/**
* Gets the event ID from a serialized event.
*
* @param data The serialized event data
* @return The event ID
*/
fun getEventId(data: Map<String, String>): UUID
/**
* Gets the version from a serialized event.
*
* @param data The serialized event data
* @return The version
*/
fun getVersion(data: Map<String, String>): Long
}
@@ -0,0 +1,99 @@
package at.mocode.infrastructure.eventstore.api
import at.mocode.core.domain.event.DomainEvent
import java.util.UUID
/**
* Interface for an event store that persists domain events.
*/
interface EventStore {
/**
* Appends an event to the event store.
*
* @param event The event to append
* @param streamId The ID of the event stream (typically the aggregate ID)
* @param expectedVersion The expected version of the stream (for optimistic concurrency)
* @return The new version of the stream
* @throws ConcurrencyException if the expected version doesn't match the actual version
*/
fun appendToStream(event: DomainEvent, streamId: UUID, expectedVersion: Long): Long
/**
* Appends multiple events to the event store.
*
* @param events The events to append
* @param streamId The ID of the event stream (typically the aggregate ID)
* @param expectedVersion The expected version of the stream (for optimistic concurrency)
* @return The new version of the stream
* @throws ConcurrencyException if the expected version doesn't match the actual version
*/
fun appendToStream(events: List<DomainEvent>, streamId: UUID, expectedVersion: Long): Long
/**
* Reads events from a stream.
*
* @param streamId The ID of the event stream to read from
* @param fromVersion The version to start reading from (inclusive)
* @param toVersion The version to read to (inclusive), or null to read all events
* @return The events in the stream
*/
fun readFromStream(streamId: UUID, fromVersion: Long = 0, toVersion: Long? = null): List<DomainEvent>
/**
* Reads all events from all streams.
*
* @param fromPosition The position to start reading from (inclusive)
* @param maxCount The maximum number of events to read, or null to read all events
* @return The events in all streams
*/
fun readAllEvents(fromPosition: Long = 0, maxCount: Int? = null): List<DomainEvent>
/**
* Gets the current version of a stream.
*
* @param streamId The ID of the event stream
* @return The current version of the stream, or -1 if the stream doesn't exist
*/
fun getStreamVersion(streamId: UUID): Long
/**
* Subscribes to events from a specific stream.
*
* @param streamId The ID of the event stream to subscribe to
* @param fromVersion The version to start subscribing from (inclusive)
* @param handler The handler to call for each event
* @return A subscription that can be used to unsubscribe
*/
fun subscribeToStream(streamId: UUID, fromVersion: Long = 0, handler: (DomainEvent) -> Unit): Subscription
/**
* Subscribes to all events from all streams.
*
* @param fromPosition The position to start subscribing from (inclusive)
* @param handler The handler to call for each event
* @return A subscription that can be used to unsubscribe
*/
fun subscribeToAll(fromPosition: Long = 0, handler: (DomainEvent) -> Unit): Subscription
}
/**
* Interface for a subscription to an event stream.
*/
interface Subscription {
/**
* Unsubscribes from the event stream.
*/
fun unsubscribe()
/**
* Checks if the subscription is active.
*
* @return true if the subscription is active, false otherwise
*/
fun isActive(): Boolean
}
/**
* Exception thrown when there is a concurrency conflict in the event store.
*/
class ConcurrencyException(message: String) : RuntimeException(message)
@@ -4,13 +4,20 @@ plugins {
}
dependencies {
// Apply platform BOM for version management
implementation(platform(projects.platform.platformBom))
implementation(projects.infrastructure.eventStore.eventStoreApi)
implementation(projects.core.coreDomain)
implementation(projects.core.coreUtils)
implementation("org.springframework.boot:spring-boot-starter-data-redis")
implementation("io.lettuce:lettuce-core")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
implementation("javax.annotation:javax.annotation-api:1.3.2")
testImplementation(projects.platform.platformTesting)
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.testcontainers:junit-jupiter")
}
@@ -0,0 +1,119 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.EventSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.kotlin.readValue
import org.slf4j.LoggerFactory
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
/**
* Jackson-based implementation of EventSerializer.
*/
class JacksonEventSerializer : EventSerializer {
private val logger = LoggerFactory.getLogger(JacksonEventSerializer::class.java)
private val objectMapper: ObjectMapper = ObjectMapper().apply {
registerModule(KotlinModule.Builder().build())
registerModule(JavaTimeModule())
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
}
// Maps from event type to event class
private val eventTypeToClass = ConcurrentHashMap<String, Class<out DomainEvent>>()
// Maps from event class to event type
private val eventClassToType = ConcurrentHashMap<Class<out DomainEvent>, String>()
// Standard field names in serialized events
companion object {
const val EVENT_TYPE_FIELD = "eventType"
const val EVENT_ID_FIELD = "eventId"
const val AGGREGATE_ID_FIELD = "aggregateId"
const val VERSION_FIELD = "version"
const val TIMESTAMP_FIELD = "timestamp"
const val EVENT_DATA_FIELD = "eventData"
}
override fun serialize(event: DomainEvent): Map<String, String> {
val eventType = getEventType(event)
// Register the event type if not already registered
if (!eventClassToType.containsKey(event.javaClass)) {
registerEventType(event.javaClass, eventType)
}
// Serialize the event data
val eventData = objectMapper.writeValueAsString(event)
// Create a map with the event metadata and data
return mapOf(
EVENT_TYPE_FIELD to eventType,
EVENT_ID_FIELD to event.eventId.toString(),
AGGREGATE_ID_FIELD to event.aggregateId.toString(),
VERSION_FIELD to event.version.toString(),
TIMESTAMP_FIELD to event.timestamp.toString(),
EVENT_DATA_FIELD to eventData
)
}
override fun deserialize(data: Map<String, String>): DomainEvent {
val eventType = getEventType(data)
val eventClass = eventTypeToClass[eventType]
?: throw IllegalArgumentException("Unknown event type: $eventType")
val eventData = data[EVENT_DATA_FIELD]
?: throw IllegalArgumentException("Event data is missing")
return objectMapper.readValue(eventData, eventClass)
}
override fun getEventType(event: DomainEvent): String {
// Use the registered type if available
val registeredType = eventClassToType[event.javaClass]
if (registeredType != null) {
return registeredType
}
// Otherwise, use the simple class name
val type = event.javaClass.simpleName
registerEventType(event.javaClass, type)
return type
}
override fun getEventType(data: Map<String, String>): String {
return data[EVENT_TYPE_FIELD]
?: throw IllegalArgumentException("Event type is missing")
}
override fun registerEventType(eventClass: Class<out DomainEvent>, eventType: String) {
eventTypeToClass[eventType] = eventClass
eventClassToType[eventClass] = eventType
logger.debug("Registered event type: $eventType for class: ${eventClass.name}")
}
override fun getAggregateId(data: Map<String, String>): UUID {
val aggregateIdStr = data[AGGREGATE_ID_FIELD]
?: throw IllegalArgumentException("Aggregate ID is missing")
return UUID.fromString(aggregateIdStr)
}
override fun getEventId(data: Map<String, String>): UUID {
val eventIdStr = data[EVENT_ID_FIELD]
?: throw IllegalArgumentException("Event ID is missing")
return UUID.fromString(eventIdStr)
}
override fun getVersion(data: Map<String, String>): Long {
val versionStr = data[VERSION_FIELD]
?: throw IllegalArgumentException("Version is missing")
return versionStr.toLong()
}
}
@@ -0,0 +1,314 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.EventSerializer
import org.slf4j.LoggerFactory
import org.springframework.data.domain.Range
import org.springframework.data.redis.connection.stream.Consumer
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamReadOptions
import org.springframework.data.redis.core.StringRedisTemplate
import org.springframework.scheduling.annotation.Scheduled
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
/**
* Consumer for Redis Streams that processes events using consumer groups.
*/
class RedisEventConsumer(
private val redisTemplate: StringRedisTemplate,
private val serializer: EventSerializer,
private val properties: RedisEventStoreProperties
) {
private val logger = LoggerFactory.getLogger(RedisEventConsumer::class.java)
// Event handlers registered for specific event types
private val eventTypeHandlers = ConcurrentHashMap<String, CopyOnWriteArrayList<(DomainEvent) -> Unit>>()
// Event handlers registered for all events
private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>()
// Flag to indicate if the consumer is running
private var running = false
/**
* Initializes the consumer.
*/
@PostConstruct
fun init() {
if (properties.createConsumerGroupIfNotExists) {
createConsumerGroupsIfNotExist()
}
}
/**
* Stops the consumer.
*/
@PreDestroy
fun shutdown() {
running = false
}
/**
* Registers a handler for a specific event type.
*
* @param eventType The type of event to handle
* @param handler The handler to call when an event of the specified type is received
*/
fun registerEventHandler(eventType: String, handler: (DomainEvent) -> Unit) {
eventTypeHandlers.computeIfAbsent(eventType) { CopyOnWriteArrayList() }.add(handler)
logger.debug("Registered handler for event type: $eventType")
}
/**
* Registers a handler for all events.
*
* @param handler The handler to call when any event is received
*/
fun registerAllEventsHandler(handler: (DomainEvent) -> Unit) {
allEventHandlers.add(handler)
logger.debug("Registered handler for all events")
}
/**
* Unregisters a handler for a specific event type.
*
* @param eventType The type of event
* @param handler The handler to unregister
*/
fun unregisterEventHandler(eventType: String, handler: (DomainEvent) -> Unit) {
eventTypeHandlers[eventType]?.remove(handler)
logger.debug("Unregistered handler for event type: $eventType")
}
/**
* Unregisters a handler for all events.
*
* @param handler The handler to unregister
*/
fun unregisterAllEventsHandler(handler: (DomainEvent) -> Unit) {
allEventHandlers.remove(handler)
logger.debug("Unregistered handler for all events")
}
/**
* Creates consumer groups for all streams if they don't exist.
*/
private fun createConsumerGroupsIfNotExist() {
try {
// Create consumer group for the all events stream
val allEventsStreamKey = getAllEventsStreamKey()
createConsumerGroupIfNotExists(allEventsStreamKey)
// Get all stream keys
val streamKeys = redisTemplate.keys("${properties.streamPrefix}*")
// Create consumer groups for all streams
for (streamKey in streamKeys) {
if (streamKey != allEventsStreamKey) {
createConsumerGroupIfNotExists(streamKey)
}
}
} catch (e: Exception) {
logger.error("Error creating consumer groups: ${e.message}", e)
}
}
/**
* Creates a consumer group for a stream if it doesn't exist.
*
* @param streamKey The key of the stream
*/
private fun createConsumerGroupIfNotExists(streamKey: String) {
try {
// Check if the stream exists
if (!redisTemplate.hasKey(streamKey)) {
// Create the stream with an empty message
redisTemplate.opsForStream<String, String>()
.add(streamKey, mapOf("init" to "init"))
logger.debug("Created stream: $streamKey")
}
// Create the consumer group
redisTemplate.opsForStream<String, String>()
.createGroup(streamKey, properties.consumerGroup)
logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey")
} catch (e: Exception) {
// Ignore if the consumer group already exists
val message = e.message
if (message == null || !message.contains("BUSYGROUP")) {
logger.error("Error creating consumer group for stream $streamKey: ${e.message}", e)
}
}
}
/**
* Periodically polls for new events from all streams.
*/
@Scheduled(fixedDelayString = "\${redis.event-store.poll-interval:100}")
fun pollEvents() {
if (!running) {
running = true
}
try {
// Poll the all events stream
pollStream(getAllEventsStreamKey())
// Poll individual streams
val streamKeys = redisTemplate.keys("${properties.streamPrefix}*")
for (streamKey in streamKeys) {
if (streamKey != getAllEventsStreamKey()) {
pollStream(streamKey)
}
}
// Claim pending messages that have been idle for too long
claimPendingMessages()
} catch (e: Exception) {
logger.error("Error polling events: ${e.message}", e)
}
}
/**
* Polls a stream for new events.
*
* @param streamKey The key of the stream to poll
*/
private fun pollStream(streamKey: String) {
try {
// Read new messages from the stream
val options = StreamReadOptions.empty()
.count(properties.maxBatchSize.toLong())
.block(properties.pollTimeout)
val records = redisTemplate.opsForStream<String, String>()
.read(
Consumer.from(properties.consumerGroup, properties.consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
)
// Process the records
if (records != null) {
for (record in records) {
processRecord(record)
}
}
} catch (e: Exception) {
// Ignore if the stream doesn't exist or the consumer group doesn't exist
val message = e.message
if (message == null || !message.contains("NOGROUP")) {
logger.error("Error polling stream $streamKey: ${e.message}", e)
}
}
}
/**
* Claims pending messages that have been idle for too long.
*/
private fun claimPendingMessages() {
try {
// Get all stream keys
val streamKeys = redisTemplate.keys("${properties.streamPrefix}*")
for (streamKey in streamKeys) {
// Get pending messages summary
val pendingSummary = redisTemplate.opsForStream<String, String>()
.pending(streamKey, properties.consumerGroup)
if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) {
// Get pending messages with details
val pendingMessages = redisTemplate.opsForStream<String, String>()
.pending(
streamKey,
Consumer.from(properties.consumerGroup, properties.consumerName),
Range.unbounded<String>(),
properties.maxBatchSize.toLong()
)
if (pendingMessages.size() > 0) {
// Extract message IDs and convert to array
val messageIdsList = pendingMessages.map { it.id }.toList()
if (messageIdsList.isNotEmpty()) {
// Convert to array for the spread operator
val messageIds = messageIdsList.toTypedArray()
// Claim messages that have been idle for too long
val records = redisTemplate.opsForStream<String, String>()
.claim(
streamKey,
properties.consumerGroup,
properties.consumerName,
properties.claimIdleTimeout,
*messageIds
)
// Process the claimed records
for (record in records) {
processRecord(record)
}
}
}
}
}
} catch (e: Exception) {
logger.error("Error claiming pending messages: ${e.message}", e)
}
}
/**
* Processes a record from a stream.
*
* @param record The record to process
*/
private fun processRecord(record: MapRecord<String, String, String>) {
try {
val data = record.value
val event = serializer.deserialize(data)
val eventType = serializer.getEventType(data)
// Call handlers for the specific event type
eventTypeHandlers[eventType]?.forEach { handler ->
try {
handler(event)
} catch (e: Exception) {
logger.error("Error handling event of type $eventType: ${e.message}", e)
}
}
// Call handlers for all events
allEventHandlers.forEach { handler ->
try {
handler(event)
} catch (e: Exception) {
logger.error("Error handling event: ${e.message}", e)
}
}
// Acknowledge the message
redisTemplate.opsForStream<String, String>()
.acknowledge(properties.consumerGroup, record)
} catch (e: Exception) {
logger.error("Error processing record: ${e.message}", e)
}
}
/**
* Gets the Redis key for the all events stream.
*
* @return The Redis key for the all events stream
*/
private fun getAllEventsStreamKey(): String {
return "${properties.streamPrefix}${properties.allEventsStream}"
}
}
@@ -0,0 +1,336 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.ConcurrencyException
import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.EventStore
import at.mocode.infrastructure.eventstore.api.Subscription
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.ObjectRecord
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.Record
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamReadOptions
import org.springframework.data.redis.core.StringRedisTemplate
import org.springframework.data.redis.stream.StreamListener
import org.springframework.data.redis.stream.StreamMessageListenerContainer
import org.springframework.data.redis.stream.Subscription as RedisSubscription
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
/**
* Redis Streams implementation of EventStore.
*/
class RedisEventStore(
private val redisTemplate: StringRedisTemplate,
private val serializer: EventSerializer,
private val properties: RedisEventStoreProperties
) : EventStore {
private val logger = LoggerFactory.getLogger(RedisEventStore::class.java)
// Cache of stream versions to avoid reading from Redis for every append
private val streamVersionCache = ConcurrentHashMap<UUID, Long>()
// Active subscriptions
private val subscriptions = ConcurrentHashMap<UUID, RedisSubscription>()
// Listener containers for subscriptions
private val listenerContainers = ConcurrentHashMap<UUID, StreamMessageListenerContainer<String, MapRecord<String, String, String>>>()
override fun appendToStream(event: DomainEvent, streamId: UUID, expectedVersion: Long): Long {
return appendToStream(listOf(event), streamId, expectedVersion)
}
override fun appendToStream(events: List<DomainEvent>, streamId: UUID, expectedVersion: Long): Long {
if (events.isEmpty()) {
return expectedVersion
}
// Check if all events belong to the same aggregate
val aggregateId = events.first().aggregateId
if (events.any { it.aggregateId != aggregateId }) {
throw IllegalArgumentException("All events must belong to the same aggregate")
}
// Check if the stream ID matches the aggregate ID
if (streamId != aggregateId) {
throw IllegalArgumentException("Stream ID must match aggregate ID")
}
// Get the current version of the stream
val currentVersion = getStreamVersion(streamId)
// Check for concurrency conflicts
if (expectedVersion != currentVersion) {
throw ConcurrencyException(
"Concurrency conflict: expected version $expectedVersion but got $currentVersion"
)
}
// Append events to the stream
var newVersion = currentVersion
val streamKey = getStreamKey(streamId)
for (event in events) {
newVersion++
// Ensure the event has the correct version
if (event.version != newVersion) {
throw IllegalArgumentException(
"Event version ${event.version} does not match expected version $newVersion"
)
}
// Serialize the event
val eventData = serializer.serialize(event)
// Append to the stream
val result = redisTemplate.opsForStream<String, String>()
.add(streamKey, eventData)
logger.debug("Appended event ${event.eventId} to stream $streamId with ID $result")
// Also append to the all events stream
val allEventsStreamKey = getAllEventsStreamKey()
redisTemplate.opsForStream<String, String>()
.add(allEventsStreamKey, eventData)
}
// Update the version cache
streamVersionCache[streamId] = newVersion
return newVersion
}
override fun readFromStream(streamId: UUID, fromVersion: Long, toVersion: Long?): List<DomainEvent> {
val streamKey = getStreamKey(streamId)
// Check if the stream exists
if (!redisTemplate.hasKey(streamKey)) {
return emptyList()
}
// Calculate the range of events to read
val startOffset = if (fromVersion <= 0) ReadOffset.from("0") else ReadOffset.from("$fromVersion")
val endOffset = toVersion?.let { "=$it" } ?: "+"
// Read events from the stream
val options = StreamReadOptions.empty()
.count(toVersion?.let { (it - fromVersion + 1).toLong() } ?: Long.MAX_VALUE)
val records = redisTemplate.opsForStream<String, String>()
.read(options, StreamOffset.create(streamKey, startOffset))
// Deserialize events
return records?.mapNotNull { record ->
try {
val data = record.value
serializer.deserialize(data)
} catch (e: Exception) {
logger.error("Error deserializing event from stream $streamId: ${e.message}", e)
null
}
} ?: emptyList()
}
override fun readAllEvents(fromPosition: Long, maxCount: Int?): List<DomainEvent> {
val streamKey = getAllEventsStreamKey()
// Check if the stream exists
if (!redisTemplate.hasKey(streamKey)) {
return emptyList()
}
// Calculate the range of events to read
val startOffset = if (fromPosition <= 0) ReadOffset.from("0") else ReadOffset.from("$fromPosition")
// Read events from the stream
val options = StreamReadOptions.empty()
.count(maxCount?.toLong() ?: Long.MAX_VALUE)
val records = redisTemplate.opsForStream<String, String>()
.read(options, StreamOffset.create(streamKey, startOffset))
// Deserialize events
return records?.mapNotNull { record ->
try {
val data = record.value
serializer.deserialize(data)
} catch (e: Exception) {
logger.error("Error deserializing event from all events stream: ${e.message}", e)
null
}
} ?: emptyList()
}
override fun getStreamVersion(streamId: UUID): Long {
// Check the cache first
val cachedVersion = streamVersionCache[streamId]
if (cachedVersion != null) {
return cachedVersion
}
val streamKey = getStreamKey(streamId)
// Check if the stream exists
if (!redisTemplate.hasKey(streamKey)) {
return -1
}
// Get the last event from the stream
val options = StreamReadOptions.empty().count(1)
val records = redisTemplate.opsForStream<String, String>()
.read(options, StreamOffset.create(streamKey, ReadOffset.latest()))
if (records == null || records.isEmpty()) {
return -1
}
// Get the version from the last event
val lastEvent = records.first()
val version = serializer.getVersion(lastEvent.value)
// Update the cache
streamVersionCache[streamId] = version
return version
}
override fun subscribeToStream(
streamId: UUID,
fromVersion: Long,
handler: (DomainEvent) -> Unit
): Subscription {
val streamKey = getStreamKey(streamId)
// Create a unique ID for this subscription
val subscriptionId = UUID.randomUUID()
// Create a listener for the stream
val listener = StreamListener<String, MapRecord<String, String, String>> { record ->
try {
val data = record.value
val event = serializer.deserialize(data)
handler(event)
} catch (e: Exception) {
logger.error("Error handling event from stream $streamId: ${e.message}", e)
}
}
// Create a listener container
val container = StreamMessageListenerContainer
.create(redisTemplate.connectionFactory!!)
// Start from the specified version
val readOffset = if (fromVersion <= 0) ReadOffset.latest() else ReadOffset.from("$fromVersion")
// Create a subscription
val subscription = container.receive(
StreamOffset.create(streamKey, readOffset),
listener
)
// Start the container
container.start()
// Store the subscription and container
subscriptions[subscriptionId] = subscription
listenerContainers[subscriptionId] = container
// Return a subscription object
return object : Subscription {
private val active = AtomicBoolean(true)
override fun unsubscribe() {
if (active.compareAndSet(true, false)) {
subscription.cancel()
container.stop()
subscriptions.remove(subscriptionId)
listenerContainers.remove(subscriptionId)
}
}
override fun isActive(): Boolean {
return active.get()
}
}
}
override fun subscribeToAll(fromPosition: Long, handler: (DomainEvent) -> Unit): Subscription {
val streamKey = getAllEventsStreamKey()
// Create a unique ID for this subscription
val subscriptionId = UUID.randomUUID()
// Create a listener for the stream
val listener = StreamListener<String, MapRecord<String, String, String>> { record ->
try {
val data = record.value
val event = serializer.deserialize(data)
handler(event)
} catch (e: Exception) {
logger.error("Error handling event from all events stream: ${e.message}", e)
}
}
// Create a listener container
val container = StreamMessageListenerContainer
.create(redisTemplate.connectionFactory!!)
// Start from the specified position
val readOffset = if (fromPosition <= 0) ReadOffset.latest() else ReadOffset.from("$fromPosition")
// Create a subscription
val subscription = container.receive(
StreamOffset.create(streamKey, readOffset),
listener
)
// Start the container
container.start()
// Store the subscription and container
subscriptions[subscriptionId] = subscription
listenerContainers[subscriptionId] = container
// Return a subscription object
return object : Subscription {
private val active = AtomicBoolean(true)
override fun unsubscribe() {
if (active.compareAndSet(true, false)) {
subscription.cancel()
container.stop()
subscriptions.remove(subscriptionId)
listenerContainers.remove(subscriptionId)
}
}
override fun isActive(): Boolean {
return active.get()
}
}
}
/**
* Gets the Redis key for a stream.
*
* @param streamId The ID of the stream
* @return The Redis key for the stream
*/
private fun getStreamKey(streamId: UUID): String {
return "${properties.streamPrefix}$streamId"
}
/**
* Gets the Redis key for the all events stream.
*
* @return The Redis key for the all events stream
*/
private fun getAllEventsStreamKey(): String {
return "${properties.streamPrefix}${properties.allEventsStream}"
}
}
@@ -0,0 +1,136 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.EventStore
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.connection.RedisPassword
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.StringRedisTemplate
import java.time.Duration
/**
* Redis event store properties.
*/
@ConfigurationProperties(prefix = "redis.event-store")
data class RedisEventStoreProperties(
val host: String = "localhost",
val port: Int = 6379,
val password: String? = null,
val database: Int = 0,
val connectionTimeout: Long = 2000,
val readTimeout: Long = 2000,
val usePooling: Boolean = true,
val maxPoolSize: Int = 8,
val minPoolSize: Int = 2,
val consumerGroup: String = "event-processors",
val consumerName: String = "event-consumer",
val streamPrefix: String = "event-stream:",
val allEventsStream: String = "all-events",
val claimIdleTimeout: Duration = Duration.ofMinutes(1),
val pollTimeout: Duration = Duration.ofMillis(100),
val maxBatchSize: Int = 100,
val createConsumerGroupIfNotExists: Boolean = true
)
/**
* Spring configuration for Redis event store.
*/
@Configuration
@EnableConfigurationProperties(RedisEventStoreProperties::class)
class RedisEventStoreConfiguration {
/**
* Creates a Redis connection factory for the event store.
*
* @param properties Redis event store properties
* @return Redis connection factory
*/
@Bean
@ConditionalOnMissingBean(name = ["eventStoreRedisConnectionFactory"])
fun eventStoreRedisConnectionFactory(properties: RedisEventStoreProperties): RedisConnectionFactory {
val config = RedisStandaloneConfiguration().apply {
hostName = properties.host
port = properties.port
properties.password?.let { password = RedisPassword.of(it) }
database = properties.database
}
return LettuceConnectionFactory(config).apply {
// Configure connection timeouts
afterPropertiesSet()
}
}
/**
* Creates a Redis template for the event store.
*
* @param connectionFactory Redis connection factory
* @return Redis template
*/
@Bean
@ConditionalOnMissingBean(name = ["eventStoreRedisTemplate"])
fun eventStoreRedisTemplate(
@org.springframework.beans.factory.annotation.Qualifier("eventStoreRedisConnectionFactory")
connectionFactory: RedisConnectionFactory
): StringRedisTemplate {
return StringRedisTemplate().apply {
setConnectionFactory(connectionFactory)
afterPropertiesSet()
}
}
/**
* Creates an event serializer.
*
* @return Event serializer
*/
@Bean
@ConditionalOnMissingBean
fun eventSerializer(): EventSerializer {
return JacksonEventSerializer()
}
/**
* Creates a Redis event store.
*
* @param redisTemplate Redis template
* @param eventSerializer Event serializer
* @param properties Redis event store properties
* @return Event store
*/
@Bean
@ConditionalOnMissingBean
fun eventStore(
@org.springframework.beans.factory.annotation.Qualifier("eventStoreRedisTemplate")
redisTemplate: StringRedisTemplate,
eventSerializer: EventSerializer,
properties: RedisEventStoreProperties
): EventStore {
return RedisEventStore(redisTemplate, eventSerializer, properties)
}
/**
* Creates a Redis event consumer.
*
* @param redisTemplate Redis template
* @param eventSerializer Event serializer
* @param properties Redis event store properties
* @return Event consumer
*/
@Bean
@ConditionalOnMissingBean
fun eventConsumer(
@org.springframework.beans.factory.annotation.Qualifier("eventStoreRedisTemplate")
redisTemplate: StringRedisTemplate,
eventSerializer: EventSerializer,
properties: RedisEventStoreProperties
): RedisEventConsumer {
return RedisEventConsumer(redisTemplate, eventSerializer, properties)
}
}
@@ -0,0 +1,296 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.BaseDomainEvent
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.EventStore
import at.mocode.infrastructure.eventstore.api.Subscription
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.StringRedisTemplate
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import java.time.Instant
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
* Integration tests for Redis Event Store.
*
* These tests verify the interaction between the Redis Event Store, Event Consumer, and Event Serializer
* in a more realistic scenario.
*/
@Testcontainers
class RedisEventStoreIntegrationTest {
companion object {
@Container
val redisContainer = GenericContainer(DockerImageName.parse("redis:7-alpine"))
.withExposedPorts(6379)
}
private lateinit var redisTemplate: StringRedisTemplate
private lateinit var serializer: EventSerializer
private lateinit var properties: RedisEventStoreProperties
private lateinit var eventStore: EventStore
private lateinit var eventConsumer: RedisEventConsumer
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = StringRedisTemplate()
redisTemplate.setConnectionFactory(connectionFactory)
redisTemplate.afterPropertiesSet()
serializer = JacksonEventSerializer()
// Register test event types
serializer.registerEventType(TestCreatedEvent::class.java, "TestCreated")
serializer.registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
properties = RedisEventStoreProperties(
host = redisHost,
port = redisPort,
streamPrefix = "test-stream:",
allEventsStream = "all-events",
consumerGroup = "test-group",
consumerName = "test-consumer",
createConsumerGroupIfNotExists = true
)
eventStore = RedisEventStore(redisTemplate, serializer, properties)
eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties)
// Clear all streams
val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
}
@AfterEach
fun tearDown() {
// Clear all streams
val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
}
@Test
fun `test event publishing and consuming with consumer groups`() {
// Create an aggregate ID
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Set up a latch to wait for events
val latch = CountDownLatch(2)
val receivedEvents = mutableListOf<DomainEvent>()
// Register a handler for TestCreatedEvent
eventConsumer.registerEventHandler("TestCreated") { event ->
receivedEvents.add(event)
latch.countDown()
}
// Register a handler for TestUpdatedEvent
eventConsumer.registerEventHandler("TestUpdated") { event ->
receivedEvents.add(event)
latch.countDown()
}
// Initialize the consumer
eventConsumer.init()
// Append events to the stream
eventStore.appendToStream(event1, aggregateId, -1)
eventStore.appendToStream(event2, aggregateId, 1)
// Manually trigger event polling
eventConsumer.pollEvents()
// Wait for events to be processed
assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for events")
// Verify that both events were received
assertEquals(2, receivedEvents.size)
// Verify the first event
val receivedEvent1 = receivedEvents[0] as TestCreatedEvent
assertEquals(aggregateId, receivedEvent1.aggregateId)
assertEquals(1, receivedEvent1.version)
assertEquals("Test Entity", receivedEvent1.name)
// Verify the second event
val receivedEvent2 = receivedEvents[1] as TestUpdatedEvent
assertEquals(aggregateId, receivedEvent2.aggregateId)
assertEquals(2, receivedEvent2.version)
assertEquals("Updated Test Entity", receivedEvent2.name)
// Clean up
eventConsumer.shutdown()
}
@Test
fun `test event subscription and publishing`() {
// Create an aggregate ID
val aggregateId = UUID.randomUUID()
// Set up a latch to wait for events
val latch = CountDownLatch(2)
val receivedEvents = mutableListOf<DomainEvent>()
// Subscribe to the stream
val subscription = eventStore.subscribeToStream(aggregateId) { event ->
receivedEvents.add(event)
latch.countDown()
}
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Append events to the stream
eventStore.appendToStream(event1, aggregateId, -1)
eventStore.appendToStream(event2, aggregateId, 1)
// Wait for events to be received
assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for events")
// Verify that both events were received
assertEquals(2, receivedEvents.size)
// Verify the first event
val receivedEvent1 = receivedEvents[0] as TestCreatedEvent
assertEquals(aggregateId, receivedEvent1.aggregateId)
assertEquals(1, receivedEvent1.version)
assertEquals("Test Entity", receivedEvent1.name)
// Verify the second event
val receivedEvent2 = receivedEvents[1] as TestUpdatedEvent
assertEquals(aggregateId, receivedEvent2.aggregateId)
assertEquals(2, receivedEvent2.version)
assertEquals("Updated Test Entity", receivedEvent2.name)
// Clean up
subscription.unsubscribe()
}
@Test
fun `test multiple consumers with consumer groups`() {
// Create an aggregate ID
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Set up latches to wait for events
val latch1 = CountDownLatch(2)
val latch2 = CountDownLatch(2)
val receivedEvents1 = mutableListOf<DomainEvent>()
val receivedEvents2 = mutableListOf<DomainEvent>()
// Create a second consumer with a different consumer name
val properties2 = properties.copy(consumerName = "test-consumer-2")
val eventConsumer2 = RedisEventConsumer(redisTemplate, serializer, properties2)
// Register handlers for the first consumer
eventConsumer.registerAllEventsHandler { event ->
receivedEvents1.add(event)
latch1.countDown()
}
// Register handlers for the second consumer
eventConsumer2.registerAllEventsHandler { event ->
receivedEvents2.add(event)
latch2.countDown()
}
// Initialize the consumers
eventConsumer.init()
eventConsumer2.init()
// Append events to the stream
eventStore.appendToStream(event1, aggregateId, -1)
eventStore.appendToStream(event2, aggregateId, 1)
// Manually trigger event polling
eventConsumer.pollEvents()
eventConsumer2.pollEvents()
// Wait for events to be processed by both consumers
assertTrue(latch1.await(5, TimeUnit.SECONDS), "Timed out waiting for events on consumer 1")
assertTrue(latch2.await(5, TimeUnit.SECONDS), "Timed out waiting for events on consumer 2")
// Verify that both consumers received both events
assertEquals(2, receivedEvents1.size)
assertEquals(2, receivedEvents2.size)
// Clean up
eventConsumer.shutdown()
eventConsumer2.shutdown()
}
// Test event classes
class TestCreatedEvent(
override val eventId: UUID = UUID.randomUUID(),
override val timestamp: Instant = Instant.now(),
override val aggregateId: UUID,
override val version: Long,
val name: String
) : BaseDomainEvent(eventId, timestamp, aggregateId, version)
class TestUpdatedEvent(
override val eventId: UUID = UUID.randomUUID(),
override val timestamp: Instant = Instant.now(),
override val aggregateId: UUID,
override val version: Long,
val name: String
) : BaseDomainEvent(eventId, timestamp, aggregateId, version)
}
@@ -0,0 +1,317 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.BaseDomainEvent
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.ConcurrencyException
import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.Subscription
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.StringRedisTemplate
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Instant
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@Testcontainers
class RedisEventStoreTest {
companion object {
@Container
val redisContainer = GenericContainer(DockerImageName.parse("redis:7-alpine"))
.withExposedPorts(6379)
}
private lateinit var redisTemplate: StringRedisTemplate
private lateinit var serializer: EventSerializer
private lateinit var properties: RedisEventStoreProperties
private lateinit var eventStore: RedisEventStore
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = StringRedisTemplate()
redisTemplate.setConnectionFactory(connectionFactory)
redisTemplate.afterPropertiesSet()
serializer = JacksonEventSerializer()
// Register test event types
serializer.registerEventType(TestCreatedEvent::class.java, "TestCreated")
serializer.registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
properties = RedisEventStoreProperties(
host = redisHost,
port = redisPort,
streamPrefix = "test-stream:",
allEventsStream = "all-events"
)
eventStore = RedisEventStore(redisTemplate, serializer, properties)
// Clear all streams
val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
}
@AfterEach
fun tearDown() {
// Clear all streams
val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
}
@Test
fun `test append and read events`() {
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Append events
val version1 = eventStore.appendToStream(event1, aggregateId, -1)
assertEquals(1, version1)
val version2 = eventStore.appendToStream(event2, aggregateId, 1)
assertEquals(2, version2)
// Read events
val events = eventStore.readFromStream(aggregateId)
assertEquals(2, events.size)
val firstEvent = events[0] as TestCreatedEvent
assertEquals(aggregateId, firstEvent.aggregateId)
assertEquals(1, firstEvent.version)
assertEquals("Test Entity", firstEvent.name)
val secondEvent = events[1] as TestUpdatedEvent
assertEquals(aggregateId, secondEvent.aggregateId)
assertEquals(2, secondEvent.version)
assertEquals("Updated Test Entity", secondEvent.name)
}
@Test
fun `test append events with concurrency conflict`() {
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Append first event
val version1 = eventStore.appendToStream(event1, aggregateId, -1)
assertEquals(1, version1)
// Try to append second event with wrong expected version
assertThrows<ConcurrencyException> {
eventStore.appendToStream(event2, aggregateId, 0)
}
// Append second event with correct expected version
val version2 = eventStore.appendToStream(event2, aggregateId, 1)
assertEquals(2, version2)
}
@Test
fun `test append multiple events at once`() {
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Append events
val version = eventStore.appendToStream(listOf(event1, event2), aggregateId, -1)
assertEquals(2, version)
// Read events
val events = eventStore.readFromStream(aggregateId)
assertEquals(2, events.size)
}
@Test
fun `test read all events`() {
val aggregate1Id = UUID.randomUUID()
val aggregate2Id = UUID.randomUUID()
// Create events for first aggregate
val event1 = TestCreatedEvent(
aggregateId = aggregate1Id,
version = 1,
name = "Test Entity 1"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregate1Id,
version = 2,
name = "Updated Test Entity 1"
)
// Create events for second aggregate
val event3 = TestCreatedEvent(
aggregateId = aggregate2Id,
version = 1,
name = "Test Entity 2"
)
// Append events
eventStore.appendToStream(event1, aggregate1Id, -1)
eventStore.appendToStream(event2, aggregate1Id, 1)
eventStore.appendToStream(event3, aggregate2Id, -1)
// Read all events
val allEvents = eventStore.readAllEvents()
assertEquals(3, allEvents.size)
}
@Test
fun `test subscribe to stream`() {
val aggregateId = UUID.randomUUID()
val latch = CountDownLatch(2)
val receivedEvents = mutableListOf<DomainEvent>()
// Subscribe to stream
val subscription = eventStore.subscribeToStream(aggregateId) { event ->
receivedEvents.add(event)
latch.countDown()
}
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Append events
eventStore.appendToStream(event1, aggregateId, -1)
eventStore.appendToStream(event2, aggregateId, 1)
// Wait for events to be received
assertTrue(latch.await(5, TimeUnit.SECONDS))
assertEquals(2, receivedEvents.size)
// Unsubscribe
subscription.unsubscribe()
assertFalse(subscription.isActive())
}
@Test
fun `test subscribe to all events`() {
val aggregate1Id = UUID.randomUUID()
val aggregate2Id = UUID.randomUUID()
val latch = CountDownLatch(3)
val receivedEvents = mutableListOf<DomainEvent>()
// Subscribe to all events
val subscription = eventStore.subscribeToAll { event ->
receivedEvents.add(event)
latch.countDown()
}
// Create events for first aggregate
val event1 = TestCreatedEvent(
aggregateId = aggregate1Id,
version = 1,
name = "Test Entity 1"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregate1Id,
version = 2,
name = "Updated Test Entity 1"
)
// Create events for second aggregate
val event3 = TestCreatedEvent(
aggregateId = aggregate2Id,
version = 1,
name = "Test Entity 2"
)
// Append events
eventStore.appendToStream(event1, aggregate1Id, -1)
eventStore.appendToStream(event2, aggregate1Id, 1)
eventStore.appendToStream(event3, aggregate2Id, -1)
// Wait for events to be received
assertTrue(latch.await(5, TimeUnit.SECONDS))
assertEquals(3, receivedEvents.size)
// Unsubscribe
subscription.unsubscribe()
assertFalse(subscription.isActive())
}
// Test event classes
class TestCreatedEvent(
override val eventId: UUID = UUID.randomUUID(),
override val timestamp: Instant = Instant.now(),
override val aggregateId: UUID,
override val version: Long,
val name: String
) : BaseDomainEvent(eventId, timestamp, aggregateId, version)
class TestUpdatedEvent(
override val eventId: UUID = UUID.randomUUID(),
override val timestamp: Instant = Instant.now(),
override val aggregateId: UUID,
override val version: Long,
val name: String
) : BaseDomainEvent(eventId, timestamp, aggregateId, version)
}
@@ -0,0 +1,241 @@
package at.mocode.infrastructure.eventstore.redis
import at.mocode.core.domain.event.BaseDomainEvent
import at.mocode.core.domain.event.DomainEvent
import at.mocode.infrastructure.eventstore.api.EventSerializer
import at.mocode.infrastructure.eventstore.api.EventStore
import at.mocode.infrastructure.eventstore.api.Subscription
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.StringRedisTemplate
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Instant
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
* Integration tests for Redis Event Store and Event Consumer.
*
* These tests verify the interaction between the Redis Event Store, Event Consumer, and Event Serializer
* in a more realistic scenario.
*/
@Testcontainers
class RedisIntegrationTest {
companion object {
@Container
val redisContainer = GenericContainer(DockerImageName.parse("redis:7-alpine"))
.withExposedPorts(6379)
}
private lateinit var redisTemplate: StringRedisTemplate
private lateinit var serializer: EventSerializer
private lateinit var properties: RedisEventStoreProperties
private lateinit var eventStore: EventStore
private lateinit var eventConsumer: RedisEventConsumer
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = StringRedisTemplate()
redisTemplate.setConnectionFactory(connectionFactory)
redisTemplate.afterPropertiesSet()
serializer = JacksonEventSerializer()
// Register test event types
serializer.registerEventType(TestCreatedEvent::class.java, "TestCreated")
serializer.registerEventType(TestUpdatedEvent::class.java, "TestUpdated")
properties = RedisEventStoreProperties(
host = redisHost,
port = redisPort,
streamPrefix = "test-stream:",
allEventsStream = "all-events",
consumerGroup = "test-group",
consumerName = "test-consumer",
createConsumerGroupIfNotExists = true
)
eventStore = RedisEventStore(redisTemplate, serializer, properties)
eventConsumer = RedisEventConsumer(redisTemplate, serializer, properties)
// Clear all streams
val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
}
@AfterEach
fun tearDown() {
// Clear all streams
val keys = redisTemplate.keys("${properties.streamPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
}
}
@Test
fun `test event publishing and consuming with consumer groups`() {
// Create an aggregate ID
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Set up a latch to wait for events
val latch = CountDownLatch(2)
val receivedEvents = mutableListOf<DomainEvent>()
// Register a handler for TestCreatedEvent
eventConsumer.registerEventHandler("TestCreated") { event ->
receivedEvents.add(event)
latch.countDown()
}
// Register a handler for TestUpdatedEvent
eventConsumer.registerEventHandler("TestUpdated") { event ->
receivedEvents.add(event)
latch.countDown()
}
// Initialize the consumer
eventConsumer.init()
// Append events to the stream
eventStore.appendToStream(event1, aggregateId, -1)
eventStore.appendToStream(event2, aggregateId, 1)
// Manually trigger event polling
eventConsumer.pollEvents()
// Wait for events to be processed
assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for events")
// Verify that both events were received
assertEquals(2, receivedEvents.size)
// Verify the first event
val receivedEvent1 = receivedEvents[0] as TestCreatedEvent
assertEquals(aggregateId, receivedEvent1.aggregateId)
assertEquals(1, receivedEvent1.version)
assertEquals("Test Entity", receivedEvent1.name)
// Verify the second event
val receivedEvent2 = receivedEvents[1] as TestUpdatedEvent
assertEquals(aggregateId, receivedEvent2.aggregateId)
assertEquals(2, receivedEvent2.version)
assertEquals("Updated Test Entity", receivedEvent2.name)
// Clean up
eventConsumer.shutdown()
}
@Test
fun `test multiple consumers with consumer groups`() {
// Create an aggregate ID
val aggregateId = UUID.randomUUID()
// Create events
val event1 = TestCreatedEvent(
aggregateId = aggregateId,
version = 1,
name = "Test Entity"
)
val event2 = TestUpdatedEvent(
aggregateId = aggregateId,
version = 2,
name = "Updated Test Entity"
)
// Set up latches to wait for events
val latch1 = CountDownLatch(2)
val latch2 = CountDownLatch(2)
val receivedEvents1 = mutableListOf<DomainEvent>()
val receivedEvents2 = mutableListOf<DomainEvent>()
// Create a second consumer with a different consumer name
val properties2 = properties.copy(consumerName = "test-consumer-2")
val eventConsumer2 = RedisEventConsumer(redisTemplate, serializer, properties2)
// Register handlers for the first consumer
eventConsumer.registerAllEventsHandler { event ->
receivedEvents1.add(event)
latch1.countDown()
}
// Register handlers for the second consumer
eventConsumer2.registerAllEventsHandler { event ->
receivedEvents2.add(event)
latch2.countDown()
}
// Initialize the consumers
eventConsumer.init()
eventConsumer2.init()
// Append events to the stream
eventStore.appendToStream(event1, aggregateId, -1)
eventStore.appendToStream(event2, aggregateId, 1)
// Manually trigger event polling
eventConsumer.pollEvents()
eventConsumer2.pollEvents()
// Wait for events to be processed by both consumers
assertTrue(latch1.await(5, TimeUnit.SECONDS), "Timed out waiting for events on consumer 1")
assertTrue(latch2.await(5, TimeUnit.SECONDS), "Timed out waiting for events on consumer 2")
// Verify that both consumers received both events
assertEquals(2, receivedEvents1.size)
assertEquals(2, receivedEvents2.size)
// Clean up
eventConsumer.shutdown()
eventConsumer2.shutdown()
}
// Test event classes
class TestCreatedEvent(
override val eventId: UUID = UUID.randomUUID(),
override val timestamp: Instant = Instant.now(),
override val aggregateId: UUID,
override val version: Long,
val name: String
) : BaseDomainEvent(eventId, timestamp, aggregateId, version)
class TestUpdatedEvent(
override val eventId: UUID = UUID.randomUUID(),
override val timestamp: Instant = Instant.now(),
override val aggregateId: UUID,
override val version: Long,
val name: String
) : BaseDomainEvent(eventId, timestamp, aggregateId, version)
}