Skip to main content
Kafka’s data layer implements the Repository Pattern with Room database for local persistence and Ktor for remote data sources. The architecture ensures a single source of truth while supporting offline-first functionality.

Data Layer Overview

┌─────────────────────────────────────────────────────────────┐
│                     Domain Layer                             │
│              (Interactors & Observers)                       │
└─────────────────────┬───────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                   Repositories                               │
│          (Single Source of Truth)                            │
└──────────┬─────────────────────────┬────────────────────────┘
           │                         │
           ▼                         ▼
    ┌──────────────┐         ┌──────────────┐
    │  Local Data  │         │ Remote Data  │
    │    Source    │         │    Source    │
    │              │         │              │
    │  Room DAOs   │         │ Ktor Client  │
    │  DataStore   │         │  Firestore   │
    └──────────────┘         └──────────────┘

Repository Pattern

Repository Characteristics

Single Source of Truth

Repositories expose a single Flow that combines local and remote data

Offline First

Data is cached locally and displayed immediately while refreshing from network

Reactive

All data exposed as Kotlin Flows for reactive updates

Scoped

Repositories are @ApplicationScope singletons

Example Repository Implementation

package com.kafka.data.feature.homepage

/**
 * Repository for the Firestore-backed homepage configuration.
 *
 * Acts as the single source of truth for homepage sections: it reads the
 * `homepage_v2` collection via [FirestoreGraph] and maps raw documents into
 * domain models with [HomepageMapper].
 */
@ApplicationScope
class HomepageRepository @Inject constructor(
    private val firestoreGraph: FirestoreGraph,
    private val homepageMapper: HomepageMapper,
    private val userDataRepository: UserDataRepository,
    private val serializerModule: SerializersModule
) {
    // Reactive flow from Firestore: re-emits on every collection snapshot.
    // flatMapLatest cancels mapping of a stale snapshot when a newer one
    // arrives. NOTE(review): this presumes toHomepage/homepageMapper.map
    // produces a Flow — confirm against HomepageMapper.
    fun observeHomepageCollection() =
        firestoreGraph.homepageCollection.snapshots
            .flatMapLatest { it.toHomepage(userDataRepository.getUserCountry()) }

    // Suspending function for one-time fetch of the item ids referenced by
    // each enabled homepage collection, ordered by their `index` field.
    // Returns a list of id-lists, one per Column/Row/Grid section.
    // NOTE(review): split(", ") assumes ids are separated by exactly
    // comma-space in Firestore; a bare "," would leak whitespace into ids.
    suspend fun getHomepageIds() = 
        firestoreGraph.homepageCollection.get()
            .documents
            .map { documentSnapshot -> documentSnapshot.getHomepageData() }
            .filter { it.enabled }
            .sortedBy { it.index }
            .mapNotNull { collection ->
                // Only item-bearing section types contribute ids; other
                // section kinds (if any) are dropped by the null branch.
                when (collection) {
                    is HomepageCollectionResponse.Column -> collection.itemIds.split(", ")
                    is HomepageCollectionResponse.Row -> collection.itemIds.split(", ")
                    is HomepageCollectionResponse.Grid -> collection.itemIds.split(", ")
                    else -> null
                }
            }
            .distinct()
            .toList()

    // Maps one Firestore snapshot into the homepage model: keeps enabled
    // sections that match the user's country/topic filter, orders them by
    // index, then hands the batch to the mapper.
    private fun QuerySnapshot.toHomepage(country: String?) =
        documents
            .map { it.getHomepageData() }
            .filter { it.enabled }
            .filter { it.filterByTopics(country) }
            .sortedBy { it.index }
            .run { homepageMapper.map(this) }
            .map { it.toList() }
}

Room Database

Kafka uses Room for local data persistence with a well-structured database schema.

Database Configuration

package com.kafka.data.db

/**
 * Room database definition for Kafka's local persistence layer.
 *
 * Schema export is enabled so each version's schema JSON is checked in,
 * which is required for the auto-migrations below to be validated at
 * compile time.
 *
 * NOTE(review): auto-migrations cover 3→4, 4→5, 7→8 and 8→9 only; the
 * remaining steps (1→3, 5→7) are presumably manual Migration objects
 * registered on the database builder — confirm where the DB is built.
 */
@Database(
    entities = [
        ItemDetail::class,
        File::class,
        Item::class,
        QueueEntity::class,
        RecentSearch::class,
        RecentTextItem::class,
        RecentAudioItem::class,
        DownloadRequest::class,
    ],
    version = 9,
    exportSchema = true,
    autoMigrations = [
        AutoMigration(from = 3, to = 4, spec = UserRemovalMigration::class),
        AutoMigration(from = 4, to = 5, spec = RecentAudioMigration::class),
        AutoMigration(from = 7, to = 8),
        AutoMigration(from = 8, to = 9, spec = DownloadRequestsMigration::class),
    ],
)
@ConstructedBy(KafkaDatabaseConstructor::class)
@TypeConverters(AppTypeConverters::class)
abstract class KafkaRoomDatabase : RoomDatabase(), KafkaDatabase {
    // v3→v4: drops the legacy "user" table.
    @DeleteTable(tableName = "user")
    class UserRemovalMigration : AutoMigrationSpec
    
