Kafka uses the Fetch2 library (Android 14 compatible fork) to manage file downloads from Archive.org. The download manager handles queuing, progress tracking, pause/resume, and background downloads for both audio and text files.
Architecture Overview
The download system consists of several layers:
- Downloader: High-level interface for download operations
- DownloaderImpl: Main implementation with business logic
- FetchDownloadManager: Wrapper around Fetch2 library
- DownloadRequestsDao: Database layer for tracking downloads
Downloader Interface
The Downloader interface defines the core download operations:
core/downloader/src/commonMain/kotlin/tm/alashow/datmusic/downloader/Downloader.kt
/**
 * High-level entry point for download operations.
 *
 * Exposes observable state as [Flow]s and suspending commands for enqueueing
 * and controlling downloads, plus management of the downloads location.
 */
interface Downloader {
// Stream of download ids as strings.
// NOTE(review): presumably emits the id of each newly enqueued request — confirm against the implementation.
val newDownloadId: Flow<String>
// Stream of [DownloaderEvent]s for consumers to react to.
val downloaderEvents: Flow<DownloaderEvent>
// Current downloads location as a nullable string.
// NOTE(review): presumably null when no location has been chosen — confirm.
val downloadLocation: Flow<String?>
// NOTE(review): presumably true once a downloads location is configured — confirm.
val hasDownloadsLocation: Flow<Boolean>
// Enqueues a download for the file identified by [fileId]; the Boolean reports the outcome.
// NOTE(review): presumably resolves the id to a FileEntity and delegates to the overload below — confirm.
suspend fun enqueueFile(fileId: String): Boolean
// Enqueues a download for [file]; the Boolean reports whether the download was enqueued.
suspend fun enqueueFile(file: FileEntity): Boolean
// Control operations over one or more downloads, addressed by integer ids.
// NOTE(review): the ids look like the download manager's download-info ids — confirm.
suspend fun pause(vararg downloadInfoIds: Int)
suspend fun resume(vararg downloadInfoIds: Int)
suspend fun cancel(vararg downloadInfoIds: Int)
suspend fun retry(vararg downloadInfoIds: Int)
suspend fun delete(vararg downloadInfoIds: Int)
// NOTE(review): presumably asks the UI to let the user pick a new downloads folder — confirm.
fun requestNewDownloadsLocation()
// Sets the downloads location from a URI string; null is accepted by the signature.
suspend fun setDownloadsLocation(uri: String?)
// NOTE(review): presumably clears the stored downloads location — confirm.
suspend fun resetDownloadsLocation()
companion object {
// Key name for the stored downloads location.
const val DOWNLOADS_LOCATION = "downloads_location"
}
}
DownloaderImpl
The main implementation handles download logic, validation, and error handling:
core/downloader/src/main/java/tm/alashow/datmusic/downloader/DownloaderImpl.kt
/**
 * Main [Downloader] implementation: validates requests, persists the file,
 * resolves the destination and hands the request to the download manager.
 */
@ApplicationScope
class DownloaderImpl @Inject constructor(
    private val appContext: Application,
    private val fetcher: FetchDownloadManager,
    private val downloadInfoMapper: DownloadInfoMapper,
    private val preferences: PreferencesStore,
    private val repo: DownloadRequestsDao,
    private val fileDao: FileDao,
    private val analytics: Analytics,
    private val snackbarManager: SnackbarManager,
) : Downloader {

    /**
     * Tries to enqueue a download for [file].
     *
     * @return `true` only when the enqueue call reports success; `false` when
     * validation rejects the request, no destination is available (the file is
     * then remembered as pending), the file has no download URL, or the enqueue fails.
     */
    override suspend fun enqueueFile(file: FileEntity): Boolean {
        debug { "Enqueue audio: $file" }

        // Reject the request early if validation says it must not be enqueued.
        val downloadRequest = DownloadRequest.fromAudio(file)
        if (!validateNewAudioRequest(downloadRequest)) return false

        // Persist the file before anything else can bail out.
        fileDao.insert(file)

        // Without a destination the file is parked until one becomes available.
        val fileDestination = getAudioDownloadFileDestination(file) ?: run {
            pendingEnqueableAudio = file
            return false
        }

        // A missing URL is reported to the user and aborts the enqueue.
        val sourceUrl = file.downloadUrl ?: run {
            downloaderMessage(AudioDownloadErrorInvalidUrl)
            return false
        }

        // Append an empty "redirect" query parameter to the source URL.
        val downloadUrl = Uri.parse(sourceUrl)
            .buildUpon()
            .appendQueryParameter("redirect", "")
            .build()
            .toString()

        val fetchRequest = Request(downloadUrl, fileDestination.uri)
        val enqueueResult = enqueueDownloadRequest(downloadRequest, fetchRequest)
        return when (enqueueResult) {
            is DownloadEnqueueSuccessful -> {
                newDownloadIdState.send(downloadRequest.id)
                true
            }
            is DownloadEnqueueFailed -> {
                errorLog { enqueueResult.toString() }
                downloaderEvent(DownloaderEvent.DownloaderFetchError(enqueueResult.error))
                false
            }
        }
    }
}
`FileEntity`: the file entity containing the download URL and metadata.
`DownloadRequest`: the internal representation of a download request, with tracking information.
Request Validation
The downloader validates requests to prevent duplicates and handle existing downloads:
/**
 * Decides whether [downloadRequest] may be enqueued as a new download.
 *
 * If a request with the same id is already stored, its state in the download
 * manager is reconciled first:
 * - no download record: the stale stored request is deleted and the enqueue is allowed;
 * - failed or cancelled: the old download and request are deleted and the enqueue is allowed;
 * - paused: the existing download is resumed and the enqueue is rejected;
 * - not started, queued or downloading: the enqueue is rejected as a duplicate;
 * - completed: the enqueue is allowed only if the downloaded file no longer exists;
 * - removed or deleted: the stale stored request is deleted and the enqueue is allowed.
 *
 * @return `true` if the caller should go on to enqueue a new download, `false` otherwise.
 */
