/*
* HexDroidIRC - An IRC Client for Android
* Copyright (C) 2026 boxlabs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.boxlabs.hexdroid
import android.content.ContentValues
import android.content.Context
import android.net.Uri
import android.os.Build
import android.os.Environment
import android.provider.MediaStore
import androidx.documentfile.provider.DocumentFile
import kotlin.concurrent.thread
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.BufferedOutputStream
import java.io.File
import java.io.FileOutputStream
import java.io.IOException
import java.io.OutputStream
import java.nio.ByteOrder
import java.net.Inet4Address
import java.net.NetworkInterface
import java.net.ServerSocket
import java.net.Socket
import java.nio.ByteBuffer
import java.util.Collections
import java.util.concurrent.atomic.AtomicLong
/**
 * A CTCP `DCC SEND` offer after parsing.
 *
 * Covers both active DCC and passive/reverse DCC (the token + port 0 handshake).
 *
 * @property netId Network id this offer belongs to (filled by the ViewModel).
 * @property from Value of the offer's `from` field (presumably the offering nick; confirm against the parser).
 * @property filename File name as advertised in the offer.
 * @property ip Address advertised by the sender.
 * @property port Port advertised by the sender; 0 for passive offers.
 * @property size Advertised size in bytes.
 * @property token Passive/reverse DCC token (HexChat "Passive DCC"); null when absent.
 * @property turbo Turbo DCC / TSEND: do not send ACKs back to the sender.
 * @property secure SDCC / SSEND: transfer is TLS-wrapped.
 */
data class DccOffer(
    val netId: String = "",
    val from: String,
    val filename: String,
    val ip: String,
    val port: Int,
    val size: Long,
    val token: Long? = null,
    val turbo: Boolean = false,
    val secure: Boolean = false,
) {
    /** True only when a token is present and the advertised port is 0. */
    val isPassive: Boolean
        get() = token != null && port == 0
}
/**
 * A CTCP `DCC CHAT` offer after parsing.
 *
 * @property netId Network id this offer belongs to (filled by the ViewModel).
 * @property from Value of the offer's `from` field (presumably the offering nick; confirm against the parser).
 * @property protocol Protocol word of the offer; defaults to `"chat"`.
 * @property ip Address advertised by the peer.
 * @property port Port advertised by the peer; 0 for passive offers.
 * @property token Passive/reverse token (rare; included for forward compatibility).
 * @property secure SDCC / SCHAT: session is TLS-wrapped.
 */
data class DccChatOffer(
    val netId: String = "",
    val from: String,
    val protocol: String = "chat",
    val ip: String,
    val port: Int,
    val token: Long? = null,
    val secure: Boolean = false,
) {
    /** True only when a token is present and the advertised port is 0. */
    val isPassive: Boolean
        get() = token != null && port == 0
}
/**
 * Signals that a DCC receive finished with fewer bytes than the offer advertised.
 *
 * It is a dedicated [IOException] subtype so the ViewModel can report "incomplete"
 * specifically (e.g. offer a retry) instead of treating it like a connection error.
 *
 * @property received Number of bytes that actually arrived.
 * @property expected Number of bytes the offer advertised.
 */
class DccIncompleteException(
    val received: Long,
    val expected: Long,
) : IOException(
    "DCC transfer incomplete: received $received of $expected bytes (the sender closed the connection early)"
)
/** Snapshot of a single DCC file transfer; one subtype per direction. */
sealed class DccTransferState {
    /**
     * State of a file being received.
     *
     * @property offer The offer this transfer was started from.
     * @property received Byte count received so far.
     * @property startTimeMs Epoch ms captured when the state object was created.
     * @property done Whether the transfer is marked finished.
     * @property error Error text, or null when none is recorded.
     * @property savedPath Where the file was saved, or null when not set.
     * @property endTimeMs Epoch ms when the transfer finished; null while still in progress.
     *   Stored so the completed avg-speed display uses the actual transfer duration rather
     *   than ever-growing wall-clock time after completion.
     */
    data class Incoming(
        val offer: DccOffer,
        val received: Long = 0,
        val startTimeMs: Long = System.currentTimeMillis(),
        val done: Boolean = false,
        val error: String? = null,
        val savedPath: String? = null,
        val endTimeMs: Long? = null,
    ) : DccTransferState()

