(fix) Umbau zu SCS

### API-Gateway erweitern
- Bestehenden API-Gateway-Service mit zusätzlichen Funktionen ausstatten:
    - Rate Limiting implementieren
    - Request/Response Logging verbessern
    - Cross-Service Tracing mit eindeutigen Request-IDs einführen
This commit is contained in:
stefan
2025-07-21 17:17:40 +02:00
parent 44ad8faad6
commit f8eade8091
7 changed files with 1381 additions and 48 deletions
@@ -0,0 +1,177 @@
package at.mocode.gateway.config
import at.mocode.shared.config.AppConfig
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.plugins.calllogging.*
import io.ktor.server.request.*
import io.ktor.util.*
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
/**
* Configuration for log sampling in the API Gateway.
*
* This configuration adds support for:
* - Sampling logs for high-traffic endpoints to reduce log volume
* - Configurable sampling rate and thresholds
* - Always logging errors and specific paths regardless of sampling
* - Periodic reset of request counters
*/
// SLF4J logger named "LogSampling"; used by the counter-reset scheduler in this file.
private val logger = LoggerFactory.getLogger("LogSampling")
// Request counters keyed by base path (see extractBasePath). The reset scheduler clears this
// map once per minute, so a counter never covers more than the current one-minute window.
private val requestCountsByPath = ConcurrentHashMap<String, AtomicInteger>()
// Base paths whose counter reached the high-traffic threshold in the current window.
// Used as a concurrent set: only the keys are read, the stored value is always true.
private val sampledPaths = ConcurrentHashMap<String, Boolean>()
// Call attribute in which configureLogSampling stores the per-request logging decision
// (true = log this call); the CallLogging filter reads it back and treats "absent" as true.
val SHOULD_LOG_REQUEST_KEY = AttributeKey<Boolean>("ShouldLogRequest")
/**
 * Background scheduler that closes the current sampling window once per minute.
 *
 * Each run clears all per-path request counters, reports which base paths were being
 * sampled during the window that just ended, and then clears that set so every path is
 * re-evaluated from scratch in the next window.
 *
 * The worker is a daemon thread so that it never prevents the JVM from exiting when the
 * server shuts down. The task body is wrapped in try/catch because `scheduleAtFixedRate`
 * cancels all further runs as soon as one execution throws.
 */
private val requestCountResetScheduler = Executors.newSingleThreadScheduledExecutor { task ->
    Thread(task, "log-sampling-reset").apply { isDaemon = true }
}.apply {
    scheduleAtFixedRate({
        try {
            // Start a new counting window.
            requestCountsByPath.clear()
            // Report the paths that were sampled in the window that just ended.
            if (sampledPaths.isNotEmpty()) {
                val sampledPathsList = sampledPaths.keys.joinToString(", ")
                logger.info("Currently sampling high-traffic paths: $sampledPathsList")
            }
            // Forget the sampled paths so they must cross the threshold again.
            sampledPaths.clear()
        } catch (e: Exception) {
            logger.error("Error in request count reset scheduler", e)
        }
    }, 1, 1, TimeUnit.MINUTES)
}
/**
 * Installs log sampling on this application.
 *
 * When sampling is disabled in [AppConfig.logging] this only logs that fact and returns.
 * Otherwise it logs the active sampling settings, installs a Monitoring-phase interceptor
 * that stores a per-request logging decision under [SHOULD_LOG_REQUEST_KEY], and registers
 * a CallLogging filter that honours that decision together with the configured
 * exclude-path list.
 */
fun Application.configureLogSampling() {
    val samplingConfig = AppConfig.logging

    // Nothing to install when sampling is switched off.
    if (!samplingConfig.enableLogSampling) {
        log.info("Log sampling DISABLED")
        return
    }

    log.info("Log sampling ENABLED with rate: ${samplingConfig.samplingRate}%")
    log.info("High traffic threshold: ${samplingConfig.highTrafficThreshold} requests per minute")
    log.info("Always log paths: ${samplingConfig.alwaysLogPaths.joinToString(", ")}")
    log.info("Always log errors: ${samplingConfig.alwaysLogErrors}")

    intercept(ApplicationCallPipeline.Monitoring) {
        // Decide before the request is processed; no status code exists yet, hence null.
        val initialDecision = shouldLogRequest(call.request.path(), null, samplingConfig)
        call.attributes.put(SHOULD_LOG_REQUEST_KEY, initialDecision)

        proceed()

        // A sampled-out request is promoted to "log" once it turns out to be an error (>= 400).
        // NOTE(review): this only has an effect if the CallLogging filter is evaluated after
        // this point in the pipeline - TODO confirm.
        if (!initialDecision && samplingConfig.alwaysLogErrors) {
            val failed = call.response.status()?.let { it.value >= 400 } ?: false
            if (failed) {
                call.attributes.put(SHOULD_LOG_REQUEST_KEY, true)
            }
        }
    }

    // NOTE(review): CallLogging.LoggingConfig is used here as a monitor event definition -
    // confirm this exists in the Ktor version in use.
    environment.monitor.subscribe(CallLogging.LoggingConfig) { callLoggingConfig ->
        callLoggingConfig.filter { call ->
            // Calls without a stored decision are logged.
            val passesSampling = call.attributes.getOrNull(SHOULD_LOG_REQUEST_KEY) ?: true
            // Keep honouring the configured exclude-path prefixes.
            val notExcluded = AppConfig.logging.excludePaths.none { call.request.path().startsWith(it) }
            passesSampling && notExcluded
        }
    }
}
/**
 * Decides whether a single request should be written to the request log.
 *
 * A request is always logged when sampling is disabled, when it is an error response
 * (status >= 400) and `alwaysLogErrors` is set, or when its path starts with one of the
 * `alwaysLogPaths` prefixes (leading slashes are ignored on both sides). Every other
 * request increments the counter of its base path; once that counter has reached
 * `highTrafficThreshold`, the request is logged with a probability of `samplingRate` percent.
 *
 * @param path The request path
 * @param statusCode The response status code, or null when called before a response exists
 * @param loggingConfig The logging configuration
 * @return True if the request should be logged, false otherwise
 */
private fun shouldLogRequest(path: String, statusCode: HttpStatusCode?, loggingConfig: at.mocode.shared.config.LoggingConfig): Boolean {
    if (!loggingConfig.enableLogSampling) return true

    val isErrorResponse = statusCode != null && statusCode.value >= 400
    if (loggingConfig.alwaysLogErrors && isErrorResponse) return true

    val relativePath = path.trimStart('/')
    val isAlwaysLogged = loggingConfig.alwaysLogPaths.any { prefix ->
        relativePath.startsWith(prefix.trimStart('/'))
    }
    if (isAlwaysLogged) return true

    // Only requests that reach this point are counted towards the traffic threshold.
    val bucket = extractBasePath(path)
    val seenInWindow = requestCountsByPath.computeIfAbsent(bucket) { AtomicInteger(0) }.incrementAndGet()
    if (seenInWindow < loggingConfig.highTrafficThreshold) return true

    // High-traffic path: remember it for the periodic report and sample.
    sampledPaths[bucket] = true
    return Random.nextInt(100) < loggingConfig.samplingRate
}
/**
 * Reduces a request path to the prefix used for grouping similar requests.
 *
 * Empty segments are dropped first. Paths whose first segment is "api" keep up to three
 * segments (for example "/api/v1/users/123" becomes "/api/v1/users"); all other paths keep
 * up to two. A path without any segment yields "/".
 */
