From 3959168695086a8b2886d175d13e2675132cbb87 Mon Sep 17 00:00:00 2001 From: Stefan Mogeritsch Date: Fri, 8 May 2026 12:45:09 +0200 Subject: [PATCH] =?UTF-8?q?feat(core,=20network):=20Port-Guards=20f=C3=BCr?= =?UTF-8?q?=20Mehrfachstarts=20von=20P2P-Server=20integriert?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stefan Mogeritsch --- .../core/network/sync/JvmP2pSyncService.kt | 83 +++++++++++++------ 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/frontend/core/network/src/jvmMain/kotlin/at/mocode/frontend/core/network/sync/JvmP2pSyncService.kt b/frontend/core/network/src/jvmMain/kotlin/at/mocode/frontend/core/network/sync/JvmP2pSyncService.kt index b0fc802f..101d3b57 100644 --- a/frontend/core/network/src/jvmMain/kotlin/at/mocode/frontend/core/network/sync/JvmP2pSyncService.kt +++ b/frontend/core/network/src/jvmMain/kotlin/at/mocode/frontend/core/network/sync/JvmP2pSyncService.kt @@ -15,9 +15,15 @@ import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.serialization.json.Json import java.util.* +import java.util.concurrent.ConcurrentHashMap class JvmP2pSyncService : P2pSyncService { + companion object { + // Prozessweiter, portbasierter Guard gegen Mehrfachstart + private val startedPorts: MutableSet = ConcurrentHashMap.newKeySet() + } private var server: EmbeddedServer<*, *>? = null + private var currentPort: Int? = null private val client = HttpClient { install(io.ktor.client.plugins.websocket.WebSockets) } @@ -32,41 +38,66 @@ class JvmP2pSyncService : P2pSyncService { private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) override fun startServer(port: Int) { - if (server != null) return + // Instanz-Guard (gleiche Instanz) + if (server != null) { + println("[P2P Server] Bereits gestartet (Instanz) auf Port ${currentPort ?: port} – idempotent") + return + } - server = embeddedServer(Netty, port = port) { - install(io.ktor.server.websocket.WebSockets) - routing { - webSocket("/sync") { - println("[P2P Server] Neuer Peer verbunden") - activeSessions.add(this) - updatePeers() - try { - for (frame in incoming) { - if (frame is Frame.Text) { - val text = frame.readText() - try { - val event = Json.decodeFromString(text) - _incomingEvents.emit(event) - } catch (e: Exception) { - println("[P2P Server] Fehler beim Dekodieren: ${e.message}") + // Prozessweiter, portbasierter Guard + if (!startedPorts.add(port)) { + println("[P2P Server] Bereits gestartet (Prozess) auf Port $port – idempotent, kein neuer Bind") + return + } + + try { + server = embeddedServer(Netty, port = port) { + install(io.ktor.server.websocket.WebSockets) + routing { + webSocket("/sync") { + println("[P2P Server] Neuer Peer verbunden") + activeSessions.add(this) + updatePeers() + try { + for (frame in incoming) { + if (frame is Frame.Text) { + val text = frame.readText() + try { + val event = Json.decodeFromString(text) + _incomingEvents.emit(event) + } catch (e: Exception) { + println("[P2P Server] Fehler beim Dekodieren: ${e.message}") + } } } + } finally { + activeSessions.remove(this) + updatePeers() + println("[P2P Server] Peer getrennt") } - } finally { - activeSessions.remove(this) - updatePeers() - println("[P2P Server] Peer getrennt") } } - } - }.start(wait = false) - println("[P2P Server] Gestartet auf Port $port") + }.start(wait = false) + currentPort = port + println("[P2P Server] Gestartet auf Port $port") + } catch (e: Exception) { + // Start fehlgeschlagen -> Port-Lock wieder freigeben + startedPorts.remove(port) + server = null + currentPort = null + println("[P2P Server] Start auf Port $port fehlgeschlagen: ${e.message}") + throw e + } } override fun stopServer() { - server?.stop(1000, 2000) - server = null + try { + server?.stop(1000, 2000) + } finally { + server = null + currentPort?.let { startedPorts.remove(it) } + currentPort = null + } } override suspend fun connectToPeer(host: String, port: Int) {