    /**
     * State of a file being sent.
     *
     * @property target Recipient of the transfer.
     * @property filename Name of the file being sent.
     * @property fileSize Total bytes; 0 = unknown (e.g. stream source).
     * @property bytesSent Byte count sent so far.
     * @property startTimeMs Epoch ms captured when the state object was created.
     * @property done Whether the transfer is marked finished.
     * @property error Error text, or null when none is recorded.
     * @property endTimeMs Epoch ms when the transfer finished; null while in progress.
     *   Same purpose as [Incoming.endTimeMs].
     */
    data class Outgoing(
        val target: String,
        val filename: String,
        val fileSize: Long = 0,
        val bytesSent: Long = 0,
        val startTimeMs: Long = System.currentTimeMillis(),
        val done: Boolean = false,
        val error: String? = null,
        val endTimeMs: Long? = null,
    ) : DccTransferState()
}
/**
* DCC Manager handles file send/receive and DCC CHAT socket lifecycle.
*
* Security notes:
* - [bindFirstAvailable] iterates ports min..max; each failed `ServerSocket()` call cleans
* up its own OS resources before throwing, so there is no leak on the error path.
* - Passive DCC offers are validated: port must be 0 AND token must be present. Malformed
* offers (port=0, no token) throw [IllegalArgumentException] before any socket is opened.
* - Turbo DCC (TSEND) skips per-chunk ACKs; the caller is responsible for confirming file
* integrity out-of-band (e.g. hash check).
* - Remote IPs are validated before connecting: loopback, link-local, wildcard, and multicast
* addresses are rejected. RFC-1918 private addresses are intentionally allowed to support
* LAN DCC between devices on the same network.
*/
class DccManager(ctx: Context) {
// Avoid leaking an Activity context.
private val ctx: Context = ctx.applicationContext
/**
 * Wraps an already-connected plain socket with TLS for SDCC (Secure DCC) and completes
 * the handshake before returning.
 *
 * Hostname verification is disabled on purpose: DCC peers are addressed by raw IP, not
 * by hostname. The layered socket is created with `autoClose = true`, so closing the
 * returned socket also closes [sock].
 *
 * On any failure (layering or handshake) both the TLS socket and [sock] are closed before
 * the exception is rethrown, so callers never have to clean up a half-wrapped socket.
 *
 * NOTE(review): the default [javax.net.ssl.SSLSocketFactory] still validates the peer's
 * certificate chain, so peers with self-signed certificates will presumably fail the
 * handshake. Confirm whether a permissive trust manager was intended. The layered socket
 * is also presumably in client mode even when we accepted the connection; verify against
 * the peers this is expected to interoperate with.
 *
 * @param sock Connected plain socket to wrap.
 * @param host Peer host string passed to the TLS layer.
 * @return The TLS socket, handshake already completed.
 */
private fun wrapTls(sock: Socket, host: String): Socket {
    val factory = javax.net.ssl.SSLSocketFactory.getDefault() as javax.net.ssl.SSLSocketFactory
    val tls = try {
        factory.createSocket(sock, host, sock.port, true) as javax.net.ssl.SSLSocket
    } catch (t: Throwable) {
        // Layering failed: nothing owns the raw socket yet, so close it here.
        runCatching { sock.close() }
        throw t
    }
    try {
        // Disable hostname verification; DCC uses raw IPs.
        tls.sslParameters = tls.sslParameters.also { it.endpointIdentificationAlgorithm = null }
        tls.startHandshake()
    } catch (t: Throwable) {
        // A failed handshake would otherwise leak both descriptors.
        runCatching { tls.close() }
        runCatching { sock.close() }
        throw t
    }
    return tls
}
/**
 * Guards outbound DCC connections against structurally invalid target addresses.
 *
 * RFC-1918 private ranges (192.168.x.x, 10.x.x.x, 172.16-31.x.x) are deliberately NOT
 * rejected: LAN DCC between devices on the same network is a normal use case (e.g.
 * sharing files over the same Wi-Fi), and because DCC is a raw TCP connection rather
 * than an HTTP request, the SSRF risk for those addresses is considered low.
 *
 * Rejected, in this order:
 * - Loopback (127.x.x.x / ::1): no legitimate DCC offer uses localhost.
 * - Link-local (169.254.x.x / fe80::): APIPA / router-link addresses; not routable.
 * - Wildcard (0.0.0.0 / ::): meaningless as a connect target.
 * - Multicast: not a unicast endpoint.
 *
 * @param ip Address string taken from the offer.
 * @throws IllegalArgumentException if [ip] cannot be resolved or falls in a rejected class.
 */
private fun validateRemoteIp(ip: String) {
    val resolved = try {
        java.net.InetAddress.getByName(ip)
    } catch (_: Throwable) {
        throw IllegalArgumentException("DCC: invalid IP address: $ip")
    }
    // First matching class wins; null means the address is acceptable.
    val refusal: String? = when {
        resolved.isLoopbackAddress -> "DCC: refusing connection to loopback address $ip"
        resolved.isLinkLocalAddress -> "DCC: refusing connection to link-local address $ip"
        resolved.isAnyLocalAddress -> "DCC: refusing connection to wildcard address $ip"
        resolved.isMulticastAddress -> "DCC: refusing connection to multicast address $ip"
        else -> null
    }
    if (refusal != null) {
        throw IllegalArgumentException(refusal)
    }
}
/**
 * Returns the app-internal `dcc` directory under `filesDir`, creating it if needed.
 *
 * Intended as a fallback location for DCC files when external storage is unavailable.
 * NOTE(review): no caller is visible in this class; confirm it is still used.
 */
private fun internalDccDir(): File {
    val dir = File(ctx.filesDir, "dcc")
    dir.mkdirs()
    return dir
}
/**
 * Returns the shared public Downloads directory.
 *
 * Relies on a deprecated platform API, hence the suppression.
 */
@Suppress("DEPRECATION")
private fun defaultDownloadsDir(): File =
    Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS)
/**
 * Creates the destination for an incoming DCC transfer and opens it for writing.
 *
 * The offered name is sanitized first, then the destination is chosen:
 * 1. a custom SAF folder when [customFolderUri] is non-blank,
 * 2. otherwise MediaStore Downloads on Android 10+ (API level Q and above),
 * 3. otherwise a direct file in the public Downloads folder.
 *
 * Note that this creates a real file/document as a side effect; callers are responsible
 * for closing the stream and for removing the file if the transfer never starts.
 *
 * @param displayName The filename from the DCC offer (untrusted).
 * @param customFolderUri Optional SAF tree URI for a custom download folder.
 * @return Pair of (stream to write to, path or `content://` URI string identifying the file).
 * @throws IOException if the destination cannot be created or opened.
 */
fun createDccOutputStream(displayName: String, customFolderUri: String?): Pair<OutputStream, String> {
    val safeName = sanitizeFilename(displayName)
    return when {
        // A user-selected folder always wins.
        !customFolderUri.isNullOrBlank() -> createSafOutputStream(safeName, customFolderUri)
        // Android 10+: go through MediaStore for Downloads.
        Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q -> createMediaStoreOutputStream(safeName)
        // Older Android: write directly into the Downloads folder.
        else -> createLegacyOutputStream(safeName)
    }
}
/**
 * Turns an untrusted, remote-supplied file name into a safe single path component.
 *
 * Steps:
 * 1. Keep only the part after the last `/` and the last `\` (drops any directory prefix).
 * 2. Remove all ISO control characters (this includes NUL) and trim surrounding whitespace.
 * 3. Replace the characters `/ : * ? " < > |` with `_`.
 * 4. Fall back to `dcc_download.bin` when the result is blank, `.` or `..`.
 * 5. Limit the result to 120 characters, keeping a short extension when there is one so
 *    a long name does not lose its file type.
 *
 * @param displayName The file name as received in the DCC offer.
 * @return A non-blank name of at most 120 characters without path separators.
 */