    // v4→v5: recent_audio rows are keyed by album rather than file.
    @RenameColumn(tableName = "recent_audio", fromColumnName = "fileId", toColumnName = "albumId")
    class RecentAudioMigration : AutoMigrationSpec
    
    // v8→v9: removes columns no longer read by the app.
    @DeleteColumn(tableName = "download_requests", columnName = "created_at")
    @DeleteColumn(tableName = "Item", columnName = "genre")
    class DownloadRequestsMigration : AutoMigrationSpec
}

/**
 * DAO access surface implemented by [KafkaRoomDatabase]. Extracting it as an
 * interface lets consumers depend on the DAOs without referencing Room.
 *
 * NOTE(review): QueueEntity is registered in the @Database entities list but
 * no queue DAO is exposed here — confirm whether queue access lives elsewhere.
 */
interface KafkaDatabase {
    fun itemDetailDao(): ItemDetailDao
    fun fileDao(): FileDao
    fun itemDao(): ItemDao
    fun recentSearchDao(): RecentSearchDao
    fun recentTextDao(): RecentTextDao
    fun recentAudioDao(): RecentAudioDao
    fun downloadRequestsDao(): DownloadRequestsDao
}

Entity Definitions

package com.kafka.data.entities

/**
 * Room entity for a catalog item (book or audio) fetched from Archive.org.
 *
 * List-typed columns (language, collection) rely on the database's
 * registered AppTypeConverters for persistence.
 */
@Entity
data class Item(
    @PrimaryKey val itemId: String = "",
    // Flattened into the row as creator_id / creator_name columns.
    @Embedded(prefix = "creator_") val creator: Creator? = null,
    val language: List<String>? = null,
    val title: String? = null,
    val description: String? = null,
    val mediaType: String? = null,
    val coverImage: String? = null,
    val collection: List<String>? = null,
    val subject: String? = null,
    val uploader: String? = null,
    val position: Int = 0,
    val rating: Double? = null,
) : BaseEntity {
    // Audio vs. text is distinguished solely by the remote mediaType string.
    val isAudio: Boolean
        get() = mediaType == "audio"

    // Items tagged with the "no-preview" collection are hidden from preview.
    val isInappropriate: Boolean
        get() = collection?.contains("no-preview") == true
}

/** Embedded creator value: both fields flattened into the Item row. */
data class Creator(val id: String, val name: String)

DAO Implementations

DAOs extend EntityDao for common CRUD operations.
/**
 * Base DAO contract shared by all entity DAOs.
 *
 * Every write uses [OnConflictStrategy.REPLACE], so inserts behave as
 * upserts keyed on the entity's primary key: an existing row with the same
 * key is replaced rather than causing a constraint failure.
 */
interface EntityDao<in E : BaseEntity> {
    // Returns the rowId of the inserted/replaced row.
    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insert(entity: E): Long

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertAll(vararg entity: E)

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertAll(entities: List<E>)

    @Update(onConflict = OnConflictStrategy.REPLACE)
    suspend fun update(entity: E)
}

Remote Data Sources

Ktor Client for Archive.org API

/**
 * Remote API surface for Archive.org.
 */
interface ArchiveService {
    /** Runs an advanced-search query with the service's default paging. */
    suspend fun query(query: String): SearchResponse
    /** Fetches full metadata for a single item by its identifier. */
    suspend fun detail(itemId: String): ItemDetailResponse
    /** Paged search; [page] is 1-based and [rows] is the page size. */
    suspend fun search(
        query: String,
        page: Int = 1,
        rows: Int = 50
    ): SearchResponse
}

/**
 * Ktor-backed implementation of [ArchiveService] that talks to the
 * Archive.org advanced-search and metadata HTTP endpoints.
 *
 * @param httpClient shared Ktor client; JSON (de)serialization is assumed to
 *        be configured upstream so `.body()` can decode responses.
 * @param baseUrl API root, e.g. "https://archive.org".
 */
@ApplicationScope
class ArchiveServiceImpl @Inject constructor(
    private val httpClient: HttpClient,
    private val baseUrl: String
) : ArchiveService {

    override suspend fun query(query: String): SearchResponse {
        return httpClient.get("$baseUrl/advancedsearch.php") {
            parameter("q", query)
            parameter("output", "json")
            parameter("rows", 50)
        }.body()
    }

    override suspend fun detail(itemId: String): ItemDetailResponse {
        return httpClient.get("$baseUrl/metadata/$itemId") {
            parameter("output", "json")
        }.body()
    }

    // Fix: the interface declares search(query, page, rows) but the original
    // class never overrode it, so the snippet would not compile. Defaults
    // (page = 1, rows = 50) are inherited from the interface declaration.
    override suspend fun search(
        query: String,
        page: Int,
        rows: Int
    ): SearchResponse {
        return httpClient.get("$baseUrl/advancedsearch.php") {
            parameter("q", query)
            parameter("output", "json")
            parameter("rows", rows)
            parameter("page", page)
        }.body()
    }
}

