feat(desktop, network): Chat-Funktion hinzugefügt und P2P-Sync verbessert

Signed-off-by: Stefan Mogeritsch <stefan.mo.co@gmail.com>
This commit is contained in:
2026-05-11 13:57:49 +02:00
parent 1a4753cd73
commit e389fe9bce
17 changed files with 514 additions and 426 deletions
@@ -9,79 +9,79 @@ import javax.crypto.spec.IvParameterSpec
import javax.crypto.spec.SecretKeySpec
class FileBackupService(private val deviceName: String) : BackupService {
private val json = Json { prettyPrint = true }
private val json = Json { prettyPrint = true }
override fun exportDelta(data: String, targetPath: String, sharedKey: String): Result<String> {
return try {
val timestamp = System.currentTimeMillis()
val checksum = calculateChecksum(data)
val payload = BackupPayload(timestamp, deviceName, data, checksum)
val jsonContent = json.encodeToString(payload)
override fun exportDelta(data: String, targetPath: String, sharedKey: String): Result<String> {
return try {
val timestamp = System.currentTimeMillis()
val checksum = calculateChecksum(data)
val payload = BackupPayload(timestamp, deviceName, data, checksum)
val jsonContent = json.encodeToString(payload)
val encryptedData = encrypt(jsonContent, sharedKey)
val encryptedData = encrypt(jsonContent, sharedKey)
val dir = File(targetPath)
if (!dir.exists()) dir.mkdirs()
val dir = File(targetPath)
if (!dir.exists()) dir.mkdirs()
val fileName = "delta_${timestamp}_${deviceName}.msbackup"
val file = File(dir, fileName)
file.writeText(encryptedData)
val fileName = "delta_${timestamp}_${deviceName}.msbackup"
val file = File(dir, fileName)
file.writeText(encryptedData)
println("[Plan-USB] Export erfolgreich: ${file.absolutePath}")
Result.success(file.absoluteName)
} catch (e: Exception) {
println("[Plan-USB] Export fehlgeschlagen: ${e.message}")
Result.failure(e)
}
println("[Plan-USB] Export erfolgreich: ${file.absolutePath}")
Result.success(file.absoluteName)
} catch (e: Exception) {
println("[Plan-USB] Export fehlgeschlagen: ${e.message}")
Result.failure(e)
}
}
override fun importDelta(filePath: String, sharedKey: String): Result<String> {
return try {
val file = File(filePath)
val encryptedData = file.readText()
val jsonContent = decrypt(encryptedData, sharedKey)
val payload = json.decodeFromString<BackupPayload>(jsonContent)
override fun importDelta(filePath: String, sharedKey: String): Result<String> {
return try {
val file = File(filePath)
val encryptedData = file.readText()
val jsonContent = decrypt(encryptedData, sharedKey)
val payload = json.decodeFromString<BackupPayload>(jsonContent)
if (calculateChecksum(payload.data) != payload.checksum) {
throw Exception("Checksummenfehler: Daten wurden möglicherweise manipuliert.")
}
if (calculateChecksum(payload.data) != payload.checksum) {
throw Exception("Checksummenfehler: Daten wurden möglicherweise manipuliert.")
}
println("[Plan-USB] Import erfolgreich von ${payload.deviceName}")
Result.success(payload.data)
} catch (e: Exception) {
println("[Plan-USB] Import fehlgeschlagen: ${e.message}")
Result.failure(e)
}
println("[Plan-USB] Import erfolgreich von ${payload.deviceName}")
Result.success(payload.data)
} catch (e: Exception) {
println("[Plan-USB] Import fehlgeschlagen: ${e.message}")
Result.failure(e)
}
}
private fun calculateChecksum(data: String): String {
val bytes = MessageDigest.getInstance("SHA-256").digest(data.toByteArray())
return bytes.joinToString("") { "%02x".format(it) }
}
private fun calculateChecksum(data: String): String {
val bytes = MessageDigest.getInstance("SHA-256").digest(data.toByteArray())
return bytes.joinToString("") { "%02x".format(it) }
}
private fun encrypt(data: String, key: String): String {
val secretKey = generateKey(key)
val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
val iv = IvParameterSpec(ByteArray(16)) // Vereinfacht für PoC
cipher.init(Cipher.ENCRYPT_MODE, secretKey, iv)
val encrypted = cipher.doFinal(data.toByteArray())
return Base64.getEncoder().encodeToString(encrypted)
}
private fun encrypt(data: String, key: String): String {
val secretKey = generateKey(key)
val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
val iv = IvParameterSpec(ByteArray(16)) // Vereinfacht für PoC
cipher.init(Cipher.ENCRYPT_MODE, secretKey, iv)
val encrypted = cipher.doFinal(data.toByteArray())
return Base64.getEncoder().encodeToString(encrypted)
}
private fun decrypt(encrypted: String, key: String): String {
val secretKey = generateKey(key)
val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
val iv = IvParameterSpec(ByteArray(16))
cipher.init(Cipher.DECRYPT_MODE, secretKey, iv)
val decrypted = cipher.doFinal(Base64.getDecoder().decode(encrypted))
return String(decrypted)
}
private fun decrypt(encrypted: String, key: String): String {
val secretKey = generateKey(key)
val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
val iv = IvParameterSpec(ByteArray(16))
cipher.init(Cipher.DECRYPT_MODE, secretKey, iv)
val decrypted = cipher.doFinal(Base64.getDecoder().decode(encrypted))
return String(decrypted)
}
private fun generateKey(key: String): SecretKeySpec {
val sha = MessageDigest.getInstance("SHA-256")
val keyBytes = sha.digest(key.toByteArray()).copyOf(16) // AES-128 für Kompatibilität
return SecretKeySpec(keyBytes, "AES")
}
private fun generateKey(key: String): SecretKeySpec {
val sha = MessageDigest.getInstance("SHA-256")
val keyBytes = sha.digest(key.toByteArray()).copyOf(16) // AES-128 für Kompatibilität
return SecretKeySpec(keyBytes, "AES")
}
}
private val File.absoluteName: String get() = this.name
@@ -9,6 +9,6 @@ import org.koin.dsl.module
* JVM-spezifische Implementierung des DiscoveryModules.
*/
actual val discoveryModule: Module = module {
single<NetworkDiscoveryService> { JmDnsDiscoveryService() }
single<BackupService> { (deviceName: String) -> FileBackupService(deviceName) }
single<NetworkDiscoveryService> { JmDnsDiscoveryService() }
single<BackupService> { (deviceName: String) -> FileBackupService(deviceName) }
}
@@ -22,8 +22,10 @@ class JmDnsDiscoveryService : NetworkDiscoveryService {
private val registeredSet = ConcurrentHashMap.newKeySet<String>() // key: "${name}@${addr.hostAddress}:$port"
// Debounce/Guards
@Volatile private var lastStartRequestedAt: Long = 0L
@Volatile private var lastStartIp: String? = null
@Volatile
private var lastStartRequestedAt: Long = 0L
@Volatile
private var lastStartIp: String? = null
private val _discoveredServices = MutableStateFlow<List<DiscoveredService>>(emptyList())
override val discoveredServices: StateFlow<List<DiscoveredService>> = _discoveredServices.asStateFlow()
@@ -149,7 +151,10 @@ class JmDnsDiscoveryService : NetworkDiscoveryService {
val name = iface.name.lowercase()
// Filtere Docker/Bridged/VETH/VM-Schnittstellen heraus
if (iface.isLoopback || !iface.isUp || iface.isVirtual) continue
if (name.startsWith("br-") || name.startsWith("docker") || name.startsWith("veth") || name.contains("vmnet") || name.contains("virbr")) continue
if (name.startsWith("br-") || name.startsWith("docker") || name.startsWith("veth") || name.contains("vmnet") || name.contains(
"virbr"
)
) continue
val inetAddresses = iface.inetAddresses
while (inetAddresses.hasMoreElements()) {
@@ -172,7 +177,8 @@ class JmDnsDiscoveryService : NetworkDiscoveryService {
// Bevorzuge private LAN IPv4 (192.168.x.x, 10.x.x.x, 172.16-31.x.x)
fun isPrivateIPv4(a: InetAddress): Boolean {
val h = a.hostAddress
return h.startsWith("192.168.") || h.startsWith("10.") || (h.startsWith("172.") && h.split('.').getOrNull(1)?.toIntOrNull() in 16..31)
return h.startsWith("192.168.") || h.startsWith("10.") || (h.startsWith("172.") && h.split('.').getOrNull(1)
?.toIntOrNull() in 16..31)
}
return addresses.sortedWith(compareByDescending<InetAddress> { isPrivateIPv4(it) }
.thenBy { it.hostAddress })
@@ -2,145 +2,181 @@ package at.mocode.frontend.core.network.sync
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration.Companion.milliseconds
/**
* JVM-spezifische Implementierung des P2pSyncService mit Fokus auf Stabilität.
* Beinhaltet Reconnection-Logik, Heartbeats und robustes Session-Management.
*/
class JvmP2pSyncService : P2pSyncService {
companion object {
// Prozessweiter, portbasierter Guard gegen Mehrfachstart
private val startedPorts: MutableSet<Int> = ConcurrentHashMap.newKeySet()
companion object {
private val startedPorts: MutableSet<Int> = ConcurrentHashMap.newKeySet()
private const val RECONNECT_DELAY_MS = 3000L
private const val PING_INTERVAL_MS = 5000L
private const val PING_TIMEOUT_MS = 10000L
}
private var server: EmbeddedServer<*, *>? = null
private var currentPort: Int? = null
private val client = HttpClient {
install(WebSockets) {
pingInterval = PING_INTERVAL_MS.milliseconds
}
private var server: EmbeddedServer<*, *>? = null
private var currentPort: Int? = null
private val client = HttpClient {
install(io.ktor.client.plugins.websocket.WebSockets)
}
private val _incomingEvents = MutableSharedFlow<SyncEvent>(extraBufferCapacity = 64)
override val incomingEvents: Flow<SyncEvent> = _incomingEvents.asSharedFlow()
private val activeSessions = Collections.synchronizedSet(LinkedHashSet<DefaultWebSocketSession>())
private val _connectedPeers = MutableStateFlow<List<String>>(emptyList())
override val connectedPeers: Flow<List<String>> = _connectedPeers.asStateFlow()
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val connectionJobs = ConcurrentHashMap<String, Job>()
override fun startServer(port: Int) {
if (server != null) {
println("[P2P Server] Bereits aktiv auf Port ${currentPort ?: port}")
return
}
private val _incomingEvents = MutableSharedFlow<SyncEvent>()
override val incomingEvents: Flow<SyncEvent> = _incomingEvents.asSharedFlow()
private val activeSessions = Collections.synchronizedSet(LinkedHashSet<DefaultWebSocketSession>())
private val _connectedPeers = MutableStateFlow<List<String>>(emptyList())
override val connectedPeers: Flow<List<String>> = _connectedPeers.asStateFlow()
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
override fun startServer(port: Int) {
// Instanz-Guard (gleiche Instanz)
if (server != null) {
println("[P2P Server] Bereits gestartet (Instanz) auf Port ${currentPort ?: port} idempotent")
return
}
// Prozessweiter, portbasierter Guard
println("[P2P Server] Versuche Port $port zu reservieren...")
if (!startedPorts.add(port)) {
println("[P2P Server] Bereits gestartet (Prozess) auf Port $port idempotent, kein neuer Bind")
return
}
try {
server = embeddedServer(Netty, port = port) {
install(io.ktor.server.websocket.WebSockets)
routing {
webSocket("/sync") {
println("[P2P Server] Neuer Peer verbunden")
activeSessions.add(this)
updatePeers()
try {
for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
try {
val event = Json.decodeFromString<SyncEvent>(text)
_incomingEvents.emit(event)
} catch (e: Exception) {
println("[P2P Server] Fehler beim Dekodieren: ${e.message}")
}
}
}
} finally {
activeSessions.remove(this)
updatePeers()
println("[P2P Server] Peer getrennt")
}
}
}
}.start(wait = false)
currentPort = port
} catch (e: Exception) {
// Start fehlgeschlagen -> Port-Lock wieder freigeben
startedPorts.remove(port)
server = null
currentPort = null
println("[P2P Server] Start auf Port $port fehlgeschlagen: ${e.message}")
throw e
}
if (!startedPorts.add(port)) {
println("[P2P Server] Port $port wird bereits von einer anderen Instanz genutzt.")
return
}
override fun stopServer() {
try {
server?.stop(1000, 2000)
} finally {
server = null
currentPort?.let { startedPorts.remove(it) }
currentPort = null
try {
server = embeddedServer(Netty, port = port, host = "0.0.0.0") {
install(io.ktor.server.websocket.WebSockets) {
pingPeriod = PING_INTERVAL_MS.milliseconds
timeout = PING_TIMEOUT_MS.milliseconds
}
}
override suspend fun connectToPeer(host: String, port: Int) {
scope.launch {
routing {
webSocket("/sync") {
val remote = call.request.local.remoteAddress
println("[P2P Server] Neuer Peer verbunden: $remote")
activeSessions.add(this)
updatePeers()
try {
client.webSocket(host = host, port = port, path = "/sync") {
println("[P2P Client] Verbunden mit $host:$port")
activeSessions.add(this)
updatePeers()
try {
for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
val event = Json.decodeFromString<SyncEvent>(text)
_incomingEvents.emit(event)
}
}
} finally {
activeSessions.remove(this)
updatePeers()
println("[P2P Client] Verbindung zu $host:$port beendet")
}
for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
try {
val event = Json.decodeFromString<SyncEvent>(text)
_incomingEvents.emit(event)
} catch (ex: Exception) {
println("[P2P Server] Fehler beim Dekodieren von $remote: ${ex.message}")
}
}
} catch (e: Exception) {
println("[P2P Client] Fehler bei Verbindung zu $host:$port: ${e.message}")
}
} catch (ex: Exception) {
println("[P2P Server] Verbindung zu $remote unterbrochen: ${ex.message}")
} finally {
activeSessions.remove(this)
updatePeers()
println("[P2P Server] Peer $remote getrennt")
}
}
}
}.start(wait = false)
currentPort = port
println("[P2P Server] Erfolgreich gestartet auf Port $port")
} catch (ex: Exception) {
startedPorts.remove(port)
server = null
currentPort = null
println("[P2P Server] Fehler beim Starten des Servers auf Port $port: ${ex.message}")
throw ex
}
}
override suspend fun broadcastEvent(event: SyncEvent) {
val text = Json.encodeToString(event)
activeSessions.toList().forEach { session ->
override fun stopServer() {
connectionJobs.values.forEach { it.cancel() }
connectionJobs.clear()
try {
server?.stop(1000, 2000)
} finally {
server = null
currentPort?.let { startedPorts.remove(it) }
currentPort = null
println("[P2P Server] Server gestoppt.")
}
}
override suspend fun connectToPeer(host: String, port: Int) {
val peerKey = "$host:$port"
connectionJobs[peerKey]?.cancel()
val job = scope.launch {
while (isActive) {
try {
println("[P2P Client] Verbindungsversuch zu $peerKey...")
client.webSocket(host = host, port = port, path = "/sync") {
println("[P2P Client] Verbunden mit $peerKey")
activeSessions.add(this)
updatePeers()
try {
session.send(Frame.Text(text))
} catch (e: Exception) {
println("[P2P] Fehler beim Senden an Session: ${e.message}")
for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
val event = Json.decodeFromString<SyncEvent>(text)
_incomingEvents.emit(event)
}
}
} catch (ex: Exception) {
println("[P2P Client] Verbindung zu $peerKey abgebrochen: ${ex.message}")
} finally {
activeSessions.remove(this)
updatePeers()
println("[P2P Client] Session mit $peerKey beendet.")
}
}
} catch (ex: Exception) {
println("[P2P Client] Konnte keine Verbindung zu $peerKey herstellen: ${ex.message}")
}
}
private fun updatePeers() {
// Da wir keine einfachen IPs in den Sessions haben ohne tieferes Casting,
// nutzen wir hier erst mal einen Platzhalter oder zählen nur.
_connectedPeers.value = activeSessions.map { "Peer-${it.hashCode()}" }
if (isActive) {
println("[P2P Client] Erneuter Versuch für $peerKey in ${RECONNECT_DELAY_MS}ms...")
delay(RECONNECT_DELAY_MS.milliseconds)
}
}
}
connectionJobs[peerKey] = job
}
override suspend fun broadcastEvent(event: SyncEvent) {
val text = Json.encodeToString(event)
val sessions = activeSessions.toList()
sessions.forEach { session ->
try {
if (session.isActive) {
session.send(Frame.Text(text))
}
} catch (_: Exception) {
// Session wird durch Heartbeat/Loop automatisch bereinigt
}
}
}
private fun updatePeers() {
_connectedPeers.value = activeSessions.map { session ->
when (session) {
is DefaultWebSocketServerSession -> session.call.request.local.remoteAddress
else -> "Outgoing-Peer"
}
}.distinct()
}
}
@@ -7,6 +7,6 @@ import org.koin.dsl.module
* JVM-spezifische Implementierung des SyncModules.
*/
actual val syncModule: Module = module {
single<P2pSyncService> { JvmP2pSyncService() }
single { SyncManager(get(), get()) }
single<P2pSyncService> { JvmP2pSyncService() }
single { SyncManager(get(), get()) }
}
@@ -5,33 +5,33 @@ import kotlin.test.Test
class JvmP2pSyncServiceTest {
@Test
fun starting_server_twice_on_same_port_should_not_fail_but_use_guard() = runTest {
val service1 = JvmP2pSyncService()
val service2 = JvmP2pSyncService()
val port = 9091
@Test
fun starting_server_twice_on_same_port_should_not_fail_but_use_guard() = runTest {
val service1 = JvmP2pSyncService()
val service2 = JvmP2pSyncService()
val port = 9091
try {
service1.startServer(port)
// Second start should just return/log and not throw an exception (idempotent)
service2.startServer(port)
} finally {
service1.stopServer()
service2.stopServer()
}
try {
service1.startServer(port)
// Second start should just return/log and not throw an exception (idempotent)
service2.startServer(port)
} finally {
service1.stopServer()
service2.stopServer()
}
}
@Test
fun stopping_server_should_release_port_lock() = runTest {
val service1 = JvmP2pSyncService()
val service2 = JvmP2pSyncService()
val port = 9092
@Test
fun stopping_server_should_release_port_lock() = runTest {
val service1 = JvmP2pSyncService()
val service2 = JvmP2pSyncService()
val port = 9092
service1.startServer(port)
service1.stopServer()
service1.startServer(port)
service1.stopServer()
// After stopping, starting again on same port (even from different instance) should work
service2.startServer(port)
service2.stopServer()
}
// After stopping, starting again on same port (even from different instance) should work
service2.startServer(port)
service2.stopServer()
}
}
@@ -1,23 +1,23 @@
package at.mocode.frontend.core.network.sync
import org.koin.core.module.Module
import org.koin.dsl.module
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import org.koin.core.module.Module
import org.koin.dsl.module
/**
* Wasm-spezifische Implementierung (vorerst No-op).
*/
actual val syncModule: Module = module {
single<P2pSyncService> { NoOpP2pSyncService() }
single { SyncManager(get(), get()) }
single<P2pSyncService> { NoOpP2pSyncService() }
single { SyncManager(get(), get()) }
}
class NoOpP2pSyncService : P2pSyncService {
override fun startServer(port: Int) {}
override fun stopServer() {}
override suspend fun connectToPeer(host: String, port: Int) {}
override suspend fun broadcastEvent(event: SyncEvent) {}
override val incomingEvents: Flow<SyncEvent> = emptyFlow()
override val connectedPeers: Flow<List<String>> = emptyFlow()
override fun startServer(port: Int) {}
override fun stopServer() {}
override suspend fun connectToPeer(host: String, port: Int) {}
override suspend fun broadcastEvent(event: SyncEvent) {}
override val incomingEvents: Flow<SyncEvent> = emptyFlow()
override val connectedPeers: Flow<List<String>> = emptyFlow()
}