private fun sanitizeFilename(displayName: String): String {
    val maxLength = 120
    val maxKeptExtension = 16

    val cleaned = displayName
        .substringAfterLast('/')
        .substringAfterLast('\\')
        .filterNot { it.isISOControl() }
        .trim()
        .replace(Regex("""[/:*?"<>|]"""), "_")

    val name = if (cleaned == "." || cleaned == ".." || cleaned.isBlank()) "dcc_download.bin" else cleaned
    if (name.length <= maxLength) return name

    // Too long: truncate the stem rather than the extension where possible.
    val ext = name.substringAfterLast('.', "")
    val stem = name.substringBeforeLast('.', name)
    return if (ext.isNotEmpty() && ext.length <= maxKeptExtension && stem.isNotEmpty()) {
        stem.take(maxLength - ext.length - 1) + "." + ext
    } else {
        name.take(maxLength)
    }
}
/**
 * Creates a new document inside a user-selected SAF tree and opens it for writing.
 *
 * If a document with the requested name already exists, ` (1)`, ` (2)`, … is inserted
 * before the extension until an unused name is found. After 998 numbered attempts a
 * timestamp-based name is used instead (that name is not checked for collisions).
 *
 * If the document is created but its stream cannot be opened, the document is deleted
 * again so no empty placeholder is left in the user's folder.
 *
 * @param filename Already-sanitized file name.
 * @param folderUri String form of the SAF tree URI.
 * @return Pair of (stream to write to, the new document's URI as a string).
 * @throws IOException if the folder is inaccessible or the document cannot be created/opened.
 */
private fun createSafOutputStream(filename: String, folderUri: String): Pair<OutputStream, String> {
    val tree = DocumentFile.fromTreeUri(ctx, Uri.parse(folderUri))
        ?: throw IOException("Cannot access folder: $folderUri")

    val stem = filename.substringBeforeLast('.', filename)
    val ext = filename.substringAfterLast('.', "")

    // Find a name that is not taken yet.
    var finalName = filename
    var counter = 1
    while (tree.findFile(finalName) != null) {
        if (counter >= 999) {
            finalName = "${stem}_${System.currentTimeMillis()}.${ext.ifBlank { "bin" }}"
            break
        }
        finalName = if (ext.isNotBlank()) "$stem ($counter).$ext" else "$stem ($counter)"
        counter++
    }

    val newFile = tree.createFile("application/octet-stream", finalName)
        ?: throw IOException("Failed to create file: $finalName")

    val stream = try {
        ctx.contentResolver.openOutputStream(newFile.uri)
    } catch (t: Throwable) {
        // Don't leave an empty document behind if it cannot be opened.
        runCatching { newFile.delete() }
        throw t
    }
    if (stream == null) {
        runCatching { newFile.delete() }
        throw IOException("Failed to open output stream for: $finalName")
    }
    return Pair(stream, newFile.uri.toString())
}
/**
 * Creates a pending entry in MediaStore Downloads and opens it for writing.
 *
 * Name collisions are resolved by inserting ` (1)`, ` (2)`, … before the extension; after
 * 998 numbered attempts a timestamp-based name is used instead.
 *
 * The entry is inserted with `IS_PENDING = 1`. The returned stream is a wrapper that
 * clears `IS_PENDING` when it is closed, including when closing the underlying stream
 * fails, so an entry is never stuck in the pending state. Closing it more than once is
 * harmless. If the stream cannot be opened at all, the freshly inserted entry is removed.
 *
 * @param filename Already-sanitized file name.
 * @return Pair of (stream to write to, the entry's `content://` URI as a string). The URI
 *   string (not a display path) is returned so the file can later be resolved through
 *   the ContentResolver.
 * @throws IOException if the entry cannot be created or opened.
 */
private fun createMediaStoreOutputStream(filename: String): Pair<OutputStream, String> {
    val resolver = ctx.contentResolver
    val collection = MediaStore.Downloads.EXTERNAL_CONTENT_URI

    fun nameTaken(name: String): Boolean =
        resolver.query(
            collection,
            arrayOf(MediaStore.Downloads._ID),
            "${MediaStore.Downloads.DISPLAY_NAME} = ?",
            arrayOf(name),
            null
        )?.use { it.count > 0 } ?: false

    val stem = filename.substringBeforeLast('.', filename)
    val ext = filename.substringAfterLast('.', "")

    // Find a name that is not taken yet.
    var finalName = filename
    var counter = 1
    while (nameTaken(finalName)) {
        if (counter >= 999) {
            finalName = "${stem}_${System.currentTimeMillis()}.${ext.ifBlank { "bin" }}"
            break
        }
        finalName = if (ext.isNotBlank()) "$stem ($counter).$ext" else "$stem ($counter)"
        counter++
    }

    val values = ContentValues().apply {
        put(MediaStore.Downloads.DISPLAY_NAME, finalName)
        put(MediaStore.Downloads.MIME_TYPE, "application/octet-stream")
        put(MediaStore.Downloads.IS_PENDING, 1)
    }
    val uri = resolver.insert(collection, values)
        ?: throw IOException("Failed to create MediaStore entry for: $finalName")

    val rawStream = try {
        resolver.openOutputStream(uri)
    } catch (t: Throwable) {
        // Remove the pending row; otherwise it lingers as an invisible orphan.
        runCatching { resolver.delete(uri, null, null) }
        throw t
    }
    if (rawStream == null) {
        runCatching { resolver.delete(uri, null, null) }
        throw IOException("Failed to open output stream for: $finalName")
    }

    // Wrapper that publishes the entry (IS_PENDING = 0) when closed.
    val publishingStream = object : OutputStream() {
        private var closed = false

        override fun write(b: Int) = rawStream.write(b)
        override fun write(b: ByteArray) = rawStream.write(b)
        override fun write(b: ByteArray, off: Int, len: Int) = rawStream.write(b, off, len)
        override fun flush() = rawStream.flush()

        override fun close() {
            if (closed) return
            closed = true
            try {
                rawStream.close()
            } finally {
                // Always clear the pending flag, even if the close above failed.
                val published = ContentValues().apply {
                    put(MediaStore.Downloads.IS_PENDING, 0)
                }
                resolver.update(uri, published, null, null)
            }
        }
    }
    return Pair(publishingStream, uri.toString())
}
/**
 * Creates a file directly in the public Downloads folder and opens it for writing.
 * Used on Android versions below 10.
 *
 * Name collisions are resolved by inserting ` (1)`, ` (2)`, … before the extension; after
 * 998 numbered attempts a timestamp-based name is used instead.
 *
 * @param filename Already-sanitized file name.
 * @return Pair of (stream to write to, absolute path of the file).
 * @throws IOException if the file cannot be opened for writing.
 */