private fun extractBasePath(path: String): String {
    val segments = path.split("/").filter(String::isNotEmpty)
    if (segments.isEmpty()) return "/"

    // take() already stops at the end of the list, so shorter paths are returned whole.
    val keep = if (segments.first() == "api") 3 else 2
    return segments.take(keep).joinToString(separator = "/", prefix = "/")
}
@@ -4,18 +4,113 @@ import at.mocode.dto.base.ApiResponse
import at.mocode.shared.config.AppConfig
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.plugins.calllogging.*
import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.statuspages.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import org.slf4j.event.Level
import java.util.*
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.random.Random
/**
* Monitoring and logging configuration for the API Gateway.
*
* Configures request logging, error handling, and status pages.
* Works together with RequestTracingConfig for cross-service tracing.
* Includes log sampling for high-traffic endpoints to reduce log volume.
*/
// Request counters keyed by base path (see extractBasePath). The reset scheduler below
// clears this map once per minute, so a counter never covers more than the current window.
private val requestCountsByPath = ConcurrentHashMap<String, AtomicInteger>()
// Base paths whose counter reached the high-traffic threshold in the current window.
// Used as a concurrent set: only the keys are read, the stored value is always true.
private val sampledPaths = ConcurrentHashMap<String, Boolean>()
/**
 * Background scheduler that closes the current sampling window once per minute.
 *
 * Each run clears all per-path request counters, prints which base paths were being sampled
 * during the window that just ended, and then clears that set so every path is re-evaluated
 * in the next window.
 *
 * The worker is a daemon thread so that it never prevents the JVM from exiting. The task body
 * is wrapped in try/catch because `scheduleAtFixedRate` cancels all further runs as soon as
 * one execution throws, which would silently stop the counters from ever being reset again.
 */
private val requestCountResetScheduler = Executors.newSingleThreadScheduledExecutor { task ->
    Thread(task, "log-sampling-reset").apply { isDaemon = true }
}.apply {
    scheduleAtFixedRate({
        try {
            // Start a new counting window.
            requestCountsByPath.clear()
            // Report the paths that were sampled in the window that just ended.
            if (sampledPaths.isNotEmpty()) {
                val sampledPathsList = sampledPaths.keys.joinToString(", ")
                println("[LogSampling] Currently sampling high-traffic paths: $sampledPathsList")
            }
            // Forget the sampled paths so they must cross the threshold again.
            sampledPaths.clear()
        } catch (e: Exception) {
            println("[LogSampling] Error in request count reset scheduler: ${e.message}")
        }
    }, 1, 1, TimeUnit.MINUTES)
}
/**
 * Decides whether a single request should be written to the request log.
 *
 * Requests are logged unconditionally when sampling is disabled, when the response is an
 * error (status >= 400) and `alwaysLogErrors` is set, or when the path starts with one of the
 * `alwaysLogPaths` prefixes (leading slashes ignored on both sides). All remaining requests
 * are counted per base path; from the moment a counter reaches `highTrafficThreshold`, only
 * `samplingRate` percent of the requests for that base path are logged.
 *
 * @param path The request path
 * @param statusCode The response status code (may be null)
 * @param loggingConfig The logging configuration
 * @return True if the request should be logged, false otherwise
 */
private fun shouldLogRequest(path: String, statusCode: HttpStatusCode?, loggingConfig: at.mocode.shared.config.LoggingConfig): Boolean {
    // Sampling switched off: everything is logged.
    if (!loggingConfig.enableLogSampling) return true

    // Errors bypass sampling when configured.
    val responseFailed = statusCode != null && statusCode.value >= 400
    if (loggingConfig.alwaysLogErrors && responseFailed) return true

    // Explicitly whitelisted path prefixes bypass sampling as well.
    val pathWithoutSlash = path.trimStart('/')
    if (loggingConfig.alwaysLogPaths.any { pathWithoutSlash.startsWith(it.trimStart('/')) }) return true

    // Count this request against its base path.
    val groupKey = extractBasePath(path)
    val requestsSoFar = requestCountsByPath
        .computeIfAbsent(groupKey) { AtomicInteger(0) }
        .incrementAndGet()

    val belowThreshold = requestsSoFar < loggingConfig.highTrafficThreshold
    if (belowThreshold) return true

    // Threshold reached: mark the path as sampled and roll the dice.
    sampledPaths[groupKey] = true
    return Random.nextInt(100) < loggingConfig.samplingRate
}
/**
 * Reduces a request path to the prefix used for grouping similar requests.
 *
 * Empty segments are ignored. A path starting with "api" keeps at most its first three
 * segments ("/api/v1/users/123" -> "/api/v1/users"); any other path keeps at most two.
 * A path with no segments at all is mapped to "/".
 */
