GemAI implements real-time message streaming to provide immediate, incremental responses from the Gemini AI model. This creates a responsive chat experience where you see the AI’s response being generated word-by-word.

Streaming architecture

The streaming functionality is built around Kotlin Flow and the Google Generative AI SDK’s streaming capabilities:
GemAIModel.kt
suspend fun sendMessage(
    conversationId: Long,
    content: String
): Flow<GenerateContentResponse> {
    require(conversationId > 0) { "Invalid conversation ID" }
    require(content.isNotBlank()) { "Message content cannot be blank" }
    
    val activeChat = chatLock.withLock {
        if (conversationId != activeChatId || chat == null) {
            initializeChat(conversationId)
        }
        chat ?: throw IllegalStateException("Chat not initialized")
    }
    
    return activeChat.sendMessageStream(content)
}
The method returns a Flow<GenerateContentResponse> that emits response chunks as they arrive from the Gemini API.

Message validation

Before streaming begins, the system validates the input parameters:
require(conversationId > 0) { "Invalid conversation ID" }
require(content.isNotBlank()) { "Message content cannot be blank" }
  • Conversation ID must be a positive value
  • Message content cannot be blank or contain only whitespace
Violations throw IllegalArgumentException with descriptive error messages.
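A caller can mirror these checks before invoking the model to surface a user-facing error instead of an exception. The following is an illustrative sketch; `validateMessageInput` and the returned strings are assumptions, not part of GemAI:

```kotlin
// Hypothetical caller-side guard mirroring the model's require() checks.
// Returns null when the input would pass sendMessage's validation.
fun validateMessageInput(conversationId: Long, content: String): String? =
    when {
        conversationId <= 0 -> "Invalid conversation ID"
        content.isBlank() -> "Message content cannot be blank"
        else -> null
    }
```

A null result means the same preconditions that `sendMessage` enforces with `require` are satisfied.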

Chat initialization and state

The streaming system maintains chat state using a mutex for thread-safe operations:
State management
private val chatLock = Mutex()
private var chat: Chat? = null
private var activeChatId: Long? = null

Chat session lifecycle

1. Lock acquisition: acquire the mutex to ensure thread-safe chat initialization.

chatLock.withLock {
    // Thread-safe operations
}

2. State check: verify whether the chat needs to be initialized for the given conversation.

if (conversationId != activeChatId || chat == null) {
    initializeChat(conversationId)
}

3. Validation: ensure the chat object is ready for streaming.

chat ?: throw IllegalStateException("Chat not initialized")

4. Stream: send the message and return the streaming response.

activeChat.sendMessageStream(content)

Initializing chat with history

When a chat session starts, the system loads the conversation’s message history:
GemAIModel.kt
private suspend fun initializeChat(conversationId: Long) {
    val history = try {
        messageDao.getMessages(conversationId).map {
            content(role = it.participant.role) {
                text(it.content)
            }
        }
    } catch (e: Exception) {
        emptyList()
    }
    setChat(geminiAIModel.startChat(history), conversationId)
}
Message history is converted to the Gemini API format, mapping each message’s participant role and content. If history retrieval fails, an empty list is used, effectively starting a fresh conversation.

History format conversion

Messages are transformed from the app’s data model to Google’s AI SDK format:
data class Message(
    val content: String,
    val participant: Participant,  // USER or MODEL
    // ... other fields
)
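The `it.participant.role` mapping seen in initializeChat can be sketched as a simple extension property. This is a minimal stand-in, assuming `Participant` is a two-value enum; the Gemini SDK's content builder expects the role strings "user" and "model":

```kotlin
// Stand-in for the app's Participant enum and its role mapping.
enum class Participant { USER, MODEL }

// Gemini's content() builder takes a role string: "user" or "model".
val Participant.role: String
    get() = when (this) {
        Participant.USER -> "user"
        Participant.MODEL -> "model"
    }
```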

Streaming response flow

Once the chat is initialized, streaming happens through the Google AI SDK:
Flow streaming
return activeChat.sendMessageStream(content)
This returns a Flow<GenerateContentResponse> that you can collect to receive incremental updates:
Example: Collecting stream
val responseFlow = gemAIModel.sendMessage(
    conversationId = 123,
    content = "Explain Kotlin coroutines"
)

responseFlow.collect { response ->
    // Process each chunk as it arrives
    val chunk = response.text ?: ""
    updateUI(chunk)
}
Each emission from the flow contains a portion of the complete response. You’ll need to accumulate these chunks to build the full message content.
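The accumulation step can be sketched as follows, using a Flow<String> in place of Flow<GenerateContentResponse> so the example stays self-contained:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Accumulate streamed chunks into the full message text.
suspend fun collectFullResponse(chunks: Flow<String>): String {
    val builder = StringBuilder()
    chunks.collect { chunk ->
        builder.append(chunk)  // in the app, chunk would be response.text ?: ""
        // A UI layer could render builder.toString() here for partial updates.
    }
    return builder.toString()
}

fun main() = runBlocking {
    val full = collectFullResponse(flowOf("Corou", "tines ", "are lightweight."))
    println(full)  // Coroutines are lightweight.
}
```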

Thread safety with mutex

The streaming implementation uses a Mutex to prevent race conditions:
Thread-safe operations
private val chatLock = Mutex()

val activeChat = chatLock.withLock {
    // Only one coroutine can execute this block at a time
    if (conversationId != activeChatId || chat == null) {
        initializeChat(conversationId)
    }
    chat ?: throw IllegalStateException("Chat not initialized")
}
The mutex ensures that:
  • Only one chat initialization happens at a time
  • Chat state isn’t modified while another coroutine is reading it
  • Conversation switching is handled safely
  • Multiple concurrent message sends don’t corrupt chat state
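The guarantee above can be demonstrated with a self-contained sketch. `ChatHolder` is a simplified stand-in for the model's state (it omits the `chat == null` check), but the check-then-initialize race it guards against is the same:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Without the mutex, two concurrent senders could both observe a stale
// activeId and initialize twice. withLock serializes check-then-initialize.
class ChatHolder {
    private val lock = Mutex()
    private var activeId: Long? = null
    var initializations = 0
        private set

    suspend fun ensureChat(conversationId: Long) = lock.withLock {
        if (conversationId != activeId) {
            initializations++  // stands in for initializeChat()
            activeId = conversationId
        }
    }
}

fun main() = runBlocking {
    val holder = ChatHolder()
    // 100 concurrent sends for the same conversation initialize exactly once.
    coroutineScope { repeat(100) { launch { holder.ensureChat(42L) } } }
    println(holder.initializations)  // 1
}
```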

Use case integration

The streaming functionality is accessed through the SendMessageUseCase:
SendMessageUseCase.kt
class SendMessageUseCase @Inject constructor(
    private val chatRepository: ChatRepository,
    @Dispatcher(GemAIDispatchers.IO) private val dispatcher: CoroutineDispatcher
) : BaseUseCase<Message, Result<Unit, RequestError>> {
    override suspend fun perform(params: Message): Result<Unit, RequestError> =
        withContext(dispatcher) {
            chatRepository.sendMessage(params)
        }
}

Complete streaming workflow

viewModelScope.launch {
    val message = Message.send(
        conversationId = currentConversationId,
        content = userInput
    )
    
    sendMessageUseCase.perform(message)
}
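The use case returns Result<Unit, RequestError>, which the workflow above discards. A caller that branches on the outcome might look like the following sketch; the sealed Result type here is a minimal stand-in, and the app's actual Result and RequestError definitions may differ:

```kotlin
// Minimal stand-in for an either-style Result<T, E> type.
sealed interface Result<out T, out E> {
    data class Success<T>(val value: T) : Result<T, Nothing>
    data class Failure<E>(val error: E) : Result<Nothing, E>
}

// The branch a caller would write after sendMessageUseCase.perform(message).
fun describe(result: Result<Unit, String>): String = when (result) {
    is Result.Success -> "sent"
    is Result.Failure -> "error: ${result.error}"
}
```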

Error handling in streams

The streaming implementation includes robust error handling:
Exception handling
private suspend fun initializeChat(conversationId: Long) {
    val history = try {
        messageDao.getMessages(conversationId).map { /* ... */ }
    } catch (e: Exception) {
        emptyList()  // Fallback to empty history
    }
    setChat(geminiAIModel.startChat(history), conversationId)
}
If message history cannot be loaded, the system gracefully falls back to an empty history, allowing the conversation to continue without context.
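The same fallback pattern can be expressed with Kotlin's runCatching; this is an equivalent sketch, where the lambda stands in for the messageDao.getMessages(...).map { ... } call:

```kotlin
// Run a history loader, falling back to an empty list on any failure.
fun <T> withEmptyFallback(load: () -> List<T>): List<T> =
    runCatching(load).getOrDefault(emptyList())
```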

State management

The chat state is updated atomically when initialization completes:
Setting active chat
private fun setChat(newChat: Chat, conversationId: Long) {
    activeChatId = conversationId
    chat = newChat
}
This ensures:
  • The active conversation ID is tracked
  • The chat instance is ready for streaming
  • Future messages in the same conversation reuse the existing chat session

Performance considerations

  • Conversation switching: when you switch conversations, the chat is reinitialized with the new conversation's history.
  • History caching: message history is loaded once per conversation and cached in the chat session.
  • Concurrent safety: the mutex prevents race conditions when multiple messages are sent rapidly.
  • Resource cleanup: the base class provides a close() method to cancel ongoing operations when needed.

Cleanup and lifecycle

The base class manages coroutine lifecycle:
BaseAIModel.kt
protected val coroutineScope = CoroutineScope(
    SupervisorJob() + Dispatchers.Default + exceptionHandler
)

fun close() {
    coroutineScope.cancel()
}
Call close() when the AI model is no longer needed to properly clean up resources and cancel any ongoing streaming operations.
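The cancel-on-close behavior can be sketched in isolation. StreamingModel below is a simplified stand-in for the base class (it omits the exception handler); in the app, close() would typically be called from a lifecycle hook such as ViewModel.onCleared():

```kotlin
import kotlinx.coroutines.*

// Minimal sketch of the base-class lifecycle: a scope that close() cancels.
class StreamingModel {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val isActive: Boolean get() = scope.isActive
    fun close() = scope.cancel()
}

fun main() {
    val model = StreamingModel()
    model.close()
    println(model.isActive)  // false
}
```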