@Suppress("DEPRECATION")
private fun createLegacyOutputStream(filename: String): Pair<OutputStream, String> {
    val downloadsDir = defaultDownloadsDir()
    downloadsDir.mkdirs()

    val stem = filename.substringBeforeLast('.', filename)
    val ext = filename.substringAfterLast('.', "")

    // Find a name that is not taken yet.
    var target = File(downloadsDir, filename)
    var counter = 1
    while (target.exists()) {
        if (counter >= 999) {
            target = File(downloadsDir, "${stem}_${System.currentTimeMillis()}.${ext.ifBlank { "bin" }}")
            break
        }
        val numbered = if (ext.isNotBlank()) "$stem ($counter).$ext" else "$stem ($counter)"
        target = File(downloadsDir, numbered)
        counter++
    }
    return Pair(FileOutputStream(target), target.absolutePath)
}
/**
 * Finds a local IPv4 address in dotted notation.
 *
 * Walks the network interfaces in enumeration order, ignoring interfaces that are down
 * or loopback, and returns the first IPv4 address that is neither loopback nor
 * link-local.
 *
 * @return The address string, or null when none is found or enumeration fails.
 */
fun localIpv4OrNull(): String? = try {
    Collections.list(NetworkInterface.getNetworkInterfaces())
        .asSequence()
        .filter { it.isUp && !it.isLoopback }
        .flatMap { Collections.list(it.inetAddresses).asSequence() }
        .firstOrNull { it is Inet4Address && !it.isLoopbackAddress && !it.isLinkLocalAddress }
        ?.hostAddress
} catch (_: Throwable) {
    null
}
/**
 * Local IPv4 address as an unsigned 32-bit integer held in a [Long] (CTCP carries it as
 * a decimal string).
 *
 * @return The numeric address, or 0 when no local IPv4 address is available.
 */
fun localIpv4AsInt(): Long =
    localIpv4OrNull()?.let { dotted -> ipv4ToLongBestEffort(dotted) } ?: 0L
/**
 * Standard (active) DCC RECEIVE: we connect to the sender's ip:port and download the file.
 *
 * Order of operations matters for cleanup:
 * 1. The offer is validated (port > 0, acceptable remote IP) and the socket is connected
 *    BEFORE the output file is created, so a failed connect never leaves an empty file.
 * 2. If the TLS wrap fails, the raw socket is closed before rethrowing.
 * 3. The final flush of the write buffer is part of the success path: if it fails, the
 *    transfer fails. (Swallowing it would report a truncated file as complete.)
 * 4. If nothing was received, the empty placeholder is removed.
 * 5. After a clean read loop the byte count is checked against the advertised size.
 *
 * @param offer The active DCC SEND offer (must have `port > 0`).
 * @param customFolderUri Optional SAF URI for a custom download folder (null = Downloads).
 * @param onProgress Called with (bytes received, advertised size).
 * @return The path/URI where the file was saved.
 * @throws IllegalArgumentException if the offer is passive or the remote IP is rejected.
 * @throws DccIncompleteException if fewer bytes than advertised arrived.
 * @throws IOException on connection or storage errors.
 */
suspend fun receive(
    offer: DccOffer,
    customFolderUri: String?,
    onProgress: (Long, Long) -> Unit
): String = withContext(Dispatchers.IO) {
    // Passive offers carry port=0 and must go through receivePassive(); catch misrouted calls early.
    require(offer.port > 0) {
        "DCC RECEIVE: port must be > 0 for active DCC (use receivePassive() for passive offers)"
    }
    validateRemoteIp(offer.ip)
    val rawSock = Socket(offer.ip, offer.port)
    val sock = try {
        if (offer.secure) wrapTls(rawSock, offer.ip) else rawSock
    } catch (t: Throwable) {
        // A failed TLS wrap must not leak the connected raw socket.
        runCatching { rawSock.close() }
        throw t
    }
    val (outputStream, savedPath) = try {
        createDccOutputStream(offer.filename, customFolderUri)
    } catch (t: Throwable) {
        runCatching { sock.close() }
        throw t
    }
    var receivedAnyBytes = false
    var received = 0L
    // Closing the socket on cancellation unblocks a pending read.
    val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { sock.close() } }
    try {
        sock.use { s ->
            // Buffer writes to cut IPC round-trips for SAF/MediaStore streams.
            val buffered = java.io.BufferedOutputStream(outputStream, 256 * 1024)
            var failure: Throwable? = null
            try {
                received = receiveFromSocket(s, buffered, offer.size, offer.turbo) { sent, total ->
                    if (sent > 0) receivedAnyBytes = true
                    onProgress(sent, total)
                }
                // Success path: a flush failure means data never reached storage, so let it propagate.
                buffered.flush()
            } catch (t: Throwable) {
                failure = t
                // Failure path: keep as much of the partial file as possible, best-effort.
                runCatching { buffered.flush() }
                throw t
            } finally {
                val closeResult = runCatching { outputStream.close() }
                if (!receivedAnyBytes) deleteSavedPathIfEmpty(savedPath)
                // Only surface a close failure when it would not mask the original error.
                if (failure == null) closeResult.getOrThrow()
            }
        }
    } finally {
        cancelHandle?.dispose()
    }
    // Integrity gate. Only reached on a clean read-loop exit (a mid-transfer socket
    // error propagates above and is reported as a failure by the caller).
    verifyCompleteOrCleanup(savedPath, received, offer.size)
    savedPath
}
/**
 * Passive/Reverse DCC RECEIVE.
 *
 * The remote sender offered port 0 + token. We open a listening port, report it through
 * [onListening], then accept the incoming connection and receive the file.
 *
 * Cleanup guarantees:
 * - The listening port is bound BEFORE the output file is created, so port exhaustion
 *   never leaves an empty file behind.
 * - From the moment the output file exists, every exit path (including a failure or
 *   cancellation inside [onListening], an accept timeout, or a TLS failure) closes the
 *   output stream and removes the placeholder if no bytes were received.
 * - The final flush of the write buffer is part of the success path: if it fails, the
 *   transfer fails instead of reporting a truncated file as complete.
 *
 * @param offer The passive offer (must have `port == 0` and a token).
 * @param portMin Lower bound of the listening port range.
 * @param portMax Upper bound of the listening port range.
 * @param customFolderUri Optional SAF URI for a custom download folder (null = Downloads).
 * @param onListening Called once we are listening, with (local IPv4 as integer, port, size, token).
 * @param onProgress Called with (bytes received, advertised size).
 * @return The path/URI where the file was saved.
 * @throws IllegalArgumentException if the offer is not a well-formed passive offer.
 * @throws RuntimeException if the sender does not connect within 45 seconds.
 * @throws DccIncompleteException if fewer bytes than advertised arrived.
 */
