From 59f608b553eda00cd950b4b0a5a38a28f03e6aa2 Mon Sep 17 00:00:00 2001 From: Stefan Mogeritsch Date: Thu, 12 Feb 2026 20:13:35 +0100 Subject: [PATCH] refactor: replace Redis references with Valkey in tests and cache modules Updated test cases in `ValkeyEventStoreTest` and cache implementation in `ValkeyDistributedCache` to fully transition from Redis to Valkey. Adjusted configurations, templates, connection handling, and exception management to reflect Valkey-specific behavior and APIs. --- .../cache/valkey-cache/build.gradle.kts | 7 + .../cache/valkey/ValkeyConfiguration.kt | 28 +- .../cache/valkey/ValkeyDistributedCache.kt | 1124 ++++++++--------- ...ValkeyDistributedCacheConfigurationTest.kt | 20 +- .../ValkeyDistributedCacheEdgeCasesTest.kt | 20 +- .../ValkeyDistributedCacheIntegrationTest.kt | 20 +- .../ValkeyDistributedCachePerformanceTest.kt | 20 +- .../ValkeyDistributedCacheResilienceTest.kt | 50 +- .../valkey/ValkeyDistributedCacheTest.kt | 43 +- .../src/test/resources/logback-test.xml | 56 +- .../valkey-event-store/build.gradle.kts | 5 + .../eventstore/valkey/ValkeyEventConsumer.kt | 476 +++---- .../eventstore/valkey/ValkeyEventStore.kt | 15 +- .../valkey/ValkeyEventStoreConfiguration.kt | 35 +- ...ValkeyCacheAndEventStoreIntegrationTest.kt | 10 +- .../ValkeyEventConsumerResilienceTest.kt | 14 +- .../ValkeyEventStoreConfigurationTest.kt | 16 +- .../ValkeyEventStoreErrorHandlingTest.kt | 14 +- .../valkey/ValkeyEventStoreIntegrationTest.kt | 14 +- .../valkey/ValkeyEventStoreStreamTest.kt | 14 +- .../eventstore/valkey/ValkeyEventStoreTest.kt | 156 +-- .../valkey/ValkeyIntegrationTest.kt | 14 +- .../infrastructure/gateway/build.gradle.kts | 2 +- .../messaging-client/build.gradle.kts | 54 +- gradle/libs.versions.toml | 5 +- 25 files changed, 1131 insertions(+), 1101 deletions(-) diff --git a/backend/infrastructure/cache/valkey-cache/build.gradle.kts b/backend/infrastructure/cache/valkey-cache/build.gradle.kts index d473b8f3..a88cd3df 100644 --- a/backend/infrastructure/cache/valkey-cache/build.gradle.kts +++ b/backend/infrastructure/cache/valkey-cache/build.gradle.kts @@ -28,6 +28,13 @@ dependencies { // OPTIMIERUNG: Verwendung des `valkey-cache`-Bundles aus libs.versions.toml. // Dieses Bundle enthält Spring Data Valkey, Lettuce und Jackson-Module. implementation(libs.bundles.valkey.cache) + // Benötigt für Lettuce-basierten Valkey-Client (LettuceConnectionFactory) + implementation(libs.lettuce.core) + // Für Boot-Autoconfiguration-Annotations wie @ConfigurationProperties, + // @EnableConfigurationProperties und @ConditionalOnMissingBean + implementation("org.springframework.boot:spring-boot-autoconfigure") + // Optional, generiert Metadata f. @ConfigurationProperties (zur IDE-Unterstützung) + compileOnly("org.springframework.boot:spring-boot-configuration-processor") // Stellt alle Test-Abhängigkeiten gebündelt bereit. testImplementation(projects.platform.platformTesting) testImplementation(libs.bundles.testing.jvm) diff --git a/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyConfiguration.kt b/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyConfiguration.kt index eecb4d16..d29f34ff 100644 --- a/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyConfiguration.kt +++ b/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyConfiguration.kt @@ -3,18 +3,18 @@ package at.mocode.infrastructure.cache.valkey import at.mocode.infrastructure.cache.api.CacheConfiguration import at.mocode.infrastructure.cache.api.CacheSerializer import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration +import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory +import io.valkey.springframework.data.valkey.connection.ValkeyPassword +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import org.springframework.beans.factory.annotation.Qualifier -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.data.redis.connection.RedisConnectionFactory -import org.springframework.data.redis.connection.RedisPassword -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.serializer.StringRedisSerializer /** * Valkey connection properties. @@ -46,11 +46,11 @@ class ValkeyConfiguration { * @return Valkey connection factory */ @Bean - fun valkeyConnectionFactory(properties: ValkeyProperties): RedisConnectionFactory { - val config = RedisStandaloneConfiguration().apply { + fun valkeyConnectionFactory(properties: ValkeyProperties): ValkeyConnectionFactory { + val config = ValkeyStandaloneConfiguration().apply { hostName = properties.host port = properties.port - properties.password?.let { password = RedisPassword.of(it) } + properties.password?.let { password = ValkeyPassword.of(it) } database = properties.database } @@ -68,11 +68,11 @@ class ValkeyConfiguration { */ @Bean fun valkeyTemplate( - @Qualifier("valkeyConnectionFactory") connectionFactory: RedisConnectionFactory - ): RedisTemplate { - return RedisTemplate().apply { + @Qualifier("valkeyConnectionFactory") connectionFactory: ValkeyConnectionFactory + ): ValkeyTemplate { + return ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() // Use default serializer for values (byte arrays) afterPropertiesSet() } diff --git a/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCache.kt b/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCache.kt index 550af652..12b4414e 100644 --- a/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCache.kt +++ b/backend/infrastructure/cache/valkey-cache/src/main/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCache.kt @@ -7,9 +7,9 @@ import at.mocode.infrastructure.cache.api.ConnectionState import at.mocode.infrastructure.cache.api.ConnectionStateListener import at.mocode.infrastructure.cache.api.ConnectionStatusTracker import at.mocode.infrastructure.cache.api.DistributedCache +import io.valkey.springframework.data.valkey.ValkeyConnectionFailureException +import io.valkey.springframework.data.valkey.core.ValkeyTemplate import org.slf4j.LoggerFactory -import org.springframework.data.redis.RedisConnectionFailureException -import org.springframework.data.redis.core.RedisTemplate import org.springframework.scheduling.annotation.Scheduled import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList @@ -21,609 +21,609 @@ import kotlin.time.ExperimentalTime @OptIn(ExperimentalTime::class) class ValkeyDistributedCache( - private val valkeyTemplate: RedisTemplate, + private val valkeyTemplate: ValkeyTemplate, private val serializer: CacheSerializer, private val config: CacheConfiguration ) : DistributedCache, ConnectionStatusTracker { - private val logger = LoggerFactory.getLogger(ValkeyDistributedCache::class.java) + private val logger = LoggerFactory.getLogger(ValkeyDistributedCache::class.java) - // Local cache for offline capability - private val localCache = ConcurrentHashMap>() + // Local cache for offline capability + private val localCache = ConcurrentHashMap>() - // Set of keys that have been modified locally and need to be synchronized - private val dirtyKeys = ConcurrentHashMap.newKeySet() + // Set of keys that have been modified locally and need to be synchronized + private val dirtyKeys = ConcurrentHashMap.newKeySet() - // Connection state - private var connectionState = ConnectionState.DISCONNECTED + // Connection state + private var connectionState = ConnectionState.DISCONNECTED - private var lastStateChangeTime = Clock.System.now() + private var lastStateChangeTime = Clock.System.now() - // Connection state listeners - private val connectionListeners = CopyOnWriteArrayList() + // Connection state listeners + private val connectionListeners = CopyOnWriteArrayList() - // Performance metrics tracking - private var totalOperations = 0L - private var successfulOperations = 0L - private var lastMetricsLogTime = Clock.System.now() + // Performance metrics tracking + private var totalOperations = 0L + private var successfulOperations = 0L + private var lastMetricsLogTime = Clock.System.now() - init { - // Try to connect to Valkey - checkConnection() - } + init { + // Try to connect to Valkey + checkConnection() + } - override fun get(key: String, clazz: Class): T? { - val prefixedKey = addPrefix(key) + override fun get(key: String, clazz: Class): T? { + val prefixedKey = addPrefix(key) - // Try to get from the local cache first - val localEntry = localCache[prefixedKey] as? CacheEntry<*> - if (localEntry != null) { - if (localEntry.isExpired()) { - localCache.remove(prefixedKey) - return null - } - @Suppress("UNCHECKED_CAST") - return localEntry.value as T? - } - - // If not in the local cache, and we're disconnected, return null - if (!isConnected()) { - return null - } - - // Try to get from Valkey - try { - val bytes = valkeyTemplate.opsForValue().get(prefixedKey) ?: run { - trackOperation(true) // successful operation, just no data - return null - } - val entry = serializer.deserializeEntry(bytes, clazz) - - // Store in a local cache - @Suppress("UNCHECKED_CAST") - localCache[prefixedKey] = entry as CacheEntry - enforceLocalCacheSize() - - trackOperation(true) - return entry.value - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - trackOperation(false) - return null - } catch (e: Exception) { - logger.error("Error getting value from Valkey for key $prefixedKey", e) - trackOperation(false) - return null - } - } - - override fun set(key: String, value: T, ttl: Duration?) { - val prefixedKey = addPrefix(key) - // KORREKTUR: Logik verwendet jetzt kotlin.time - val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it } - - val entry = CacheEntry( - key = prefixedKey, - value = value, - expiresAt = expiresAt - ) - - @Suppress("UNCHECKED_CAST") - localCache[prefixedKey] = entry as CacheEntry - enforceLocalCacheSize() - - if (!isConnected()) { - markDirty(key) - return - } - - try { - val bytes = serializer.serializeEntry(entry) - val effectiveTtl = ttl ?: config.defaultTtl - if (effectiveTtl != null) { - // KORREKTUR: Konvertierung zu java.time.Duration für RedisTemplate - valkeyTemplate.opsForValue().set(prefixedKey, bytes, effectiveTtl.toJavaDuration()) - } else { - valkeyTemplate.opsForValue().set(prefixedKey, bytes) - } - trackOperation(true) - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - markDirty(key) - trackOperation(false) - } catch (e: Exception) { - logger.error("Error setting value in Valkey for key $prefixedKey", e) - markDirty(key) - trackOperation(false) - } - } - - override fun delete(key: String) { - val prefixedKey = addPrefix(key) - - // Remove from the local cache + // Try to get from the local cache first + val localEntry = localCache[prefixedKey] as? CacheEntry<*> + if (localEntry != null) { + if (localEntry.isExpired()) { localCache.remove(prefixedKey) + return null + } + @Suppress("UNCHECKED_CAST") + return localEntry.value as T? + } - // If we're disconnected, mark as dirty and return - if (!isConnected()) { - markDirty(key) - return + // If not in the local cache, and we're disconnected, return null + if (!isConnected()) { + return null + } + + // Try to get from Valkey + try { + val bytes = valkeyTemplate.opsForValue().get(prefixedKey) ?: run { + trackOperation(true) // successful operation, just no data + return null + } + val entry = serializer.deserializeEntry(bytes, clazz) + + // Store in a local cache + @Suppress("UNCHECKED_CAST") + localCache[prefixedKey] = entry as CacheEntry + enforceLocalCacheSize() + + trackOperation(true) + return entry.value + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + trackOperation(false) + return null + } catch (e: Exception) { + logger.error("Error getting value from Valkey for key $prefixedKey", e) + trackOperation(false) + return null + } + } + + override fun set(key: String, value: T, ttl: Duration?) { + val prefixedKey = addPrefix(key) + // KORREKTUR: Logik verwendet jetzt kotlin.time + val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it } + + val entry = CacheEntry( + key = prefixedKey, + value = value, + expiresAt = expiresAt + ) + + @Suppress("UNCHECKED_CAST") + localCache[prefixedKey] = entry as CacheEntry + enforceLocalCacheSize() + + if (!isConnected()) { + markDirty(key) + return + } + + try { + val bytes = serializer.serializeEntry(entry) + val effectiveTtl = ttl ?: config.defaultTtl + if (effectiveTtl != null) { + // KORREKTUR: Konvertierung zu java.time.Duration für ValkeyTemplate + valkeyTemplate.opsForValue().set(prefixedKey, bytes, effectiveTtl.toJavaDuration()) + } else { + valkeyTemplate.opsForValue().set(prefixedKey, bytes) + } + trackOperation(true) + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + markDirty(key) + trackOperation(false) + } catch (e: Exception) { + logger.error("Error setting value in Valkey for key $prefixedKey", e) + markDirty(key) + trackOperation(false) + } + } + + override fun delete(key: String) { + val prefixedKey = addPrefix(key) + + // Remove from the local cache + localCache.remove(prefixedKey) + + // If we're disconnected, mark as dirty and return + if (!isConnected()) { + markDirty(key) + return + } + + // Try to delete from Valkey + try { + valkeyTemplate.delete(prefixedKey) + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + markDirty(key) + } catch (e: Exception) { + logger.error("Error deleting value from Valkey for key $prefixedKey", e) + markDirty(key) + } + } + + override fun exists(key: String): Boolean { + val prefixedKey = addPrefix(key) + + // Check the local cache first + if (localCache.containsKey(prefixedKey)) { + val entry = localCache[prefixedKey] + if (entry != null && !entry.isExpired()) { + return true + } + // Remove expired entry + localCache.remove(prefixedKey) + } + + // If we're disconnected, return false + if (!isConnected()) { + return false + } + + // Check Valkey + try { + return valkeyTemplate.hasKey(prefixedKey) ?: false + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + return false + } catch (e: Exception) { + logger.error("Error checking if key exists in Valkey for key $prefixedKey", e) + return false + } + } + + override fun multiGet(keys: Collection, clazz: Class): Map { + val result = mutableMapOf() + + // Get from the local cache first + val prefixedKeys = keys.map { addPrefix(it) } + val localEntries = prefixedKeys.mapNotNull { key -> + @Suppress("UNCHECKED_CAST") + val entry = localCache[key] as? CacheEntry + if (entry != null && !entry.isExpired()) { + key to entry.value + } else { + null + } + }.toMap() + + result.putAll(localEntries.mapKeys { removePrefix(it.key) }) + + // If we're disconnected, return local entries + if (!isConnected()) { + return result + } + + // Get missing keys from Valkey + val missingKeys = prefixedKeys.filter { !localEntries.containsKey(it) } + if (missingKeys.isEmpty()) { + return result + } + + try { + val valkeyEntries = valkeyTemplate.opsForValue().multiGet(missingKeys) + if (valkeyEntries != null) { + for (i in missingKeys.indices) { + val key = missingKeys[i] + val bytes = valkeyEntries[i] + if (bytes != null) { + try { + val entry = serializer.deserializeEntry(bytes, clazz) + + // Store in a local cache + @Suppress("UNCHECKED_CAST") + localCache[key] = entry as CacheEntry + enforceLocalCacheSize() + + // Add to result + result[removePrefix(key)] = entry.value + } catch (e: Exception) { + logger.error("Error deserializing entry for key $key", e) + } + } } + } + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + } catch (e: Exception) { + logger.error("Error getting multiple values from Valkey", e) + } - // Try to delete from Valkey + return result + } + + // ... (multiSet ebenfalls anpassen) + override fun multiSet(entries: Map, ttl: Duration?) { + val valkeyBatch = mutableMapOf() + val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it } + + for ((key, value) in entries) { + val prefixedKey = addPrefix(key) + val entry = CacheEntry( + key = prefixedKey, + value = value, + expiresAt = expiresAt + ) + @Suppress("UNCHECKED_CAST") + localCache[prefixedKey] = entry as CacheEntry + enforceLocalCacheSize() + valkeyBatch[prefixedKey] = serializer.serializeEntry(entry) + } + + if (!isConnected()) { + entries.keys.forEach { markDirty(it) } + return + } + + try { + valkeyTemplate.opsForValue().multiSet(valkeyBatch) + val effectiveTtl = ttl ?: config.defaultTtl + if (effectiveTtl != null) { + valkeyTemplate.executePipelined { connection -> + valkeyBatch.keys.forEach { key -> + connection.keyCommands().pExpire(key.toByteArray(), effectiveTtl.inWholeMilliseconds) + } + null + } + } + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + entries.keys.forEach { markDirty(it) } + } catch (e: Exception) { + logger.error("Error setting multiple values in Valkey", e) + entries.keys.forEach { markDirty(it) } + } + } + + override fun multiDelete(keys: Collection) { + val prefixedKeys = keys.map { addPrefix(it) } + + // Remove from the local cache + prefixedKeys.forEach { localCache.remove(it) } + + // If we're disconnected, mark all as dirty and return + if (!isConnected()) { + keys.forEach { markDirty(it) } + return + } + + // Try to delete from Valkey + try { + valkeyTemplate.delete(prefixedKeys) + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + keys.forEach { markDirty(it) } + } catch (e: Exception) { + logger.error("Error deleting multiple values from Valkey", e) + keys.forEach { markDirty(it) } + } + } + + override fun synchronize(keys: Collection?) { + if (!isConnected()) { + logger.debug("Cannot synchronize while disconnected") + return + } + + val keysToSync = keys ?: getDirtyKeys() + if (keysToSync.isEmpty()) { + logger.debug("No keys to synchronize") + return + } + + logger.debug("Synchronizing ${keysToSync.size} keys") + + for (key in keysToSync) { + val prefixedKey = addPrefix(key) + val localEntry = localCache[prefixedKey] + + if (localEntry == null) { + // Entry was deleted locally, delete from Valkey try { - valkeyTemplate.delete(prefixedKey) - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - markDirty(key) + valkeyTemplate.delete(prefixedKey) + dirtyKeys.remove(key) } catch (e: Exception) { - logger.error("Error deleting value from Valkey for key $prefixedKey", e) - markDirty(key) + logger.error("Error deleting key $prefixedKey during synchronization", e) } - } - - override fun exists(key: String): Boolean { - val prefixedKey = addPrefix(key) - - // Check the local cache first - if (localCache.containsKey(prefixedKey)) { - val entry = localCache[prefixedKey] - if (entry != null && !entry.isExpired()) { - return true - } - // Remove expired entry - localCache.remove(prefixedKey) - } - - // If we're disconnected, return false - if (!isConnected()) { - return false - } - - // Check Valkey + } else { + // Entry exists locally, update in Valkey try { - return valkeyTemplate.hasKey(prefixedKey) ?: false - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - return false + val bytes = serializer.serializeEntry(localEntry) + + // Die 'set'-Methode erwartet kein TTL-Argument hier + valkeyTemplate.opsForValue().set(prefixedKey, bytes) + + // So wird die Dauer zwischen zwei Instants berechnet + val ttl = localEntry.expiresAt?.let { it - Clock.System.now() } + + // 'isNegative' wird zu '< Duration.ZERO' + if (ttl != null && ttl > Duration.ZERO) { + // KORREKTUR: 'expire' braucht eine java.time.Duration + valkeyTemplate.expire(prefixedKey, ttl.toJavaDuration()) + } + + // Update local entry to mark as clean + localCache[prefixedKey] = localEntry.markClean() + dirtyKeys.remove(key) } catch (e: Exception) { - logger.error("Error checking if key exists in Valkey for key $prefixedKey", e) - return false + logger.error("Error updating key $prefixedKey during synchronization", e) } + } + } + } + + override fun markDirty(key: String) { + dirtyKeys.add(key) + + val prefixedKey = addPrefix(key) + val entry = localCache[prefixedKey] + if (entry != null) { + localCache[prefixedKey] = entry.markDirty() + } + } + + override fun getDirtyKeys(): Collection { + return dirtyKeys.toList() + } + + override fun clear() { + // Clear local cache + localCache.clear() + dirtyKeys.clear() + + // If we're disconnected, return + if (!isConnected()) { + return } - override fun multiGet(keys: Collection, clazz: Class): Map { - val result = mutableMapOf() + // Try to clear Valkey + try { + val keys = valkeyTemplate.keys("${config.keyPrefix}*") + if (keys != null && keys.isNotEmpty()) { + valkeyTemplate.delete(keys) + } + } catch (e: ValkeyConnectionFailureException) { + handleConnectionFailure(e) + } catch (e: Exception) { + logger.error("Error clearing Valkey cache", e) + } + } - // Get from the local cache first - val prefixedKeys = keys.map { addPrefix(it) } - val localEntries = prefixedKeys.mapNotNull { key -> - @Suppress("UNCHECKED_CAST") - val entry = localCache[key] as? CacheEntry - if (entry != null && !entry.isExpired()) { - key to entry.value - } else { - null - } - }.toMap() + // + // ConnectionStatusTracker implementation + // - result.putAll(localEntries.mapKeys { removePrefix(it.key) }) + override fun getConnectionState(): ConnectionState { + return connectionState + } - // If we're disconnected, return local entries - if (!isConnected()) { - return result - } + override fun getLastStateChangeTime(): Instant { + return lastStateChangeTime + } - // Get missing keys from Valkey - val missingKeys = prefixedKeys.filter { !localEntries.containsKey(it) } - if (missingKeys.isEmpty()) { - return result - } + override fun registerConnectionListener(listener: ConnectionStateListener) { + connectionListeners.add(listener) + } + override fun unregisterConnectionListener(listener: ConnectionStateListener) { + connectionListeners.remove(listener) + } + + // + // Helper methods + // + + private fun addPrefix(key: String): String { + return if (config.keyPrefix.isEmpty()) key else "${config.keyPrefix}:$key" + } + + private fun removePrefix(key: String): String { + return if (config.keyPrefix.isEmpty()) key else key.substring(config.keyPrefix.length + 1) + } + + /** + * Erzwingt die maximale Größe des lokalen Caches, indem die am längsten nicht + * mehr modifizierten Einträge entfernt werden. + */ + private fun enforceLocalCacheSize() { + val max = config.localCacheMaxSize ?: return + val overflow = localCache.size - max + if (overflow <= 0) return + val toEvict = localCache.entries + .sortedBy { it.value.lastModifiedAt } + .take(overflow) + .map { it.key } + toEvict.forEach { localCache.remove(it) } + logger.debug("Evicted ${toEvict.size} entries to enforce local cache size limit $max") + } + + private fun handleConnectionFailure(e: Exception) { + logger.warn("Valkey connection failure: ${e.message}") + setConnectionState(ConnectionState.DISCONNECTED) + } + + private fun setConnectionState(newState: ConnectionState) { + if (connectionState != newState) { + val oldState = connectionState + connectionState = newState + lastStateChangeTime = Clock.System.now() + + logger.info("Cache connection state changed from $oldState to $newState") + + // Notify listeners + val timestamp = lastStateChangeTime + connectionListeners.forEach { listener -> try { - val valkeyEntries = valkeyTemplate.opsForValue().multiGet(missingKeys) - if (valkeyEntries != null) { - for (i in missingKeys.indices) { - val key = missingKeys[i] - val bytes = valkeyEntries[i] - if (bytes != null) { - try { - val entry = serializer.deserializeEntry(bytes, clazz) - - // Store in a local cache - @Suppress("UNCHECKED_CAST") - localCache[key] = entry as CacheEntry - enforceLocalCacheSize() - - // Add to result - result[removePrefix(key)] = entry.value - } catch (e: Exception) { - logger.error("Error deserializing entry for key $key", e) - } - } - } - } - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) + listener.onConnectionStateChanged(newState, timestamp) } catch (e: Exception) { - logger.error("Error getting multiple values from Valkey", e) + logger.error("Error notifying connection listener", e) } + } - return result + // If reconnected, synchronize dirty keys + if (oldState != ConnectionState.CONNECTED && newState == ConnectionState.CONNECTED) { + synchronize(null) + } } + } - // ... (multiSet ebenfalls anpassen) - override fun multiSet(entries: Map, ttl: Duration?) { - val valkeyBatch = mutableMapOf() - val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it } + /** + * Prüft periodisch die Verbindung zu Valkey. + */ + @Scheduled(fixedDelayString = "\${valkey.connection-check-interval:10000}") + fun checkConnection() { + try { + valkeyTemplate.hasKey("connection-test") + setConnectionState(ConnectionState.CONNECTED) + } catch (_: Exception) { + setConnectionState(ConnectionState.DISCONNECTED) + } + } - for ((key, value) in entries) { - val prefixedKey = addPrefix(key) - val entry = CacheEntry( - key = prefixedKey, - value = value, - expiresAt = expiresAt - ) - @Suppress("UNCHECKED_CAST") - localCache[prefixedKey] = entry as CacheEntry - enforceLocalCacheSize() - valkeyBatch[prefixedKey] = serializer.serializeEntry(entry) - } - - if (!isConnected()) { - entries.keys.forEach { markDirty(it) } - return - } - - try { - valkeyTemplate.opsForValue().multiSet(valkeyBatch) - val effectiveTtl = ttl ?: config.defaultTtl - if (effectiveTtl != null) { - valkeyTemplate.executePipelined { connection -> - valkeyBatch.keys.forEach { key -> - connection.keyCommands().pExpire(key.toByteArray(), effectiveTtl.inWholeMilliseconds) - } - null - } - } - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - entries.keys.forEach { markDirty(it) } - } catch (e: Exception) { - logger.error("Error setting multiple values in Valkey", e) - entries.keys.forEach { markDirty(it) } + /** + * Bereinigt periodisch abgelaufene Einträge aus dem lokalen Cache. + */ + @Scheduled(fixedDelayString = "\${valkey.local-cache-cleanup-interval:60000}") + fun cleanupLocalCache() { + val now = Clock.System.now() + val expiredKeys = localCache.entries + .filter { it.value.expiresAt?.let { exp -> exp < now } ?: false } + .map { it.key } + + expiredKeys.forEach { localCache.remove(it) } + + if (expiredKeys.isNotEmpty()) { + logger.debug("Removed ${expiredKeys.size} expired entries from local cache") + } + } + + /** + * Synchronisiert periodisch schmutzige Schlüssel, sobald verbunden. + */ + @Scheduled(fixedDelayString = "\${valkey.sync-interval:300000}") + fun scheduledSync() { + if (isConnected() && dirtyKeys.isNotEmpty()) { + synchronize(null) + } + } + + // + // Performance monitoring and optimization methods + // + + /** + * Zeichnet eine Cache-Operation für Metriken auf. + */ + private fun trackOperation(success: Boolean) { + synchronized(this) { + totalOperations++ + if (success) successfulOperations++ + } + } + + /** + * Liefert aktuelle Performance-Metriken. + */ + fun getPerformanceMetrics(): Map { + val now = Clock.System.now() + val successRate = if (totalOperations > 0) { + (successfulOperations.toDouble() / totalOperations.toDouble()) * 100.0 + } else 0.0 + + return mapOf( + "totalOperations" to totalOperations, + "successfulOperations" to successfulOperations, + "successRate" to String.format("%.1f%%", successRate), + "dirtyKeysCount" to dirtyKeys.size, + "localCacheSize" to localCache.size, + "connectionState" to connectionState.name, + "lastStateChangeTime" to lastStateChangeTime, + "uptimeSinceLastMetrics" to (now - lastMetricsLogTime) + ) + } + + /** + * Loggt Performance-Metriken (periodisch aufgerufen). + */ + @Scheduled(fixedDelayString = $$"${valkey.metrics-log-interval:300000}") + fun logPerformanceMetrics() { + val metrics = getPerformanceMetrics() + logger.info("Cache performance metrics: $metrics") + lastMetricsLogTime = Clock.System.now() + } + + /** + * Cache-Warming-Helfer – lädt angegebene Schlüssel vor. + */ + fun warmCache(keys: Collection, dataLoader: (String) -> Any?) { + logger.info("Starting cache warming for ${keys.size} keys") + var warmedCount = 0 + val startTime = Clock.System.now() + + keys.forEach { key -> + if (!exists(key)) { + val data = dataLoader(key) + if (data != null) { + set(key, data, config.defaultTtl) + warmedCount++ } + } } - override fun multiDelete(keys: Collection) { - val prefixedKeys = keys.map { addPrefix(it) } + val duration = Clock.System.now() - startTime + logger.info("Cache warming completed: $warmedCount/${keys.size} keys loaded in $duration") + } - // Remove from the local cache - prefixedKeys.forEach { localCache.remove(it) } + /** + * Bulk-Cache-Warming mit Batch-Operationen. + */ + fun warmCacheBulk(keyDataMap: Map, ttl: Duration? = null) { + logger.info("Starting bulk cache warming for ${keyDataMap.size} entries") + val startTime = Clock.System.now() - // If we're disconnected, mark all as dirty and return - if (!isConnected()) { - keys.forEach { markDirty(it) } - return - } + multiSet(keyDataMap, ttl ?: config.defaultTtl) - // Try to delete from Valkey - try { - valkeyTemplate.delete(prefixedKeys) - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - keys.forEach { markDirty(it) } - } catch (e: Exception) { - logger.error("Error deleting multiple values from Valkey", e) - keys.forEach { markDirty(it) } - } - } + val duration = Clock.System.now() - startTime + logger.info("Bulk cache warming completed: ${keyDataMap.size} entries loaded in $duration") + } - override fun synchronize(keys: Collection?) { - if (!isConnected()) { - logger.debug("Cannot synchronize while disconnected") - return - } + /** + * Liefert den Cache-Gesundheitsstatus. + */ + fun getHealthStatus(): Map { + val metrics = getPerformanceMetrics() + val successRate = metrics["successRate"] as String + val successRateValue = successRate.replace("%", "").toDoubleOrNull() ?: 0.0 - val keysToSync = keys ?: getDirtyKeys() - if (keysToSync.isEmpty()) { - logger.debug("No keys to synchronize") - return - } - - logger.debug("Synchronizing ${keysToSync.size} keys") - - for (key in keysToSync) { - val prefixedKey = addPrefix(key) - val localEntry = localCache[prefixedKey] - - if (localEntry == null) { - // Entry was deleted locally, delete from Valkey - try { - valkeyTemplate.delete(prefixedKey) - dirtyKeys.remove(key) - } catch (e: Exception) { - logger.error("Error deleting key $prefixedKey during synchronization", e) - } - } else { - // Entry exists locally, update in Valkey - try { - val bytes = serializer.serializeEntry(localEntry) - - // Die 'set'-Methode erwartet kein TTL-Argument hier - valkeyTemplate.opsForValue().set(prefixedKey, bytes) - - // So wird die Dauer zwischen zwei Instants berechnet - val ttl = localEntry.expiresAt?.let { it - Clock.System.now() } - - // 'isNegative' wird zu '< Duration.ZERO' - if (ttl != null && ttl > Duration.ZERO) { - // KORREKTUR: 'expire' braucht eine java.time.Duration - valkeyTemplate.expire(prefixedKey, ttl.toJavaDuration()) - } - - // Update local entry to mark as clean - localCache[prefixedKey] = localEntry.markClean() - dirtyKeys.remove(key) - } catch (e: Exception) { - logger.error("Error updating key $prefixedKey during synchronization", e) - } - } - } - } - - override fun markDirty(key: String) { - dirtyKeys.add(key) - - val prefixedKey = addPrefix(key) - val entry = localCache[prefixedKey] - if (entry != null) { - localCache[prefixedKey] = entry.markDirty() - } - } - - override fun getDirtyKeys(): Collection { - return dirtyKeys.toList() - } - - override fun clear() { - // Clear local cache - localCache.clear() - dirtyKeys.clear() - - // If we're disconnected, return - if (!isConnected()) { - return - } - - // Try to clear Valkey - try { - val keys = valkeyTemplate.keys("${config.keyPrefix}*") - if (keys != null && keys.isNotEmpty()) { - valkeyTemplate.delete(keys) - } - } catch (e: RedisConnectionFailureException) { - handleConnectionFailure(e) - } catch (e: Exception) { - logger.error("Error clearing Valkey cache", e) - } - } - - // - // ConnectionStatusTracker implementation - // - - override fun getConnectionState(): ConnectionState { - return connectionState - } - - override fun getLastStateChangeTime(): Instant { - return lastStateChangeTime - } - - override fun registerConnectionListener(listener: ConnectionStateListener) { - connectionListeners.add(listener) - } - - override fun unregisterConnectionListener(listener: ConnectionStateListener) { - connectionListeners.remove(listener) - } - - // - // Helper methods - // - - private fun addPrefix(key: String): String { - return if (config.keyPrefix.isEmpty()) key else "${config.keyPrefix}:$key" - } - - private fun removePrefix(key: String): String { - return if (config.keyPrefix.isEmpty()) key else key.substring(config.keyPrefix.length + 1) - } - - /** - * Erzwingt die maximale Größe des lokalen Caches, indem die am längsten nicht - * mehr modifizierten Einträge entfernt werden. - */ - private fun enforceLocalCacheSize() { - val max = config.localCacheMaxSize ?: return - val overflow = localCache.size - max - if (overflow <= 0) return - val toEvict = localCache.entries - .sortedBy { it.value.lastModifiedAt } - .take(overflow) - .map { it.key } - toEvict.forEach { localCache.remove(it) } - logger.debug("Evicted ${toEvict.size} entries to enforce local cache size limit $max") - } - - private fun handleConnectionFailure(e: Exception) { - logger.warn("Valkey connection failure: ${e.message}") - setConnectionState(ConnectionState.DISCONNECTED) - } - - private fun setConnectionState(newState: ConnectionState) { - if (connectionState != newState) { - val oldState = connectionState - connectionState = newState - lastStateChangeTime = Clock.System.now() - - logger.info("Cache connection state changed from $oldState to $newState") - - // Notify listeners - val timestamp = lastStateChangeTime - connectionListeners.forEach { listener -> - try { - listener.onConnectionStateChanged(newState, timestamp) - } catch (e: Exception) { - logger.error("Error notifying connection listener", e) - } - } - - // If reconnected, synchronize dirty keys - if (oldState != ConnectionState.CONNECTED && newState == ConnectionState.CONNECTED) { - synchronize(null) - } - } - } - - /** - * Prüft periodisch die Verbindung zu Valkey. - */ - @Scheduled(fixedDelayString = "\${valkey.connection-check-interval:10000}") - fun checkConnection() { - try { - valkeyTemplate.hasKey("connection-test") - setConnectionState(ConnectionState.CONNECTED) - } catch (_: Exception) { - setConnectionState(ConnectionState.DISCONNECTED) - } - } - - /** - * Bereinigt periodisch abgelaufene Einträge aus dem lokalen Cache. - */ - @Scheduled(fixedDelayString = "\${valkey.local-cache-cleanup-interval:60000}") - fun cleanupLocalCache() { - val now = Clock.System.now() - val expiredKeys = localCache.entries - .filter { it.value.expiresAt?.let { exp -> exp < now } ?: false } - .map { it.key } - - expiredKeys.forEach { localCache.remove(it) } - - if (expiredKeys.isNotEmpty()) { - logger.debug("Removed ${expiredKeys.size} expired entries from local cache") - } - } - - /** - * Synchronisiert periodisch schmutzige Schlüssel, sobald verbunden. - */ - @Scheduled(fixedDelayString = "\${valkey.sync-interval:300000}") - fun scheduledSync() { - if (isConnected() && dirtyKeys.isNotEmpty()) { - synchronize(null) - } - } - - // - // Performance monitoring and optimization methods - // - - /** - * Zeichnet eine Cache-Operation für Metriken auf. - */ - private fun trackOperation(success: Boolean) { - synchronized(this) { - totalOperations++ - if (success) successfulOperations++ - } - } - - /** - * Liefert aktuelle Performance-Metriken. - */ - fun getPerformanceMetrics(): Map { - val now = Clock.System.now() - val successRate = if (totalOperations > 0) { - (successfulOperations.toDouble() / totalOperations.toDouble()) * 100.0 - } else 0.0 - - return mapOf( - "totalOperations" to totalOperations, - "successfulOperations" to successfulOperations, - "successRate" to String.format("%.1f%%", successRate), - "dirtyKeysCount" to dirtyKeys.size, - "localCacheSize" to localCache.size, - "connectionState" to connectionState.name, - "lastStateChangeTime" to lastStateChangeTime, - "uptimeSinceLastMetrics" to (now - lastMetricsLogTime) - ) - } - - /** - * Loggt Performance-Metriken (periodisch aufgerufen). - */ - @Scheduled(fixedDelayString = "\${valkey.metrics-log-interval:300000}") - fun logPerformanceMetrics() { - val metrics = getPerformanceMetrics() - logger.info("Cache performance metrics: $metrics") - lastMetricsLogTime = Clock.System.now() - } - - /** - * Cache-Warming-Helfer – lädt angegebene Schlüssel vor. - */ - fun warmCache(keys: Collection, dataLoader: (String) -> Any?) { - logger.info("Starting cache warming for ${keys.size} keys") - var warmedCount = 0 - val startTime = Clock.System.now() - - keys.forEach { key -> - if (!exists(key)) { - val data = dataLoader(key) - if (data != null) { - set(key, data, config.defaultTtl) - warmedCount++ - } - } - } - - val duration = Clock.System.now() - startTime - logger.info("Cache warming completed: $warmedCount/${keys.size} keys loaded in $duration") - } - - /** - * Bulk-Cache-Warming mit Batch-Operationen. - */ - fun warmCacheBulk(keyDataMap: Map, ttl: Duration? = null) { - logger.info("Starting bulk cache warming for ${keyDataMap.size} entries") - val startTime = Clock.System.now() - - multiSet(keyDataMap, ttl ?: config.defaultTtl) - - val duration = Clock.System.now() - startTime - logger.info("Bulk cache warming completed: ${keyDataMap.size} entries loaded in $duration") - } - - /** - * Liefert den Cache-Gesundheitsstatus. - */ - fun getHealthStatus(): Map { - val metrics = getPerformanceMetrics() - val successRate = metrics["successRate"] as String - val successRateValue = successRate.replace("%", "").toDoubleOrNull() ?: 0.0 - - return mapOf( - "healthy" to (connectionState == ConnectionState.CONNECTED && successRateValue >= 90.0), - "connectionState" to connectionState.name, - "successRate" to successRate, - "localCacheUtilization" to if (config.localCacheMaxSize != null) { - "${localCache.size}/${config.localCacheMaxSize}" - } else "${localCache.size}/unlimited", - "dirtyKeysCount" to dirtyKeys.size, - "lastHealthCheck" to Clock.System.now() - ) - } + return mapOf( + "healthy" to (connectionState == ConnectionState.CONNECTED && successRateValue >= 90.0), + "connectionState" to connectionState.name, + "successRate" to successRate, + "localCacheUtilization" to if (config.localCacheMaxSize != null) { + "${localCache.size}/${config.localCacheMaxSize}" + } else "${localCache.size}/unlimited", + "dirtyKeysCount" to dirtyKeys.size, + "lastHealthCheck" to Clock.System.now() + ) + } } diff --git a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheConfigurationTest.kt b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheConfigurationTest.kt index e3f96310..41bf485a 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheConfigurationTest.kt +++ b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheConfigurationTest.kt @@ -5,12 +5,12 @@ import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration import at.mocode.infrastructure.cache.api.get import at.mocode.infrastructure.cache.api.multiGet import io.github.oshai.kotlinlogging.KotlinLogging +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.serializer.StringRedisSerializer import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -37,14 +37,14 @@ class ValkeyDistributedCacheConfigurationTest { @Container val valkeyContainer = GenericContainer( - DockerImageName.parse("valkey/valkey:9-alpine") - .asCompatibleSubstituteFor("valkey") + DockerImageName.parse("valkey/valkey:8.0.2-alpine") + .asCompatibleSubstituteFor("redis") ).apply { withExposedPorts(6379) } } - private lateinit var valkeyTemplate: RedisTemplate + private lateinit var valkeyTemplate: ValkeyTemplate private lateinit var serializer: CacheSerializer @BeforeEach @@ -52,13 +52,13 @@ class ValkeyDistributedCacheConfigurationTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = RedisTemplate().apply { + valkeyTemplate = ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() afterPropertiesSet() } diff --git a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheEdgeCasesTest.kt b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheEdgeCasesTest.kt index fa5561b5..a3d39532 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheEdgeCasesTest.kt +++ b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheEdgeCasesTest.kt @@ -2,12 +2,12 @@ package at.mocode.infrastructure.cache.valkey import at.mocode.infrastructure.cache.api.* import io.github.oshai.kotlinlogging.KotlinLogging +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.serializer.StringRedisSerializer import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -29,14 +29,14 @@ class ValkeyDistributedCacheEdgeCasesTest { @Container val valkeyContainer = GenericContainer( - DockerImageName.parse("valkey/valkey:9-alpine") - .asCompatibleSubstituteFor("valkey") + DockerImageName.parse("valkey/valkey:8.0.2-alpine") + .asCompatibleSubstituteFor("redis") ).apply { withExposedPorts(6379) } } - private lateinit var valkeyTemplate: RedisTemplate + private lateinit var valkeyTemplate: ValkeyTemplate private lateinit var serializer: CacheSerializer private lateinit var config: CacheConfiguration private lateinit var cache: ValkeyDistributedCache @@ -46,13 +46,13 @@ class ValkeyDistributedCacheEdgeCasesTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = RedisTemplate().apply { + valkeyTemplate = ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() afterPropertiesSet() } diff --git a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheIntegrationTest.kt b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheIntegrationTest.kt index 5550fd52..e84c6f48 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheIntegrationTest.kt +++ b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheIntegrationTest.kt @@ -2,14 +2,14 @@ package at.mocode.infrastructure.cache.valkey import at.mocode.infrastructure.cache.api.* import io.github.oshai.kotlinlogging.KotlinLogging +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.serializer.StringRedisSerializer import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -36,14 +36,14 @@ class ValkeyDistributedCacheIntegrationTest { @Container val valkeyContainer = GenericContainer( - DockerImageName.parse("valkey/valkey:9-alpine") - .asCompatibleSubstituteFor("valkey") + DockerImageName.parse("valkey/valkey:8.0.2-alpine") + .asCompatibleSubstituteFor("redis") ).apply { withExposedPorts(6379) } } - private lateinit var valkeyTemplate: RedisTemplate + private lateinit var valkeyTemplate: ValkeyTemplate private lateinit var serializer: CacheSerializer private lateinit var config: CacheConfiguration @@ -52,13 +52,13 @@ class ValkeyDistributedCacheIntegrationTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = RedisTemplate().apply { + valkeyTemplate = ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() afterPropertiesSet() } diff --git a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCachePerformanceTest.kt b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCachePerformanceTest.kt index e5b45850..cf90743a 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCachePerformanceTest.kt +++ b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCachePerformanceTest.kt @@ -2,15 +2,15 @@ package at.mocode.infrastructure.cache.valkey import at.mocode.infrastructure.cache.api.* import io.github.oshai.kotlinlogging.KotlinLogging +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.serializer.StringRedisSerializer import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -32,14 +32,14 @@ class ValkeyDistributedCachePerformanceTest { @Container val valkeyContainer = GenericContainer( - DockerImageName.parse("valkey/valkey:9-alpine") - .asCompatibleSubstituteFor("valkey") + DockerImageName.parse("valkey/valkey:8.0.2-alpine") + .asCompatibleSubstituteFor("redis") ).apply { withExposedPorts(6379) } } - private lateinit var valkeyTemplate: RedisTemplate + private lateinit var valkeyTemplate: ValkeyTemplate private lateinit var serializer: CacheSerializer private lateinit var config: CacheConfiguration private lateinit var cache: ValkeyDistributedCache @@ -49,13 +49,13 @@ class ValkeyDistributedCachePerformanceTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = RedisTemplate().apply { + valkeyTemplate = ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() afterPropertiesSet() } diff --git a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheResilienceTest.kt b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheResilienceTest.kt index 15ae651c..86939afb 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheResilienceTest.kt +++ b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheResilienceTest.kt @@ -4,16 +4,16 @@ import at.mocode.infrastructure.cache.api.* import io.github.oshai.kotlinlogging.KotlinLogging import io.mockk.every import io.mockk.mockk +import io.valkey.springframework.data.valkey.ValkeyConnectionFailureException +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.core.ValueOperations +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.RedisConnectionFailureException -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.core.ValueOperations -import org.springframework.data.redis.serializer.StringRedisSerializer import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -38,14 +38,14 @@ class ValkeyDistributedCacheResilienceTest { @Container val valkeyContainer = GenericContainer( - DockerImageName.parse("valkey/valkey:9-alpine") - .asCompatibleSubstituteFor("valkey") + DockerImageName.parse("valkey/valkey:8.0.2-alpine") + .asCompatibleSubstituteFor("redis") ).apply { withExposedPorts(6379) } } - private lateinit var valkeyTemplate: RedisTemplate + private lateinit var valkeyTemplate: ValkeyTemplate private lateinit var serializer: CacheSerializer private lateinit var config: CacheConfiguration @@ -54,13 +54,13 @@ class ValkeyDistributedCacheResilienceTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = RedisTemplate().apply { + valkeyTemplate = ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() afterPropertiesSet() } @@ -76,7 +76,7 @@ class ValkeyDistributedCacheResilienceTest { fun `test connection timeout scenarios`() = runBlocking { logger.info { "Testing connection timeout scenarios" } - val mockTemplate = mockk>() + val mockTemplate = mockk>() val mockValueOps = mockk>() every { mockTemplate.opsForValue() } returns mockValueOps @@ -117,7 +117,7 @@ class ValkeyDistributedCacheResilienceTest { fun `test partial valkey failures`() { logger.info { "Testing partial Valkey failures" } - val mockTemplate = mockk>() + val mockTemplate = mockk>() val mockValueOps = mockk>() every { mockTemplate.opsForValue() } returns mockValueOps @@ -128,14 +128,14 @@ class ValkeyDistributedCacheResilienceTest { // Simulate intermittent connection failures (fail every 3rd operation) every { mockValueOps.get(any()) } answers { if (failureCounter.incrementAndGet() % 3 == 0) { - throw RedisConnectionFailureException("Intermittent failure") + throw ValkeyConnectionFailureException("Intermittent failure") as Throwable } serializer.serializeEntry(CacheEntry("test", "value")) } every { mockValueOps.set(any(), any(), any()) } answers { if (failureCounter.incrementAndGet() % 3 == 0) { - throw RedisConnectionFailureException("Intermittent failure") + throw ValkeyConnectionFailureException("Intermittent failure") } } @@ -197,20 +197,20 @@ class ValkeyDistributedCacheResilienceTest { // Phase 2: Simulate network partition by creating a new cache with a broken connection logger.info { "Phase 2: Simulating network partition" } - val mockTemplate = mockk>() + val mockTemplate = mockk>() val mockValueOps = mockk>() every { mockTemplate.opsForValue() } returns mockValueOps - every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Network partition") + every { mockValueOps.get(any()) } throws ValkeyConnectionFailureException("Network partition") every { mockValueOps.set( any(), any(), any() ) - } throws RedisConnectionFailureException("Network partition") - every { mockTemplate.delete(any()) } throws RedisConnectionFailureException("Network partition") - every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Network partition") + } throws ValkeyConnectionFailureException("Network partition") + every { mockTemplate.delete(any()) } throws ValkeyConnectionFailureException("Network partition") + every { mockTemplate.hasKey(any()) } throws ValkeyConnectionFailureException("Network partition") val partitionedCache = ValkeyDistributedCache(mockTemplate, serializer, config) @@ -235,7 +235,7 @@ class ValkeyDistributedCacheResilienceTest { fun `test reconnection and synchronization after network issues`() { logger.info { "Testing reconnection and synchronization" } - val mockTemplate = mockk>() + val mockTemplate = mockk>() val mockValueOps = mockk>() every { mockTemplate.opsForValue() } returns mockValueOps @@ -243,15 +243,15 @@ class ValkeyDistributedCacheResilienceTest { val reconnectingCache = ValkeyDistributedCache(mockTemplate, serializer, config) // Phase 1: Simulate disconnection - every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Disconnected") + every { mockValueOps.get(any()) } throws ValkeyConnectionFailureException("Disconnected") every { mockValueOps.set( any(), any(), any() ) - } throws RedisConnectionFailureException("Disconnected") - every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Disconnected") + } throws ValkeyConnectionFailureException("Disconnected") + every { mockTemplate.hasKey(any()) } throws ValkeyConnectionFailureException("Disconnected") reconnectingCache.set("reconnect-test-1", "value-1") reconnectingCache.set("reconnect-test-2", "value-2") diff --git a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheTest.kt b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheTest.kt index 3587af70..d1d40e7a 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheTest.kt +++ b/backend/infrastructure/cache/valkey-cache/src/test/kotlin/at/mocode/infrastructure/cache/valkey/ValkeyDistributedCacheTest.kt @@ -5,15 +5,15 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.mockk.every import io.mockk.mockk import io.mockk.verify +import io.valkey.springframework.data.valkey.ValkeyConnectionFailureException +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate +import io.valkey.springframework.data.valkey.core.ValueOperations +import io.valkey.springframework.data.valkey.serializer.StringValkeySerializer import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.RedisConnectionFailureException -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.RedisTemplate -import org.springframework.data.redis.core.ValueOperations -import org.springframework.data.redis.serializer.StringRedisSerializer import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -31,14 +31,14 @@ class ValkeyDistributedCacheTest { @Container val valkeyContainer = GenericContainer( - DockerImageName.parse("valkey/valkey:9-alpine") - .asCompatibleSubstituteFor("valkey") + DockerImageName.parse("valkey/valkey:8.0.2-alpine") + .asCompatibleSubstituteFor("redis") ).apply { withExposedPorts(6379) } } - private lateinit var valkeyTemplate: RedisTemplate + private lateinit var valkeyTemplate: ValkeyTemplate private lateinit var serializer: CacheSerializer private lateinit var config: CacheConfiguration private lateinit var cache: ValkeyDistributedCache @@ -48,13 +48,13 @@ class ValkeyDistributedCacheTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = RedisTemplate().apply { + valkeyTemplate = ValkeyTemplate().apply { setConnectionFactory(connectionFactory) - keySerializer = StringRedisSerializer() + keySerializer = StringValkeySerializer() afterPropertiesSet() } @@ -131,7 +131,7 @@ class ValkeyDistributedCacheTest { @Test fun `should handle offline mode and synchronize correctly`() { // Arrange - val mockTemplate = mockk>(relaxed = true) + val mockTemplate = mockk>(relaxed = true) val mockValueOps = mockk>(relaxed = true) every { mockTemplate.opsForValue() } returns mockValueOps @@ -157,10 +157,15 @@ class ValkeyDistributedCacheTest { any(), any() ) - } throws RedisConnectionFailureException("Valkey is down") - every { mockValueOps.set(any(), any()) } throws RedisConnectionFailureException("Valkey is down") + } throws ValkeyConnectionFailureException("Valkey is down") + every { + mockValueOps.set( + any(), + any() + ) + } throws ValkeyConnectionFailureException("Valkey is down") - every { mockTemplate.delete(any()) } throws RedisConnectionFailureException("Valkey is down") + every { mockTemplate.delete(any()) } throws ValkeyConnectionFailureException("Valkey is down") offlineCache.set("key2", "offline-value") offlineCache.delete("key1") @@ -246,13 +251,13 @@ class ValkeyDistributedCacheTest { @Test fun `test handling valkey connection failures`() { // Create a mock ValkeyTemplate and ValueOperations - val mockTemplate = mockk>() + val mockTemplate = mockk>() val mockValueOps = mockk>() // Configure the mock to throw connection failure every { mockTemplate.opsForValue() } returns mockValueOps - every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Test connection failure") - every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Test connection failure") + every { mockValueOps.get(any()) } throws ValkeyConnectionFailureException("Test connection failure") + every { mockTemplate.hasKey(any()) } throws ValkeyConnectionFailureException("Test connection failure") // Create a cache with the mock val mockCache = ValkeyDistributedCache(mockTemplate, serializer, config) diff --git a/backend/infrastructure/cache/valkey-cache/src/test/resources/logback-test.xml b/backend/infrastructure/cache/valkey-cache/src/test/resources/logback-test.xml index f4742ebc..5dfca106 100644 --- a/backend/infrastructure/cache/valkey-cache/src/test/resources/logback-test.xml +++ b/backend/infrastructure/cache/valkey-cache/src/test/resources/logback-test.xml @@ -1,40 +1,40 @@ - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + - - + + - - + + - - + + - - + + - - + + - - + + - - - + + + - - - + + + - - - - + + + + diff --git a/backend/infrastructure/event-store/valkey-event-store/build.gradle.kts b/backend/infrastructure/event-store/valkey-event-store/build.gradle.kts index 2b4c85ac..038db082 100644 --- a/backend/infrastructure/event-store/valkey-event-store/build.gradle.kts +++ b/backend/infrastructure/event-store/valkey-event-store/build.gradle.kts @@ -30,6 +30,11 @@ dependencies { // OPTIMIERUNG: Wiederverwendung des `valkey-cache`-Bundles, da es die // gleichen Technologien (Spring Data Valkey, Lettuce, Jackson) verwendet implementation(libs.bundles.valkey.cache) + // Benötigt für Lettuce-basierten Valkey-Client (LettuceConnectionFactory) + implementation(libs.lettuce.core) + // Für Boot-Autoconfiguration-Annotations (z. B. @ConditionalOnMissingBean, + // @ConfigurationProperties, @EnableConfigurationProperties) + implementation("org.springframework.boot:spring-boot-autoconfigure") // Stellt Jakarta Annotations bereit (z. B. @PostConstruct), die von Spring verwendet werden implementation(libs.jakarta.annotation.api) // Für Kotlin-spezifische Coroutines-Integration mit Spring diff --git a/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumer.kt b/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumer.kt index 777dcada..b1e59582 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumer.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumer.kt @@ -2,12 +2,16 @@ package at.mocode.infrastructure.eventstore.valkey import at.mocode.core.domain.event.DomainEvent import at.mocode.infrastructure.eventstore.api.EventSerializer +import io.valkey.springframework.data.valkey.connection.stream.Consumer +import io.valkey.springframework.data.valkey.connection.stream.MapRecord +import io.valkey.springframework.data.valkey.connection.stream.ReadOffset +import io.valkey.springframework.data.valkey.connection.stream.StreamOffset +import io.valkey.springframework.data.valkey.connection.stream.StreamReadOptions +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import jakarta.annotation.PostConstruct import jakarta.annotation.PreDestroy import org.slf4j.LoggerFactory import org.springframework.data.domain.Range -import org.springframework.data.redis.connection.stream.* -import org.springframework.data.redis.core.StringRedisTemplate import org.springframework.scheduling.annotation.Scheduled import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList @@ -16,272 +20,272 @@ import java.util.concurrent.CopyOnWriteArrayList * Consumer for Valkey Streams that processes events using consumer groups. */ class ValkeyEventConsumer( - private val valkeyTemplate: StringRedisTemplate, + private val valkeyTemplate: StringValkeyTemplate, private val serializer: EventSerializer, private val properties: ValkeyEventStoreProperties ) { - private val logger = LoggerFactory.getLogger(ValkeyEventConsumer::class.java) - private val eventTypeHandlers = ConcurrentHashMap Unit>>() - private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>() - private var running = false + private val logger = LoggerFactory.getLogger(ValkeyEventConsumer::class.java) + private val eventTypeHandlers = ConcurrentHashMap Unit>>() + private val allEventHandlers = CopyOnWriteArrayList<(DomainEvent) -> Unit>() + private var running = false - /** - * Initializes the consumer. - */ - @PostConstruct - fun init() { - if (properties.createConsumerGroupIfNotExists) { - createConsumerGroupsIfNotExist() + /** + * Initializes the consumer. + */ + @PostConstruct + fun init() { + if (properties.createConsumerGroupIfNotExists) { + createConsumerGroupsIfNotExist() + } + } + + /** + * Stops the consumer. + */ + @PreDestroy + fun shutdown() { + running = false + } + + /** + * Registers a handler for a specific event type. + * + * @param eventType The type of event to handle + * @param handler The handler to call when an event of the specified type is received + */ + fun registerEventHandler(eventType: String, handler: (DomainEvent) -> Unit) { + eventTypeHandlers.computeIfAbsent(eventType) { CopyOnWriteArrayList() }.add(handler) + logger.debug("Registered handler for event type: $eventType") + } + + /** + * Registers a handler for all events. + * + * @param handler The handler to call when any event is received + */ + fun registerAllEventsHandler(handler: (DomainEvent) -> Unit) { + allEventHandlers.add(handler) + logger.debug("Registered handler for all events") + } + + /** + * Unregisters a handler for a specific event type. + * + * @param eventType The type of event + * @param handler The handler to unregister + */ + fun unregisterEventHandler(eventType: String, handler: (DomainEvent) -> Unit) { + eventTypeHandlers[eventType]?.remove(handler) + logger.debug("Unregistered handler for event type: $eventType") + } + + /** + * Unregisters a handler for all events. + * + * @param handler The handler to unregister + */ + fun unregisterAllEventsHandler(handler: (DomainEvent) -> Unit) { + allEventHandlers.remove(handler) + logger.debug("Unregistered handler for all events") + } + + /** + * Creates consumer groups for all streams if they don't exist. + */ + private fun createConsumerGroupsIfNotExist() { + try { + val allEventsStreamKey = getAllEventsStreamKey() + try { + valkeyTemplate.opsForStream() + .add(allEventsStreamKey, mapOf("init" to "init")) + logger.debug("Ensured all-events stream has messages: $allEventsStreamKey") + } catch (e: Exception) { + logger.debug("All-events stream might already have messages: ${e.message}") + } + + createConsumerGroupIfNotExists(allEventsStreamKey) + + val streamKeys = valkeyTemplate.keys("${properties.streamPrefix}*") + + for (streamKey in streamKeys) { + if (streamKey != allEventsStreamKey) { + createConsumerGroupIfNotExists(streamKey) } + } + } catch (e: Exception) { + logger.error("Error creating consumer groups: ${e.message}", e) + } + } + + /** + * Creates a consumer group for a stream if it doesn't exist. + * + * @param streamKey The key of the stream + */ + private fun createConsumerGroupIfNotExists(streamKey: String) { + try { + try { + valkeyTemplate.opsForStream() + .add(streamKey, mapOf("init" to "init")) + logger.debug("Ensured stream has messages: $streamKey") + } catch (e: Exception) { + logger.debug("Stream $streamKey might already have messages: ${e.message}") + } + + try { + valkeyTemplate.opsForStream() + .createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup) + logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey") + } catch (e: Exception) { + logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}") + } + } catch (e: Exception) { + logger.error("Error creating consumer group for stream $streamKey: ${e.message}", e) + } + } + + /** + * Periodic polls for new events from all streams. + */ + @Scheduled(fixedDelayString = "\${valkey.event-store.poll-interval:100}") + fun pollEvents() { + if (!running) { + running = true } - /** - * Stops the consumer. - */ - @PreDestroy - fun shutdown() { - running = false + try { + pollStream(getAllEventsStreamKey()) + claimPendingMessages() + } catch (e: Exception) { + logger.error("Error polling events: ${e.message}", e) } + } - /** - * Registers a handler for a specific event type. - * - * @param eventType The type of event to handle - * @param handler The handler to call when an event of the specified type is received - */ - fun registerEventHandler(eventType: String, handler: (DomainEvent) -> Unit) { - eventTypeHandlers.computeIfAbsent(eventType) { CopyOnWriteArrayList() }.add(handler) - logger.debug("Registered handler for event type: $eventType") - } + /** + * Polls a stream for new events. + * + * @param streamKey The key of the stream to poll + */ + private fun pollStream(streamKey: String) { + try { + val options = StreamReadOptions.empty() + .count(properties.maxBatchSize.toLong()) + .block(properties.pollTimeout) - /** - * Registers a handler for all events. - * - * @param handler The handler to call when any event is received - */ - fun registerAllEventsHandler(handler: (DomainEvent) -> Unit) { - allEventHandlers.add(handler) - logger.debug("Registered handler for all events") - } + val records = valkeyTemplate.opsForStream() + .read( + Consumer.from(properties.consumerGroup, properties.consumerName), + options, + StreamOffset.create(streamKey, ReadOffset.lastConsumed()) + ) - /** - * Unregisters a handler for a specific event type. - * - * @param eventType The type of event - * @param handler The handler to unregister - */ - fun unregisterEventHandler(eventType: String, handler: (DomainEvent) -> Unit) { - eventTypeHandlers[eventType]?.remove(handler) - logger.debug("Unregistered handler for event type: $eventType") - } - - /** - * Unregisters a handler for all events. - * - * @param handler The handler to unregister - */ - fun unregisterAllEventsHandler(handler: (DomainEvent) -> Unit) { - allEventHandlers.remove(handler) - logger.debug("Unregistered handler for all events") - } - - /** - * Creates consumer groups for all streams if they don't exist. - */ - private fun createConsumerGroupsIfNotExist() { - try { - val allEventsStreamKey = getAllEventsStreamKey() - try { - valkeyTemplate.opsForStream() - .add(allEventsStreamKey, mapOf("init" to "init")) - logger.debug("Ensured all-events stream has messages: $allEventsStreamKey") - } catch (e: Exception) { - logger.debug("All-events stream might already have messages: ${e.message}") - } - - createConsumerGroupIfNotExists(allEventsStreamKey) - - val streamKeys = valkeyTemplate.keys("${properties.streamPrefix}*") - - for (streamKey in streamKeys) { - if (streamKey != allEventsStreamKey) { - createConsumerGroupIfNotExists(streamKey) - } - } - } catch (e: Exception) { - logger.error("Error creating consumer groups: ${e.message}", e) + if (records != null) { + for (record in records) { + processRecord(record) } + } + } catch (e: Exception) { + val message = e.message + if (message == null || !message.contains("NOGROUP")) { + logger.error("Error polling stream $streamKey: ${e.message}", e) + } } + } - /** - * Creates a consumer group for a stream if it doesn't exist. - * - * @param streamKey The key of the stream - */ - private fun createConsumerGroupIfNotExists(streamKey: String) { - try { - try { - valkeyTemplate.opsForStream() - .add(streamKey, mapOf("init" to "init")) - logger.debug("Ensured stream has messages: $streamKey") - } catch (e: Exception) { - logger.debug("Stream $streamKey might already have messages: ${e.message}") - } + /** + * Claims pending messages that have been idle for too long. + */ + private fun claimPendingMessages() { + try { + val streamKey = getAllEventsStreamKey() - try { - valkeyTemplate.opsForStream() - .createGroup(streamKey, ReadOffset.latest(), properties.consumerGroup) - logger.debug("Created consumer group ${properties.consumerGroup} for stream: $streamKey") - } catch (e: Exception) { - logger.debug("Could not create consumer group ${properties.consumerGroup} for stream: $streamKey: ${e.message}") - } - } catch (e: Exception) { - logger.error("Error creating consumer group for stream $streamKey: ${e.message}", e) - } - } + val pendingSummary = valkeyTemplate.opsForStream() + .pending(streamKey, properties.consumerGroup) - /** - * Periodic polls for new events from all streams. - */ - @Scheduled(fixedDelayString = $$"${valkey.event-store.poll-interval:100}") - fun pollEvents() { - if (!running) { - running = true - } + if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) { + val pendingMessages = valkeyTemplate.opsForStream() + .pending( + streamKey, + Consumer.from(properties.consumerGroup, properties.consumerName), + Range.unbounded(), + properties.maxBatchSize.toLong() + ) - try { - pollStream(getAllEventsStreamKey()) - claimPendingMessages() - } catch (e: Exception) { - logger.error("Error polling events: ${e.message}", e) - } - } + if (pendingMessages.size() > 0) { + val messageIdsList = pendingMessages.map { it.id }.toList() - /** - * Polls a stream for new events. - * - * @param streamKey The key of the stream to poll - */ - private fun pollStream(streamKey: String) { - try { - val options = StreamReadOptions.empty() - .count(properties.maxBatchSize.toLong()) - .block(properties.pollTimeout) + if (messageIdsList.isNotEmpty()) { + val messageIds = messageIdsList.toTypedArray() val records = valkeyTemplate.opsForStream() - .read( - Consumer.from(properties.consumerGroup, properties.consumerName), - options, - StreamOffset.create(streamKey, ReadOffset.lastConsumed()) - ) + .claim( + streamKey, + properties.consumerGroup, + properties.consumerName, + properties.claimIdleTimeout, + *messageIds + ) - if (records != null) { - for (record in records) { - processRecord(record) - } - } - } catch (e: Exception) { - val message = e.message - if (message == null || !message.contains("NOGROUP")) { - logger.error("Error polling stream $streamKey: ${e.message}", e) + for (record in records) { + processRecord(record) } + } } + } + } catch (e: Exception) { + logger.error("Error claiming pending messages: ${e.message}", e) } + } - /** - * Claims pending messages that have been idle for too long. - */ - private fun claimPendingMessages() { + /** + * Processes a record from a stream. + * + * @param record The record to process + */ + private fun processRecord(record: MapRecord) { + try { + val data = record.value + + if (data.size == 1 && data.containsKey("init") && data["init"] == "init") { + logger.debug("Skipping init message") + valkeyTemplate.opsForStream() + .acknowledge(properties.consumerGroup, record) + return + } + + val event = serializer.deserialize(data) + val eventType = serializer.getEventType(data) + + eventTypeHandlers[eventType]?.forEach { handler -> try { - val streamKey = getAllEventsStreamKey() - - val pendingSummary = valkeyTemplate.opsForStream() - .pending(streamKey, properties.consumerGroup) - - if (pendingSummary != null && pendingSummary.totalPendingMessages > 0) { - val pendingMessages = valkeyTemplate.opsForStream() - .pending( - streamKey, - Consumer.from(properties.consumerGroup, properties.consumerName), - Range.unbounded(), - properties.maxBatchSize.toLong() - ) - - if (pendingMessages.size() > 0) { - val messageIdsList = pendingMessages.map { it.id }.toList() - - if (messageIdsList.isNotEmpty()) { - val messageIds = messageIdsList.toTypedArray() - - val records = valkeyTemplate.opsForStream() - .claim( - streamKey, - properties.consumerGroup, - properties.consumerName, - properties.claimIdleTimeout, - *messageIds - ) - - for (record in records) { - processRecord(record) - } - } - } - } + handler(event) } catch (e: Exception) { - logger.error("Error claiming pending messages: ${e.message}", e) + logger.error("Error handling event of type $eventType: ${e.message}", e) } - } + } - /** - * Processes a record from a stream. - * - * @param record The record to process - */ - private fun processRecord(record: MapRecord) { + allEventHandlers.forEach { handler -> try { - val data = record.value - - if (data.size == 1 && data.containsKey("init") && data["init"] == "init") { - logger.debug("Skipping init message") - valkeyTemplate.opsForStream() - .acknowledge(properties.consumerGroup, record) - return - } - - val event = serializer.deserialize(data) - val eventType = serializer.getEventType(data) - - eventTypeHandlers[eventType]?.forEach { handler -> - try { - handler(event) - } catch (e: Exception) { - logger.error("Error handling event of type $eventType: ${e.message}", e) - } - } - - allEventHandlers.forEach { handler -> - try { - handler(event) - } catch (e: Exception) { - logger.error("Error handling event: ${e.message}", e) - } - } - - valkeyTemplate.opsForStream() - .acknowledge(properties.consumerGroup, record) - + handler(event) } catch (e: Exception) { - logger.error("Error processing record: ${e.message}", e) + logger.error("Error handling event: ${e.message}", e) } - } + } - /** - * Gets the Valkey key for the all-events stream. - * - * @return The Valkey key for the all-events stream - */ - private fun getAllEventsStreamKey(): String { - return "${properties.streamPrefix}${properties.allEventsStream}" + valkeyTemplate.opsForStream() + .acknowledge(properties.consumerGroup, record) + + } catch (e: Exception) { + logger.error("Error processing record: ${e.message}", e) } + } + + /** + * Gets the Valkey key for the all-events stream. + * + * @return The Valkey key for the all-events stream + */ + private fun getAllEventsStreamKey(): String { + return "${properties.streamPrefix}${properties.allEventsStream}" + } } diff --git a/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStore.kt b/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStore.kt index 6b1fff10..39ae176a 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStore.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStore.kt @@ -8,16 +8,17 @@ import at.mocode.infrastructure.eventstore.api.ConcurrencyException import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore import at.mocode.infrastructure.eventstore.api.Subscription +import io.valkey.springframework.data.valkey.core.SessionCallback +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate +import io.valkey.springframework.data.valkey.core.ValkeyOperations import org.slf4j.LoggerFactory import org.springframework.dao.DataAccessException import org.springframework.data.domain.Range -import org.springframework.data.redis.core.SessionCallback -import org.springframework.data.redis.core.StringRedisTemplate import java.util.concurrent.ConcurrentHashMap import kotlin.uuid.Uuid class ValkeyEventStore( - private val valkeyTemplate: StringRedisTemplate, + private val valkeyTemplate: StringValkeyTemplate, private val serializer: EventSerializer, private val properties: ValkeyEventStoreProperties ) : EventStore { @@ -131,8 +132,8 @@ class ValkeyEventStore( try { valkeyTemplate.execute(object : SessionCallback> { @Throws(DataAccessException::class) - override fun execute(operations: org.springframework.data.redis.core.RedisOperations): List { - val streamOps = (operations as StringRedisTemplate).opsForStream() + override fun execute(operations: ValkeyOperations): List { + val streamOps = (operations as StringValkeyTemplate).opsForStream() operations.multi() @@ -178,8 +179,8 @@ class ValkeyEventStore( try { valkeyTemplate.execute(object : SessionCallback> { @Throws(DataAccessException::class) - override fun execute(operations: org.springframework.data.redis.core.RedisOperations): List { - val streamOps = (operations as StringRedisTemplate).opsForStream() + override fun execute(operations: ValkeyOperations): List { + val streamOps = (operations as StringValkeyTemplate).opsForStream() operations.multi() streamOps.add(streamKey, eventData) diff --git a/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfiguration.kt b/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfiguration.kt index a3dd114c..1fb857d0 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfiguration.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/main/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfiguration.kt @@ -2,16 +2,17 @@ package at.mocode.infrastructure.eventstore.valkey import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory +import io.valkey.springframework.data.valkey.connection.ValkeyPassword +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.data.redis.connection.RedisConnectionFactory -import org.springframework.data.redis.connection.RedisPassword -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import java.time.Duration /** @@ -53,11 +54,11 @@ class ValkeyEventStoreConfiguration { */ @Bean @ConditionalOnMissingBean(name = ["eventStoreValkeyConnectionFactory"]) - fun eventStoreValkeyConnectionFactory(properties: ValkeyEventStoreProperties): RedisConnectionFactory { - val config = RedisStandaloneConfiguration().apply { + fun eventStoreValkeyConnectionFactory(properties: ValkeyEventStoreProperties): ValkeyConnectionFactory { + val config = ValkeyStandaloneConfiguration().apply { hostName = properties.host port = properties.port - properties.password?.let { password = RedisPassword.of(it) } + properties.password?.let { password = ValkeyPassword.of(it) } database = properties.database } @@ -76,10 +77,10 @@ class ValkeyEventStoreConfiguration { @Bean @ConditionalOnMissingBean(name = ["eventStoreValkeyTemplate"]) fun eventStoreValkeyTemplate( - @org.springframework.beans.factory.annotation.Qualifier("eventStoreValkeyConnectionFactory") - connectionFactory: RedisConnectionFactory - ): StringRedisTemplate { - return StringRedisTemplate().apply { + @Qualifier("eventStoreValkeyConnectionFactory") + connectionFactory: ValkeyConnectionFactory + ): StringValkeyTemplate { + return StringValkeyTemplate().apply { setConnectionFactory(connectionFactory) afterPropertiesSet() } @@ -107,8 +108,8 @@ class ValkeyEventStoreConfiguration { @Bean @ConditionalOnMissingBean fun eventStore( - @org.springframework.beans.factory.annotation.Qualifier("eventStoreValkeyTemplate") - valkeyTemplate: StringRedisTemplate, + @Qualifier("eventStoreValkeyTemplate") + valkeyTemplate: StringValkeyTemplate, eventSerializer: EventSerializer, properties: ValkeyEventStoreProperties ): EventStore { @@ -126,8 +127,8 @@ class ValkeyEventStoreConfiguration { @Bean @ConditionalOnMissingBean fun eventConsumer( - @org.springframework.beans.factory.annotation.Qualifier("eventStoreValkeyTemplate") - valkeyTemplate: StringRedisTemplate, + @Qualifier("eventStoreValkeyTemplate") + valkeyTemplate: StringValkeyTemplate, eventSerializer: EventSerializer, properties: ValkeyEventStoreProperties ): ValkeyEventConsumer { diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyCacheAndEventStoreIntegrationTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyCacheAndEventStoreIntegrationTest.kt index da044d0c..083919f5 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyCacheAndEventStoreIntegrationTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyCacheAndEventStoreIntegrationTest.kt @@ -8,6 +8,8 @@ import at.mocode.infrastructure.cache.valkey.JacksonCacheSerializer import at.mocode.infrastructure.cache.valkey.ValkeyConfiguration import at.mocode.infrastructure.cache.valkey.ValkeyDistributedCache import at.mocode.infrastructure.eventstore.api.EventStore +import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory +import io.valkey.springframework.data.valkey.core.ValkeyTemplate import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions.assertEquals @@ -20,8 +22,6 @@ import org.springframework.boot.test.context.SpringBootTest import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import -import org.springframework.data.redis.connection.RedisConnectionFactory -import org.springframework.data.redis.core.RedisTemplate import org.springframework.test.context.DynamicPropertyRegistry import org.springframework.test.context.DynamicPropertySource import org.testcontainers.containers.GenericContainer @@ -96,7 +96,7 @@ class ValkeyCacheAndEventStoreIntegrationTest { class TestConfig { @Bean fun distributedCache( - @Qualifier("valkeyTemplate") valkeyTemplate: RedisTemplate, + @Qualifier("valkeyTemplate") valkeyTemplate: ValkeyTemplate, cacheConfiguration: CacheConfiguration ): DistributedCache { return ValkeyDistributedCache( @@ -116,11 +116,11 @@ class ValkeyCacheAndEventStoreIntegrationTest { // Verify separate ConnectionFactories @Autowired @Qualifier("valkeyConnectionFactory") - private lateinit var cacheConnectionFactory: RedisConnectionFactory + private lateinit var cacheConnectionFactory: ValkeyConnectionFactory @Autowired @Qualifier("eventStoreValkeyConnectionFactory") - private lateinit var eventStoreConnectionFactory: RedisConnectionFactory + private lateinit var eventStoreConnectionFactory: ValkeyConnectionFactory @Test fun `test both modules can be used simultaneously without conflicts`(): Unit = runBlocking { diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumerResilienceTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumerResilienceTest.kt index cee8a4e7..173fef85 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumerResilienceTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventConsumerResilienceTest.kt @@ -6,6 +6,9 @@ import at.mocode.core.domain.model.AggregateId import at.mocode.core.domain.model.EventType import at.mocode.core.domain.model.EventVersion import at.mocode.infrastructure.eventstore.api.EventSerializer +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import kotlinx.serialization.Serializable import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach @@ -14,9 +17,6 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.slf4j.LoggerFactory -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -35,11 +35,11 @@ class ValkeyEventConsumerResilienceTest { companion object { @Container - val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine")) + val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine")) .withExposedPorts(6379) } - private lateinit var valkeyTemplate: StringRedisTemplate + private lateinit var valkeyTemplate: StringValkeyTemplate private lateinit var serializer: EventSerializer private lateinit var properties: ValkeyEventStoreProperties private lateinit var eventStore: ValkeyEventStore @@ -51,11 +51,11 @@ class ValkeyEventConsumerResilienceTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = StringRedisTemplate(connectionFactory) + valkeyTemplate = StringValkeyTemplate(connectionFactory) serializer = JacksonEventSerializer().apply { registerEventType(ResilienceTestEvent::class.java, "ResilienceTestEvent") diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfigurationTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfigurationTest.kt index 63c0b02c..05242edf 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfigurationTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreConfigurationTest.kt @@ -2,6 +2,8 @@ package at.mocode.infrastructure.eventstore.valkey import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore +import io.valkey.springframework.data.valkey.connection.ValkeyConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -11,8 +13,6 @@ import org.springframework.boot.autoconfigure.AutoConfigurations import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.test.context.runner.ApplicationContextRunner import org.springframework.context.annotation.Configuration -import org.springframework.data.redis.connection.RedisConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import java.time.Duration /** @@ -71,8 +71,8 @@ class ValkeyEventStoreConfigurationTest { assertTrue(context.containsBean("eventConsumer")) // Verify bean types - assertNotNull(context.getBean("eventStoreValkeyConnectionFactory")) - assertNotNull(context.getBean("eventStoreValkeyTemplate")) + assertNotNull(context.getBean("eventStoreValkeyConnectionFactory")) + assertNotNull(context.getBean("eventStoreValkeyTemplate")) assertNotNull(context.getBean("eventSerializer")) assertNotNull(context.getBean("eventStore")) assertNotNull(context.getBean("eventConsumer")) @@ -160,7 +160,7 @@ class ValkeyEventStoreConfigurationTest { "valkey.event-store.database=1" ) .run { context -> - val connectionFactory = context.getBean("eventStoreValkeyConnectionFactory") + val connectionFactory = context.getBean("eventStoreValkeyConnectionFactory") assertNotNull(connectionFactory) // Verify the connection factory is properly configured @@ -176,7 +176,7 @@ class ValkeyEventStoreConfigurationTest { fun `should handle Valkey template creation correctly`() { contextRunner .run { context -> - val valkeyTemplate = context.getBean("eventStoreValkeyTemplate") + val valkeyTemplate = context.getBean("eventStoreValkeyTemplate") assertNotNull(valkeyTemplate) // Verify the template is properly set up @@ -211,7 +211,7 @@ class ValkeyEventStoreConfigurationTest { assertTrue(eventStore is ValkeyEventStore) // Verify dependencies are wired correctly - val valkeyTemplate = context.getBean("eventStoreValkeyTemplate") + val valkeyTemplate = context.getBean("eventStoreValkeyTemplate") val eventSerializer = context.getBean("eventSerializer") val properties = context.getBean() @@ -231,7 +231,7 @@ class ValkeyEventStoreConfigurationTest { assertNotNull(eventConsumer) // Verify dependencies are available - val valkeyTemplate = context.getBean("eventStoreValkeyTemplate") + val valkeyTemplate = context.getBean("eventStoreValkeyTemplate") val eventSerializer = context.getBean("eventSerializer") val properties = context.getBean() diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreErrorHandlingTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreErrorHandlingTest.kt index d9009290..c9a1f86a 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreErrorHandlingTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreErrorHandlingTest.kt @@ -6,6 +6,9 @@ import at.mocode.core.domain.model.EventType import at.mocode.core.domain.model.EventVersion import at.mocode.infrastructure.eventstore.api.ConcurrencyException import at.mocode.infrastructure.eventstore.api.EventSerializer +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import kotlinx.serialization.Serializable import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach @@ -13,9 +16,6 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -32,11 +32,11 @@ class ValkeyEventStoreErrorHandlingTest { companion object { @Container - val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine")) + val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine")) .withExposedPorts(6379) } - private lateinit var valkeyTemplate: StringRedisTemplate + private lateinit var valkeyTemplate: StringValkeyTemplate private lateinit var serializer: EventSerializer private lateinit var properties: ValkeyEventStoreProperties private lateinit var eventStore: ValkeyEventStore @@ -46,11 +46,11 @@ class ValkeyEventStoreErrorHandlingTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = StringRedisTemplate(connectionFactory) + valkeyTemplate = StringValkeyTemplate(connectionFactory) serializer = JacksonEventSerializer().apply { registerEventType(TestErrorEvent::class.java, "TestErrorEvent") diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreIntegrationTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreIntegrationTest.kt index f3bb3dc7..680439c7 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreIntegrationTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreIntegrationTest.kt @@ -7,14 +7,14 @@ import at.mocode.core.domain.event.DomainEvent import at.mocode.core.domain.model.* import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -30,11 +30,11 @@ class ValkeyEventStoreIntegrationTest { companion object { @Container - val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine")) + val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine")) .withExposedPorts(6379) } - private lateinit var valkeyTemplate: StringRedisTemplate + private lateinit var valkeyTemplate: StringValkeyTemplate private lateinit var serializer: EventSerializer private lateinit var properties: ValkeyEventStoreProperties private lateinit var eventStore: EventStore @@ -45,11 +45,11 @@ class ValkeyEventStoreIntegrationTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = StringRedisTemplate(connectionFactory) + valkeyTemplate = StringValkeyTemplate(connectionFactory) serializer = JacksonEventSerializer().apply { registerEventType(TestCreatedEvent::class.java, "TestCreated") diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreStreamTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreStreamTest.kt index c8e85ad9..7fba5c86 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreStreamTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreStreamTest.kt @@ -5,6 +5,9 @@ import at.mocode.core.domain.model.AggregateId import at.mocode.core.domain.model.EventType import at.mocode.core.domain.model.EventVersion import at.mocode.infrastructure.eventstore.api.EventSerializer +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import kotlinx.serialization.Serializable import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach @@ -12,9 +15,6 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.slf4j.LoggerFactory -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -31,11 +31,11 @@ class ValkeyEventStoreStreamTest { companion object { @Container - val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine")) + val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine")) .withExposedPorts(6379) } - private lateinit var valkeyTemplate: StringRedisTemplate + private lateinit var valkeyTemplate: StringValkeyTemplate private lateinit var serializer: EventSerializer private lateinit var properties: ValkeyEventStoreProperties private lateinit var eventStore: ValkeyEventStore @@ -45,11 +45,11 @@ class ValkeyEventStoreStreamTest { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = StringRedisTemplate(connectionFactory) + valkeyTemplate = StringValkeyTemplate(connectionFactory) serializer = JacksonEventSerializer().apply { registerEventType(StreamTestEvent::class.java, "StreamTestEvent") diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreTest.kt index 298ef2f4..c99a77e6 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyEventStoreTest.kt @@ -6,6 +6,9 @@ import at.mocode.core.domain.model.EventType import at.mocode.core.domain.model.EventVersion import at.mocode.infrastructure.eventstore.api.ConcurrencyException import at.mocode.infrastructure.eventstore.api.EventSerializer +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import kotlinx.serialization.Serializable import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach @@ -13,9 +16,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -25,93 +25,93 @@ import kotlin.uuid.Uuid @Testcontainers class ValkeyEventStoreTest { - companion object { - @Container - val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine")) - .withExposedPorts(6379) + companion object { + @Container + val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine")) + .withExposedPorts(6379) + } + + private lateinit var valkeyTemplate: StringValkeyTemplate + private lateinit var serializer: EventSerializer + private lateinit var properties: ValkeyEventStoreProperties + private lateinit var eventStore: ValkeyEventStore + + @BeforeEach + fun setUp() { + val valkeyPort = valkeyContainer.getMappedPort(6379) + val valkeyHost = valkeyContainer.host + + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) + val connectionFactory = LettuceConnectionFactory(valkeyConfig) + connectionFactory.afterPropertiesSet() + + valkeyTemplate = StringValkeyTemplate(connectionFactory) + + serializer = JacksonEventSerializer().apply { + registerEventType(TestCreatedEvent::class.java, "TestCreated") + registerEventType(TestUpdatedEvent::class.java, "TestUpdated") } - private lateinit var valkeyTemplate: StringRedisTemplate - private lateinit var serializer: EventSerializer - private lateinit var properties: ValkeyEventStoreProperties - private lateinit var eventStore: ValkeyEventStore - - @BeforeEach - fun setUp() { - val valkeyPort = valkeyContainer.getMappedPort(6379) - val valkeyHost = valkeyContainer.host - - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) - val connectionFactory = LettuceConnectionFactory(valkeyConfig) - connectionFactory.afterPropertiesSet() - - valkeyTemplate = StringRedisTemplate(connectionFactory) - - serializer = JacksonEventSerializer().apply { - registerEventType(TestCreatedEvent::class.java, "TestCreated") - registerEventType(TestUpdatedEvent::class.java, "TestUpdated") - } - - properties = ValkeyEventStoreProperties().apply { - streamPrefix = "test-stream:" - } - eventStore = ValkeyEventStore(valkeyTemplate, serializer, properties) - cleanupValkey() + properties = ValkeyEventStoreProperties().apply { + streamPrefix = "test-stream:" } + eventStore = ValkeyEventStore(valkeyTemplate, serializer, properties) + cleanupValkey() + } - @AfterEach - fun tearDown() = cleanupValkey() + @AfterEach + fun tearDown() = cleanupValkey() - private fun cleanupValkey() { - val keys = valkeyTemplate.keys("${properties.streamPrefix}*") - if (!keys.isNullOrEmpty()) { - valkeyTemplate.delete(keys) - } + private fun cleanupValkey() { + val keys = valkeyTemplate.keys("${properties.streamPrefix}*") + if (!keys.isNullOrEmpty()) { + valkeyTemplate.delete(keys) } + } - @Test - fun `append and read events should work correctly for new stream`() { - val aggregateId = Uuid.random() - val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity") - val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity") + @Test + fun `append and read events should work correctly for new stream`() { + val aggregateId = Uuid.random() + val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity") + val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity") - eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) + eventStore.appendToStream(listOf(event1, event2), aggregateId, 0) - val events = eventStore.readFromStream(aggregateId) - assertEquals(2, events.size) + val events = eventStore.readFromStream(aggregateId) + assertEquals(2, events.size) - val firstEvent = events[0] as TestCreatedEvent - assertEquals(EventVersion(1L), firstEvent.version) - assertEquals("Test Entity", firstEvent.name) + val firstEvent = events[0] as TestCreatedEvent + assertEquals(EventVersion(1L), firstEvent.version) + assertEquals("Test Entity", firstEvent.name) - val secondEvent = events[1] as TestUpdatedEvent - assertEquals(EventVersion(2L), secondEvent.version) - assertEquals("Updated Test Entity", secondEvent.name) + val secondEvent = events[1] as TestUpdatedEvent + assertEquals(EventVersion(2L), secondEvent.version) + assertEquals("Updated Test Entity", secondEvent.name) + } + + @Test + fun `appending with wrong expected version should throw ConcurrencyException`() { + val aggregateId = Uuid.random() + val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity") + eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1 + + val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity") + assertThrows { + eventStore.appendToStream(listOf(event2), aggregateId, 0) } + } - @Test - fun `appending with wrong expected version should throw ConcurrencyException`() { - val aggregateId = Uuid.random() - val event1 = TestCreatedEvent(AggregateId(aggregateId), EventVersion(1L), "Test Entity") - eventStore.appendToStream(listOf(event1), aggregateId, 0) // Stream is now at version 1 + @Serializable + data class TestCreatedEvent( + @Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()), + @Transient override val version: EventVersion = EventVersion(0), + val name: String + ) : BaseDomainEvent(aggregateId, EventType("TestCreated"), version) - val event2 = TestUpdatedEvent(AggregateId(aggregateId), EventVersion(2L), "Updated Test Entity") - assertThrows { - eventStore.appendToStream(listOf(event2), aggregateId, 0) - } - } - - @Serializable - data class TestCreatedEvent( - @Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()), - @Transient override val version: EventVersion = EventVersion(0), - val name: String - ) : BaseDomainEvent(aggregateId, EventType("TestCreated"), version) - - @Serializable - data class TestUpdatedEvent( - @Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()), - @Transient override val version: EventVersion = EventVersion(0), - val name: String - ) : BaseDomainEvent(aggregateId, EventType("TestUpdated"), version) + @Serializable + data class TestUpdatedEvent( + @Transient override val aggregateId: AggregateId = AggregateId(Uuid.random()), + @Transient override val version: EventVersion = EventVersion(0), + val name: String + ) : BaseDomainEvent(aggregateId, EventType("TestUpdated"), version) } diff --git a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyIntegrationTest.kt b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyIntegrationTest.kt index e292f850..7786407b 100644 --- a/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyIntegrationTest.kt +++ b/backend/infrastructure/event-store/valkey-event-store/src/test/kotlin/at/mocode/infrastructure/eventstore/valkey/ValkeyIntegrationTest.kt @@ -9,15 +9,15 @@ import at.mocode.core.domain.model.EventType import at.mocode.core.domain.model.EventVersion import at.mocode.infrastructure.eventstore.api.EventSerializer import at.mocode.infrastructure.eventstore.api.EventStore +import io.valkey.springframework.data.valkey.connection.ValkeyStandaloneConfiguration +import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory +import io.valkey.springframework.data.valkey.core.StringValkeyTemplate import kotlinx.serialization.Serializable import kotlinx.serialization.Transient import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.springframework.data.redis.connection.RedisStandaloneConfiguration -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory -import org.springframework.data.redis.core.StringRedisTemplate import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers @@ -29,11 +29,11 @@ class ValkeyIntegrationTest { companion object { @Container - val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:9-alpine")) + val valkeyContainer: GenericContainer<*> = GenericContainer(DockerImageName.parse("valkey/valkey:8.0.2-alpine")) .withExposedPorts(6379) } - private lateinit var valkeyTemplate: StringRedisTemplate + private lateinit var valkeyTemplate: StringValkeyTemplate private lateinit var serializer: EventSerializer private lateinit var properties: ValkeyEventStoreProperties private lateinit var eventStore: EventStore @@ -43,10 +43,10 @@ class ValkeyIntegrationTest { fun setUp() { val valkeyPort = valkeyContainer.getMappedPort(6379) val valkeyHost = valkeyContainer.host - val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort) + val valkeyConfig = ValkeyStandaloneConfiguration(valkeyHost, valkeyPort) val connectionFactory = LettuceConnectionFactory(valkeyConfig) connectionFactory.afterPropertiesSet() - valkeyTemplate = StringRedisTemplate(connectionFactory) + valkeyTemplate = StringValkeyTemplate(connectionFactory) serializer = JacksonEventSerializer().apply { registerEventType(TestCreatedEvent::class.java, "TestCreated") registerEventType(TestUpdatedEvent::class.java, "TestUpdated") diff --git a/backend/infrastructure/gateway/build.gradle.kts b/backend/infrastructure/gateway/build.gradle.kts index 8340f46e..1644f599 100644 --- a/backend/infrastructure/gateway/build.gradle.kts +++ b/backend/infrastructure/gateway/build.gradle.kts @@ -31,7 +31,7 @@ dependencies { // Resilience (Reactive) - WICHTIG: Reactor-Variante für WebFlux! implementation(libs.spring.cloud.starter.circuitbreaker.reactor.resilience4j) - implementation(libs.spring.boot.starter.data.redis) + implementation(libs.spring.data.valkey) implementation(libs.micrometer.tracing.bridge.brave) testImplementation(projects.platform.platformTesting) diff --git a/backend/infrastructure/messaging/messaging-client/build.gradle.kts b/backend/infrastructure/messaging/messaging-client/build.gradle.kts index 60f96e45..afda70cc 100644 --- a/backend/infrastructure/messaging/messaging-client/build.gradle.kts +++ b/backend/infrastructure/messaging/messaging-client/build.gradle.kts @@ -1,43 +1,49 @@ // Dieses Modul stellt High-Level-Clients (Producer/Consumer) für die // Interaktion mit Apache Kafka bereit. Es baut auf der `messaging-config` auf. plugins { - alias(libs.plugins.kotlinJvm) - alias(libs.plugins.kotlinSpring) - alias(libs.plugins.spring.boot) - alias(libs.plugins.spring.dependencyManagement) + alias(libs.plugins.kotlinJvm) + alias(libs.plugins.kotlinSpring) + alias(libs.plugins.spring.boot) + alias(libs.plugins.spring.dependencyManagement) } // Deaktiviert die Erstellung eines ausführbaren Jars für dieses Bibliotheks-Modul. tasks.bootJar { - enabled = false + enabled = false } // Stellt sicher, dass stattdessen ein reguläres Jar gebaut wird tasks.jar { - enabled = true + enabled = true } dependencies { - // Stellt sicher, dass alle Versionen aus der zentralen BOM kommen. - implementation(platform(projects.platform.platformBom)) - // Stellt gemeinsame Abhängigkeiten bereit. - implementation(projects.platform.platformDependencies) + // Stellt sicher, dass alle Versionen aus der zentralen BOM kommen. + implementation(platform(projects.platform.platformBom)) + // Stellt gemeinsame Abhängigkeiten bereit. + implementation(projects.platform.platformDependencies) - // Spring Boot / Spring Framework APIs used directly in this module - // (e.g. ConditionalOnMissingBean) - implementation("org.springframework.boot:spring-boot-autoconfigure") - implementation("org.springframework.boot:spring-boot") + // Spring Boot / Spring Framework APIs used directly in this module + // (e.g. ConditionalOnMissingBean) + implementation("org.springframework.boot:spring-boot-autoconfigure") + implementation("org.springframework.boot:spring-boot") - // Spring Kafka (ReactiveKafkaProducerTemplate, etc.) - implementation(libs.spring.kafka) + // Spring Kafka (ReactiveKafkaProducerTemplate, etc.) + implementation(libs.spring.kafka) - // Jakarta annotations used by Spring / configuration classes - implementation(libs.jakarta.annotation.api) + // Jakarta annotations used by Spring / configuration classes + implementation(libs.jakarta.annotation.api) - // Baut auf der zentralen Kafka-Konfiguration auf und erbt deren Abhängigkeiten. - implementation(projects.backend.infrastructure.messaging.messagingConfig) - // Fügt die reaktive Kafka-Implementierung hinzu (Project Reactor). - implementation(libs.reactor.kafka) - // Stellt alle Test-Abhängigkeiten gebündelt bereit. - testImplementation(projects.platform.platformTesting) + // Baut auf der zentralen Kafka-Konfiguration auf und erbt deren Abhängigkeiten. + implementation(projects.backend.infrastructure.messaging.messagingConfig) + // Fügt die reaktive Kafka-Implementierung hinzu (Project Reactor). + implementation(libs.reactor.kafka) + // Stellt alle Test-Abhängigkeiten gebündelt bereit. + testImplementation(projects.platform.platformTesting) +} + +// JVM Native Access (JDK 22+): Unterdrückt Warnung/Blockade für Snappy (org.xerial.snappy) +tasks.withType().configureEach { + // Erfordert JDK 21+; ab zukünftigen Versionen sonst Fehler statt Warnung + jvmArgs("--enable-native-access=ALL-UNNAMED") } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index af7081be..0c6a5a93 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -59,6 +59,7 @@ flyway = "11.19.1" redisson = "4.0.0" # Spring Boot 3.5.x manages Lettuce 6.6.x; keep aligned to avoid binary/API mismatches. lettuce = "6.6.0.RELEASE" +springDataValkey = "0.2.0" # Observability micrometer = "1.16.1" @@ -229,6 +230,7 @@ flyway-postgresql = { module = "org.flywaydb:flyway-database-postgresql", versio redisson = { module = "org.redisson:redisson", version.ref = "redisson" } lettuce-core = { module = "io.lettuce:lettuce-core", version.ref = "lettuce" } +spring-data-valkey = { module = "io.valkey.springframework.data:spring-data-valkey", version.ref = "springDataValkey" } micrometer-prometheus = { module = "io.micrometer:micrometer-registry-prometheus", version.ref = "micrometer" } micrometer-tracing-bridge-brave = { module = "io.micrometer:micrometer-tracing-bridge-brave", version.ref = "micrometerTracing" } @@ -342,8 +344,7 @@ database-complete = [ "flyway-postgresql" ] valkey-cache = [ - "spring-boot-starter-data-redis", - "lettuce-core", + "spring-data-valkey", "jackson-module-kotlin", "jackson-datatype-jsr310" ]