Skip to main content

Overview

The Gemini Live Telephony app demonstrates how to build a production-grade, real-time conversational AI system that operates over the phone. This application integrates Twilio for telephony, FastAPI for real-time processing, and the Google Gemini Live API for low-latency voice interactions.
Originally built as an “AI Care Assistant,” this architecture can be adapted for customer service, technical support, appointment scheduling, and other voice-based use cases.

Architecture

High-Level Data Flow

System Components

Twilio Programmable Voice

  • Phone number provisioning
  • TwiML webhook handling
  • Bidirectional audio streaming
  • µ-law codec (8kHz)

FastAPI Backend

  • Async WebSocket handling
  • Real-time audio transcoding
  • Session state management
  • Deployed on Cloud Run

Gemini Live API

  • Streaming audio input/output
  • Voice Activity Detection (VAD)
  • Conversation turn management
  • Session resumption

Audio Pipeline

  • 8kHz µ-law ↔ 16kHz PCM
  • 24kHz PCM ↔ 8kHz µ-law
  • High-fidelity resampling
  • Stateful DSP processing

Audio Transcoding Pipeline

The Critical Challenge

The application bridges three incompatible audio formats:
  • Twilio: 8kHz µ-law (telephony standard)
  • Gemini Input: 16kHz PCM (Live API requirement)
  • Gemini Output: 24kHz PCM (high-quality voice synthesis)
Naive chunk-by-chunk resampling introduces audible artifacts. The solution requires stateful streaming DSP.

Transcoding Architecture

import numpy as np
import samplerate
import audioop
import base64

# Initialize stateful resamplers (critical for quality)
resampler_in = samplerate.Resampler('sinc_fastest', channels=1)
resampler_out = samplerate.Resampler('sinc_fastest', channels=1)

async def handle_twilio_to_gemini(websocket, in_q, resampler, call_state):
    """Inbound pipeline: Twilio → Gemini"""
    async for message in websocket.iter_text():
        data = json.loads(message)
        
        if data['event'] == 'media':
            # Step 1: Decode base64 µ-law from Twilio
            mulaw_audio = base64.b64decode(data['media']['payload'])
            
            # Step 2: Convert µ-law to 16-bit PCM
            pcm_8khz = audioop.ulaw2lin(mulaw_audio, 2)  # 2 = 16-bit
            
            # Step 3: Convert to float32 for resampling
            pcm_8khz_float = np.frombuffer(pcm_8khz, dtype=np.int16).astype(np.float32) / 32768.0
            
            # Step 4: Resample 8kHz → 16kHz with stateful resampler
            pcm_16khz_float = resampler.process(
                pcm_8khz_float,
                ratio=16000/8000,
                end_of_input=False  # Critical: maintains filter state
            )
            
            # Step 5: Convert back to int16 PCM bytes
            pcm_16khz_int16 = (pcm_16khz_float * 32768.0).astype(np.int16)
            pcm_16khz_bytes = pcm_16khz_int16.tobytes()
            
            # Step 6: Send to Gemini Live API
            await in_q.put(pcm_16khz_bytes)

async def handle_gemini_to_twilio(websocket, out_q, resampler, call_state):
    """Outbound pipeline: Gemini → Twilio"""
    while call_state['active']:
        # Step 1: Receive 24kHz PCM from Gemini
        pcm_24khz_bytes = await out_q.get()
        
        # Step 2: Convert to float32
        pcm_24khz_float = np.frombuffer(pcm_24khz_bytes, dtype=np.int16).astype(np.float32) / 32768.0
        
        # Step 3: Resample 24kHz → 8kHz
        pcm_8khz_float = resampler.process(
            pcm_24khz_float,
            ratio=8000/24000,
            end_of_input=False
        )
        
        # Step 4: Convert to int16 PCM
        pcm_8khz_int16 = (pcm_8khz_float * 32768.0).astype(np.int16)
        pcm_8khz_bytes = pcm_8khz_int16.tobytes()
        
        # Step 5: Convert PCM to µ-law
        mulaw_audio = audioop.lin2ulaw(pcm_8khz_bytes, 2)
        
        # Step 6: Encode to base64 and send to Twilio
        mulaw_b64 = base64.b64encode(mulaw_audio).decode('utf-8')
        await websocket.send_text(json.dumps({
            "event": "media",
            "streamSid": call_state['stream_sid'],
            "media": {"payload": mulaw_b64}
        }))