suspend fun receivePassive(
    offer: DccOffer,
    portMin: Int,
    portMax: Int,
    customFolderUri: String?,
    onListening: suspend (ipAsInt: Long, port: Int, size: Long, token: Long) -> Unit,
    onProgress: (Long, Long) -> Unit
): String = withContext(Dispatchers.IO) {
    // Some misbehaving clients send port 0 without a token (malformed passive DCC); catch this early.
    if (offer.port != 0) throw IllegalArgumentException("Passive DCC offer has non-zero port: ${offer.port}")
    val token = offer.token ?: throw IllegalArgumentException("Passive DCC offer is missing token (port=0 but no token)")
    val ss = bindFirstAvailable(portMin, portMax)
    val (outputStream, savedPath) = try {
        createDccOutputStream(offer.filename, customFolderUri)
    } catch (t: Throwable) {
        runCatching { ss.close() }
        throw t
    }
    var receivedAnyBytes = false
    var received = 0L
    var failure: Throwable? = null
    // Close the ServerSocket on cancellation so accept() unblocks immediately.
    val ssCancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { ss.close() } }
    try {
        ss.soTimeout = 45_000
        onListening(localIpv4AsInt(), ss.localPort, offer.size, token)
        val rawSock = try {
            ss.accept()
        } catch (_: java.net.SocketTimeoutException) {
            throw RuntimeException("DCC RECEIVE timed out waiting for sender to connect")
        }
        val sock = try {
            if (offer.secure) wrapTls(rawSock, rawSock.inetAddress?.hostAddress ?: "0.0.0.0") else rawSock
        } catch (t: Throwable) {
            runCatching { rawSock.close() }
            throw t
        }
        val sockCancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { sock.close() } }
        try {
            sock.use { s ->
                val buffered = java.io.BufferedOutputStream(outputStream, 256 * 1024)
                try {
                    received = receiveFromSocket(s, buffered, offer.size, offer.turbo) { sent, total ->
                        if (sent > 0) receivedAnyBytes = true
                        onProgress(sent, total)
                    }
                    // Success path: a flush failure means data never reached storage.
                    buffered.flush()
                } catch (t: Throwable) {
                    // Failure path: keep as much of the partial file as possible, best-effort.
                    runCatching { buffered.flush() }
                    throw t
                }
            }
        } finally {
            sockCancelHandle?.dispose()
        }
    } catch (t: Throwable) {
        failure = t
        throw t
    } finally {
        ssCancelHandle?.dispose()
        runCatching { ss.close() }
        // Single cleanup point for the output file, whatever went wrong above.
        val closeResult = runCatching { outputStream.close() }
        if (!receivedAnyBytes) deleteSavedPathIfEmpty(savedPath)
        // Only surface a close failure when it would not mask the original error.
        if (failure == null) closeResult.getOrThrow()
    }
    // Integrity gate: only reached on a clean read-loop exit.
    verifyCompleteOrCleanup(savedPath, received, offer.size)
    savedPath
}
/**
 * Removes the placeholder for a transfer that did not receive any bytes.
 *
 * - `content://` URIs (SAF/MediaStore) are deleted through the ContentResolver without a
 *   size check, so callers must only use this when nothing was written.
 * - Anything else is treated as a file path and deleted only if the file exists and is
 *   zero bytes long.
 *
 * Every failure is swallowed: a leaked zero-byte file is annoying but not a correctness
 * issue, so "delete failed" is not worth reporting over the original cause.
 */
private fun deleteSavedPathIfEmpty(savedPath: String) {
    try {
        if (savedPath.startsWith("content://")) {
            ctx.contentResolver.delete(Uri.parse(savedPath), null, null)
            return
        }
        val candidate = File(savedPath)
        if (candidate.exists() && candidate.length() == 0L) {
            candidate.delete()
        }
    } catch (_: Throwable) {
        // Best-effort cleanup; see KDoc.
    }
}
/**
 * Deletes whatever [savedPath] points at, regardless of its current size.
 *
 * Meant for partial (non-empty) downloads that cannot be completed, in contrast to
 * [deleteSavedPathIfEmpty], which only removes zero-byte placeholders. `content://`
 * URIs go through the ContentResolver; anything else is treated as a file path.
 * Failures are swallowed.
 */
private fun deleteSavedPath(savedPath: String) {
    try {
        if (savedPath.startsWith("content://")) {
            ctx.contentResolver.delete(Uri.parse(savedPath), null, null)
        } else {
            File(savedPath).delete()
        }
    } catch (_: Throwable) {
        // Best-effort cleanup; nothing useful to report.
    }
}
/**
 * Post-transfer completeness check for an incoming DCC file.
 *
 * - advertised size known and `received < expected` (this includes 0 bytes received):
 *   the file is truncated and, with no RESUME support, unrecoverable. It is deleted and
 *   [DccIncompleteException] is thrown so the transfer is reported as failed, not complete.
 * - advertised size unknown (`expected <= 0`) and 0 bytes received: the empty placeholder
 *   is removed (harmless if already gone); nothing to verify against.
 * - advertised size unknown and some bytes received: accept whatever arrived.
 * - `received >= expected`: success; the file is left in place.
 *
 * @param savedPath Path or `content://` URI of the destination file.
 * @param received Number of bytes actually received.
 * @param expected Size advertised by the offer; 0 means unknown.
 * @throws DccIncompleteException if fewer bytes than advertised were received.
 */
private fun verifyCompleteOrCleanup(savedPath: String, received: Long, expected: Long) {
    if (expected > 0L && received < expected) {
        // Zero bytes means only a placeholder exists; otherwise remove the partial file.
        if (received == 0L) deleteSavedPathIfEmpty(savedPath) else deleteSavedPath(savedPath)
        throw DccIncompleteException(received = received, expected = expected)
    }
    if (received == 0L) {
        deleteSavedPathIfEmpty(savedPath)
    }
}
/**
 * Standard (active) DCC SEND: we listen on a port in portMin..portMax and send the file
 * once the peer connects.
 *
 * The listening socket is closed on cancellation (so `accept()` unblocks) and always
 * closed on exit. If the TLS wrap of the accepted connection fails, that connection is
 * closed before the error is rethrown.
 *
 * @param file File to send; its current length is advertised as the size.
 * @param portMin Lower bound of the listening port range.
 * @param portMax Upper bound of the listening port range.
 * @param secure Whether to wrap the accepted connection in TLS.
 * @param onClient Called once we are listening, with (local IPv4 as integer, port, size).
 * @param onProgress Called with (bytes sent, size).
 * @throws RuntimeException if no peer connects within 45 seconds.
 */
