fixing(gradle)
This commit is contained in:
+90
-10
@@ -103,9 +103,65 @@ class RedisEventConsumerResilienceTest {
|
||||
}
|
||||
|
||||
private fun cleanupRedis() {
|
||||
val keys = redisTemplate.keys("${properties.streamPrefix}*")
|
||||
if (!keys.isNullOrEmpty()) {
|
||||
redisTemplate.delete(keys)
|
||||
try {
|
||||
val streamKey = "${properties.streamPrefix}${properties.allEventsStream}"
|
||||
|
||||
// First, try to destroy the consumer group multiple times with retry logic
|
||||
var attempts = 0
|
||||
while (attempts < 3) {
|
||||
try {
|
||||
redisTemplate.opsForStream<String, String>()
|
||||
.destroyGroup(streamKey, properties.consumerGroup)
|
||||
logger.debug("Successfully destroyed consumer group: ${properties.consumerGroup}")
|
||||
break
|
||||
} catch (e: Exception) {
|
||||
attempts++
|
||||
if (e.message?.contains("NOGROUP") == true) {
|
||||
// Group doesn't exist, which is fine
|
||||
break
|
||||
}
|
||||
if (attempts < 3) {
|
||||
Thread.sleep(100) // Wait before retry
|
||||
} else {
|
||||
logger.debug("Could not destroy consumer group after 3 attempts: ${e.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for group destruction to complete
|
||||
Thread.sleep(100)
|
||||
|
||||
// Then delete all stream-related keys
|
||||
val keys = redisTemplate.keys("${properties.streamPrefix}*")
|
||||
if (!keys.isNullOrEmpty()) {
|
||||
redisTemplate.delete(keys)
|
||||
logger.debug("Deleted ${keys.size} Redis keys with prefix: ${properties.streamPrefix}")
|
||||
}
|
||||
|
||||
// Wait for Redis operations to complete
|
||||
Thread.sleep(200)
|
||||
|
||||
// Verify cleanup by checking if keys still exist
|
||||
val remainingKeys = redisTemplate.keys("${properties.streamPrefix}*")
|
||||
if (!remainingKeys.isNullOrEmpty()) {
|
||||
logger.warn("Some keys still exist after cleanup: $remainingKeys")
|
||||
// Force delete remaining keys
|
||||
redisTemplate.delete(remainingKeys)
|
||||
Thread.sleep(100)
|
||||
}
|
||||
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Error during Redis cleanup: ${e.message}", e)
|
||||
// Additional cleanup attempt
|
||||
try {
|
||||
Thread.sleep(200)
|
||||
val keys = redisTemplate.keys("${properties.streamPrefix}*")
|
||||
if (!keys.isNullOrEmpty()) {
|
||||
redisTemplate.delete(keys)
|
||||
}
|
||||
} catch (retryException: Exception) {
|
||||
logger.warn("Retry cleanup also failed: ${retryException.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,12 +202,26 @@ class RedisEventConsumerResilienceTest {
|
||||
|
||||
eventStore.appendToStream(listOf(event1, event2), aggregateId, 0)
|
||||
|
||||
// Let both consumers poll
|
||||
consumer1.pollEvents()
|
||||
consumer2.pollEvents()
|
||||
// Let both consumers poll multiple times to ensure all events are processed
|
||||
val executor = Executors.newFixedThreadPool(2)
|
||||
|
||||
// Wait for processing
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS), "Events were not processed within timeout")
|
||||
executor.submit {
|
||||
repeat(5) {
|
||||
consumer1.pollEvents()
|
||||
Thread.sleep(50)
|
||||
}
|
||||
}
|
||||
|
||||
executor.submit {
|
||||
repeat(5) {
|
||||
consumer2.pollEvents()
|
||||
Thread.sleep(50)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for processing with increased timeout
|
||||
assertTrue(latch.await(10, TimeUnit.SECONDS), "Events were not processed within timeout")
|
||||
executor.shutdown()
|
||||
|
||||
// Verify that events were processed (by either consumer due to consumer groups)
|
||||
assertTrue(processedEvents.size >= 2, "Expected at least 2 processed events, got ${processedEvents.size}")
|
||||
@@ -285,7 +355,7 @@ class RedisEventConsumerResilienceTest {
|
||||
consumer1.pollEvents()
|
||||
|
||||
// Wait for processing to complete
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS), "Slow events were not processed within timeout")
|
||||
assertTrue(latch.await(10, TimeUnit.SECONDS), "Slow events were not processed within timeout")
|
||||
val totalTime = System.currentTimeMillis() - startTime
|
||||
|
||||
// Verify all events were processed
|
||||
@@ -361,6 +431,9 @@ class RedisEventConsumerResilienceTest {
|
||||
|
||||
@Test
|
||||
fun `should handle event handler exceptions gracefully without stopping processing`() {
|
||||
// Ensure clean state for this test
|
||||
cleanupRedis()
|
||||
|
||||
val aggregateId = UUID.randomUUID()
|
||||
val processedEvents = CopyOnWriteArrayList<String>()
|
||||
val latch = CountDownLatch(3) // Expecting 3 events to be processed (2 success + 1 failure)
|
||||
@@ -388,7 +461,14 @@ class RedisEventConsumerResilienceTest {
|
||||
)
|
||||
|
||||
eventStore.appendToStream(events, aggregateId, 0)
|
||||
consumer1.pollEvents()
|
||||
|
||||
// Poll multiple times to ensure all events are processed
|
||||
// This is necessary because Redis streams might not deliver all events in a single poll
|
||||
for (i in 1..10) {
|
||||
consumer1.pollEvents()
|
||||
Thread.sleep(100)
|
||||
if (latch.count == 0L) break
|
||||
}
|
||||
|
||||
// Wait for processing
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS), "Events were not processed within timeout")
|
||||
|
||||
Reference in New Issue
Block a user