private suspend fun validateNewAudioRequest(
    downloadRequest: DownloadRequest,
): Boolean {
    // Nothing stored under this id: there is no previous download to reconcile.
    if (repo.exists(downloadRequest.id) <= 0) return true

    val oldRequest = repo.entry(downloadRequest.id).first()
    val downloadInfo = fetcher.getDownload(oldRequest.requestId)
    if (downloadInfo == null) {
        // The stored request points at a download that no longer exists;
        // drop the stale row so the new request does not collide with it.
        repo.delete(oldRequest.id)
        return true
    }

    // Exhaustive over Status on purpose: a new status must be handled explicitly
    // instead of silently falling through to "allow".
    return when (downloadInfo.status) {
        Status.FAILED, Status.CANCELLED -> {
            fetcher.delete(downloadInfo.id)
            repo.delete(oldRequest.id)
            true // Allow re-download
        }
        Status.PAUSED -> {
            fetcher.resume(oldRequest.requestId)
            downloaderMessage(AudioDownloadResumedExisting)
            false // Resume existing
        }
        Status.NONE, Status.ADDED, Status.QUEUED, Status.DOWNLOADING -> {
            downloaderMessage(AudioDownloadAlreadyQueued)
            false // Already known to the download manager and not finished
        }
        Status.COMPLETED -> {
            val fileExists = downloadInfo.fileUri
                .toDocumentFile(appContext)
                .exists()
            if (fileExists) {
                false // File exists, do nothing
            } else {
                fetcher.delete(downloadInfo.id)
                repo.delete(oldRequest.id)
                true // Re-download missing file
            }
        }
        Status.REMOVED, Status.DELETED -> {
            // The download is already gone on the manager side; only the stored request is stale.
            repo.delete(oldRequest.id)
            true
        }
    }
}
FetchDownloadManager
The FetchDownloadManager wraps the Fetch2 library with coroutine-based APIs:
core/downloader/src/main/java/tm/alashow/datmusic/downloader/manager/FetchDownloadManager.kt
/**
 * [DownloadManager] implementation that delegates every operation to [Fetch],
 * bridging its callback-based calls into suspending functions.
 */