suspend fun sendFile(
    file: File,
    portMin: Int,
    portMax: Int,
    secure: Boolean = false,
    onClient: suspend (ipAsInt: Long, port: Int, size: Long) -> Unit,
    onProgress: (Long, Long) -> Unit
): Unit = withContext(Dispatchers.IO) {
    val size = file.length()
    val ss = bindFirstAvailable(portMin, portMax)
    val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { ss.close() } }
    try {
        val ipInt = localIpv4AsInt()
        onClient(ipInt, ss.localPort, size)
        ss.soTimeout = 45_000
        val rawSock = try {
            ss.accept()
        } catch (_: java.net.SocketTimeoutException) {
            throw RuntimeException("DCC SEND timed out waiting for peer to connect")
        }
        val sock = try {
            if (secure) wrapTls(rawSock, rawSock.inetAddress.hostAddress ?: "") else rawSock
        } catch (t: Throwable) {
            // A failed TLS wrap must not leak the accepted connection.
            runCatching { rawSock.close() }
            throw t
        }
        sock.use { s ->
            // Close the data socket on cancellation so a blocked write/read unblocks.
            val sockHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { s.close() } }
            try {
                sendOverSocket(s, file, size, onProgress)
            } finally {
                sockHandle?.dispose()
            }
        }
    } finally {
        cancelHandle?.dispose()
        runCatching { ss.close() }
    }
}
/**
 * Passive/Reverse DCC SEND: the peer opened a port; we connect out and send the file.
 *
 * The remote address is validated before connecting. If the TLS wrap fails, the raw
 * socket is closed before the error is rethrown. The socket is closed on cancellation
 * and when the transfer ends.
 *
 * @param file File to send; its current length is used as the size.
 * @param host Address the peer is listening on.
 * @param port Port the peer is listening on.
 * @param secure Whether to wrap the connection in TLS.
 * @param onProgress Called with (bytes sent, size).
 * @throws IllegalArgumentException if [host] is rejected by the address validation.
 */
suspend fun sendFileConnect(
    file: File,
    host: String,
    port: Int,
    secure: Boolean = false,
    onProgress: (Long, Long) -> Unit
): Unit = withContext(Dispatchers.IO) {
    val size = file.length()
    validateRemoteIp(host)
    val rawSock = Socket(host, port)
    val sock = try {
        if (secure) wrapTls(rawSock, host) else rawSock
    } catch (t: Throwable) {
        // A failed TLS wrap must not leak the connected raw socket.
        runCatching { rawSock.close() }
        throw t
    }
    val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { sock.close() } }
    try {
        sock.use { s ->
            sendOverSocket(s, file, size, onProgress)
        }
    } finally {
        cancelHandle?.dispose()
    }
}
/**
 * Standard (active) DCC CHAT: we listen on a port in portMin..portMax and accept when the
 * peer connects.
 *
 * The listening socket is closed on cancellation, so a cancelled chat offer does not keep
 * blocking in `accept()` until the 45-second timeout, and it is always closed on exit. If
 * configuring the accepted socket fails, that socket is closed before rethrowing.
 *
 * @param portMin Lower bound of the listening port range.
 * @param portMax Upper bound of the listening port range.
 * @param onClient Called once we are listening, with (local IPv4 as integer, port).
 * @return The connected socket (TCP_NODELAY and keep-alive enabled); the caller owns it.
 * @throws RuntimeException if no peer connects within 45 seconds.
 */
suspend fun startChat(
    portMin: Int,
    portMax: Int,
    onClient: suspend (ipAsInt: Long, port: Int) -> Unit
): Socket = withContext(Dispatchers.IO) {
    val ss = bindFirstAvailable(portMin, portMax)
    // Close the ServerSocket on cancellation so accept() unblocks immediately.
    val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { ss.close() } }
    try {
        val ipInt = localIpv4AsInt()
        onClient(ipInt, ss.localPort)
        ss.soTimeout = 45_000
        val sock = try {
            ss.accept()
        } catch (_: java.net.SocketTimeoutException) {
            throw RuntimeException("DCC CHAT timed out waiting for peer to connect")
        }
        try {
            sock.tcpNoDelay = true
            sock.keepAlive = true
        } catch (t: Throwable) {
            // Nobody else holds this socket yet; don't leak it.
            runCatching { sock.close() }
            throw t
        }
        sock
    } finally {
        cancelHandle?.dispose()
        runCatching { ss.close() }
    }
}
/**
 * Standard (active) DCC CHAT: connect to the offered ip:port and return the connected socket.
 *
 * The remote address is validated before connecting, the same way the file-transfer
 * paths do it (loopback, link-local, wildcard and multicast targets are refused). If the
 * TLS wrap or socket configuration fails, the raw socket is closed before rethrowing.
 *
 * @param offer The chat offer to connect to.
 * @return The connected socket (TLS-wrapped when the offer is secure); the caller owns it.
 * @throws IllegalArgumentException if the offered address is rejected by the validation.
 */
suspend fun connectChat(offer: DccChatOffer): Socket = withContext(Dispatchers.IO) {
    validateRemoteIp(offer.ip)
    val rawSock = Socket(offer.ip, offer.port)
    try {
        val sock = if (offer.secure) wrapTls(rawSock, offer.ip) else rawSock
        sock.tcpNoDelay = true
        sock.keepAlive = true
        sock
    } catch (t: Throwable) {
        // Closing the raw socket also tears down a TLS socket layered on top of it.
        runCatching { rawSock.close() }
        throw t
    }
}
/**
 * Receive bytes from [sock] into [outputStream], ACKing progress per the DCC convention.
 *
 * Throughput design:
 * - The socket reader (calling thread) and the storage writer (a dedicated thread) are
 *   connected by a bounded queue, so network reads overlap with storage writes instead
 *   of a slow MediaStore/SAF write stalling the socket read.
 * - The queue is bounded (16 × 256 KB ≈ 4 MB) so a genuinely slow storage layer still
 *   backpressures the reader rather than ballooning RAM.
 * - A 256 KB read buffer is used and a 1 MB SO_RCVBUF is requested (best-effort).
 * - Unless [turbo] is set, an ACK with the running total is sent as soon as bytes are
 *   *received* (not after they hit storage). ACKs are 8 bytes wide when the advertised
 *   size exceeds 32 bits, otherwise 4 bytes; ACK write failures are ignored.
 * - [onProgress] is throttled to roughly 10 calls per second, plus one final call after
 *   a clean finish so the UI lands on the real total.
 *
 * Limits: at most [expectedSize] bytes are accepted when it is known; otherwise a hard
 * ceiling of 8 GB applies so a sender cannot fill the device's storage.
 *
 * Failure handling: the function only returns normally when the reader loop ended
 * cleanly AND the writer thread has drained every queued chunk into [outputStream]. A
 * writer error takes precedence over a reader error (which is attached as suppressed);
 * a writer that is still running after the grace period is reported as an [IOException].
 *
 * @param sock Connected socket to read from (its input stream is closed on exit).
 * @param outputStream Destination for the received bytes; not closed here.
 * @param expectedSize Advertised size in bytes; 0 or negative means unknown.
 * @param turbo When true, no ACKs are sent.
 * @param onProgress Called with (bytes received so far, [expectedSize]).
 * @return Total number of bytes received, all of which have been written to [outputStream].
 * @throws IOException if reading or writing fails, or the writer does not finish.
 */
