Kafka implements the MVVM (Model-View-ViewModel) pattern with an additional Interactor layer for business logic. This architecture is inspired by Tivi and provides clear separation between UI, business logic, and data layers.
MVVM + Interactors Pattern
Architecture Layers
View (Composable)
↕️ (StateFlow)
ViewModel
↕️ (invoke/Flow)
Interactors/Observers
↕️ (Flow)
Repository
↕️ (Flow)
Data Source (Room/API)
Key Difference: Instead of calling repositories directly, ViewModels use Interactors (for write operations) and Observers (for read operations) to encapsulate business logic.
ViewModels
ViewModels manage UI state and handle user actions. They are injected using kotlin-inject and expose state via StateFlow.
Typical ViewModel Structure
HomepageViewModel
class HomepageViewModel @Inject constructor(
    observeHomepage: ObserveHomepage, // Observer for data
    observeUser: ObserveUser,
    private val updateHomepage: UpdateHomepage, // Interactor for updates
    private val removeRecentItem: RemoveRecentItem,
    private val navigator: Navigator,
    private val analytics: Analytics,
    private val loadingCounter: ObservableLoadingCounter
) : ViewModel() {
    private val uiMessageManager = UiMessageManager()

    // Combine multiple flows into single UI state
    val state: StateFlow<HomepageViewState> = combine(
        observeHomepage.flow,
        observeUser.flow,
        loadingCounter.observable,
        uiMessageManager.message,
        ::HomepageViewState
    ).stateInDefault(scope = viewModelScope, initialValue = HomepageViewState())

    init {
        // Start observing data
        observeHomepage(Unit)
        observeUser(ObserveUser.Params())
        // Trigger initial data load
        updateItems()
    }

    private fun updateItems() {
        viewModelScope.launch {
            updateHomepage(Unit).collectStatus(loadingCounter, uiMessageManager)
        }
    }

    fun openItemDetail(itemId: String, collection: String?) {
        analytics.log { openItemDetail(itemId, collection) }
        navigator.navigate(Screen.ItemDetail(itemId))
    }

    fun removeRecentItem(fileId: String) {
        viewModelScope.launch {
            removeRecentItem.invoke(fileId).collect()
        }
    }
}
ViewModel Characteristics
State Management
Single StateFlow<ViewState> for UI state
Combine multiple flows using combine()
Immutable state classes
User Actions
Public functions for user interactions
Launch coroutines in viewModelScope
Analytics logging for user events
Navigation
Injected Navigator for screen navigation
Deep linking support
Type-safe navigation arguments
Error Handling
UiMessageManager for error messages
ObservableLoadingCounter for loading states
Retry mechanisms
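The state-management points above can be sketched with plain kotlinx.coroutines. This is only an illustration: `DemoViewState`, its fields, `demoState`, and `demoCombinedState` are hypothetical names, not Kafka's actual `HomepageViewState`, and the standard `stateIn` is used in place of the project's `stateInDefault` helper.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical immutable view state. Every field has a default so that
// DemoViewState() can serve as the initial value of the StateFlow.
data class DemoViewState(
    val items: List<String> = emptyList(),
    val userName: String? = null,
    val isLoading: Boolean = false,
)

// Combines three independent flows into a single StateFlow of view state.
fun demoState(
    scope: CoroutineScope,
    items: Flow<List<String>>,
    userName: Flow<String?>,
    isLoading: Flow<Boolean>,
): StateFlow<DemoViewState> =
    combine(items, userName, isLoading, ::DemoViewState)
        .stateIn(scope, SharingStarted.WhileSubscribed(5_000), DemoViewState())

// Usage sketch: waits for the first combined state that reports loading.
fun demoCombinedState(): DemoViewState = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    try {
        val state = demoState(scope, flowOf(listOf("a", "b")), flowOf("reader"), flowOf(true))
        state.first { it.isLoading }
    } finally {
        scope.cancel() // Stop the sharing coroutine started by stateIn
    }
}
```

Because the constructor reference is passed straight to `combine`, the order of the flows must match the order of the constructor parameters.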
Interactors (Write Operations)
Interactors encapsulate single use cases and return Flow<InvokeStatus> to indicate operation status.
Base Interactor Class
Interactor Base
abstract class Interactor<in P> {
    suspend operator fun invoke(
        params: P,
        timeoutMs: Long = defaultTimeoutMs
    ): Flow<InvokeStatus> {
        return flow {
            try {
                withTimeout(timeoutMs) {
                    emit(InvokeStarted)
                    doWork(params) // Execute business logic
                    emit(InvokeSuccess)
                }
            } catch (t: TimeoutCancellationException) {
                emit(InvokeError(t))
            }
        }.catch { t ->
            emit(InvokeError(t))
        }
    }

    suspend fun execute(params: P) = doWork(params)

    protected abstract suspend fun doWork(params: P)

    companion object {
        private val defaultTimeoutMs = 10.minutes.inWholeMilliseconds
    }
}
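The status values emitted by the base class are not shown on this page, so the following is a minimal sketch under assumptions: the `InvokeStatus` hierarchy below is a guess at its shape, `SketchInteractor` is a trimmed-down stand-in without timeout handling, and `RenameItem`, `describe`, and `statuses` are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Assumed shape of the status type; the real definition may differ.
sealed class InvokeStatus
object InvokeStarted : InvokeStatus()
object InvokeSuccess : InvokeStatus()
data class InvokeError(val throwable: Throwable) : InvokeStatus()

// Trimmed-down interactor (no timeout) that reports progress as a Flow.
abstract class SketchInteractor<in P> {
    operator fun invoke(params: P): Flow<InvokeStatus> = flow<InvokeStatus> {
        emit(InvokeStarted)
        doWork(params)
        emit(InvokeSuccess)
    }.catch { t -> emit(InvokeError(t)) } // Failures in doWork become a status value

    protected abstract suspend fun doWork(params: P)
}

// Hypothetical use case: rejects blank identifiers.
class RenameItem : SketchInteractor<String>() {
    override suspend fun doWork(params: String) {
        require(params.isNotBlank()) { "identifier must not be blank" }
    }
}

fun describe(status: InvokeStatus): String = when (status) {
    InvokeStarted -> "started"
    InvokeSuccess -> "success"
    is InvokeError -> "error: ${status.throwable.message}"
}

// Usage sketch: collects every status emitted for one invocation.
fun statuses(param: String): List<String> = runBlocking {
    RenameItem()(param).toList().map(::describe)
}
```

With this sketch, a successful call yields a started status followed by a success status, while a failing call yields a started status followed by an error status. Callers never see the exception itself.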
Example Interactor Implementation
UpdateHomepage
class UpdateHomepage @Inject constructor(
    private val dispatchers: CoroutineDispatchers,
    private val homepageRepository: HomepageRepository,
    private val itemRepository: ItemRepository,
    private val buildRemoteQuery: BuildRemoteQuery,
) : Interactor<Unit>() {
    override suspend fun doWork(params: Unit) {
        withContext(dispatchers.io) {
            // Get IDs from homepage collection
            val unFetchedIds = homepageRepository.getHomepageIds()
                .map { ids -> ids.filter { !itemRepository.exists(it) } }

            // Fetch items from API
            unFetchedIds.forEach { ids ->
                if (ids.isNotEmpty()) {
                    val query = ArchiveQuery().booksByIdentifiers(ids)
                    val items = itemRepository.updateQuery(buildRemoteQuery(query))
                    itemRepository.saveItems(items.filterNot { it.isInappropriate })
                }
            }
        }
    }
}
Using Interactors in ViewModels
Collect Status
viewModelScope.launch {
    updateHomepage(Unit)
        .collectStatus(loadingCounter, uiMessageManager)
}
// collectStatus is an extension function that:
// - Updates loading counter on InvokeStarted
// - Clears loading on InvokeSuccess
// - Shows error message on InvokeError
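The behaviour described in the comments above could be implemented roughly as follows. This is a sketch, not Kafka's actual extension: `LoadingCounter` and `MessageLog` are simplified, hypothetical stand-ins for `ObservableLoadingCounter` and `UiMessageManager`, and the `InvokeStatus` hierarchy is an assumed shape.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Assumed shape of the status type; the real definition may differ.
sealed class InvokeStatus
object InvokeStarted : InvokeStatus()
object InvokeSuccess : InvokeStatus()
data class InvokeError(val throwable: Throwable) : InvokeStatus()

// Hypothetical stand-in for ObservableLoadingCounter.
class LoadingCounter {
    var count = 0
        private set

    fun addLoader() { count++ }
    fun removeLoader() { count-- }
}

// Hypothetical stand-in for UiMessageManager.
class MessageLog {
    val messages = mutableListOf<String>()
    fun emitMessage(text: String) { messages += text }
}

// Translates each status into loading and message side effects.
suspend fun Flow<InvokeStatus>.collectStatus(counter: LoadingCounter, log: MessageLog) {
    collect { status ->
        when (status) {
            InvokeStarted -> counter.addLoader()
            InvokeSuccess -> counter.removeLoader()
            is InvokeError -> {
                log.emitMessage(status.throwable.message ?: "Unknown error")
                counter.removeLoader() // A failed operation is no longer loading
            }
        }
    }
}

// Usage sketch: a started operation that then fails.
fun demoCollectStatus(): Pair<Int, List<String>> = runBlocking {
    val counter = LoadingCounter()
    val log = MessageLog()
    flowOf<InvokeStatus>(InvokeStarted, InvokeError(IllegalStateException("offline")))
        .collectStatus(counter, log)
    counter.count to log.messages.toList()
}
```

In this sketch the loader is released on both the success and the error path, so the counter returns to zero however the operation ends.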
Observers (Read Operations)
Observers provide reactive streams of data using SubjectInteractor.
Base SubjectInteractor Class
SubjectInteractor
abstract class SubjectInteractor<P, T> {
    private val paramState = MutableSharedFlow<P>(
        replay = 1,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    operator fun invoke(params: P): Flow<T> {
        paramState.tryEmit(params)
        return flow
    }

    fun execute(params: P): Flow<T> = createObservable(params)

    abstract fun createObservable(params: P): Flow<T>

    val flow: Flow<T> = paramState
        .distinctUntilChanged()
        .flatMapLatest { createObservable(it) }
        .distinctUntilChanged()

    suspend fun get(): T = flow.first()

    suspend fun getOrNull(): T? = flow.firstOrNull()
}
Example Observer Implementations
ObserveHomepage
class ObserveHomepage @Inject constructor(
    private val coroutineDispatchers: CoroutineDispatchers,
    private val homepageRepository: HomepageRepository,
) : SubjectInteractor<Unit, Homepage>() {
    override fun createObservable(params: Unit): Flow<Homepage> {
        return homepageRepository.observeHomepageCollection()
            .map { Homepage(collection = it) }
            .flowOn(coroutineDispatchers.io)
    }
}
Using Observers in ViewModels
Start Observing
init {
    // Start observing - params emitted to paramState
    observeHomepage(Unit)
    observeUser(ObserveUser.Params())
}
// The observer's .flow automatically reacts to parameter changes
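To make the parameter-driven behaviour concrete, here is a self-contained sketch. It repeats a trimmed copy of the `SubjectInteractor` shown earlier so that it runs on its own. `ObserveGreeting` and `demoObserver` are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Trimmed copy of the base class above, kept only to make the sketch runnable.
@OptIn(ExperimentalCoroutinesApi::class)
abstract class SubjectInteractor<P, T> {
    private val paramState = MutableSharedFlow<P>(
        replay = 1,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    operator fun invoke(params: P): Flow<T> {
        paramState.tryEmit(params) // Latest params are kept in the replay cache
        return flow
    }

    protected abstract fun createObservable(params: P): Flow<T>

    val flow: Flow<T> = paramState
        .distinctUntilChanged()
        .flatMapLatest { createObservable(it) } // New params replace the old stream
        .distinctUntilChanged()
}

// Hypothetical observer whose output depends on its parameter.
class ObserveGreeting : SubjectInteractor<String, String>() {
    override fun createObservable(params: String): Flow<String> =
        flowOf("Hello, $params")
}

// Usage sketch: the same .flow yields different data after new params are set.
fun demoObserver(): List<String> = runBlocking {
    val observeGreeting = ObserveGreeting()
    observeGreeting("Ada")
    val first = observeGreeting.flow.first()
    observeGreeting("Grace")
    val second = observeGreeting.flow.first()
    listOf(first, second)
}
```

Because `paramState` replays its latest value, a collector that subscribes after `invoke` still receives data for the most recent parameters.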
ResultInteractor
ResultInteractor suits operations that return a Result<R> directly instead of a status flow.
abstract class ResultInteractor<in P, R> {
    private val count = atomic(0)
    private val loadingState = MutableStateFlow(count.value)

    val inProgress: Flow<Boolean> = loadingState.map { it > 0 }

    // addLoader(), removeLoader(), and DefaultTimeout are not shown in this excerpt
    suspend operator fun invoke(
        params: P,
        timeout: Duration = DefaultTimeout,
    ): Result<R> = try {
        addLoader()
        runCatching {
            withTimeout(timeout) {
                doWork(params)
            }
        }
    } finally {
        removeLoader()
    }

    protected abstract suspend fun doWork(params: P): R
}
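A caller typically unwraps the returned `Result` with `fold` or `getOrNull`. The sketch below uses a trimmed-down `SketchResultInteractor` without loading tracking, with an assumed 5-second default timeout. `ParsePageCount` and `demoResult` are hypothetical names.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Trimmed-down variant of ResultInteractor: no loading state, assumed default timeout.
abstract class SketchResultInteractor<in P, R> {
    suspend operator fun invoke(params: P, timeout: Duration = 5.seconds): Result<R> =
        runCatching { withTimeout(timeout) { doWork(params) } }

    protected abstract suspend fun doWork(params: P): R
}

// Hypothetical use case: parses a page count from user input.
class ParsePageCount : SketchResultInteractor<String, Int>() {
    override suspend fun doWork(params: String): Int = params.trim().toInt()
}

// Usage sketch: one successful and one failing invocation.
fun demoResult(): List<String> = runBlocking {
    val parse = ParsePageCount()
    listOf(parse(" 42 "), parse("n/a")).map { result ->
        result.fold(
            onSuccess = { "pages=$it" },
            onFailure = { "failed: ${it::class.simpleName}" },
        )
    }
}
```

Note that `runCatching` also captures `CancellationException`, so code that relies on cooperative cancellation may want to rethrow it rather than treat it as an ordinary failure.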
Benefits of This Pattern
Testability
ViewModels can be tested by mocking interactors
Interactors can be tested independently
Clear boundaries for unit tests
Reusability
Interactors can be shared across ViewModels
Common business logic in one place
Consistent error handling
Separation of Concerns
ViewModels focus on UI state
Interactors handle business logic
Repositories manage data
Type Safety
Strong typing with generic parameters
Compile-time checks
Clear data flow
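The testability benefit can be illustrated without a mocking library by substituting a hand-written fake at the repository boundary. Everything in this sketch (`FavoriteStore`, `ToggleFavorite`, `FakeFavoriteStore`, `demoFake`) is hypothetical and does not come from the Kafka codebase.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical repository boundary.
interface FavoriteStore {
    suspend fun setFavorite(itemId: String, favorite: Boolean)
}

// Hypothetical interactor holding the business rules:
// blank ids are rejected and ids are trimmed before storage.
class ToggleFavorite(private val store: FavoriteStore) {
    suspend fun execute(itemId: String, favorite: Boolean) {
        require(itemId.isNotBlank()) { "itemId must not be blank" }
        store.setFavorite(itemId.trim(), favorite)
    }
}

// In-memory fake that records the resulting state.
class FakeFavoriteStore : FavoriteStore {
    val favorites = mutableSetOf<String>()

    override suspend fun setFavorite(itemId: String, favorite: Boolean) {
        if (favorite) favorites += itemId else favorites -= itemId
    }
}

// Usage sketch: exercises the interactor in isolation and returns the final state.
fun demoFake(): Set<String> = runBlocking {
    val store = FakeFavoriteStore()
    val toggle = ToggleFavorite(store)
    toggle.execute(" book-1 ", favorite = true)
    toggle.execute("book-2", favorite = true)
    toggle.execute("book-2", favorite = false)
    store.favorites.toSet()
}
```

The same idea applies one layer up: a ViewModel that depends on interactors can be given fake or stubbed interactors in the same way.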
State Management Patterns
Combining Multiple Flows
Multiple Data Sources
val state: StateFlow<ViewState> = combine(
    observeHomepage.flow, // Flow<Homepage>
    observeUser.flow, // Flow<User?>
    loadingCounter.observable, // Flow<Boolean>
    uiMessageManager.message, // Flow<UiMessage?>
) { homepage, user, isLoading, message ->
    ViewState(
        homepage = homepage,
        user = user,
        isLoading = isLoading,
        message = message
    )
}.stateInDefault(viewModelScope, ViewState())
Loading and Error Handling
ObservableLoadingCounter
class ObservableLoadingCounter {
    private val count = AtomicInteger()
    private val loadingState = MutableStateFlow(count.get())

    val observable: Flow<Boolean> =
        loadingState.map { it > 0 }.distinctUntilChanged()

    // Block bodies are required here: an assignment is not an expression in Kotlin
    fun addLoader() {
        loadingState.value = count.incrementAndGet()
    }

    fun removeLoader() {
        loadingState.value = count.decrementAndGet()
    }
}
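A counter is used rather than a single Boolean so that overlapping operations keep the loading flag set until the last one finishes. The sketch below repeats the class so that it runs on its own. `demoCounter` is a hypothetical helper.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

class ObservableLoadingCounter {
    private val count = AtomicInteger()
    private val loadingState = MutableStateFlow(count.get())

    // True while at least one loader is active.
    val observable: Flow<Boolean> =
        loadingState.map { it > 0 }.distinctUntilChanged()

    fun addLoader() {
        loadingState.value = count.incrementAndGet()
    }

    fun removeLoader() {
        loadingState.value = count.decrementAndGet()
    }
}

// Usage sketch: samples the loading flag while two operations overlap.
fun demoCounter(): List<Boolean> = runBlocking {
    val counter = ObservableLoadingCounter()
    val seen = mutableListOf<Boolean>()
    seen += counter.observable.first() // Nothing running yet
    counter.addLoader()
    counter.addLoader()
    seen += counter.observable.first() // Two operations in flight
    counter.removeLoader()
    seen += counter.observable.first() // One operation still in flight
    counter.removeLoader()
    seen += counter.observable.first() // All operations finished
    seen
}
```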
Best Practices
Don’t: Access repositories directly from ViewModels
// ❌ Bad
class MyViewModel(private val repository: MyRepository) : ViewModel() {
    fun loadData() {
        viewModelScope.launch {
            repository.fetchData() // Direct repository access
        }
    }
}
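Do: Use Interactors/Observers to access data
// ✅ Good
class MyViewModel(
    private val updateData: UpdateData,
    observeData: ObserveData
) : ViewModel() {
    private val loadingCounter = ObservableLoadingCounter()
    private val messageManager = UiMessageManager()

    // The non-suspending stateIn overload needs a sharing policy and an initial value
    val state = observeData.flow
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), initialValue = null)

    init {
        observeData(Unit) // Start observing (assumes ObserveData takes Unit params)
    }

    fun loadData() {
        viewModelScope.launch {
            updateData(Unit).collectStatus(loadingCounter, messageManager)
        }
    }
}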
Use Observers for read operations that return Flow<T>
Use Interactors for write operations that return Flow<InvokeStatus>
Use ResultInteractors when you need Result<T> return type
Keep ViewModels focused on UI state and user actions
Move all business logic to Interactors