Introduce Ktor-based HTTP server for Masterdata context, implement upsert logic for Altersklasse, Bundesland, and Land repositories, enhance IdempotencyPlugin, and add integration tests.
Build and Publish Docker Images / build-and-push (., backend/services/ping/Dockerfile, ping-service, ping-service) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., config/docker/caddy/web-app/Dockerfile, web-app, web-app) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., config/docker/keycloak/Dockerfile, keycloak, keycloak) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., backend/infrastructure/gateway/Dockerfile, api-gateway, api-gateway) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., backend/services/ping/Dockerfile, ping-service, ping-service) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., config/docker/caddy/web-app/Dockerfile, web-app, web-app) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., config/docker/keycloak/Dockerfile, keycloak, keycloak) (push) Has been cancelled
Build and Publish Docker Images / build-and-push (., backend/infrastructure/gateway/Dockerfile, api-gateway, api-gateway) (push) Has been cancelled
This commit is contained in:
@@ -27,5 +27,6 @@ dependencies {
|
||||
|
||||
// Testing
|
||||
testImplementation(projects.platform.platformTesting)
|
||||
testImplementation(libs.ktor.server.tests)
|
||||
// Ktor 3.x: Verwende das Test-Host-Artefakt statt des veralteten "ktor-server-tests-jvm"
|
||||
testImplementation(libs.ktor.server.testHost)
|
||||
}
|
||||
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
package at.mocode.masterdata.api
|
||||
|
||||
import at.mocode.masterdata.api.plugins.IdempotencyPlugin
|
||||
import at.mocode.masterdata.api.rest.AltersklasseController
|
||||
import at.mocode.masterdata.api.rest.BundeslandController
|
||||
import at.mocode.masterdata.api.rest.CountryController
|
||||
import at.mocode.masterdata.api.rest.PlatzController
|
||||
import io.ktor.server.application.Application
|
||||
import io.ktor.server.routing.routing
|
||||
|
||||
/**
|
||||
* Ktor-Modul für den Masterdata-Bounded-Context.
|
||||
*
|
||||
* - Installiert das IdempotencyPlugin (Header „Idempotency-Key“) global.
|
||||
* - Registriert alle Masterdata-Routen (Country, Bundesland, Altersklasse, Platz).
|
||||
*/
|
||||
fun Application.masterdataApiModule(
|
||||
countryController: CountryController,
|
||||
bundeslandController: BundeslandController,
|
||||
altersklasseController: AltersklasseController,
|
||||
platzController: PlatzController
|
||||
) {
|
||||
// Installiere das Idempotency-Plugin global für alle Routen
|
||||
IdempotencyPlugin.install(this)
|
||||
|
||||
// Registriere die REST-Routen der Controller
|
||||
routing {
|
||||
with(countryController) { registerRoutes() }
|
||||
with(bundeslandController) { registerRoutes() }
|
||||
with(altersklasseController) { registerRoutes() }
|
||||
with(platzController) { registerRoutes() }
|
||||
}
|
||||
}
|
||||
+159
-3
@@ -1,26 +1,182 @@
|
||||
package at.mocode.masterdata.api.plugins
|
||||
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.content.*
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.request.*
|
||||
import io.ktor.util.*
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.Job
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/**
|
||||
* Minimaler Ktor-Plugin-Skeleton für Idempotency-Key Verarbeitung.
|
||||
* Aktuell: Stellt den Header `Idempotency-Key` als Call-Attribut bereit.
|
||||
* Erweiterung (Cache/Short-Circuit) folgt im nächsten Schritt.
|
||||
* Ktor-Plugin für Idempotency-Key Verarbeitung mit leichtgewichtigem In-Memory-Cache (TTL, Default 5 Min).
|
||||
*
|
||||
* Verhalten:
|
||||
* - Liest Header "Idempotency-Key" für POST/PUT.
|
||||
* - Prüft Cache: bei Treffer (nicht abgelaufen) wird die zuvor gespeicherte Response sofort zurückgegeben (Short-Circuit).
|
||||
* - Nach erfolgreicher Antwort (200/201/204) wird – falls möglich – der serialisierte Response-Body inkl. Status/Headers im Cache abgelegt.
|
||||
*
|
||||
* Hinweise:
|
||||
* - Es werden ausschließlich Antworten zwischengespeichert, deren Body als Text oder ByteArray vorliegt (typisch für JSON).
|
||||
* - Kein Persistenz-Layer (lean MVP). Für Multi-Node/HA später Option B (idempotency_records) umsetzen.
|
||||
*/
|
||||
object IdempotencyPlugin {
|
||||
@JvmInline
|
||||
value class Configuration(val ttl: Duration = Duration.ofMinutes(5))
|
||||
|
||||
val IdempotencyKeyAttr: AttributeKey<String> = AttributeKey("IdempotencyKey")
|
||||
private val CacheAttr: AttributeKey<java.util.concurrent.ConcurrentHashMap<String, CacheEntry>> = AttributeKey("IdempotencyCache")
|
||||
private val InflightAttr: AttributeKey<java.util.concurrent.ConcurrentHashMap<String, CompletableDeferred<CacheEntry>>> = AttributeKey("IdempotencyInflight")
|
||||
private val LeaderFlagAttr: AttributeKey<Boolean> = AttributeKey("IdempotencyLeaderFlag")
|
||||
|
||||
private data class CacheEntry(
|
||||
val status: HttpStatusCode,
|
||||
val contentType: ContentType?,
|
||||
val body: ByteArray,
|
||||
val storedAtMillis: Long
|
||||
)
|
||||
|
||||
// Hinweis: Kein globaler Cache mehr! Der Cache ist pro Application-Instance gebunden,
|
||||
// um Test-Interferenzen und Cross-App-Leaks zu vermeiden.
|
||||
|
||||
fun install(application: Application, configuration: Configuration = Configuration()) {
|
||||
val ttlMillis = configuration.ttl.toMillis()
|
||||
|
||||
// Per-Application Cache initialisieren (falls nicht vorhanden)
|
||||
if (!application.attributes.contains(CacheAttr)) {
|
||||
application.attributes.put(CacheAttr, ConcurrentHashMap())
|
||||
}
|
||||
if (!application.attributes.contains(InflightAttr)) {
|
||||
application.attributes.put(InflightAttr, ConcurrentHashMap())
|
||||
}
|
||||
|
||||
// Vor der eigentlichen Verarbeitung: Cache prüfen und ggf. Short-Circuit
|
||||
application.intercept(ApplicationCallPipeline.Plugins) {
|
||||
val key = call.request.headers["Idempotency-Key"]?.trim()
|
||||
if (!key.isNullOrBlank()) {
|
||||
call.attributes.put(IdempotencyKeyAttr, key)
|
||||
val cache = application.attributes[CacheAttr]
|
||||
val entry = cache[key]
|
||||
if (entry != null) {
|
||||
val now = System.currentTimeMillis()
|
||||
if (now - entry.storedAtMillis <= ttlMillis) {
|
||||
// Short-Circuit mit gecachter Antwort
|
||||
call.response.status(entry.status)
|
||||
val ct = entry.contentType ?: ContentType.Application.Json
|
||||
call.respondBytes(bytes = entry.body, contentType = ct)
|
||||
finish()
|
||||
return@intercept
|
||||
} else if (now - entry.storedAtMillis > ttlMillis) {
|
||||
cache.remove(key)
|
||||
}
|
||||
}
|
||||
|
||||
// Concurrent duplicate handling: warte auf in-flight Ergebnis
|
||||
val inflight = application.attributes[InflightAttr]
|
||||
val parentJob = call.coroutineContext[Job]
|
||||
val deferred = CompletableDeferred<CacheEntry>(parent = parentJob)
|
||||
val existing = inflight.putIfAbsent(key, deferred)
|
||||
if (existing != null) {
|
||||
// Follower: auf Ergebnis warten und sofort antworten
|
||||
try {
|
||||
val result = existing.await()
|
||||
call.response.status(result.status)
|
||||
val ct = result.contentType ?: ContentType.Application.Json
|
||||
call.respondBytes(bytes = result.body, contentType = ct)
|
||||
finish()
|
||||
return@intercept
|
||||
} catch (e: Exception) {
|
||||
// Leader wird Aufräumen übernehmen (invokeOnCompletion). Normale Verarbeitung zulassen.
|
||||
}
|
||||
} else {
|
||||
// Leader dieses Keys für diese Call-Pipeline markieren
|
||||
call.attributes.put(LeaderFlagAttr, true)
|
||||
// Sicherheitsnetz: Wenn der Call endet, aber kein Ergebnis gesetzt wurde,
|
||||
// verhindere hängende Deferreds und bereinige In-Flight-Eintrag.
|
||||
parentJob?.invokeOnCompletion { cause ->
|
||||
val d = inflight.remove(key) ?: return@invokeOnCompletion
|
||||
if (!d.isCompleted) {
|
||||
if (cause != null) d.completeExceptionally(cause)
|
||||
else d.completeExceptionally(IllegalStateException("Idempotency: call finished without completing result"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Nach dem Serialisieren der Antwort: ggf. in Cache legen
|
||||
application.sendPipeline.intercept(ApplicationSendPipeline.After) { subject ->
|
||||
val key = call.attributes.getOrNull(IdempotencyKeyAttr) ?: return@intercept
|
||||
|
||||
val status = call.response.status() ?: return@intercept
|
||||
if (status != HttpStatusCode.OK && status != HttpStatusCode.Created && status != HttpStatusCode.NoContent) return@intercept
|
||||
|
||||
// Nur ausgewählte Content-Typen/Body-Formen cachen
|
||||
var bodyBytes: ByteArray? = null
|
||||
var contentType: ContentType? = null
|
||||
when (subject) {
|
||||
is OutgoingContent.ByteArrayContent -> {
|
||||
bodyBytes = subject.bytes()
|
||||
contentType = subject.contentType
|
||||
}
|
||||
is OutgoingContent.NoContent -> {
|
||||
bodyBytes = ByteArray(0)
|
||||
contentType = subject.contentType
|
||||
}
|
||||
is OutgoingContent.ReadChannelContent -> {
|
||||
// Nicht trivial ohne Consumption; überspringen
|
||||
}
|
||||
is OutgoingContent.WriteChannelContent -> {
|
||||
// Nicht trivial; überspringen
|
||||
}
|
||||
is TextContent -> {
|
||||
bodyBytes = subject.text.toByteArray(Charsets.UTF_8)
|
||||
contentType = subject.contentType
|
||||
}
|
||||
}
|
||||
|
||||
if (bodyBytes != null) {
|
||||
val entry = CacheEntry(
|
||||
status = status,
|
||||
contentType = contentType,
|
||||
body = bodyBytes,
|
||||
storedAtMillis = System.currentTimeMillis()
|
||||
)
|
||||
val cache = application.attributes[CacheAttr]
|
||||
cache[key] = entry
|
||||
|
||||
// Wenn dieser Call der Leader war, vervollständige alle wartenden Requests
|
||||
if (call.attributes.getOrNull(LeaderFlagAttr) == true) {
|
||||
val inflight = application.attributes[InflightAttr]
|
||||
val deferred = inflight.remove(key)
|
||||
deferred?.complete(entry)
|
||||
}
|
||||
} else {
|
||||
// Kein cachebarer Body – in-flight ggf. bereinigen, damit Follower nicht ewig warten
|
||||
if (call.attributes.getOrNull(LeaderFlagAttr) == true) {
|
||||
val inflight = application.attributes[InflightAttr]
|
||||
inflight.remove(key)?.completeExceptionally(IllegalStateException("Idempotency: no cacheable body"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ermöglicht das Leeren des per-Application-Caches (z.B. für Tests).
|
||||
*/
|
||||
fun clear(application: Application) {
|
||||
if (application.attributes.contains(CacheAttr)) {
|
||||
application.attributes[CacheAttr].clear()
|
||||
}
|
||||
if (application.attributes.contains(InflightAttr)) {
|
||||
val inflight = application.attributes[InflightAttr]
|
||||
// Alle offenen Deferreds abbrechen, um Leaks in Tests zu verhindern
|
||||
inflight.values.forEach { d -> if (!d.isCompleted) d.completeExceptionally(CancellationException("Idempotency: cleared")) }
|
||||
inflight.clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+57
@@ -0,0 +1,57 @@
|
||||
package at.mocode.masterdata.api
|
||||
|
||||
import at.mocode.masterdata.api.plugins.IdempotencyPlugin
|
||||
import io.ktor.serialization.kotlinx.json.*
|
||||
import io.ktor.server.application.install
|
||||
import io.ktor.server.plugins.contentnegotiation.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.statement.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.server.testing.*
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class IdempotencyPluginTest {
|
||||
|
||||
@Test
|
||||
fun `second POST with same Idempotency-Key returns cached response and skips handler`() = testApplication {
|
||||
val counter = AtomicInteger(0)
|
||||
|
||||
application {
|
||||
// Install plugin directly for focused unit test
|
||||
this@application.install(ContentNegotiation) { json() }
|
||||
IdempotencyPlugin.install(this@application)
|
||||
routing {
|
||||
post("/echo") {
|
||||
counter.incrementAndGet()
|
||||
// Simuliere erfolgreiche Erstellung mit 201 und JSON-Body (als String, um Serializer-Probleme zu vermeiden)
|
||||
val json = """{"ok":true,"count":${counter.get()}}"""
|
||||
call.respondText(text = json, contentType = ContentType.Application.Json, status = HttpStatusCode.Created)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val key = "test-key-123"
|
||||
val payload = """{"name":"Austria"}"""
|
||||
|
||||
val r1 = client.post("/echo") {
|
||||
header("Idempotency-Key", key)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody(payload)
|
||||
}
|
||||
val r2 = client.post("/echo") {
|
||||
header("Idempotency-Key", key)
|
||||
contentType(ContentType.Application.Json)
|
||||
setBody(payload)
|
||||
}
|
||||
|
||||
assertThat(r1.status).isEqualTo(HttpStatusCode.Created)
|
||||
assertThat(r2.status).isEqualTo(HttpStatusCode.Created)
|
||||
assertThat(r2.bodyAsText()).isEqualTo(r1.bodyAsText())
|
||||
// Sicherstellen, dass der Handler nur einmal ausgeführt wurde
|
||||
assertThat(counter.get()).isEqualTo(1)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user