private fun receiveFromSocket(
    sock: Socket,
    outputStream: OutputStream,
    expectedSize: Long,
    turbo: Boolean,
    onProgress: (Long, Long) -> Unit
): Long {
    sock.tcpNoDelay = true
    // Best-effort; some platforms cap or ignore this. Not fatal if it fails.
    runCatching { sock.receiveBufferSize = 1 * 1024 * 1024 }
    val ack64 = expectedSize > 0xFFFFFFFFL
    val ackBuf = ByteArray(if (ack64) 8 else 4)
    val expected: Long? = expectedSize.takeIf { it > 0L }
    // Cap at the advertised size, or at 8 GB when no size was advertised.
    val maxAccept: Long = expected ?: (8L * 1024 * 1024 * 1024)

    // Producer/consumer plumbing shared between the reader and the writer thread.
    val queue = java.util.concurrent.ArrayBlockingQueue<ByteArray>(16)
    val writerError = java.util.concurrent.atomic.AtomicReference<Throwable?>(null)
    val readerDone = java.util.concurrent.atomic.AtomicBoolean(false)
    val writerThread = thread(start = true, isDaemon = true, name = "dcc-writer") {
        try {
            // Drain until the reader signals done AND the queue is empty. Poll with a
            // short timeout so `readerDone` is noticed even if the queue is empty.
            while (!readerDone.get() || queue.isNotEmpty()) {
                val chunk = queue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS)
                    ?: continue
                outputStream.write(chunk)
            }
        } catch (t: Throwable) {
            writerError.set(t)
        }
    }

    var total = 0L
    var lastProgressMs = 0L
    val ackOut = sock.getOutputStream()
    var readerFailure: Throwable? = null
    try {
        sock.getInputStream().use { inp ->
            val buf = ByteArray(256 * 1024)
            while (true) {
                // Surface a writer-thread crash as soon as we notice it; don't keep
                // reading more bytes we can't drain.
                writerError.get()?.let { throw it }
                val remaining = maxAccept - total
                if (remaining <= 0L) break
                val toRead = if (remaining < buf.size) remaining.toInt() else buf.size
                val n = inp.read(buf, 0, toRead)
                if (n <= 0) break
                // Copy because `buf` is reused on the next iteration.
                val chunk = buf.copyOf(n)
                // Bounded offer with timeout: backpressure, but keep re-checking for a dead writer.
                while (!queue.offer(chunk, 100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                    writerError.get()?.let { throw it }
                }
                total += n
                if (!turbo) {
                    // ACK the running total of bytes RECEIVED, big-endian.
                    if (ack64) {
                        ackBuf[0] = (total ushr 56).toByte()
                        ackBuf[1] = (total ushr 48).toByte()
                        ackBuf[2] = (total ushr 40).toByte()
                        ackBuf[3] = (total ushr 32).toByte()
                        ackBuf[4] = (total ushr 24).toByte()
                        ackBuf[5] = (total ushr 16).toByte()
                        ackBuf[6] = (total ushr 8).toByte()
                        ackBuf[7] = total.toByte()
                    } else {
                        val ackInt = total.coerceAtMost(0xFFFFFFFFL)
                        ackBuf[0] = (ackInt ushr 24).toByte()
                        ackBuf[1] = (ackInt ushr 16).toByte()
                        ackBuf[2] = (ackInt ushr 8).toByte()
                        ackBuf[3] = ackInt.toByte()
                    }
                    runCatching { ackOut.write(ackBuf) }
                }
                // Time-gated progress: ~10 updates/sec.
                val now = System.currentTimeMillis()
                if (now - lastProgressMs >= 100L) {
                    lastProgressMs = now
                    onProgress(total, expectedSize)
                }
                if (expected != null && total >= expected) break
            }
        }
    } catch (t: Throwable) {
        // Remember the failure; the writer must be shut down before it is rethrown.
        readerFailure = t
    }

    // Signal the writer to drain remaining chunks and exit; wait briefly. If it's stuck
    // (e.g. the storage stream blocked indefinitely), interrupt it.
    readerDone.set(true)
    writerThread.join(10_000)
    if (writerThread.isAlive) {
        writerThread.interrupt()
        writerThread.join(2_000)
    }

    val readErr = readerFailure
    val writeErr = writerError.get()
    if (writeErr != null) {
        // The reader may have rethrown the writer's own error; never self-suppress.
        if (readErr != null && readErr !== writeErr) writeErr.addSuppressed(readErr)
        throw writeErr
    }
    if (readErr != null) throw readErr
    if (writerThread.isAlive) {
        // Not everything reached storage; returning a clean total would hide data loss.
        throw IOException("DCC: storage writer did not finish writing received data")
    }

    // Final progress beat so the UI lands on the actual total rather than the last
    // throttled sample.
    onProgress(total, expectedSize)
    return total
}
/**
 * Streams [file] over an already-connected [sock] and tracks the receiver's DCC ACKs.
 *
 * - A daemon thread reads ACKs from the socket (1-second read timeout so it can notice
 *   interruption). ACK width is 8 bytes when [size] exceeds 32 bits, otherwise 4 bytes.
 *   Each ACK is decoded as both big- and little-endian and the plausible value is kept.
 * - The file is written in 32 KB chunks through a 64 KB buffer.
 * - [onProgress] is throttled to roughly 10 calls per second, with a final call after the
 *   last chunk so the caller always sees the real total.
 * - After the last byte, the output side is half-closed so the receiver sees EOF, then we
 *   wait up to 10 seconds for the final ACK (or until the ACK reader stops).
 * - An [IOException] while writing is treated as success if the peer has already ACKed
 *   the full size.
 *
 * The socket itself is not closed here; that is the caller's job.
 *
 * @param sock Connected socket to send on.
 * @param file File to read from.
 * @param size Number of bytes advertised to the peer; 0 means unknown.
 * @param onProgress Called with (bytes sent so far, [size]).
 * @throws IOException if reading the file or writing to the socket fails.
 */