Why python-samplerate?

The application uses samplerate (libsamplerate wrapper) instead of scipy.signal.resample:

Stateful Processing

Maintains filter state across chunks to prevent discontinuities

Low Latency

Optimized for real-time streaming with minimal buffering

High Fidelity

SRC_SINC_FASTEST provides excellent quality-to-speed ratio

Production Ready

Battle-tested in audio production and telephony systems

Gemini Live API Integration

Session Configuration

from google import genai
from google.genai import types

# Initialize Gemini client with Vertex AI
client = genai.Client(
    vertexai=True,
    project=os.getenv("GOOGLE_CLOUD_PROJECT"),
    location=os.getenv("GOOGLE_CLOUD_LOCATION"),
)

# Configure Live session
config = types.LiveConnectConfig(
    system_instruction=BASE_SYSTEM_INSTRUCTION,
    
    # Voice configuration
    voice_config={
        "prebuilt_voice_config": {
            "voice_name": "Puck"  # Options: Puck, Charon, Kore, Fenrir, Aoede
        }
    },
    
    # Audio settings
    generation_config={
        "temperature": 0.7,
        "top_p": 0.9,
        "top_k": 40,
    },
    
    # Real-time input configuration
    real_time_input={
        "enable_explicit_conversation_turn": False  # Continuous conversation
    },
    
    # Voice Activity Detection
    speech_config={
        "voice_activity_detection": {
            "mode": "AUTO"  # Automatic turn-taking
        }
    }
)

Session Management

async def run_gemini_session(client, model_id, in_q, out_q, call_state):
    """Manage persistent Gemini Live session"""
    session_handle = None  # For session resumption
    
    try:
        async with client.aio.live.connect(
            model=model_id,
            config=config,
            resume_handle=session_handle  # Resume if reconnecting
        ) as session:
            
            # Start concurrent tasks
            sender_task = asyncio.create_task(
                sender_loop(session, in_q, call_state)
            )
            heartbeat_task = asyncio.create_task(
                heartbeat_loop(session, call_state)
            )
            
            # Main receiver loop
            async for response in session.receive():
                if response.server_content:
                    # Audio output from Gemini
                    if response.server_content.model_turn:
                        for part in response.server_content.model_turn.parts:
                            if part.inline_data:
                                audio_data = part.inline_data.data
                                await out_q.put(audio_data)
                
                elif response.session_resumption_update:
                    # Save session handle for reconnection
                    session_handle = response.session_resumption_update.new_handle
                    logger.info(f"Session handle updated: {session_handle[:20]}...")
            
            # Cleanup
            sender_task.cancel()
            heartbeat_task.cancel()
    
    except Exception as e:
        logger.error(f"Gemini session error: {e}")
        call_state['active'] = False

Voice Activity Detection (VAD)

Gemini Live API’s VAD automatically detects:
  • Speech start: User begins speaking
  • Speech end: User stops speaking
  • Turn-taking: When to let AI respond
# Configure VAD sensitivity
speech_config={
    "voice_activity_detection": {
        "mode": "AUTO",  # or "MANUAL"
        "threshold": 0.5  # Sensitivity: 0.0 (aggressive) to 1.0 (conservative)
    }
}

FastAPI WebSocket Handler

from fastapi import FastAPI, WebSocket, Response
import asyncio

app = FastAPI(title="Gemini Live Telephony")

@app.post("/twiml")
async def get_twiml():
    """Twilio webhook: Return TwiML to establish WebSocket"""
    service_url = os.getenv("SERVICE_URL").replace("https://", "")
    
    twiml = f'''<Response>
        <Connect>
            <Stream url="wss://{service_url}/ws/twilio" />
        </Connect>
    </Response>'''
    
    return Response(content=twiml, media_type="application/xml")

