refactoring(infra-event-store)

This commit is contained in:
2025-08-15 22:19:04 +02:00
parent 355e272562
commit d2172229b1
3 changed files with 423 additions and 61 deletions
@@ -0,0 +1,241 @@
package at.mocode.infrastructure.eventstore.redis
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.LongAdder
import kotlin.time.toKotlinDuration
/**
* Comprehensive metrics tracking for Redis Event Store operations.
*
* Tracks performance metrics, error rates, and operational statistics
* to provide insights into event store health and performance.
*/
class EventStoreMetrics {
private val logger = LoggerFactory.getLogger(EventStoreMetrics::class.java)
// Operation counters
private val appendOperations = LongAdder()
private val appendBatchOperations = LongAdder()
private val readOperations = LongAdder()
private val subscriptionOperations = LongAdder()
// Success/Error tracking
private val successfulOperations = LongAdder()
private val failedOperations = LongAdder()
private val concurrencyExceptions = LongAdder()
// Performance metrics
private val totalOperationTime = LongAdder()
private val maxOperationTime = AtomicLong(0)
private val operationTimestamps = ConcurrentHashMap<String, Instant>()
// Cache metrics
private val cacheHits = LongAdder()
private val cacheMisses = LongAdder()
// Event statistics
private val totalEventsAppended = LongAdder()
private val totalEventsRead = LongAdder()
private val lastMetricsReport = AtomicLong(System.currentTimeMillis())
/**
* Records the start of an operation for timing purposes.
*/
fun startOperation(operationId: String) {
operationTimestamps[operationId] = Instant.now()
}
/**
* Records a successful append operation.
*/
fun recordAppendSuccess(operationId: String, eventCount: Int = 1, isBatch: Boolean = false) {
recordOperationEnd(operationId, true)
appendOperations.increment()
if (isBatch) appendBatchOperations.increment()
totalEventsAppended.add(eventCount.toLong())
logger.debug("[METRICS] Append operation completed successfully. Events: {}, Batch: {}", eventCount, isBatch)
}
/**
* Records a failed append operation.
*/
fun recordAppendFailure(operationId: String, error: Throwable? = null, isConcurrencyException: Boolean = false) {
recordOperationEnd(operationId, false)
if (isConcurrencyException) {
concurrencyExceptions.increment()
}
logger.debug("[METRICS] Append operation failed. Concurrency conflict: {}, Error: {}",
isConcurrencyException, error?.message ?: "Unknown")
}
/**
* Records a successful read operation.
*/
fun recordReadSuccess(operationId: String, eventCount: Int) {
recordOperationEnd(operationId, true)
readOperations.increment()
totalEventsRead.add(eventCount.toLong())
logger.debug("[METRICS] Read operation completed successfully. Events: {}", eventCount)
}
/**
* Records a failed read operation.
*/
fun recordReadFailure(operationId: String, error: Throwable? = null) {
recordOperationEnd(operationId, false)
logger.debug("[METRICS] Read operation failed. Error: {}", error?.message ?: "Unknown")
}
/**
* Records a cache hit.
*/
fun recordCacheHit() {
cacheHits.increment()
}
/**
* Records a cache miss.
*/
fun recordCacheMiss() {
cacheMisses.increment()
}
/**
* Records a subscription operation.
*/
fun recordSubscription() {
subscriptionOperations.increment()
logger.debug("[METRICS] New subscription created")
}
private fun recordOperationEnd(operationId: String, success: Boolean) {
val startTime = operationTimestamps.remove(operationId)
if (startTime != null) {
val duration = Duration.between(startTime, Instant.now())
val durationMs = duration.toMillis()
totalOperationTime.add(durationMs)
maxOperationTime.updateAndGet { current -> maxOf(current, durationMs) }
if (success) {
successfulOperations.increment()
} else {
failedOperations.increment()
}
}
}
/**
* Gets comprehensive metrics summary.
*/
fun getMetrics(): EventStoreMetricsSnapshot {
val totalOps = successfulOperations.sum() + failedOperations.sum()
val successRate = if (totalOps > 0) (successfulOperations.sum().toDouble() / totalOps * 100) else 0.0
val avgOperationTime = if (totalOps > 0) (totalOperationTime.sum().toDouble() / totalOps) else 0.0
val cacheHitRate = run {
val totalCacheOps = cacheHits.sum() + cacheMisses.sum()
if (totalCacheOps > 0) (cacheHits.sum().toDouble() / totalCacheOps * 100) else 0.0
}
return EventStoreMetricsSnapshot(
totalOperations = totalOps,
successfulOperations = successfulOperations.sum(),
failedOperations = failedOperations.sum(),
successRate = successRate,
appendOperations = appendOperations.sum(),
batchAppendOperations = appendBatchOperations.sum(),
readOperations = readOperations.sum(),
subscriptionOperations = subscriptionOperations.sum(),
concurrencyExceptions = concurrencyExceptions.sum(),
totalEventsAppended = totalEventsAppended.sum(),
totalEventsRead = totalEventsRead.sum(),
averageOperationTimeMs = avgOperationTime,
maxOperationTimeMs = maxOperationTime.get(),
cacheHits = cacheHits.sum(),
cacheMisses = cacheMisses.sum(),
cacheHitRate = cacheHitRate
)
}
/**
* Logs performance metrics if enough time has passed since the last report.
*/
fun logPerformanceMetrics() {
val now = System.currentTimeMillis()
val lastReport = lastMetricsReport.get()
// Log metrics every 5 minutes
if (now - lastReport > 300_000) {
if (lastMetricsReport.compareAndSet(lastReport, now)) {
val metrics = getMetrics()
logger.info("[PERFORMANCE_METRICS] {}", metrics.toLogString())
}
}
}
/**
* Resets all metrics. Useful for testing.
*/
internal fun reset() {
appendOperations.reset()
appendBatchOperations.reset()
readOperations.reset()
subscriptionOperations.reset()
successfulOperations.reset()
failedOperations.reset()
concurrencyExceptions.reset()
totalOperationTime.reset()
maxOperationTime.set(0)
operationTimestamps.clear()
cacheHits.reset()
cacheMisses.reset()
totalEventsAppended.reset()
totalEventsRead.reset()
lastMetricsReport.set(System.currentTimeMillis())
}
}
/**
* Immutable snapshot of event store metrics at a point in time.
*/
data class EventStoreMetricsSnapshot(
val totalOperations: Long,
val successfulOperations: Long,
val failedOperations: Long,
val successRate: Double,
val appendOperations: Long,
val batchAppendOperations: Long,
val readOperations: Long,
val subscriptionOperations: Long,
val concurrencyExceptions: Long,
val totalEventsAppended: Long,
val totalEventsRead: Long,
val averageOperationTimeMs: Double,
val maxOperationTimeMs: Long,
val cacheHits: Long,
val cacheMisses: Long,
val cacheHitRate: Double
) {
fun toLogString(): String {
return "EventStore Metrics: " +
"Operations=${totalOperations}, " +
"Success Rate=${String.format("%.1f%%", successRate)}, " +
"Appends=${appendOperations} (${batchAppendOperations} batches), " +
"Reads=${readOperations}, " +
"Subscriptions=${subscriptionOperations}, " +
"Events Appended=${totalEventsAppended}, " +
"Events Read=${totalEventsRead}, " +
"Avg Time=${String.format("%.1fms", averageOperationTimeMs)}, " +
"Max Time=${maxOperationTimeMs}ms, " +
"Cache Hit Rate=${String.format("%.1f%%", cacheHitRate)}, " +
"Concurrency Conflicts=${concurrencyExceptions}"
}
}
@@ -21,44 +21,74 @@ class RedisEventStore(
) : EventStore {
private val logger = LoggerFactory.getLogger(RedisEventStore::class.java)
private val streamVersionCache = ConcurrentHashMap<UUID, Long>()
private val metrics = EventStoreMetrics()
override fun appendToStream(events: List<DomainEvent>, streamId: UUID, expectedVersion: Long): Long {
if (events.isEmpty()) {
logger.debug("Empty event list provided for stream {}, returning current version", streamId)
return getStreamVersion(streamId)
}
val operationId = "batch-append-${System.nanoTime()}"
metrics.startOperation(operationId)
val aggregateId = events.first().aggregateId
require(events.all { it.aggregateId == aggregateId }) {
"All events must belong to the same aggregate. Expected: $aggregateId, but found mixed aggregate IDs"
}
require(streamId == aggregateId.value) {
"Stream ID $streamId must match aggregate ID ${aggregateId.value}"
}
logger.debug("Appending {} events to stream {} with expected version {}", events.size, streamId, expectedVersion)
var currentVersion = getStreamVersion(streamId)
if (currentVersion != expectedVersion) {
logger.warn("Version conflict detected for stream {}. Expected: {}, current: {}", streamId, expectedVersion, currentVersion)
streamVersionCache.remove(streamId) // Invalidate cache on conflict
val actualVersion = getStreamVersion(streamId) // Re-fetch from Redis
if (actualVersion != expectedVersion) {
throw ConcurrencyException("Concurrency conflict for stream $streamId: expected version $expectedVersion but got $actualVersion")
try {
if (events.isEmpty()) {
logger.debug("Empty event list provided for stream {}, returning current version", streamId)
val version = getStreamVersion(streamId)
metrics.recordAppendSuccess(operationId, 0, true)
return version
}
currentVersion = actualVersion
}
for (event in events) {
currentVersion = appendToStreamInternal(event, streamId, currentVersion)
}
val aggregateId = events.first().aggregateId
require(events.all { it.aggregateId == aggregateId }) {
"All events must belong to the same aggregate. Expected: $aggregateId, but found mixed aggregate IDs"
}
require(streamId == aggregateId.value) {
"Stream ID $streamId must match aggregate ID ${aggregateId.value}"
}
logger.info("Successfully appended {} events to stream {}. New version: {}", events.size, streamId, currentVersion)
return currentVersion
logger.debug("Appending {} events to stream {} with expected version {}", events.size, streamId, expectedVersion)
val currentVersion = validateAndGetCurrentVersion(streamId, expectedVersion)
val newVersion = appendEventsInBatch(events, streamId, currentVersion)
logger.info("Successfully appended {} events to stream {}. New version: {}", events.size, streamId, newVersion)
metrics.recordAppendSuccess(operationId, events.size, true)
metrics.logPerformanceMetrics()
return newVersion
} catch (e: ConcurrencyException) {
metrics.recordAppendFailure(operationId, e, true)
throw e
} catch (e: Exception) {
metrics.recordAppendFailure(operationId, e, false)
throw e
}
}
override fun appendToStream(event: DomainEvent, streamId: UUID, expectedVersion: Long): Long {
logger.debug("Appending single event to stream {} with expected version {}", streamId, expectedVersion)
val operationId = "single-append-${System.nanoTime()}"
metrics.startOperation(operationId)
try {
logger.debug("Appending single event to stream {} with expected version {}", streamId, expectedVersion)
val currentVersion = validateAndGetCurrentVersion(streamId, expectedVersion)
val newVersion = appendToStreamInternal(event, streamId, currentVersion)
logger.info("Successfully appended event to stream {}. New version: {}", streamId, newVersion)
metrics.recordAppendSuccess(operationId, 1, false)
metrics.logPerformanceMetrics()
return newVersion
} catch (e: ConcurrencyException) {
metrics.recordAppendFailure(operationId, e, true)
throw e
} catch (e: Exception) {
metrics.recordAppendFailure(operationId, e, false)
throw e
}
}
/**
* Validates the expected version and returns the current version, handling cache invalidation on conflicts.
*/
private fun validateAndGetCurrentVersion(streamId: UUID, expectedVersion: Long): Long {
var currentVersion = getStreamVersion(streamId)
if (currentVersion != expectedVersion) {
@@ -71,9 +101,55 @@ class RedisEventStore(
currentVersion = actualVersion
}
val newVersion = appendToStreamInternal(event, streamId, currentVersion)
logger.info("Successfully appended event to stream {}. New version: {}", streamId, newVersion)
return newVersion
return currentVersion
}
/**
* Appends multiple events in a single Redis transaction for optimal performance.
*/
private fun appendEventsInBatch(events: List<DomainEvent>, streamId: UUID, currentVersion: Long): Long {
val streamKey = getStreamKey(streamId)
val allEventsStreamKey = getAllEventsStreamKey()
// Validate all events have correct sequential versions
events.forEachIndexed { index, event ->
val expectedVersion = currentVersion + index + 1
require(event.version.value == expectedVersion) {
"Event ${index} version ${event.version.value} does not match expected version $expectedVersion for stream $streamId"
}
}
logger.debug("Writing {} events to stream {} and all-events stream in single transaction", events.size, streamId)
try {
redisTemplate.execute(object : SessionCallback<List<Any>> {
@Throws(DataAccessException::class)
override fun <K : Any?, V : Any?> execute(operations: org.springframework.data.redis.core.RedisOperations<K, V>): List<Any> {
val streamOps = (operations as StringRedisTemplate).opsForStream<String, String>()
operations.multi()
// Add all events to both streams in a single transaction
events.forEach { event ->
val eventData = serializer.serialize(event)
streamOps.add(streamKey, eventData)
streamOps.add(allEventsStreamKey, eventData)
}
return operations.exec()
}
})
val newVersion = currentVersion + events.size
streamVersionCache[streamId] = newVersion
logger.debug("Successfully wrote {} events to Redis streams in batch, updated cache version to {}", events.size, newVersion)
return newVersion
} catch (e: Exception) {
logger.error("Failed to append {} events in batch for stream {}: {}", events.size, streamId, e.message, e)
streamVersionCache.remove(streamId)
throw e
}
}
private fun appendToStreamInternal(event: DomainEvent, streamId: UUID, currentVersion: Long): Long {
@@ -113,24 +189,41 @@ class RedisEventStore(
}
override fun readFromStream(streamId: UUID, fromVersion: Long, toVersion: Long?): List<DomainEvent> {
val streamKey = getStreamKey(streamId)
val range = Range.of(Range.Bound.inclusive("-"), Range.Bound.unbounded())
val operationId = "read-stream-${System.nanoTime()}"
metrics.startOperation(operationId)
val records = redisTemplate.opsForStream<String, String>().range(streamKey, range)
val events = records?.mapNotNull { record ->
try {
serializer.deserialize(record.value)
} catch (e: Exception) {
logger.error("Error deserializing event from stream {}: {}", streamId, e.message, e)
null
}
} ?: emptyList()
try {
val streamKey = getStreamKey(streamId)
val range = Range.of(Range.Bound.inclusive("-"), Range.Bound.unbounded())
return events.filter { it.version >= EventVersion(fromVersion) && (toVersion == null || it.version <= EventVersion(toVersion)) }
val records = redisTemplate.opsForStream<String, String>().range(streamKey, range)
val events = records?.mapNotNull { record ->
try {
serializer.deserialize(record.value)
} catch (e: Exception) {
logger.error("Error deserializing event from stream {}: {}", streamId, e.message, e)
null
}
} ?: emptyList()
val filteredEvents = events.filter { it.version >= EventVersion(fromVersion) && (toVersion == null || it.version <= EventVersion(toVersion)) }
metrics.recordReadSuccess(operationId, filteredEvents.size)
return filteredEvents
} catch (e: Exception) {
metrics.recordReadFailure(operationId, e)
throw e
}
}
override fun getStreamVersion(streamId: UUID): Long {
streamVersionCache[streamId]?.let { return it }
streamVersionCache[streamId]?.let {
metrics.recordCacheHit()
return it
}
metrics.recordCacheMiss()
val streamKey = getStreamKey(streamId)
val size = redisTemplate.opsForStream<String, String>().size(streamKey) ?: 0L
streamVersionCache[streamId] = size
@@ -146,30 +239,43 @@ class RedisEventStore(
}
override fun readAllEvents(fromPosition: Long, maxCount: Int?): List<DomainEvent> {
val allEventsStreamKey = getAllEventsStreamKey()
val range = Range.of(Range.Bound.inclusive("-"), Range.Bound.unbounded())
val operationId = "read-all-events-${System.nanoTime()}"
metrics.startOperation(operationId)
val records = redisTemplate.opsForStream<String, String>().range(allEventsStreamKey, range)
val events = records?.mapNotNull { record ->
try {
serializer.deserialize(record.value)
} catch (e: Exception) {
logger.error("Error deserializing event from all events stream: {}", e.message, e)
null
try {
val allEventsStreamKey = getAllEventsStreamKey()
val range = Range.of(Range.Bound.inclusive("-"), Range.Bound.unbounded())
val records = redisTemplate.opsForStream<String, String>().range(allEventsStreamKey, range)
val events = records?.mapNotNull { record ->
try {
serializer.deserialize(record.value)
} catch (e: Exception) {
logger.error("Error deserializing event from all events stream: {}", e.message, e)
null
}
} ?: emptyList()
val filteredEvents = events.drop(fromPosition.toInt())
val result = if (maxCount != null && maxCount > 0) {
filteredEvents.take(maxCount)
} else {
filteredEvents
}
} ?: emptyList()
val filteredEvents = events.drop(fromPosition.toInt())
return if (maxCount != null && maxCount > 0) {
filteredEvents.take(maxCount)
} else {
filteredEvents
metrics.recordReadSuccess(operationId, result.size)
return result
} catch (e: Exception) {
metrics.recordReadFailure(operationId, e)
throw e
}
}
override fun subscribeToStream(streamId: UUID, fromVersion: Long, handler: (DomainEvent) -> Unit): Subscription {
// Basic implementation - for full functionality, integrate with RedisEventConsumer
logger.info("Stream subscription for streamId {} from version {} - basic implementation", streamId, fromVersion)
metrics.recordSubscription()
return BasicSubscription {
logger.info("Unsubscribed from stream {}", streamId)
}
@@ -178,6 +284,7 @@ class RedisEventStore(
override fun subscribeToAll(fromPosition: Long, handler: (DomainEvent) -> Unit): Subscription {
// Basic implementation - for full functionality, integrate with RedisEventConsumer
logger.info("All events subscription from position {} - basic implementation", fromPosition)
metrics.recordSubscription()
return BasicSubscription {
logger.info("Unsubscribed from all events")
}