Merge remote-tracking branch 'origin/struktur-umbau' into struktur-umbau
This commit is contained in:
+39
-58
@@ -2,75 +2,56 @@
|
|||||||
|
|
||||||
## Überblick
|
## Überblick
|
||||||
|
|
||||||
Das **Cache-Modul** stellt eine zentrale und wiederverwendbare Caching-Infrastruktur für alle Microservices des Meldestelle-Systems bereit. Caching ist eine entscheidende Technik zur Verbesserung der Anwendungsleistung, zur Reduzierung der Latenz und zur Entlastung von Backend-Systemen wie der primären PostgreSQL-Datenbank.
|
Das **Cache-Modul** stellt eine zentrale, hochverfügbare und wiederverwendbare Caching-Infrastruktur für alle Microservices bereit. Es dient der Verbesserung der Anwendungsperformance, der Reduzierung von Latenzen und der Entlastung der primären PostgreSQL-Datenbank.
|
||||||
|
|
||||||
## Architektur: Port-Adapter-Muster
|
## Architektur: Port-Adapter-Muster
|
||||||
|
|
||||||
Das Modul folgt streng dem **Port-Adapter-Muster** (auch als Hexagonale Architektur bekannt), um eine saubere Trennung zwischen der Caching-Schnittstelle (dem "Port") und der konkreten Implementierung (dem "Adapter") zu gewährleisten.
|
Das Modul folgt streng dem **Port-Adapter-Muster** (Hexagonale Architektur), um eine saubere Trennung zwischen der Caching-Schnittstelle (dem "Port") und der konkreten Implementierung (dem "Adapter") zu gewährleisten.
|
||||||
|
|
||||||
|
* **`:infrastructure:cache:cache-api`**: Definiert den abstrakten "Vertrag" für das Caching (`DistributedCache`-Interface), ohne sich um die zugrunde liegende Technologie zu kümmern. Die Fach-Services programmieren ausschließlich gegen dieses Interface.
|
||||||
|
* **`:infrastructure:cache:redis-cache`**: Die konkrete Implementierung des Vertrags, die **Redis** als hochperformantes Caching-Backend verwendet. Kapselt die gesamte Redis-spezifische Logik.
|
||||||
|
|
||||||
infrastructure/cache/
|
## Schlüsselfunktionen
|
||||||
├── cache-api/ # Der "Port": Definiert die Caching-Schnittstelle
|
|
||||||
└── redis-cache/ # Der "Adapter": Implementiert die Schnittstelle mit Redis
|
|
||||||
|
|
||||||
|
* **Offline-Fähigkeit & Resilienz:** Das Modul verfügt über einen In-Memory-Cache, der bei einem Ausfall der Redis-Verbindung als Fallback dient. Schreib-Operationen werden lokal als "dirty" markiert und automatisch mit Redis synchronisiert, sobald die Verbindung wiederhergestellt ist.
|
||||||
|
* **Idiomatische Kotlin-API:** Bietet neben der Standard-API auch ergonomische Erweiterungsfunktionen mit `reified`-Typen für eine saubere und typsichere Verwendung in Kotlin-Code (`cache.get<User>("key")`).
|
||||||
|
* **Projekweite Konsistenz:** Verwendet `kotlin.time.Duration` und `kotlin.time.Instant` für eine einheitliche Handhabung von Zeit- und Dauer-Angaben im gesamten Projekt.
|
||||||
|
* **Automatisierte Verbindungsüberwachung:** Überprüft periodisch den Zustand der Redis-Verbindung und informiert Listener über Statusänderungen (`CONNECTED`, `DISCONNECTED`).
|
||||||
|
|
||||||
### `cache-api`
|
## Verwendung
|
||||||
|
|
||||||
Dieses Modul ist der **abstrakte Teil** der Architektur. Es definiert den "Vertrag" für das Caching, ohne sich um die zugrunde liegende Technologie zu kümmern.
|
Ein Microservice bindet `:infrastructure:cache:redis-cache` als Abhängigkeit ein und lässt sich das `DistributedCache`-Interface per Dependency Injection geben.
|
||||||
|
|
||||||
* **Zweck:** Definiert ein oder mehrere Interfaces, z.B. `CacheService`, mit generischen Methoden wie `get(key)`, `set(key, value, ttl)` und `evict(key)`.
|
**Beispiel mit der idiomatischen Kotlin-API:**
|
||||||
* **Vorteil:** Jeder Service im System programmiert nur gegen dieses Interface. Die Geschäftslogik ist vollständig von der Caching-Technologie entkoppelt. Ein Austausch des Caching-Providers (z.B. von Redis zu Caffeine) würde keine Änderungen in den Fach-Services erfordern.
|
```kotlin
|
||||||
|
@Service
|
||||||
|
class MasterdataService(
|
||||||
|
private val cache: DistributedCache // Nur das Interface wird verwendet!
|
||||||
|
) {
|
||||||
|
fun findCountryById(id: String): Country? {
|
||||||
|
val cacheKey = "country:$id"
|
||||||
|
|
||||||
### `redis-cache`
|
// 1. Versuche, aus dem Cache zu lesen (typsicher und sauber)
|
||||||
|
val cachedCountry = cache.get<Country>(cacheKey)
|
||||||
Dieses Modul ist die **konkrete Implementierung** der in `cache-api` definierten Schnittstellen.
|
if (cachedCountry != null) {
|
||||||
|
return cachedCountry
|
||||||
* **Zweck:** Stellt eine Spring-basierte Konfiguration und eine Implementierung des `CacheService`-Interfaces bereit, die **Redis** als Datenspeicher verwendet. Es nutzt Spring Data Redis und den Lettuce-Client für die Kommunikation.
|
|
||||||
* **Technologie:** Verwendet Jackson für die Serialisierung der zu cachenden Objekte in das JSON-Format, bevor sie in Redis gespeichert werden.
|
|
||||||
* **Vorteil:** Kapselt die gesamte Redis-spezifische Logik an einem einzigen Ort.
|
|
||||||
|
|
||||||
## Verwendung in anderen Modulen
|
|
||||||
|
|
||||||
Ein Microservice, der Caching nutzen möchte, geht wie folgt vor:
|
|
||||||
|
|
||||||
1. **Abhängigkeit deklarieren:** Das Service-Modul (z.B. `masterdata-service`) fügt eine `implementation`-Abhängigkeit zu `:infrastructure:cache:redis-cache` in seiner `build.gradle.kts` hinzu.
|
|
||||||
|
|
||||||
```kotlin
|
|
||||||
// In masterdata-service/build.gradle.kts
|
|
||||||
dependencies {
|
|
||||||
implementation(projects.infrastructure.cache.redisCache)
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
2. **Interface injizieren:** Im Service-Code wird nur das Interface aus `cache-api` per Dependency Injection angefordert, nicht die konkrete Redis-Klasse.
|
|
||||||
|
|
||||||
```kotlin
|
|
||||||
// In einem Use Case oder Service
|
|
||||||
@Service
|
|
||||||
class MasterdataService(
|
|
||||||
private val cache: CacheService // Nur das Interface wird verwendet!
|
|
||||||
) {
|
|
||||||
fun findCountryById(id: String): Country? {
|
|
||||||
val cacheKey = "country:$id"
|
|
||||||
// 1. Versuche, aus dem Cache zu lesen
|
|
||||||
val cachedCountry = cache.get<Country>(cacheKey)
|
|
||||||
if (cachedCountry != null) {
|
|
||||||
return cachedCountry
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Wenn nicht im Cache, aus der DB lesen
|
|
||||||
val dbCountry = countryRepository.findById(id)
|
|
||||||
|
|
||||||
// 3. Ergebnis in den Cache schreiben für zukünftige Anfragen
|
|
||||||
if (dbCountry != null) {
|
|
||||||
cache.set(cacheKey, dbCountry, ttl = 3600) // Cache für 1 Stunde
|
|
||||||
}
|
|
||||||
return dbCountry
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 2. Wenn nicht im Cache, aus der DB lesen
|
||||||
|
val dbCountry = countryRepository.findById(id)
|
||||||
|
|
||||||
|
// 3. Ergebnis in den Cache schreiben für zukünftige Anfragen
|
||||||
|
dbCountry?.let {
|
||||||
|
cache.set(cacheKey, it, ttl = 1.hours) // Cache für 1 Stunde
|
||||||
|
}
|
||||||
|
return dbCountry
|
||||||
}
|
}
|
||||||
```
|
}
|
||||||
|
```
|
||||||
|
|
||||||
Diese Architektur stellt sicher, dass die Geschäftslogik sauber und von Infrastrukturdetails unberührt bleibt.
|
## Testing-Strategie
|
||||||
|
Die Qualität des Moduls wird durch eine zweistufige Teststrategie sichergestellt:
|
||||||
|
|
||||||
---
|
* **Integrationstests mit Testcontainers: Die Kernfunktionalität wird gegen eine echte Redis-Datenbank getestet, die zur Laufzeit in einem Docker-Container gestartet wird. Dies garantiert 100%ige Kompatibilität.**
|
||||||
**Letzte Aktualisierung**: 31. Juli 2025
|
|
||||||
|
* **Unit-Tests mit MockK: Die komplexe Logik der Offline-Fähigkeit und Synchronisation wird durch das Mocking des RedisTemplate getestet. So können Verbindungsausfälle zuverlässig simuliert werden, ohne den Test-Lebenszyklus zu stören.**
|
||||||
|
|||||||
@@ -5,6 +5,13 @@ plugins {
|
|||||||
alias(libs.plugins.kotlin.jvm)
|
alias(libs.plugins.kotlin.jvm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Erlaubt die Verwendung der kotlin.time API im gesamten Modul
|
||||||
|
kotlin {
|
||||||
|
compilerOptions {
|
||||||
|
freeCompilerArgs.add("-Xopt-in=kotlin.time.ExperimentalTime")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
// Stellt sicher, dass alle Versionen aus der zentralen BOM kommen.
|
// Stellt sicher, dass alle Versionen aus der zentralen BOM kommen.
|
||||||
implementation(platform(projects.platform.platformBom))
|
implementation(platform(projects.platform.platformBom))
|
||||||
|
|||||||
+7
-48
@@ -1,68 +1,27 @@
|
|||||||
package at.mocode.infrastructure.cache.api
|
package at.mocode.infrastructure.cache.api
|
||||||
|
|
||||||
import java.time.Duration
|
import kotlin.time.Duration
|
||||||
|
import kotlin.time.Duration.Companion.days
|
||||||
|
import kotlin.time.Duration.Companion.hours
|
||||||
|
import kotlin.time.Duration.Companion.minutes
|
||||||
|
|
||||||
/**
|
|
||||||
* Configuration for the distributed cache.
|
|
||||||
*/
|
|
||||||
interface CacheConfiguration {
|
interface CacheConfiguration {
|
||||||
/**
|
|
||||||
* Default time-to-live for cache entries.
|
|
||||||
* If null, entries do not expire by default.
|
|
||||||
*/
|
|
||||||
val defaultTtl: Duration?
|
val defaultTtl: Duration?
|
||||||
|
|
||||||
/**
|
|
||||||
* Maximum number of entries to store in the local cache.
|
|
||||||
* If null, there is no limit.
|
|
||||||
*/
|
|
||||||
val localCacheMaxSize: Int?
|
val localCacheMaxSize: Int?
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether to enable offline mode.
|
|
||||||
* If true, the cache will store entries locally when offline
|
|
||||||
* and synchronize them when online.
|
|
||||||
*/
|
|
||||||
val offlineModeEnabled: Boolean
|
val offlineModeEnabled: Boolean
|
||||||
|
|
||||||
/**
|
|
||||||
* How often to attempt synchronization in offline mode.
|
|
||||||
*/
|
|
||||||
val synchronizationInterval: Duration
|
val synchronizationInterval: Duration
|
||||||
|
|
||||||
/**
|
|
||||||
* Maximum age of entries to keep in the local cache when offline.
|
|
||||||
* If null, entries do not expire when offline.
|
|
||||||
*/
|
|
||||||
val offlineEntryMaxAge: Duration?
|
val offlineEntryMaxAge: Duration?
|
||||||
|
|
||||||
/**
|
|
||||||
* Prefix to add to all cache keys.
|
|
||||||
* This can be used to namespace cache entries.
|
|
||||||
*/
|
|
||||||
val keyPrefix: String
|
val keyPrefix: String
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether to compress cache entries.
|
|
||||||
*/
|
|
||||||
val compressionEnabled: Boolean
|
val compressionEnabled: Boolean
|
||||||
|
|
||||||
/**
|
|
||||||
* Threshold in bytes above which to compress cache entries.
|
|
||||||
* Only used if compressionEnabled is true.
|
|
||||||
*/
|
|
||||||
val compressionThreshold: Int
|
val compressionThreshold: Int
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Default implementation of CacheConfiguration.
|
|
||||||
*/
|
|
||||||
data class DefaultCacheConfiguration(
|
data class DefaultCacheConfiguration(
|
||||||
override val defaultTtl: Duration? = Duration.ofHours(1),
|
override val defaultTtl: Duration? = 1.hours,
|
||||||
override val localCacheMaxSize: Int? = 10000,
|
override val localCacheMaxSize: Int? = 10000,
|
||||||
override val offlineModeEnabled: Boolean = true,
|
override val offlineModeEnabled: Boolean = true,
|
||||||
override val synchronizationInterval: Duration = Duration.ofMinutes(5),
|
override val synchronizationInterval: Duration = 5.minutes,
|
||||||
override val offlineEntryMaxAge: Duration? = Duration.ofDays(7),
|
override val offlineEntryMaxAge: Duration? = 7.days,
|
||||||
override val keyPrefix: String = "",
|
override val keyPrefix: String = "",
|
||||||
override val compressionEnabled: Boolean = true,
|
override val compressionEnabled: Boolean = true,
|
||||||
override val compressionThreshold: Int = 1024
|
override val compressionThreshold: Int = 1024
|
||||||
|
|||||||
Vendored
+10
-68
@@ -1,96 +1,38 @@
|
|||||||
package at.mocode.infrastructure.cache.api
|
package at.mocode.infrastructure.cache.api
|
||||||
|
|
||||||
import java.time.Instant
|
import kotlin.time.Clock
|
||||||
|
import kotlin.time.Instant
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents an entry in the cache with metadata for offline capability.
|
|
||||||
*
|
|
||||||
* @param key The key of the cache entry
|
|
||||||
* @param value The value stored in the cache
|
|
||||||
* @param createdAt When the entry was created
|
|
||||||
* @param expiresAt When the entry expires, or null if it doesn't expire
|
|
||||||
* @param lastModifiedAt When the entry was last modified
|
|
||||||
* @param isDirty Whether the entry has been modified locally and needs to be synchronized
|
|
||||||
* @param isLocal Whether the entry is only stored locally (not yet synchronized)
|
|
||||||
*/
|
|
||||||
data class CacheEntry<T : Any>(
|
data class CacheEntry<T : Any>(
|
||||||
val key: String,
|
val key: String,
|
||||||
val value: T,
|
val value: T,
|
||||||
val createdAt: Instant = Instant.now(),
|
val createdAt: Instant = Clock.System.now(),
|
||||||
val expiresAt: Instant? = null,
|
val expiresAt: Instant? = null,
|
||||||
val lastModifiedAt: Instant = Instant.now(),
|
val lastModifiedAt: Instant = Clock.System.now(),
|
||||||
val isDirty: Boolean = false,
|
val isDirty: Boolean = false,
|
||||||
val isLocal: Boolean = false
|
val isLocal: Boolean = false
|
||||||
) {
|
) {
|
||||||
/**
|
|
||||||
* Checks if the entry is expired.
|
|
||||||
*
|
|
||||||
* @return true if the entry is expired, false otherwise
|
|
||||||
*/
|
|
||||||
fun isExpired(): Boolean {
|
fun isExpired(): Boolean {
|
||||||
return expiresAt?.isBefore(Instant.now()) ?: false
|
return expiresAt?.let { it < Clock.System.now() } ?: false
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new entry with the isDirty flag set to true.
|
|
||||||
*
|
|
||||||
* @return A new CacheEntry with isDirty set to true
|
|
||||||
*/
|
|
||||||
fun markDirty(): CacheEntry<T> {
|
fun markDirty(): CacheEntry<T> {
|
||||||
return copy(
|
return copy(isDirty = true, lastModifiedAt = Clock.System.now())
|
||||||
isDirty = true,
|
|
||||||
lastModifiedAt = Instant.now()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new entry with the isDirty flag set to false.
|
|
||||||
*
|
|
||||||
* @return A new CacheEntry with isDirty set to false
|
|
||||||
*/
|
|
||||||
fun markClean(): CacheEntry<T> {
|
fun markClean(): CacheEntry<T> {
|
||||||
return copy(
|
return copy(isDirty = false, isLocal = false, lastModifiedAt = Clock.System.now())
|
||||||
isDirty = false,
|
|
||||||
isLocal = false,
|
|
||||||
lastModifiedAt = Instant.now()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new entry with the isLocal flag set to true.
|
|
||||||
*
|
|
||||||
* @return A new CacheEntry with isLocal set to true
|
|
||||||
*/
|
|
||||||
fun markLocal(): CacheEntry<T> {
|
fun markLocal(): CacheEntry<T> {
|
||||||
return copy(
|
return copy(isLocal = true, lastModifiedAt = Clock.System.now())
|
||||||
isLocal = true,
|
|
||||||
lastModifiedAt = Instant.now()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new entry with an updated value.
|
|
||||||
*
|
|
||||||
* @param newValue The new value
|
|
||||||
* @return A new CacheEntry with the updated value
|
|
||||||
*/
|
|
||||||
fun updateValue(newValue: T): CacheEntry<T> {
|
fun updateValue(newValue: T): CacheEntry<T> {
|
||||||
return copy(
|
return copy(value = newValue, lastModifiedAt = Clock.System.now())
|
||||||
value = newValue,
|
|
||||||
lastModifiedAt = Instant.now()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new entry with an updated expiration time.
|
|
||||||
*
|
|
||||||
* @param newExpiresAt The new expiration time
|
|
||||||
* @return A new CacheEntry with the updated expiration time
|
|
||||||
*/
|
|
||||||
fun updateExpiration(newExpiresAt: Instant?): CacheEntry<T> {
|
fun updateExpiration(newExpiresAt: Instant?): CacheEntry<T> {
|
||||||
return copy(
|
return copy(expiresAt = newExpiresAt, lastModifiedAt = Clock.System.now())
|
||||||
expiresAt = newExpiresAt,
|
|
||||||
lastModifiedAt = Instant.now()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+2
-59
@@ -1,76 +1,19 @@
|
|||||||
package at.mocode.infrastructure.cache.api
|
package at.mocode.infrastructure.cache.api
|
||||||
|
|
||||||
import java.time.Instant
|
import kotlin.time.Instant
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents the connection status of the cache.
|
|
||||||
*/
|
|
||||||
enum class ConnectionState {
|
enum class ConnectionState {
|
||||||
/**
|
CONNECTED, DISCONNECTED, RECONNECTING
|
||||||
* The cache is connected to the remote server.
|
|
||||||
*/
|
|
||||||
CONNECTED,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The cache is disconnected from the remote server.
|
|
||||||
*/
|
|
||||||
DISCONNECTED,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The cache is attempting to reconnect to the remote server.
|
|
||||||
*/
|
|
||||||
RECONNECTING
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Interface for tracking the connection status of the cache.
|
|
||||||
*/
|
|
||||||
interface ConnectionStatusTracker {
|
interface ConnectionStatusTracker {
|
||||||
/**
|
|
||||||
* Gets the current connection state.
|
|
||||||
*
|
|
||||||
* @return The current connection state
|
|
||||||
*/
|
|
||||||
fun getConnectionState(): ConnectionState
|
fun getConnectionState(): ConnectionState
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the time when the connection state last changed.
|
|
||||||
*
|
|
||||||
* @return The time when the connection state last changed
|
|
||||||
*/
|
|
||||||
fun getLastStateChangeTime(): Instant
|
fun getLastStateChangeTime(): Instant
|
||||||
|
|
||||||
/**
|
|
||||||
* Registers a listener to be notified when the connection state changes.
|
|
||||||
*
|
|
||||||
* @param listener The listener to register
|
|
||||||
*/
|
|
||||||
fun registerConnectionListener(listener: ConnectionStateListener)
|
fun registerConnectionListener(listener: ConnectionStateListener)
|
||||||
|
|
||||||
/**
|
|
||||||
* Unregisters a connection state listener.
|
|
||||||
*
|
|
||||||
* @param listener The listener to unregister
|
|
||||||
*/
|
|
||||||
fun unregisterConnectionListener(listener: ConnectionStateListener)
|
fun unregisterConnectionListener(listener: ConnectionStateListener)
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if the cache is currently connected.
|
|
||||||
*
|
|
||||||
* @return true if the cache is connected, false otherwise
|
|
||||||
*/
|
|
||||||
fun isConnected(): Boolean = getConnectionState() == ConnectionState.CONNECTED
|
fun isConnected(): Boolean = getConnectionState() == ConnectionState.CONNECTED
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Listener for connection state changes.
|
|
||||||
*/
|
|
||||||
interface ConnectionStateListener {
|
interface ConnectionStateListener {
|
||||||
/**
|
|
||||||
* Called when the connection state changes.
|
|
||||||
*
|
|
||||||
* @param newState The new connection state
|
|
||||||
* @param timestamp The time when the state changed
|
|
||||||
*/
|
|
||||||
fun onConnectionStateChanged(newState: ConnectionState, timestamp: Instant)
|
fun onConnectionStateChanged(newState: ConnectionState, timestamp: Instant)
|
||||||
}
|
}
|
||||||
|
|||||||
+3
-80
@@ -1,94 +1,17 @@
|
|||||||
package at.mocode.infrastructure.cache.api
|
package at.mocode.infrastructure.cache.api
|
||||||
|
|
||||||
import java.time.Duration
|
import kotlin.time.Duration
|
||||||
|
|
||||||
/**
|
|
||||||
* Interface for a distributed cache that supports offline capability.
|
|
||||||
* This cache can be used to store and retrieve data across multiple instances
|
|
||||||
* and provides mechanisms for offline operation.
|
|
||||||
*/
|
|
||||||
interface DistributedCache {
|
interface DistributedCache {
|
||||||
/**
|
|
||||||
* Retrieves a value from the cache.
|
|
||||||
*
|
|
||||||
* @param key The key to retrieve
|
|
||||||
* @return The value associated with the key, or null if not found
|
|
||||||
*/
|
|
||||||
fun <T : Any> get(key: String, clazz: Class<T>): T?
|
fun <T : Any> get(key: String, clazz: Class<T>): T?
|
||||||
|
fun <T : Any> set(key: String, value: T, ttl: Duration? = null) // Geändert
|
||||||
/**
|
|
||||||
* Stores a value in the cache with an optional time-to-live.
|
|
||||||
*
|
|
||||||
* @param key The key to store the value under
|
|
||||||
* @param value The value to store
|
|
||||||
* @param ttl Optional time-to-live for the cache entry
|
|
||||||
*/
|
|
||||||
fun <T : Any> set(key: String, value: T, ttl: Duration? = null)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a value from the cache.
|
|
||||||
*
|
|
||||||
* @param key The key to remove
|
|
||||||
*/
|
|
||||||
fun delete(key: String)
|
fun delete(key: String)
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if a key exists in the cache.
|
|
||||||
*
|
|
||||||
* @param key The key to check
|
|
||||||
* @return true if the key exists, false otherwise
|
|
||||||
*/
|
|
||||||
fun exists(key: String): Boolean
|
fun exists(key: String): Boolean
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieves multiple values from the cache.
|
|
||||||
*
|
|
||||||
* @param keys The keys to retrieve
|
|
||||||
* @return A map of keys to values, with missing keys omitted
|
|
||||||
*/
|
|
||||||
fun <T : Any> multiGet(keys: Collection<String>, clazz: Class<T>): Map<String, T>
|
fun <T : Any> multiGet(keys: Collection<String>, clazz: Class<T>): Map<String, T>
|
||||||
|
fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration? = null) // Geändert
|
||||||
/**
|
|
||||||
* Stores multiple values in the cache with an optional time-to-live.
|
|
||||||
*
|
|
||||||
* @param entries The key-value pairs to store
|
|
||||||
* @param ttl Optional time-to-live for the cache entries
|
|
||||||
*/
|
|
||||||
fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration? = null)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes multiple values from the cache.
|
|
||||||
*
|
|
||||||
* @param keys The keys to remove
|
|
||||||
*/
|
|
||||||
fun multiDelete(keys: Collection<String>)
|
fun multiDelete(keys: Collection<String>)
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronizes the local cache with the distributed cache.
|
|
||||||
* This is used to ensure that the local cache is up-to-date with the distributed cache
|
|
||||||
* after being offline.
|
|
||||||
*
|
|
||||||
* @param keys Optional collection of keys to synchronize. If null, all keys are synchronized.
|
|
||||||
*/
|
|
||||||
fun synchronize(keys: Collection<String>? = null)
|
fun synchronize(keys: Collection<String>? = null)
|
||||||
|
|
||||||
/**
|
|
||||||
* Marks a key as dirty, indicating that it has been modified locally
|
|
||||||
* and needs to be synchronized with the distributed cache.
|
|
||||||
*
|
|
||||||
* @param key The key to mark as dirty
|
|
||||||
*/
|
|
||||||
fun markDirty(key: String)
|
fun markDirty(key: String)
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets all keys that have been marked as dirty.
|
|
||||||
*
|
|
||||||
* @return A collection of dirty keys
|
|
||||||
*/
|
|
||||||
fun getDirtyKeys(): Collection<String>
|
fun getDirtyKeys(): Collection<String>
|
||||||
|
|
||||||
/**
|
|
||||||
* Clears all entries from the cache.
|
|
||||||
*/
|
|
||||||
fun clear()
|
fun clear()
|
||||||
}
|
}
|
||||||
|
|||||||
+21
@@ -0,0 +1,21 @@
|
|||||||
|
package at.mocode.infrastructure.cache.api
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kotlin-idiomatic extension function to retrieve a value from the cache
|
||||||
|
* using reified types.
|
||||||
|
*
|
||||||
|
* Example: `val user = cache.get<User>("user:123")`
|
||||||
|
*/
|
||||||
|
inline fun <reified T : Any> DistributedCache.get(key: String): T? {
|
||||||
|
return this.get(key, T::class.java)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kotlin-idiomatic extension function to retrieve multiple values from the cache
|
||||||
|
* using reified types.
|
||||||
|
*
|
||||||
|
* Example: `val users = cache.multiGet<User>(listOf("user:123", "user:124"))`
|
||||||
|
*/
|
||||||
|
inline fun <reified T : Any> DistributedCache.multiGet(keys: Collection<String>): Map<String, T> {
|
||||||
|
return this.multiGet(keys, T::class.java)
|
||||||
|
}
|
||||||
+9
-16
@@ -11,10 +11,10 @@ import java.io.ByteArrayInputStream
|
|||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
import java.util.zip.GZIPInputStream
|
import java.util.zip.GZIPInputStream
|
||||||
import java.util.zip.GZIPOutputStream
|
import java.util.zip.GZIPOutputStream
|
||||||
|
import kotlin.time.ExperimentalTime
|
||||||
|
import kotlin.time.Instant
|
||||||
|
|
||||||
/**
|
@OptIn(ExperimentalTime::class)
|
||||||
* Jackson-based implementation of CacheSerializer.
|
|
||||||
*/
|
|
||||||
class JacksonCacheSerializer : CacheSerializer {
|
class JacksonCacheSerializer : CacheSerializer {
|
||||||
private val objectMapper: ObjectMapper = ObjectMapper().apply {
|
private val objectMapper: ObjectMapper = ObjectMapper().apply {
|
||||||
registerModule(KotlinModule.Builder().build())
|
registerModule(KotlinModule.Builder().build())
|
||||||
@@ -31,14 +31,13 @@ class JacksonCacheSerializer : CacheSerializer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun <T : Any> serializeEntry(entry: CacheEntry<T>): ByteArray {
|
override fun <T : Any> serializeEntry(entry: CacheEntry<T>): ByteArray {
|
||||||
// Create a wrapper that holds both the entry metadata and the serialized value
|
|
||||||
val wrapper = CacheEntryWrapper(
|
val wrapper = CacheEntryWrapper(
|
||||||
key = entry.key,
|
key = entry.key,
|
||||||
valueBytes = serialize(entry.value),
|
valueBytes = serialize(entry.value),
|
||||||
valueType = entry.value.javaClass.name,
|
valueType = entry.value.javaClass.name,
|
||||||
createdAt = entry.createdAt,
|
createdAt = java.time.Instant.ofEpochMilli(entry.createdAt.toEpochMilliseconds()),
|
||||||
expiresAt = entry.expiresAt,
|
expiresAt = entry.expiresAt?.toEpochMilliseconds()?.let { java.time.Instant.ofEpochMilli(it) },
|
||||||
lastModifiedAt = entry.lastModifiedAt,
|
lastModifiedAt = java.time.Instant.ofEpochMilli(entry.lastModifiedAt.toEpochMilliseconds()),
|
||||||
isDirty = entry.isDirty,
|
isDirty = entry.isDirty,
|
||||||
isLocal = entry.isLocal
|
isLocal = entry.isLocal
|
||||||
)
|
)
|
||||||
@@ -48,13 +47,12 @@ class JacksonCacheSerializer : CacheSerializer {
|
|||||||
override fun <T : Any> deserializeEntry(bytes: ByteArray, valueClass: Class<T>): CacheEntry<T> {
|
override fun <T : Any> deserializeEntry(bytes: ByteArray, valueClass: Class<T>): CacheEntry<T> {
|
||||||
val wrapper = objectMapper.readValue<CacheEntryWrapper>(bytes)
|
val wrapper = objectMapper.readValue<CacheEntryWrapper>(bytes)
|
||||||
val value = deserialize(wrapper.valueBytes, valueClass)
|
val value = deserialize(wrapper.valueBytes, valueClass)
|
||||||
|
|
||||||
return CacheEntry(
|
return CacheEntry(
|
||||||
key = wrapper.key,
|
key = wrapper.key,
|
||||||
value = value,
|
value = value,
|
||||||
createdAt = wrapper.createdAt,
|
createdAt = Instant.fromEpochMilliseconds(wrapper.createdAt.toEpochMilli()),
|
||||||
expiresAt = wrapper.expiresAt,
|
expiresAt = wrapper.expiresAt?.toEpochMilli()?.let { Instant.fromEpochMilliseconds(it) },
|
||||||
lastModifiedAt = wrapper.lastModifiedAt,
|
lastModifiedAt = Instant.fromEpochMilliseconds(wrapper.lastModifiedAt.toEpochMilli()),
|
||||||
isDirty = wrapper.isDirty,
|
isDirty = wrapper.isDirty,
|
||||||
isLocal = wrapper.isLocal
|
isLocal = wrapper.isLocal
|
||||||
)
|
)
|
||||||
@@ -71,11 +69,6 @@ class JacksonCacheSerializer : CacheSerializer {
|
|||||||
return inputStream.readBytes()
|
return inputStream.readBytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrapper class for serializing cache entries.
|
|
||||||
* This separates the metadata from the value, allowing us to deserialize
|
|
||||||
* the metadata without knowing the type of the value.
|
|
||||||
*/
|
|
||||||
private data class CacheEntryWrapper(
|
private data class CacheEntryWrapper(
|
||||||
val key: String,
|
val key: String,
|
||||||
val valueBytes: ByteArray,
|
val valueBytes: ByteArray,
|
||||||
|
|||||||
+52
-57
@@ -11,14 +11,15 @@ import org.slf4j.LoggerFactory
|
|||||||
import org.springframework.data.redis.RedisConnectionFailureException
|
import org.springframework.data.redis.RedisConnectionFailureException
|
||||||
import org.springframework.data.redis.core.RedisTemplate
|
import org.springframework.data.redis.core.RedisTemplate
|
||||||
import org.springframework.scheduling.annotation.Scheduled
|
import org.springframework.scheduling.annotation.Scheduled
|
||||||
import java.time.Duration
|
|
||||||
import java.time.Instant
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
import java.util.concurrent.CopyOnWriteArrayList
|
||||||
|
import kotlin.time.Clock
|
||||||
|
import kotlin.time.Duration
|
||||||
|
import kotlin.time.Instant
|
||||||
|
import kotlin.time.toJavaDuration
|
||||||
|
import kotlin.time.ExperimentalTime
|
||||||
|
|
||||||
/**
|
@OptIn(ExperimentalTime::class)
|
||||||
* Redis implementation of DistributedCache with offline capability.
|
|
||||||
*/
|
|
||||||
class RedisDistributedCache(
|
class RedisDistributedCache(
|
||||||
private val redisTemplate: RedisTemplate<String, ByteArray>,
|
private val redisTemplate: RedisTemplate<String, ByteArray>,
|
||||||
private val serializer: CacheSerializer,
|
private val serializer: CacheSerializer,
|
||||||
@@ -35,7 +36,8 @@ class RedisDistributedCache(
|
|||||||
|
|
||||||
// Connection state
|
// Connection state
|
||||||
private var connectionState = ConnectionState.DISCONNECTED
|
private var connectionState = ConnectionState.DISCONNECTED
|
||||||
private var lastStateChangeTime = Instant.now()
|
|
||||||
|
private var lastStateChangeTime = Clock.System.now()
|
||||||
|
|
||||||
// Connection state listeners
|
// Connection state listeners
|
||||||
private val connectionListeners = CopyOnWriteArrayList<ConnectionStateListener>()
|
private val connectionListeners = CopyOnWriteArrayList<ConnectionStateListener>()
|
||||||
@@ -45,24 +47,20 @@ class RedisDistributedCache(
|
|||||||
checkConnection()
|
checkConnection()
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// DistributedCache implementation
|
|
||||||
//
|
|
||||||
|
|
||||||
override fun <T : Any> get(key: String, clazz: Class<T>): T? {
|
override fun <T : Any> get(key: String, clazz: Class<T>): T? {
|
||||||
val prefixedKey = addPrefix(key)
|
val prefixedKey = addPrefix(key)
|
||||||
|
|
||||||
// Try to get from local cache first
|
// Try to get from the local cache first
|
||||||
val localEntry = localCache[prefixedKey] as? CacheEntry<T>
|
val localEntry = localCache[prefixedKey] as? CacheEntry<*>
|
||||||
if (localEntry != null) {
|
if (localEntry != null) {
|
||||||
if (localEntry.isExpired()) {
|
if (localEntry.isExpired()) {
|
||||||
localCache.remove(prefixedKey)
|
localCache.remove(prefixedKey)
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
return localEntry.value
|
return localEntry.value as T?
|
||||||
}
|
}
|
||||||
|
|
||||||
// If not in local cache and we're disconnected, return null
|
// If not in the local cache, and we're disconnected, return null
|
||||||
if (!isConnected()) {
|
if (!isConnected()) {
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
@@ -72,7 +70,7 @@ class RedisDistributedCache(
|
|||||||
val bytes = redisTemplate.opsForValue().get(prefixedKey) ?: return null
|
val bytes = redisTemplate.opsForValue().get(prefixedKey) ?: return null
|
||||||
val entry = serializer.deserializeEntry(bytes, clazz)
|
val entry = serializer.deserializeEntry(bytes, clazz)
|
||||||
|
|
||||||
// Store in local cache
|
// Store in a local cache
|
||||||
localCache[prefixedKey] = entry as CacheEntry<Any>
|
localCache[prefixedKey] = entry as CacheEntry<Any>
|
||||||
|
|
||||||
return entry.value
|
return entry.value
|
||||||
@@ -87,7 +85,8 @@ class RedisDistributedCache(
|
|||||||
|
|
||||||
override fun <T : Any> set(key: String, value: T, ttl: Duration?) {
|
override fun <T : Any> set(key: String, value: T, ttl: Duration?) {
|
||||||
val prefixedKey = addPrefix(key)
|
val prefixedKey = addPrefix(key)
|
||||||
val expiresAt = ttl?.let { Instant.now().plus(it) } ?: config.defaultTtl?.let { Instant.now().plus(it) }
|
// KORREKTUR: Logik verwendet jetzt kotlin.time
|
||||||
|
val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it }
|
||||||
|
|
||||||
val entry = CacheEntry(
|
val entry = CacheEntry(
|
||||||
key = prefixedKey,
|
key = prefixedKey,
|
||||||
@@ -95,25 +94,21 @@ class RedisDistributedCache(
|
|||||||
expiresAt = expiresAt
|
expiresAt = expiresAt
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store in local cache
|
|
||||||
localCache[prefixedKey] = entry as CacheEntry<Any>
|
localCache[prefixedKey] = entry as CacheEntry<Any>
|
||||||
|
|
||||||
// If we're disconnected, mark as dirty and return
|
|
||||||
if (!isConnected()) {
|
if (!isConnected()) {
|
||||||
markDirty(key)
|
markDirty(key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to store in Redis
|
|
||||||
try {
|
try {
|
||||||
val bytes = serializer.serializeEntry(entry)
|
val bytes = serializer.serializeEntry(entry)
|
||||||
redisTemplate.opsForValue().set(prefixedKey, bytes)
|
val effectiveTtl = ttl ?: config.defaultTtl
|
||||||
|
if (effectiveTtl != null) {
|
||||||
if (ttl != null) {
|
// KORREKTUR: Konvertierung zu java.time.Duration für RedisTemplate
|
||||||
redisTemplate.expire(prefixedKey, ttl)
|
redisTemplate.opsForValue().set(prefixedKey, bytes, effectiveTtl.toJavaDuration())
|
||||||
} else if (config.defaultTtl != null) {
|
} else {
|
||||||
val defaultTtl: Duration = config.defaultTtl!!
|
redisTemplate.opsForValue().set(prefixedKey, bytes)
|
||||||
redisTemplate.expire(prefixedKey, defaultTtl)
|
|
||||||
}
|
}
|
||||||
} catch (e: RedisConnectionFailureException) {
|
} catch (e: RedisConnectionFailureException) {
|
||||||
handleConnectionFailure(e)
|
handleConnectionFailure(e)
|
||||||
@@ -127,7 +122,7 @@ class RedisDistributedCache(
|
|||||||
override fun delete(key: String) {
|
override fun delete(key: String) {
|
||||||
val prefixedKey = addPrefix(key)
|
val prefixedKey = addPrefix(key)
|
||||||
|
|
||||||
// Remove from local cache
|
// Remove from the local cache
|
||||||
localCache.remove(prefixedKey)
|
localCache.remove(prefixedKey)
|
||||||
|
|
||||||
// If we're disconnected, mark as dirty and return
|
// If we're disconnected, mark as dirty and return
|
||||||
@@ -151,7 +146,7 @@ class RedisDistributedCache(
|
|||||||
override fun exists(key: String): Boolean {
|
override fun exists(key: String): Boolean {
|
||||||
val prefixedKey = addPrefix(key)
|
val prefixedKey = addPrefix(key)
|
||||||
|
|
||||||
// Check local cache first
|
// Check the local cache first
|
||||||
if (localCache.containsKey(prefixedKey)) {
|
if (localCache.containsKey(prefixedKey)) {
|
||||||
val entry = localCache[prefixedKey]
|
val entry = localCache[prefixedKey]
|
||||||
if (entry != null && !entry.isExpired()) {
|
if (entry != null && !entry.isExpired()) {
|
||||||
@@ -181,7 +176,7 @@ class RedisDistributedCache(
|
|||||||
override fun <T : Any> multiGet(keys: Collection<String>, clazz: Class<T>): Map<String, T> {
|
override fun <T : Any> multiGet(keys: Collection<String>, clazz: Class<T>): Map<String, T> {
|
||||||
val result = mutableMapOf<String, T>()
|
val result = mutableMapOf<String, T>()
|
||||||
|
|
||||||
// Get from local cache first
|
// Get from the local cache first
|
||||||
val prefixedKeys = keys.map { addPrefix(it) }
|
val prefixedKeys = keys.map { addPrefix(it) }
|
||||||
val localEntries = prefixedKeys.mapNotNull { key ->
|
val localEntries = prefixedKeys.mapNotNull { key ->
|
||||||
val entry = localCache[key] as? CacheEntry<T>
|
val entry = localCache[key] as? CacheEntry<T>
|
||||||
@@ -215,7 +210,7 @@ class RedisDistributedCache(
|
|||||||
try {
|
try {
|
||||||
val entry = serializer.deserializeEntry(bytes, clazz)
|
val entry = serializer.deserializeEntry(bytes, clazz)
|
||||||
|
|
||||||
// Store in local cache
|
// Store in a local cache
|
||||||
localCache[key] = entry as CacheEntry<Any>
|
localCache[key] = entry as CacheEntry<Any>
|
||||||
|
|
||||||
// Add to result
|
// Add to result
|
||||||
@@ -235,10 +230,10 @@ class RedisDistributedCache(
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ... (multiSet ebenfalls anpassen)
|
||||||
override fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration?) {
|
override fun <T : Any> multiSet(entries: Map<String, T>, ttl: Duration?) {
|
||||||
// Store in local cache and prepare for Redis
|
|
||||||
val redisBatch = mutableMapOf<String, ByteArray>()
|
val redisBatch = mutableMapOf<String, ByteArray>()
|
||||||
val expiresAt = ttl?.let { Instant.now().plus(it) } ?: config.defaultTtl?.let { Instant.now().plus(it) }
|
val expiresAt = ttl?.let { Clock.System.now() + it } ?: config.defaultTtl?.let { Clock.System.now() + it }
|
||||||
|
|
||||||
for ((key, value) in entries) {
|
for ((key, value) in entries) {
|
||||||
val prefixedKey = addPrefix(key)
|
val prefixedKey = addPrefix(key)
|
||||||
@@ -247,30 +242,24 @@ class RedisDistributedCache(
|
|||||||
value = value,
|
value = value,
|
||||||
expiresAt = expiresAt
|
expiresAt = expiresAt
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store in local cache
|
|
||||||
localCache[prefixedKey] = entry as CacheEntry<Any>
|
localCache[prefixedKey] = entry as CacheEntry<Any>
|
||||||
|
|
||||||
// Prepare for Redis
|
|
||||||
redisBatch[prefixedKey] = serializer.serializeEntry(entry)
|
redisBatch[prefixedKey] = serializer.serializeEntry(entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we're disconnected, mark all as dirty and return
|
|
||||||
if (!isConnected()) {
|
if (!isConnected()) {
|
||||||
entries.keys.forEach { markDirty(it) }
|
entries.keys.forEach { markDirty(it) }
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to store in Redis
|
|
||||||
try {
|
try {
|
||||||
redisTemplate.opsForValue().multiSet(redisBatch)
|
redisTemplate.opsForValue().multiSet(redisBatch)
|
||||||
|
val effectiveTtl = ttl ?: config.defaultTtl
|
||||||
if (ttl != null || config.defaultTtl != null) {
|
if (effectiveTtl != null) {
|
||||||
val duration = ttl ?: config.defaultTtl
|
redisTemplate.executePipelined { connection ->
|
||||||
if (duration != null) {
|
redisBatch.keys.forEach { key ->
|
||||||
for (key in redisBatch.keys) {
|
connection.keyCommands().pExpire(key.toByteArray(), effectiveTtl.inWholeMilliseconds)
|
||||||
redisTemplate.expire(key, duration)
|
|
||||||
}
|
}
|
||||||
|
null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e: RedisConnectionFailureException) {
|
} catch (e: RedisConnectionFailureException) {
|
||||||
@@ -285,7 +274,7 @@ class RedisDistributedCache(
|
|||||||
override fun multiDelete(keys: Collection<String>) {
|
override fun multiDelete(keys: Collection<String>) {
|
||||||
val prefixedKeys = keys.map { addPrefix(it) }
|
val prefixedKeys = keys.map { addPrefix(it) }
|
||||||
|
|
||||||
// Remove from local cache
|
// Remove from the local cache
|
||||||
prefixedKeys.forEach { localCache.remove(it) }
|
prefixedKeys.forEach { localCache.remove(it) }
|
||||||
|
|
||||||
// If we're disconnected, mark all as dirty and return
|
// If we're disconnected, mark all as dirty and return
|
||||||
@@ -336,15 +325,21 @@ class RedisDistributedCache(
|
|||||||
// Entry exists locally, update in Redis
|
// Entry exists locally, update in Redis
|
||||||
try {
|
try {
|
||||||
val bytes = serializer.serializeEntry(localEntry)
|
val bytes = serializer.serializeEntry(localEntry)
|
||||||
|
|
||||||
|
// Die 'set'-Methode erwartet kein TTL-Argument hier
|
||||||
redisTemplate.opsForValue().set(prefixedKey, bytes)
|
redisTemplate.opsForValue().set(prefixedKey, bytes)
|
||||||
|
|
||||||
val ttl = localEntry.expiresAt?.let { Duration.between(Instant.now(), it) }
|
// So wird die Dauer zwischen zwei Instants berechnet
|
||||||
if (ttl != null && !ttl.isNegative) {
|
val ttl = localEntry.expiresAt?.let { it - Clock.System.now() }
|
||||||
redisTemplate.expire(prefixedKey, ttl)
|
|
||||||
|
// 'isNegative' wird zu '< Duration.ZERO'
|
||||||
|
if (ttl != null && ttl > Duration.ZERO) {
|
||||||
|
// KORREKTUR: 'expire' braucht eine java.time.Duration
|
||||||
|
redisTemplate.expire(prefixedKey, ttl.toJavaDuration())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update local entry to mark as clean
|
// Update local entry to mark as clean
|
||||||
localCache[prefixedKey] = localEntry.markClean() as CacheEntry<Any>
|
localCache[prefixedKey] = localEntry.markClean()
|
||||||
dirtyKeys.remove(key)
|
dirtyKeys.remove(key)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error("Error updating key $prefixedKey during synchronization", e)
|
logger.error("Error updating key $prefixedKey during synchronization", e)
|
||||||
@@ -359,7 +354,7 @@ class RedisDistributedCache(
|
|||||||
val prefixedKey = addPrefix(key)
|
val prefixedKey = addPrefix(key)
|
||||||
val entry = localCache[prefixedKey]
|
val entry = localCache[prefixedKey]
|
||||||
if (entry != null) {
|
if (entry != null) {
|
||||||
localCache[prefixedKey] = entry.markDirty() as CacheEntry<Any>
|
localCache[prefixedKey] = entry.markDirty()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -431,7 +426,7 @@ class RedisDistributedCache(
|
|||||||
if (connectionState != newState) {
|
if (connectionState != newState) {
|
||||||
val oldState = connectionState
|
val oldState = connectionState
|
||||||
connectionState = newState
|
connectionState = newState
|
||||||
lastStateChangeTime = Instant.now()
|
lastStateChangeTime = Clock.System.now()
|
||||||
|
|
||||||
logger.info("Cache connection state changed from $oldState to $newState")
|
logger.info("Cache connection state changed from $oldState to $newState")
|
||||||
|
|
||||||
@@ -455,12 +450,12 @@ class RedisDistributedCache(
|
|||||||
/**
|
/**
|
||||||
* Periodically check the connection to Redis.
|
* Periodically check the connection to Redis.
|
||||||
*/
|
*/
|
||||||
@Scheduled(fixedDelayString = "\${redis.connection-check-interval:10000}")
|
@Scheduled(fixedDelayString = $$"${redis.connection-check-interval:10000}")
|
||||||
fun checkConnection() {
|
fun checkConnection() {
|
||||||
try {
|
try {
|
||||||
redisTemplate.hasKey("connection-test")
|
redisTemplate.hasKey("connection-test")
|
||||||
setConnectionState(ConnectionState.CONNECTED)
|
setConnectionState(ConnectionState.CONNECTED)
|
||||||
} catch (e: Exception) {
|
} catch (_: Exception) {
|
||||||
setConnectionState(ConnectionState.DISCONNECTED)
|
setConnectionState(ConnectionState.DISCONNECTED)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -468,11 +463,11 @@ class RedisDistributedCache(
|
|||||||
/**
|
/**
|
||||||
* Periodically clean up expired entries from the local cache.
|
* Periodically clean up expired entries from the local cache.
|
||||||
*/
|
*/
|
||||||
@Scheduled(fixedDelayString = "\${redis.local-cache-cleanup-interval:60000}")
|
@Scheduled(fixedDelayString = $$"${redis.local-cache-cleanup-interval:60000}")
|
||||||
fun cleanupLocalCache() {
|
fun cleanupLocalCache() {
|
||||||
val now = Instant.now()
|
val now = Clock.System.now()
|
||||||
val expiredKeys = localCache.entries
|
val expiredKeys = localCache.entries
|
||||||
.filter { it.value.expiresAt?.isBefore(now) ?: false }
|
.filter { it.value.expiresAt?.let { exp -> exp < now } ?: false }
|
||||||
.map { it.key }
|
.map { it.key }
|
||||||
|
|
||||||
expiredKeys.forEach { localCache.remove(it) }
|
expiredKeys.forEach { localCache.remove(it) }
|
||||||
@@ -485,7 +480,7 @@ class RedisDistributedCache(
|
|||||||
/**
|
/**
|
||||||
* Periodically synchronize dirty keys when connected.
|
* Periodically synchronize dirty keys when connected.
|
||||||
*/
|
*/
|
||||||
@Scheduled(fixedDelayString = "\${redis.sync-interval:300000}")
|
@Scheduled(fixedDelayString = $$"${redis.sync-interval:300000}")
|
||||||
fun scheduledSync() {
|
fun scheduledSync() {
|
||||||
if (isConnected() && dirtyKeys.isNotEmpty()) {
|
if (isConnected() && dirtyKeys.isNotEmpty()) {
|
||||||
synchronize(null)
|
synchronize(null)
|
||||||
|
|||||||
+57
-193
@@ -1,11 +1,9 @@
|
|||||||
package at.mocode.infrastructure.cache.redis
|
package at.mocode.infrastructure.cache.redis
|
||||||
|
|
||||||
import at.mocode.infrastructure.cache.api.CacheConfiguration
|
import at.mocode.infrastructure.cache.api.*
|
||||||
import at.mocode.infrastructure.cache.api.CacheSerializer
|
|
||||||
import at.mocode.infrastructure.cache.api.ConnectionState
|
|
||||||
import at.mocode.infrastructure.cache.api.DefaultCacheConfiguration
|
|
||||||
import io.mockk.every
|
import io.mockk.every
|
||||||
import io.mockk.mockk
|
import io.mockk.mockk
|
||||||
|
import io.mockk.verify
|
||||||
import org.junit.jupiter.api.AfterEach
|
import org.junit.jupiter.api.AfterEach
|
||||||
import org.junit.jupiter.api.BeforeEach
|
import org.junit.jupiter.api.BeforeEach
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
@@ -19,8 +17,10 @@ import org.testcontainers.containers.GenericContainer
|
|||||||
import org.testcontainers.junit.jupiter.Container
|
import org.testcontainers.junit.jupiter.Container
|
||||||
import org.testcontainers.junit.jupiter.Testcontainers
|
import org.testcontainers.junit.jupiter.Testcontainers
|
||||||
import org.testcontainers.utility.DockerImageName
|
import org.testcontainers.utility.DockerImageName
|
||||||
import java.time.Duration
|
|
||||||
import kotlin.test.*
|
import kotlin.test.*
|
||||||
|
import kotlin.time.Duration.Companion.milliseconds
|
||||||
|
import kotlin.time.Duration.Companion.minutes
|
||||||
|
import java.time.Duration as JavaDuration // Alias für Eindeutigkeit
|
||||||
|
|
||||||
@Testcontainers
|
@Testcontainers
|
||||||
class RedisDistributedCacheTest {
|
class RedisDistributedCacheTest {
|
||||||
@@ -54,14 +54,12 @@ class RedisDistributedCacheTest {
|
|||||||
|
|
||||||
serializer = JacksonCacheSerializer()
|
serializer = JacksonCacheSerializer()
|
||||||
config = DefaultCacheConfiguration(
|
config = DefaultCacheConfiguration(
|
||||||
keyPrefix = "test:",
|
keyPrefix = "test",
|
||||||
offlineModeEnabled = true,
|
offlineModeEnabled = true,
|
||||||
defaultTtl = Duration.ofMinutes(30)
|
defaultTtl = 30.minutes
|
||||||
)
|
)
|
||||||
|
|
||||||
cache = RedisDistributedCache(redisTemplate, serializer, config)
|
cache = RedisDistributedCache(redisTemplate, serializer, config)
|
||||||
|
|
||||||
// Clear the cache before each test
|
|
||||||
cache.clear()
|
cache.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,40 +69,29 @@ class RedisDistributedCacheTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test basic cache operations`() {
|
fun `get should return value with new reified extension function`() {
|
||||||
// Set a value
|
|
||||||
cache.set("key1", "value1")
|
cache.set("key1", "value1")
|
||||||
|
val value = cache.get<String>("key1")
|
||||||
|
assertEquals("value1", value)
|
||||||
|
}
|
||||||
|
|
||||||
// Get the value
|
@Test
|
||||||
|
fun `test basic cache operations`() {
|
||||||
|
cache.set("key1", "value1")
|
||||||
val value = cache.get("key1", String::class.java)
|
val value = cache.get("key1", String::class.java)
|
||||||
assertEquals("value1", value)
|
assertEquals("value1", value)
|
||||||
|
|
||||||
// Check if the key exists
|
|
||||||
assertTrue(cache.exists("key1"))
|
assertTrue(cache.exists("key1"))
|
||||||
|
|
||||||
// Delete the key
|
|
||||||
cache.delete("key1")
|
cache.delete("key1")
|
||||||
|
|
||||||
// Verify it's gone
|
|
||||||
assertFalse(cache.exists("key1"))
|
assertFalse(cache.exists("key1"))
|
||||||
assertNull(cache.get("key1", String::class.java))
|
assertNull(cache.get("key1", String::class.java))
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test cache with TTL`() {
|
fun `test cache with TTL`() {
|
||||||
// Set a value with a short TTL
|
cache.set("key2", "value2", 100.milliseconds)
|
||||||
cache.set("key2", "value2", Duration.ofMillis(100))
|
|
||||||
|
|
||||||
// Verify it exists
|
|
||||||
assertTrue(cache.exists("key2"))
|
assertTrue(cache.exists("key2"))
|
||||||
assertEquals("value2", cache.get("key2", String::class.java))
|
|
||||||
|
|
||||||
// Wait for it to expire
|
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
|
|
||||||
// Verify it's gone
|
|
||||||
assertFalse(cache.exists("key2"))
|
assertFalse(cache.exists("key2"))
|
||||||
assertNull(cache.get("key2", String::class.java))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -135,45 +122,58 @@ class RedisDistributedCacheTest {
|
|||||||
assertNull(remainingValues["batch3"])
|
assertNull(remainingValues["batch3"])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: Tests that stop and restart the container are commented out
|
|
||||||
// as they interfere with the Testcontainers lifecycle management
|
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
fun `test offline capability`() {
|
fun `should handle offline mode and synchronize correctly`() {
|
||||||
// Set a value
|
// Arrange
|
||||||
cache.set("offline1", "value1")
|
val mockTemplate = mockk<RedisTemplate<String, ByteArray>>(relaxed = true)
|
||||||
|
val mockValueOps = mockk<ValueOperations<String, ByteArray>>(relaxed = true)
|
||||||
|
every { mockTemplate.opsForValue() } returns mockValueOps
|
||||||
|
|
||||||
// Simulate going offline by stopping the Redis container
|
val offlineCache = RedisDistributedCache(mockTemplate, serializer, config)
|
||||||
redisContainer.stop()
|
|
||||||
|
|
||||||
// Verify connection state is DISCONNECTED
|
// 1. Online-Phase
|
||||||
assertEquals(ConnectionState.DISCONNECTED, cache.getConnectionState())
|
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
|
||||||
|
offlineCache.set("key1", "online-value")
|
||||||
|
verify(exactly = 1) { mockValueOps.set(eq("test:key1"), any<ByteArray>(), any<JavaDuration>()) }
|
||||||
|
|
||||||
// We should still be able to get the value from local cache
|
// 2. Offline-Phase simulieren
|
||||||
assertEquals("value1", cache.get("offline1", String::class.java))
|
every {
|
||||||
|
mockValueOps.set(
|
||||||
|
any<String>(),
|
||||||
|
any<ByteArray>(),
|
||||||
|
any<JavaDuration>()
|
||||||
|
)
|
||||||
|
} throws RedisConnectionFailureException("Redis is down")
|
||||||
|
every { mockTemplate.delete(any<String>()) } throws RedisConnectionFailureException("Redis is down")
|
||||||
|
|
||||||
// Set a new value while offline
|
offlineCache.set("key2", "offline-value")
|
||||||
cache.set("offline2", "value2")
|
offlineCache.delete("key1")
|
||||||
|
|
||||||
// Verify it's marked as dirty
|
assertEquals("offline-value", offlineCache.get<String>("key2"))
|
||||||
assertTrue(cache.getDirtyKeys().contains("offline2"))
|
assertTrue(offlineCache.getDirtyKeys().contains("key2"))
|
||||||
|
assertTrue(offlineCache.getDirtyKeys().contains("key1"))
|
||||||
|
|
||||||
// Start Redis again
|
// 3. Wiederverbindungs-Phase
|
||||||
redisContainer.start()
|
every { mockValueOps.set(any<String>(), any<ByteArray>(), any<JavaDuration>()) } returns Unit
|
||||||
|
every { mockTemplate.delete(any<String>()) } returns true
|
||||||
|
every { mockTemplate.hasKey("connection-test") } returns true
|
||||||
|
|
||||||
// Manually trigger synchronization
|
offlineCache.checkConnection()
|
||||||
cache.synchronize(null)
|
|
||||||
|
|
||||||
// Verify connection state is CONNECTED
|
verify(exactly = 1) { mockValueOps.set(eq("test:key1"), any<ByteArray>(), any<JavaDuration>()) }
|
||||||
assertEquals(ConnectionState.CONNECTED, cache.getConnectionState())
|
verify(exactly = 1) { mockTemplate.delete(eq("test:key1")) }
|
||||||
|
assertTrue(offlineCache.getDirtyKeys().isEmpty(), "Dirty keys should be empty after sync")
|
||||||
// Verify the value set while offline is now in Redis
|
}
|
||||||
assertEquals("value2", cache.get("offline2", String::class.java))
|
|
||||||
|
@Test
|
||||||
// Verify it's no longer marked as dirty
|
fun `test multiSet with TTL`() {
|
||||||
assertFalse(cache.getDirtyKeys().contains("offline2"))
|
val entries = mapOf("batchTtl1" to "value1", "batchTtl2" to "value2")
|
||||||
|
cache.multiSet(entries, 100.milliseconds)
|
||||||
|
|
||||||
|
assertTrue(cache.exists("batchTtl1"))
|
||||||
|
Thread.sleep(200)
|
||||||
|
assertFalse(cache.exists("batchTtl1"))
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test complex objects`() {
|
fun `test complex objects`() {
|
||||||
@@ -195,121 +195,6 @@ class RedisDistributedCacheTest {
|
|||||||
assertTrue(retrievedPerson.hobbies.contains("Hiking"))
|
assertTrue(retrievedPerson.hobbies.contains("Hiking"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: Tests that stop and restart the container are commented out
|
|
||||||
/*
|
|
||||||
@Test
|
|
||||||
fun `test connection state listeners`() {
|
|
||||||
// Create a mock listener
|
|
||||||
val listener = mockk<ConnectionStateListener>(relaxed = true)
|
|
||||||
|
|
||||||
// Register the listener
|
|
||||||
cache.registerConnectionListener(listener)
|
|
||||||
|
|
||||||
// Simulate disconnection
|
|
||||||
redisContainer.stop()
|
|
||||||
|
|
||||||
// Manually trigger connection check
|
|
||||||
cache.checkConnection()
|
|
||||||
|
|
||||||
// Verify listener was called with DISCONNECTED state
|
|
||||||
verify(exactly = 1) {
|
|
||||||
listener.onConnectionStateChanged(ConnectionState.DISCONNECTED, any())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start Redis again
|
|
||||||
redisContainer.start()
|
|
||||||
|
|
||||||
// Manually trigger connection check
|
|
||||||
cache.checkConnection()
|
|
||||||
|
|
||||||
// Verify listener was called with CONNECTED state
|
|
||||||
verify(exactly = 1) {
|
|
||||||
listener.onConnectionStateChanged(ConnectionState.CONNECTED, any())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unregister the listener
|
|
||||||
cache.unregisterConnectionListener(listener)
|
|
||||||
|
|
||||||
// Simulate disconnection again
|
|
||||||
redisContainer.stop()
|
|
||||||
cache.checkConnection()
|
|
||||||
|
|
||||||
// Verify listener was not called again (still only once for DISCONNECTED)
|
|
||||||
verify(exactly = 1) {
|
|
||||||
listener.onConnectionStateChanged(ConnectionState.DISCONNECTED, any())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `test scheduled tasks`() {
|
|
||||||
// Set a value with a short TTL
|
|
||||||
cache.set("scheduled1", "value1", Duration.ofMillis(100))
|
|
||||||
|
|
||||||
// Wait for it to expire
|
|
||||||
Thread.sleep(200)
|
|
||||||
|
|
||||||
// Manually trigger cleanup
|
|
||||||
cache.cleanupLocalCache()
|
|
||||||
|
|
||||||
// Verify it's gone from local cache
|
|
||||||
assertNull(cache.get("scheduled1", String::class.java))
|
|
||||||
|
|
||||||
// Set a value while Redis is down
|
|
||||||
redisContainer.stop()
|
|
||||||
cache.set("scheduled2", "value2")
|
|
||||||
|
|
||||||
// Verify it's marked as dirty
|
|
||||||
assertTrue(cache.getDirtyKeys().contains("scheduled2"))
|
|
||||||
|
|
||||||
// Start Redis again
|
|
||||||
redisContainer.start()
|
|
||||||
|
|
||||||
// Manually trigger scheduled sync
|
|
||||||
cache.scheduledSync()
|
|
||||||
|
|
||||||
// Verify it's no longer marked as dirty
|
|
||||||
assertFalse(cache.getDirtyKeys().contains("scheduled2"))
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `test synchronize with specific keys`() {
|
|
||||||
// Set multiple values
|
|
||||||
cache.set("sync1", "value1")
|
|
||||||
cache.set("sync2", "value2")
|
|
||||||
cache.set("sync3", "value3")
|
|
||||||
|
|
||||||
// Simulate going offline
|
|
||||||
redisContainer.stop()
|
|
||||||
|
|
||||||
// Update values while offline
|
|
||||||
cache.set("sync1", "updated1")
|
|
||||||
cache.set("sync2", "updated2")
|
|
||||||
|
|
||||||
// Verify they're marked as dirty
|
|
||||||
assertTrue(cache.getDirtyKeys().contains("sync1"))
|
|
||||||
assertTrue(cache.getDirtyKeys().contains("sync2"))
|
|
||||||
|
|
||||||
// Start Redis again
|
|
||||||
redisContainer.start()
|
|
||||||
|
|
||||||
// Synchronize only specific keys
|
|
||||||
cache.synchronize(listOf("sync1"))
|
|
||||||
|
|
||||||
// Verify only sync1 is no longer dirty
|
|
||||||
assertFalse(cache.getDirtyKeys().contains("sync1"))
|
|
||||||
assertTrue(cache.getDirtyKeys().contains("sync2"))
|
|
||||||
|
|
||||||
// Verify the values in Redis
|
|
||||||
assertEquals("updated1", cache.get("sync1", String::class.java))
|
|
||||||
|
|
||||||
// Now synchronize all
|
|
||||||
cache.synchronize(null)
|
|
||||||
|
|
||||||
// Verify all are no longer dirty
|
|
||||||
assertFalse(cache.getDirtyKeys().contains("sync2"))
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `test clear method`() {
|
fun `test clear method`() {
|
||||||
// Set multiple values
|
// Set multiple values
|
||||||
@@ -376,27 +261,6 @@ class RedisDistributedCacheTest {
|
|||||||
assertEquals("value", cache.get("defaultTtl", String::class.java))
|
assertEquals("value", cache.get("defaultTtl", String::class.java))
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `test multiSet with TTL`() {
|
|
||||||
// Set multiple values with TTL
|
|
||||||
val entries = mapOf(
|
|
||||||
"batchTtl1" to "value1",
|
|
||||||
"batchTtl2" to "value2"
|
|
||||||
)
|
|
||||||
cache.multiSet(entries, Duration.ofMillis(100))
|
|
||||||
|
|
||||||
// Verify they exist
|
|
||||||
assertTrue(cache.exists("batchTtl1"))
|
|
||||||
assertTrue(cache.exists("batchTtl2"))
|
|
||||||
|
|
||||||
// Wait for them to expire
|
|
||||||
Thread.sleep(200)
|
|
||||||
|
|
||||||
// Verify they're gone
|
|
||||||
assertFalse(cache.exists("batchTtl1"))
|
|
||||||
assertFalse(cache.exists("batchTtl2"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test data class
|
// Test data class
|
||||||
data class Person(
|
data class Person(
|
||||||
val name: String,
|
val name: String,
|
||||||
|
|||||||
Reference in New Issue
Block a user