private fun sendOverSocket(
    sock: Socket,
    file: File,
    size: Long,
    onProgress: (Long, Long) -> Unit
) {
    sock.tcpNoDelay = true
    // Used by the ACK reader thread.
    sock.soTimeout = 1_000
    val acked = AtomicLong(0L)
    // ACK width must match what the receiver sends: 8-byte (64-bit) ACKs for files larger
    // than a 32-bit value, else 4-byte. Reading the wrong width desyncs the ACK stream.
    val ack64 = size > 0xFFFFFFFFL
    val ackWidth = if (ack64) 8 else 4
    fun u32be(b: ByteArray): Long =
        (ByteBuffer.wrap(b, 0, 4).order(ByteOrder.BIG_ENDIAN).int.toLong() and 0xFFFFFFFFL)
    fun u32le(b: ByteArray): Long =
        (ByteBuffer.wrap(b, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int.toLong() and 0xFFFFFFFFL)
    fun u64be(b: ByteArray): Long =
        ByteBuffer.wrap(b, 0, 8).order(ByteOrder.BIG_ENDIAN).long
    fun u64le(b: ByteArray): Long =
        ByteBuffer.wrap(b, 0, 8).order(ByteOrder.LITTLE_ENDIAN).long
    fun chooseAck(be: Long, le: Long, last: Long): Long? {
        // DCC ACKs are commonly network byte order, but some clients historically send host order.
        // We choose the value that is monotonic and plausible for this transfer size.
        val cands = sequenceOf(be, le).distinct().filter { it >= last }.toList()
        if (cands.isEmpty()) return null
        if (size > 0L) {
            // Prefer candidates <= size.
            val inRange = cands.filter { it <= size }
            if (inRange.isNotEmpty()) return inRange.maxOrNull()
            // Some clients may overshoot slightly; clamp.
            val near = cands.filter { it <= size + 1024 * 1024L }
            if (near.isNotEmpty()) return size
        }
        return cands.maxOrNull()
    }
    val ackThread = thread(start = true, isDaemon = true, name = "dcc-ack-reader") {
        val inp = sock.getInputStream()
        val b = ByteArray(ackWidth)
        var off = 0
        var last = 0L
        while (!Thread.currentThread().isInterrupted) {
            try {
                val n = inp.read(b, off, ackWidth - off)
                if (n < 0) break
                off += n
                if (off == ackWidth) {
                    val be = if (ack64) u64be(b) else u32be(b)
                    val le = if (ack64) u64le(b) else u32le(b)
                    val chosen = chooseAck(be, le, last)
                    if (chosen != null) {
                        last = chosen
                        acked.set(chosen)
                    }
                    off = 0
                }
            } catch (_: java.net.SocketTimeoutException) {
                // keep polling; a partially read ACK is kept in `b`/`off`
            } catch (_: Throwable) {
                break
            }
        }
    }
    var sent = 0L
    var lastProgressMs = 0L
    var lastReported = 0L
    try {
        val outRaw = sock.getOutputStream()
        val out = BufferedOutputStream(outRaw, 64 * 1024)
        val buf = ByteArray(32 * 1024)
        file.inputStream().use { fin ->
            while (true) {
                val n = fin.read(buf)
                if (n <= 0) break
                try {
                    out.write(buf, 0, n)
                    sent += n
                    // Time-gated progress so the callback is not in the per-chunk hot path.
                    val now = System.currentTimeMillis()
                    if (now - lastProgressMs >= 100L) {
                        lastProgressMs = now
                        lastReported = sent
                        onProgress(sent, size)
                    }
                } catch (io: IOException) {
                    // If the peer already ACKed the full size, treat as success.
                    if (size > 0L && acked.get() >= size) break
                    throw io
                }
            }
        }
        // Final progress beat so the caller lands on the real total, not a throttled sample.
        if (lastReported != sent) onProgress(sent, size)
        out.flush()
        // Half-close so receiver sees EOF; then wait briefly for final ACK/peer close.
        runCatching { sock.shutdownOutput() }
        val deadline = System.currentTimeMillis() + 10_000L
        while (size > 0L && acked.get() < size && System.currentTimeMillis() < deadline) {
            // If the receiver closed, the ACK thread will stop.
            if (!ackThread.isAlive) break
            Thread.sleep(50)
        }
    } finally {
        runCatching { ackThread.interrupt() }
    }
}
/**
 * Binds a [ServerSocket] to the first free port in the given range.
 *
 * Both bounds are clamped to 1..65535. The bounds may be given in either order; a
 * reversed range is normalized instead of silently yielding no candidate ports. Ports
 * are tried in ascending order and any bind failure moves on to the next port.
 *
 * @param min One end of the port range.
 * @param max The other end of the port range.
 * @return A bound server socket; the caller owns it and must close it.
 * @throws IllegalStateException if no port in the range could be bound; the last bind
 *   failure, if any, is attached as the cause.
 */
private fun bindFirstAvailable(min: Int, max: Int): ServerSocket {
    val a = min.coerceIn(1, 65535)
    val b = max.coerceIn(1, 65535)
    val lo = minOf(a, b)
    val hi = maxOf(a, b)
    var lastFailure: Throwable? = null
    for (port in lo..hi) {
        try {
            return ServerSocket(port)
        } catch (t: Throwable) {
            // Port busy or not permitted; remember why and try the next one.
            lastFailure = t
        }
    }
    throw IllegalStateException("No free port in $lo..$hi", lastFailure)
}
/**
 * Converts a dotted IPv4 string into its 32-bit numeric value held in a [Long].
 *
 * Best effort: anything that does not split into exactly four parts, or contains a part
 * that is not a number, yields 0. Each part is masked to its low 8 bits rather than
 * range-checked.
 *
 * @param ip Dotted-quad string such as `192.168.1.10`.
 * @return The numeric address, or 0 when [ip] cannot be converted.
 */
private fun ipv4ToLongBestEffort(ip: String): Long {
    val octets = ip.split(".")
    if (octets.size != 4) return 0L
    return runCatching {
        octets.fold(0L) { acc, octet -> (acc shl 8) or (octet.toLong() and 0xFFL) }
    }.getOrElse { 0L }
}
}