private fun extractBasePath(path: String): String {
    val segments = path.split("/").filter { it.isNotEmpty() }
    val depth = when {
        segments.isEmpty() -> return "/"
        segments[0] == "api" -> 3 // api / version / resource
        else -> 2
    }
    // take() clamps to the list size, so short paths are returned unchanged.
    return segments.take(depth).joinToString("/", prefix = "/")
}
fun Application.configureMonitoring() {
val loggingConfig = AppConfig.logging
@@ -42,22 +137,20 @@ fun Application.configureMonitoring() {
val path = call.request.path()
val userAgent = call.request.headers["User-Agent"]
val clientIp = call.request.local.remoteHost
val timestamp = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
// Generiere eine Correlation-ID für das Request-Tracking
val correlationId = UUID.randomUUID().toString()
// Füge Correlation-ID als Response-Header hinzu
if (loggingConfig.includeCorrelationId) {
call.response.header("X-Correlation-ID", correlationId)
}
// Get the request ID from the call attributes (set by RequestTracingConfig)
val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
if (loggingConfig.useStructuredLogging) {
// Strukturiertes Logging-Format
buildString {
append("timestamp=$timestamp ")
append("method=$httpMethod ")
append("path=$path ")
append("status=$status ")
append("client=$clientIp ")
append("requestId=$requestId ")
// Log Headers wenn konfiguriert
if (loggingConfig.logRequestHeaders) {
@@ -70,12 +163,31 @@ fun Application.configureMonitoring() {
if (contentType != null) {
append("contentType=$contentType ")
}
// Log all headers if in debug mode, filtering sensitive data
if (loggingConfig.level.equals("DEBUG", ignoreCase = true)) {
append("headers={")
call.request.headers.entries().joinTo(this, ", ") {
if (isSensitiveHeader(it.key)) {
"${it.key}=*****"
} else {
"${it.key}=${it.value.joinToString(",")}"
}
}
append("} ")
}
}
// Log Query-Parameter wenn konfiguriert
if (loggingConfig.logRequestParameters && call.request.queryParameters.entries().isNotEmpty()) {
append("params={")
call.request.queryParameters.entries().joinTo(this, ", ") { "${it.key}=${it.value.joinToString(",")}" }
call.request.queryParameters.entries().joinTo(this, ", ") {
if (isSensitiveParameter(it.key)) {
"${it.key}=*****"
} else {
"${it.key}=${it.value.joinToString(",")}"
}
}
append("} ")
}
@@ -83,29 +195,51 @@ fun Application.configureMonitoring() {
append("userAgent=\"${userAgent.replace("\"", "\\\"")}\" ")
}
// Füge Correlation-ID hinzu, wenn konfiguriert
if (loggingConfig.includeCorrelationId) {
append("correlationId=$correlationId ")
// Log response time if available from RequestTracingConfig
call.attributes.getOrNull(REQUEST_START_TIME_KEY)?.let { startTime ->
val duration = System.currentTimeMillis() - startTime
append("duration=${duration}ms ")
}
// Add performance metrics
val memoryUsage = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()
append("memoryUsage=${memoryUsage}b ")
// Add additional performance metrics in debug mode
if (loggingConfig.level.equals("DEBUG", ignoreCase = true)) {
val availableProcessors = Runtime.getRuntime().availableProcessors()
val maxMemory = Runtime.getRuntime().maxMemory()
append("processors=$availableProcessors ")
append("maxMemory=${maxMemory}b ")
}
}
} else {
// Einfaches Logging-Format
"$status: $httpMethod $path - $clientIp - $userAgent"
val duration = call.attributes.getOrNull(REQUEST_START_TIME_KEY)?.let {
" - Duration: ${System.currentTimeMillis() - it}ms"
} ?: ""
"$timestamp - $status: $httpMethod $path - RequestID: $requestId - $clientIp - $userAgent$duration"
}
}
}
// Erweiterte Logging-Konfiguration für den API-Gateway
log.info("API Gateway konfiguriert mit erweitertem Logging")
log.info("API Gateway konfiguriert mit erweitertem Logging und Cross-Service Tracing")
log.info("Logging-Konfiguration: level=${loggingConfig.level}, " +
"logRequests=${loggingConfig.logRequests}, " +
"logResponses=${loggingConfig.logResponses}, " +
"logRequestHeaders=${loggingConfig.logRequestHeaders}, " +
"logRequestParameters=${loggingConfig.logRequestParameters}")
"logRequestParameters=${loggingConfig.logRequestParameters}, " +
"requestIdHeader=${loggingConfig.requestIdHeader}, " +
"propagateRequestId=${loggingConfig.propagateRequestId}")
install(StatusPages) {
exception<Throwable> { call, cause ->
call.application.log.error("Unhandled exception", cause)
// Get the request ID for error logging
val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
call.application.log.error("Unhandled exception - RequestID: $requestId", cause)
call.respond(
HttpStatusCode.InternalServerError,
ApiResponse.error<Any>("Internal server error: ${cause.message}")
@@ -113,6 +247,10 @@ fun Application.configureMonitoring() {
}
status(HttpStatusCode.NotFound) { call, status ->
// Get the request ID for error logging
val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
call.application.log.warn("Not found - Path: ${call.request.path()} - RequestID: $requestId")
call.respond(
status,
ApiResponse.error<Any>("Endpoint not found: ${call.request.path()}")
@@ -120,6 +258,10 @@ fun Application.configureMonitoring() {
}
status(HttpStatusCode.Unauthorized) { call, status ->
// Get the request ID for error logging
val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
call.application.log.warn("Unauthorized access - Path: ${call.request.path()} - RequestID: $requestId")
call.respond(
status,
ApiResponse.error<Any>("Authentication required")
@@ -127,10 +269,48 @@ fun Application.configureMonitoring() {
}
status(HttpStatusCode.Forbidden) { call, status ->
// Get the request ID for error logging
val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
call.application.log.warn("Forbidden access - Path: ${call.request.path()} - RequestID: $requestId")
call.respond(
status,
ApiResponse.error<Any>("Access forbidden")
)
}
// Rate limit exceeded
status(HttpStatusCode.TooManyRequests) { call, status ->
// Get the request ID for error logging
val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
call.application.log.warn("Rate limit exceeded - Path: ${call.request.path()} - RequestID: $requestId")
call.respond(
status,
ApiResponse.error<Any>("Rate limit exceeded. Please try again later.")
)
}
}
}
/** Lower-case fragments that mark a header name as sensitive; built once instead of per call. */
private val SENSITIVE_HEADER_MARKERS = listOf(
    "authorization", "cookie", "set-cookie", "x-api-key", "api-key",
    "password", "token", "secret", "credential", "apikey"
)

/**
 * Determines if a header is sensitive and should be masked in logs.
 *
 * The check is case-insensitive and substring based: a header is sensitive as soon as its
 * name contains any of the known markers (for example "X-Auth-Token" matches "token").
 *
 * @param headerName The header name as received
 * @return True if the header value must not be logged in clear text
 */
private fun isSensitiveHeader(headerName: String): Boolean {
    // Normalise once rather than once per marker.
    val normalized = headerName.lowercase()
    return SENSITIVE_HEADER_MARKERS.any { normalized.contains(it) }
}
/** Lower-case fragments that mark a query parameter name as sensitive; built once instead of per call. */
private val SENSITIVE_PARAMETER_MARKERS = listOf(
    "password", "token", "secret", "credential", "apikey", "key",
    "auth", "pin", "code", "otp", "cvv", "ssn", "credit"
)

/**
 * Determines if a parameter is sensitive and should be masked in logs.
 *
 * The check is case-insensitive and substring based, so it deliberately errs on the side of
 * masking: short markers such as "key", "pin" or "code" also match names like "zipCode".
 *
 * @param paramName The query parameter name as received
 * @return True if the parameter value must not be logged in clear text
 */
private fun isSensitiveParameter(paramName: String): Boolean {
    // Normalise once rather than once per marker.
    val normalized = paramName.lowercase()
    return SENSITIVE_PARAMETER_MARKERS.any { normalized.contains(it) }
}
@@ -0,0 +1,517 @@
package at.mocode.gateway.config
import at.mocode.shared.config.AppConfig
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.plugins.ratelimit.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import kotlin.time.Duration.Companion.minutes
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.nio.charset.StandardCharsets
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger
/**
* Configuration for advanced rate limiting in the API Gateway.
*
* This configuration adds support for:
* - Global rate limiting
* - Endpoint-specific rate limiting
* - User-type-specific rate limiting
* - Rate limit headers in responses
* - Token parsing caching for improved performance
* - Adaptive rate limiting based on server load
*/
// Cache of already-parsed bearer tokens, so the payload is not decoded on every request.
// Key: String.hashCode() of the raw token. Value: (userId, userType) extracted from it.
// NOTE(review): the key is a 32-bit hash, not the token itself, so two different tokens with
// the same hash would share one entry - confirm this is acceptable for rate-limit keys.
private val tokenCache = ConcurrentHashMap<Int, Pair<String, String>>()
// Cache size settings
private const val TOKEN_CACHE_MAX_SIZE = 10000 // Size above which the periodic cleanup trims the cache
// NOTE(review): not referenced anywhere in the visible part of this file - entries do not
// appear to expire by age; confirm whether this constant is used further down.
private const val TOKEN_CACHE_EXPIRATION_MINUTES = 60L // Intended cache expiration time in minutes
/**
 * Periodic size cap for [tokenCache].
 *
 * Every 10 minutes (first run after 10 minutes) the cache size is checked; if it exceeds
 * [TOKEN_CACHE_MAX_SIZE], entries are removed until half of that maximum remains. The map has
 * no insertion order, so the removed entries are arbitrary ones, not the oldest.
 *
 * The timer runs on a daemon thread, like the adaptive rate-limit monitor, so it never keeps
 * the JVM alive after the server has stopped.
 */
private val cacheCleanupScheduler = java.util.Timer("token-cache-cleanup", true).apply {
    schedule(object : java.util.TimerTask() {
        override fun run() {
            if (tokenCache.size > TOKEN_CACHE_MAX_SIZE) {
                // Shrink to TOKEN_CACHE_MAX_SIZE / 2 by dropping the surplus keys.
                val surplus = tokenCache.size - TOKEN_CACHE_MAX_SIZE / 2
                tokenCache.keys.take(surplus).forEach { tokenCache.remove(it) }
            }
        }
    }, TimeUnit.MINUTES.toMillis(10), TimeUnit.MINUTES.toMillis(10))
}
/**
 * Adaptive rate limiting configuration.
 *
 * Holds the thresholds and factors used to scale rate limits with server load, and runs a
 * daemon timer that samples CPU and memory usage every [MONITORING_INTERVAL_MS] milliseconds
 * and publishes the resulting load factor through [currentLoadFactor].
 */
private object AdaptiveRateLimiting {
    // Enable/disable adaptive rate limiting
    const val ENABLED = true

    // Thresholds for CPU usage (percentage)
    const val CPU_MEDIUM_LOAD_THRESHOLD = 60.0
    const val CPU_HIGH_LOAD_THRESHOLD = 80.0

    // Thresholds for memory usage (percentage of the maximum heap)
    const val MEMORY_MEDIUM_LOAD_THRESHOLD = 70.0
    const val MEMORY_HIGH_LOAD_THRESHOLD = 85.0

    // Rate limit adjustment factors
    const val MEDIUM_LOAD_FACTOR = 0.7 // Reduce limits to 70% under medium load
    const val HIGH_LOAD_FACTOR = 0.4 // Reduce limits to 40% under high load

    // Monitoring interval in milliseconds
    const val MONITORING_INTERVAL_MS = 5000L

    // Current load factor in percent (100 = no reduction); written by the monitor timer.
    val currentLoadFactor = AtomicInteger(100)

    /** Returns the current load factor as a fraction (1.0 = no reduction). */
    fun getCurrentLoadFactor(): Double = currentLoadFactor.get() / 100.0

    // Start monitoring as soon as the object is first used.
    init {
        if (ENABLED) {
            startLoadMonitoring()
        }
    }

    /**
     * Starts the daemon timer that periodically measures load and updates [currentLoadFactor].
     * Any failure during a measurement resets the factor to 100 so limits are never stuck low.
     */
    private fun startLoadMonitoring() {
        val timer = java.util.Timer("adaptive-rate-limit-monitor", true)
        val osBean = ManagementFactory.getOperatingSystemMXBean()
        val runtime = Runtime.getRuntime()
        timer.schedule(object : java.util.TimerTask() {
            override fun run() {
                try {
                    val cpuLoad = readCpuLoadPercent(osBean, runtime)
                    val memoryUsage = readMemoryUsagePercent(runtime)
                    val newLoadFactor = loadFactorPercentFor(cpuLoad, memoryUsage)

                    // Only report actual changes to keep the output quiet.
                    val oldLoadFactor = currentLoadFactor.getAndSet(newLoadFactor)
                    if (oldLoadFactor != newLoadFactor) {
                        println("[AdaptiveRateLimiting] Load factor changed: ${oldLoadFactor/100.0} -> ${newLoadFactor/100.0} " +
                            "(CPU: ${String.format("%.1f", cpuLoad)}%, Memory: ${String.format("%.1f", memoryUsage)}%)")
                    }
                } catch (e: Exception) {
                    // If any error occurs, reset to normal load
                    currentLoadFactor.set(100)
                    println("[AdaptiveRateLimiting] Error monitoring system load: ${e.message}")
                }
            }
        }, 0, MONITORING_INTERVAL_MS)
    }

    /**
     * Returns the CPU load in percent.
     *
     * Uses the process CPU load when the platform bean provides a usable value. That API
     * reports a negative number when no measurement is available; such a value must not be
     * read as "idle", so it falls through to the system load average per processor, and
     * finally to a neutral 50%.
     */
    private fun readCpuLoadPercent(
        osBean: java.lang.management.OperatingSystemMXBean,
        runtime: Runtime
    ): Double {
        val processLoad = (osBean as? com.sun.management.OperatingSystemMXBean)?.processCpuLoad
        if (processLoad != null && processLoad >= 0.0) {
            return processLoad * 100
        }
        return osBean.systemLoadAverage.takeIf { it >= 0 }?.let {
            it * 100 / runtime.availableProcessors()
        } ?: 50.0
    }

    /** Returns the used heap as a percentage of the maximum heap. */
    private fun readMemoryUsagePercent(runtime: Runtime): Double {
        val maxMemory = runtime.maxMemory().toDouble()
        val usedMemory = (runtime.totalMemory() - runtime.freeMemory()).toDouble()
        return (usedMemory / maxMemory) * 100
    }

    /** Maps CPU and memory usage (both in percent) to a load factor in percent. */
    private fun loadFactorPercentFor(cpuLoad: Double, memoryUsage: Double): Int = when {
        cpuLoad > CPU_HIGH_LOAD_THRESHOLD || memoryUsage > MEMORY_HIGH_LOAD_THRESHOLD ->
            (HIGH_LOAD_FACTOR * 100).toInt()
        cpuLoad > CPU_MEDIUM_LOAD_THRESHOLD || memoryUsage > MEMORY_MEDIUM_LOAD_THRESHOLD ->
            (MEDIUM_LOAD_FACTOR * 100).toInt()
        else -> 100 // Normal load = 100%
    }

    /**
     * Adjust a rate limit based on the current server load.
     *
     * @param baseLimit The configured limit
     * @return The limit scaled by the current load factor, never less than 1
     */
    fun adjustRateLimit(baseLimit: Int): Int {
        if (!ENABLED) return baseLimit
        val factor = getCurrentLoadFactor()
        return (baseLimit * factor).toInt().coerceAtLeast(1) // Ensure at least 1 request is allowed
    }
}
/**
 * Hashes a string for use in request keys.
 *
 * Implements 32-bit FNV-1a over the UTF-8 bytes of [input]: starting from the offset basis,
 * each byte is XOR-ed into the hash, which is then multiplied by the FNV prime. Int
 * multiplication wraps around, which is exactly the modulo 2^32 arithmetic the algorithm needs.
 *
 * @param input The text to hash
 * @return The 32-bit hash as a (possibly negative) Int
 */
private fun efficientHash(input: String): Int {
    var hash = 0x811c9dc5.toInt() // FNV-1a 32-bit offset basis
    for (byte in input.toByteArray(StandardCharsets.UTF_8)) {
        // Mask to an unsigned octet: a plain toInt() sign-extends bytes >= 0x80 and would
        // flip the upper 24 bits of the hash for every non-ASCII character.
        hash = hash xor (byte.toInt() and 0xFF)
        hash *= 0x01000193 // FNV 32-bit prime
    }
    return hash
}
/**
 * Builds a request key from several optional parts.
 *
 * Null and empty parts are skipped. Each remaining part is hashed on its own and folded, in
 * order, into one Int, which is returned in decimal form. With no usable parts the key is "0".
 *
 * NOTE(review): the key space is 32 bits and skipped parts leave no trace, so different
 * inputs can produce the same key - confirm that shared rate-limit buckets are acceptable.
 */
private fun generateRequestKey(vararg inputs: String?): String =
    inputs
        .filterNotNull()
        .filter { it.isNotEmpty() }
        .fold(0) { acc, part ->
            // Mix the running value with the part hash and a shifted copy of itself.
            (acc xor efficientHash(part)) + ((acc shl 5) + (acc shr 2))
        }
        .toString()
/**
 * Installs rate limiting on this application.
 *
 * When rate limiting is disabled in [AppConfig.rateLimit] this only logs that fact and
 * returns. Otherwise it installs the RateLimit plugin with
 *  - a global limiter keyed by client IP and User-Agent,
 *  - one named limiter per configured endpoint, keyed by client IP and endpoint,
 *  - named limiters for the user types "anonymous", "authenticated" and "admin",
 * optionally adds informational X-RateLimit-* response headers, and logs the resulting
 * configuration.
 *
 * NOTE(review): every limit passed to `rateLimiter` is an Int computed while this
 * configuration block runs; presumably that happens once at install time, so later changes of
 * the adaptive load factor would only show up in the headers, not in the enforced limits -
 * TODO confirm.
 */
fun Application.configureRateLimiting() {
    val config = AppConfig.rateLimit
    if (!config.enabled) {
        log.info("Rate limiting is disabled")
        return
    }

    // Header values are machine-readable, so the decimal separator must not depend on the
    // server's default locale (which would yield e.g. "0,70").
    fun formatFactor(value: Double): String = String.format(java.util.Locale.ROOT, "%.2f", value)

    // Limit and period for a user type, falling back to the global settings.
    fun limitFor(userType: String) =
        AdaptiveRateLimiting.adjustRateLimit(config.userTypeLimits[userType]?.limit ?: config.globalLimit)
    fun periodFor(userType: String) =
        (config.userTypeLimits[userType]?.periodMinutes ?: config.globalPeriodMinutes).minutes

    install(RateLimit) {
        // Global rate limiting: one bucket per client IP and User-Agent.
        global {
            rateLimiter(
                limit = AdaptiveRateLimiting.adjustRateLimit(config.globalLimit),
                refillPeriod = config.globalPeriodMinutes.minutes
            )
            requestKey { call ->
                val ip = call.request.local.remoteHost
                val userAgent = call.request.userAgent() ?: ""
                generateRequestKey(ip, userAgent)
            }
        }

        // Endpoint-specific rate limiting: one bucket per client IP and endpoint.
        for ((endpoint, limitConfig) in config.endpointLimits) {
            register(RateLimitName(endpoint)) {
                rateLimiter(
                    limit = AdaptiveRateLimiting.adjustRateLimit(limitConfig.limit),
                    refillPeriod = limitConfig.periodMinutes.minutes
                )
                requestKey { call ->
                    // The request ID is deliberately not part of the key: a value that differs
                    // per request would give every request its own bucket and the limit
                    // could never be reached.
                    generateRequestKey(call.request.local.remoteHost, endpoint)
                }
            }
        }

        // Anonymous users: keyed by client IP and User-Agent, separated by the "anon" prefix.
        register(RateLimitName("anonymous")) {
            rateLimiter(
                limit = limitFor("anonymous"),
                refillPeriod = periodFor("anonymous")
            )
            requestKey { call ->
                val ip = call.request.local.remoteHost
                val userAgent = call.request.userAgent() ?: ""
                generateRequestKey("anon", ip, userAgent)
            }
        }

        // Authenticated and admin users: keyed by the user ID from the bearer token (when an
        // Authorization header is present) plus client IP, separated by a per-type prefix.
        for ((userType, keyPrefix) in listOf("authenticated" to "auth", "admin" to "admin")) {
            register(RateLimitName(userType)) {
                rateLimiter(
                    limit = limitFor(userType),
                    refillPeriod = periodFor(userType)
                )
                requestKey { call ->
                    val userId = call.request.header("Authorization")?.let { extractUserIdFromToken(it) }
                    val ip = call.request.local.remoteHost
                    generateRequestKey(keyPrefix, userId ?: "", ip)
                }
            }
        }
    }

    // Add rate limit headers to all responses
    if (config.includeHeaders) {
        intercept(ApplicationCallPipeline.Plugins) {
            val loadFactor = AdaptiveRateLimiting.getCurrentLoadFactor()
            val adjustedGlobalLimit = AdaptiveRateLimiting.adjustRateLimit(config.globalLimit)

            // Basic rate limit headers
            call.response.header("X-RateLimit-Enabled", "true")
            call.response.header("X-RateLimit-Limit", config.globalLimit.toString())
            call.response.header("X-RateLimit-Adjusted-Limit", adjustedGlobalLimit.toString())

            // Adaptive rate limiting information
            call.response.header("X-RateLimit-Load-Factor", formatFactor(loadFactor))
            call.response.header("X-RateLimit-Adaptive", AdaptiveRateLimiting.ENABLED.toString())

            // Human-readable policy headers
            call.response.header("X-RateLimit-Policy", "${config.globalLimit} requests per ${config.globalPeriodMinutes} minutes")
            call.response.header("X-RateLimit-Adjusted-Policy", "${adjustedGlobalLimit} requests per ${config.globalPeriodMinutes} minutes")

            // Estimated reset time (simplified: the full refill period in seconds)
            val resetTimeSeconds = config.globalPeriodMinutes * 60
            call.response.header("X-RateLimit-Reset", resetTimeSeconds.toString())

            // Seconds per allowed request, at least 1; only computed for 429 responses. The
            // divisor is clamped so a configured limit of 0 cannot cause a division by zero.
            // NOTE(review): this interceptor inspects the status in the Plugins phase; whether
            // a 429 is already set at this point depends on plugin ordering - TODO confirm.
            val retryAfter = if (call.response.status() == HttpStatusCode.TooManyRequests) {
                (config.globalPeriodMinutes * 60 / config.globalLimit.coerceAtLeast(1)).coerceAtLeast(1)
            } else {
                null
            }
            if (retryAfter != null) {
                call.response.header(HttpHeaders.RetryAfter, retryAfter.toString())
            }

            // Endpoint-specific headers for the first configured endpoint matching the path
            val path = call.request.path()
            config.endpointLimits.entries.find { path.startsWith("/${it.key}") }?.let { (endpoint, limitConfig) ->
                val adjustedEndpointLimit = AdaptiveRateLimiting.adjustRateLimit(limitConfig.limit)
                call.response.header("X-RateLimit-Endpoint", endpoint)
                call.response.header("X-RateLimit-Endpoint-Limit", limitConfig.limit.toString())
                call.response.header("X-RateLimit-Endpoint-Adjusted-Limit", adjustedEndpointLimit.toString())
                call.response.header("X-RateLimit-Endpoint-Period", "${limitConfig.periodMinutes}m")
                call.response.header("X-RateLimit-Endpoint-Reset", (limitConfig.periodMinutes * 60).toString())
            }

            // User-type headers when the request carries an Authorization header
            val authHeader = call.request.header("Authorization")
            if (authHeader != null) {
                val userType = determineUserType(authHeader)
                config.userTypeLimits[userType]?.let { limitConfig ->
                    val adjustedUserTypeLimit = AdaptiveRateLimiting.adjustRateLimit(limitConfig.limit)
                    call.response.header("X-RateLimit-UserType", userType)
                    call.response.header("X-RateLimit-UserType-Limit", limitConfig.limit.toString())
                    call.response.header("X-RateLimit-UserType-Adjusted-Limit", adjustedUserTypeLimit.toString())
                    call.response.header("X-RateLimit-UserType-Period", "${limitConfig.periodMinutes}m")
                    call.response.header("X-RateLimit-UserType-Reset", (limitConfig.periodMinutes * 60).toString())
                }
            }

            // Log rate limiting information if rate limited
            if (retryAfter != null) {
                val requestId = call.attributes.getOrNull(REQUEST_ID_KEY) ?: "no-request-id"
                application.log.warn("Rate limit exceeded - Path: ${call.request.path()} - " +
                    "RequestID: $requestId - Client: ${call.request.local.remoteHost} - " +
                    "RetryAfter: ${retryAfter}s - " +
                    "LoadFactor: ${formatFactor(loadFactor)} - " +
                    "OriginalLimit: ${config.globalLimit} - AdjustedLimit: $adjustedGlobalLimit")
            }
        }
    }

    // Log basic rate limiting configuration
    log.info("Rate limiting configured with global limit: ${config.globalLimit}/${config.globalPeriodMinutes}m")
    log.info("Endpoint-specific limits: ${config.endpointLimits.size} configured")
    log.info("User-type-specific limits: ${config.userTypeLimits.size} configured")

    // Log adaptive rate limiting configuration
    if (AdaptiveRateLimiting.ENABLED) {
        log.info("Adaptive rate limiting ENABLED with current load factor: ${formatFactor(AdaptiveRateLimiting.getCurrentLoadFactor())}")
        log.info("Adaptive thresholds - CPU: Medium=${AdaptiveRateLimiting.CPU_MEDIUM_LOAD_THRESHOLD}%, High=${AdaptiveRateLimiting.CPU_HIGH_LOAD_THRESHOLD}%")
        log.info("Adaptive thresholds - Memory: Medium=${AdaptiveRateLimiting.MEMORY_MEDIUM_LOAD_THRESHOLD}%, High=${AdaptiveRateLimiting.MEMORY_HIGH_LOAD_THRESHOLD}%")
        log.info("Adaptive factors - Medium load: ${AdaptiveRateLimiting.MEDIUM_LOAD_FACTOR}, High load: ${AdaptiveRateLimiting.HIGH_LOAD_FACTOR}")
        log.info("Adaptive monitoring interval: ${AdaptiveRateLimiting.MONITORING_INTERVAL_MS}ms")
        // Example of an adjusted limit: base limit and adjusted limit, clearly separated
        log.info("Example adjusted limits at current load factor (${formatFactor(AdaptiveRateLimiting.getCurrentLoadFactor())}): " +
            "Global: ${config.globalLimit} -> ${AdaptiveRateLimiting.adjustRateLimit(config.globalLimit)}")
    } else {
        log.info("Adaptive rate limiting DISABLED")
    }
}
/**
 * Extracts a user ID from an Authorization header value.
 *
 * An optional "Bearer " prefix is stripped, then the token cache is consulted by the token's
 * hash code. On a miss the token is split on '.', the second part is Base64-URL decoded and
 * the "sub" claim is read with a regular expression; the result is cached together with the
 * user type derived from the same payload.
 *
 * The token signature is NOT verified here, so the returned ID is only suitable as a
 * rate-limit key, never as proof of identity.
 *
 * @param authHeader The raw Authorization header value
 * @return The "sub" claim; the token's hash code as text when the payload has no "sub" claim;
 *   null when the token does not consist of exactly three dot-separated parts; the header's
 *   hash code as text when decoding or parsing fails
 */
private fun extractUserIdFromToken(authHeader: String): String? {
    return try {
        val rawToken = authHeader.removePrefix("Bearer ")

        // Serve repeated tokens from the cache.
        val cacheKey = rawToken.hashCode()
        tokenCache[cacheKey]?.let { return it.first }

        // header.payload.signature - anything else is not treated as a JWT.
        val segments = rawToken.split(".")
        if (segments.size != 3) return null

        val payloadJson = String(java.util.Base64.getUrlDecoder().decode(segments[1]))

        // Lightweight claim extraction; a proper JWT library should be used in production.
        val subject = "\"sub\"\\s*:\\s*\"([^\"]+)\"".toRegex()
            .find(payloadJson)
            ?.groupValues
            ?.get(1)

        // Derive the user type from the same payload so one decode fills the whole cache entry.
        val userType = determineUserTypeFromPayload(payloadJson)
        val userId = subject ?: rawToken.hashCode().toString()

        tokenCache[cacheKey] = Pair(userId, userType)
        userId
    } catch (e: Exception) {
        // Best effort: any parsing problem falls back to a hash of the whole header value.
        authHeader.hashCode().toString()
    }
}
/**
 * Determines the user type ("admin" or "authenticated") from a JWT supplied in an authorization
 * header value.
 *
 * The token (the header value minus an optional "Bearer " prefix) is looked up in `tokenCache`
 * by its `hashCode()`. On a cache miss the payload segment is Base64URL-decoded as UTF-8 and
 * handed to [determineUserTypeFromPayload]. The token signature is NOT verified here. The
 * resulting (userId, userType) pair is stored in `tokenCache` for later calls.
 *
 * NOTE(review): the cache key is the token's 32-bit `hashCode()`, so two different tokens with
 * colliding hashes share one cache entry - consider keying the cache by the token string.
 *
 * @param authHeader raw header value, with or without the "Bearer " prefix.
 * @return the cached or parsed user type; "authenticated" when the token is not made of exactly
 *   three dot-separated parts or when parsing throws.
 */
private fun determineUserType(authHeader: String): String {
    try {
        // Strip the "Bearer " prefix if present.
        val token = authHeader.removePrefix("Bearer ")

        // The token hash doubles as cache key and as fallback user ID.
        val tokenHash = token.hashCode()
        val cachedValue = tokenCache[tokenHash]
        if (cachedValue != null) {
            return cachedValue.second
        }

        // A JWT consists of header.payload.signature.
        val parts = token.split(".")
        if (parts.size != 3) {
            return "authenticated" // Default to authenticated if not a valid JWT
        }

        // Decode explicitly as UTF-8: JWT payloads are UTF-8 JSON, and relying on the platform
        // default charset can corrupt non-ASCII claim values on JVMs whose default differs.
        val payload = String(java.util.Base64.getUrlDecoder().decode(parts[1]), Charsets.UTF_8)

        val userType = determineUserTypeFromPayload(payload)

        // Extract the user ID from the same payload so one parse fills the whole cache entry.
        val subjectRegex = "\"sub\"\\s*:\\s*\"([^\"]+)\"".toRegex()
        val userId = subjectRegex.find(payload)?.groupValues?.get(1) ?: tokenHash.toString()

        tokenCache[tokenHash] = Pair(userId, userType)
        return userType
    } catch (e: Exception) {
        // Best effort: any parsing failure is treated as a regular authenticated user.
        return "authenticated"
    }
}
// Matches a role claim whose value is a single JSON string, e.g. "role": "admin".
private val ROLE_STRING_CLAIM_REGEX = "\"(role|roles|authorities)\"\\s*:\\s*\"([^\"]+)\"".toRegex()

// Matches a role claim whose value is a JSON array, e.g. "roles": ["admin", "user"].
private val ROLE_ARRAY_CLAIM_REGEX = "\"(role|roles|authorities)\"\\s*:\\s*\\[([^\\]]+)\\]".toRegex()

/**
 * Determines the user type from a decoded JWT payload (JSON text).
 *
 * Looks for a `role`, `roles` or `authorities` claim, first as a single string value and, if
 * that form is absent, as an array. The first matching claim decides the result: the user is
 * "admin" when the claim text contains "admin" in any letter case, otherwise "authenticated".
 * Both forms are compared case-insensitively so that `"ADMIN"` and `["ADMIN"]` are treated alike.
 *
 * @param payload decoded JWT payload as a JSON string.
 * @return "admin" or "authenticated"; "authenticated" when no role claim is found.
 */
private fun determineUserTypeFromPayload(payload: String): String {
    // Group 2 holds the claim value: the string content, or the raw text between [ and ].
    val roleText = ROLE_STRING_CLAIM_REGEX.find(payload)?.groupValues?.get(2)
        ?: ROLE_ARRAY_CLAIM_REGEX.find(payload)?.groupValues?.get(2)
        ?: return "authenticated" // No role information found

    return if (roleText.lowercase().contains("admin")) "admin" else "authenticated"
}
@@ -0,0 +1,248 @@
package at.mocode.gateway.config
import at.mocode.shared.config.AppConfig
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.plugins.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.util.*
import java.util.UUID
/**
* Configuration for request tracing and cross-service correlation.
*
* This configuration adds support for:
* - Request ID generation and propagation
* - Cross-service tracing
* - Correlation ID extraction from incoming requests
* - Correlation ID propagation via response headers; the ID is exposed to other code through ApplicationCall.requestId()
*/
// Attribute keys for per-call tracing data stored in the ApplicationCall attributes.
// REQUEST_ID_KEY holds the request ID resolved for the call (incoming header value or generated).
val REQUEST_ID_KEY = AttributeKey<String>("RequestId")
// REQUEST_START_TIME_KEY holds the request start time as epoch milliseconds (System.currentTimeMillis()).
val REQUEST_START_TIME_KEY = AttributeKey<Long>("RequestStartTime")
/**
 * Configures request tracing for the API Gateway.
 *
 * Installs two pipeline interceptors driven by `AppConfig.logging`:
 * - [ApplicationCallPipeline.Monitoring]: records the start time, resolves the request ID (incoming
 *   header, generated ID, or the literal "no-request-id"), stores both in the call attributes,
 *   optionally adds tracing headers to the response and optionally logs the incoming request.
 * - [ApplicationCallPipeline.Plugins]: lets the rest of the pipeline run, then optionally adds the
 *   `X-Response-Time` header (only while the response is still uncommitted) and optionally logs
 *   the response, including the duration when response timing is enabled.
 */
fun Application.configureRequestTracing() {
    val config = AppConfig.logging

    // Incoming side: resolve the request ID, add tracing headers, log the request.
    intercept(ApplicationCallPipeline.Monitoring) {
        // Store the start time for timing measurements.
        val startTime = System.currentTimeMillis()
        call.attributes.put(REQUEST_START_TIME_KEY, startTime)

        // Prefer the ID sent by the caller; otherwise generate one or use a fixed placeholder.
        val incomingRequestId = call.request.header(config.requestIdHeader)
        val requestId = when {
            incomingRequestId != null -> incomingRequestId
            config.generateRequestIdIfMissing -> generateRequestId()
            else -> "no-request-id"
        }
        call.attributes.put(REQUEST_ID_KEY, requestId)

        if (config.propagateRequestId) {
            // Primary request ID header plus additional headers for cross-service correlation.
            call.response.header(config.requestIdHeader, requestId)
            call.response.header("X-Correlation-ID", requestId)
            call.response.header("X-Request-Start-Time", startTime.toString())
            call.response.header("X-Service-Name", AppConfig.appInfo.name)
            call.response.header("X-Service-Version", AppConfig.appInfo.version)

            // W3C trace context header, format: 00-traceid-parentid-01 (version-traceid-parentid-flags).
            // NOTE(review): only dashes are stripped, so a caller-supplied request ID containing
            // non-hex characters still yields a non-hex trace id.
            val traceId = requestId.replace("-", "").takeLast(32).padStart(32, '0')
            // Integer.toHexString renders the hash as unsigned hex; Int.toString(16) would emit a
            // leading '-' for negative hash codes and produce an invalid parent id.
            val parentId = Integer.toHexString(requestId.hashCode()).padStart(16, '0')
            call.response.header("traceparent", "00-$traceId-$parentId-01")
        }

        if (config.logRequests) {
            val clientIp = call.request.origin.remoteHost
            val userAgent = call.request.userAgent() ?: "unknown"
            val referer = call.request.header("Referer") ?: "-"
            val contentType = call.request.contentType().toString()
            val contentLength = call.request.header(HttpHeaders.ContentLength) ?: "0"
            val host = call.request.host()
            val scheme = call.request.local.scheme
            val port = call.request.port()
            val method = call.request.httpMethod.value
            val path = call.request.path()
            val queryString = call.request.queryString().let { if (it.isNotEmpty()) "?$it" else "" }
            // Trace context sent by the caller, if any.
            val traceParent = call.request.header("traceparent") ?: "-"
            val traceState = call.request.header("tracestate") ?: "-"

            val requestMessage = if (config.useStructuredLogging) {
                "type=request " +
                    "requestId=$requestId " +
                    "method=$method " +
                    "path=$path " +
                    "query=$queryString " +
                    "scheme=$scheme " +
                    "host=$host " +
                    "port=$port " +
                    "client=$clientIp " +
                    "userAgent=\"$userAgent\" " +
                    "referer=\"$referer\" " +
                    "contentType=$contentType " +
                    "contentLength=$contentLength " +
                    "traceParent=$traceParent " +
                    "traceState=$traceState " +
                    "timestamp=${System.currentTimeMillis()}"
            } else {
                "Incoming request: $method $path$queryString - " +
                    "Host: $host:$port - " +
                    "Scheme: $scheme - " +
                    "Client: $clientIp - " +
                    "UserAgent: $userAgent - " +
                    "Referer: $referer - " +
                    "ContentType: $contentType - " +
                    "ContentLength: $contentLength - " +
                    "RequestID: $requestId - " +
                    "TraceParent: $traceParent"
            }
            application.log.info(requestMessage)
        }
    }

    // Outgoing side: measure the duration and log the response once the pipeline has run.
    intercept(ApplicationCallPipeline.Plugins) {
        val requestId = call.attributes[REQUEST_ID_KEY]

        // Process the request.
        proceed()

        // Duration is only measured when response timing is enabled.
        val duration: Long? = if (config.logResponseTime) {
            System.currentTimeMillis() - call.attributes[REQUEST_START_TIME_KEY]
        } else {
            null
        }

        // After proceed() the response has usually been committed already; appending a header at
        // that point is rejected or ignored by the engine, so only add it while still possible.
        if (duration != null && !call.response.isCommitted) {
            call.response.header("X-Response-Time", "$duration")
        }

        if (config.logResponses) {
            val status = call.response.status() ?: HttpStatusCode.OK
            val path = call.request.path()
            val method = call.request.httpMethod.value
            val contentType = call.response.headers["Content-Type"] ?: "-"
            val contentLength = call.response.headers["Content-Length"] ?: "0"
            // Used heap (total minus free) as a coarse performance indicator.
            val memoryUsage = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()
            // Trace context that was attached to the response.
            val traceParent = call.response.headers["traceparent"] ?: "-"

            val responseMessage = if (config.useStructuredLogging) {
                // The duration field is present only when response timing is enabled.
                val durationField = if (duration != null) "duration=${duration}ms " else ""
                "type=response " +
                    "requestId=$requestId " +
                    "method=$method " +
                    "path=$path " +
                    "status=$status " +
                    durationField +
                    "contentType=$contentType " +
                    "contentLength=$contentLength " +
                    "traceParent=$traceParent " +
                    "memoryUsage=${memoryUsage}b " +
                    "timestamp=${System.currentTimeMillis()}"
            } else {
                val durationField = if (duration != null) "Duration: ${duration}ms - " else ""
                "Response: $status - " +
                    "Method: $method - " +
                    "Path: $path - " +
                    "RequestID: $requestId - " +
                    durationField +
                    "ContentType: $contentType - " +
                    "ContentLength: $contentLength - " +
                    "TraceParent: $traceParent - " +
                    "MemoryUsage: ${memoryUsage}b"
            }
            application.log.info(responseMessage)
        }
    }

    log.info("Request tracing configured with header: ${config.requestIdHeader}")
}
/**
 * Builds a new request ID that carries some context about where it was created.
 *
 * Format: req-environment-service-timestamp-uuid
 * Example: req-prod-gateway-1627384950123-550e8400-e29b-41d4-a716-446655440000
 *
 * @return the dash-joined ID made of the literal "req", the environment tag, the service tag,
 *   the current epoch milliseconds and a random UUID.
 */
private fun generateRequestId(): String {
    // At most the first four characters of the environment's string form, lower-cased.
    val environmentTag = AppConfig.environment.toString().take(4).lowercase()
    // Service name with spaces turned into dashes, lower-cased.
    val serviceTag = AppConfig.appInfo.name.replace(" ", "-").lowercase()
    val epochMillis = System.currentTimeMillis()
    val randomPart = UUID.randomUUID().toString()
    return listOf("req", environmentTag, serviceTag, epochMillis, randomPart).joinToString("-")
}
/**
 * Returns the request ID stored in this call's attributes under [REQUEST_ID_KEY].
 *
 * The value is written by the Monitoring interceptor installed in [configureRequestTracing].
 * NOTE(review): the indexed attribute lookup presumably fails when no ID has been stored yet -
 * confirm against the Ktor `Attributes` documentation before calling this outside that pipeline.
 */
fun ApplicationCall.requestId(): String {
    return attributes[REQUEST_ID_KEY]
}
@@ -1,6 +1,9 @@
package at.mocode.gateway
import at.mocode.gateway.config.configureMonitoring
import at.mocode.gateway.config.configureOpenApi
import at.mocode.gateway.config.configureRateLimiting
import at.mocode.gateway.config.configureRequestTracing
import at.mocode.gateway.config.configureSwagger
import at.mocode.gateway.routing.docRoutes
import at.mocode.shared.config.AppConfig
@@ -11,10 +14,8 @@ import io.ktor.server.http.content.*
import io.ktor.server.plugins.calllogging.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.server.plugins.ratelimit.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlin.time.Duration.Companion.minutes
fun Application.module() {
val config = AppConfig
@@ -34,6 +35,8 @@ fun Application.module() {
}
allowHeader(HttpHeaders.ContentType)
allowHeader(HttpHeaders.Authorization)
// Add request ID header to allowed headers
allowHeader(config.logging.requestIdHeader)
allowMethod(HttpMethod.Options)
allowMethod(HttpMethod.Get)
allowMethod(HttpMethod.Post)
@@ -42,40 +45,19 @@ fun Application.module() {
}
}
// Call-Logging installieren
if (config.logging.logRequests) {
install(CallLogging)
}
// Erweiterte Monitoring- und Logging-Konfiguration
configureMonitoring()
// Request Tracing für Cross-Service Tracing konfigurieren
configureRequestTracing()
// Enhanced Rate Limiting konfigurieren
configureRateLimiting()
// OpenAPI und Swagger UI konfigurieren
configureOpenApi()
configureSwagger()
// Rate Limiting konfigurieren
if (config.rateLimit.enabled) {
install(RateLimit) {
// Globale Rate Limiting Konfiguration
global {
// Limit basierend auf Konfiguration
rateLimiter(
limit = config.rateLimit.globalLimit,
refillPeriod = config.rateLimit.globalPeriodMinutes.minutes
)
// Request-Key basierend auf IP-Adresse
requestKey { call -> call.request.local.remoteHost }
}
// Konfiguriere Rate Limiting für spezifische Routen
// Wir verwenden hier einen Interceptor, um die Response-Header hinzuzufügen
if (config.rateLimit.includeHeaders) {
this@module.intercept(ApplicationCallPipeline.Plugins) {
call.response.header("X-RateLimit-Enabled", "true")
call.response.header("X-RateLimit-Limit", config.rateLimit.globalLimit.toString())
}
}
}
}
routing {
// Hauptrouten
get("/") {