@app.websocket("/ws/twilio")
async def websocket_twilio_endpoint(websocket: WebSocket):
    """Main WebSocket endpoint for Twilio media streams"""
    await websocket.accept()
    
    # Initialize state
    call_state = {
        'active': False,
        'stream_sid': None
    }
    
    # Create queues for audio flow
    in_q = asyncio.Queue()   # Twilio → Gemini
    out_q = asyncio.Queue()  # Gemini → Twilio
    
    # Initialize stateful resamplers
    resampler_in = samplerate.Resampler('sinc_fastest', channels=1)
    resampler_out = samplerate.Resampler('sinc_fastest', channels=1)
    
    # Start concurrent tasks
    tasks = [
        asyncio.create_task(
            handle_twilio_to_gemini(websocket, in_q, resampler_in, call_state)
        ),
        asyncio.create_task(
            handle_gemini_to_twilio(websocket, out_q, resampler_out, call_state)
        ),
        asyncio.create_task(
            run_gemini_session(client, MODEL_ID, in_q, out_q, call_state)
        ),
    ]
    
    try:
        # Wait for any task to complete (typically on hangup)
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Cleanup
        call_state['active'] = False
        for task in tasks:
            task.cancel()
        
        logger.info("Call ended, resources cleaned up")

Cloud Run Deployment

Critical Configuration

#!/bin/bash
# deploy.sh

PROJECT_ID="your-project-id"
REGION="us-central1"
SERVICE_NAME="gemini-live-telephony"
IMAGE="gcr.io/${PROJECT_ID}/${SERVICE_NAME}:latest"

# Build container
docker build -t ${IMAGE} .
docker push ${IMAGE}

# Deploy to Cloud Run with production settings
gcloud run deploy ${SERVICE_NAME} \
  --image ${IMAGE} \
  --region ${REGION} \
  --platform managed \
  --allow-unauthenticated \
  \
  `# Critical settings for low-latency voice` \
  --min-instances=1 \
  --max-instances=100 \
  --timeout=3600 \
  --memory=2Gi \
  --cpu=2 \
  --concurrency=1 \
  --session-affinity \
  --no-cpu-throttling \
  \
  `# Environment variables` \
  --set-env-vars="GOOGLE_CLOUD_PROJECT=${PROJECT_ID}" \
  --set-env-vars="GOOGLE_CLOUD_LOCATION=${REGION}" \
  --set-env-vars="GOOGLE_GENAI_MODEL=gemini-2.0-flash-native-audio" \
  --set-env-vars="SERVICE_URL=https://SERVICE_URL_PLACEHOLDER"

# Get service URL
SERVICE_URL=$(gcloud run services describe ${SERVICE_NAME} \
  --region=${REGION} \
  --format='value(status.url)')

echo "Service URL: ${SERVICE_URL}"
echo "Configure Twilio webhook: ${SERVICE_URL}/twiml"

Configuration Rationale

min-instances=1

Eliminates cold starts. Critical for phone calls - users won’t wait 10s for first response.

concurrency=1

Each instance handles one call. Prevents resource contention for CPU-intensive audio processing.

session-affinity

Routes WebSocket reconnections to same instance, preserving in-memory resampler state.

no-cpu-throttling

Ensures full CPU access. Audio resampling is latency-sensitive and CPU-bound.

timeout=3600

1-hour timeout for long calls. WebSocket connections must persist for call duration.

memory=2Gi, cpu=2

Sufficient resources for real-time DSP without swapping or throttling.

Dockerfile

FROM python:3.12-slim

# Install system dependencies for audio processing
RUN apt-get update && apt-get install -y \
    libsamplerate0 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run with uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Twilio Configuration

1

Create Twilio Account

Sign up at twilio.com/try-twilio for free trial
2

Get Phone Number

Navigate to Phone Numbers → Buy a Number. Choose a number with Voice capabilities.
3

Configure Webhook

  1. Go to Phone Numbers → Manage → Active Numbers
  2. Click your phone number
  3. Under “Voice & Fax”, set:
    • A CALL COMES IN: Webhook
    • URL: https://YOUR_CLOUD_RUN_URL/twiml
    • HTTP Method: POST
  4. Save configuration
4

Test Call

Dial your Twilio number. You should hear the Gemini Live voice assistant respond.

Local Development with ngrok

For local testing before Cloud Run deployment:
1

Install ngrok

Download from ngrok.com/download
2

Start Local Server

uvicorn main:app --host 0.0.0.0 --port 8000
3