Firestore for Remote Config

/**
 * Central access point for the Firestore collections the app uses as remote
 * configuration sources. Each accessor resolves the collection reference on
 * demand from the injected [FirebaseFirestore] instance.
 */
@ApplicationScope
class FirestoreGraph @Inject constructor(
    private val firestore: FirebaseFirestore
) {
    /** Homepage section definitions (current schema revision). */
    val homepageCollection: CollectionReference
        get() = firestore.collection(HOMEPAGE_COLLECTION)

    /** App-wide remote configuration values. */
    val configCollection: CollectionReference
        get() = firestore.collection(CONFIG_COLLECTION)

    /** Curated item recommendations. */
    val recommendationsCollection: CollectionReference
        get() = firestore.collection(RECOMMENDATIONS_COLLECTION)

    private companion object {
        const val HOMEPAGE_COLLECTION = "homepage_v2"
        const val CONFIG_COLLECTION = "config"
        const val RECOMMENDATIONS_COLLECTION = "recommendations"
    }
}

DataStore for Preferences

Kafka uses DataStore for type-safe preference storage.
/**
 * Type-safe wrapper around Jetpack DataStore for user preferences.
 *
 * Reads are exposed as [Flow]s so observers react to preference changes;
 * writes are suspending and go through [DataStore.edit].
 */
@ApplicationScope
class PreferencesStore @Inject constructor(
    private val dataStore: DataStore<Preferences>
) {
    private val autoDownloadKey = booleanPreferencesKey("auto_download")
    private val useOnlineReaderKey = booleanPreferencesKey("use_online_reader")
    private val themeKey = stringPreferencesKey("theme")

    /** Whether downloads start automatically; defaults to false when unset. */
    val autoDownload: Flow<Boolean> = dataStore.data
        .map { it[autoDownloadKey] ?: false }

    suspend fun setAutoDownload(enabled: Boolean) {
        dataStore.edit { it[autoDownloadKey] = enabled }
    }

    /** Whether the online reader is preferred; defaults to false when unset. */
    val useOnlineReader: Flow<Boolean> = dataStore.data
        .map { it[useOnlineReaderKey] ?: false }

    suspend fun setUseOnlineReader(enabled: Boolean) {
        dataStore.edit { it[useOnlineReaderKey] = enabled }
    }

    // Fix: themeKey was declared but had no reader or writer, leaving it as
    // dead code. Expose it like the other preferences.
    /** Selected theme name, or null when the user has not chosen one. */
    val theme: Flow<String?> = dataStore.data
        .map { it[themeKey] }

    suspend fun setTheme(theme: String) {
        dataStore.edit { it[themeKey] = theme }
    }
}

Data Mapping

Mappers transform data between layers (network → database → domain).
/**
 * Maps a network search result [Doc] from Archive.org into the local [Item]
 * entity for persistence.
 */
@ApplicationScope
class ItemMapper @Inject constructor() {

    fun map(doc: Doc): Item {
        val identifier = doc.identifier
        // NOTE(review): the creator's name is used for both id and name —
        // presumably the search response carries no separate creator id.
        val creator = doc.creator?.let { name -> Creator(name, name) }
        return Item(
            itemId = identifier,
            creator = creator,
            title = doc.title,
            description = doc.description,
            mediaType = doc.mediatype,
            coverImage = identifier?.let(::buildCoverUrl),
            language = doc.language,
            collection = doc.collection,
            // Only the first subject is persisted; Item stores a single string.
            subject = doc.subject?.firstOrNull(),
            // NOTE(review): download count is stored in the rating column —
            // confirm this is intentional.
            rating = doc.downloads?.toDoubleOrNull(),
        )
    }

    // Archive.org serves item thumbnails from its image service.
    private fun buildCoverUrl(itemId: String) =
        "https://archive.org/services/img/$itemId"
}

Data Flow Example

Complete data flow for a homepage refresh — from a user action, out to the network, into the database, and back to the UI via reactive Flows:
User Action (Homepage Screen)
      |

ViewModel.updateItems()
      |

UpdateHomepage Interactor
      |

HomepageRepository.getHomepageIds()
      |

Firestore (Remote)
      |

ItemRepository.updateQuery()
      |

ArchiveService (Ktor)
      |

ItemMapper.map()
      |

ItemDao.insertAll()
      |

Room Database
      |

ItemDao.observe() (Flow)
      |

ObserveHomepage (Observer)
      |

ViewModel.state (StateFlow)
      |

Composable UI

Best Practices

Single Source of Truth

Always read from database, never directly from network

Offline First

Cache data locally and display immediately

Flow-Based

Use Flow for reactive data streams

Coroutine Context

Use withContext(dispatchers.io) for database operations

Mappers

Transform data at repository layer

Error Handling

Handle errors in repositories, expose via Flow

Build docs developers (and LLMs) love