feat(core, network): Port-Guards für Mehrfachstarts von P2P-Server integriert

Signed-off-by: Stefan Mogeritsch <stefan.mo.co@gmail.com>
This commit is contained in:
2026-05-08 12:45:09 +02:00
parent 04a435df1d
commit 3959168695
@@ -15,9 +15,15 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap
class JvmP2pSyncService : P2pSyncService { class JvmP2pSyncService : P2pSyncService {
companion object {
// Prozessweiter, portbasierter Guard gegen Mehrfachstart
private val startedPorts: MutableSet<Int> = ConcurrentHashMap.newKeySet()
}
private var server: EmbeddedServer<*, *>? = null private var server: EmbeddedServer<*, *>? = null
private var currentPort: Int? = null
private val client = HttpClient { private val client = HttpClient {
install(io.ktor.client.plugins.websocket.WebSockets) install(io.ktor.client.plugins.websocket.WebSockets)
} }
@@ -32,41 +38,66 @@ class JvmP2pSyncService : P2pSyncService {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
override fun startServer(port: Int) { override fun startServer(port: Int) {
if (server != null) return // Instanz-Guard (gleiche Instanz)
if (server != null) {
println("[P2P Server] Bereits gestartet (Instanz) auf Port ${currentPort ?: port} idempotent")
return
}
server = embeddedServer(Netty, port = port) { // Prozessweiter, portbasierter Guard
install(io.ktor.server.websocket.WebSockets) if (!startedPorts.add(port)) {
routing { println("[P2P Server] Bereits gestartet (Prozess) auf Port $port idempotent, kein neuer Bind")
webSocket("/sync") { return
println("[P2P Server] Neuer Peer verbunden") }
activeSessions.add(this)
updatePeers() try {
try { server = embeddedServer(Netty, port = port) {
for (frame in incoming) { install(io.ktor.server.websocket.WebSockets)
if (frame is Frame.Text) { routing {
val text = frame.readText() webSocket("/sync") {
try { println("[P2P Server] Neuer Peer verbunden")
val event = Json.decodeFromString<SyncEvent>(text) activeSessions.add(this)
_incomingEvents.emit(event) updatePeers()
} catch (e: Exception) { try {
println("[P2P Server] Fehler beim Dekodieren: ${e.message}") for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
try {
val event = Json.decodeFromString<SyncEvent>(text)
_incomingEvents.emit(event)
} catch (e: Exception) {
println("[P2P Server] Fehler beim Dekodieren: ${e.message}")
}
} }
} }
} finally {
activeSessions.remove(this)
updatePeers()
println("[P2P Server] Peer getrennt")
} }
} finally {
activeSessions.remove(this)
updatePeers()
println("[P2P Server] Peer getrennt")
} }
} }
} }.start(wait = false)
}.start(wait = false) currentPort = port
println("[P2P Server] Gestartet auf Port $port") println("[P2P Server] Gestartet auf Port $port")
} catch (e: Exception) {
// Start fehlgeschlagen -> Port-Lock wieder freigeben
startedPorts.remove(port)
server = null
currentPort = null
println("[P2P Server] Start auf Port $port fehlgeschlagen: ${e.message}")
throw e
}
} }
override fun stopServer() { override fun stopServer() {
server?.stop(1000, 2000) try {
server = null server?.stop(1000, 2000)
} finally {
server = null
currentPort?.let { startedPorts.remove(it) }
currentPort = null
}
} }
override suspend fun connectToPeer(host: String, port: Int) { override suspend fun connectToPeer(host: String, port: Int) {