class FetchDownloadManager @Inject constructor(
    private val fetch: Fetch,
) : DownloadManager<Int, Request, Status, Download> {

    /**
     * Enqueues [request] and suspends until one of the two callbacks fires.
     *
     * @return [DownloadEnqueueSuccessful] carrying the request handed to the success
     * callback, or [DownloadEnqueueFailed] carrying the error's throwable (an
     * [IOException] describing the error when it has none).
     */
    override suspend fun enqueue(request: Request): DownloadEnqueueResult<Request> =
        suspendCoroutine { cont ->
            fetch.enqueue(
                request,
                { enqueued -> cont.resume(DownloadEnqueueSuccessful(enqueued)) },
                { error ->
                    // Fall back to a descriptive IOException when no throwable is attached.
                    val cause = error.throwable
                        ?: IOException("Download error: ${error.name}, code=${error.value}")
                    cont.resume(DownloadEnqueueFailed(cause))
                }
            )
        }

    /** Looks up the download with the given [id]; resumes with whatever the callback delivers. */
    override suspend fun getDownload(id: Int): Download? = suspendCoroutine { cont ->
        fetch.getDownload(id) { download -> cont.resume(download) }
    }

    /** Pauses the downloads with the given [ids]. */
    override suspend fun pause(ids: List<Int>) {
        fetch.pause(ids)
    }

    /** Resumes the downloads with the given [ids]. */
    override suspend fun resume(ids: List<Int>) {
        fetch.resume(ids)
    }

    /** Cancels the downloads with the given [ids]. */
    override suspend fun cancel(ids: List<Int>) {
        fetch.cancel(ids)
    }

    /** Deletes the downloads with the given [ids]. */
    override suspend fun delete(ids: List<Int>) {
        fetch.delete(ids)
    }
}
Download Events
The system uses a Flow-based event system for UI updates:
core/downloader/src/commonMain/kotlin/tm/alashow/datmusic/downloader/DownloaderEvent.kt
/** Closed set of one-off events published on the downloader's event stream. */
sealed class DownloaderEvent {
// NOTE(review): presumably signals that the user must choose a downloads folder — confirm against the emitter.
object ChooseDownloadsLocation : DownloaderEvent()
// Carries the throwable behind a failed enqueue attempt.
data class DownloaderFetchError(val error: Throwable) : DownloaderEvent()
}
/**
 * Exposes [fetch] listener callbacks as a [Flow].
 *
 * Added, progress, completed and error callbacks are each forwarded as the
 * matching event object. The listener is registered when the flow starts and
 * removed again when the flow is closed or cancelled.
 */
fun createFetchListener(fetch: Fetch): Flow<Downloadable?> = callbackFlow {
    val listener = object : AbstractFetchListener() {
        override fun onAdded(download: Download) {
            trySend(DownloadAdded(download))
        }

        override fun onProgress(
            download: Download,
            etaInMilliSeconds: Long,
            downloadedBytesPerSecond: Long,
        ) {
            trySend(DownloadProgress(download, etaInMilliSeconds, downloadedBytesPerSecond))
        }

        override fun onCompleted(download: Download) {
            trySend(DownloadCompleted(download))
        }

        override fun onError(download: Download, error: Error, throwable: Throwable?) {
            trySend(DownloadError(download, error, throwable))
        }
    }

    fetch.addListener(listener)
    // Unregister on close or cancellation so the listener does not outlive the collector.
    awaitClose { fetch.removeListener(listener) }
}
Download Location Management
Kafka uses Android’s Storage Access Framework (SAF) to manage the download location:
// Stored downloads location mapped to an Optional URI: an empty stored value
// (also the default) becomes Optional.empty(), anything else is parsed as a URI.
private val downloadsLocationUri = preferences.get(DOWNLOADS_LOCATION, "").map { stored ->
    if (stored.isEmpty()) {
        Optional.empty()
    } else {
        Optional.of(stored.toUri())
    }
}
/**
 * Stores [uri] as the downloads location and retries a pending download, if any.
 *
 * A null [uri] is logged, reported via [DownloadsUnknownError] and otherwise ignored.
 * For a non-null [uri], persistable read/write permission is taken on it, the value
 * is saved under [DOWNLOADS_LOCATION], and the file parked in `pendingEnqueableAudio`
 * (if any) is enqueued again.
 */
override suspend fun setDownloadsLocation(uri: String?) {
    if (uri == null) {
        errorLog { "Downloads URI is null" }
        downloaderMessage(DownloadsUnknownError)
        return
    }

    analytics.log { setDownloadLocation(uri.toString()) }

    // Take persistable URI permission
    appContext.contentResolver.takePersistableUriPermission(
        uri.toUri(),
        Intent.FLAG_GRANT_READ_URI_PERMISSION or Intent.FLAG_GRANT_WRITE_URI_PERMISSION
    )
    preferences.save(DOWNLOADS_LOCATION, uri.toString())

    // Process pending downloads.
    // Clear the slot BEFORE re-enqueueing: enqueueFile parks the file in
    // pendingEnqueableAudio again when no destination is available, and clearing
    // afterwards would wipe that and drop the download.
    val pending = pendingEnqueableAudio ?: return
    pendingEnqueableAudio = null
    debug { "Consuming pending enqueuable audio download" }
    enqueueFile(pending)
}
`uri`: the SAF URI of the downloads folder, obtained from the user via the folder picker.
Download Models
core/downloader/src/commonMain/kotlin/tm/alashow/datmusic/downloader/DownloadItem.kt
/**
 * Stored description of a download request.
 *
 * @property id identifier of the request
 * @property requestId integer id on the download-manager side
 * @property entityId id of the entity being downloaded
 * @property entityType kind of entity being downloaded
 */
