Kafka uses Ktor as its HTTP client for making API requests to Archive.org and other backend services. Ktor provides a multiplatform, coroutine-based networking solution with powerful plugin support.
Network Configuration
The NetworkingComponent provides the HTTP client configuration:
core/networking/src/main/java/com/kafka/networking/NetworkingComponent.kt
interface NetworkingComponent {
@Provides
@ApplicationScope
fun provideHttpClient(json: Json): HttpClient = HttpClient {
install(ContentNegotiation) {
json(json)
}
install(HttpTimeout) {
requestTimeoutMillis = Config.API_TIMEOUT
connectTimeoutMillis = Config.API_TIMEOUT
socketTimeoutMillis = Config.API_TIMEOUT
}
install(Logging) {
logger = object : Logger {
override fun log(message: String) {
debug { message }
}
}
level = LogLevel.ALL
}
defaultRequest {
accept(ContentType.parse("application/json"))
}
}
object Config {
val API_TIMEOUT = 40.seconds.inWholeMilliseconds
}
}
Request timeout in milliseconds (40 seconds)
Ktor Plugins
Content Negotiation
Automatically serializes and deserializes JSON using kotlinx.serialization:
install(ContentNegotiation) {
json(json)
}
The JSON configuration is provided separately:
@Provides
@ApplicationScope
fun jsonConfigured(serializersModule: SerializersModule) = Json {
ignoreUnknownKeys = true
useAlternativeNames = false
isLenient = true
prettyPrint = true
encodeDefaults = true
this.serializersModule = serializersModule
}
Ignores unknown JSON fields instead of throwing exceptions
Allows parsing of non-standard JSON (e.g., unquoted keys)
Formats JSON output with indentation for debugging
HTTP Timeout
Configures timeouts for all requests:
install(HttpTimeout) {
requestTimeoutMillis = Config.API_TIMEOUT // Total request timeout
connectTimeoutMillis = Config.API_TIMEOUT // Connection timeout
socketTimeoutMillis = Config.API_TIMEOUT // Socket timeout
}
Logging
Logs all HTTP requests and responses:
install(Logging) {
logger = object : Logger {
override fun log(message: String) {
debug { message } // Uses Kafka's debug logging
}
}
level = LogLevel.ALL // Logs headers, body, and everything
}
Logging level: NONE, INFO, HEADERS, BODY, or ALL
Default Request
Sets default headers for all requests:
defaultRequest {
accept(ContentType.parse("application/json"))
}
Serialization
Ktor uses kotlinx.serialization with custom polymorphic handling:
core/networking/src/main/java/com/kafka/networking/SerializationPolymorphicDefaultPair.kt
data class SerializationPolymorphicDefaultPair<T : Any>(
val base: KClass<T>,
val default: T
)
The serializers module supports polymorphic deserialization:
@Suppress("UNCHECKED_CAST")
@OptIn(InternalSerializationApi::class)
@Provides
@ApplicationScope
fun provideSerializersModule(
polymorphicDefaultPairs: Set<SerializationPolymorphicDefaultPair<*>>,
): SerializersModule = SerializersModule {
polymorphicDefaultPairs.forEach { (base, default) ->
polymorphicDefaultDeserializer(base as KClass<Any>) {
default.serializer()
}
}
}
Making Requests
GET Request
val response: ItemResponse = client.get("https://archive.org/metadata/$itemId") {
parameter("output", "json")
}
POST Request
val response: SubmitResponse = client.post("https://api.kafka.com/submit") {
contentType(ContentType.Application.Json)
setBody(SubmitRequest(data = "example"))
}
Handling Responses
try {
val items: List<Item> = client.get("https://api.kafka.com/items").body()
// Success
} catch (e: ClientRequestException) {
// 4xx errors
println("Client error: ${e.response.status}")
} catch (e: ServerResponseException) {
// 5xx errors
println("Server error: ${e.response.status}")
} catch (e: IOException) {
// Network errors
println("Network error: ${e.message}")
}
Error Handling
core/networking/src/main/java/com/kafka/networking/ErrorMessages.kt
object NetworkErrorMessages {
const val NETWORK_ERROR = "Network error occurred"
const val TIMEOUT_ERROR = "Request timed out"
const val SERVER_ERROR = "Server error occurred"
const val UNKNOWN_ERROR = "Unknown error occurred"
}
fun Throwable.toErrorMessage(): String = when (this) {
is ClientRequestException -> "Client error: ${response.status}"
is ServerResponseException -> NetworkErrorMessages.SERVER_ERROR
is HttpRequestTimeoutException -> NetworkErrorMessages.TIMEOUT_ERROR
is IOException -> NetworkErrorMessages.NETWORK_ERROR
else -> NetworkErrorMessages.UNKNOWN_ERROR
}
Repository Pattern
Networking is typically used through repository classes:
class ItemRepository @Inject constructor(
private val client: HttpClient,
private val itemDao: ItemDao
) {
suspend fun getItem(itemId: String): Result<Item> {
return try {
val response: ItemResponse = client.get(
"https://archive.org/metadata/$itemId"
) {
parameter("output", "json")
}
val item = response.toItem()
itemDao.insert(item) // Cache locally
Result.success(item)
} catch (e: Exception) {
// Try local cache
itemDao.getOrNull(itemId)?.let {
Result.success(it)
} ?: Result.failure(e)
}
}
suspend fun searchItems(query: String): Result<List<Item>> {
return try {
val response: SearchResponse = client.get(
"https://archive.org/advancedsearch.php"
) {
parameter("q", query)
parameter("output", "json")
parameter("rows", 50)
}
Result.success(response.docs)
} catch (e: Exception) {
Result.failure(e)
}
}
}
Streaming Support
Ktor supports streaming for large files:
suspend fun downloadFile(url: String, outputFile: File) {
client.prepareGet(url).execute { response ->
val channel = response.bodyAsChannel()
outputFile.outputStream().use { output ->
while (!channel.isClosedForRead) {
val packet = channel.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
while (!packet.isEmpty) {
val bytes = packet.readBytes()
output.write(bytes)
}
}
}
}
}
Caching Strategy
Kafka implements a cache-first strategy:
- Check local database for cached data
- Make network request if cache is empty or stale
- Update cache with fresh data
- Return result to UI
suspend fun <T> fetchWithCache(
fetchFromNetwork: suspend () -> T,
fetchFromCache: suspend () -> T?,
saveToCache: suspend (T) -> Unit
): Result<T> {
// Try cache first
fetchFromCache()?.let { return Result.success(it) }
// Fetch from network
return try {
val data = fetchFromNetwork()
saveToCache(data)
Result.success(data)
} catch (e: Exception) {
Result.failure(e)
}
}
Dependencies
Gradle configuration:
implementation(libs.ktor.client.core)
implementation(libs.ktor.client.contentnegotiation)
implementation(libs.ktor.client.java) // Android engine
implementation(libs.ktor.client.logging)
implementation(libs.ktor.serialization)
gradle/libs.versions.toml
[versions]
ktor = "3.4.0"
[libraries]
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
ktor-client-contentnegotiation = { module = "io.ktor:ktor-client-content-negotiation", version.ref = "ktor" }
ktor-client-java = { module = "io.ktor:ktor-client-android", version.ref = "ktor" }
ktor-client-logging = { module = "io.ktor:ktor-client-logging", version.ref = "ktor" }
ktor-serialization = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" }
Ktor uses different HTTP engines per platform:
- Android: Android/Java engine (optimized for Android)
- iOS: Darwin engine (uses NSURLSession)
- Desktop: Java engine (uses java.net.HttpClient)
Features
- Multiplatform: Works on Android, iOS, and Desktop
- Coroutines: Native coroutine support for async operations
- Type-Safe: Kotlin-first API with type safety
- Plugins: Extensible plugin system
- Streaming: Supports streaming large files
- WebSockets: Built-in WebSocket support (if needed)
- Auto-Retry: Can be configured with retry logic
- Connection Pooling: Automatic connection reuse
- Compression: Automatic gzip/deflate support
Best Practices
- Use Repository Pattern: Separate network logic from UI
- Handle Errors: Always catch and handle exceptions
- Cache Data: Cache responses in local database
- Timeout Configuration: Set appropriate timeouts
- Logging: Use logging for debugging (disable in production)
- Content Type: Always set correct Content-Type headers
- Cancellation: Support coroutine cancellation for requests