Expose with ngrok

ngrok http 8000
Copy the forwarding URL (e.g., https://abc123.ngrok-free.app)
4

Update Environment

# In .env file
SERVICE_URL=https://abc123.ngrok-free.app
Restart FastAPI server to load new URL.
5

Configure Twilio

Set webhook to https://abc123.ngrok-free.app/twiml
6

Test

Call Twilio number. Watch real-time logs in your terminal.

Monitoring & Debugging

Cloud Logging

# Stream logs from Cloud Run
gcloud logging tail "resource.type=cloud_run_revision AND resource.labels.service_name=gemini-live-telephony" --format=json

# Filter for errors
gcloud logging read "resource.type=cloud_run_revision AND severity>=ERROR" --limit 50

Key Metrics to Monitor

  • WebSocket duration: Should match call length
  • Audio queue depths: in_q.qsize(), out_q.qsize() should stay < 10
  • Resampler latency: Process time per chunk should be < 5ms
  • Gemini API latency: Time from audio input to output
  • Cold start frequency: Should be 0 with min-instances=1

Common Issues

Check:
  • Resampler state is preserved (not recreated per chunk)
  • Queue depths are reasonable
  • CPU throttling disabled in Cloud Run
  • No GC pauses (use gc.set_threshold() tuning)
Verify:
  • min-instances=1 to avoid cold starts
  • Cloud Run region close to users
  • No network egress throttling
  • Gemini Live API location matches Cloud Run region
Possible causes:
  • Missing heartbeat loop
  • Timeout < call duration
  • Session handle not preserved
  • WebSocket disconnection without reconnect logic
Check:
  • SERVICE_URL environment variable is correct
  • Cloud Run service allows unauthenticated requests
  • TwiML response is valid XML
  • WebSocket URL uses wss:// not ws://

Production Considerations

State Management

For production, externalize conversation state to Redis:
import redis.asyncio as redis

# Initialize Redis client
redis_client = redis.from_url(
    "redis://MEMORYSTORE_IP:6379",
    decode_responses=True
)

async def save_session_handle(call_sid: str, handle: str):
    """Save session handle for resumption"""
    await redis_client.setex(
        f"session:{call_sid}",
        3600,  # 1 hour TTL
        handle
    )

async def get_session_handle(call_sid: str) -> str:
    """Retrieve session handle"""
    return await redis_client.get(f"session:{call_sid}")

Security Enhancements

from fastapi import Header, HTTPException
import hmac
import hashlib

def verify_twilio_signature(signature: str, url: str, params: dict):
    """Verify Twilio webhook signature"""
    auth_token = os.getenv("TWILIO_AUTH_TOKEN")
    
    # Construct signature
    data = url + ''.join([f"{k}{v}" for k, v in sorted(params.items())])
    expected = base64.b64encode(
        hmac.new(
            auth_token.encode('utf-8'),
            data.encode('utf-8'),
            hashlib.sha256
        ).digest()
    ).decode('utf-8')
    
    if not hmac.compare_digest(expected, signature):
        raise HTTPException(status_code=403, detail="Invalid signature")

@app.post("/twiml")
async def get_twiml(x_twilio_signature: str = Header(None)):
    # Verify signature before processing
    verify_twilio_signature(x_twilio_signature, request.url, request.form)
    # ... rest of handler

Cost Optimization

# Use Cloud Run with traffic splitting for A/B testing
gcloud run services update-traffic gemini-live-telephony \
  --to-revisions=v1=50,v2=50

# Set max-instances to control peak cost
gcloud run services update gemini-live-telephony \
  --max-instances=50  # Caps at 50 concurrent calls

# Use Committed Use Discounts for predictable workloads
gcloud compute commitments create telephony-cud \
  --resources=vcpu=100,memory=200GB \
  --plan=12-month

Key Takeaways

Stateful Resampling

Use python-samplerate with persistent resampler objects to avoid audio artifacts

Cloud Run for Voice

Configure min-instances=1, concurrency=1, no-cpu-throttling for low latency

Session Resumption

Preserve Gemini session handles for seamless reconnection

Production State

Externalize state to Memorystore for horizontal scaling

Next Steps

Resources

Build docs developers (and LLMs) love