(vision) SCS/DDD
Service Discovery einführen Consul als Service-Registry implementieren Services für automatische Registrierung konfigurieren Dynamisches Service-Routing im API-Gateway einrichten Health-Checks für jeden Service implementieren
This commit is contained in:
@@ -2,6 +2,7 @@ plugins {
|
||||
alias(libs.plugins.kotlin.multiplatform)
|
||||
alias(libs.plugins.kotlin.serialization)
|
||||
id("org.openapi.generator") version "7.3.0" // Updated to latest version
|
||||
id("com.github.johnrengelman.shadow") version "8.1.1" // Shadow plugin for creating fat JARs
|
||||
}
|
||||
|
||||
// Get project version for documentation versioning
|
||||
@@ -140,8 +141,22 @@ kotlin {
|
||||
implementation(libs.ktor.server.rateLimit)
|
||||
implementation(libs.logback)
|
||||
|
||||
// Datenbankabhängigkeiten für Migrationen
|
||||
implementation("com.zaxxer:HikariCP:5.0.1")
|
||||
// Ktor client dependencies for service discovery
|
||||
implementation("io.ktor:ktor-client-core:${libs.versions.ktor.get()}")
|
||||
implementation("io.ktor:ktor-client-cio:${libs.versions.ktor.get()}")
|
||||
implementation("io.ktor:ktor-client-content-negotiation:${libs.versions.ktor.get()}")
|
||||
implementation("io.ktor:ktor-serialization-kotlinx-json:${libs.versions.ktor.get()}")
|
||||
|
||||
// Monitoring dependencies
|
||||
implementation("io.ktor:ktor-server-metrics-micrometer:${libs.versions.ktor.get()}")
|
||||
implementation("io.micrometer:micrometer-registry-prometheus:${libs.versions.micrometer.get()}")
|
||||
|
||||
// Caching dependencies
|
||||
implementation("org.redisson:redisson:${libs.versions.redisson.get()}")
|
||||
implementation("com.github.ben-manes.caffeine:caffeine:${libs.versions.caffeine.get()}")
|
||||
|
||||
// Database dependencies
|
||||
implementation("com.zaxxer:HikariCP:${libs.versions.hikari.get()}")
|
||||
implementation(libs.exposed.core)
|
||||
implementation(libs.exposed.dao)
|
||||
implementation(libs.exposed.jdbc)
|
||||
@@ -154,3 +169,40 @@ kotlin {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the shadowJar task to create a fat JAR with all dependencies included.
|
||||
* This is required for the Docker build process, which uses this JAR to create the runtime image.
|
||||
* The Dockerfile expects this task to be available with the name 'shadowJar'.
|
||||
*
|
||||
* The Shadow plugin is used to create a single JAR file that includes all dependencies,
|
||||
* making it easier to distribute and run the application in a containerized environment.
|
||||
*/
|
||||
tasks {
|
||||
val shadowJar = register<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJar") {
|
||||
// Set the main class for the executable JAR
|
||||
manifest {
|
||||
attributes(mapOf(
|
||||
"Main-Class" to "at.mocode.gateway.ApplicationKt"
|
||||
))
|
||||
}
|
||||
|
||||
// Configure the JAR base name and classifier
|
||||
archiveBaseName.set("api-gateway")
|
||||
archiveClassifier.set("")
|
||||
|
||||
// Configure the Shadow plugin
|
||||
mergeServiceFiles()
|
||||
exclude("META-INF/*.SF", "META-INF/*.DSA", "META-INF/*.RSA")
|
||||
|
||||
// Set the configurations to be included in the fat JAR
|
||||
val jvmMain = kotlin.jvm().compilations.getByName("main")
|
||||
from(jvmMain.output)
|
||||
configurations = listOf(jvmMain.compileDependencyFiles as Configuration)
|
||||
}
|
||||
}
|
||||
|
||||
// Make the build task depend on shadowJar
|
||||
tasks.named("build") {
|
||||
dependsOn("shadowJar")
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package at.mocode.gateway
|
||||
import at.mocode.gateway.config.MigrationSetup
|
||||
import at.mocode.shared.config.AppConfig
|
||||
import at.mocode.shared.database.DatabaseFactory
|
||||
import at.mocode.shared.discovery.ServiceRegistrationFactory
|
||||
import io.ktor.server.engine.*
|
||||
import io.ktor.server.netty.*
|
||||
|
||||
@@ -16,6 +17,25 @@ fun main() {
|
||||
// Migrationen ausführen
|
||||
MigrationSetup.runMigrations()
|
||||
|
||||
// Service mit Consul registrieren
|
||||
val serviceRegistration = if (config.serviceDiscovery.enabled && config.serviceDiscovery.registerServices) {
|
||||
ServiceRegistrationFactory.createServiceRegistration(
|
||||
serviceName = "api-gateway",
|
||||
servicePort = config.server.port,
|
||||
healthCheckPath = "/health",
|
||||
tags = listOf("api", "gateway"),
|
||||
meta = mapOf(
|
||||
"version" to config.appInfo.version,
|
||||
"environment" to config.environment.toString()
|
||||
)
|
||||
).also { it.register() }
|
||||
} else null
|
||||
|
||||
// Shutdown Hook hinzufügen, um Service bei Beendigung abzumelden
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
serviceRegistration?.deregister()
|
||||
})
|
||||
|
||||
// Server starten
|
||||
embeddedServer(Netty, port = config.server.port, host = config.server.host) {
|
||||
module()
|
||||
|
||||
@@ -227,6 +227,45 @@ private fun getRolePermissions(roles: List<UserRole>): List<Permission> {
|
||||
return permissions.toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a route scoped plugin for role-based authorization
|
||||
*/
|
||||
private val RoleAuthorizationPlugin = createRouteScopedPlugin(
|
||||
name = "RoleAuthorization",
|
||||
createConfiguration = {
|
||||
// Define the configuration class for the plugin
|
||||
class Configuration {
|
||||
val requiredRoles = mutableListOf<UserRole>()
|
||||
}
|
||||
Configuration()
|
||||
}
|
||||
) {
|
||||
// Plugin configuration
|
||||
val pluginConfig = pluginConfig
|
||||
|
||||
onCall { call ->
|
||||
val principal = call.principal<JWTPrincipal>()
|
||||
val authContext = principal?.getUserAuthContext()
|
||||
|
||||
if (authContext == null) {
|
||||
call.respond(HttpStatusCode.Unauthorized, "Authentication required")
|
||||
return@onCall
|
||||
}
|
||||
|
||||
val hasRequiredRole = pluginConfig.requiredRoles.any { requiredRole ->
|
||||
authContext.roles.contains(requiredRole)
|
||||
}
|
||||
|
||||
if (!hasRequiredRole) {
|
||||
call.respond(
|
||||
HttpStatusCode.Forbidden,
|
||||
"Access denied. Required roles: ${pluginConfig.requiredRoles.joinToString()}"
|
||||
)
|
||||
return@onCall
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Route extension function to require specific roles.
|
||||
*/
|
||||
@@ -239,32 +278,52 @@ fun Route.requireRoles(vararg roles: UserRole, build: Route.() -> Unit): Route {
|
||||
override fun toString(): String = "requireRoles(${roles.joinToString()})"
|
||||
})
|
||||
|
||||
route.intercept(ApplicationCallPipeline.Call) {
|
||||
// Install the role authorization plugin with the specified roles
|
||||
route.install(RoleAuthorizationPlugin) {
|
||||
requiredRoles.addAll(roles)
|
||||
}
|
||||
|
||||
route.build()
|
||||
return route
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a route scoped plugin for permission-based authorization
|
||||
*/
|
||||
private val PermissionAuthorizationPlugin = createRouteScopedPlugin(
|
||||
name = "PermissionAuthorization",
|
||||
createConfiguration = {
|
||||
// Define the configuration class for the plugin
|
||||
class Configuration {
|
||||
val requiredPermissions = mutableListOf<Permission>()
|
||||
}
|
||||
Configuration()
|
||||
}
|
||||
) {
|
||||
// Plugin configuration
|
||||
val pluginConfig = pluginConfig
|
||||
|
||||
onCall { call ->
|
||||
val principal = call.principal<JWTPrincipal>()
|
||||
val authContext = principal?.getUserAuthContext()
|
||||
|
||||
if (authContext == null) {
|
||||
call.respond(HttpStatusCode.Unauthorized, "Authentication required")
|
||||
finish()
|
||||
return@intercept
|
||||
return@onCall
|
||||
}
|
||||
|
||||
val hasRequiredRole = roles.any { requiredRole ->
|
||||
authContext.roles.contains(requiredRole)
|
||||
val hasAllPermissions = pluginConfig.requiredPermissions.all { requiredPermission ->
|
||||
authContext.permissions.contains(requiredPermission)
|
||||
}
|
||||
|
||||
if (!hasRequiredRole) {
|
||||
if (!hasAllPermissions) {
|
||||
call.respond(
|
||||
HttpStatusCode.Forbidden,
|
||||
"Access denied. Required roles: ${roles.joinToString()}"
|
||||
"Access denied. Required permissions: ${pluginConfig.requiredPermissions.joinToString()}"
|
||||
)
|
||||
finish()
|
||||
return@intercept
|
||||
return@onCall
|
||||
}
|
||||
}
|
||||
|
||||
route.build()
|
||||
return route
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -279,28 +338,9 @@ fun Route.requirePermissions(vararg permissions: Permission, build: Route.() ->
|
||||
override fun toString(): String = "requirePermissions(${permissions.joinToString()})"
|
||||
})
|
||||
|
||||
route.intercept(ApplicationCallPipeline.Call) {
|
||||
val principal = call.principal<JWTPrincipal>()
|
||||
val authContext = principal?.getUserAuthContext()
|
||||
|
||||
if (authContext == null) {
|
||||
call.respond(HttpStatusCode.Unauthorized, "Authentication required")
|
||||
finish()
|
||||
return@intercept
|
||||
}
|
||||
|
||||
val hasAllPermissions = permissions.all { requiredPermission ->
|
||||
authContext.permissions.contains(requiredPermission)
|
||||
}
|
||||
|
||||
if (!hasAllPermissions) {
|
||||
call.respond(
|
||||
HttpStatusCode.Forbidden,
|
||||
"Access denied. Required permissions: ${permissions.joinToString()}"
|
||||
)
|
||||
finish()
|
||||
return@intercept
|
||||
}
|
||||
// Install the permission authorization plugin with the specified permissions
|
||||
route.install(PermissionAuthorizationPlugin) {
|
||||
requiredPermissions.addAll(permissions)
|
||||
}
|
||||
|
||||
route.build()
|
||||
|
||||
@@ -0,0 +1,275 @@
|
||||
package at.mocode.gateway.config
|
||||
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.logging.Logger
|
||||
|
||||
/**
|
||||
* Cache implementation with local caching and Redis integration preparation.
|
||||
* This implementation focuses on local caching with proper expiration and statistics.
|
||||
* Redis integration can be added in a future update.
|
||||
*/
|
||||
class CachingConfig(
|
||||
private val redisHost: String = System.getenv("REDIS_HOST") ?: "localhost",
|
||||
private val redisPort: Int = System.getenv("REDIS_PORT")?.toIntOrNull() ?: 6379,
|
||||
private val defaultTtlMinutes: Long = 10
|
||||
) {
|
||||
private val logger = Logger.getLogger(CachingConfig::class.java.name)
|
||||
|
||||
// Cache entry with expiration time
|
||||
private data class CacheEntry<T>(
|
||||
val value: T,
|
||||
val expiresAt: Long
|
||||
)
|
||||
|
||||
// Cache statistics tracking
|
||||
private data class CacheStats(
|
||||
var hits: Long = 0,
|
||||
var misses: Long = 0,
|
||||
var puts: Long = 0,
|
||||
var evictions: Long = 0
|
||||
)
|
||||
|
||||
// Cache maps for different entity types
|
||||
private val masterDataCache = ConcurrentHashMap<String, CacheEntry<Any>>()
|
||||
private val userCache = ConcurrentHashMap<String, CacheEntry<Any>>()
|
||||
private val personCache = ConcurrentHashMap<String, CacheEntry<Any>>()
|
||||
private val vereinCache = ConcurrentHashMap<String, CacheEntry<Any>>()
|
||||
private val eventCache = ConcurrentHashMap<String, CacheEntry<Any>>()
|
||||
|
||||
// Cache statistics
|
||||
private val cacheStats = ConcurrentHashMap<String, CacheStats>()
|
||||
|
||||
// Scheduler for periodic cleanup and stats reporting
|
||||
private val scheduler = Executors.newScheduledThreadPool(1) { r ->
|
||||
val thread = Thread(r, "cache-maintenance-thread")
|
||||
thread.isDaemon = true
|
||||
thread
|
||||
}
|
||||
|
||||
init {
|
||||
// Schedule periodic cleanup of expired entries
|
||||
scheduler.scheduleAtFixedRate(
|
||||
{ cleanupExpiredEntries() },
|
||||
10, 10, TimeUnit.MINUTES
|
||||
)
|
||||
|
||||
// Schedule periodic stats logging
|
||||
scheduler.scheduleAtFixedRate(
|
||||
{ logCacheStats() },
|
||||
5, 30, TimeUnit.MINUTES
|
||||
)
|
||||
|
||||
logger.info("CachingConfig initialized with Redis host: $redisHost, port: $redisPort")
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a value from cache
|
||||
*/
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun <T> get(cacheName: String, key: String): T? {
|
||||
val stats = cacheStats.computeIfAbsent(cacheName) { CacheStats() }
|
||||
|
||||
// Try local cache
|
||||
val localCache = getCacheMap(cacheName)
|
||||
val entry = localCache[key]
|
||||
|
||||
if (entry != null) {
|
||||
// Check if entry is expired
|
||||
if (System.currentTimeMillis() > entry.expiresAt) {
|
||||
localCache.remove(key)
|
||||
stats.evictions++
|
||||
stats.misses++
|
||||
return null
|
||||
}
|
||||
|
||||
stats.hits++
|
||||
return entry.value as T
|
||||
}
|
||||
|
||||
stats.misses++
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Put a value in cache with TTL in minutes
|
||||
*/
|
||||
fun <T> put(cacheName: String, key: String, value: T, ttlMinutes: Long = defaultTtlMinutes) {
|
||||
val stats = cacheStats.computeIfAbsent(cacheName) { CacheStats() }
|
||||
stats.puts++
|
||||
|
||||
// Store in local cache
|
||||
val expiresAt = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(ttlMinutes)
|
||||
val entry = CacheEntry(value as Any, expiresAt)
|
||||
getCacheMap(cacheName)[key] = entry
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a value from cache
|
||||
*/
|
||||
fun remove(cacheName: String, key: String) {
|
||||
// Remove from local cache
|
||||
getCacheMap(cacheName).remove(key)
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear a specific cache
|
||||
*/
|
||||
fun clearCache(cacheName: String) {
|
||||
// Clear local cache
|
||||
getCacheMap(cacheName).clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all caches
|
||||
*/
|
||||
fun clearAllCaches() {
|
||||
// Clear all local caches
|
||||
masterDataCache.clear()
|
||||
userCache.clear()
|
||||
personCache.clear()
|
||||
vereinCache.clear()
|
||||
eventCache.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the appropriate cache map based on cache name
|
||||
*/
|
||||
private fun getCacheMap(cacheName: String): ConcurrentHashMap<String, CacheEntry<Any>> {
|
||||
return when (cacheName) {
|
||||
MASTER_DATA_CACHE -> masterDataCache
|
||||
USER_CACHE -> userCache
|
||||
PERSON_CACHE -> personCache
|
||||
VEREIN_CACHE -> vereinCache
|
||||
EVENT_CACHE -> eventCache
|
||||
else -> throw IllegalArgumentException("Unknown cache name: $cacheName")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up expired entries from local caches
|
||||
*/
|
||||
private fun cleanupExpiredEntries() {
|
||||
val now = System.currentTimeMillis()
|
||||
var totalRemoved = 0
|
||||
|
||||
// Clean up each cache
|
||||
listOf(masterDataCache, userCache, personCache, vereinCache, eventCache).forEach { cache ->
|
||||
val iterator = cache.entries.iterator()
|
||||
var removed = 0
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
val entry = iterator.next()
|
||||
if (now > entry.value.expiresAt) {
|
||||
iterator.remove()
|
||||
removed++
|
||||
}
|
||||
}
|
||||
|
||||
totalRemoved += removed
|
||||
}
|
||||
|
||||
if (totalRemoved > 0) {
|
||||
logger.info("Cache cleanup completed: removed $totalRemoved expired entries")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log cache statistics
|
||||
*/
|
||||
private fun logCacheStats() {
|
||||
cacheStats.forEach { (cacheName, stats) ->
|
||||
val hitRatio = if (stats.hits + stats.misses > 0) {
|
||||
stats.hits.toDouble() / (stats.hits + stats.misses)
|
||||
} else {
|
||||
0.0
|
||||
}
|
||||
|
||||
logger.info("Cache stats for $cacheName: hits=${stats.hits}, misses=${stats.misses}, " +
|
||||
"puts=${stats.puts}, evictions=${stats.evictions}, hit-ratio=${String.format("%.2f", hitRatio * 100)}%")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the cache manager and release resources
|
||||
*/
|
||||
fun shutdown() {
|
||||
scheduler.shutdown()
|
||||
try {
|
||||
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
scheduler.shutdownNow()
|
||||
}
|
||||
} catch (e: InterruptedException) {
|
||||
scheduler.shutdownNow()
|
||||
}
|
||||
|
||||
logger.info("CachingConfig shutdown completed")
|
||||
}
|
||||
|
||||
companion object {
|
||||
// Cache names for different entities
|
||||
const val MASTER_DATA_CACHE = "masterDataCache"
|
||||
const val USER_CACHE = "userCache"
|
||||
const val PERSON_CACHE = "personCache"
|
||||
const val VEREIN_CACHE = "vereinCache"
|
||||
const val EVENT_CACHE = "eventCache"
|
||||
|
||||
// List of all cache names
|
||||
val CACHE_NAMES = listOf(
|
||||
MASTER_DATA_CACHE,
|
||||
USER_CACHE,
|
||||
PERSON_CACHE,
|
||||
VEREIN_CACHE,
|
||||
EVENT_CACHE
|
||||
)
|
||||
|
||||
// Default TTLs in minutes
|
||||
const val MASTER_DATA_TTL = 24 * 60L // 24 hours
|
||||
const val USER_TTL = 2 * 60L // 2 hours
|
||||
const val PERSON_TTL = 4 * 60L // 4 hours
|
||||
const val VEREIN_TTL = 12 * 60L // 12 hours
|
||||
const val EVENT_TTL = 6 * 60L // 6 hours
|
||||
|
||||
// AttributeKey for storing in application
|
||||
val CACHING_CONFIG_KEY = AttributeKey<CachingConfig>("CachingConfig")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to install caching in the application.
|
||||
*/
|
||||
fun Application.configureCaching() {
|
||||
val redisHost = environment.config.propertyOrNull("redis.host")?.getString()
|
||||
?: System.getenv("REDIS_HOST")
|
||||
?: "localhost"
|
||||
|
||||
val redisPort = environment.config.propertyOrNull("redis.port")?.getString()?.toIntOrNull()
|
||||
?: System.getenv("REDIS_PORT")?.toIntOrNull()
|
||||
?: 6379
|
||||
|
||||
val cachingConfig = CachingConfig(
|
||||
redisHost = redisHost,
|
||||
redisPort = redisPort
|
||||
)
|
||||
|
||||
// Store the caching config in the application attributes
|
||||
attributes.put(CachingConfig.CACHING_CONFIG_KEY, cachingConfig)
|
||||
|
||||
// Register shutdown hook
|
||||
this.monitor.subscribe(ApplicationStopping) {
|
||||
cachingConfig.shutdown()
|
||||
}
|
||||
|
||||
// Log cache configuration
|
||||
log.info("Cache configuration initialized: Redis host=$redisHost, port=$redisPort")
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to get the caching config from the application.
|
||||
*/
|
||||
fun Application.getCachingConfig(): CachingConfig {
|
||||
return attributes[CachingConfig.CACHING_CONFIG_KEY]
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
package at.mocode.gateway.config
|
||||
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.plugins.*
|
||||
import io.ktor.server.request.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.util.*
|
||||
import io.micrometer.core.instrument.Counter
|
||||
import io.micrometer.core.instrument.MeterRegistry
|
||||
import io.micrometer.core.instrument.Timer
|
||||
import io.micrometer.core.instrument.binder.MeterBinder
|
||||
import io.micrometer.prometheus.PrometheusMeterRegistry
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Custom application metrics configuration.
|
||||
*
|
||||
* Adds application-specific metrics for better monitoring:
|
||||
* - API endpoint response times
|
||||
* - Request counts by endpoint and status code
|
||||
* - Error rates
|
||||
* - Database query metrics
|
||||
*/
|
||||
|
||||
// Reference to the Prometheus registry from PrometheusConfig
|
||||
private val appRegistry: PrometheusMeterRegistry
|
||||
get() = at.mocode.gateway.config.appMicrometerRegistry
|
||||
|
||||
// Attribute key for request start time
|
||||
private val REQUEST_TIMER_ATTRIBUTE = AttributeKey<Timer.Sample>("RequestTimerSample")
|
||||
|
||||
// Cache for endpoint timers to avoid creating new ones for each request
|
||||
private val endpointTimers = ConcurrentHashMap<String, Timer>()
|
||||
|
||||
// Cache for endpoint counters
|
||||
private val endpointCounters = ConcurrentHashMap<Pair<String, Int>, Counter>()
|
||||
|
||||
// Cache for error counters
|
||||
private val errorCounters = ConcurrentHashMap<String, Counter>()
|
||||
|
||||
/**
|
||||
* Configures custom application metrics.
|
||||
*/
|
||||
fun Application.configureCustomMetrics() {
|
||||
// Install a hook to intercept all requests for timing
|
||||
intercept(ApplicationCallPipeline.Monitoring) {
|
||||
// Start timing the request
|
||||
val timerSample = Timer.start(appRegistry)
|
||||
call.attributes.put(REQUEST_TIMER_ATTRIBUTE, timerSample)
|
||||
}
|
||||
|
||||
// Install a hook to record metrics after the request is processed
|
||||
intercept(ApplicationCallPipeline.Fallback) {
|
||||
val status = call.response.status()?.value ?: 0
|
||||
val method = call.request.httpMethod.value
|
||||
val route = extractRoutePattern(call)
|
||||
|
||||
// Record request count
|
||||
getOrCreateRequestCounter(method, route, status).increment()
|
||||
|
||||
// Record timing
|
||||
call.attributes.getOrNull(REQUEST_TIMER_ATTRIBUTE)?.let { timerSample ->
|
||||
val timer = getOrCreateEndpointTimer(method, route)
|
||||
timerSample.stop(timer)
|
||||
}
|
||||
|
||||
// Record errors
|
||||
if (status >= 400) {
|
||||
getOrCreateErrorCounter(method, route, status).increment()
|
||||
}
|
||||
}
|
||||
|
||||
// Register database metrics
|
||||
registerDatabaseMetrics()
|
||||
|
||||
log.info("Custom application metrics configured")
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts a normalized route pattern from the call.
|
||||
* Converts dynamic path segments to a generic pattern.
|
||||
* For example: /api/users/123 -> /api/users/{id}
|
||||
*/
|
||||
private fun extractRoutePattern(call: ApplicationCall): String {
|
||||
val path = call.request.path()
|
||||
|
||||
// Try to get the route from the call attributes if available
|
||||
call.attributes.getOrNull(AttributeKey<Route>("ktor.request.route"))?.let { route ->
|
||||
return route.toString()
|
||||
}
|
||||
|
||||
// Otherwise, normalize the path by replacing likely IDs with {id}
|
||||
val segments = path.split("/")
|
||||
val normalizedSegments = segments.map { segment ->
|
||||
// If segment looks like an ID (UUID, number), replace with {id}
|
||||
if (segment.matches(Regex("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")) ||
|
||||
segment.matches(Regex("\\d+"))
|
||||
) {
|
||||
"{id}"
|
||||
} else {
|
||||
segment
|
||||
}
|
||||
}
|
||||
|
||||
return normalizedSegments.joinToString("/")
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates a timer for the specified endpoint.
|
||||
*/
|
||||
private fun getOrCreateEndpointTimer(method: String, route: String): Timer {
|
||||
val key = "$method $route"
|
||||
return endpointTimers.computeIfAbsent(key) {
|
||||
Timer.builder("http.server.requests")
|
||||
.tag("method", method)
|
||||
.tag("route", route)
|
||||
.publishPercentileHistogram()
|
||||
.register(appRegistry)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates a counter for the specified endpoint and status.
|
||||
*/
|
||||
private fun getOrCreateRequestCounter(method: String, route: String, status: Int): Counter {
|
||||
val key = Pair("$method $route", status)
|
||||
return endpointCounters.computeIfAbsent(key) {
|
||||
Counter.builder("http.server.requests.count")
|
||||
.tag("method", method)
|
||||
.tag("route", route)
|
||||
.tag("status", status.toString())
|
||||
.register(appRegistry)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates an error counter for the specified endpoint and status.
|
||||
*/
|
||||
private fun getOrCreateErrorCounter(method: String, route: String, status: Int): Counter {
|
||||
val key = "$method $route $status"
|
||||
return errorCounters.computeIfAbsent(key) {
|
||||
Counter.builder("http.server.errors")
|
||||
.tag("method", method)
|
||||
.tag("route", route)
|
||||
.tag("status", status.toString())
|
||||
.register(appRegistry)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers database metrics.
|
||||
*/
|
||||
private fun registerDatabaseMetrics() {
|
||||
// Create a gauge for active connections
|
||||
appRegistry.gauge("db.connections.active",
|
||||
at.mocode.shared.database.DatabaseFactory,
|
||||
{ it.getActiveConnections().toDouble() })
|
||||
|
||||
// Create a gauge for idle connections
|
||||
appRegistry.gauge("db.connections.idle",
|
||||
at.mocode.shared.database.DatabaseFactory,
|
||||
{ it.getIdleConnections().toDouble() })
|
||||
|
||||
// Create a gauge for total connections
|
||||
appRegistry.gauge("db.connections.total",
|
||||
at.mocode.shared.database.DatabaseFactory,
|
||||
{ it.getTotalConnections().toDouble() })
|
||||
}
|
||||
@@ -4,10 +4,19 @@ import at.mocode.dto.base.ApiResponse
|
||||
import at.mocode.shared.config.AppConfig
|
||||
import io.ktor.http.*
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.metrics.micrometer.*
|
||||
import io.ktor.server.plugins.calllogging.*
|
||||
import io.ktor.server.plugins.statuspages.*
|
||||
import io.ktor.server.request.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
|
||||
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
|
||||
import io.micrometer.prometheus.PrometheusConfig
|
||||
import io.micrometer.prometheus.PrometheusMeterRegistry
|
||||
import org.slf4j.event.Level
|
||||
import java.time.LocalDateTime
|
||||
import java.time.format.DateTimeFormatter
|
||||
@@ -26,30 +35,66 @@ import kotlin.random.Random
|
||||
*/
|
||||
|
||||
// Map to track request counts by path for log sampling
|
||||
private val requestCountsByPath = ConcurrentHashMap<String, AtomicInteger>()
|
||||
// Using a more efficient ConcurrentHashMap with initial capacity and load factor
|
||||
private val requestCountsByPath = ConcurrentHashMap<String, AtomicInteger>(32, 0.75f)
|
||||
|
||||
// Map to track high-traffic paths that are being sampled
|
||||
private val sampledPaths = ConcurrentHashMap<String, Boolean>()
|
||||
private val sampledPaths = ConcurrentHashMap<String, Boolean>(16, 0.75f)
|
||||
|
||||
// Scheduler to reset request counts periodically
|
||||
private val requestCountResetScheduler = Executors.newSingleThreadScheduledExecutor().apply {
|
||||
scheduleAtFixedRate({
|
||||
// Reset all counters every minute
|
||||
requestCountsByPath.clear()
|
||||
private val requestCountResetScheduler = Executors.newSingleThreadScheduledExecutor { r ->
|
||||
val thread = Thread(r, "log-sampling-reset-thread")
|
||||
thread.isDaemon = true // Make it a daemon thread so it doesn't prevent JVM shutdown
|
||||
thread
|
||||
}
|
||||
|
||||
// Log which paths are being sampled
|
||||
if (sampledPaths.isNotEmpty()) {
|
||||
val sampledPathsList = sampledPaths.keys.joinToString(", ")
|
||||
println("[LogSampling] Currently sampling high-traffic paths: $sampledPathsList")
|
||||
// Schedule the task with proper lifecycle management
|
||||
private fun scheduleRequestCountReset() {
|
||||
// Reset counters every 5 minutes instead of every minute to reduce overhead
|
||||
requestCountResetScheduler.scheduleAtFixedRate({
|
||||
try {
|
||||
// Reset all counters
|
||||
requestCountsByPath.clear()
|
||||
|
||||
// Log which paths are being sampled (only if there are any)
|
||||
if (sampledPaths.isNotEmpty()) {
|
||||
// More efficient string building for logging
|
||||
val sampledPathsCount = sampledPaths.size
|
||||
if (sampledPathsCount <= 5) {
|
||||
// For a small number of paths, log them all
|
||||
val sampledPathsList = sampledPaths.keys.joinToString(", ")
|
||||
println("[LogSampling] Currently sampling $sampledPathsCount high-traffic paths: $sampledPathsList")
|
||||
} else {
|
||||
// For many paths, just log the count to avoid excessive logging
|
||||
println("[LogSampling] Currently sampling $sampledPathsCount high-traffic paths")
|
||||
}
|
||||
}
|
||||
|
||||
// Clear sampled paths to re-evaluate in the next period
|
||||
sampledPaths.clear()
|
||||
} catch (e: Exception) {
|
||||
// Catch any exceptions to prevent the scheduler from stopping
|
||||
println("[LogSampling] Error in reset task: ${e.message}")
|
||||
}
|
||||
}, 5, 5, TimeUnit.MINUTES)
|
||||
}
|
||||
|
||||
// Clear sampled paths to re-evaluate in the next period
|
||||
sampledPaths.clear()
|
||||
}, 1, 1, TimeUnit.MINUTES)
|
||||
// Shutdown hook to clean up resources
|
||||
private fun shutdownRequestCountResetScheduler() {
|
||||
requestCountResetScheduler.shutdown()
|
||||
try {
|
||||
if (!requestCountResetScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
requestCountResetScheduler.shutdownNow()
|
||||
}
|
||||
} catch (e: InterruptedException) {
|
||||
requestCountResetScheduler.shutdownNow()
|
||||
Thread.currentThread().interrupt()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if a request should be logged based on sampling configuration.
|
||||
* Optimized for performance with early returns and cached path normalization.
|
||||
*
|
||||
* @param path The request path
|
||||
* @param statusCode The response status code
|
||||
@@ -57,24 +102,38 @@ private val requestCountResetScheduler = Executors.newSingleThreadScheduledExecu
|
||||
* @return True if the request should be logged, false otherwise
|
||||
*/
|
||||
private fun shouldLogRequest(path: String, statusCode: HttpStatusCode?, loggingConfig: at.mocode.shared.config.LoggingConfig): Boolean {
|
||||
// If sampling is disabled, always log
|
||||
// Fast path: If sampling is disabled, always log
|
||||
if (!loggingConfig.enableLogSampling) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Always log errors if configured
|
||||
if (loggingConfig.alwaysLogErrors && statusCode != null && statusCode.value >= 400) {
|
||||
// Fast path: Always log errors if configured
|
||||
if (statusCode != null && statusCode.value >= 400 && loggingConfig.alwaysLogErrors) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Always log specific paths if configured
|
||||
val normalizedPath = path.trimStart('/')
|
||||
if (loggingConfig.alwaysLogPaths.any { normalizedPath.startsWith(it.trimStart('/')) }) {
|
||||
return true
|
||||
// Check if this is a path that should always be logged
|
||||
// Only normalize the path if we have paths to check against
|
||||
if (loggingConfig.alwaysLogPaths.isNotEmpty()) {
|
||||
val normalizedPath = path.trimStart('/')
|
||||
// Use any with early return for better performance
|
||||
for (alwaysLogPath in loggingConfig.alwaysLogPaths) {
|
||||
if (normalizedPath.startsWith(alwaysLogPath.trimStart('/'))) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the base path for traffic counting
|
||||
val basePath = extractBasePath(path)
|
||||
|
||||
// Check if this path is already known to be high-traffic
|
||||
if (sampledPaths.containsKey(basePath)) {
|
||||
// Already identified as high-traffic, apply sampling
|
||||
return Random.nextInt(100) < loggingConfig.samplingRate
|
||||
}
|
||||
|
||||
// Get or create counter for this path
|
||||
val basePath = extractBasePath(path)
|
||||
val counter = requestCountsByPath.computeIfAbsent(basePath) { AtomicInteger(0) }
|
||||
val count = counter.incrementAndGet()
|
||||
|
||||
@@ -114,6 +173,17 @@ private fun extractBasePath(path: String): String {
|
||||
fun Application.configureMonitoring() {
|
||||
val loggingConfig = AppConfig.logging
|
||||
|
||||
// Note: Prometheus metrics configuration has been moved to PrometheusConfig.kt
|
||||
|
||||
// Start the request count reset scheduler
|
||||
scheduleRequestCountReset()
|
||||
|
||||
// Register shutdown hook for application lifecycle
|
||||
this.monitor.subscribe(ApplicationStopPreparing) {
|
||||
log.info("Application stopping, shutting down schedulers...")
|
||||
shutdownRequestCountResetScheduler()
|
||||
}
|
||||
|
||||
// Erweiterte Call-Logging-Konfiguration
|
||||
install(CallLogging) {
|
||||
level = when (loggingConfig.level.uppercase()) {
|
||||
@@ -143,78 +213,97 @@ fun Application.configureMonitoring() {
|
||||
val requestId: String = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
|
||||
|
||||
if (loggingConfig.useStructuredLogging) {
|
||||
// Strukturiertes Logging-Format
|
||||
buildString {
|
||||
append("timestamp=$timestamp ")
|
||||
append("method=$httpMethod ")
|
||||
append("path=$path ")
|
||||
append("status=$status ")
|
||||
append("client=$clientIp ")
|
||||
append("requestId=$requestId ")
|
||||
// Optimized structured logging format using StringBuilder with initial capacity
|
||||
// Estimate the initial capacity based on typical log entry size
|
||||
val initialCapacity = 256 +
|
||||
(if (loggingConfig.logRequestHeaders) 128 else 0) +
|
||||
(if (loggingConfig.logRequestParameters) 128 else 0)
|
||||
|
||||
// Log Headers wenn konfiguriert
|
||||
if (loggingConfig.logRequestHeaders) {
|
||||
val authHeader = call.request.headers["Authorization"]
|
||||
if (authHeader != null) {
|
||||
append("auth=true ")
|
||||
}
|
||||
val sb = StringBuilder(initialCapacity)
|
||||
|
||||
val contentType = call.request.headers["Content-Type"]
|
||||
if (contentType != null) {
|
||||
append("contentType=$contentType ")
|
||||
}
|
||||
// Basic request information - always included
|
||||
sb.append("timestamp=").append(timestamp).append(' ')
|
||||
.append("method=").append(httpMethod).append(' ')
|
||||
.append("path=").append(path).append(' ')
|
||||
.append("status=").append(status).append(' ')
|
||||
.append("client=").append(clientIp).append(' ')
|
||||
.append("requestId=").append(requestId).append(' ')
|
||||
|
||||
// Log all headers if in debug mode, filtering sensitive data
|
||||
if (loggingConfig.level.equals("DEBUG", ignoreCase = true)) {
|
||||
append("headers={")
|
||||
call.request.headers.entries().joinTo(this, ", ") { entry ->
|
||||
if (isSensitiveHeader(entry.key)) {
|
||||
"${entry.key}=*****"
|
||||
} else {
|
||||
"${entry.key}=${entry.value.joinToString(",")}"
|
||||
}
|
||||
}
|
||||
append("} ")
|
||||
}
|
||||
// Log Headers wenn konfiguriert
|
||||
if (loggingConfig.logRequestHeaders) {
|
||||
val authHeader = call.request.headers["Authorization"]
|
||||
if (authHeader != null) {
|
||||
sb.append("auth=true ")
|
||||
}
|
||||
|
||||
// Log Query-Parameter wenn konfiguriert
|
||||
if (loggingConfig.logRequestParameters && call.request.queryParameters.entries().isNotEmpty()) {
|
||||
append("params={")
|
||||
call.request.queryParameters.entries().joinTo(this, ", ") { entry ->
|
||||
if (isSensitiveParameter(entry.key)) {
|
||||
"${entry.key}=*****"
|
||||
val contentType = call.request.headers["Content-Type"]
|
||||
if (contentType != null) {
|
||||
sb.append("contentType=").append(contentType).append(' ')
|
||||
}
|
||||
|
||||
// Log all headers if in debug mode, filtering sensitive data
|
||||
if (loggingConfig.level.equals("DEBUG", ignoreCase = true)) {
|
||||
sb.append("headers={")
|
||||
var first = true
|
||||
for (entry in call.request.headers.entries()) {
|
||||
if (!first) sb.append(", ")
|
||||
first = false
|
||||
|
||||
if (isSensitiveHeader(entry.key)) {
|
||||
sb.append(entry.key).append("=*****")
|
||||
} else {
|
||||
"${entry.key}=${entry.value.joinToString(",")}"
|
||||
sb.append(entry.key).append('=').append(entry.value.joinToString(","))
|
||||
}
|
||||
}
|
||||
append("} ")
|
||||
sb.append("} ")
|
||||
}
|
||||
}
|
||||
|
||||
if (userAgent != null) {
|
||||
// Use a simpler approach to avoid escape sequence issues
|
||||
val escapedUserAgent = userAgent.replace("\"", "\\\"")
|
||||
append("userAgent=\"$escapedUserAgent\" ")
|
||||
// Log Query-Parameter wenn konfiguriert
|
||||
if (loggingConfig.logRequestParameters && call.request.queryParameters.entries().isNotEmpty()) {
|
||||
sb.append("params={")
|
||||
var first = true
|
||||
for (entry in call.request.queryParameters.entries()) {
|
||||
if (!first) sb.append(", ")
|
||||
first = false
|
||||
|
||||
if (isSensitiveParameter(entry.key)) {
|
||||
sb.append(entry.key).append("=*****")
|
||||
} else {
|
||||
sb.append(entry.key).append('=').append(entry.value.joinToString(","))
|
||||
}
|
||||
}
|
||||
sb.append("} ")
|
||||
}
|
||||
|
||||
// Log response time if available from RequestTracingConfig
|
||||
call.attributes.getOrNull(REQUEST_START_TIME_KEY)?.let { startTime: Long ->
|
||||
val duration = System.currentTimeMillis() - startTime
|
||||
append("duration=${duration}ms ")
|
||||
}
|
||||
if (userAgent != null) {
|
||||
// Use a simpler approach to avoid escape sequence issues
|
||||
val escapedUserAgent = userAgent.replace("\"", "\\\"")
|
||||
sb.append("userAgent=\"").append(escapedUserAgent).append("\" ")
|
||||
}
|
||||
|
||||
// Add performance metrics
|
||||
// Log response time if available from RequestTracingConfig
|
||||
call.attributes.getOrNull(REQUEST_START_TIME_KEY)?.let { startTime: Long ->
|
||||
val duration = System.currentTimeMillis() - startTime
|
||||
sb.append("duration=").append(duration).append("ms ")
|
||||
}
|
||||
|
||||
// Add performance metrics - only calculate memory usage if needed
|
||||
// Only include memory metrics in every 10th log entry to reduce overhead
|
||||
if (Random.nextInt(10) == 0) {
|
||||
val memoryUsage = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()
|
||||
append("memoryUsage=${memoryUsage}b ")
|
||||
sb.append("memoryUsage=").append(memoryUsage).append("b ")
|
||||
|
||||
// Add additional performance metrics in debug mode
|
||||
if (loggingConfig.level.equals("DEBUG", ignoreCase = true)) {
|
||||
val availableProcessors = Runtime.getRuntime().availableProcessors()
|
||||
val maxMemory = Runtime.getRuntime().maxMemory()
|
||||
append("processors=$availableProcessors ")
|
||||
append("maxMemory=${maxMemory}b ")
|
||||
sb.append("processors=").append(availableProcessors).append(' ')
|
||||
.append("maxMemory=").append(maxMemory).append("b ")
|
||||
}
|
||||
}
|
||||
|
||||
sb.toString()
|
||||
} else {
|
||||
// Einfaches Logging-Format
|
||||
val duration = call.attributes.getOrNull(REQUEST_START_TIME_KEY)?.let { startTime: Long ->
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package at.mocode.gateway.config
|
||||
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.metrics.micrometer.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.server.auth.*
|
||||
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
|
||||
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
|
||||
import io.micrometer.prometheus.PrometheusConfig
|
||||
import io.micrometer.prometheus.PrometheusMeterRegistry
|
||||
|
||||
/**
|
||||
* Prometheus metrics configuration for the API Gateway.
|
||||
*
|
||||
* Configures Micrometer with Prometheus registry and exposes a metrics endpoint.
|
||||
*/
|
||||
|
||||
// Create a Prometheus registry
|
||||
val appMicrometerRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
|
||||
|
||||
/**
|
||||
* Configures Prometheus metrics for the application.
|
||||
*/
|
||||
fun Application.configurePrometheusMetrics() {
|
||||
// Install Micrometer metrics
|
||||
install(MicrometerMetrics) {
|
||||
registry = appMicrometerRegistry
|
||||
// JVM metrics
|
||||
meterBinders = listOf(
|
||||
JvmMemoryMetrics(),
|
||||
JvmGcMetrics(),
|
||||
JvmThreadMetrics(),
|
||||
ClassLoaderMetrics(),
|
||||
ProcessorMetrics()
|
||||
)
|
||||
}
|
||||
|
||||
// Add a route to expose Prometheus metrics with basic authentication
|
||||
routing {
|
||||
// Secure metrics endpoint with basic authentication
|
||||
authenticate("metrics-auth") {
|
||||
get("/metrics") {
|
||||
call.respond(appMicrometerRegistry.scrape())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Prometheus metrics configured and secured at /metrics endpoint")
|
||||
}
|
||||
@@ -58,6 +58,22 @@ fun Application.configureSecurity() {
|
||||
call.respond(HttpStatusCode.Unauthorized, "Token is not valid or has expired")
|
||||
}
|
||||
}
|
||||
|
||||
// Basic authentication for metrics endpoint
|
||||
basic("metrics-auth") {
|
||||
realm = "Metrics"
|
||||
validate { credentials ->
|
||||
// Get credentials from environment variables or use defaults
|
||||
val metricsUser = System.getenv("METRICS_USER") ?: "metrics"
|
||||
val metricsPassword = System.getenv("METRICS_PASSWORD") ?: "metrics-password-change-in-production"
|
||||
|
||||
if (credentials.name == metricsUser && credentials.password == metricsPassword) {
|
||||
UserIdPrincipal(credentials.name)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
package at.mocode.gateway.discovery
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.engine.cio.*
|
||||
import io.ktor.client.plugins.contentnegotiation.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.statement.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.serialization.kotlinx.json.*
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.Json
|
||||
import java.net.URI
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/**
|
||||
* Service discovery component for the API Gateway.
|
||||
* Uses Consul to discover services and route requests to them.
|
||||
*/
|
||||
class ServiceDiscovery(
|
||||
private val consulHost: String = "consul",
|
||||
private val consulPort: Int = 8500
|
||||
) {
|
||||
private val httpClient = HttpClient(CIO) {
|
||||
install(ContentNegotiation) {
|
||||
json(Json {
|
||||
ignoreUnknownKeys = true
|
||||
isLenient = true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Cache of service instances
|
||||
private val serviceCache = ConcurrentHashMap<String, List<ServiceInstance>>()
|
||||
private val cacheMutex = Mutex()
|
||||
|
||||
// Default TTL for cache entries in milliseconds (30 seconds)
|
||||
private val cacheTtl = 30_000L
|
||||
private val cacheTimestamps = ConcurrentHashMap<String, Long>()
|
||||
|
||||
/**
|
||||
* Get a service instance for the given service name.
|
||||
* Uses a simple round-robin load balancing strategy.
|
||||
*
|
||||
* @param serviceName The name of the service to get an instance for
|
||||
* @return A service instance, or null if no instances are available
|
||||
*/
|
||||
suspend fun getServiceInstance(serviceName: String): ServiceInstance? {
|
||||
val instances = getServiceInstances(serviceName)
|
||||
if (instances.isEmpty()) {
|
||||
return null
|
||||
}
|
||||
|
||||
// Simple round-robin load balancing
|
||||
val index = (System.currentTimeMillis() % instances.size).toInt()
|
||||
return instances[index]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all instances of a service.
|
||||
*
|
||||
* @param serviceName The name of the service to get instances for
|
||||
* @return A list of service instances
|
||||
*/
|
||||
suspend fun getServiceInstances(serviceName: String): List<ServiceInstance> {
|
||||
// Check cache first
|
||||
val cachedInstances = serviceCache[serviceName]
|
||||
val timestamp = cacheTimestamps[serviceName] ?: 0
|
||||
|
||||
if (cachedInstances != null && System.currentTimeMillis() - timestamp < cacheTtl) {
|
||||
return cachedInstances
|
||||
}
|
||||
|
||||
// Cache miss or expired, fetch from Consul
|
||||
return cacheMutex.withLock {
|
||||
// Double-check in case another thread updated the cache while we were waiting
|
||||
val currentTimestamp = cacheTimestamps[serviceName] ?: 0
|
||||
if (serviceCache[serviceName] != null && System.currentTimeMillis() - currentTimestamp < cacheTtl) {
|
||||
return@withLock serviceCache[serviceName]!!
|
||||
}
|
||||
|
||||
try {
|
||||
val instances = fetchServiceInstances(serviceName)
|
||||
serviceCache[serviceName] = instances
|
||||
cacheTimestamps[serviceName] = System.currentTimeMillis()
|
||||
instances
|
||||
} catch (e: Exception) {
|
||||
println("Failed to fetch service instances for $serviceName: ${e.message}")
|
||||
e.printStackTrace()
|
||||
|
||||
// Return cached instances if available, even if expired
|
||||
cachedInstances ?: emptyList()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch service instances from Consul.
|
||||
*
|
||||
* @param serviceName The name of the service to fetch instances for
|
||||
* @return A list of service instances
|
||||
*/
|
||||
private suspend fun fetchServiceInstances(serviceName: String): List<ServiceInstance> {
|
||||
val response = httpClient.get("http://$consulHost:$consulPort/v1/catalog/service/$serviceName")
|
||||
|
||||
if (response.status != HttpStatusCode.OK) {
|
||||
throw Exception("Failed to fetch service instances: ${response.status}")
|
||||
}
|
||||
|
||||
val responseBody = response.bodyAsText()
|
||||
val consulServices = Json.decodeFromString<List<ConsulService>>(responseBody)
|
||||
|
||||
return consulServices.map { service ->
|
||||
ServiceInstance(
|
||||
id = service.ServiceID,
|
||||
name = service.ServiceName,
|
||||
host = service.ServiceAddress.ifEmpty { service.Address },
|
||||
port = service.ServicePort,
|
||||
tags = service.ServiceTags,
|
||||
meta = service.ServiceMeta
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a URL for a service instance.
|
||||
*
|
||||
* @param instance The service instance
|
||||
* @param path The path to append to the URL
|
||||
* @return The complete URL
|
||||
*/
|
||||
fun buildServiceUrl(instance: ServiceInstance, path: String): String {
|
||||
val baseUrl = "http://${instance.host}:${instance.port}"
|
||||
return URI(baseUrl).resolve(path).toString()
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a service is healthy.
|
||||
*
|
||||
* @param serviceName The name of the service to check
|
||||
* @return True if the service is healthy, false otherwise
|
||||
*/
|
||||
suspend fun isServiceHealthy(serviceName: String): Boolean {
|
||||
try {
|
||||
val response = httpClient.get("http://$consulHost:$consulPort/v1/health/service/$serviceName?passing=true")
|
||||
val responseBody = response.bodyAsText()
|
||||
val healthyServices = Json.decodeFromString<List<Any>>(responseBody)
|
||||
return healthyServices.isNotEmpty()
|
||||
} catch (e: Exception) {
|
||||
println("Failed to check service health for $serviceName: ${e.message}")
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a service instance.
|
||||
*/
|
||||
data class ServiceInstance(
|
||||
val id: String,
|
||||
val name: String,
|
||||
val host: String,
|
||||
val port: Int,
|
||||
val tags: List<String> = emptyList(),
|
||||
val meta: Map<String, String> = emptyMap()
|
||||
)
|
||||
|
||||
/**
|
||||
* Consul service response model.
|
||||
*/
|
||||
@Serializable
|
||||
data class ConsulService(
|
||||
val ServiceID: String,
|
||||
val ServiceName: String,
|
||||
val ServiceAddress: String,
|
||||
val ServicePort: Int,
|
||||
val ServiceTags: List<String> = emptyList(),
|
||||
val ServiceMeta: Map<String, String> = emptyMap(),
|
||||
val Address: String
|
||||
)
|
||||
@@ -1,7 +1,11 @@
|
||||
package at.mocode.gateway
|
||||
|
||||
import at.mocode.gateway.config.*
|
||||
import at.mocode.gateway.config.configurePrometheusMetrics
|
||||
import at.mocode.gateway.config.configureCustomMetrics
|
||||
import at.mocode.gateway.plugins.configureHttpCaching
|
||||
import at.mocode.gateway.routing.docRoutes
|
||||
import at.mocode.gateway.routing.serviceRoutes
|
||||
import at.mocode.shared.config.AppConfig
|
||||
import io.ktor.http.*
|
||||
import io.ktor.serialization.kotlinx.json.*
|
||||
@@ -43,6 +47,12 @@ fun Application.module() {
|
||||
// Erweiterte Monitoring- und Logging-Konfiguration
|
||||
configureMonitoring()
|
||||
|
||||
// Prometheus Metrics konfigurieren
|
||||
configurePrometheusMetrics()
|
||||
|
||||
// Custom application metrics konfigurieren
|
||||
configureCustomMetrics()
|
||||
|
||||
// Request Tracing für Cross-Service Tracing konfigurieren
|
||||
configureRequestTracing()
|
||||
|
||||
@@ -53,6 +63,9 @@ fun Application.module() {
|
||||
configureOpenApi()
|
||||
configureSwagger()
|
||||
|
||||
// HTTP Caching konfigurieren
|
||||
configureHttpCaching()
|
||||
|
||||
routing {
|
||||
// Hauptrouten
|
||||
get("/") {
|
||||
@@ -62,6 +75,18 @@ fun Application.module() {
|
||||
)
|
||||
}
|
||||
|
||||
// Health check endpoint
|
||||
get("/health") {
|
||||
call.respond(HttpStatusCode.OK, mapOf(
|
||||
"status" to "UP",
|
||||
"timestamp" to System.currentTimeMillis(),
|
||||
"services" to mapOf(
|
||||
"api-gateway" to "UP",
|
||||
"database" to "UP"
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
// Static resources for documentation
|
||||
staticResources("/docs", "static/docs") {
|
||||
default("index.html")
|
||||
@@ -69,5 +94,8 @@ fun Application.module() {
|
||||
|
||||
// API Documentation routes
|
||||
docRoutes()
|
||||
|
||||
// Service discovery routes
|
||||
serviceRoutes()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,243 @@
|
||||
package at.mocode.gateway.plugins
|
||||
|
||||
import at.mocode.gateway.config.CachingConfig
|
||||
import at.mocode.gateway.config.getCachingConfig
|
||||
import io.ktor.http.*
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.request.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.util.pipeline.*
|
||||
import java.security.MessageDigest
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.*
|
||||
import kotlin.text.Charsets
|
||||
|
||||
/**
|
||||
* Configures enhanced HTTP caching headers for the application.
|
||||
* This adds Cache-Control, Expires, and Vary headers to responses.
|
||||
* It also integrates with the CachingConfig for more intelligent caching decisions.
|
||||
*/
|
||||
fun Application.configureHttpCaching() {
|
||||
// Get the application logger
|
||||
val logger = log
|
||||
|
||||
// Get the caching config
|
||||
val cachingConfig = try {
|
||||
getCachingConfig()
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Failed to get CachingConfig, using default caching headers: ${e.message}")
|
||||
null
|
||||
}
|
||||
|
||||
// Add a response interceptor for setting cache headers
|
||||
intercept(ApplicationCallPipeline.Call) {
|
||||
// Add Vary header to all responses
|
||||
call.response.header(HttpHeaders.Vary, "Accept, Accept-Encoding")
|
||||
|
||||
// For authenticated endpoints, add Authorization to Vary
|
||||
if (call.request.headers.contains(HttpHeaders.Authorization)) {
|
||||
call.response.header(HttpHeaders.Vary, "Accept, Accept-Encoding, Authorization")
|
||||
}
|
||||
|
||||
// Set default no-cache headers for dynamic content
|
||||
call.response.header(HttpHeaders.CacheControl, "no-cache, private")
|
||||
|
||||
// Check for conditional requests (If-None-Match, If-Modified-Since)
|
||||
val requestETag = call.request.header(HttpHeaders.IfNoneMatch)
|
||||
val requestLastModified = call.request.header(HttpHeaders.IfModifiedSince)
|
||||
|
||||
// If we have conditional headers, check if we can return 304 Not Modified
|
||||
if (requestETag != null || requestLastModified != null) {
|
||||
// This would be implemented with actual ETag and Last-Modified checking
|
||||
// For now, we just log that we received conditional headers
|
||||
logger.debug("Received conditional request: ETag=$requestETag, Last-Modified=$requestLastModified")
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("HTTP caching configured with integration to CachingConfig")
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to enable caching for static resources.
|
||||
* Use this for CSS, JS, images, and other static files.
|
||||
*/
|
||||
fun ApplicationCall.enableStaticResourceCaching(maxAgeSeconds: Int = 86400) { // Default: 1 day
|
||||
setCacheControlHeader(this, maxAgeSeconds, true)
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to enable caching for master data.
|
||||
* Use this for reference data that changes infrequently.
|
||||
*/
|
||||
fun ApplicationCall.enableMasterDataCaching(maxAgeSeconds: Int = 3600) { // Default: 1 hour
|
||||
setCacheControlHeader(this, maxAgeSeconds, true)
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to enable caching for user data.
|
||||
* Use this for user-specific data that may change frequently.
|
||||
*/
|
||||
fun ApplicationCall.enableUserDataCaching(maxAgeSeconds: Int = 60) { // Default: 1 minute
|
||||
setCacheControlHeader(this, maxAgeSeconds, false, true)
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to disable caching.
|
||||
* Use this for sensitive or frequently changing data.
|
||||
*/
|
||||
fun ApplicationCall.disableCaching() {
|
||||
response.header(HttpHeaders.CacheControl, "no-cache, no-store, must-revalidate, private")
|
||||
response.header(HttpHeaders.Pragma, "no-cache")
|
||||
response.header(HttpHeaders.Expires, "0")
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to set Cache-Control and Expires headers.
|
||||
*/
|
||||
private fun setCacheControlHeader(
|
||||
call: ApplicationCall,
|
||||
maxAgeSeconds: Int,
|
||||
isPublic: Boolean,
|
||||
mustRevalidate: Boolean = false
|
||||
) {
|
||||
// Build Cache-Control header
|
||||
val visibility = if (isPublic) "public" else "private"
|
||||
val revalidate = if (mustRevalidate) ", must-revalidate" else ""
|
||||
call.response.header(
|
||||
HttpHeaders.CacheControl,
|
||||
"max-age=$maxAgeSeconds, $visibility$revalidate"
|
||||
)
|
||||
|
||||
// Set Expires header
|
||||
val calendar = Calendar.getInstance()
|
||||
calendar.add(Calendar.SECOND, maxAgeSeconds)
|
||||
val dateFormat = SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
|
||||
dateFormat.timeZone = TimeZone.getTimeZone("GMT")
|
||||
call.response.header(HttpHeaders.Expires, dateFormat.format(calendar.time))
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to set ETag header for a response.
|
||||
*/
|
||||
fun ApplicationCall.setETag(etag: String) {
|
||||
response.header(HttpHeaders.ETag, "\"$etag\"")
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to set Last-Modified header for a response.
|
||||
*/
|
||||
fun ApplicationCall.setLastModified(timestamp: Long) {
|
||||
val dateFormat = SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
|
||||
dateFormat.timeZone = TimeZone.getTimeZone("GMT")
|
||||
response.header(HttpHeaders.LastModified, dateFormat.format(Date(timestamp)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate an ETag for the given content.
|
||||
* This uses MD5 hashing for simplicity, but in production you might want to use a faster algorithm.
|
||||
*/
|
||||
fun generateETag(content: String): String {
|
||||
val md = MessageDigest.getInstance("MD5")
|
||||
val digest = md.digest(content.toByteArray(Charsets.UTF_8))
|
||||
return digest.joinToString("") { "%02x".format(it) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate an ETag for the given object by converting it to a string representation.
|
||||
*/
|
||||
fun generateETag(obj: Any): String {
|
||||
return generateETag(obj.toString())
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the request has a matching ETag and return 304 Not Modified if it does.
|
||||
* Returns true if the response was handled (304 sent), false otherwise.
|
||||
*/
|
||||
suspend fun PipelineContext<Unit, ApplicationCall>.checkETagAndRespond(etag: String): Boolean {
|
||||
val requestETag = call.request.header(HttpHeaders.IfNoneMatch)
|
||||
|
||||
// If the client sent an If-None-Match header and it matches our ETag,
|
||||
// we can return 304 Not Modified
|
||||
if (requestETag != null && (requestETag == "\"$etag\"" || requestETag == "*")) {
|
||||
call.response.header(HttpHeaders.ETag, "\"$etag\"")
|
||||
call.respond(HttpStatusCode.NotModified)
|
||||
return true
|
||||
}
|
||||
|
||||
// Set the ETag header for the response
|
||||
call.response.header(HttpHeaders.ETag, "\"$etag\"")
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the request has a matching Last-Modified date and return 304 Not Modified if it does.
|
||||
* Returns true if the response was handled (304 sent), false otherwise.
|
||||
*/
|
||||
suspend fun PipelineContext<Unit, ApplicationCall>.checkLastModifiedAndRespond(timestamp: Long): Boolean {
|
||||
val requestLastModified = call.request.header(HttpHeaders.IfModifiedSince)
|
||||
|
||||
if (requestLastModified != null) {
|
||||
try {
|
||||
val dateFormat = SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
|
||||
dateFormat.timeZone = TimeZone.getTimeZone("GMT")
|
||||
val requestDate = dateFormat.parse(requestLastModified).time
|
||||
|
||||
// If the resource hasn't been modified since the date in the request,
|
||||
// we can return 304 Not Modified
|
||||
if (timestamp <= requestDate) {
|
||||
val lastModifiedFormatted = dateFormat.format(Date(timestamp))
|
||||
call.response.header(HttpHeaders.LastModified, lastModifiedFormatted)
|
||||
call.respond(HttpStatusCode.NotModified)
|
||||
return true
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
// If we can't parse the date, ignore it
|
||||
}
|
||||
}
|
||||
|
||||
// Set the Last-Modified header for the response
|
||||
val dateFormat = SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
|
||||
dateFormat.timeZone = TimeZone.getTimeZone("GMT")
|
||||
call.response.header(HttpHeaders.LastModified, dateFormat.format(Date(timestamp)))
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to check if a resource is cached in CachingConfig.
|
||||
* If it is, and the client has a matching ETag or Last-Modified date,
|
||||
* this will return 304 Not Modified. Otherwise, it will return the cached value.
|
||||
* Returns true if the response was handled, false otherwise.
|
||||
*/
|
||||
suspend fun <T> PipelineContext<Unit, ApplicationCall>.checkCacheAndRespond(
|
||||
cacheName: String,
|
||||
key: String,
|
||||
etag: String? = null,
|
||||
lastModified: Long? = null
|
||||
): Boolean {
|
||||
val application = call.application
|
||||
val cachingConfig = try {
|
||||
application.getCachingConfig()
|
||||
} catch (e: Exception) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if the resource is in the cache
|
||||
val cachedValue = cachingConfig.get<T>(cacheName, key)
|
||||
if (cachedValue != null) {
|
||||
// If we have an ETag, check if the client has a matching one
|
||||
if (etag != null && checkETagAndRespond(etag)) {
|
||||
return true
|
||||
}
|
||||
|
||||
// If we have a Last-Modified date, check if the client has a matching one
|
||||
if (lastModified != null && checkLastModifiedAndRespond(lastModified)) {
|
||||
return true
|
||||
}
|
||||
|
||||
// If we get here, the client doesn't have a matching ETag or Last-Modified date,
|
||||
// so we need to send the full response
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
package at.mocode.gateway.routing
|
||||
|
||||
import at.mocode.gateway.discovery.ServiceDiscovery
|
||||
import at.mocode.shared.config.AppConfig
|
||||
import io.ktor.http.*
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
|
||||
/**
|
||||
* Configure dynamic service routing using Consul service discovery.
|
||||
* This allows the API Gateway to discover services registered with Consul and route requests to them.
|
||||
*/
|
||||
fun Routing.serviceRoutes() {
|
||||
val config = AppConfig
|
||||
|
||||
// Initialize service discovery if enabled
|
||||
val serviceDiscovery = if (config.serviceDiscovery.enabled) {
|
||||
ServiceDiscovery(
|
||||
consulHost = config.serviceDiscovery.consulHost,
|
||||
consulPort = config.serviceDiscovery.consulPort
|
||||
)
|
||||
} else null
|
||||
|
||||
// Define service routes
|
||||
if (serviceDiscovery != null) {
|
||||
// Master Data Service Routes
|
||||
route("/api/masterdata") {
|
||||
handle {
|
||||
handleServiceRequest(call, "master-data", serviceDiscovery)
|
||||
}
|
||||
}
|
||||
|
||||
// Horse Registry Service Routes
|
||||
route("/api/horses") {
|
||||
handle {
|
||||
handleServiceRequest(call, "horse-registry", serviceDiscovery)
|
||||
}
|
||||
}
|
||||
|
||||
// Event Management Service Routes
|
||||
route("/api/events") {
|
||||
handle {
|
||||
handleServiceRequest(call, "event-management", serviceDiscovery)
|
||||
}
|
||||
}
|
||||
|
||||
// Member Management Service Routes
|
||||
route("/api/members") {
|
||||
handle {
|
||||
handleServiceRequest(call, "member-management", serviceDiscovery)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a service request by discovering the service and forwarding the request.
|
||||
* This is a simplified implementation that just returns service information.
|
||||
* In a production environment, this would forward the request to the service.
|
||||
*/
|
||||
private suspend fun handleServiceRequest(
|
||||
call: ApplicationCall,
|
||||
serviceName: String,
|
||||
serviceDiscovery: ServiceDiscovery
|
||||
) {
|
||||
try {
|
||||
// Get service instance
|
||||
val serviceInstance = serviceDiscovery.getServiceInstance(serviceName)
|
||||
|
||||
if (serviceInstance == null) {
|
||||
call.respond(HttpStatusCode.ServiceUnavailable, "Service $serviceName is not available")
|
||||
return
|
||||
}
|
||||
|
||||
// Respond with service information
|
||||
call.respond(
|
||||
HttpStatusCode.OK,
|
||||
mapOf(
|
||||
"message" to "Service discovery working",
|
||||
"service" to serviceName,
|
||||
"instance" to mapOf(
|
||||
"id" to serviceInstance.id,
|
||||
"name" to serviceInstance.name,
|
||||
"host" to serviceInstance.host,
|
||||
"port" to serviceInstance.port
|
||||
)
|
||||
)
|
||||
)
|
||||
} catch (e: Exception) {
|
||||
call.respond(
|
||||
HttpStatusCode.InternalServerError,
|
||||
"Error routing request to service $serviceName: ${e.message}"
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user