data class DownloadRequest(
    val id: String,
    val requestId: Int,
    val entityId: String,
    val entityType: String,
) {
    companion object {
        /**
         * Builds the request for [file]: the file id serves as both request id
         * and entity id, and the entity type is "file".
         */
        fun fromAudio(file: FileEntity): DownloadRequest {
            val fileId = file.fileId
            return DownloadRequest(
                id = fileId,
                requestId = 0, // Set by Fetch
                entityId = fileId,
                entityType = "file",
            )
        }
    }
}
/** Pairs a stored [DownloadRequest] with its [DownloadInfo]. */
data class FileDownloadItem(
    val downloadRequest: DownloadRequest,
    val downloadInfo: DownloadInfo,
) {
    companion object {
        /** Factory equivalent to calling the constructor directly. */
        fun from(
            downloadRequest: DownloadRequest,
            downloadInfo: DownloadInfo,
        ): FileDownloadItem {
            return FileDownloadItem(
                downloadRequest = downloadRequest,
                downloadInfo = downloadInfo,
            )
        }
    }
}
Download Messages
core/downloader/src/commonMain/kotlin/tm/alashow/datmusic/downloader/DownloadMessage.kt
/** Base type for messages the downloader reports; [T] is the message's type parameter. */
sealed class DownloadMessage<T> {
// Message wrapping an arbitrary throwable.
data class Error(val error: Throwable) : DownloadMessage<Throwable>()
}
// NOTE(review): presumably reported when a download was queued successfully — confirm.
object AudioDownloadQueued : DownloadMessage<Unit>()
// Reported when the same file already has a download that is queued or in progress.
object AudioDownloadAlreadyQueued : DownloadMessage<Unit>()
// Reported when a new request resumed an existing paused download instead of enqueueing again.
object AudioDownloadResumedExisting : DownloadMessage<Unit>()
// Reported when the file to download has no download URL.
object AudioDownloadErrorInvalidUrl : DownloadMessage<Unit>()
// NOTE(review): presumably reported when the destination file could not be created — confirm.
object AudioDownloadErrorFileCreate : DownloadMessage<Unit>()
// NOTE(review): presumably reported when the configured downloads folder cannot be found — confirm.
object DownloadsFolderNotFound : DownloadMessage<Unit>()
Features
Queue Management
- Multiple Downloads: Download multiple files simultaneously
- Priority Queue: Downloads processed in FIFO order
- Auto-Retry: Automatic retry on network failures
- Chunked Downloads: Large files downloaded in chunks
Progress Tracking
- Real-time Progress: Live progress updates via Flow
- Speed Calculation: Download speed in bytes/second
- ETA Calculation: Estimated time remaining
- Percentage Complete: Progress as percentage
State Management
- Pause/Resume: Pause and resume downloads
- Cancel: Cancel ongoing downloads
- Delete: Remove completed downloads
- Retry: Retry failed downloads
Background Operation
- Foreground Service: Runs in foreground service for reliability
- Notifications: Shows progress in notification
- Battery Optimization: Respects battery optimization settings
- Network Handling: Pauses on network loss, resumes when available
Dependencies
gradle/libs.versions.toml
[libraries]
fetch = "com.github.abbas7777:fetch2-android14:3.2.0-beta07"
This is a fork of the original Fetch2 library with Android 14 compatibility fixes.
Usage Example
/**
 * Example view model showing how to start a download and react to
 * downloader events through [Downloader].
 */
class DownloadViewModel @Inject constructor(
    private val downloader: Downloader,
) : ViewModel() {

    /** Enqueues [file] in the view model's scope and prints a line when the enqueue succeeds. */
    fun downloadFile(file: FileEntity) {
        viewModelScope.launch {
            val success = downloader.enqueueFile(file)
            if (success) {
                println("Download started: ${file.title}")
            }
        }
    }

    /**
     * Starts collecting downloader events in the view model's scope.
     *
     * `collect` is a suspending call, so it has to run inside a coroutine;
     * launching it in `viewModelScope` also ties the collection to this view model.
     */
    fun observeDownloads() {
        viewModelScope.launch {
            downloader.downloaderEvents.collect { event ->
                when (event) {
                    is DownloaderEvent.ChooseDownloadsLocation -> {
                        // Show folder picker
                    }
                    is DownloaderEvent.DownloaderFetchError -> {
                        // Show error message
                    }
                }
            }
        }
    }
}