Streaming architecture
The streaming functionality is built around Kotlin Flow and the Google Generative AI SDK’s streaming capabilities, implemented in GemAIModel.kt.
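As a sketch of what this can look like (the class shape and caching of the chat session are assumptions; the SDK types `GenerativeModel`, `Chat`, and `sendMessageStream` are real), the model wraps the SDK's streaming call, which already returns a `Flow<GenerateContentResponse>`:

```kotlin
import com.google.ai.client.generativeai.Chat
import com.google.ai.client.generativeai.GenerativeModel
import com.google.ai.client.generativeai.type.GenerateContentResponse
import kotlinx.coroutines.flow.Flow

// Hypothetical shape of the streaming entry point; only the SDK types are real.
class GemAIModel(private val model: GenerativeModel) {
    private var chat: Chat? = null

    fun sendMessageStream(prompt: String): Flow<GenerateContentResponse> {
        // startChat() lazily creates a session; sendMessageStream emits
        // partial responses as chunks arrive from the Gemini API.
        val session = chat ?: model.startChat().also { chat = it }
        return session.sendMessageStream(prompt)
    }
}
```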
The method returns a Flow<GenerateContentResponse> that emits response chunks as they arrive from the Gemini API.

Message validation
Before streaming begins, the system validates the input parameters:

Input validation requirements
- Conversation ID must be a positive value
- Message content cannot be blank or contain only whitespace
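A minimal sketch of these checks (the function name is an assumption), using Kotlin's `require`, which throws `IllegalArgumentException` with the supplied message:

```kotlin
fun validateStreamInput(conversationId: Long, message: String) {
    // Conversation ID must be a positive value.
    require(conversationId > 0) { "Conversation ID must be positive, was $conversationId" }
    // Message content cannot be blank or whitespace-only.
    require(message.isNotBlank()) { "Message content cannot be blank" }
}
```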
Invalid input throws an IllegalArgumentException with a descriptive error message.

Chat initialization and state
The streaming system maintains chat state using a mutex for thread-safe operations.

State management
Chat session lifecycle
Initializing chat with history
When a chat session starts, the system loads the conversation’s message history (GemAIModel.kt).
Message history is converted to the Gemini API format, mapping each message’s participant role and content. If history retrieval fails, an empty list is used, effectively starting a fresh conversation.
History format conversion
Messages are transformed from the app’s data model to Google’s AI SDK format:

- App format
- AI SDK format
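A conversion sketch: the app-side `Message` type and its field names are assumptions, while the SDK's `content { }` builder and `Content` type are real. The Gemini API expects roles of `"user"` and `"model"`:

```kotlin
import com.google.ai.client.generativeai.type.Content
import com.google.ai.client.generativeai.type.content

// Hypothetical app-side message type; field names are assumptions.
data class Message(val participant: String, val text: String)

// Map the app's participant roles onto the SDK's "user" / "model" roles.
fun List<Message>.toSdkHistory(): List<Content> = map { msg ->
    content(role = if (msg.participant == "USER") "user" else "model") {
        text(msg.text)
    }
}
```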
Streaming response flow
Once the chat is initialized, streaming happens through the Google AI SDK.

Flow streaming

The SDK returns a Flow<GenerateContentResponse> that you can collect to receive incremental updates:
Example: Collecting stream
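A collection sketch (the accumulation and the function name are illustrative; `GenerateContentResponse.text` is the SDK's real, nullable accessor for a chunk's text):

```kotlin
import com.google.ai.client.generativeai.type.GenerateContentResponse
import kotlinx.coroutines.flow.Flow

suspend fun collectStream(stream: Flow<GenerateContentResponse>) {
    val fullText = StringBuilder()
    stream.collect { chunk ->
        // Each chunk carries the next piece of the model's reply;
        // `text` is nullable, so fall back to an empty string.
        fullText.append(chunk.text ?: "")
        // A UI layer would render the partial text here.
    }
    println(fullText)
}
```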
Thread safety with mutex
The streaming implementation uses a Mutex to prevent race conditions:
Thread-safe operations
Why use a mutex?
The mutex ensures that:
- Only one chat initialization happens at a time
- Chat state isn’t modified while another coroutine is reading it
- Conversation switching is handled safely
- Multiple concurrent message sends don’t corrupt chat state
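The guarantees above can be sketched with kotlinx.coroutines' `Mutex` and `withLock` (real APIs); the state-holder class and its fields are assumptions:

```kotlin
import com.google.ai.client.generativeai.Chat
import com.google.ai.client.generativeai.GenerativeModel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical chat-state holder; only Mutex/withLock and the SDK types are real.
class ChatState(private val model: GenerativeModel) {
    private val mutex = Mutex()
    private var activeConversationId: Long? = null
    private var chat: Chat? = null

    suspend fun chatFor(conversationId: Long): Chat = mutex.withLock {
        // Reinitialize only when switching conversations; concurrent callers
        // queue on the mutex instead of racing on `chat`.
        if (activeConversationId != conversationId || chat == null) {
            chat = model.startChat()
            activeConversationId = conversationId
        }
        chat!!
    }
}
```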
Use case integration
The streaming functionality is accessed through the SendMessageUseCase:
SendMessageUseCase.kt
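A sketch of what such a use case can look like (the interface and its method are assumptions; only the SDK response type is real). It re-validates input before delegating to the model layer:

```kotlin
import com.google.ai.client.generativeai.type.GenerateContentResponse
import kotlinx.coroutines.flow.Flow

// Hypothetical model abstraction the use case depends on.
interface AIModel {
    fun sendMessageStream(conversationId: Long, message: String): Flow<GenerateContentResponse>
}

class SendMessageUseCase(private val model: AIModel) {
    operator fun invoke(conversationId: Long, message: String): Flow<GenerateContentResponse> {
        require(conversationId > 0) { "Conversation ID must be positive" }
        require(message.isNotBlank()) { "Message content cannot be blank" }
        return model.sendMessageStream(conversationId, message)
    }
}
```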
Complete streaming workflow
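The full path from validation to collection can be sketched in one function (every name outside the SDK is an assumption, and history loading is elided):

```kotlin
import com.google.ai.client.generativeai.GenerativeModel

// Hypothetical end-to-end flow: validate, initialize chat, stream the reply.
suspend fun streamMessage(
    model: GenerativeModel,
    conversationId: Long,
    message: String,
    onChunk: (String) -> Unit,
) {
    // 1. Validate input.
    require(conversationId > 0) { "Conversation ID must be positive" }
    require(message.isNotBlank()) { "Message content cannot be blank" }
    // 2. Initialize the chat session (history loading elided here).
    val chat = model.startChat()
    // 3. Stream the reply, surfacing each chunk as it arrives.
    chat.sendMessageStream(message).collect { chunk ->
        onChunk(chunk.text ?: "")
    }
}
```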
Error handling in streams
The streaming implementation includes robust error handling.

Exception handling
If message history cannot be loaded, the system gracefully falls back to an empty history, allowing the conversation to continue without context.
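This fallback can be sketched with `runCatching` (the repository interface and message type are assumptions):

```kotlin
// Hypothetical app-side types; names are assumptions.
data class Message(val participant: String, val text: String)

interface MessageRepository {
    suspend fun getMessages(conversationId: Long): List<Message>
}

suspend fun loadHistorySafely(
    repository: MessageRepository,
    conversationId: Long,
): List<Message> =
    runCatching { repository.getMessages(conversationId) }
        .getOrElse { emptyList() } // on failure, start fresh with no context
```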
State management
The chat state is updated atomically when initialization completes.

Setting active chat
- The active conversation ID is tracked
- The chat instance is ready for streaming
- Future messages in the same conversation reuse the existing chat session
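One way to keep the update atomic (names and the snapshot pattern are assumptions) is to swap an immutable snapshot under the mutex, so readers never observe a conversation ID paired with the wrong chat instance:

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical immutable snapshot of the active chat state.
data class ActiveChat(val conversationId: Long, val chat: Any /* SDK Chat */)

class ChatStateHolder {
    private val mutex = Mutex()
    private var active: ActiveChat? = null

    suspend fun setActive(conversationId: Long, chat: Any) = mutex.withLock {
        // Replace the whole snapshot in one step.
        active = ActiveChat(conversationId, chat)
    }

    suspend fun current(): ActiveChat? = mutex.withLock { active }
}
```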
Performance considerations
Conversation switching
When you switch conversations, the chat is reinitialized with the new conversation’s history.
History caching
Message history is loaded once per conversation and cached in the chat session.
Concurrent safety
The mutex prevents race conditions when multiple messages are sent rapidly.
Resource cleanup
The base class provides a close() method to cancel ongoing operations when needed.

Cleanup and lifecycle
The base class manages the coroutine lifecycle in BaseAIModel.kt.
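A lifecycle sketch (the scope-per-model pattern and class shape are assumptions; `CoroutineScope`, `SupervisorJob`, and `cancel` are standard kotlinx.coroutines APIs):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel

// Hypothetical base class owning a scope for all model work.
abstract class BaseAIModel {
    // SupervisorJob so one failed request doesn't cancel sibling coroutines.
    protected val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // Cancels all in-flight work for this model, including active streams.
    open fun close() {
        scope.cancel()
    }
}
```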