feat(core+frontend): add P2P sync infrastructure with WebSocket support
- **Core Updates:** - Implemented `P2pSyncService` interface with platform-specific WebSocket implementations (`JvmP2pSyncService` and no-op for JS). - Developed `SyncEvent` sealed class hierarchy to handle peer synchronization events (e.g., `PingEvent`, `PongEvent`, `DataChangedEvent`, etc.). - **Frontend Integration:** - Introduced `SyncManager` to manage peer discovery and synchronization, coupled with `NetworkDiscoveryService`. - Updated dependency injection to include `syncModule` for platform-specific sync service initialization. - Enhanced `BewerbViewModel` to support new sync capabilities, including observing sync events and UI updates for connected peers. - **Backend Enhancements:** - Added ZNS-specific fields (`zns_nummer`, `zns_abteilung`) to Bewerb table for idempotent imports. - Introduced import ZNS logic to handle duplicates and align with SyncManager updates. - **UI Improvements:** - Enhanced `TurnierBewerbeTab` with updated dialogs (ZNS imports, sync status) and dynamic previews. - Improved network syncing feedback and error handling in frontend components. - **DB Changes:** - Added migration for new column fields in the Bewerb table with relevant indexing for ZNS import optimizations.
This commit is contained in:
@@ -21,13 +21,21 @@ kotlin {
|
||||
implementation(libs.ktor.client.serialization.kotlinx.json)
|
||||
implementation(libs.ktor.client.auth)
|
||||
implementation(libs.ktor.client.logging)
|
||||
api(libs.ktor.client.websockets.common)
|
||||
implementation(libs.kotlinx.coroutines.core)
|
||||
implementation(libs.kotlinx.datetime)
|
||||
api(libs.koin.core)
|
||||
}
|
||||
|
||||
jvmMain.dependencies {
|
||||
implementation(libs.ktor.client.cio)
|
||||
implementation("org.jmdns:jmdns:3.5.5")
|
||||
implementation(libs.ktor.client.websockets)
|
||||
implementation(libs.ktor.server.core)
|
||||
implementation(libs.ktor.server.netty)
|
||||
implementation(libs.ktor.server.websockets)
|
||||
implementation(libs.ktor.server.contentNegotiation)
|
||||
implementation(libs.ktor.server.serialization.kotlinx.json)
|
||||
implementation(libs.jmdns)
|
||||
}
|
||||
|
||||
jsMain.dependencies {
|
||||
|
||||
+2
-1
@@ -11,6 +11,7 @@ import org.koin.core.module.Module
|
||||
import org.koin.core.qualifier.named
|
||||
import org.koin.dsl.module
|
||||
import at.mocode.frontend.core.network.discovery.discoveryModule
|
||||
import at.mocode.frontend.core.network.sync.syncModule
|
||||
|
||||
/**
|
||||
* Schnittstelle zur Token-Bereitstellung – entkoppelt core-network von core-auth.
|
||||
@@ -23,7 +24,7 @@ interface TokenProvider { fun getAccessToken(): String? }
|
||||
* - "apiClient": Konfigurierter Client für das API-Gateway (Auth-Header, Retry, Timeout)
|
||||
*/
|
||||
val networkModule: Module = module {
|
||||
includes(discoveryModule)
|
||||
includes(discoveryModule, syncModule)
|
||||
|
||||
// 1. Basis-Client (für Auth-Endpunkte, ohne Bearer-Token)
|
||||
single(named("baseHttpClient")) {
|
||||
|
||||
+38
@@ -0,0 +1,38 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
||||
/**
|
||||
* Interface für den P2P-Synchronisationsdienst.
|
||||
*/
|
||||
interface P2pSyncService {
|
||||
/**
|
||||
* Startet den Sync-Server auf dieser Instanz.
|
||||
*/
|
||||
fun startServer(port: Int)
|
||||
|
||||
/**
|
||||
* Stoppt den Sync-Server.
|
||||
*/
|
||||
fun stopServer()
|
||||
|
||||
/**
|
||||
* Verbindet sich mit einem anderen Peer.
|
||||
*/
|
||||
suspend fun connectToPeer(host: String, port: Int)
|
||||
|
||||
/**
|
||||
* Sendet ein Event an alle verbundenen Peers.
|
||||
*/
|
||||
suspend fun broadcastEvent(event: SyncEvent)
|
||||
|
||||
/**
|
||||
* Stream von eingehenden Events von anderen Peers.
|
||||
*/
|
||||
val incomingEvents: Flow<SyncEvent>
|
||||
|
||||
/**
|
||||
* Liste der aktuell verbundenen Peers (Host:Port).
|
||||
*/
|
||||
val connectedPeers: Flow<List<String>>
|
||||
}
|
||||
+53
@@ -0,0 +1,53 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
/**
|
||||
* Basis-Interface für alle P2P-Sync-Nachrichten.
|
||||
*/
|
||||
@Serializable
|
||||
sealed interface SyncEvent {
|
||||
val timestamp: Long
|
||||
val senderId: String
|
||||
}
|
||||
|
||||
/**
|
||||
* Heartbeat-Event zur Überprüfung der Verbindung.
|
||||
*/
|
||||
@Serializable
|
||||
data class PingEvent(
|
||||
override val timestamp: Long,
|
||||
override val senderId: String
|
||||
) : SyncEvent
|
||||
|
||||
/**
|
||||
* Antwort auf ein Ping-Event.
|
||||
*/
|
||||
@Serializable
|
||||
data class PongEvent(
|
||||
override val timestamp: Long,
|
||||
override val senderId: String
|
||||
) : SyncEvent
|
||||
|
||||
/**
|
||||
* Ankündigung einer Datenänderung (z.B. neuer Bewerb oder Startliste).
|
||||
*/
|
||||
@Serializable
|
||||
data class DataChangedEvent(
|
||||
override val timestamp: Long,
|
||||
override val senderId: String,
|
||||
val entityType: String,
|
||||
val entityId: String,
|
||||
val operation: String // "CREATED", "UPDATED", "DELETED"
|
||||
) : SyncEvent
|
||||
|
||||
/**
|
||||
* Anforderung von Daten von einem Peer.
|
||||
*/
|
||||
@Serializable
|
||||
data class DataRequestEvent(
|
||||
override val timestamp: Long,
|
||||
override val senderId: String,
|
||||
val entityType: String,
|
||||
val entityId: String
|
||||
) : SyncEvent
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import at.mocode.frontend.core.network.discovery.NetworkDiscoveryService
|
||||
import kotlinx.coroutines.*
|
||||
|
||||
/**
|
||||
* Manager, der mDNS Discovery und P2P Sync verbindet.
|
||||
* Er lauscht auf neu entdeckte Dienste und baut automatisch Verbindungen auf.
|
||||
*/
|
||||
class SyncManager(
|
||||
private val discoveryService: NetworkDiscoveryService,
|
||||
private val syncService: P2pSyncService
|
||||
) {
|
||||
private val scope = CoroutineScope(SupervisorJob())
|
||||
private val knownPeers = mutableSetOf<String>()
|
||||
|
||||
fun start(port: Int) {
|
||||
// Eigenen Dienst registrieren und Server starten
|
||||
discoveryService.registerService(port)
|
||||
syncService.startServer(port)
|
||||
discoveryService.startDiscovery()
|
||||
|
||||
// Regelmäßig nach neuen Peers suchen und verbinden
|
||||
scope.launch {
|
||||
while (isActive) {
|
||||
val discovered = discoveryService.getDiscoveredServices()
|
||||
discovered.forEach { service ->
|
||||
val peerKey = "${service.host}:${service.port}"
|
||||
if (!knownPeers.contains(peerKey)) {
|
||||
// Prüfen, ob wir es nicht selbst sind (einfacher Check über Port,
|
||||
// in Realität über eine Node-ID im Metadata)
|
||||
// TODO: Node-ID Vergleich
|
||||
println("[SyncManager] Neuer Peer entdeckt: $peerKey. Verbinde...")
|
||||
syncService.connectToPeer(service.host, service.port)
|
||||
knownPeers.add(peerKey)
|
||||
}
|
||||
}
|
||||
delay(5000) // Alle 5 Sekunden prüfen
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
scope.cancel()
|
||||
discoveryService.stopDiscovery()
|
||||
syncService.stopServer()
|
||||
}
|
||||
}
|
||||
+8
@@ -0,0 +1,8 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import org.koin.core.module.Module
|
||||
|
||||
/**
|
||||
* Erwartetes Koin-Modul für den P2P-Sync.
|
||||
*/
|
||||
expect val syncModule: Module
|
||||
+23
@@ -0,0 +1,23 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import org.koin.core.module.Module
|
||||
import org.koin.dsl.module
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.emptyFlow
|
||||
|
||||
/**
|
||||
* JS-spezifische Implementierung (vorerst No-op).
|
||||
*/
|
||||
actual val syncModule: Module = module {
|
||||
single<P2pSyncService> { NoOpP2pSyncService() }
|
||||
single { SyncManager(get(), get()) }
|
||||
}
|
||||
|
||||
class NoOpP2pSyncService : P2pSyncService {
|
||||
override fun startServer(port: Int) {}
|
||||
override fun stopServer() {}
|
||||
override suspend fun connectToPeer(host: String, port: Int) {}
|
||||
override suspend fun broadcastEvent(event: SyncEvent) {}
|
||||
override val incomingEvents: Flow<SyncEvent> = emptyFlow()
|
||||
override val connectedPeers: Flow<List<String>> = emptyFlow()
|
||||
}
|
||||
+115
@@ -0,0 +1,115 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.engine.*
|
||||
import io.ktor.server.netty.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.server.websocket.*
|
||||
import io.ktor.websocket.*
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.plugins.websocket.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.decodeFromString
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class JvmP2pSyncService : P2pSyncService {
|
||||
private var server: EmbeddedServer<*, *>? = null
|
||||
private val client = HttpClient {
|
||||
install(io.ktor.client.plugins.websocket.WebSockets)
|
||||
}
|
||||
|
||||
private val _incomingEvents = MutableSharedFlow<SyncEvent>()
|
||||
override val incomingEvents: Flow<SyncEvent> = _incomingEvents.asSharedFlow()
|
||||
|
||||
private val activeSessions = Collections.synchronizedSet(LinkedHashSet<DefaultWebSocketSession>())
|
||||
private val _connectedPeers = MutableStateFlow<List<String>>(emptyList())
|
||||
override val connectedPeers: Flow<List<String>> = _connectedPeers.asStateFlow()
|
||||
|
||||
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
|
||||
|
||||
override fun startServer(port: Int) {
|
||||
if (server != null) return
|
||||
|
||||
server = embeddedServer(Netty, port = port) {
|
||||
install(io.ktor.server.websocket.WebSockets)
|
||||
routing {
|
||||
webSocket("/sync") {
|
||||
println("[P2P Server] Neuer Peer verbunden")
|
||||
activeSessions.add(this)
|
||||
updatePeers()
|
||||
try {
|
||||
for (frame in incoming) {
|
||||
if (frame is Frame.Text) {
|
||||
val text = frame.readText()
|
||||
try {
|
||||
val event = Json.decodeFromString<SyncEvent>(text)
|
||||
_incomingEvents.emit(event)
|
||||
} catch (e: Exception) {
|
||||
println("[P2P Server] Fehler beim Dekodieren: ${e.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
activeSessions.remove(this)
|
||||
updatePeers()
|
||||
println("[P2P Server] Peer getrennt")
|
||||
}
|
||||
}
|
||||
}
|
||||
}.start(wait = false)
|
||||
println("[P2P Server] Gestartet auf Port $port")
|
||||
}
|
||||
|
||||
override fun stopServer() {
|
||||
server?.stop(1000, 2000)
|
||||
server = null
|
||||
}
|
||||
|
||||
override suspend fun connectToPeer(host: String, port: Int) {
|
||||
scope.launch {
|
||||
try {
|
||||
client.webSocket(host = host, port = port, path = "/sync") {
|
||||
println("[P2P Client] Verbunden mit $host:$port")
|
||||
activeSessions.add(this)
|
||||
updatePeers()
|
||||
try {
|
||||
for (frame in incoming) {
|
||||
if (frame is Frame.Text) {
|
||||
val text = frame.readText()
|
||||
val event = Json.decodeFromString<SyncEvent>(text)
|
||||
_incomingEvents.emit(event)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
activeSessions.remove(this)
|
||||
updatePeers()
|
||||
println("[P2P Client] Verbindung zu $host:$port beendet")
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
println("[P2P Client] Fehler bei Verbindung zu $host:$port: ${e.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun broadcastEvent(event: SyncEvent) {
|
||||
val text = Json.encodeToString(event)
|
||||
activeSessions.toList().forEach { session ->
|
||||
try {
|
||||
session.send(Frame.Text(text))
|
||||
} catch (e: Exception) {
|
||||
println("[P2P] Fehler beim Senden an Session: ${e.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun updatePeers() {
|
||||
// Da wir keine einfachen IPs in den Sessions haben ohne tieferes Casting,
|
||||
// nutzen wir hier erst mal einen Platzhalter oder zählen nur.
|
||||
_connectedPeers.value = activeSessions.map { "Peer-${it.hashCode()}" }
|
||||
}
|
||||
}
|
||||
+12
@@ -0,0 +1,12 @@
|
||||
package at.mocode.frontend.core.network.sync
|
||||
|
||||
import org.koin.core.module.Module
|
||||
import org.koin.dsl.module
|
||||
|
||||
/**
|
||||
* JVM-spezifische Implementierung des SyncModules.
|
||||
*/
|
||||
actual val syncModule: Module = module {
|
||||
single<P2pSyncService> { JvmP2pSyncService() }
|
||||
single { SyncManager(get(), get()) }
|
||||
}
|
||||
Reference in New Issue
Block a user