Dccmanager.kt

Java boxlabs 6 Views Size: 40.86 KB Posted on: May 29, 26 @ 11:12 AM
  1. /*
  2. * HexDroidIRC - An IRC Client for Android
  3. * Copyright (C) 2026 boxlabs
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  17. */
  18.  
  19. package com.boxlabs.hexdroid
  20.  
  21. import android.content.ContentValues
  22. import android.content.Context
  23. import android.net.Uri
  24. import android.os.Build
  25. import android.os.Environment
  26. import android.provider.MediaStore
  27. import androidx.documentfile.provider.DocumentFile
  28. import kotlin.concurrent.thread
  29. import kotlinx.coroutines.Dispatchers
  30. import kotlinx.coroutines.withContext
  31. import java.io.BufferedOutputStream
  32. import java.io.File
  33. import java.io.FileOutputStream
  34. import java.io.IOException
  35. import java.io.OutputStream
  36. import java.nio.ByteOrder
  37. import java.net.Inet4Address
  38. import java.net.NetworkInterface
  39. import java.net.ServerSocket
  40. import java.net.Socket
  41. import java.nio.ByteBuffer
  42. import java.util.Collections
  43. import java.util.concurrent.atomic.AtomicLong
  44.  
  45. /**
  46.  * Parsed CTCP DCC SEND offer.
  47.  *
  48.  * Supports active DCC and passive/reverse DCC (token + port 0 handshake).
  49.  */
  50. data class DccOffer(
  51.     /** Network id this offer belongs to (filled by the ViewModel). */
  52.     val netId: String = "",
  53.     val from: String,
  54.     val filename: String,
  55.     val ip: String,
  56.     val port: Int,
  57.     val size: Long,
  58.     /** Passive/reverse DCC token (HexChat "Passive DCC"). */
  59.     val token: Long? = null,
  60.     /** Turbo DCC / TSEND: do not send ACKs back to the sender. */
  61.     val turbo: Boolean = false,
  62.     /** SDCC / SSEND: transfer is TLS-wrapped. */
  63.     val secure: Boolean = false,
  64. ) {
  65.     val isPassive: Boolean get() = port == 0 && token != null
  66. }
  67.  
  68. /** Parsed CTCP DCC CHAT offer. */
  69. data class DccChatOffer(
  70.     /** Network id this offer belongs to (filled by the ViewModel). */
  71.     val netId: String = "",
  72.     val from: String,
  73.     val protocol: String = "chat",
  74.     val ip: String,
  75.     val port: Int,
  76.     /** Passive/reverse token (rare; included for forward compatibility). */
  77.     val token: Long? = null,
  78.     /** SDCC / SCHAT: session is TLS-wrapped. */
  79.     val secure: Boolean = false,
  80. ) {
  81.     val isPassive: Boolean get() = port == 0 && token != null
  82. }
  83.  
  84. /**
  85.  * Thrown when a DCC receive ends with fewer bytes than the offer advertised. Distinct from
  86.  * a generic [IOException] so the ViewModel can surface "incomplete" specifically (e.g. offer
  87.  * a retry) rather than treating it like a connection error. Carries the byte counts for the
  88.  * UI message.
  89.  */
  90. class DccIncompleteException(
  91.     val received: Long,
  92.     val expected: Long,
  93. ) : IOException("DCC transfer incomplete: received $received of $expected bytes (the sender closed the connection early)")
  94.  
  95. sealed class DccTransferState {
  96.     data class Incoming(
  97.         val offer: DccOffer,
  98.         val received: Long = 0,
  99.         val startTimeMs: Long = System.currentTimeMillis(),
  100.         val done: Boolean = false,
  101.         val error: String? = null,
  102.         val savedPath: String? = null,
  103.         /** Epoch ms when the transfer finished; null while still in progress.
  104.          *  Stored so the completed avg-speed display uses the actual transfer
  105.          *  duration rather than ever-growing wall-clock time after completion. */
  106.         val endTimeMs: Long? = null
  107.     ) : DccTransferState()
  108.  
  109.     data class Outgoing(
  110.         val target: String,
  111.         val filename: String,
  112.         val fileSize: Long = 0,     // Total bytes; 0 = unknown (e.g. stream source)
  113.         val bytesSent: Long = 0,
  114.         val startTimeMs: Long = System.currentTimeMillis(),
  115.         val done: Boolean = false,
  116.         val error: String? = null,
  117.         /** Epoch ms when the transfer finished; null while in progress. See [Incoming.endTimeMs]. */
  118.         val endTimeMs: Long? = null
  119.     ) : DccTransferState()
  120.  
  121. }
  122.  
  123. /**
  124.  * DCC Manager handles file send/receive and DCC CHAT socket lifecycle.
  125.  *
  126.  * Security notes:
  127.  *  - [bindFirstAvailable] iterates ports min..max; each failed `ServerSocket()` call cleans
  128.  *    up its own OS resources before throwing, so there is no leak on the error path.
  129.  *  - Passive DCC offers are validated: port must be 0 AND token must be present. Malformed
  130.  *    offers (port=0, no token) throw [IllegalArgumentException] before any socket is opened.
  131.  *  - Turbo DCC (TSEND) skips per-chunk ACKs; the caller is responsible for confirming file
  132.  *    integrity out-of-band (e.g. hash check).
  133.  *  - Remote IPs are validated before connecting: loopback, link-local, wildcard, and multicast
  134.  *    addresses are rejected. RFC-1918 private addresses are intentionally allowed to support
  135.  *    LAN DCC between devices on the same network.
  136.  */
  137. class DccManager(ctx: Context) {
  138.  
    // Keep only the application context, so a passed-in Activity context is not
    // retained by this manager (avoids leaking the Activity).
    private val ctx: Context = ctx.applicationContext
  141.  
  142.     /**
  143.      * Wraps a plain socket with TLS for SDCC (Secure DCC).
  144.      * Uses the default SSLSocketFactory which trusts system CAs.
  145.      * SDCC peers are often self-signed; we intentionally skip hostname verification
  146.      * since DCC is IP-addressed, not hostname-addressed. The encryption still provides
  147.      * confidentiality against passive eavesdroppers.
  148.      */
  149.     private fun wrapTls(sock: Socket, host: String): Socket {
  150.         val sf = javax.net.ssl.SSLSocketFactory.getDefault() as javax.net.ssl.SSLSocketFactory
  151.         val ssl = sf.createSocket(sock, host, sock.port, true) as javax.net.ssl.SSLSocket
  152.         // Disable hostname verification; DCC uses raw IPs.
  153.         ssl.sslParameters = ssl.sslParameters.also { it.endpointIdentificationAlgorithm = null }
  154.         ssl.startHandshake()
  155.         return ssl
  156.     }
  157.  
  158.     /**
  159.      * Rejects IPs that are structurally invalid targets for a DCC connection.
  160.      *
  161.      * RFC-1918 private addresses (192.168.x.x, 10.x.x.x, 172.16-31.x.x) are intentionally
  162.      * allowed here — LAN DCC between devices on the same network is a normal and common use
  163.      * case (e.g. sharing files with someone on the same Wi-Fi). The SSRF risk for these
  164.      * addresses is low because DCC is a raw TCP connection, not an HTTP request, so it cannot
  165.      * be easily weaponised to exfiltrate data from a local HTTP service.
  166.      *
  167.      * We still block:
  168.      *  - Loopback (127.x.x.x / ::1): no legitimate DCC offer uses localhost.
  169.      *  - Link-local (169.254.x.x / fe80::): APIPA / router-link addresses; not routable.
  170.      *  - Wildcard (0.0.0.0 / ::): meaningless as a connect target.
  171.      *  - Multicast: not a unicast endpoint.
  172.      */
  173.     private fun validateRemoteIp(ip: String) {
  174.         val addr = try {
  175.             java.net.InetAddress.getByName(ip)
  176.         } catch (_: Throwable) {
  177.             throw IllegalArgumentException("DCC: invalid IP address: $ip")
  178.         }
  179.         if (addr.isLoopbackAddress)
  180.             throw IllegalArgumentException("DCC: refusing connection to loopback address $ip")
  181.         if (addr.isLinkLocalAddress)
  182.             throw IllegalArgumentException("DCC: refusing connection to link-local address $ip")
  183.         if (addr.isAnyLocalAddress)
  184.             throw IllegalArgumentException("DCC: refusing connection to wildcard address $ip")
  185.         if (addr.isMulticastAddress)
  186.             throw IllegalArgumentException("DCC: refusing connection to multicast address $ip")
  187.     }
  188.  
  189.     /**
  190.      * Internal fallback directory for DCC files (used when external storage is unavailable).
  191.      */
  192.     private fun internalDccDir(): File = File(ctx.filesDir, "dcc").apply { mkdirs() }
  193.  
  194.     /**
  195.      * Get the default Downloads directory.
  196.      */
  197.     @Suppress("DEPRECATION")
  198.     private fun defaultDownloadsDir(): File {
  199.         return Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS)
  200.     }
  201.  
  202.     /**
  203.      * Create a safe output file for an incoming DCC transfer.
  204.      *
  205.      * @param displayName The filename from the DCC offer
  206.      * @param customFolderUri Optional SAF URI for custom download folder
  207.      * @return Pair of (OutputStream to write to, display path for UI)
  208.      */
  209.     fun createDccOutputStream(displayName: String, customFolderUri: String?): Pair<OutputStream, String> {
  210.         val safeName = sanitizeFilename(displayName)
  211.        
  212.         // If custom folder is set, use SAF
  213.         if (!customFolderUri.isNullOrBlank()) {
  214.             return createSafOutputStream(safeName, customFolderUri)
  215.         }
  216.        
  217.         // On Android 10+, use MediaStore for Downloads
  218.         if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
  219.             return createMediaStoreOutputStream(safeName)
  220.         }
  221.        
  222.         // On older Android, write directly to Downloads folder
  223.         return createLegacyOutputStream(safeName)
  224.     }
  225.  
  226.     private fun sanitizeFilename(displayName: String): String {
  227.         val base = displayName.substringAfterLast('/').substringAfterLast('\\').trim()
  228.         return base
  229.             .replace(Regex("[\\/:\\*?\"<>|]"), "_")
  230.             .replace("\u0000", "")
  231.             .let { if (it == "." || it == ".." || it.isBlank()) "dcc_download.bin" else it }
  232.             .take(120)
  233.     }
  234.  
  235.     private fun createSafOutputStream(filename: String, folderUri: String): Pair<OutputStream, String> {
  236.         val treeUri = Uri.parse(folderUri)
  237.         val tree = DocumentFile.fromTreeUri(ctx, treeUri)
  238.             ?: throw IOException("Cannot access folder: $folderUri")
  239.        
  240.         // Find unique filename
  241.         var finalName = filename
  242.         var counter = 1
  243.         while (tree.findFile(finalName) != null) {
  244.             val stem = filename.substringBeforeLast('.', filename)
  245.             val ext = filename.substringAfterLast('.', "")
  246.             finalName = if (ext.isNotBlank()) "$stem ($counter).$ext" else "$stem ($counter)"
  247.             counter++
  248.             if (counter > 999) {
  249.                 finalName = "${stem}_${System.currentTimeMillis()}.${ext.ifBlank { "bin" }}"
  250.                 break
  251.             }
  252.         }
  253.        
  254.         val mimeType = "application/octet-stream"
  255.         val newFile = tree.createFile(mimeType, finalName)
  256.             ?: throw IOException("Failed to create file: $finalName")
  257.        
  258.         val outputStream = ctx.contentResolver.openOutputStream(newFile.uri)
  259.             ?: throw IOException("Failed to open output stream for: $finalName")
  260.        
  261.         return Pair(outputStream, newFile.uri.toString())
  262.     }
  263.  
  264.     private fun createMediaStoreOutputStream(filename: String): Pair<OutputStream, String> {
  265.         val resolver = ctx.contentResolver
  266.        
  267.         // Find unique filename
  268.         var finalName = filename
  269.         var counter = 1
  270.         while (true) {
  271.             val exists = resolver.query(
  272.                 MediaStore.Downloads.EXTERNAL_CONTENT_URI,
  273.                 arrayOf(MediaStore.Downloads._ID),
  274.                 "${MediaStore.Downloads.DISPLAY_NAME} = ?",
  275.                 arrayOf(finalName),
  276.                 null
  277.             )?.use { it.count > 0 } ?: false
  278.            
  279.             if (!exists) break
  280.            
  281.             val stem = filename.substringBeforeLast('.', filename)
  282.             val ext = filename.substringAfterLast('.', "")
  283.             finalName = if (ext.isNotBlank()) "$stem ($counter).$ext" else "$stem ($counter)"
  284.             counter++
  285.             if (counter > 999) {
  286.                 finalName = "${stem}_${System.currentTimeMillis()}.${ext.ifBlank { "bin" }}"
  287.                 break
  288.             }
  289.         }
  290.        
  291.         val values = ContentValues().apply {
  292.             put(MediaStore.Downloads.DISPLAY_NAME, finalName)
  293.             put(MediaStore.Downloads.MIME_TYPE, "application/octet-stream")
  294.             put(MediaStore.Downloads.IS_PENDING, 1)
  295.         }
  296.        
  297.         val uri = resolver.insert(MediaStore.Downloads.EXTERNAL_CONTENT_URI, values)
  298.             ?: throw IOException("Failed to create MediaStore entry for: $finalName")
  299.        
  300.         val outputStream = resolver.openOutputStream(uri)
  301.             ?: throw IOException("Failed to open output stream for: $finalName")
  302.        
  303.         // Return a wrapper that marks IS_PENDING = 0 when closed
  304.         val wrappedStream = object : OutputStream() {
  305.             override fun write(b: Int) = outputStream.write(b)
  306.             override fun write(b: ByteArray) = outputStream.write(b)
  307.             override fun write(b: ByteArray, off: Int, len: Int) = outputStream.write(b, off, len)
  308.             override fun flush() = outputStream.flush()
  309.             override fun close() {
  310.                 outputStream.close()
  311.                 val updateValues = ContentValues().apply {
  312.                     put(MediaStore.Downloads.IS_PENDING, 0)
  313.                 }
  314.                 resolver.update(uri, updateValues, null, null)
  315.             }
  316.         }
  317.        
  318.         // Return the content:// URI string (not a display path) so shareFile can
  319.         // resolve and open the file via ContentResolver on Android 10+.
  320.         return Pair(wrappedStream, uri.toString())
  321.     }
  322.  
  323.     @Suppress("DEPRECATION")
  324.     private fun createLegacyOutputStream(filename: String): Pair<OutputStream, String> {
  325.         val downloadsDir = defaultDownloadsDir()
  326.         downloadsDir.mkdirs()
  327.        
  328.         // Find unique filename
  329.         var file = File(downloadsDir, filename)
  330.         var counter = 1
  331.         while (file.exists()) {
  332.             val stem = filename.substringBeforeLast('.', filename)
  333.             val ext = filename.substringAfterLast('.', "")
  334.             val newName = if (ext.isNotBlank()) "$stem ($counter).$ext" else "$stem ($counter)"
  335.             file = File(downloadsDir, newName)
  336.             counter++
  337.             if (counter > 999) {
  338.                 file = File(downloadsDir, "${stem}_${System.currentTimeMillis()}.${ext.ifBlank { "bin" }}")
  339.                 break
  340.             }
  341.         }
  342.        
  343.         return Pair(FileOutputStream(file), file.absolutePath)
  344.     }
  345.  
  346.     // local IPv4 in dotted notation
  347.     fun localIpv4OrNull(): String? {
  348.         return try {
  349.             val ifaces = Collections.list(NetworkInterface.getNetworkInterfaces())
  350.             for (iface in ifaces) {
  351.                 if (!iface.isUp || iface.isLoopback) continue
  352.                 val addrs = Collections.list(iface.inetAddresses)
  353.                 for (a in addrs) {
  354.                     if (a is Inet4Address && !a.isLoopbackAddress && !a.isLinkLocalAddress) {
  355.                         return a.hostAddress
  356.                     }
  357.                 }
  358.             }
  359.             null
  360.         } catch (_: Throwable) {
  361.             null
  362.         }
  363.     }
  364.  
  365.     // local IPv4 as an unsigned 32-bit integer (decimal string in CTCP)
  366.     fun localIpv4AsInt(): Long {
  367.         val ip = localIpv4OrNull() ?: return 0L
  368.         return ipv4ToLongBestEffort(ip)
  369.     }
  370.  
  371.     /**
  372.      * Standard DCC RECEIVE (we connect to sender's ip:port).
  373.      *
  374.      * @param customFolderUri Optional SAF URI for custom download folder (null = Downloads)
  375.      * @return The path/URI where the file was saved
  376.      */
  377.     suspend fun receive(
  378.         offer: DccOffer,
  379.         customFolderUri: String?,
  380.         onProgress: (Long, Long) -> Unit
  381.     ): String = withContext(Dispatchers.IO) {
  382.         // Passive DCC offers carry port=0; calling Socket(ip, 0) is undefined behaviour.
  383.         // The caller should use receivePassive() for those. Catch misrouted calls early.
  384.         require(offer.port > 0) {
  385.             "DCC RECEIVE: port must be > 0 for active DCC (use receivePassive() for passive offers)"
  386.         }
  387.  
  388.         // Validate the remote IP BEFORE creating the output file. createDccOutputStream
  389.         // creates a real on-disk file (or SAF document); if validateRemoteIp threw after
  390.         // it, the file was leaked as zero bytes. Open the socket here too for the same
  391.         // reason — if the connect fails, no file gets created.
  392.         validateRemoteIp(offer.ip)
  393.         val rawSock = Socket(offer.ip, offer.port)
  394.         val sock = if (offer.secure) wrapTls(rawSock, offer.ip) else rawSock
  395.  
  396.         val (outputStream, savedPath) = try {
  397.             createDccOutputStream(offer.filename, customFolderUri)
  398.         } catch (t: Throwable) {
  399.             runCatching { sock.close() }
  400.             throw t
  401.         }
  402.         var receivedAnyBytes = false
  403.         var received = 0L
  404.         val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { sock.close() } }
  405.         try {
  406.             sock.use { s ->
  407.                 // Wrap in a BufferedOutputStream to reduce IPC round-trips for SAF/MediaStore
  408.                 // streams, but keep a reference so we can flush it explicitly before the inner
  409.                 // stream is closed. Without this flush, small files (<256 KB) are fully
  410.                 // received into the buffer but never written: the outer outputStream.use{}
  411.                 // closes the raw stream, silently discarding the buffered bytes.
  412.                 val buffered = java.io.BufferedOutputStream(outputStream, 256 * 1024)
  413.                 try {
  414.                     received = receiveFromSocket(s, buffered, offer.size, offer.turbo) { sent, total ->
  415.                         if (sent > 0) receivedAnyBytes = true
  416.                         onProgress(sent, total)
  417.                     }
  418.                 } finally {
  419.                     runCatching { buffered.flush() }
  420.                     outputStream.close()
  421.                     if (!receivedAnyBytes) deleteSavedPathIfEmpty(savedPath)
  422.                 }
  423.             }
  424.         } finally {
  425.             cancelHandle?.dispose()
  426.         }
  427.  
  428.         // Integrity gate. Only reached on a clean read-loop exit (a mid-transfer socket
  429.         // error propagates above and is reported as a failure by the caller).
  430.         verifyCompleteOrCleanup(savedPath, received, offer.size)
  431.  
  432.         savedPath
  433.     }
  434.  
  435.     /**
  436.      * Passive/Reverse DCC RECEIVE.
  437.      *
  438.      * The remote sender offered port 0 + token. We open a listening port, reply with the port,
  439.      * then accept the incoming connection and receive.
  440.      *
  441.      * @param customFolderUri Optional SAF URI for custom download folder (null = Downloads)
  442.      * @return The path/URI where the file was saved
  443.      */
  444.     suspend fun receivePassive(
  445.         offer: DccOffer,
  446.         portMin: Int,
  447.         portMax: Int,
  448.         customFolderUri: String?,
  449.         onListening: suspend (ipAsInt: Long, port: Int, size: Long, token: Long) -> Unit,
  450.         onProgress: (Long, Long) -> Unit
  451.     ): String = withContext(Dispatchers.IO) {
  452.         // Validate passive offer: must have a token AND port must be 0. Some misbehaving clients
  453.         // send port 0 without a token (malformed passive DCC) - catch this early.
  454.         if (offer.port != 0) throw IllegalArgumentException("Passive DCC offer has non-zero port: ${offer.port}")
  455.         val token = offer.token ?: throw IllegalArgumentException("Passive DCC offer is missing token (port=0 but no token)")
  456.  
  457.         // Bind FIRST so a port-exhausted failure doesn't leave a leaked zero-byte file
  458.         // on disk. Same rationale as the validateRemoteIp-before-createDccOutputStream
  459.         // ordering in receive() above.
  460.         val ss = bindFirstAvailable(portMin, portMax)
  461.         val (outputStream, savedPath) = try {
  462.             createDccOutputStream(offer.filename, customFolderUri)
  463.         } catch (t: Throwable) {
  464.             runCatching { ss.close() }
  465.             throw t
  466.         }
  467.         // If anything goes wrong before bytes start arriving, close+delete the file so the
  468.         // user doesn't end up with a zero-byte placeholder cluttering Downloads.
  469.         var receivedAnyBytes = false
  470.         var received = 0L
  471.         // Close the ServerSocket on cancellation so accept() unblocks immediately.
  472.         val ssCancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { ss.close() } }
  473.         try {
  474.             ss.soTimeout = 45_000
  475.             val ipInt = localIpv4AsInt()
  476.             onListening(ipInt, ss.localPort, offer.size, token)
  477.  
  478.             val rawSock = try {
  479.                 ss.accept()
  480.             } catch (t: Throwable) {
  481.                 outputStream.close()
  482.                 deleteSavedPathIfEmpty(savedPath)
  483.                 throw if (t is java.net.SocketTimeoutException)
  484.                     RuntimeException("DCC RECEIVE timed out waiting for sender to connect")
  485.                 else t
  486.             }
  487.             val sock = try {
  488.                 if (offer.secure) wrapTls(rawSock, rawSock.inetAddress?.hostAddress ?: "0.0.0.0") else rawSock
  489.             } catch (t: Throwable) {
  490.                 runCatching { rawSock.close() }
  491.                 outputStream.close()
  492.                 deleteSavedPathIfEmpty(savedPath)
  493.                 throw t
  494.             }
  495.  
  496.             val sockCancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { sock.close() } }
  497.             try {
  498.                 sock.use { s ->
  499.                     val buffered = java.io.BufferedOutputStream(outputStream, 256 * 1024)
  500.                     try {
  501.                         received = receiveFromSocket(s, buffered, offer.size, offer.turbo) { sent, total ->
  502.                             if (sent > 0) receivedAnyBytes = true
  503.                             onProgress(sent, total)
  504.                         }
  505.                     } finally {
  506.                         runCatching { buffered.flush() }
  507.                         outputStream.close()
  508.                         if (!receivedAnyBytes) deleteSavedPathIfEmpty(savedPath)
  509.                     }
  510.                 }
  511.             } finally {
  512.                 sockCancelHandle?.dispose()
  513.             }
  514.         } finally {
  515.             ssCancelHandle?.dispose()
  516.             runCatching { ss.close() }
  517.         }
  518.  
  519.         // Integrity gate — see receive() for the rationale.
  520.         verifyCompleteOrCleanup(savedPath, received, offer.size)
  521.  
  522.         savedPath
  523.     }
  524.  
  525.     /**
  526.      * Delete of a savedPath when the transfer didn't actually receive any
  527.      * bytes. Handles both file:// paths and SAF/MediaStore content:// URIs. Failures are
  528.      * swallowed: a leaked zero-byte file is annoying but not a correctness issue, so
  529.      * "delete failed" is not worth bubbling up over the original cause.
  530.      */
  531.     private fun deleteSavedPathIfEmpty(savedPath: String) {
  532.         runCatching {
  533.             if (savedPath.startsWith("content://")) {
  534.                 val uri = Uri.parse(savedPath)
  535.                 ctx.contentResolver.delete(uri, null, null)
  536.             } else {
  537.                 File(savedPath).takeIf { it.exists() && it.length() == 0L }?.delete()
  538.             }
  539.         }
  540.     }
  541.  
  542.     /**
  543.      * Unconditional delete of a savedPath, regardless of current size. Used for
  544.      * partial (non-empty) downloads that can't be completed — unlike [deleteSavedPathIfEmpty],
  545.      * which only removes zero-byte placeholders. Failures are swallowed for the same reason.
  546.      */
  547.     private fun deleteSavedPath(savedPath: String) {
  548.         runCatching {
  549.             if (savedPath.startsWith("content://")) {
  550.                 ctx.contentResolver.delete(Uri.parse(savedPath), null, null)
  551.             } else {
  552.                 File(savedPath).delete()
  553.             }
  554.         }
  555.     }
  556.  
  557.     /**
  558.      * Post-transfer completeness check for an incoming DCC file.
  559.      *
  560.      *  - 0 bytes received        -> remove the empty placeholder (mirrors the in-flight
  561.      *                               `!receivedAnyBytes` cleanup; harmless if already gone).
  562.      *  - received < offer size   -> the file is truncated and (with no RESUME support)
  563.      *                               unrecoverable; delete it and throw [DccIncompleteException]
  564.      *                               so the transfer is reported as failed, not complete.
  565.      *  - offer size unknown (0)  -> nothing to verify against; accept whatever arrived.
  566.      *  - received >= offer size  -> success; leave the file in place.
  567.      */
  568.     private fun verifyCompleteOrCleanup(savedPath: String, received: Long, expected: Long) {
  569.         if (received == 0L) {
  570.             deleteSavedPathIfEmpty(savedPath)
  571.             return
  572.         }
  573.         if (expected > 0L && received < expected) {
  574.             deleteSavedPath(savedPath)
  575.             throw DccIncompleteException(received = received, expected = expected)
  576.         }
  577.     }
  578.  
  579.     /**
  580.      * Standard (active) DCC SEND: we listen on a port in portMin..portMax and send when peer connects.
  581.      */
  582.     suspend fun sendFile(
  583.         file: File,
  584.         portMin: Int,
  585.         portMax: Int,
  586.         secure: Boolean = false,
  587.         onClient: suspend (ipAsInt: Long, port: Int, size: Long) -> Unit,
  588.         onProgress: (Long, Long) -> Unit
  589.     ): Unit = withContext(Dispatchers.IO) {
  590.         val size = file.length()
  591.         val ss = bindFirstAvailable(portMin, portMax)
  592.         val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { ss.close() } }
  593.         try {
  594.             val ipInt = localIpv4AsInt()
  595.             onClient(ipInt, ss.localPort, size)
  596.  
  597.             ss.soTimeout = 45_000
  598.             val rawSock = try {
  599.                 ss.accept()
  600.             } catch (_: java.net.SocketTimeoutException) {
  601.                 throw RuntimeException("DCC SEND timed out waiting for peer to connect")
  602.             }
  603.             val sock = if (secure) wrapTls(rawSock, rawSock.inetAddress.hostAddress ?: "") else rawSock
  604.  
  605.             sock.use { s ->
  606.                 val sockHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { s.close() } }
  607.                 try {
  608.                     sendOverSocket(s, file, size, onProgress)
  609.                 } finally {
  610.                     sockHandle?.dispose()
  611.                 }
  612.             }
  613.         } finally {
  614.             cancelHandle?.dispose()
  615.             runCatching { ss.close() }
  616.         }
  617.     }
  618.  
  619.     /**
  620.      * Passive/Reverse DCC SEND: peer opened a port; we connect out and send.
  621.      */
  622.     suspend fun sendFileConnect(
  623.         file: File,
  624.         host: String,
  625.         port: Int,
  626.         secure: Boolean = false,
  627.         onProgress: (Long, Long) -> Unit
  628.     ): Unit = withContext(Dispatchers.IO) {
  629.         val size = file.length()
  630.         validateRemoteIp(host)
  631.         val rawSock = Socket(host, port)
  632.         val sock = if (secure) wrapTls(rawSock, host) else rawSock
  633.         val cancelHandle = coroutineContext[kotlinx.coroutines.Job]?.invokeOnCompletion { runCatching { sock.close() } }
  634.         try {
  635.             sock.use { s ->
  636.                 sendOverSocket(s, file, size, onProgress)
  637.             }
  638.         } finally {
  639.             cancelHandle?.dispose()
  640.         }
  641.     }
  642.  
  /**
   * Standard (active) DCC CHAT: we listen on a port in portMin..portMax and accept when the
   * peer connects.
   *
   * The listening socket is obtained from [bindFirstAvailable] and is always closed before
   * this function returns. [onClient] is invoked before waiting, with the value of
   * `localIpv4AsInt()` and the bound port. The wait for the peer is limited to 45 seconds.
   *
   * @param portMin lower bound of the port range to try
   * @param portMax upper bound of the port range to try
   * @param onClient callback receiving `(ipAsInt, port)` of the listener
   * @return the connected socket, with TCP_NODELAY and SO_KEEPALIVE enabled; the caller
   *   is responsible for closing it
   * @throws RuntimeException if no peer connects within the timeout
   */
  suspend fun startChat(
      portMin: Int,
      portMax: Int,
      onClient: suspend (ipAsInt: Long, port: Int) -> Unit
  ): Socket = withContext(Dispatchers.IO) {
      val ss = bindFirstAvailable(portMin, portMax)
      try {
          val ipInt = localIpv4AsInt()
          onClient(ipInt, ss.localPort)
          ss.soTimeout = 45_000
          val sock = try {
              ss.accept()
          } catch (_: java.net.SocketTimeoutException) {
              throw RuntimeException("DCC CHAT timed out waiting for peer to connect")
          }
          // Setting socket options can throw (for example if the peer already reset the
          // connection). The caller never receives the socket in that case, so close it
          // here instead of leaking it.
          try {
              sock.tcpNoDelay = true
              sock.keepAlive = true
          } catch (t: Throwable) {
              runCatching { sock.close() }
              throw t
          }
          sock
      } finally {
          runCatching { ss.close() }
      }
  }
  669.  
  /**
   * Standard (active) DCC CHAT: connect to the offered ip:port and return the connected socket.
   *
   * Reads `offer.ip`, `offer.port` and `offer.secure`; when `secure` is set the connected
   * socket is passed through `wrapTls`. TCP_NODELAY and SO_KEEPALIVE are enabled on the
   * returned socket, which the caller is responsible for closing.
   *
   * NOTE(review): unlike [sendFileConnect], this path does not call `validateRemoteIp`;
   * confirm the caller validates the offered address before connecting.
   */
  suspend fun connectChat(offer: DccChatOffer): Socket = withContext(Dispatchers.IO) {
      val rawSock = Socket(offer.ip, offer.port)
      try {
          val sock = if (offer.secure) wrapTls(rawSock, offer.ip) else rawSock
          sock.tcpNoDelay = true
          sock.keepAlive = true
          sock
      } catch (t: Throwable) {
          // The caller never sees the socket when setup fails, so release the
          // underlying connection here rather than leaking it.
          runCatching { rawSock.close() }
          throw t
      }
  }
  678.  
  679.     /**
  680.      * Receive bytes from [sock] into [outputStream], ACKing progress per the DCC convention.
  681.      *
  682.      * Throughput design:
  683.      *  - The socket reader and the storage writer run on separate threads, communicating via
  684.      *    a bounded queue. Without this, a slow MediaStore/SAF write stalls the socket read,
  685.      *    the TCP receive window collapses, and the sender throttles. With it, network reads
  686.      *    overlap with storage writes and the receive window stays open.
  687.      *  - The queue is bounded (16 × 256 KB ≈ 4 MB) so a genuinely slow storage layer still
  688.      *    backpressures the reader rather than ballooning RAM.
  689.      *  - 256 KB read buffer + 1 MB SO_RCVBUF reduce per-iteration overhead and let the
  690.      *    kernel buffer a meaningful BDP on higher-RTT links.
  691.      *  - DCC ACKs are sent from the reader as soon as bytes are *received* (not after they
  692.      *    hit disk). The wire-level contract is "we have these bytes" and we do — they're
  693.      *    in our process. This unblocks lockstep senders without waiting for storage I/O.
  694.      *  - [onProgress] is throttled to ~10× per second so we don't fire a StateFlow copy on
  695.      *    every 256 KB chunk. A final progress callback always fires after the loop so the
  696.      *    UI doesn't get stuck at, say, 99.7 %.
  697.      *
  698.      * @return the total number of bytes actually written. The caller compares this against
  699.      *   the advertised offer size to decide success vs. truncation (see [verifyCompleteOrCleanup]).
  700.      */
  private fun receiveFromSocket(
      sock: Socket,
      outputStream: OutputStream,
      expectedSize: Long,
      turbo: Boolean,
      onProgress: (Long, Long) -> Unit
  ): Long {
      sock.tcpNoDelay = true
      // Best-effort; some platforms cap or ignore this. Not fatal if it fails.
      runCatching { sock.receiveBufferSize = 1 * 1024 * 1024 }

      // ACK width follows the advertised size: 8 bytes above the 32-bit range, else 4.
      val ack64 = expectedSize > 0xFFFFFFFFL
      val ackBuf = ByteArray(if (ack64) 8 else 4)
      // A non-positive advertised size is treated as "unknown".
      val expected: Long? = expectedSize.takeIf { it > 0L }

      // Hard ceiling for transfers without an advertised size: 8 GB. Without this,
      // a malicious sender that omits the size field could keep writing forever and
      // fill the device's storage. With a known size we still cap at the advertised
      // size so a sender that lies (offers 1 KB then sends 10 GB) can't bypass.
      val maxAccept: Long = expected ?: (8L * 1024 * 1024 * 1024)

      // Producer/consumer plumbing. Locals shared across threads can't be @Volatile
      // in Kotlin, so use the java.util.concurrent.atomic primitives.
      val queue = java.util.concurrent.ArrayBlockingQueue<ByteArray>(16)
      val writerError = java.util.concurrent.atomic.AtomicReference<Throwable?>(null)
      val readerDone = java.util.concurrent.atomic.AtomicBoolean(false)

      val writerThread = thread(start = true, isDaemon = true, name = "dcc-writer") {
          try {
              // Drain until the reader signals done AND the queue is empty. Poll with a
              // short timeout so we notice `readerDone` even if the queue is empty.
              while (!readerDone.get() || queue.isNotEmpty()) {
                  val chunk = queue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS)
                      ?: continue
                  outputStream.write(chunk)
              }
          } catch (t: Throwable) {
              writerError.set(t)
          }
      }

      var total = 0L
      var lastProgressMs = 0L
      // Fetch the ACK stream once, outside the hot loop.
      val ackOut = sock.getOutputStream()

      try {
          sock.getInputStream().use { inp ->
              val buf = ByteArray(256 * 1024)
              while (true) {
                  // Surface a writer-thread crash as soon as we notice it; don't keep
                  // reading more bytes we can't drain.
                  writerError.get()?.let { throw it }

                  val remaining = maxAccept - total
                  if (remaining <= 0L) break
                  val toRead = if (remaining < buf.size) remaining.toInt() else buf.size
                  val n = inp.read(buf, 0, toRead)
                  if (n <= 0) break

                  // Hand the chunk to the writer. Copy because `buf` is reused next iteration.
                  val chunk = buf.copyOf(n)
                  // Bounded offer with timeout: backpressures the reader if storage is
                  // slow, but lets us re-check writerError instead of blocking forever
                  // if the writer thread has died.
                  while (!queue.offer(chunk, 100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                      writerError.get()?.let { throw it }
                  }
                  total += n

                  if (!turbo) {
                      // ACK the running total of bytes RECEIVED (not bytes written to
                      // disk), big-endian. The DCC ACK semantic is wire-level acknowledgment.
                      if (ack64) {
                          ackBuf[0] = (total ushr 56).toByte()
                          ackBuf[1] = (total ushr 48).toByte()
                          ackBuf[2] = (total ushr 40).toByte()
                          ackBuf[3] = (total ushr 32).toByte()
                          ackBuf[4] = (total ushr 24).toByte()
                          ackBuf[5] = (total ushr 16).toByte()
                          ackBuf[6] = (total ushr 8).toByte()
                          ackBuf[7] = total.toByte()
                      } else {
                          val ackInt = total.coerceAtMost(0xFFFFFFFFL)
                          ackBuf[0] = (ackInt ushr 24).toByte()
                          ackBuf[1] = (ackInt ushr 16).toByte()
                          ackBuf[2] = (ackInt ushr 8).toByte()
                          ackBuf[3] = ackInt.toByte()
                      }
                      // A failed ACK write is deliberately ignored; a dead connection
                      // shows up on the next read instead.
                      runCatching { ackOut.write(ackBuf) }
                  }

                  // Time-gated progress so the per-chunk callback isn't in the hot
                  // path. ~10 updates/sec is plenty for a smooth progress bar.
                  val now = System.currentTimeMillis()
                  if (now - lastProgressMs >= 100L) {
                      lastProgressMs = now
                      onProgress(total, expectedSize)
                  }

                  if (expected != null && total >= expected) break
              }
          }
      } finally {
          // Signal the writer to drain remaining chunks and exit; wait briefly. If it's
          // stuck (e.g. the storage stream blocked indefinitely), interrupt it rather
          // than deadlock the receive path.
          readerDone.set(true)
          writerThread.join(10_000)
          if (writerThread.isAlive) {
              writerThread.interrupt()
              writerThread.join(2_000)
          }
          // If the writer errored late (while draining), surface that as the failure
          // rather than reporting a clean total.
          writerError.get()?.let { throw it }
          // A writer that is still running has not confirmed that every queued chunk
          // reached storage, so `total` would overstate what was written. Fail instead
          // of reporting a possibly truncated file as complete.
          if (writerThread.isAlive) {
              throw IOException("DCC receive aborted: storage writer did not finish draining received data")
          }
      }

      // Final progress beat so the UI lands on the actual total rather than the last
      // throttled sample.
      onProgress(total, expectedSize)
      return total
  }
  825.  
  /**
   * Stream [file] to [sock] while tracking the peer's DCC ACKs.
   *
   * The calling thread writes the file through a buffered stream; a daemon thread reads
   * fixed-width ACKs from the socket and records the highest plausible value. After the
   * last byte is flushed the output side is half-closed and we wait up to 10 seconds for
   * the peer to ACK [size] bytes or close the connection. Not receiving the final ACK in
   * that window is not treated as an error.
   *
   * @param sock connected socket; TCP_NODELAY and a 1-second read timeout are set on it.
   *   The socket itself is not closed here.
   * @param file file whose contents are sent
   * @param size advertised transfer size; selects the ACK width (8 bytes when above
   *   0xFFFFFFFF, otherwise 4) and is the completion target
   * @param onProgress invoked after each chunk is handed to the output stream, as
   *   `(bytesHandedOffSoFar, size)`
   * @throws IOException if writing fails before the peer has ACKed the full [size]
   */
  private fun sendOverSocket(
      sock: Socket,
      file: File,
      size: Long,
      onProgress: (Long, Long) -> Unit
  ) {
      sock.tcpNoDelay = true
      // Used by the ACK reader thread: a short read timeout lets it notice interruption.
      sock.soTimeout = 1_000

      val acked = AtomicLong(0L)

      // ACK width must match what the receiver sends (see receiveFromSocket):
      // 8-byte (64-bit) ACKs for files larger than a 32-bit value, else 4-byte.
      // Reading the wrong width desyncs the ACK stream and breaks completion
      // detection for transfers above 4 GiB.
      val ack64 = size > 0xFFFFFFFFL
      val ackWidth = if (ack64) 8 else 4

      // True once the peer has acknowledged the whole advertised size.
      fun fullyAcked(): Boolean = size > 0L && acked.get() >= size

      fun u32be(b: ByteArray): Long =
          (ByteBuffer.wrap(b, 0, 4).order(ByteOrder.BIG_ENDIAN).int.toLong() and 0xFFFFFFFFL)

      fun u32le(b: ByteArray): Long =
          (ByteBuffer.wrap(b, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int.toLong() and 0xFFFFFFFFL)

      fun u64be(b: ByteArray): Long =
          ByteBuffer.wrap(b, 0, 8).order(ByteOrder.BIG_ENDIAN).long

      fun u64le(b: ByteArray): Long =
          ByteBuffer.wrap(b, 0, 8).order(ByteOrder.LITTLE_ENDIAN).long

      fun chooseAck(be: Long, le: Long, last: Long): Long? {
          // DCC ACKs are commonly network byte order, but some clients historically send host order.
          // We choose the value that is monotonic and plausible for this transfer size.
          val cands = sequenceOf(be, le).distinct().filter { it >= last }.toList()
          if (cands.isEmpty()) return null
          if (size > 0L) {
              // Prefer candidates <= size.
              val inRange = cands.filter { it <= size }
              if (inRange.isNotEmpty()) return inRange.maxOrNull()

              // Some clients may overshoot slightly; clamp.
              val near = cands.filter { it <= size + 1024 * 1024L }
              if (near.isNotEmpty()) return size
          }
          return cands.maxOrNull()
      }

      val ackThread = thread(start = true, isDaemon = true, name = "dcc-ack-reader") {
          val inp = sock.getInputStream()
          val b = ByteArray(ackWidth)
          var off = 0
          var last = 0L
          while (!Thread.currentThread().isInterrupted) {
              try {
                  // An ACK may arrive split across reads; accumulate until a full word is in.
                  val n = inp.read(b, off, ackWidth - off)
                  if (n < 0) break
                  off += n
                  if (off == ackWidth) {
                      val be = if (ack64) u64be(b) else u32be(b)
                      val le = if (ack64) u64le(b) else u32le(b)
                      val chosen = chooseAck(be, le, last)
                      if (chosen != null) {
                          last = chosen
                          acked.set(chosen)
                      }
                      off = 0
                  }
              } catch (_: java.net.SocketTimeoutException) {
                  // keep polling
              } catch (_: Throwable) {
                  break
              }
          }
      }

      var sent = 0L
      // Set when a write failed but the peer had already ACKed everything.
      var peerFinishedEarly = false
      try {
          val outRaw = sock.getOutputStream()
          val out = BufferedOutputStream(outRaw, 64 * 1024)
          val buf = ByteArray(32 * 1024)

          file.inputStream().use { fin ->
              while (true) {
                  val n = fin.read(buf)
                  if (n <= 0) break
                  try {
                      out.write(buf, 0, n)
                      sent += n
                      onProgress(sent, size)
                  } catch (io: IOException) {
                      // If the peer already ACKed the full size, treat as success.
                      if (fullyAcked()) {
                          peerFinishedEarly = true
                          break
                      }
                      throw io
                  }
              }
          }

          // After a tolerated write failure the connection is already gone; flushing the
          // leftover buffer would only re-throw and undo the "treat as success" decision.
          if (!peerFinishedEarly) {
              try {
                  out.flush()
              } catch (io: IOException) {
                  if (!fullyAcked()) throw io
              }
          }

          // Half-close so receiver sees EOF; then wait briefly for final ACK/peer close.
          runCatching { sock.shutdownOutput() }

          val deadline = System.currentTimeMillis() + 10_000L
          while (size > 0L && acked.get() < size && System.currentTimeMillis() < deadline) {
              // If the receiver closed, the ACK thread will stop.
              if (!ackThread.isAlive) break
              Thread.sleep(50)
          }
      } finally {
          runCatching { ackThread.interrupt() }
      }
  }
  938.  
  /**
   * Bind a [ServerSocket] to the lowest port in `min..max` that can be opened.
   *
   * Both bounds are clamped to 1..65535 first. Ports are tried in ascending order and any
   * failure to open one simply moves on to the next.
   *
   * @return the bound server socket; the caller is responsible for closing it
   * @throws IllegalStateException if no port in the (clamped) range could be bound,
   *   including when the range is empty
   */
  private fun bindFirstAvailable(min: Int, max: Int): ServerSocket {
      val lowest = min.coerceIn(1, 65535)
      val highest = max.coerceIn(1, 65535)
      var candidate = lowest
      while (candidate <= highest) {
          // Any failure for this port (in use, not permitted, ...) means "try the next one".
          val bound = runCatching { ServerSocket(candidate) }.getOrNull()
          if (bound != null) return bound
          candidate++
      }
      throw IllegalStateException("No free port in $lowest..$highest")
  }
  951.  
  /**
   * Convert a dotted-quad IPv4 string into its unsigned 32-bit value held in a [Long].
   *
   * The first octet ends up in the most significant byte, so "1.2.3.4" becomes 0x01020304.
   *
   * @param ip text expected to hold exactly four decimal octets separated by dots
   * @return the packed address, or 0 when [ip] does not have four parts or any part is
   *   not an integer in 0..255
   */
  private fun ipv4ToLongBestEffort(ip: String): Long {
      val parts = ip.split(".")
      if (parts.size != 4) return 0L
      return parts.fold(0L) { acc, part ->
          // Reject out-of-range octets instead of masking them: "256" or "-1" would
          // otherwise be folded silently into a different, valid-looking address.
          val octet = part.toIntOrNull()?.takeIf { it in 0..255 } ?: return 0L
          (acc shl 8) or octet.toLong()
      }
  }
  963.  
  964. }

Raw Paste

Comments 0
Login to post a comment.
  • No comments yet. Be the first.
Login to post a comment. Login or Register
We use cookies. To comply with GDPR in the EU and the UK we have to show you these.

We use cookies and similar technologies to keep this website functional (including spam protection via Google reCAPTCHA or Cloudflare Turnstile), and — with your consent — to measure usage and show ads. See Privacy.