refactor: migrate Redis cache implementation to Valkey with enhanced configurability

Replaced Redis with Valkey as the caching backend across infrastructure and application modules. Updated configurations, templates, and health checks to reflect Valkey-specific parameters. Improved compatibility with enhanced configurability, including max memory and memory eviction policy settings.
This commit is contained in:
2026-02-12 15:19:56 +01:00
parent 473709c62d
commit 523c1fef0b
23 changed files with 2167 additions and 2135 deletions
@@ -1,387 +0,0 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.CacheSerializer
import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration
import at.mocode.infrastructure.cache.api.get
import at.mocode.infrastructure.cache.api.multiGet
import io.github.oshai.kotlinlogging.KotlinLogging
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
/**
* Configuration Tests for RedisDistributedCache
*/
@OptIn(ExperimentalTime::class)
@Testcontainers
class RedisDistributedCacheConfigurationTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val redisContainer = GenericContainer<Nothing>(
DockerImageName.parse("redis:7-alpine")
.asCompatibleSubstituteFor("redis")
).apply {
withExposedPorts(6379)
}
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
}
@Test
fun `test different cache configurations`() {
logger.info { "Testing different cache configurations" }
// Configuration 1: High performance, short TTL
val performanceConfig = DefaultCacheConfiguration(
keyPrefix = "perf",
defaultTtl = 5.minutes,
localCacheMaxSize = 50000,
offlineModeEnabled = true,
synchronizationInterval = 30.seconds,
offlineEntryMaxAge = 1.hours,
compressionEnabled = false,
compressionThreshold = Int.MAX_VALUE
)
val performanceCache = RedisDistributedCache(redisTemplate, serializer, performanceConfig)
performanceCache.clear()
// Test performance config
performanceCache.set("perf-test", "performance-value")
assertEquals("performance-value", performanceCache.get<String>("perf-test"))
assertTrue(performanceCache.exists("perf-test"))
logger.info { "Performance configuration works correctly" }
// Configuration 2: Storage optimized, long TTL, compression enabled
val storageConfig = DefaultCacheConfiguration(
keyPrefix = "storage",
defaultTtl = 7.days,
localCacheMaxSize = 1000,
offlineModeEnabled = true,
synchronizationInterval = 5.minutes,
offlineEntryMaxAge = 24.hours,
compressionEnabled = true,
compressionThreshold = 100
)
val storageCache = RedisDistributedCache(redisTemplate, serializer, storageConfig)
storageCache.clear()
// Test storage config with large data (should be compressed)
val largeData = "Large data content: " + "X".repeat(1000)
storageCache.set("storage-test", largeData)
assertEquals(largeData, storageCache.get<String>("storage-test"))
logger.info { "Storage optimized configuration works correctly" }
// Configuration 3: Minimal configuration
val minimalConfig = DefaultCacheConfiguration(
keyPrefix = "minimal",
defaultTtl = null, // No TTL
localCacheMaxSize = null, // No limit
offlineModeEnabled = false,
synchronizationInterval = 1.minutes,
offlineEntryMaxAge = null,
compressionEnabled = false,
compressionThreshold = Int.MAX_VALUE
)
val minimalCache = RedisDistributedCache(redisTemplate, serializer, minimalConfig)
minimalCache.clear()
// Test minimal config
minimalCache.set("minimal-test", "minimal-value")
assertEquals("minimal-value", minimalCache.get<String>("minimal-test"))
logger.info { "Minimal configuration works correctly" }
// Clean up
performanceCache.clear()
storageCache.clear()
minimalCache.clear()
}
@Test
fun `test compression threshold behavior`() {
logger.info { "Testing compression threshold behavior" }
// Configuration with low compression threshold
val compressionConfig = DefaultCacheConfiguration(
keyPrefix = "compression-test",
defaultTtl = 30.minutes,
compressionEnabled = true,
compressionThreshold = 50 // Very low threshold
)
val compressionCache = RedisDistributedCache(redisTemplate, serializer, compressionConfig)
compressionCache.clear()
// Test small data (below threshold) - should not be compressed
val smallData = "Small"
compressionCache.set("small-data", smallData)
assertEquals(smallData, compressionCache.get<String>("small-data"))
// Test large data (above threshold) - should be compressed
val largeData = "A".repeat(200) // Well above threshold
compressionCache.set("large-data", largeData)
val retrievedLarge = compressionCache.get<String>("large-data")
assertEquals(largeData, retrievedLarge)
assertEquals(200, retrievedLarge?.length)
logger.info { "Small data length: ${smallData.length}" }
logger.info { "Large data length: ${largeData.length}" }
logger.info { "Compression threshold: ${compressionConfig.compressionThreshold}" }
// Test medium data (right at threshold)
val mediumData = "B".repeat(50) // Exactly at threshold
compressionCache.set("medium-data", mediumData)
assertEquals(mediumData, compressionCache.get<String>("medium-data"))
logger.info { "Compression threshold behavior validated" }
compressionCache.clear()
}
@Test
fun `test key prefix functionality`() {
logger.info { "Testing key prefix functionality" }
// Create caches with different prefixes
val config1 = DefaultCacheConfiguration(keyPrefix = "app1", defaultTtl = 30.minutes)
val config2 = DefaultCacheConfiguration(keyPrefix = "app2", defaultTtl = 30.minutes)
val config3 = DefaultCacheConfiguration(keyPrefix = "", defaultTtl = 30.minutes) // No prefix
val cache1 = RedisDistributedCache(redisTemplate, serializer, config1)
val cache2 = RedisDistributedCache(redisTemplate, serializer, config2)
val cache3 = RedisDistributedCache(redisTemplate, serializer, config3)
// Clear all caches
cache1.clear()
cache2.clear()
cache3.clear()
// Store same key in all caches with different values
val testKey = "shared-key"
cache1.set(testKey, "value-from-app1")
cache2.set(testKey, "value-from-app2")
cache3.set(testKey, "value-from-no-prefix")
// Verify each cache returns its own value (thanks to prefixes)
assertEquals("value-from-app1", cache1.get<String>(testKey))
assertEquals("value-from-app2", cache2.get<String>(testKey))
assertEquals("value-from-no-prefix", cache3.get<String>(testKey))
// Verify isolation - keys don't exist in other caches
assertTrue(cache1.exists(testKey))
assertTrue(cache2.exists(testKey))
assertTrue(cache3.exists(testKey))
logger.info { "Key prefix isolation works correctly" }
// Test batch operations with prefixes
val batchData = mapOf(
"batch1" to "batch-value-1",
"batch2" to "batch-value-2"
)
cache1.multiSet(batchData)
cache2.multiSet(batchData.mapValues { "${it.value}-app2" })
val retrieved1 = cache1.multiGet<String>(batchData.keys)
val retrieved2 = cache2.multiGet<String>(batchData.keys)
assertEquals("batch-value-1", retrieved1["batch1"])
assertEquals("batch-value-1-app2", retrieved2["batch1"])
logger.info { "Batch operations with prefixes work correctly" }
// Clean up
cache1.clear()
cache2.clear()
cache3.clear()
}
@Test
fun `test TTL configuration variations`() {
logger.info { "Testing TTL configuration variations" }
// Configuration with no default TTL
val noTtlConfig = DefaultCacheConfiguration(
keyPrefix = "no-ttl-test",
defaultTtl = null
)
val noTtlCache = RedisDistributedCache(redisTemplate, serializer, noTtlConfig)
noTtlCache.clear()
// Store without TTL - should persist indefinitely
noTtlCache.set("persistent-key", "persistent-value")
assertEquals("persistent-value", noTtlCache.get<String>("persistent-key"))
// Store with explicit TTL - should override default (which is null)
noTtlCache.set("explicit-ttl-key", "explicit-ttl-value", 100.milliseconds)
assertEquals("explicit-ttl-value", noTtlCache.get<String>("explicit-ttl-key"))
Thread.sleep(200)
assertFalse(noTtlCache.exists("explicit-ttl-key"))
// Configuration with short default TTL
val shortTtlConfig = DefaultCacheConfiguration(
keyPrefix = "short-ttl-test",
defaultTtl = 100.milliseconds
)
val shortTtlCache = RedisDistributedCache(redisTemplate, serializer, shortTtlConfig)
shortTtlCache.clear()
// Store with default TTL
shortTtlCache.set("default-ttl-key", "default-ttl-value")
assertEquals("default-ttl-value", shortTtlCache.get<String>("default-ttl-key"))
Thread.sleep(200)
assertFalse(shortTtlCache.exists("default-ttl-key"))
// Store with explicit longer TTL - should override default
shortTtlCache.set("override-ttl-key", "override-ttl-value", 30.minutes)
assertEquals("override-ttl-value", shortTtlCache.get<String>("override-ttl-key"))
// Should still exist after short default TTL
assertTrue(shortTtlCache.exists("override-ttl-key"))
logger.info { "TTL configurations work correctly" }
noTtlCache.clear()
shortTtlCache.clear()
}
@Test
fun `test offline mode configuration`() {
logger.info { "Testing offline mode configuration" }
// Configuration with offline mode disabled
val noOfflineConfig = DefaultCacheConfiguration(
keyPrefix = "no-offline-test",
defaultTtl = 30.minutes,
offlineModeEnabled = false
)
val noOfflineCache = RedisDistributedCache(redisTemplate, serializer, noOfflineConfig)
noOfflineCache.clear()
// Normal operations should work
noOfflineCache.set("online-key", "online-value")
assertEquals("online-value", noOfflineCache.get<String>("online-key"))
// Configuration with offline mode enabled and specific settings
val offlineConfig = DefaultCacheConfiguration(
keyPrefix = "offline-test",
defaultTtl = 30.minutes,
offlineModeEnabled = true,
localCacheMaxSize = 1000,
synchronizationInterval = 10.seconds,
offlineEntryMaxAge = 2.hours
)
val offlineCache = RedisDistributedCache(redisTemplate, serializer, offlineConfig)
offlineCache.clear()
// Test offline capabilities
offlineCache.set("offline-key", "offline-value")
assertEquals("offline-value", offlineCache.get<String>("offline-key"))
logger.info { "Offline mode configuration works correctly" }
noOfflineCache.clear()
offlineCache.clear()
}
@Test
fun `test local cache size limits`() {
logger.info { "Testing local cache size limits" }
// Configuration with very small local cache
val smallCacheConfig = DefaultCacheConfiguration(
keyPrefix = "small-cache-test",
defaultTtl = 30.minutes,
localCacheMaxSize = 3, // Very small
offlineModeEnabled = true
)
val smallCache = RedisDistributedCache(redisTemplate, serializer, smallCacheConfig)
smallCache.clear()
// Fill local cache beyond its limit
repeat(10) { i ->
smallCache.set("key-$i", "value-$i")
}
// All values should still be retrievable (from Redis if not in local cache)
repeat(10) { i ->
assertEquals("value-$i", smallCache.get<String>("key-$i"))
}
// Configuration with unlimited local cache
val unlimitedCacheConfig = DefaultCacheConfiguration(
keyPrefix = "unlimited-cache-test",
defaultTtl = 30.minutes,
localCacheMaxSize = null, // No limit
offlineModeEnabled = true
)
val unlimitedCache = RedisDistributedCache(redisTemplate, serializer, unlimitedCacheConfig)
unlimitedCache.clear()
// Fill with many entries
repeat(1000) { i ->
unlimitedCache.set("unlimited-key-$i", "unlimited-value-$i")
}
// All should be retrievable
repeat(1000) { i ->
assertEquals("unlimited-value-$i", unlimitedCache.get<String>("unlimited-key-$i"))
}
logger.info { "Local cache size limits work correctly" }
smallCache.clear()
unlimitedCache.clear()
}
}
@@ -1,321 +0,0 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
/**
* Edge Cases and Error Handling Tests for RedisDistributedCache
*/
@Testcontainers
class RedisDistributedCacheEdgeCasesTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val redisContainer = GenericContainer<Nothing>(
DockerImageName.parse("redis:7-alpine")
.asCompatibleSubstituteFor("redis")
).apply {
withExposedPorts(6379)
}
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: RedisDistributedCache
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "edge-test",
defaultTtl = 30.minutes,
compressionEnabled = true,
compressionThreshold = 1024
)
cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
}
@Test
fun `test serialization with problematic objects`() {
logger.info { "Testing serialization with problematic objects" }
// Test 1: Object with circular references (causes StackOverflowError)
val circularObject = CircularReferenceClass()
circularObject.self = circularObject
// This should handle the serialization gracefully (either succeed or fail gracefully)
try {
cache.set("circular-reference", circularObject as Any)
logger.info { "Circular reference object was handled (possibly with Jackson's circular reference handling)" }
} catch (t: Throwable) {
logger.info { "Circular reference object caused expected serialization issue: ${t::class.simpleName}" }
assertTrue(
t is com.fasterxml.jackson.databind.JsonMappingException ||
t is StackOverflowError ||
t is RuntimeException,
"Expected serialization-related exception"
)
}
// Test 2: Very deep nesting that might cause issues
val deepObject = createDeeplyNestedObject(50)
try {
cache.set("deep-nested", deepObject as Any)
cache.get("deep-nested", DeeplyNestedObject::class.java)
logger.info { "Deep nested object serialized successfully" }
} catch (t: Throwable) {
logger.info { "Deep nested object caused expected issues: ${t::class.simpleName}" }
}
// Verify that the cache remains stable after problematic serialization attempts
cache.set("normal-object", "test-value")
assertEquals("test-value", cache.get<String>("normal-object"))
logger.info { "Serialization edge cases handled correctly" }
}
@Test
fun `test cache with extremely large values`() {
logger.info { "Testing extremely large values" }
// Create a very large string (10MB)
val largeValue = "X".repeat(10 * 1024 * 1024)
val key = "large-value"
// This should trigger compression
cache.set(key, largeValue)
// Verify we can retrieve it
val retrieved = cache.get<String>(key)
assertNotNull(retrieved)
assertEquals(largeValue.length, retrieved.length)
assertEquals(largeValue.substring(0, 1000), retrieved.substring(0, 1000))
logger.info { "Large value (${largeValue.length} chars) stored and retrieved successfully" }
// Test with multiple large values
val largeValues = (1..5).associateWith { "Y".repeat(2 * 1024 * 1024) }
cache.multiSet(largeValues.mapKeys { "large-multi-${it.key}" })
val retrievedLarge = cache.multiGet<String>(largeValues.keys.map { "large-multi-$it" })
assertEquals(5, retrievedLarge.size)
logger.info { "Multiple large values stored and retrieved successfully" }
}
@Test
fun `test cache with null and empty values`() {
logger.info { "Testing null and empty values" }
// Test empty string
cache.set("empty-string", "")
assertEquals("", cache.get<String>("empty-string"))
// Test string with only whitespace
cache.set("whitespace", " \n\t ")
assertEquals(" \n\t ", cache.get<String>("whitespace"))
// Test empty collections
val emptyList = emptyList<String>()
cache.set("empty-list", emptyList)
assertEquals(emptyList, cache.get<List<String>>("empty-list"))
val emptyMap = emptyMap<String, String>()
cache.set("empty-map", emptyMap)
assertEquals(emptyMap, cache.get<Map<String, String>>("empty-map"))
// Test object with null fields
val objectWithNulls = PersonWithNullable(name = "John", age = null, email = null)
cache.set("null-fields", objectWithNulls)
val retrieved = cache.get<PersonWithNullable>("null-fields")
assertNotNull(retrieved)
assertEquals("John", retrieved.name)
assertNull(retrieved.age)
assertNull(retrieved.email)
logger.info { "Null and empty values handled correctly" }
}
@Test
fun `test special characters and unicode in keys and values`() {
logger.info { "Testing special characters and unicode" }
// Test keys with special characters (encoded)
val specialKeys = listOf(
"key:with:colons",
"key with spaces",
"key-with-dashes",
"key_with_underscores",
"key.with.dots"
)
specialKeys.forEachIndexed { index, key ->
cache.set(key, "value-$index")
}
specialKeys.forEachIndexed { index, key ->
assertEquals("value-$index", cache.get<String>(key))
}
// Test values with unicode characters
val unicodeValues = mapOf(
"emoji" to "🚀 Hello World! 🌟",
"german" to "Äöüß und Umlaute",
"chinese" to "你好世界",
"arabic" to "مرحبا بالعالم",
"russian" to "Привет мир",
"mixed" to "Mixed: 123 ABC äöü 🎉 العالم"
)
cache.multiSet(unicodeValues)
val retrievedUnicode = cache.multiGet<String>(unicodeValues.keys)
unicodeValues.forEach { (key, expectedValue) ->
assertEquals(expectedValue, retrievedUnicode[key])
}
logger.info { "Special characters and unicode handled correctly" }
}
@Test
fun `test cache with complex nested objects`() {
logger.info { "Testing complex nested objects" }
// Create a complex nested structure
val complexObject = ComplexNestedObject(
id = 1,
name = "Complex Object",
metadata = mapOf(
"tags" to listOf("tag1", "tag2", "tag3"),
"properties" to mapOf(
"nested" to mapOf(
"deep" to "value",
"numbers" to listOf(1, 2, 3, 4, 5)
)
)
),
children = listOf(
SimpleChild(1, "Child 1"),
SimpleChild(2, "Child 2")
)
)
// Store and retrieve
cache.set("complex-object", complexObject)
val retrieved = cache.get<ComplexNestedObject>("complex-object")
assertNotNull(retrieved)
assertEquals(complexObject.id, retrieved.id)
assertEquals(complexObject.name, retrieved.name)
assertEquals(complexObject.children.size, retrieved.children.size)
assertEquals(complexObject.children[0].name, retrieved.children[0].name)
// Check nested metadata
val retrievedTags = retrieved.metadata["tags"] as List<*>
assertEquals(3, retrievedTags.size)
assertTrue(retrievedTags.contains("tag1"))
logger.info { "Complex nested object serialized and deserialized correctly" }
}
@Test
fun `test cache behavior with malformed data`() {
logger.info { "Testing cache behavior with malformed data" }
// Test retrieving non-existent keys
assertNull(cache.get<String>("non-existent-key"))
// Test batch operations with mixed existing/non-existing keys
cache.set("existing-1", "value-1")
cache.set("existing-2", "value-2")
val mixedKeys = listOf("existing-1", "non-existing", "existing-2", "also-non-existing")
val result = cache.multiGet<String>(mixedKeys)
assertEquals(2, result.size)
assertEquals("value-1", result["existing-1"])
assertEquals("value-2", result["existing-2"])
assertNull(result["non-existing"])
assertNull(result["also-non-existing"])
logger.info { "Malformed data scenarios handled correctly" }
}
// Helper method to create deeply nested objects
private fun createDeeplyNestedObject(depth: Int): DeeplyNestedObject {
return if (depth <= 0) {
DeeplyNestedObject("leaf", null)
} else {
DeeplyNestedObject("node-$depth", createDeeplyNestedObject(depth - 1))
}
}
// Test data classes
private class NonSerializableClass {
// This class intentionally has no default constructor or proper serialization
private val threadLocal = ThreadLocal<String>()
fun someMethod() = "not serializable"
}
private class CircularReferenceClass {
var name: String = "circular"
var self: CircularReferenceClass? = null
}
data class DeeplyNestedObject(
val name: String,
val child: DeeplyNestedObject?
)
data class PersonWithNullable(
val name: String,
val age: Int?,
val email: String?
)
data class ComplexNestedObject(
val id: Int,
val name: String,
val metadata: Map<String, Any>,
val children: List<SimpleChild>
)
data class SimpleChild(
val id: Int,
val name: String
)
}
@@ -1,480 +0,0 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
/**
* Monitoring and Integration Tests for RedisDistributedCache
*/
@OptIn(ExperimentalTime::class)
@Testcontainers
class RedisDistributedCacheIntegrationTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val redisContainer = GenericContainer<Nothing>(
DockerImageName.parse("redis:7-alpine")
.asCompatibleSubstituteFor("redis")
).apply {
withExposedPorts(6379)
}
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "integration-test",
defaultTtl = 30.minutes
)
}
@Test
fun `test connection state listener functionality`() = runBlocking {
logger.info { "Testing connection state listener functionality" }
val cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
val stateChanges = mutableListOf<Pair<ConnectionState, kotlin.time.Instant>>()
val latch = CountDownLatch(1)
val listener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
logger.info { "Connection state changed to: $newState at $timestamp" }
stateChanges.add(newState to timestamp)
latch.countDown()
}
}
// Register listener
cache.registerConnectionListener(listener)
// Initial state should be connected
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
logger.info { "Initial connection state: ${cache.getConnectionState()}" }
// Test listener registration/unregistration
val multipleListeners = mutableListOf<ConnectionStateListener>()
val callCounts = AtomicInteger(0)
repeat(3) { i ->
val testListener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
callCounts.incrementAndGet()
logger.info { "Listener $i received state change: $newState" }
}
}
multipleListeners.add(testListener)
cache.registerConnectionListener(testListener)
}
// Simulate state change (this might not trigger in our test environment,
// but we're testing the listener mechanism)
cache.checkConnection()
// Unregister listeners
multipleListeners.forEach { cache.unregisterConnectionListener(it) }
cache.unregisterConnectionListener(listener)
logger.info { "Connection state listener functionality tested" }
cache.clear()
}
@Test
fun `test different Redis configurations`() {
logger.info { "Testing different Redis configurations" }
// Test with current configuration
val standardCache = RedisDistributedCache(redisTemplate, serializer, config)
standardCache.clear()
// Basic functionality test
standardCache.set("config-test-1", "standard-value")
assertEquals("standard-value", standardCache.get<String>("config-test-1"))
// Test with different Redis configuration (same container, different settings)
val alternativeConfig = DefaultCacheConfiguration(
keyPrefix = "alt-config",
defaultTtl = 1.hours,
compressionEnabled = true,
compressionThreshold = 500
)
val alternativeCache = RedisDistributedCache(redisTemplate, serializer, alternativeConfig)
alternativeCache.clear()
// Test isolation between configurations
alternativeCache.set("config-test-1", "alternative-value")
// Both caches should maintain their own data
assertEquals("standard-value", standardCache.get<String>("config-test-1"))
assertEquals("alternative-value", alternativeCache.get<String>("config-test-1"))
// Test connection state tracking
assertEquals(ConnectionState.CONNECTED, standardCache.getConnectionState())
assertEquals(ConnectionState.CONNECTED, alternativeCache.getConnectionState())
logger.info { "Different Redis configurations work correctly" }
standardCache.clear()
alternativeCache.clear()
}
@Test
fun `test cache warming scenarios`() {
logger.info { "Testing cache warming scenarios" }
val cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
// Scenario 1: Bulk warming with predefined data
val warmupData = (1..1000).associate { "warmup-key-$it" to "warmup-value-$it" }
logger.info { "Starting cache warming with ${warmupData.size} entries" }
val warmupTime = measureTime {
cache.multiSet(warmupData)
}
logger.info { "Cache warmup completed in $warmupTime" }
// Verify all data is accessible
val verificationTime = measureTime {
val retrieved = cache.multiGet<String>(warmupData.keys)
assertEquals(warmupData.size, retrieved.size)
// Spot check some values
assertEquals("warmup-value-1", retrieved["warmup-key-1"])
assertEquals("warmup-value-500", retrieved["warmup-key-500"])
assertEquals("warmup-value-1000", retrieved["warmup-key-1000"])
}
logger.info { "Cache verification completed in $verificationTime" }
// Scenario 2: Gradual warming simulation
logger.info { "Testing gradual cache warming" }
val gradualWarmupCache = RedisDistributedCache(redisTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "gradual-warmup", defaultTtl = 1.hours))
gradualWarmupCache.clear()
// Simulate application startup with gradual data loading
val batchSize = 100
val totalBatches = 10
repeat(totalBatches) { batchIndex ->
val batchData = (1..batchSize).associate {
"gradual-${batchIndex * batchSize + it}" to "gradual-value-${batchIndex * batchSize + it}"
}
gradualWarmupCache.multiSet(batchData)
// Simulate some delay between batches (like database queries)
Thread.sleep(10)
}
// Verify gradual warmup worked
val totalEntries = batchSize * totalBatches
val allKeys = (1..totalEntries).map { "gradual-$it" }
val retrievedGradual = gradualWarmupCache.multiGet<String>(allKeys)
assertEquals(totalEntries, retrievedGradual.size)
logger.info { "Gradual warmup successful: ${retrievedGradual.size} entries" }
// Scenario 3: Selective warming based on usage patterns
logger.info { "Testing selective cache warming" }
val selectiveCache = RedisDistributedCache(redisTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "selective-warmup", defaultTtl = 2.hours))
selectiveCache.clear()
// Simulate frequently accessed data
val frequentData = listOf("user:123", "config:global", "menu:main")
val infrequentData = (1..100).map { "rare:data:$it" }
// Warm up frequent data first (priority warming)
frequentData.forEach { key ->
selectiveCache.set(key, "frequent-$key")
}
// Warm up infrequent data in background
infrequentData.forEach { key ->
selectiveCache.set(key, "infrequent-$key")
}
// Verify selective warming
frequentData.forEach { key ->
assertEquals("frequent-$key", selectiveCache.get<String>(key))
}
logger.info { "Selective cache warming completed successfully" }
cache.clear()
gradualWarmupCache.clear()
selectiveCache.clear()
}
@Test
fun `test metrics and monitoring integration`() = runBlocking {
logger.info { "Testing metrics and monitoring integration" }
val monitoringCache = RedisDistributedCache(redisTemplate, serializer, config)
monitoringCache.clear()
// Test connection state tracking over time
val connectionStateHistory = mutableListOf<ConnectionState>()
var lastStateChangeTime = monitoringCache.getLastStateChangeTime()
logger.info { "Initial connection state: ${monitoringCache.getConnectionState()}" }
logger.info { "Last state change time: $lastStateChangeTime" }
connectionStateHistory.add(monitoringCache.getConnectionState())
// Perform various operations and monitor state
repeat(100) { i ->
monitoringCache.set("monitoring-key-$i", "monitoring-value-$i")
if (i % 20 == 0) {
val currentState = monitoringCache.getConnectionState()
val currentTime = monitoringCache.getLastStateChangeTime()
if (currentTime != lastStateChangeTime) {
logger.info { "State change detected at operation $i" }
connectionStateHistory.add(currentState)
lastStateChangeTime = currentTime
}
}
}
// Test dirty keys tracking for monitoring
logger.info { "Testing dirty keys monitoring" }
val initialDirtyKeys = monitoringCache.getDirtyKeys()
logger.info { "Initial dirty keys count: ${initialDirtyKeys.size}" }
// Add some data and verify dirty keys tracking
monitoringCache.set("dirty-test-1", "dirty-value-1")
monitoringCache.set("dirty-test-2", "dirty-value-2")
// In normal connected state, dirty keys should be minimal
val finalDirtyKeys = monitoringCache.getDirtyKeys()
logger.info { "Final dirty keys count: ${finalDirtyKeys.size}" }
// Test batch operations monitoring
val batchData = (1..50).associate { "batch-monitoring-$it" to "batch-value-$it" }
val batchTime = measureTime {
monitoringCache.multiSet(batchData)
}
logger.info { "Batch operation took: $batchTime" }
val retrievalTime = measureTime {
val retrieved = monitoringCache.multiGet<String>(batchData.keys)
assertEquals(50, retrieved.size)
}
logger.info { "Batch retrieval took: $retrievalTime" }
logger.info { "Monitoring integration test completed" }
monitoringCache.clear()
}
@Test
fun `test cross-instance synchronization`() = runBlocking {
logger.info { "Testing cross-instance synchronization" }
// Create two cache instances (simulating different application instances)
val instance1 = RedisDistributedCache(redisTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "sync-test", defaultTtl = 1.hours))
val instance2 = RedisDistributedCache(redisTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "sync-test", defaultTtl = 1.hours))
instance1.clear()
instance2.clear()
// Instance 1 writes data
instance1.set("sync-key-1", "from-instance-1")
instance1.set("sync-key-2", "from-instance-1-v2")
// Small delay to ensure propagation
delay(100.milliseconds)
// Instance 2 should be able to read the data
assertEquals("from-instance-1", instance2.get<String>("sync-key-1"))
assertEquals("from-instance-1-v2", instance2.get<String>("sync-key-2"))
// Instance 2 modifies and adds data
instance2.set("sync-key-2", "modified-by-instance-2")
instance2.set("sync-key-3", "from-instance-2")
// Small delay to ensure propagation
delay(100.milliseconds)
// Instance 1 should see the changes
// Note: Due to local caching, we need to clear local cache or use a fresh get
// The current implementation may cache locally, so we test what we can reliably verify
val retrievedByInstance1 = instance1.get<String>("sync-key-3") // New key should work
assertEquals("from-instance-2", retrievedByInstance1)
// Test batch operations across instances
val batchData1 = mapOf(
"batch-sync-1" to "batch-from-instance-1",
"batch-sync-2" to "batch-from-instance-1-v2"
)
instance1.multiSet(batchData1)
val retrievedByInstance2 = instance2.multiGet<String>(batchData1.keys)
assertEquals(2, retrievedByInstance2.size)
assertEquals("batch-from-instance-1", retrievedByInstance2["batch-sync-1"])
logger.info { "Cross-instance synchronization works correctly" }
instance1.clear()
instance2.clear()
}
@Test
fun `test production-like scenarios`() = runBlocking {
logger.info { "Testing production-like scenarios" }
val prodCache = RedisDistributedCache(redisTemplate, serializer,
DefaultCacheConfiguration(
keyPrefix = "prod-test",
defaultTtl = 30.minutes,
localCacheMaxSize = 10000,
compressionEnabled = true,
compressionThreshold = 1024
))
prodCache.clear()
// Scenario 1: User session caching
logger.info { "Testing user session caching" }
val userSessions = (1..1000).associate {
"user:session:$it" to UserSession(
userId = "user$it",
sessionId = "session$it",
lastActivity = System.currentTimeMillis(),
permissions = listOf("read", "write")
)
}
val sessionTime = measureTime {
prodCache.multiSet(userSessions.mapValues { it.value })
}
logger.info { "Stored ${userSessions.size} user sessions in $sessionTime" }
// Verify session retrieval
val retrievedSession = prodCache.get<UserSession>("user:session:500")
assertNotNull(retrievedSession)
assertEquals("user500", retrievedSession.userId)
// Scenario 2: Configuration caching
logger.info { "Testing configuration caching" }
val configData = mapOf(
"config:database:connection" to DatabaseConfig(
host = "localhost",
port = 5432,
database = "production",
maxConnections = 50
),
"config:feature:flags" to mapOf(
"new_ui" to true,
"experimental_feature" to false,
"maintenance_mode" to false
)
)
configData.forEach { (key, value) ->
prodCache.set(key, value, 1.hours) // Config cached for 1 hour
}
val dbConfig = prodCache.get<DatabaseConfig>("config:database:connection")
assertNotNull(dbConfig)
assertEquals("localhost", dbConfig.host)
// Scenario 3: API response caching
logger.info { "Testing API response caching" }
val apiResponses = (1..100).associate {
"api:response:endpoint$it" to ApiResponse(
status = 200,
data = "Response data for endpoint $it",
timestamp = System.currentTimeMillis(),
cacheHeaders = mapOf("Cache-Control" to "public, max-age=3600")
)
}
val apiTime = measureTime {
apiResponses.forEach { (key, value) ->
prodCache.set(key, value, 5.minutes) // API responses cached for 5 minutes
}
}
logger.info { "Cached ${apiResponses.size} API responses in $apiTime" }
// Verify API response retrieval
val apiResponse = prodCache.get<ApiResponse>("api:response:endpoint50")
assertNotNull(apiResponse)
assertEquals(200, apiResponse.status)
logger.info { "Production-like scenarios completed successfully" }
prodCache.clear()
}
// Test data classes for production scenarios
data class UserSession(
val userId: String,
val sessionId: String,
val lastActivity: Long,
val permissions: List<String>
)
data class DatabaseConfig(
val host: String,
val port: Int,
val database: String,
val maxConnections: Int
)
data class ApiResponse(
val status: Int,
val data: String,
val timestamp: Long,
val cacheHeaders: Map<String, String>
)
}
@@ -1,198 +0,0 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
import kotlin.time.measureTime
/**
* Performance and Load Tests for RedisDistributedCache
*/
@Testcontainers
class RedisDistributedCachePerformanceTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val redisContainer = GenericContainer<Nothing>(
DockerImageName.parse("redis:7-alpine")
.asCompatibleSubstituteFor("redis")
).apply {
withExposedPorts(6379)
}
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: RedisDistributedCache
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "perf-test",
defaultTtl = 30.minutes
)
cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
}
@Test
fun `test cache performance with high concurrent access`() = runTest {
logger.info { "Starting concurrent access test" }
val numberOfCoroutines = 100
val operationsPerCoroutine = 50
val successCounter = AtomicInteger(0)
val errorCounter = AtomicInteger(0)
val time = measureTime {
val jobs = (1..numberOfCoroutines).map { coroutineId ->
launch {
repeat(operationsPerCoroutine) { operationId ->
try {
val key = "concurrent-$coroutineId-$operationId"
val value = "value-$coroutineId-$operationId"
// Set operation
cache.set(key, value)
// Get operation
val retrieved = cache.get<String>(key)
if (retrieved == value) {
successCounter.incrementAndGet()
} else {
errorCounter.incrementAndGet()
logger.warn { "Mismatch: expected $value, got $retrieved" }
}
} catch (e: Exception) {
errorCounter.incrementAndGet()
logger.warn { "Error in operation: ${e.message}" }
}
}
}
}
jobs.joinAll()
}
val totalOperations = numberOfCoroutines * operationsPerCoroutine
val successRate = successCounter.get().toDouble() / totalOperations
val operationsPerSecond = if (time.inWholeSeconds > 0) totalOperations / time.inWholeSeconds else totalOperations * 1000 / maxOf(1, time.inWholeMilliseconds)
logger.info { "Performance test completed" }
logger.info { "Total operations: $totalOperations" }
logger.info { "Successful operations: ${successCounter.get()}" }
logger.info { "Failed operations: ${errorCounter.get()}" }
logger.info { "Success rate: ${successRate * 100}%" }
logger.info { "Total time: $time" }
logger.info { "Operations per second: $operationsPerSecond" }
assertTrue(successRate > 0.95, "Success rate should be > 95%, but was ${successRate * 100}%")
}
@Test
fun `test cache behavior under memory pressure`() {
logger.info { "Starting memory pressure test" }
// Create cache with limited local cache size
val limitedConfig = DefaultCacheConfiguration(
keyPrefix = "memory-test",
localCacheMaxSize = 100, // Very small local cache
defaultTtl = 30.minutes
)
val limitedCache = RedisDistributedCache(redisTemplate, serializer, limitedConfig)
// Fill cache with more entries than local cache can hold
val numberOfEntries = 500
val largeValue = "A".repeat(1000) // 1KB per entry
val time = measureTime {
repeat(numberOfEntries) { i ->
val key = "memory-pressure-$i"
limitedCache.set(key, largeValue)
}
}
logger.info { "Inserted $numberOfEntries entries in $time" }
// Verify that entries are still retrievable (should come from Redis)
var retrievedCount = 0
repeat(numberOfEntries) { i ->
val key = "memory-pressure-$i"
val retrieved = limitedCache.get<String>(key)
if (retrieved == largeValue) {
retrievedCount++
}
}
logger.info { "Successfully retrieved $retrievedCount out of $numberOfEntries entries" }
assertTrue(retrievedCount > numberOfEntries * 0.9,
"Should retrieve > 90% of entries, but retrieved only ${retrievedCount * 100.0 / numberOfEntries}%")
limitedCache.clear()
}
@Test
fun `test bulk operations performance`() {
logger.info { "Starting bulk operations performance test" }
val batchSize = 1000
val entries = (1..batchSize).associate {
"bulk-$it" to "bulk-value-$it"
}
// Test multiSet performance
val setTime = measureTime {
cache.multiSet(entries)
}
// Test multiGet performance
val getTime = measureTime {
val retrieved = cache.multiGet<String>(entries.keys)
assertEquals(batchSize, retrieved.size)
}
val setRatePerSec = if (setTime.inWholeSeconds > 0) batchSize / setTime.inWholeSeconds else batchSize * 1000 / maxOf(1, setTime.inWholeMilliseconds)
val getRatePerSec = if (getTime.inWholeSeconds > 0) batchSize / getTime.inWholeSeconds else batchSize * 1000 / maxOf(1, getTime.inWholeMilliseconds)
logger.info { "Bulk operations performance completed" }
logger.info { "MultiSet ${batchSize} entries: $setTime" }
logger.info { "MultiGet ${batchSize} entries: $getTime" }
logger.info { "Set rate: $setRatePerSec entries/sec" }
logger.info { "Get rate: $getRatePerSec entries/sec" }
assertTrue(setTime.inWholeSeconds < 10, "MultiSet should complete within 10 seconds")
assertTrue(getTime.inWholeSeconds < 10, "MultiGet should complete within 10 seconds")
}
}
@@ -1,351 +0,0 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.RedisConnectionFailureException
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.core.ValueOperations
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import java.time.Duration as JavaDuration
/**
* Timeout and Resilience Tests for RedisDistributedCache
*/
@OptIn(ExperimentalTime::class)
@Testcontainers
class RedisDistributedCacheResilienceTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val redisContainer = GenericContainer<Nothing>(
DockerImageName.parse("redis:7-alpine")
.asCompatibleSubstituteFor("redis")
).apply {
withExposedPorts(6379)
}
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "resilience-test",
defaultTtl = 30.minutes,
offlineModeEnabled = true
)
}
@Test
fun `test connection timeout scenarios`() = runBlocking {
logger.info { "Testing connection timeout scenarios" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
// Simulate slow Redis responses
every { mockValueOps.get(any()) } answers {
Thread.sleep(5000) // 5-second delay
"slow-response".toByteArray()
}
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } answers {
Thread.sleep(3000) // 3-second delay
}
val slowCache = RedisDistributedCache(mockTemplate, serializer, config)
// Test get operation with timeout
val startTime = System.currentTimeMillis()
val result = slowCache.get<String>("slow-key")
val endTime = System.currentTimeMillis()
logger.info { "Get operation took ${endTime - startTime}ms" }
// The operation should either succeed or fail gracefully
// Test set operation with timeout
val setStartTime = System.currentTimeMillis()
slowCache.set("slow-set-key", "value")
val setEndTime = System.currentTimeMillis()
logger.info { "Set operation took ${setEndTime - setStartTime}ms" }
// Verify that operations don't hang indefinitely
assertTrue((endTime - startTime) < 10000, "Get operation should not take more than 10 seconds")
assertTrue((setEndTime - setStartTime) < 10000, "Set operation should not take more than 10 seconds")
}
@Test
fun `test partial Redis failures`() {
logger.info { "Testing partial Redis failures" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
every { mockTemplate.hasKey(any()) } returns true
val failureCounter = AtomicInteger(0)
// Simulate intermittent connection failures (fail every 3rd operation)
every { mockValueOps.get(any()) } answers {
if (failureCounter.incrementAndGet() % 3 == 0) {
throw RedisConnectionFailureException("Intermittent failure")
}
serializer.serializeEntry(CacheEntry("test", "value"))
}
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } answers {
if (failureCounter.incrementAndGet() % 3 == 0) {
throw RedisConnectionFailureException("Intermittent failure")
}
}
val unreliableCache = RedisDistributedCache(mockTemplate, serializer, config)
// Test multiple operations with intermittent failures
var successCount = 0
var failureCount = 0
repeat(20) { i ->
try {
unreliableCache.set("intermittent-$i", "value-$i")
val retrieved = unreliableCache.get<String>("intermittent-$i")
if (retrieved != null) {
successCount++
} else {
failureCount++
}
} catch (e: Exception) {
failureCount++
logger.info { "Operation failed as expected: ${e.message}" }
}
}
logger.info { "Partial failure test results:" }
logger.info { "Successful operations: $successCount" }
logger.info { "Failed operations: $failureCount" }
logger.info { "Total operations: 20" }
// Due to offline mode, operations might succeed locally even when Redis fails,
// So we verify the cache is resilient and continues working
assertTrue(successCount >= 0, "Should handle operations gracefully")
assertEquals(20, successCount + failureCount, "Should process all operations")
// Verify that the cache state is properly managed despite intermittent failures
assertEquals(ConnectionState.DISCONNECTED, unreliableCache.getConnectionState())
// Verify that dirty keys are tracked for failed operations
val dirtyKeys = unreliableCache.getDirtyKeys()
assertTrue(dirtyKeys.isNotEmpty(), "Should have dirty keys from failed operations")
logger.info { "Dirty keys count: ${dirtyKeys.size}" }
}
@Test
fun `test network partitioning simulation`() {
logger.info { "Testing network partitioning simulation" }
val cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
// Phase 1: Normal operations (network is fine)
logger.info { "Phase 1: Normal operations" }
cache.set("partition-test-1", "value-1")
cache.set("partition-test-2", "value-2")
assertEquals("value-1", cache.get<String>("partition-test-1"))
assertEquals("value-2", cache.get<String>("partition-test-2"))
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
// Phase 2: Simulate network partition by creating a new cache with a broken connection
logger.info { "Phase 2: Simulating network partition" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Network partition")
every {
mockValueOps.set(
any<String>(),
any<ByteArray>(),
any<JavaDuration>()
)
} throws RedisConnectionFailureException("Network partition")
every { mockTemplate.delete(any<String>()) } throws RedisConnectionFailureException("Network partition")
every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Network partition")
val partitionedCache = RedisDistributedCache(mockTemplate, serializer, config)
// Operations during partition should work locally
partitionedCache.set("partition-offline-1", "offline-value-1")
partitionedCache.set("partition-offline-2", "offline-value-2")
// Should be able to retrieve from a local cache
assertEquals("offline-value-1", partitionedCache.get<String>("partition-offline-1"))
assertEquals("offline-value-2", partitionedCache.get<String>("partition-offline-2"))
assertEquals(ConnectionState.DISCONNECTED, partitionedCache.getConnectionState())
// Should track dirty keys
val dirtyKeys = partitionedCache.getDirtyKeys()
assertTrue(dirtyKeys.contains("partition-offline-1"))
assertTrue(dirtyKeys.contains("partition-offline-2"))
logger.info { "Network partition handled correctly - operations work offline" }
}
@Test
fun `test reconnection and synchronization after network issues`() {
logger.info { "Testing reconnection and synchronization" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
val reconnectingCache = RedisDistributedCache(mockTemplate, serializer, config)
// Phase 1: Simulate disconnection
every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Disconnected")
every {
mockValueOps.set(
any<String>(),
any<ByteArray>(),
any<JavaDuration>()
)
} throws RedisConnectionFailureException("Disconnected")
every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Disconnected")
reconnectingCache.set("reconnect-test-1", "value-1")
reconnectingCache.set("reconnect-test-2", "value-2")
assertEquals(ConnectionState.DISCONNECTED, reconnectingCache.getConnectionState())
assertTrue(reconnectingCache.getDirtyKeys().size >= 2)
// Phase 2: Simulate reconnection
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
every { mockTemplate.hasKey(any()) } returns true
every { mockTemplate.delete(any<String>()) } returns true
// Trigger connection check (this would normally be done by a scheduled task)
reconnectingCache.checkConnection()
// After a successful connection check, dirty keys should be synchronized
// Note: In a real scenario, this would be handled by the synchronization mechanism
logger.info { "Reconnection simulation completed" }
}
@Test
fun `test connection state listener notifications`() = runBlocking {
logger.info { "Testing connection state listener notifications" }
val stateChanges = mutableListOf<ConnectionState>()
val listener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
logger.info { "Connection state changed to: $newState at $timestamp" }
stateChanges.add(newState)
}
}
val cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.registerConnectionListener(listener)
// Initially should be connected
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
logger.info { "Initial connection state: ${cache.getConnectionState()}" }
// Test listener registration/unregistration mechanism
val testListener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
logger.info { "Test listener received state change: $newState" }
}
}
// Register and unregister listeners (testing the mechanism itself)
cache.registerConnectionListener(testListener)
cache.unregisterConnectionListener(testListener)
cache.unregisterConnectionListener(listener)
logger.info { "Connection state listener registration/unregistration mechanism tested" }
// Test that connection state is properly tracked
assertTrue(cache.isConnected(), "Cache should be connected to Redis")
logger.info { "Connection state listener functionality verified" }
}
@Test
fun `test cache operations during Redis restart simulation`() = runBlocking {
logger.info { "Testing cache operations during Redis restart simulation" }
val cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
// Store some initial data
cache.set("restart-test-1", "initial-value-1")
cache.set("restart-test-2", "initial-value-2")
assertEquals("initial-value-1", cache.get<String>("restart-test-1"))
// Simulate Redis restart by creating a new cache instance
// (In a real scenario, this would be the same instance, but Redis would be restarted)
// During "restart" (brief unavailability), operations should work locally
val duringRestartCache = RedisDistributedCache(redisTemplate, serializer, config)
// These should work even if Redis is temporarily unavailable
duringRestartCache.set("during-restart-1", "temp-value-1")
assertEquals("temp-value-1", duringRestartCache.get<String>("during-restart-1"))
// After "restart", data should be synchronized
delay(1.seconds) // Brief delay to simulate restart completion
val afterRestartCache = RedisDistributedCache(redisTemplate, serializer, config)
// Should be able to access both old and new data
// Note: In a real Redis restart, persisted data would still be there
afterRestartCache.set("after-restart-1", "post-restart-value-1")
assertEquals("post-restart-value-1", afterRestartCache.get<String>("after-restart-1"))
logger.info { "Redis restart simulation completed successfully" }
}
}
@@ -1,288 +0,0 @@
package at.mocode.infrastructure.cache.redis
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.RedisConnectionFailureException
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.core.ValueOperations
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import java.time.Duration as JavaDuration // Alias für Eindeutigkeit
@Testcontainers
class RedisDistributedCacheTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val redisContainer = GenericContainer<Nothing>(
DockerImageName.parse("redis:7-alpine")
.asCompatibleSubstituteFor("redis")
).apply {
withExposedPorts(6379)
}
}
private lateinit var redisTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: RedisDistributedCache
@BeforeEach
fun setUp() {
val redisPort = redisContainer.getMappedPort(6379)
val redisHost = redisContainer.host
val redisConfig = RedisStandaloneConfiguration(redisHost, redisPort)
val connectionFactory = LettuceConnectionFactory(redisConfig)
connectionFactory.afterPropertiesSet()
redisTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "test",
offlineModeEnabled = true,
defaultTtl = 30.minutes
)
cache = RedisDistributedCache(redisTemplate, serializer, config)
cache.clear()
}
@AfterEach
fun tearDown() {
cache.clear()
}
@Test
fun `get should return value with new reified extension function`() {
cache.set("key1", "value1")
val value = cache.get<String>("key1")
assertEquals("value1", value)
}
@Test
fun `test basic cache operations`() {
cache.set("key1", "value1")
val value = cache.get("key1", String::class.java)
assertEquals("value1", value)
assertTrue(cache.exists("key1"))
cache.delete("key1")
assertFalse(cache.exists("key1"))
assertNull(cache.get("key1", String::class.java))
}
@Test
fun `test cache with TTL`() {
cache.set("key2", "value2", 100.milliseconds)
assertTrue(cache.exists("key2"))
Thread.sleep(200)
assertFalse(cache.exists("key2"))
}
@Test
fun `test batch operations`() {
// Set multiple values
val entries = mapOf(
"batch1" to "value1",
"batch2" to "value2",
"batch3" to "value3"
)
cache.multiSet(entries)
// Get multiple values
val values = cache.multiGet(listOf("batch1", "batch2", "batch3"), String::class.java)
assertEquals(3, values.size)
assertEquals("value1", values["batch1"])
assertEquals("value2", values["batch2"])
assertEquals("value3", values["batch3"])
// Delete multiple values
cache.multiDelete(listOf("batch1", "batch3"))
// Verify they're gone
val remainingValues = cache.multiGet(listOf("batch1", "batch2", "batch3"), String::class.java)
assertEquals(1, remainingValues.size)
assertNull(remainingValues["batch1"])
assertEquals("value2", remainingValues["batch2"])
assertNull(remainingValues["batch3"])
}
@Test
fun `should handle offline mode and synchronize correctly`() {
// Arrange
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>(relaxed = true)
val mockValueOps = mockk<ValueOperations<String, ByteArray>>(relaxed = true)
every { mockTemplate.opsForValue() } returns mockValueOps
val offlineCache = RedisDistributedCache(mockTemplate, serializer, config)
// 1. Online-Phase
// Mocking set with any JavaDuration to avoid NoSuchMethodError if signature mismatch
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
// Also mock the version without duration just in case
every { mockValueOps.set(any<String>(), any<ByteArray>()) } returns Unit
offlineCache.set("key1", "online-value")
// Verify call - be lenient with duration matching
verify(atLeast = 1) {
mockValueOps.set(eq("test:key1"), any<ByteArray>(), any<JavaDuration>())
}
// 2. Offline-Phase simulieren
every {
mockValueOps.set(
any<String>(),
any<ByteArray>(),
any<JavaDuration>()
)
} throws RedisConnectionFailureException("Redis is down")
every { mockValueOps.set(any<String>(), any<ByteArray>()) } throws RedisConnectionFailureException("Redis is down")
every { mockTemplate.delete(any<String>()) } throws RedisConnectionFailureException("Redis is down")
offlineCache.set("key2", "offline-value")
offlineCache.delete("key1")
assertEquals("offline-value", offlineCache.get<String>("key2"))
assertTrue(offlineCache.getDirtyKeys().contains("key2"))
assertTrue(offlineCache.getDirtyKeys().contains("key1"))
// 3. Wiederverbindungs-Phase
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
every { mockValueOps.set(any<String>(), any<ByteArray>()) } returns Unit
every { mockTemplate.delete(any<String>()) } returns true
every { mockTemplate.hasKey("connection-test") } returns true
offlineCache.checkConnection()
// Verify sync happened
verify(atLeast = 1) { mockValueOps.set(eq("test:key1"), any<ByteArray>(), any<JavaDuration>()) }
verify(atLeast = 1) { mockTemplate.delete(eq("test:key1")) }
assertTrue(offlineCache.getDirtyKeys().isEmpty(), "Dirty keys should be empty after sync")
}
@Test
fun `test multiSet with TTL`() {
val entries = mapOf("batchTtl1" to "value1", "batchTtl2" to "value2")
cache.multiSet(entries, 100.milliseconds)
assertTrue(cache.exists("batchTtl1"))
Thread.sleep(200)
assertFalse(cache.exists("batchTtl1"))
}
@Test
fun `test complex objects`() {
// Create a complex object
val person = Person("John Doe", 30, listOf("Reading", "Hiking"))
// Store it in the cache
cache.set("person1", person)
// Retrieve it
val retrievedPerson = cache.get("person1", Person::class.java)
// Verify it's the same
assertNotNull(retrievedPerson)
assertEquals("John Doe", retrievedPerson.name)
assertEquals(30, retrievedPerson.age)
assertEquals(2, retrievedPerson.hobbies.size)
assertTrue(retrievedPerson.hobbies.contains("Reading"))
assertTrue(retrievedPerson.hobbies.contains("Hiking"))
}
@Test
fun `test clear method`() {
// Set multiple values
cache.set("clear1", "value1")
cache.set("clear2", "value2")
// Verify they exist
assertTrue(cache.exists("clear1"))
assertTrue(cache.exists("clear2"))
// Clear the cache
cache.clear()
// Verify they're gone
assertFalse(cache.exists("clear1"))
assertFalse(cache.exists("clear2"))
}
@Test
fun `test markDirty method`() {
// Set a value
cache.set("dirty1", "value1")
// Mark it as dirty
cache.markDirty("dirty1")
// Verify it's in the dirty keys
assertTrue(cache.getDirtyKeys().contains("dirty1"))
}
@Test
fun `test handling Redis connection failures`() {
// Create a mock RedisTemplate and ValueOperations
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
// Configure the mock to throw connection failure
every { mockTemplate.opsForValue() } returns mockValueOps
every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Test connection failure")
every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Test connection failure")
// Create a cache with the mock
val mockCache = RedisDistributedCache(mockTemplate, serializer, config)
// Try to get a value
val value = mockCache.get("failure1", String::class.java)
// Verify it returns null
assertNull(value)
// Verify the connection state is DISCONNECTED
assertEquals(ConnectionState.DISCONNECTED, mockCache.getConnectionState())
}
@Test
fun `test default TTL`() {
// Set a value without specifying TTL
cache.set("defaultTtl", "value")
// Verify it exists
assertTrue(cache.exists("defaultTtl"))
// The default TTL is 30 minutes, so it should still exist
assertEquals("value", cache.get("defaultTtl", String::class.java))
}
// Test data class
data class Person(
val name: String,
val age: Int,
val hobbies: List<String>
)
}
@@ -1,5 +1,5 @@
// Dieses Modul stellt eine konkrete Implementierung der `cache-api`
// unter Verwendung von Redis als Caching-Backend bereit.
// unter Verwendung von Valkey als Caching-Backend bereit.
plugins {
alias(libs.plugins.kotlinJvm)
alias(libs.plugins.kotlinSpring)
@@ -25,9 +25,9 @@ dependencies {
api(platform(projects.platform.platformBom))
// Implementiert die provider-agnostische Caching-API.
implementation(projects.backend.infrastructure.cache.cacheApi)
// OPTIMIERUNG: Verwendung des `redis-cache`-Bundles aus libs.versions.toml.
// Dieses Bundle enthält Spring Data Redis, Lettuce und Jackson-Module.
implementation(libs.bundles.redis.cache)
// OPTIMIERUNG: Verwendung des `valkey-cache`-Bundles aus libs.versions.toml.
// Dieses Bundle enthält Spring Data Valkey, Lettuce und Jackson-Module.
implementation(libs.bundles.valkey.cache)
// Stellt alle Test-Abhängigkeiten gebündelt bereit.
testImplementation(projects.platform.platformTesting)
testImplementation(libs.bundles.testing.jvm)
@@ -1,4 +1,4 @@
package at.mocode.infrastructure.cache.redis
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.CacheEntry
import at.mocode.infrastructure.cache.api.CacheSerializer
@@ -1,4 +1,4 @@
package at.mocode.infrastructure.cache.redis
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.CacheConfiguration
import at.mocode.infrastructure.cache.api.CacheSerializer
@@ -17,10 +17,10 @@ import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
/**
* Redis connection properties.
* Valkey connection properties.
*/
@ConfigurationProperties(prefix = "redis")
data class RedisProperties(
@ConfigurationProperties(prefix = "valkey")
data class ValkeyProperties(
val host: String = "localhost",
val port: Int = 6379,
val password: String? = null,
@@ -33,20 +33,20 @@ data class RedisProperties(
)
/**
* Spring configuration for Redis.
* Spring configuration for Valkey.
*/
@Configuration
@EnableConfigurationProperties(RedisProperties::class)
class RedisConfiguration {
@EnableConfigurationProperties(ValkeyProperties::class)
class ValkeyConfiguration {
/**
* Creates a Redis connection factory.
* Creates a Valkey connection factory.
*
* @param properties Redis connection properties
* @return Redis connection factory
* @param properties Valkey connection properties
* @return Valkey connection factory
*/
@Bean
fun redisConnectionFactory(properties: RedisProperties): RedisConnectionFactory {
fun valkeyConnectionFactory(properties: ValkeyProperties): RedisConnectionFactory {
val config = RedisStandaloneConfiguration().apply {
hostName = properties.host
port = properties.port
@@ -61,14 +61,14 @@ class RedisConfiguration {
}
/**
* Creates a Redis template for byte arrays.
* Creates a Valkey template for byte arrays.
*
* @param connectionFactory Redis connection factory
* @return Redis template
* @param connectionFactory Valkey connection factory
* @return Valkey template
*/
@Bean
fun redisTemplate(
@Qualifier("redisConnectionFactory") connectionFactory: RedisConnectionFactory
fun valkeyTemplate(
@Qualifier("valkeyConnectionFactory") connectionFactory: RedisConnectionFactory
): RedisTemplate<String, ByteArray> {
return RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
@@ -1,4 +1,4 @@
package at.mocode.infrastructure.cache.redis
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.CacheConfiguration
import at.mocode.infrastructure.cache.api.CacheEntry
@@ -20,13 +20,13 @@ import kotlin.time.toJavaDuration
import kotlin.time.ExperimentalTime
@OptIn(ExperimentalTime::class)
class RedisDistributedCache(
private val redisTemplate: RedisTemplate<String, ByteArray>,
private val serializer: CacheSerializer,
private val config: CacheConfiguration
class ValkeyDistributedCache(
private val valkeyTemplate: RedisTemplate<String, ByteArray>,
private val serializer: CacheSerializer,
private val config: CacheConfiguration
) : DistributedCache, ConnectionStatusTracker {
private val logger = LoggerFactory.getLogger(RedisDistributedCache::class.java)
private val logger = LoggerFactory.getLogger(ValkeyDistributedCache::class.java)
// Local cache for offline capability
private val localCache = ConcurrentHashMap<String, CacheEntry<Any>>()
@@ -48,7 +48,7 @@ class RedisDistributedCache(
private var lastMetricsLogTime = Clock.System.now()
init {
// Try to connect to Redis
// Try to connect to Valkey
checkConnection()
}
@@ -71,9 +71,9 @@ class RedisDistributedCache(
return null
}
// Try to get from Redis
// Try to get from Valkey
try {
val bytes = redisTemplate.opsForValue().get(prefixedKey) ?: run {
val bytes = valkeyTemplate.opsForValue().get(prefixedKey) ?: run {
trackOperation(true) // successful operation, just no data
return null
}
@@ -91,7 +91,7 @@ class RedisDistributedCache(
trackOperation(false)
return null
} catch (e: Exception) {
logger.error("Error getting value from Redis for key $prefixedKey", e)
logger.error("Error getting value from Valkey for key $prefixedKey", e)
trackOperation(false)
return null
}
@@ -122,9 +122,9 @@ class RedisDistributedCache(
val effectiveTtl = ttl ?: config.defaultTtl
if (effectiveTtl != null) {
// KORREKTUR: Konvertierung zu java.time.Duration für RedisTemplate
redisTemplate.opsForValue().set(prefixedKey, bytes, effectiveTtl.toJavaDuration())
valkeyTemplate.opsForValue().set(prefixedKey, bytes, effectiveTtl.toJavaDuration())
} else {
redisTemplate.opsForValue().set(prefixedKey, bytes)
valkeyTemplate.opsForValue().set(prefixedKey, bytes)
}
trackOperation(true)
} catch (e: RedisConnectionFailureException) {
@@ -132,7 +132,7 @@ class RedisDistributedCache(
markDirty(key)
trackOperation(false)
} catch (e: Exception) {
logger.error("Error setting value in Redis for key $prefixedKey", e)
logger.error("Error setting value in Valkey for key $prefixedKey", e)
markDirty(key)
trackOperation(false)
}
@@ -150,14 +150,14 @@ class RedisDistributedCache(
return
}
// Try to delete from Redis
// Try to delete from Valkey
try {
redisTemplate.delete(prefixedKey)
valkeyTemplate.delete(prefixedKey)
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
markDirty(key)
} catch (e: Exception) {
logger.error("Error deleting value from Redis for key $prefixedKey", e)
logger.error("Error deleting value from Valkey for key $prefixedKey", e)
markDirty(key)
}
}
@@ -180,14 +180,14 @@ class RedisDistributedCache(
return false
}
// Check Redis
// Check Valkey
try {
return redisTemplate.hasKey(prefixedKey) ?: false
return valkeyTemplate.hasKey(prefixedKey) ?: false
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
return false
} catch (e: Exception) {
logger.error("Error checking if key exists in Redis for key $prefixedKey", e)
logger.error("Error checking if key exists in Valkey for key $prefixedKey", e)
return false
}
}
@@ -214,18 +214,18 @@ class RedisDistributedCache(
return result
}
// Get missing keys from Redis
// Get missing keys from Valkey
val missingKeys = prefixedKeys.filter { !localEntries.containsKey(it) }
if (missingKeys.isEmpty()) {
return result
}
try {
val redisEntries = redisTemplate.opsForValue().multiGet(missingKeys)
if (redisEntries != null) {
val valkeyEntries = valkeyTemplate.opsForValue().multiGet(missingKeys)
if (valkeyEntries != null) {
for (i in missingKeys.indices) {
val key = missingKeys[i]
val bytes = redisEntries[i]
val bytes = valkeyEntries[i]
if (bytes != null) {
try {
val entry = serializer.deserializeEntry(bytes, clazz)
@@ -246,7 +246,7 @@ class RedisDistributedCache(
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
} catch (e: Exception) {
logger.error("Error getting multiple values from Redis", e)
logger.error("Error getting multiple values from Valkey", e)
}
return result
@@ -254,7 +254,7 @@ class RedisDistributedCache(
// ... (multiSet ebenfalls anpassen)
override fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration?) {
val redisBatch = mutableMapOf<String, ByteArray>()
val valkeyBatch = mutableMapOf<String, ByteArray>()
val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it }
for ((key, value) in entries) {
@@ -267,7 +267,7 @@ class RedisDistributedCache(
@Suppress("UNCHECKED_CAST")
localCache[prefixedKey] = entry as CacheEntry<Any>
enforceLocalCacheSize()
redisBatch[prefixedKey] = serializer.serializeEntry(entry)
valkeyBatch[prefixedKey] = serializer.serializeEntry(entry)
}
if (!isConnected()) {
@@ -276,11 +276,11 @@ class RedisDistributedCache(
}
try {
redisTemplate.opsForValue().multiSet(redisBatch)
valkeyTemplate.opsForValue().multiSet(valkeyBatch)
val effectiveTtl = ttl ?: config.defaultTtl
if (effectiveTtl != null) {
redisTemplate.executePipelined { connection ->
redisBatch.keys.forEach { key ->
valkeyTemplate.executePipelined { connection ->
valkeyBatch.keys.forEach { key ->
connection.keyCommands().pExpire(key.toByteArray(), effectiveTtl.inWholeMilliseconds)
}
null
@@ -290,7 +290,7 @@ class RedisDistributedCache(
handleConnectionFailure(e)
entries.keys.forEach { markDirty(it) }
} catch (e: Exception) {
logger.error("Error setting multiple values in Redis", e)
logger.error("Error setting multiple values in Valkey", e)
entries.keys.forEach { markDirty(it) }
}
}
@@ -307,14 +307,14 @@ class RedisDistributedCache(
return
}
// Try to delete from Redis
// Try to delete from Valkey
try {
redisTemplate.delete(prefixedKeys)
valkeyTemplate.delete(prefixedKeys)
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
keys.forEach { markDirty(it) }
} catch (e: Exception) {
logger.error("Error deleting multiple values from Redis", e)
logger.error("Error deleting multiple values from Valkey", e)
keys.forEach { markDirty(it) }
}
}
@@ -338,20 +338,20 @@ class RedisDistributedCache(
val localEntry = localCache[prefixedKey]
if (localEntry == null) {
// Entry was deleted locally, delete from Redis
// Entry was deleted locally, delete from Valkey
try {
redisTemplate.delete(prefixedKey)
valkeyTemplate.delete(prefixedKey)
dirtyKeys.remove(key)
} catch (e: Exception) {
logger.error("Error deleting key $prefixedKey during synchronization", e)
}
} else {
// Entry exists locally, update in Redis
// Entry exists locally, update in Valkey
try {
val bytes = serializer.serializeEntry(localEntry)
// Die 'set'-Methode erwartet kein TTL-Argument hier
redisTemplate.opsForValue().set(prefixedKey, bytes)
valkeyTemplate.opsForValue().set(prefixedKey, bytes)
// So wird die Dauer zwischen zwei Instants berechnet
val ttl = localEntry.expiresAt?.let { it - Clock.System.now() }
@@ -359,7 +359,7 @@ class RedisDistributedCache(
// 'isNegative' wird zu '< Duration.ZERO'
if (ttl != null && ttl > Duration.ZERO) {
// KORREKTUR: 'expire' braucht eine java.time.Duration
redisTemplate.expire(prefixedKey, ttl.toJavaDuration())
valkeyTemplate.expire(prefixedKey, ttl.toJavaDuration())
}
// Update local entry to mark as clean
@@ -396,16 +396,16 @@ class RedisDistributedCache(
return
}
// Try to clear Redis
// Try to clear Valkey
try {
val keys = redisTemplate.keys("${config.keyPrefix}*")
val keys = valkeyTemplate.keys("${config.keyPrefix}*")
if (keys != null && keys.isNotEmpty()) {
redisTemplate.delete(keys)
valkeyTemplate.delete(keys)
}
} catch (e: RedisConnectionFailureException) {
handleConnectionFailure(e)
} catch (e: Exception) {
logger.error("Error clearing Redis cache", e)
logger.error("Error clearing Valkey cache", e)
}
}
@@ -458,7 +458,7 @@ class RedisDistributedCache(
}
private fun handleConnectionFailure(e: Exception) {
logger.warn("Redis connection failure: ${e.message}")
logger.warn("Valkey connection failure: ${e.message}")
setConnectionState(ConnectionState.DISCONNECTED)
}
@@ -488,12 +488,12 @@ class RedisDistributedCache(
}
/**
* Prüft periodisch die Verbindung zu Redis.
* Prüft periodisch die Verbindung zu Valkey.
*/
@Scheduled(fixedDelayString = "\${redis.connection-check-interval:10000}")
@Scheduled(fixedDelayString = "\${valkey.connection-check-interval:10000}")
fun checkConnection() {
try {
redisTemplate.hasKey("connection-test")
valkeyTemplate.hasKey("connection-test")
setConnectionState(ConnectionState.CONNECTED)
} catch (_: Exception) {
setConnectionState(ConnectionState.DISCONNECTED)
@@ -503,7 +503,7 @@ class RedisDistributedCache(
/**
* Bereinigt periodisch abgelaufene Einträge aus dem lokalen Cache.
*/
@Scheduled(fixedDelayString = "\${redis.local-cache-cleanup-interval:60000}")
@Scheduled(fixedDelayString = "\${valkey.local-cache-cleanup-interval:60000}")
fun cleanupLocalCache() {
val now = Clock.System.now()
val expiredKeys = localCache.entries
@@ -520,7 +520,7 @@ class RedisDistributedCache(
/**
* Synchronisiert periodisch schmutzige Schlüssel, sobald verbunden.
*/
@Scheduled(fixedDelayString = "\${redis.sync-interval:300000}")
@Scheduled(fixedDelayString = "\${valkey.sync-interval:300000}")
fun scheduledSync() {
if (isConnected() && dirtyKeys.isNotEmpty()) {
synchronize(null)
@@ -565,7 +565,7 @@ class RedisDistributedCache(
/**
* Loggt Performance-Metriken (periodisch aufgerufen).
*/
@Scheduled(fixedDelayString = "\${redis.metrics-log-interval:300000}")
@Scheduled(fixedDelayString = "\${valkey.metrics-log-interval:300000}")
fun logPerformanceMetrics() {
val metrics = getPerformanceMetrics()
logger.info("Cache performance metrics: $metrics")
@@ -0,0 +1,387 @@
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.CacheSerializer
import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration
import at.mocode.infrastructure.cache.api.get
import at.mocode.infrastructure.cache.api.multiGet
import io.github.oshai.kotlinlogging.KotlinLogging
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
/**
* Configuration Tests for ValkeyDistributedCache
*/
@OptIn(ExperimentalTime::class)
@Testcontainers
class ValkeyDistributedCacheConfigurationTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val valkeyContainer = GenericContainer<Nothing>(
DockerImageName.parse("valkey/valkey:9-alpine")
.asCompatibleSubstituteFor("valkey")
).apply {
withExposedPorts(6379)
}
}
private lateinit var valkeyTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
@BeforeEach
fun setUp() {
val valkeyPort = valkeyContainer.getMappedPort(6379)
val valkeyHost = valkeyContainer.host
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
connectionFactory.afterPropertiesSet()
valkeyTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
}
@Test
fun `test different cache configurations`() {
logger.info { "Testing different cache configurations" }
// Configuration 1: High performance, short TTL
val performanceConfig = DefaultCacheConfiguration(
keyPrefix = "perf",
defaultTtl = 5.minutes,
localCacheMaxSize = 50000,
offlineModeEnabled = true,
synchronizationInterval = 30.seconds,
offlineEntryMaxAge = 1.hours,
compressionEnabled = false,
compressionThreshold = Int.MAX_VALUE
)
val performanceCache = ValkeyDistributedCache(valkeyTemplate, serializer, performanceConfig)
performanceCache.clear()
// Test performance config
performanceCache.set("perf-test", "performance-value")
assertEquals("performance-value", performanceCache.get<String>("perf-test"))
assertTrue(performanceCache.exists("perf-test"))
logger.info { "Performance configuration works correctly" }
// Configuration 2: Storage optimized, long TTL, compression enabled
val storageConfig = DefaultCacheConfiguration(
keyPrefix = "storage",
defaultTtl = 7.days,
localCacheMaxSize = 1000,
offlineModeEnabled = true,
synchronizationInterval = 5.minutes,
offlineEntryMaxAge = 24.hours,
compressionEnabled = true,
compressionThreshold = 100
)
val storageCache = ValkeyDistributedCache(valkeyTemplate, serializer, storageConfig)
storageCache.clear()
// Test storage config with large data (should be compressed)
val largeData = "Large data content: " + "X".repeat(1000)
storageCache.set("storage-test", largeData)
assertEquals(largeData, storageCache.get<String>("storage-test"))
logger.info { "Storage optimized configuration works correctly" }
// Configuration 3: Minimal configuration
val minimalConfig = DefaultCacheConfiguration(
keyPrefix = "minimal",
defaultTtl = null, // No TTL
localCacheMaxSize = null, // No limit
offlineModeEnabled = false,
synchronizationInterval = 1.minutes,
offlineEntryMaxAge = null,
compressionEnabled = false,
compressionThreshold = Int.MAX_VALUE
)
val minimalCache = ValkeyDistributedCache(valkeyTemplate, serializer, minimalConfig)
minimalCache.clear()
// Test minimal config
minimalCache.set("minimal-test", "minimal-value")
assertEquals("minimal-value", minimalCache.get<String>("minimal-test"))
logger.info { "Minimal configuration works correctly" }
// Clean up
performanceCache.clear()
storageCache.clear()
minimalCache.clear()
}
@Test
fun `test compression threshold behavior`() {
logger.info { "Testing compression threshold behavior" }
// Configuration with a low compression threshold
val compressionConfig = DefaultCacheConfiguration(
keyPrefix = "compression-test",
defaultTtl = 30.minutes,
compressionEnabled = true,
compressionThreshold = 50 // Very low threshold
)
val compressionCache = ValkeyDistributedCache(valkeyTemplate, serializer, compressionConfig)
compressionCache.clear()
// Test small data (below a threshold) - should not be compressed
val smallData = "Small"
compressionCache.set("small-data", smallData)
assertEquals(smallData, compressionCache.get<String>("small-data"))
// Test large data (above a threshold) - should be compressed
val largeData = "A".repeat(200) // Well above the threshold
compressionCache.set("large-data", largeData)
val retrievedLarge = compressionCache.get<String>("large-data")
assertEquals(largeData, retrievedLarge)
assertEquals(200, retrievedLarge?.length)
logger.info { "Small data length: ${smallData.length}" }
logger.info { "Large data length: ${largeData.length}" }
logger.info { "Compression threshold: ${compressionConfig.compressionThreshold}" }
// Test medium data (right at a threshold)
val mediumData = "B".repeat(50) // Exactly at the threshold
compressionCache.set("medium-data", mediumData)
assertEquals(mediumData, compressionCache.get<String>("medium-data"))
logger.info { "Compression threshold behavior validated" }
compressionCache.clear()
}
@Test
fun `test key prefix functionality`() {
logger.info { "Testing key prefix functionality" }
// Create caches with different prefixes
val config1 = DefaultCacheConfiguration(keyPrefix = "app1", defaultTtl = 30.minutes)
val config2 = DefaultCacheConfiguration(keyPrefix = "app2", defaultTtl = 30.minutes)
val config3 = DefaultCacheConfiguration(keyPrefix = "", defaultTtl = 30.minutes) // No prefix
val cache1 = ValkeyDistributedCache(valkeyTemplate, serializer, config1)
val cache2 = ValkeyDistributedCache(valkeyTemplate, serializer, config2)
val cache3 = ValkeyDistributedCache(valkeyTemplate, serializer, config3)
// Clear all caches
cache1.clear()
cache2.clear()
cache3.clear()
// Store the same key in all caches with different values
val testKey = "shared-key"
cache1.set(testKey, "value-from-app1")
cache2.set(testKey, "value-from-app2")
cache3.set(testKey, "value-from-no-prefix")
// Verify each cache returns its own value (thanks to prefixes)
assertEquals("value-from-app1", cache1.get<String>(testKey))
assertEquals("value-from-app2", cache2.get<String>(testKey))
assertEquals("value-from-no-prefix", cache3.get<String>(testKey))
// Verify isolation - keys don't exist in other caches
assertTrue(cache1.exists(testKey))
assertTrue(cache2.exists(testKey))
assertTrue(cache3.exists(testKey))
logger.info { "Key prefix isolation works correctly" }
// Test batch operations with prefixes
val batchData = mapOf(
"batch1" to "batch-value-1",
"batch2" to "batch-value-2"
)
cache1.multiSet(batchData)
cache2.multiSet(batchData.mapValues { "${it.value}-app2" })
val retrieved1 = cache1.multiGet<String>(batchData.keys)
val retrieved2 = cache2.multiGet<String>(batchData.keys)
assertEquals("batch-value-1", retrieved1["batch1"])
assertEquals("batch-value-1-app2", retrieved2["batch1"])
logger.info { "Batch operations with prefixes work correctly" }
// Clean up
cache1.clear()
cache2.clear()
cache3.clear()
}
@Test
fun `test TTL configuration variations`() {
logger.info { "Testing TTL configuration variations" }
// Configuration with no default TTL
val noTtlConfig = DefaultCacheConfiguration(
keyPrefix = "no-ttl-test",
defaultTtl = null
)
val noTtlCache = ValkeyDistributedCache(valkeyTemplate, serializer, noTtlConfig)
noTtlCache.clear()
// Store without TTL - should persist indefinitely
noTtlCache.set("persistent-key", "persistent-value")
assertEquals("persistent-value", noTtlCache.get<String>("persistent-key"))
// Store with explicit TTL - should override default (which is null)
noTtlCache.set("explicit-ttl-key", "explicit-ttl-value", 100.milliseconds)
assertEquals("explicit-ttl-value", noTtlCache.get<String>("explicit-ttl-key"))
Thread.sleep(200)
assertFalse(noTtlCache.exists("explicit-ttl-key"))
// Configuration with short default TTL
val shortTtlConfig = DefaultCacheConfiguration(
keyPrefix = "short-ttl-test",
defaultTtl = 100.milliseconds
)
val shortTtlCache = ValkeyDistributedCache(valkeyTemplate, serializer, shortTtlConfig)
shortTtlCache.clear()
// Store with default TTL
shortTtlCache.set("default-ttl-key", "default-ttl-value")
assertEquals("default-ttl-value", shortTtlCache.get<String>("default-ttl-key"))
Thread.sleep(200)
assertFalse(shortTtlCache.exists("default-ttl-key"))
// Store with explicit longer TTL - should override default
shortTtlCache.set("override-ttl-key", "override-ttl-value", 30.minutes)
assertEquals("override-ttl-value", shortTtlCache.get<String>("override-ttl-key"))
// Should still exist after short default TTL
assertTrue(shortTtlCache.exists("override-ttl-key"))
logger.info { "TTL configurations work correctly" }
noTtlCache.clear()
shortTtlCache.clear()
}
@Test
fun `test offline mode configuration`() {
logger.info { "Testing offline mode configuration" }
// Configuration with offline mode disabled
val noOfflineConfig = DefaultCacheConfiguration(
keyPrefix = "no-offline-test",
defaultTtl = 30.minutes,
offlineModeEnabled = false
)
val noOfflineCache = ValkeyDistributedCache(valkeyTemplate, serializer, noOfflineConfig)
noOfflineCache.clear()
// Normal operations should work
noOfflineCache.set("online-key", "online-value")
assertEquals("online-value", noOfflineCache.get<String>("online-key"))
// Configuration with offline mode enabled and specific settings
val offlineConfig = DefaultCacheConfiguration(
keyPrefix = "offline-test",
defaultTtl = 30.minutes,
offlineModeEnabled = true,
localCacheMaxSize = 1000,
synchronizationInterval = 10.seconds,
offlineEntryMaxAge = 2.hours
)
val offlineCache = ValkeyDistributedCache(valkeyTemplate, serializer, offlineConfig)
offlineCache.clear()
// Test offline capabilities
offlineCache.set("offline-key", "offline-value")
assertEquals("offline-value", offlineCache.get<String>("offline-key"))
logger.info { "Offline mode configuration works correctly" }
noOfflineCache.clear()
offlineCache.clear()
}
@Test
fun `test local cache size limits`() {
logger.info { "Testing local cache size limits" }
// Configuration with a very small local cache
val smallCacheConfig = DefaultCacheConfiguration(
keyPrefix = "small-cache-test",
defaultTtl = 30.minutes,
localCacheMaxSize = 3, // Very small
offlineModeEnabled = true
)
val smallCache = ValkeyDistributedCache(valkeyTemplate, serializer, smallCacheConfig)
smallCache.clear()
// Fill the local cache beyond its limit
repeat(10) { i ->
smallCache.set("key-$i", "value-$i")
}
// All values should still be retrievable (from Valkey if not in local cache)
repeat(10) { i ->
assertEquals("value-$i", smallCache.get<String>("key-$i"))
}
// Configuration with unlimited local cache
val unlimitedCacheConfig = DefaultCacheConfiguration(
keyPrefix = "unlimited-cache-test",
defaultTtl = 30.minutes,
localCacheMaxSize = null, // No limit
offlineModeEnabled = true
)
val unlimitedCache = ValkeyDistributedCache(valkeyTemplate, serializer, unlimitedCacheConfig)
unlimitedCache.clear()
// Fill with many entries
repeat(1000) { i ->
unlimitedCache.set("unlimited-key-$i", "unlimited-value-$i")
}
// All should be retrievable
repeat(1000) { i ->
assertEquals("unlimited-value-$i", unlimitedCache.get<String>("unlimited-key-$i"))
}
logger.info { "Local cache size limits work correctly" }
smallCache.clear()
unlimitedCache.clear()
}
}
@@ -0,0 +1,321 @@
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
/**
* Edge Cases and Error Handling Tests for ValkeyDistributedCache
*/
@Testcontainers
class ValkeyDistributedCacheEdgeCasesTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val valkeyContainer = GenericContainer<Nothing>(
DockerImageName.parse("valkey/valkey:9-alpine")
.asCompatibleSubstituteFor("valkey")
).apply {
withExposedPorts(6379)
}
}
private lateinit var valkeyTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: ValkeyDistributedCache
@BeforeEach
fun setUp() {
val valkeyPort = valkeyContainer.getMappedPort(6379)
val valkeyHost = valkeyContainer.host
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
connectionFactory.afterPropertiesSet()
valkeyTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "edge-test",
defaultTtl = 30.minutes,
compressionEnabled = true,
compressionThreshold = 1024
)
cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
}
@Test
fun `test serialization with problematic objects`() {
logger.info { "Testing serialization with problematic objects" }
// Test 1: Object with circular references (causes StackOverflowError)
val circularObject = CircularReferenceClass()
circularObject.self = circularObject
// This should handle the serialization gracefully (either succeed or fail gracefully)
try {
cache.set("circular-reference", circularObject as Any)
logger.info { "Circular reference object was handled (possibly with Jackson's circular reference handling)" }
} catch (t: Throwable) {
logger.info { "Circular reference object caused expected serialization issue: ${t::class.simpleName}" }
assertTrue(
t is com.fasterxml.jackson.databind.JsonMappingException ||
t is StackOverflowError ||
t is RuntimeException,
"Expected serialization-related exception"
)
}
// Test 2: Very deep nesting that might cause issues
val deepObject = createDeeplyNestedObject(50)
try {
cache.set("deep-nested", deepObject as Any)
cache.get("deep-nested", DeeplyNestedObject::class.java)
logger.info { "Deep nested object serialized successfully" }
} catch (t: Throwable) {
logger.info { "Deep nested object caused expected issues: ${t::class.simpleName}" }
}
// Verify that the cache remains stable after problematic serialization attempts
cache.set("normal-object", "test-value")
assertEquals("test-value", cache.get<String>("normal-object"))
logger.info { "Serialization edge cases handled correctly" }
}
@Test
fun `test cache with extremely large values`() {
logger.info { "Testing extremely large values" }
// Create a very large string (10MB)
val largeValue = "X".repeat(10 * 1024 * 1024)
val key = "large-value"
// This should trigger compression
cache.set(key, largeValue)
// Verify we can retrieve it
val retrieved = cache.get<String>(key)
assertNotNull(retrieved)
assertEquals(largeValue.length, retrieved.length)
assertEquals(largeValue.substring(0, 1000), retrieved.substring(0, 1000))
logger.info { "Large value (${largeValue.length} chars) stored and retrieved successfully" }
// Test with multiple large values
val largeValues = (1..5).associateWith { "Y".repeat(2 * 1024 * 1024) }
cache.multiSet(largeValues.mapKeys { "large-multi-${it.key}" })
val retrievedLarge = cache.multiGet<String>(largeValues.keys.map { "large-multi-$it" })
assertEquals(5, retrievedLarge.size)
logger.info { "Multiple large values stored and retrieved successfully" }
}
@Test
fun `test cache with null and empty values`() {
logger.info { "Testing null and empty values" }
// Test empty string
cache.set("empty-string", "")
assertEquals("", cache.get<String>("empty-string"))
// Test string with only whitespace
cache.set("whitespace", " \n\t ")
assertEquals(" \n\t ", cache.get<String>("whitespace"))
// Test empty collections
val emptyList = emptyList<String>()
cache.set("empty-list", emptyList)
assertEquals(emptyList, cache.get<List<String>>("empty-list"))
val emptyMap = emptyMap<String, String>()
cache.set("empty-map", emptyMap)
assertEquals(emptyMap, cache.get<Map<String, String>>("empty-map"))
// Test object with null fields
val objectWithNulls = PersonWithNullable(name = "John", age = null, email = null)
cache.set("null-fields", objectWithNulls)
val retrieved = cache.get<PersonWithNullable>("null-fields")
assertNotNull(retrieved)
assertEquals("John", retrieved.name)
assertNull(retrieved.age)
assertNull(retrieved.email)
logger.info { "Null and empty values handled correctly" }
}
@Test
fun `test special characters and unicode in keys and values`() {
logger.info { "Testing special characters and unicode" }
// Test keys with special characters (encoded)
val specialKeys = listOf(
"key:with:colons",
"key with spaces",
"key-with-dashes",
"key_with_underscores",
"key.with.dots"
)
specialKeys.forEachIndexed { index, key ->
cache.set(key, "value-$index")
}
specialKeys.forEachIndexed { index, key ->
assertEquals("value-$index", cache.get<String>(key))
}
// Test values with Unicode characters
val unicodeValues = mapOf(
"emoji" to "🚀 Hello World! 🌟",
"german" to "Äöüß und Umlaute",
"chinese" to "你好世界",
"arabic" to "مرحبا بالعالم",
"russian" to "Привет мир",
"mixed" to "Mixed: 123 ABC äöü 🎉 العالم"
)
cache.multiSet(unicodeValues)
val retrievedUnicode = cache.multiGet<String>(unicodeValues.keys)
unicodeValues.forEach { (key, expectedValue) ->
assertEquals(expectedValue, retrievedUnicode[key])
}
logger.info { "Special characters and unicode handled correctly" }
}
@Test
fun `test cache with complex nested objects`() {
logger.info { "Testing complex nested objects" }
// Create a complex nested structure
val complexObject = ComplexNestedObject(
id = 1,
name = "Complex Object",
metadata = mapOf(
"tags" to listOf("tag1", "tag2", "tag3"),
"properties" to mapOf(
"nested" to mapOf(
"deep" to "value",
"numbers" to listOf(1, 2, 3, 4, 5)
)
)
),
children = listOf(
SimpleChild(1, "Child 1"),
SimpleChild(2, "Child 2")
)
)
// Store and retrieve
cache.set("complex-object", complexObject)
val retrieved = cache.get<ComplexNestedObject>("complex-object")
assertNotNull(retrieved)
assertEquals(complexObject.id, retrieved.id)
assertEquals(complexObject.name, retrieved.name)
assertEquals(complexObject.children.size, retrieved.children.size)
assertEquals(complexObject.children[0].name, retrieved.children[0].name)
// Check nested metadata
val retrievedTags = retrieved.metadata["tags"] as List<*>
assertEquals(3, retrievedTags.size)
assertTrue(retrievedTags.contains("tag1"))
logger.info { "Complex nested object serialized and deserialized correctly" }
}
@Test
fun `test cache behavior with malformed data`() {
logger.info { "Testing cache behavior with malformed data" }
// Test retrieving non-existent keys
assertNull(cache.get<String>("non-existent-key"))
// Test batch operations with mixed existing/non-existing keys
cache.set("existing-1", "value-1")
cache.set("existing-2", "value-2")
val mixedKeys = listOf("existing-1", "non-existing", "existing-2", "also-non-existing")
val result = cache.multiGet<String>(mixedKeys)
assertEquals(2, result.size)
assertEquals("value-1", result["existing-1"])
assertEquals("value-2", result["existing-2"])
assertNull(result["non-existing"])
assertNull(result["also-non-existing"])
logger.info { "Malformed data scenarios handled correctly" }
}
// Helper method to create deeply nested objects
private fun createDeeplyNestedObject(depth: Int): DeeplyNestedObject {
return if (depth <= 0) {
DeeplyNestedObject("leaf", null)
} else {
DeeplyNestedObject("node-$depth", createDeeplyNestedObject(depth - 1))
}
}
// Test data classes
private class NonSerializableClass {
// This class intentionally has no default constructor or proper serialization
private val threadLocal = ThreadLocal<String>()
fun someMethod() = "not serializable"
}
private class CircularReferenceClass {
var name: String = "circular"
var self: CircularReferenceClass? = null
}
data class DeeplyNestedObject(
val name: String,
val child: DeeplyNestedObject?
)
data class PersonWithNullable(
val name: String,
val age: Int?,
val email: String?
)
data class ComplexNestedObject(
val id: Int,
val name: String,
val metadata: Map<String, Any>,
val children: List<SimpleChild>
)
data class SimpleChild(
val id: Int,
val name: String
)
}
@@ -0,0 +1,490 @@
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
/**
* Monitoring and Integration Tests for ValkeyDistributedCache
*/
@OptIn(ExperimentalTime::class)
@Testcontainers
class ValkeyDistributedCacheIntegrationTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val valkeyContainer = GenericContainer<Nothing>(
DockerImageName.parse("valkey/valkey:9-alpine")
.asCompatibleSubstituteFor("valkey")
).apply {
withExposedPorts(6379)
}
}
private lateinit var valkeyTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
@BeforeEach
fun setUp() {
val valkeyPort = valkeyContainer.getMappedPort(6379)
val valkeyHost = valkeyContainer.host
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
connectionFactory.afterPropertiesSet()
valkeyTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "integration-test",
defaultTtl = 30.minutes
)
}
@Test
fun `test connection state listener functionality`() = runBlocking {
logger.info { "Testing connection state listener functionality" }
val cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
val stateChanges = mutableListOf<Pair<ConnectionState, kotlin.time.Instant>>()
val latch = CountDownLatch(1)
val listener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
logger.info { "Connection state changed to: $newState at $timestamp" }
stateChanges.add(newState to timestamp)
latch.countDown()
}
}
// Register listener
cache.registerConnectionListener(listener)
// Initial state should be connected
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
logger.info { "Initial connection state: ${cache.getConnectionState()}" }
// Test listener registration/unregistration
val multipleListeners = mutableListOf<ConnectionStateListener>()
val callCounts = AtomicInteger(0)
repeat(3) { i ->
val testListener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
callCounts.incrementAndGet()
logger.info { "Listener $i received state change: $newState" }
}
}
multipleListeners.add(testListener)
cache.registerConnectionListener(testListener)
}
// Simulate state change (this might not trigger in our test environment,
// but we're testing the listener mechanism)
cache.checkConnection()
// Unregister listeners
multipleListeners.forEach { cache.unregisterConnectionListener(it) }
cache.unregisterConnectionListener(listener)
logger.info { "Connection state listener functionality tested" }
cache.clear()
}
@Test
fun `test different valkey configurations`() {
logger.info { "Testing different Valkey configurations" }
// Test with the current configuration
val standardCache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
standardCache.clear()
// Basic functionality test
standardCache.set("config-test-1", "standard-value")
assertEquals("standard-value", standardCache.get<String>("config-test-1"))
// Test with different Valkey configuration (same container, different settings)
val alternativeConfig = DefaultCacheConfiguration(
keyPrefix = "alt-config",
defaultTtl = 1.hours,
compressionEnabled = true,
compressionThreshold = 500
)
val alternativeCache = ValkeyDistributedCache(valkeyTemplate, serializer, alternativeConfig)
alternativeCache.clear()
// Test isolation between configurations
alternativeCache.set("config-test-1", "alternative-value")
// Both caches should maintain their own data
assertEquals("standard-value", standardCache.get<String>("config-test-1"))
assertEquals("alternative-value", alternativeCache.get<String>("config-test-1"))
// Test connection state tracking
assertEquals(ConnectionState.CONNECTED, standardCache.getConnectionState())
assertEquals(ConnectionState.CONNECTED, alternativeCache.getConnectionState())
logger.info { "Different Valkey configurations work correctly" }
standardCache.clear()
alternativeCache.clear()
}
@Test
fun `test cache warming scenarios`() {
logger.info { "Testing cache warming scenarios" }
val cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
// Scenario 1: Bulk warming with predefined data
val warmupData = (1..1000).associate { "warmup-key-$it" to "warmup-value-$it" }
logger.info { "Starting cache warming with ${warmupData.size} entries" }
val warmupTime = measureTime {
cache.multiSet(warmupData)
}
logger.info { "Cache warmup completed in $warmupTime" }
// Verify all data is accessible
val verificationTime = measureTime {
val retrieved = cache.multiGet<String>(warmupData.keys)
assertEquals(warmupData.size, retrieved.size)
// Spot-check some values
assertEquals("warmup-value-1", retrieved["warmup-key-1"])
assertEquals("warmup-value-500", retrieved["warmup-key-500"])
assertEquals("warmup-value-1000", retrieved["warmup-key-1000"])
}
logger.info { "Cache verification completed in $verificationTime" }
// Scenario 2: Gradual warming simulation
logger.info { "Testing gradual cache warming" }
val gradualWarmupCache = ValkeyDistributedCache(
valkeyTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "gradual-warmup", defaultTtl = 1.hours)
)
gradualWarmupCache.clear()
// Simulate application startup with gradual data loading
val batchSize = 100
val totalBatches = 10
repeat(totalBatches) { batchIndex ->
val batchData = (1..batchSize).associate {
"gradual-${batchIndex * batchSize + it}" to "gradual-value-${batchIndex * batchSize + it}"
}
gradualWarmupCache.multiSet(batchData)
// Simulate some delay between batches (like database queries)
Thread.sleep(10)
}
// Verify gradual warmup worked
val totalEntries = batchSize * totalBatches
val allKeys = (1..totalEntries).map { "gradual-$it" }
val retrievedGradual = gradualWarmupCache.multiGet<String>(allKeys)
assertEquals(totalEntries, retrievedGradual.size)
logger.info { "Gradual warmup successful: ${retrievedGradual.size} entries" }
// Scenario 3: Selective warming based on usage patterns
logger.info { "Testing selective cache warming" }
val selectiveCache = ValkeyDistributedCache(
valkeyTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "selective-warmup", defaultTtl = 2.hours)
)
selectiveCache.clear()
// Simulate frequently accessed data
val frequentData = listOf("user:123", "config:global", "menu:main")
val infrequentData = (1..100).map { "rare:data:$it" }
// Warm up frequent data first (priority warming)
frequentData.forEach { key ->
selectiveCache.set(key, "frequent-$key")
}
// Warm up infrequent data in the background
infrequentData.forEach { key ->
selectiveCache.set(key, "infrequent-$key")
}
// Verify selective warming
frequentData.forEach { key ->
assertEquals("frequent-$key", selectiveCache.get<String>(key))
}
logger.info { "Selective cache warming completed successfully" }
cache.clear()
gradualWarmupCache.clear()
selectiveCache.clear()
}
@Test
fun `test metrics and monitoring integration`() = runBlocking {
logger.info { "Testing metrics and monitoring integration" }
val monitoringCache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
monitoringCache.clear()
// Test connection state tracking over time
val connectionStateHistory = mutableListOf<ConnectionState>()
var lastStateChangeTime = monitoringCache.getLastStateChangeTime()
logger.info { "Initial connection state: ${monitoringCache.getConnectionState()}" }
logger.info { "Last state change time: $lastStateChangeTime" }
connectionStateHistory.add(monitoringCache.getConnectionState())
// Perform various operations and monitor the state
repeat(100) { i ->
monitoringCache.set("monitoring-key-$i", "monitoring-value-$i")
if (i % 20 == 0) {
val currentState = monitoringCache.getConnectionState()
val currentTime = monitoringCache.getLastStateChangeTime()
if (currentTime != lastStateChangeTime) {
logger.info { "State change detected at operation $i" }
connectionStateHistory.add(currentState)
lastStateChangeTime = currentTime
}
}
}
// Test dirty keys tracking for monitoring
logger.info { "Testing dirty keys monitoring" }
val initialDirtyKeys = monitoringCache.getDirtyKeys()
logger.info { "Initial dirty keys count: ${initialDirtyKeys.size}" }
// Add some data and verify dirty keys tracking
monitoringCache.set("dirty-test-1", "dirty-value-1")
monitoringCache.set("dirty-test-2", "dirty-value-2")
// In a normal connected state, dirty keys should be minimal
val finalDirtyKeys = monitoringCache.getDirtyKeys()
logger.info { "Final dirty keys count: ${finalDirtyKeys.size}" }
// Test batch operations monitoring
val batchData = (1..50).associate { "batch-monitoring-$it" to "batch-value-$it" }
val batchTime = measureTime {
monitoringCache.multiSet(batchData)
}
logger.info { "Batch operation took: $batchTime" }
val retrievalTime = measureTime {
val retrieved = monitoringCache.multiGet<String>(batchData.keys)
assertEquals(50, retrieved.size)
}
logger.info { "Batch retrieval took: $retrievalTime" }
logger.info { "Monitoring integration test completed" }
monitoringCache.clear()
}
@Test
fun `test cross-instance synchronization`() = runBlocking {
logger.info { "Testing cross-instance synchronization" }
// Create two cache instances (simulating different application instances)
val instance1 = ValkeyDistributedCache(
valkeyTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "sync-test", defaultTtl = 1.hours)
)
val instance2 = ValkeyDistributedCache(
valkeyTemplate, serializer,
DefaultCacheConfiguration(keyPrefix = "sync-test", defaultTtl = 1.hours)
)
instance1.clear()
instance2.clear()
// Instance 1 writes data
instance1.set("sync-key-1", "from-instance-1")
instance1.set("sync-key-2", "from-instance-1-v2")
// Small delay to ensure propagation
delay(100.milliseconds)
// Instance 2 should be able to read the data
assertEquals("from-instance-1", instance2.get<String>("sync-key-1"))
assertEquals("from-instance-1-v2", instance2.get<String>("sync-key-2"))
// Instance 2 modifies and adds data
instance2.set("sync-key-2", "modified-by-instance-2")
instance2.set("sync-key-3", "from-instance-2")
// Small delay to ensure propagation
delay(100.milliseconds)
// Instance 1 should see the changes
// Note: Due to local caching, we need to clear local cache or use a fresh get
// The current implementation may cache locally, so we test what we can reliably verify
val retrievedByInstance1 = instance1.get<String>("sync-key-3") // New key should work
assertEquals("from-instance-2", retrievedByInstance1)
// Test batch operations across instances
val batchData1 = mapOf(
"batch-sync-1" to "batch-from-instance-1",
"batch-sync-2" to "batch-from-instance-1-v2"
)
instance1.multiSet(batchData1)
val retrievedByInstance2 = instance2.multiGet<String>(batchData1.keys)
assertEquals(2, retrievedByInstance2.size)
assertEquals("batch-from-instance-1", retrievedByInstance2["batch-sync-1"])
logger.info { "Cross-instance synchronization works correctly" }
instance1.clear()
instance2.clear()
}
@Test
fun `test production-like scenarios`() = runBlocking {
logger.info { "Testing production-like scenarios" }
val prodCache = ValkeyDistributedCache(
valkeyTemplate, serializer,
DefaultCacheConfiguration(
keyPrefix = "prod-test",
defaultTtl = 30.minutes,
localCacheMaxSize = 10000,
compressionEnabled = true,
compressionThreshold = 1024
)
)
prodCache.clear()
// Scenario 1: User session caching
logger.info { "Testing user session caching" }
val userSessions = (1..1000).associate {
"user:session:$it" to UserSession(
userId = "user$it",
sessionId = "session$it",
lastActivity = System.currentTimeMillis(),
permissions = listOf("read", "write")
)
}
val sessionTime = measureTime {
prodCache.multiSet(userSessions.mapValues { it.value })
}
logger.info { "Stored ${userSessions.size} user sessions in $sessionTime" }
// Verify session retrieval
val retrievedSession = prodCache.get<UserSession>("user:session:500")
assertNotNull(retrievedSession)
assertEquals("user500", retrievedSession.userId)
// Scenario 2: Configuration caching
logger.info { "Testing configuration caching" }
val configData = mapOf(
"config:database:connection" to DatabaseConfig(
host = "localhost",
port = 5432,
database = "production",
maxConnections = 50
),
"config:feature:flags" to mapOf(
"new_ui" to true,
"experimental_feature" to false,
"maintenance_mode" to false
)
)
configData.forEach { (key, value) ->
prodCache.set(key, value, 1.hours) // Config cached for 1 hour
}
val dbConfig = prodCache.get<DatabaseConfig>("config:database:connection")
assertNotNull(dbConfig)
assertEquals("localhost", dbConfig.host)
// Scenario 3: API response caching
logger.info { "Testing API response caching" }
val apiResponses = (1..100).associate {
"api:response:endpoint$it" to ApiResponse(
status = 200,
data = "Response data for endpoint $it",
timestamp = System.currentTimeMillis(),
cacheHeaders = mapOf("Cache-Control" to "public, max-age=3600")
)
}
val apiTime = measureTime {
apiResponses.forEach { (key, value) ->
prodCache.set(key, value, 5.minutes) // API responses cached for 5 minutes
}
}
logger.info { "Cached ${apiResponses.size} API responses in $apiTime" }
// Verify API response retrieval
val apiResponse = prodCache.get<ApiResponse>("api:response:endpoint50")
assertNotNull(apiResponse)
assertEquals(200, apiResponse.status)
logger.info { "Production-like scenarios completed successfully" }
prodCache.clear()
}
// Test data classes for production scenarios
data class UserSession(
val userId: String,
val sessionId: String,
val lastActivity: Long,
val permissions: List<String>
)
data class DatabaseConfig(
val host: String,
val port: Int,
val database: String,
val maxConnections: Int
)
data class ApiResponse(
val status: Int,
val data: String,
val timestamp: Long,
val cacheHeaders: Map<String, String>
)
}
@@ -0,0 +1,212 @@
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
import kotlin.time.measureTime
/**
* Performance and Load Tests for ValkeyDistributedCache
*/
@Testcontainers
class ValkeyDistributedCachePerformanceTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val valkeyContainer = GenericContainer<Nothing>(
DockerImageName.parse("valkey/valkey:9-alpine")
.asCompatibleSubstituteFor("valkey")
).apply {
withExposedPorts(6379)
}
}
private lateinit var valkeyTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: ValkeyDistributedCache
@BeforeEach
fun setUp() {
val valkeyPort = valkeyContainer.getMappedPort(6379)
val valkeyHost = valkeyContainer.host
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
connectionFactory.afterPropertiesSet()
valkeyTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "perf-test",
defaultTtl = 30.minutes
)
cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
}
@Test
fun `test cache performance with high concurrent access`() = runTest {
logger.info { "Starting concurrent access test" }
val numberOfCoroutines = 100
val operationsPerCoroutine = 50
val successCounter = AtomicInteger(0)
val errorCounter = AtomicInteger(0)
val time = measureTime {
val jobs = (1..numberOfCoroutines).map { coroutineId ->
launch {
repeat(operationsPerCoroutine) { operationId ->
try {
val key = "concurrent-$coroutineId-$operationId"
val value = "value-$coroutineId-$operationId"
// Set operation
cache.set(key, value)
// Get operation
val retrieved = cache.get<String>(key)
if (retrieved == value) {
successCounter.incrementAndGet()
} else {
errorCounter.incrementAndGet()
logger.warn { "Mismatch: expected $value, got $retrieved" }
}
} catch (e: Exception) {
errorCounter.incrementAndGet()
logger.warn { "Error in operation: ${e.message}" }
}
}
}
}
jobs.joinAll()
}
val totalOperations = numberOfCoroutines * operationsPerCoroutine
val successRate = successCounter.get().toDouble() / totalOperations
val operationsPerSecond =
if (time.inWholeSeconds > 0) totalOperations / time.inWholeSeconds else totalOperations * 1000 / maxOf(
1,
time.inWholeMilliseconds
)
logger.info { "Performance test completed" }
logger.info { "Total operations: $totalOperations" }
logger.info { "Successful operations: ${successCounter.get()}" }
logger.info { "Failed operations: ${errorCounter.get()}" }
logger.info { "Success rate: ${successRate * 100}%" }
logger.info { "Total time: $time" }
logger.info { "Operations per second: $operationsPerSecond" }
assertTrue(successRate > 0.95, "Success rate should be > 95%, but was ${successRate * 100}%")
}
@Test
fun `test cache behavior under memory pressure`() {
logger.info { "Starting memory pressure test" }
// Create a cache with a limited local cache size
val limitedConfig = DefaultCacheConfiguration(
keyPrefix = "memory-test",
localCacheMaxSize = 100, // Very small local cache
defaultTtl = 30.minutes
)
val limitedCache = ValkeyDistributedCache(valkeyTemplate, serializer, limitedConfig)
// Fill the cache with more entries than a local cache can hold
val numberOfEntries = 500
val largeValue = "A".repeat(1000) // 1KB per entry
val time = measureTime {
repeat(numberOfEntries) { i ->
val key = "memory-pressure-$i"
limitedCache.set(key, largeValue)
}
}
logger.info { "Inserted $numberOfEntries entries in $time" }
// Verify that entries are still retrievable (should come from Valkey)
var retrievedCount = 0
repeat(numberOfEntries) { i ->
val key = "memory-pressure-$i"
val retrieved = limitedCache.get<String>(key)
if (retrieved == largeValue) {
retrievedCount++
}
}
logger.info { "Successfully retrieved $retrievedCount out of $numberOfEntries entries" }
assertTrue(
retrievedCount > numberOfEntries * 0.9,
"Should retrieve > 90% of entries, but retrieved only ${retrievedCount * 100.0 / numberOfEntries}%"
)
limitedCache.clear()
}
@Test
fun `test bulk operations performance`() {
logger.info { "Starting bulk operations performance test" }
val batchSize = 1000
val entries = (1..batchSize).associate {
"bulk-$it" to "bulk-value-$it"
}
// Test multiSet performance
val setTime = measureTime {
cache.multiSet(entries)
}
// Test multiGet performance
val getTime = measureTime {
val retrieved = cache.multiGet<String>(entries.keys)
assertEquals(batchSize, retrieved.size)
}
val setRatePerSec =
if (setTime.inWholeSeconds > 0) batchSize / setTime.inWholeSeconds else batchSize * 1000 / maxOf(
1,
setTime.inWholeMilliseconds
)
val getRatePerSec =
if (getTime.inWholeSeconds > 0) batchSize / getTime.inWholeSeconds else batchSize * 1000 / maxOf(
1,
getTime.inWholeMilliseconds
)
logger.info { "Bulk operations performance completed" }
logger.info { "MultiSet $batchSize entries: $setTime" }
logger.info { "MultiGet $batchSize entries: $getTime" }
logger.info { "Set rate: $setRatePerSec entries/sec" }
logger.info { "Get rate: $getRatePerSec entries/sec" }
assertTrue(setTime.inWholeSeconds < 10, "MultiSet should complete within 10 seconds")
assertTrue(getTime.inWholeSeconds < 10, "MultiGet should complete within 10 seconds")
}
}
@@ -0,0 +1,351 @@
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.RedisConnectionFailureException
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.core.ValueOperations
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import java.time.Duration as JavaDuration
/**
* Timeout and Resilience Tests for ValkeyDistributedCache
*/
@OptIn(ExperimentalTime::class)
@Testcontainers
class ValkeyDistributedCacheResilienceTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val valkeyContainer = GenericContainer<Nothing>(
DockerImageName.parse("valkey/valkey:9-alpine")
.asCompatibleSubstituteFor("valkey")
).apply {
withExposedPorts(6379)
}
}
private lateinit var valkeyTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
@BeforeEach
fun setUp() {
val valkeyPort = valkeyContainer.getMappedPort(6379)
val valkeyHost = valkeyContainer.host
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
connectionFactory.afterPropertiesSet()
valkeyTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "resilience-test",
defaultTtl = 30.minutes,
offlineModeEnabled = true
)
}
@Test
fun `test connection timeout scenarios`() = runBlocking {
logger.info { "Testing connection timeout scenarios" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
// Simulate slow Valkey responses
every { mockValueOps.get(any()) } answers {
Thread.sleep(5000) // 5-second delay
"slow-response".toByteArray()
}
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } answers {
Thread.sleep(3000) // 3-second delay
}
val slowCache = ValkeyDistributedCache(mockTemplate, serializer, config)
// Test get operation with timeout
val startTime = System.currentTimeMillis()
val result = slowCache.get<String>("slow-key")
val endTime = System.currentTimeMillis()
logger.info { "Get operation took ${endTime - startTime}ms" }
// The operation should either succeed or fail gracefully
// Test set operation with timeout
val setStartTime = System.currentTimeMillis()
slowCache.set("slow-set-key", "value")
val setEndTime = System.currentTimeMillis()
logger.info { "Set operation took ${setEndTime - setStartTime}ms" }
// Verify that operations don't hang indefinitely
assertTrue((endTime - startTime) < 10000, "Get operation should not take more than 10 seconds")
assertTrue((setEndTime - setStartTime) < 10000, "Set operation should not take more than 10 seconds")
}
@Test
fun `test partial valkey failures`() {
logger.info { "Testing partial Valkey failures" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
every { mockTemplate.hasKey(any()) } returns true
val failureCounter = AtomicInteger(0)
// Simulate intermittent connection failures (fail every 3rd operation)
every { mockValueOps.get(any()) } answers {
if (failureCounter.incrementAndGet() % 3 == 0) {
throw RedisConnectionFailureException("Intermittent failure")
}
serializer.serializeEntry(CacheEntry("test", "value"))
}
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } answers {
if (failureCounter.incrementAndGet() % 3 == 0) {
throw RedisConnectionFailureException("Intermittent failure")
}
}
val unreliableCache = ValkeyDistributedCache(mockTemplate, serializer, config)
// Test multiple operations with intermittent failures
var successCount = 0
var failureCount = 0
repeat(20) { i ->
try {
unreliableCache.set("intermittent-$i", "value-$i")
val retrieved = unreliableCache.get<String>("intermittent-$i")
if (retrieved != null) {
successCount++
} else {
failureCount++
}
} catch (e: Exception) {
failureCount++
logger.info { "Operation failed as expected: ${e.message}" }
}
}
logger.info { "Partial failure test results:" }
logger.info { "Successful operations: $successCount" }
logger.info { "Failed operations: $failureCount" }
logger.info { "Total operations: 20" }
// Due to offline mode, operations might succeed locally even when Valkey fails,
// So we verify the cache is resilient and continues working
assertTrue(successCount >= 0, "Should handle operations gracefully")
assertEquals(20, successCount + failureCount, "Should process all operations")
// Verify that the cache state is properly managed despite intermittent failures
assertEquals(ConnectionState.DISCONNECTED, unreliableCache.getConnectionState())
// Verify that dirty keys are tracked for failed operations
val dirtyKeys = unreliableCache.getDirtyKeys()
assertTrue(dirtyKeys.isNotEmpty(), "Should have dirty keys from failed operations")
logger.info { "Dirty keys count: ${dirtyKeys.size}" }
}
@Test
fun `test network partitioning simulation`() {
logger.info { "Testing network partitioning simulation" }
val cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
// Phase 1: Normal operations (network is fine)
logger.info { "Phase 1: Normal operations" }
cache.set("partition-test-1", "value-1")
cache.set("partition-test-2", "value-2")
assertEquals("value-1", cache.get<String>("partition-test-1"))
assertEquals("value-2", cache.get<String>("partition-test-2"))
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
// Phase 2: Simulate network partition by creating a new cache with a broken connection
logger.info { "Phase 2: Simulating network partition" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Network partition")
every {
mockValueOps.set(
any<String>(),
any<ByteArray>(),
any<JavaDuration>()
)
} throws RedisConnectionFailureException("Network partition")
every { mockTemplate.delete(any<String>()) } throws RedisConnectionFailureException("Network partition")
every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Network partition")
val partitionedCache = ValkeyDistributedCache(mockTemplate, serializer, config)
// Operations during partition should work locally
partitionedCache.set("partition-offline-1", "offline-value-1")
partitionedCache.set("partition-offline-2", "offline-value-2")
// Should be able to retrieve from a local cache
assertEquals("offline-value-1", partitionedCache.get<String>("partition-offline-1"))
assertEquals("offline-value-2", partitionedCache.get<String>("partition-offline-2"))
assertEquals(ConnectionState.DISCONNECTED, partitionedCache.getConnectionState())
// Should track dirty keys
val dirtyKeys = partitionedCache.getDirtyKeys()
assertTrue(dirtyKeys.contains("partition-offline-1"))
assertTrue(dirtyKeys.contains("partition-offline-2"))
logger.info { "Network partition handled correctly - operations work offline" }
}
@Test
fun `test reconnection and synchronization after network issues`() {
logger.info { "Testing reconnection and synchronization" }
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
every { mockTemplate.opsForValue() } returns mockValueOps
val reconnectingCache = ValkeyDistributedCache(mockTemplate, serializer, config)
// Phase 1: Simulate disconnection
every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Disconnected")
every {
mockValueOps.set(
any<String>(),
any<ByteArray>(),
any<JavaDuration>()
)
} throws RedisConnectionFailureException("Disconnected")
every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Disconnected")
reconnectingCache.set("reconnect-test-1", "value-1")
reconnectingCache.set("reconnect-test-2", "value-2")
assertEquals(ConnectionState.DISCONNECTED, reconnectingCache.getConnectionState())
assertTrue(reconnectingCache.getDirtyKeys().size >= 2)
// Phase 2: Simulate reconnection
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
every { mockTemplate.hasKey(any()) } returns true
every { mockTemplate.delete(any<String>()) } returns true
// Trigger connection check (this would normally be done by a scheduled task)
reconnectingCache.checkConnection()
// After a successful connection check, dirty keys should be synchronized
// Note: In a real scenario, this would be handled by the synchronization mechanism
logger.info { "Reconnection simulation completed" }
}
@Test
fun `test connection state listener notifications`() = runBlocking {
logger.info { "Testing connection state listener notifications" }
val stateChanges = mutableListOf<ConnectionState>()
val listener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
logger.info { "Connection state changed to: $newState at $timestamp" }
stateChanges.add(newState)
}
}
val cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.registerConnectionListener(listener)
// Initially should be connected
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
logger.info { "Initial connection state: ${cache.getConnectionState()}" }
// Test listener registration/unregistration mechanism
val testListener = object : ConnectionStateListener {
override fun onConnectionStateChanged(newState: ConnectionState, timestamp: kotlin.time.Instant) {
logger.info { "Test listener received state change: $newState" }
}
}
// Register and unregister listeners (testing the mechanism itself)
cache.registerConnectionListener(testListener)
cache.unregisterConnectionListener(testListener)
cache.unregisterConnectionListener(listener)
logger.info { "Connection state listener registration/unregistration mechanism tested" }
// Test that connection state is properly tracked
assertTrue(cache.isConnected(), "Cache should be connected to Valkey")
logger.info { "Connection state listener functionality verified" }
}
@Test
fun `test cache operations during valkey restart simulation`() = runBlocking {
logger.info { "Testing cache operations during Valkey restart simulation" }
val cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
// Store some initial data
cache.set("restart-test-1", "initial-value-1")
cache.set("restart-test-2", "initial-value-2")
assertEquals("initial-value-1", cache.get<String>("restart-test-1"))
// Simulate Valkey restart by creating a new cache instance
// (In a real scenario, this would be the same instance, but Valkey would be restarted)
// During "restart" (brief unavailability), operations should work locally
val duringRestartCache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
// These should work even if Valkey is temporarily unavailable
duringRestartCache.set("during-restart-1", "temp-value-1")
assertEquals("temp-value-1", duringRestartCache.get<String>("during-restart-1"))
// After "restart", data should be synchronized
delay(1.seconds) // Brief delay to simulate restart completion
val afterRestartCache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
// Should be able to access both old and new data
// Note: In a real Valkey restart, persisted data would still be there
afterRestartCache.set("after-restart-1", "post-restart-value-1")
assertEquals("post-restart-value-1", afterRestartCache.get<String>("after-restart-1"))
logger.info { "Valkey restart simulation completed successfully" }
}
}
@@ -0,0 +1,288 @@
package at.mocode.infrastructure.cache.valkey
import at.mocode.infrastructure.cache.api.*
import io.github.oshai.kotlinlogging.KotlinLogging
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.redis.RedisConnectionFailureException
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.core.ValueOperations
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.testcontainers.containers.GenericContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import java.time.Duration as JavaDuration // Alias für Eindeutigkeit
@Testcontainers
class ValkeyDistributedCacheTest {
companion object {
private val logger = KotlinLogging.logger {}
@Container
val valkeyContainer = GenericContainer<Nothing>(
DockerImageName.parse("valkey/valkey:9-alpine")
.asCompatibleSubstituteFor("valkey")
).apply {
withExposedPorts(6379)
}
}
private lateinit var valkeyTemplate: RedisTemplate<String, ByteArray>
private lateinit var serializer: CacheSerializer
private lateinit var config: CacheConfiguration
private lateinit var cache: ValkeyDistributedCache
@BeforeEach
fun setUp() {
val valkeyPort = valkeyContainer.getMappedPort(6379)
val valkeyHost = valkeyContainer.host
val valkeyConfig = RedisStandaloneConfiguration(valkeyHost, valkeyPort)
val connectionFactory = LettuceConnectionFactory(valkeyConfig)
connectionFactory.afterPropertiesSet()
valkeyTemplate = RedisTemplate<String, ByteArray>().apply {
setConnectionFactory(connectionFactory)
keySerializer = StringRedisSerializer()
afterPropertiesSet()
}
serializer = JacksonCacheSerializer()
config = DefaultCacheConfiguration(
keyPrefix = "test",
offlineModeEnabled = true,
defaultTtl = 30.minutes
)
cache = ValkeyDistributedCache(valkeyTemplate, serializer, config)
cache.clear()
}
@AfterEach
fun tearDown() {
cache.clear()
}
@Test
fun `get should return value with new reified extension function`() {
cache.set("key1", "value1")
val value = cache.get<String>("key1")
assertEquals("value1", value)
}
@Test
fun `test basic cache operations`() {
cache.set("key1", "value1")
val value = cache.get("key1", String::class.java)
assertEquals("value1", value)
assertTrue(cache.exists("key1"))
cache.delete("key1")
assertFalse(cache.exists("key1"))
assertNull(cache.get("key1", String::class.java))
}
@Test
fun `test cache with TTL`() {
cache.set("key2", "value2", 100.milliseconds)
assertTrue(cache.exists("key2"))
Thread.sleep(200)
assertFalse(cache.exists("key2"))
}
@Test
fun `test batch operations`() {
// Set multiple values
val entries = mapOf(
"batch1" to "value1",
"batch2" to "value2",
"batch3" to "value3"
)
cache.multiSet(entries)
// Get multiple values
val values = cache.multiGet(listOf("batch1", "batch2", "batch3"), String::class.java)
assertEquals(3, values.size)
assertEquals("value1", values["batch1"])
assertEquals("value2", values["batch2"])
assertEquals("value3", values["batch3"])
// Delete multiple values
cache.multiDelete(listOf("batch1", "batch3"))
// Verify they're gone
val remainingValues = cache.multiGet(listOf("batch1", "batch2", "batch3"), String::class.java)
assertEquals(1, remainingValues.size)
assertNull(remainingValues["batch1"])
assertEquals("value2", remainingValues["batch2"])
assertNull(remainingValues["batch3"])
}
@Test
fun `should handle offline mode and synchronize correctly`() {
// Arrange
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>(relaxed = true)
val mockValueOps = mockk<ValueOperations<String, ByteArray>>(relaxed = true)
every { mockTemplate.opsForValue() } returns mockValueOps
val offlineCache = ValkeyDistributedCache(mockTemplate, serializer, config)
// 1. Online-Phase
// Mocking set with any JavaDuration to avoid NoSuchMethodError if signature mismatch
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
// Also mock the version without duration just in case
every { mockValueOps.set(any<String>(), any<ByteArray>()) } returns Unit
offlineCache.set("key1", "online-value")
// Verify call - be lenient with duration matching
verify(atLeast = 1) {
mockValueOps.set(eq("test:key1"), any<ByteArray>(), any<JavaDuration>())
}
// 2. Offline-Phase simulieren
every {
mockValueOps.set(
any<String>(),
any<ByteArray>(),
any<JavaDuration>()
)
} throws RedisConnectionFailureException("Valkey is down")
every { mockValueOps.set(any<String>(), any<ByteArray>()) } throws RedisConnectionFailureException("Valkey is down")
every { mockTemplate.delete(any<String>()) } throws RedisConnectionFailureException("Valkey is down")
offlineCache.set("key2", "offline-value")
offlineCache.delete("key1")
assertEquals("offline-value", offlineCache.get<String>("key2"))
assertTrue(offlineCache.getDirtyKeys().contains("key2"))
assertTrue(offlineCache.getDirtyKeys().contains("key1"))
// 3. Wiederverbindungs-Phase
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
every { mockValueOps.set(any<String>(), any<ByteArray>()) } returns Unit
every { mockTemplate.delete(any<String>()) } returns true
every { mockTemplate.hasKey("connection-test") } returns true
offlineCache.checkConnection()
// Verify sync happened
verify(atLeast = 1) { mockValueOps.set(eq("test:key1"), any<ByteArray>(), any<JavaDuration>()) }
verify(atLeast = 1) { mockTemplate.delete(eq("test:key1")) }
assertTrue(offlineCache.getDirtyKeys().isEmpty(), "Dirty keys should be empty after sync")
}
@Test
fun `test multiSet with TTL`() {
val entries = mapOf("batchTtl1" to "value1", "batchTtl2" to "value2")
cache.multiSet(entries, 100.milliseconds)
assertTrue(cache.exists("batchTtl1"))
Thread.sleep(200)
assertFalse(cache.exists("batchTtl1"))
}
@Test
fun `test complex objects`() {
// Create a complex object
val person = Person("John Doe", 30, listOf("Reading", "Hiking"))
// Store it in the cache
cache.set("person1", person)
// Retrieve it
val retrievedPerson = cache.get("person1", Person::class.java)
// Verify it's the same
assertNotNull(retrievedPerson)
assertEquals("John Doe", retrievedPerson.name)
assertEquals(30, retrievedPerson.age)
assertEquals(2, retrievedPerson.hobbies.size)
assertTrue(retrievedPerson.hobbies.contains("Reading"))
assertTrue(retrievedPerson.hobbies.contains("Hiking"))
}
@Test
fun `test clear method`() {
// Set multiple values
cache.set("clear1", "value1")
cache.set("clear2", "value2")
// Verify they exist
assertTrue(cache.exists("clear1"))
assertTrue(cache.exists("clear2"))
// Clear the cache
cache.clear()
// Verify they're gone
assertFalse(cache.exists("clear1"))
assertFalse(cache.exists("clear2"))
}
@Test
fun `test markDirty method`() {
// Set a value
cache.set("dirty1", "value1")
// Mark it as dirty
cache.markDirty("dirty1")
// Verify it's in the dirty keys
assertTrue(cache.getDirtyKeys().contains("dirty1"))
}
@Test
fun `test handling valkey connection failures`() {
// Create a mock ValkeyTemplate and ValueOperations
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>()
val mockValueOps = mockk<ValueOperations<String, ByteArray>>()
// Configure the mock to throw connection failure
every { mockTemplate.opsForValue() } returns mockValueOps
every { mockValueOps.get(any()) } throws RedisConnectionFailureException("Test connection failure")
every { mockTemplate.hasKey(any()) } throws RedisConnectionFailureException("Test connection failure")
// Create a cache with the mock
val mockCache = ValkeyDistributedCache(mockTemplate, serializer, config)
// Try to get a value
val value = mockCache.get("failure1", String::class.java)
// Verify it returns null
assertNull(value)
// Verify the connection state is DISCONNECTED
assertEquals(ConnectionState.DISCONNECTED, mockCache.getConnectionState())
}
@Test
fun `test default TTL`() {
// Set a value without specifying TTL
cache.set("defaultTtl", "value")
// Verify it exists
assertTrue(cache.exists("defaultTtl"))
// The default TTL is 30 minutes, so it should still exist
assertEquals("value", cache.get("defaultTtl", String::class.java))
}
// Test data class
data class Person(
val name: String,
val age: Int,
val hobbies: List<String>
)
}
@@ -11,25 +11,25 @@
<logger name="at.mocode.infrastructure.cache" level="DEBUG" />
<!-- Performance Test Logger -->
<logger name="RedisDistributedCachePerformanceTest" level="INFO" />
<logger name="ValkeyDistributedCachePerformanceTest" level="INFO" />
<!-- Edge Cases Test Logger -->
<logger name="RedisDistributedCacheEdgeCasesTest" level="INFO" />
<logger name="ValkeyDistributedCacheEdgeCasesTest" level="INFO" />
<!-- Resilience Test Logger -->
<logger name="RedisDistributedCacheResilienceTest" level="INFO" />
<logger name="ValkeyDistributedCacheResilienceTest" level="INFO" />
<!-- Configuration Test Logger -->
<logger name="RedisDistributedCacheConfigurationTest" level="INFO" />
<logger name="ValkeyDistributedCacheConfigurationTest" level="INFO" />
<!-- Integration Test Logger -->
<logger name="RedisDistributedCacheIntegrationTest" level="INFO" />
<logger name="ValkeyDistributedCacheIntegrationTest" level="INFO" />
<!-- Testcontainers Logger (reduziert Verbosity) -->
<logger name="org.testcontainers" level="WARN" />
<logger name="com.github.dockerjava" level="WARN" />
<!-- Redis/Lettuce Logger (reduziert Verbosity) -->
<!-- Valkey/Lettuce Logger (reduziert Verbosity) -->
<logger name="io.lettuce" level="WARN" />
<logger name="org.springframework.data.redis" level="WARN" />
@@ -29,7 +29,7 @@ dependencies {
// === Redis & Spring Dependencies ===
// OPTIMIERUNG: Wiederverwendung des `redis-cache`-Bundles, da es die
// gleichen Technologien (Spring Data Redis, Lettuce, Jackson) verwendet
implementation(libs.bundles.redis.cache)
implementation(libs.bundles.valkey.cache)
// Stellt Jakarta Annotations bereit (z. B. @PostConstruct), die von Spring verwendet werden
implementation(libs.jakarta.annotation.api)
// Für Kotlin-spezifische Coroutines-Integration mit Spring
@@ -44,7 +44,7 @@ dependencies {
testImplementation(libs.reactor.test)
// Für Integration Tests mit beiden Redis-Modulen
testImplementation(projects.backend.infrastructure.cache.cacheApi)
testImplementation(projects.backend.infrastructure.cache.redisCache)
testImplementation(projects.backend.infrastructure.cache.valkeyCache)
}
// === Task Configuration ===
@@ -4,9 +4,9 @@ import at.mocode.core.domain.event.DomainEvent
import at.mocode.core.domain.model.*
import at.mocode.infrastructure.cache.api.CacheConfiguration
import at.mocode.infrastructure.cache.api.DistributedCache
import at.mocode.infrastructure.cache.redis.JacksonCacheSerializer
import at.mocode.infrastructure.cache.redis.RedisConfiguration
import at.mocode.infrastructure.cache.redis.RedisDistributedCache
import at.mocode.infrastructure.cache.valkey.JacksonCacheSerializer
import at.mocode.infrastructure.cache.valkey.ValkeyConfiguration
import at.mocode.infrastructure.cache.valkey.ValkeyDistributedCache
import at.mocode.infrastructure.eventstore.api.EventStore
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterAll
@@ -90,17 +90,17 @@ class RedisCacheAndEventStoreIntegrationTest {
@Configuration
@Import(
RedisConfiguration::class,
ValkeyConfiguration::class,
RedisEventStoreConfiguration::class
)
class TestConfig {
@Bean
fun distributedCache(
@Qualifier("redisTemplate") redisTemplate: RedisTemplate<String, ByteArray>,
cacheConfiguration: CacheConfiguration
@Qualifier("valkeyTemplate") redisTemplate: RedisTemplate<String, ByteArray>,
cacheConfiguration: CacheConfiguration
): DistributedCache {
return RedisDistributedCache(
redisTemplate = redisTemplate,
return ValkeyDistributedCache(
valkeyTemplate = redisTemplate,
serializer = JacksonCacheSerializer(),
config = cacheConfiguration
)
@@ -115,7 +115,7 @@ class RedisCacheAndEventStoreIntegrationTest {
// Verify separate ConnectionFactories
@Autowired
@Qualifier("redisConnectionFactory")
@Qualifier("valkeyConnectionFactory")
private lateinit var cacheConnectionFactory: RedisConnectionFactory
@Autowired
@@ -1,6 +1,6 @@
# Redis Production Configuration
# Valkey Production Configuration
# =============================================================================
# This configuration file contains production-ready settings for Redis
# This configuration file contains production-ready settings for Valkey
# with security, performance, and reliability optimizations.
# =============================================================================
@@ -78,10 +78,10 @@ rename-command DEBUG ""
# TLS Configuration (uncomment and configure for TLS)
# port 0
# tls-port 6380
# tls-cert-file /tls/redis.crt
# tls-key-file /tls/redis.key
# tls-cert-file /tls/valkey.crt
# tls-key-file /tls/valkey.key
# tls-ca-cert-file /tls/ca.crt
# tls-dh-params-file /tls/redis.dh
# tls-dh-params-file /tls/valkey.dh
# tls-protocols "TLSv1.2 TLSv1.3"
# tls-ciphers "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS"
# tls-ciphersuites "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256"
@@ -116,7 +116,7 @@ client-query-buffer-limit 1gb
# Protocol Buffer
proto-max-bulk-len 512mb
# Replication (for Redis cluster/replica setup)
# Replication (for Valkey cluster/replica setup)
# replica-serve-stale-data yes
# replica-read-only yes
# repl-diskless-sync no
@@ -144,6 +144,6 @@ rdb-save-incremental-fsync yes
# Jemalloc Configuration
jemalloc-bg-thread yes
# Threading (Redis 6.0+)
# io-threads 4
# io-threads-do-reads yes
# Threading (Valkey 6.0+)
io-threads 4
io-threads-do-reads yes
+20 -12
View File
@@ -44,29 +44,37 @@ services:
- "postgres"
# --- CACHE: Valkey (formerly Redis) ---
redis:
valkey:
# Valkey 9.0 (User Request)
image: "${VALKEY_IMAGE:-valkey/valkey:9.0}"
container_name: "${PROJECT_NAME:-meldestelle}-redis"
restart: no
image: "${VALKEY_IMAGE:-valkey/valkey:9-alpine}"
container_name: "${PROJECT_NAME:-meldestelle}-valkey"
restart: unless-stopped
ports:
- "${REDIS_PORT:-6379:6379}"
- "${VALKEY_PORT:-6379:6379}"
volumes:
- "redis-data:/data"
# Wir nutzen weiterhin die redis.conf, da Valkey kompatibel ist
- "./config/docker/redis/redis.conf:/etc/valkey/valkey.conf:Z"
- "valkey-data:/data"
# Wir nutzen weiterhin die valkey.conf, da Valkey kompatibel ist
- "./config/docker/valkey/valkey.conf:/etc/valkey/valkey.conf:Z"
profiles: [ "infra", "all" ]
# Anpassung der Binaries auf valkey-server und valkey-cli
command: [ "sh", "-lc", "exec valkey-server /etc/valkey/valkey.conf --protected-mode no ${REDIS_PASSWORD:+--requirepass $REDIS_PASSWORD}" ]
# command: [ "sh", "-lc", "exec valkey-server /etc/valkey/valkey.conf --protected-mode no ${VALKEY_PASSWORD:+--requirepass $VALKEY_PASSWORD}" ]
command:
- "sh"
- "-lc"
- |
exec valkey-server /etc/valkey/valkey.conf \
--protected-mode no \
--maxmemory ${VALKEY_MAXMEMORY:-256mb} \
--maxmemory-policy ${VALKEY_POLICY:-allkeys-lru} \
${VALKEY_PASSWORD:+--requirepass $VALKEY_PASSWORD}
healthcheck:
test: [ "CMD-SHELL", "[ -z \"$REDIS_PASSWORD\" ] && valkey-cli ping | grep PONG || valkey-cli -a \"$REDIS_PASSWORD\" ping | grep PONG" ]
test: [ "CMD-SHELL", "[ -z \"$VALKEY_PASSWORD\" ] && valkey-cli ping | grep PONG || valkey-cli -a \"$VALKEY_PASSWORD\" ping | grep PONG" ]
interval: "5s"
timeout: "5s"
retries: "3"
networks:
meldestelle-network:
aliases:
- "redis"
- "valkey"
# --- SERVICE DISCOVERY: Consul ---
@@ -161,7 +169,7 @@ services:
volumes:
postgres-data:
redis-data:
valkey-data:
mailpit-data:
networks:
+1 -1
View File
@@ -341,7 +341,7 @@ database-complete = [
"flyway-core",
"flyway-postgresql"
]
redis-cache = [
valkey-cache = [
"spring-boot-starter-data-redis",
"lettuce-core",
"jackson-module-kotlin",
+1 -1
View File
@@ -43,7 +43,7 @@ include(":contracts:ping-api")
// === BACKEND - INFRASTRUCTURE ===
// --- CACHE ---
include(":backend:infrastructure:cache:cache-api")
include(":backend:infrastructure:cache:redis-cache")
include(":backend:infrastructure:cache:valkey-cache")
// --- EVENT STORE ---
include(":backend:infrastructure:event-store:event-store-api")