Overview
Flow control methods manage the lifecycle of your Ability and signal the Agent when your Ability is done.
CRITICAL: Every Ability MUST call resume_normal_flow() before exiting. Without it, the Agent goes silent and the user must restart the conversation.
resume_normal_flow()
Returns control to the Agent. You MUST call this when your Ability is done.
Signature
self.capability_worker.resume_normal_flow()
Synchronous - Do NOT use await
When to Call
Call resume_normal_flow() on EVERY exit path:
✅ End of main logic (happy path)
✅ After every break in a loop
✅ Inside except blocks (error fallback)
✅ After timeout
✅ After user says “exit”/“stop”/“quit”
Think of resume_normal_flow() like return in a function - it signals “I’m done, give control back.”
Examples
Basic Usage
async def run(self):
    await self.capability_worker.speak("Hello!")
    # ALWAYS call resume_normal_flow at the end
    self.capability_worker.resume_normal_flow()
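The same rule applies when an Ability can end in several ways. The sketch below is self-contained so it can run outside OpenHome: FakeCapabilityWorker is a hypothetical stand-in that mimics speak(), user_response(), and resume_normal_flow(), and run() takes the worker as an argument instead of being a method. A try/finally block covers both the "user said stop" exit and the error exit with a single resume_normal_flow() call.

```python
import asyncio

EXIT_WORDS = {"stop", "exit", "quit"}


class FakeCapabilityWorker:
    """Hypothetical stand-in for CapabilityWorker so this sketch runs anywhere."""

    def __init__(self, replies):
        self.replies = list(replies)
        self.spoken = []
        self.resume_calls = 0

    async def speak(self, text):
        self.spoken.append(text)

    async def user_response(self):
        reply = self.replies.pop(0)
        if isinstance(reply, Exception):
            raise reply  # Simulate a failure while waiting for input
        return reply

    def resume_normal_flow(self):  # Synchronous, like the real method
        self.resume_calls += 1


async def run(capability_worker):
    try:
        await capability_worker.speak("Ask me anything, or say stop.")
        while True:
            user_input = await capability_worker.user_response()
            if not user_input:
                continue
            if any(word in user_input.lower() for word in EXIT_WORDS):
                await capability_worker.speak("Goodbye!")
                break  # Exit path 1: the user asked to stop
            await capability_worker.speak(f"You said: {user_input}")
    except Exception:
        # Exit path 2: anything unexpected
        await capability_worker.speak("Sorry, something went wrong.")
    finally:
        # Runs on every path above, so control always returns to the Agent
        capability_worker.resume_normal_flow()
```

Because the call lives in finally, adding a new break or return later cannot silently skip it.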
Pre-Launch Checklist
Before shipping any Ability, verify:
Common mistake:
# ❌ WRONG - resume_normal_flow inside loop
while True:
    user_input = await self.capability_worker.user_response()
    await self.capability_worker.speak("Got it.")
    self.capability_worker.resume_normal_flow()  # Will exit after 1 turn!
# ✅ CORRECT - resume_normal_flow after loop
while True:
    user_input = await self.capability_worker.user_response()
    if "stop" in user_input.lower():
        break
    await self.capability_worker.speak("Got it.")
self.capability_worker.resume_normal_flow()  # After loop ends
send_interrupt_signal()
Sends an interrupt event to stop the current assistant output (speech/audio) and immediately return to user input.
Signature
await self.capability_worker.send_interrupt_signal()
Async - Use await
When to Use
User says “stop” during long output
Emergency cutoff during audio playback
Manual override to return to listening state
Example
async def run(self):
    # Start speaking a long response
    await self.capability_worker.speak(
        "I'm about to tell you a very long story..."
    )
    # Cut off the current output (e.g., the user asked to stop)
    await self.capability_worker.send_interrupt_signal()
    # Now listening for user input
    user_input = await self.capability_worker.user_response()
    if "skip" in user_input.lower():
        await self.capability_worker.speak("Skipping ahead.")
    self.capability_worker.resume_normal_flow()
Most Abilities don’t need send_interrupt_signal(). Use it only when you need manual control over stopping output.
exec_local_command()
Executes a command on the user’s local device (Mac, Raspberry Pi, etc.) via WebSocket.
Signature
response = await self.capability_worker.exec_local_command(command)
Async - Use await
Parameters:
command (str): Shell command to execute on the local device
Returns: dict with command output or error information
When to Use
Use exec_local_command() when you need to:
Execute terminal commands on the user’s Mac/PC
Control local applications or services
Access files on the user’s device
Run scripts that require local resources
Abilities run in OpenHome’s cloud sandbox. exec_local_command() bridges the cloud → device gap via WebSocket.
Example
async def run(self):
    # Get user's request
    await self.capability_worker.speak("What command would you like to run?")
    user_input = await self.capability_worker.user_response()
    # Generate a command using the LLM (synchronous call, no await)
    prompt = f"Convert this request to a safe Mac terminal command: {user_input}"
    command = self.capability_worker.text_to_text_response(prompt)
    # Execute on local device
    await self.capability_worker.speak(f"Running: {command}")
    result = await self.capability_worker.exec_local_command(command)
    # Log the result; it may contain output or error information,
    # so inspect it before telling the user the command succeeded
    self.worker.editor_logging_handler.info(f"Command result: {result}")
    await self.capability_worker.speak("The command has finished running.")
    self.capability_worker.resume_normal_flow()
Security: Always validate commands before executing. Never pass user input directly to exec_local_command() without sanitization.
# ❌ DANGEROUS - User could say "rm -rf /"
command = user_input
await self.capability_worker.exec_local_command(command)
# ✅ SAFER - Use the LLM to validate and transform
safe_command = self.capability_worker.text_to_text_response(
    f"Convert to safe terminal command: {user_input}. If dangerous, return 'UNSAFE'"
)
if safe_command.strip() != "UNSAFE":
    await self.capability_worker.exec_local_command(safe_command)
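An LLM check alone can be talked around, so a deterministic allowlist is a useful second layer. This is a minimal sketch, not an OpenHome API: validate_command() and ALLOWED_BINARIES are hypothetical, and the allowlist contents are only examples. It rejects shell metacharacters that chain or redirect commands, then accepts a command only if its first word is on the list.

```python
import shlex
from typing import Optional

# Hypothetical allowlist: binaries this Ability is willing to run.
ALLOWED_BINARIES = {"ls", "pwd", "whoami", "date", "uptime", "df"}

# Characters that let one command chain into, substitute, or redirect another.
SHELL_METACHARACTERS = set(";&|`$><\n")


def validate_command(command: str) -> Optional[str]:
    """Return a normalized command if it passes the allowlist, else None."""
    if not command or any(ch in SHELL_METACHARACTERS for ch in command):
        return None
    try:
        tokens = shlex.split(command)
    except ValueError:  # e.g. unbalanced quotes
        return None
    if not tokens or tokens[0] not in ALLOWED_BINARIES:
        return None
    # Re-quote each token so the string sent to the device is unambiguous.
    return shlex.join(tokens)
```

In an Ability you might run validate_command(safe_command.strip()) on the LLM output and call exec_local_command() only when it returns a string. Note that the literal "UNSAFE" reply is rejected automatically, because it is not an allowed binary.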
send_email()
Sends an email via SMTP. Useful for notifications, reports, or automated emails.
Signature
status = self.capability_worker.send_email(
    host="smtp.gmail.com",
    port=465,
    sender_email="your-email@gmail.com",
    sender_password="your-app-password",
    receiver_email="recipient@example.com",
    cc_emails=[],
    subject="Subject Line",
    body="Email body text",
    attachment_paths=[]
)
Synchronous - Do NOT use await
Parameters:
host (str): SMTP server hostname (e.g., "smtp.gmail.com")
port (int): SMTP port (465 for SSL, 587 for TLS)
sender_email (str): Sender email address
sender_password (str): SMTP password or app password
receiver_email (str): Recipient email address
cc_emails (list): CC recipients (optional, default [])
subject (str): Email subject line
body (str): Email body content
attachment_paths (list): List of filenames in Ability folder (optional)
Returns: bool - True if sent successfully, False otherwise
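For orientation, here is roughly what sending with these fields involves in Python's standard smtplib and email packages. This is an assumption-based sketch, not OpenHome's implementation of send_email(); build_message() and smtp_send() are hypothetical names. It shows why port 465 (implicit SSL from the start) and port 587 (plain connection upgraded with STARTTLS) need different handling.

```python
import smtplib
import ssl
from email.message import EmailMessage
from pathlib import Path


def build_message(sender_email, receiver_email, subject, body,
                  cc_emails=None, attachment_paths=None):
    """Assemble an EmailMessage from the same fields send_email() takes."""
    msg = EmailMessage()
    msg["From"] = sender_email
    msg["To"] = receiver_email
    if cc_emails:
        msg["Cc"] = ", ".join(cc_emails)
    msg["Subject"] = subject
    msg.set_content(body)
    for path in attachment_paths or []:
        data = Path(path).read_bytes()
        # Generic binary type; a real sender might guess it with mimetypes.
        msg.add_attachment(data, maintype="application",
                           subtype="octet-stream", filename=Path(path).name)
    return msg


def smtp_send(host, port, sender_email, sender_password, msg) -> bool:
    """Port 465 uses implicit SSL; other ports (e.g. 587) upgrade via STARTTLS."""
    context = ssl.create_default_context()
    try:
        if port == 465:
            with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)  # Also delivers to Cc recipients
        else:
            with smtplib.SMTP(host, port, timeout=30) as server:
                server.starttls(context=context)
                server.login(sender_email, sender_password)
                server.send_message(msg)
        return True
    except (smtplib.SMTPException, OSError):
        return False
```

Returning False instead of raising mirrors the bool return value documented above.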
Example
async def run(self):
    await self.capability_worker.speak("Sending your daily report...")
    # Generate report content
    report = self.capability_worker.text_to_text_response(
        "Generate a brief daily summary"
    )
    # Send email (placeholder credentials; load real ones from configuration)
    success = self.capability_worker.send_email(
        host="smtp.gmail.com",
        port=465,
        sender_email="your-email@gmail.com",
        sender_password="app-password-here",
        receiver_email="recipient@example.com",
        cc_emails=[],
        subject="Daily Report",
        body=report,
        attachment_paths=[]
    )
    if success:
        await self.capability_worker.speak("Report sent successfully.")
    else:
        await self.capability_worker.speak("Failed to send report.")
    self.capability_worker.resume_normal_flow()
Gmail Users: You must use an App Password, not your regular Gmail password.
Security: Never hardcode credentials. Store them in environment variables or use the file storage system to prompt users for configuration on first run.
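One way to keep credentials out of source is to read them from environment variables at runtime. This sketch assumes variable names (SMTP_SENDER_EMAIL, SMTP_APP_PASSWORD, SMTP_HOST, SMTP_PORT) that are purely illustrative; use whatever your deployment actually provides. load_smtp_config() and SmtpConfig are hypothetical.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SmtpConfig:
    host: str
    port: int
    sender_email: str
    sender_password: str


def load_smtp_config(env=None) -> SmtpConfig:
    """Read SMTP settings from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    required = ("SMTP_SENDER_EMAIL", "SMTP_APP_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail loudly rather than sending with empty credentials
        raise RuntimeError(f"Missing SMTP settings: {', '.join(missing)}")
    return SmtpConfig(
        host=env.get("SMTP_HOST", "smtp.gmail.com"),
        port=int(env.get("SMTP_PORT", "465")),
        sender_email=env["SMTP_SENDER_EMAIL"],
        sender_password=env["SMTP_APP_PASSWORD"],
    )
```

The resulting fields map directly onto send_email(host=cfg.host, port=cfg.port, sender_email=cfg.sender_email, sender_password=cfg.sender_password, ...).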
AgentWorker Reference
Access AgentWorker via self.worker for logging, session management, and user connection info.
Initialization
class MyAbility(MatchingCapability):
    worker: AgentWorker = None
    capability_worker: CapabilityWorker = None
    def call(self, worker: AgentWorker):
        self.worker = worker  # Store AgentWorker reference
        self.capability_worker = CapabilityWorker(self)
        self.worker.session_tasks.create(self.run())
Logging
editor_logging_handler
Always use this. Never use print().
self.worker.editor_logging_handler.info("Something happened")
self.worker.editor_logging_handler.error("Something broke")
self.worker.editor_logging_handler.warning("Something suspicious")
self.worker.editor_logging_handler.debug("Debugging info")
Why Not print()?
print() doesn’t appear in the Live Editor logs
Logs are critical for debugging deployed Abilities
Logging provides timestamps and severity levels
Examples
API Calls
self.worker.editor_logging_handler.info("Calling weather API...")
response = requests.get(weather_url, timeout=10)
self.worker.editor_logging_handler.info(
    f"Weather API returned: {response.status_code}"
)
Log before and after API calls so you can track when/where things go wrong in production.
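To avoid repeating the before/after lines around every call, a small wrapper can do it. logged_call() is a hypothetical helper, not part of AgentWorker; it only needs an object with info() and error() methods, so you can pass self.worker.editor_logging_handler (or a standard logging.Logger when testing).

```python
import time


def logged_call(logger, label, func, *args, **kwargs):
    """Log before and after a call, including duration and any exception."""
    logger.info(f"Calling {label}...")
    start = time.monotonic()
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        logger.error(f"{label} failed after {time.monotonic() - start:.2f}s: {e}")
        raise  # Logging must not swallow the error
    logger.info(f"{label} returned after {time.monotonic() - start:.2f}s")
    return result
```

For example: response = logged_call(self.worker.editor_logging_handler, "weather API", requests.get, weather_url, timeout=10).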
Session Tasks
OpenHome’s managed task system ensures async work gets properly cancelled when sessions end.
Never use raw asyncio methods. Raw asyncio tasks can outlive a session. If the user disconnects, your task keeps running as a ghost process.
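To see why a managed registry prevents ghost tasks, here is an illustrative stand-in. SessionTaskGroup is hypothetical and is not OpenHome's code; it necessarily uses asyncio internally, which is exactly what session_tasks hides from Ability authors. It remembers every task it launches, so ending the session can cancel all of them at once.

```python
import asyncio


class SessionTaskGroup:
    """Illustrative stand-in for a managed task registry (not OpenHome's code)."""

    def __init__(self):
        self._tasks = set()

    def create(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        # Forget finished tasks so the set does not grow forever
        task.add_done_callback(self._tasks.discard)
        return task

    async def sleep(self, seconds: float):
        await asyncio.sleep(seconds)

    async def cancel_all(self):
        """What a session teardown would do: cancel and await every task."""
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising it
        await asyncio.gather(*tasks, return_exceptions=True)
```

A task started with a bare asyncio.create_task() is never added to such a registry, so nothing cancels it when the user disconnects.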
session_tasks.create()
Launches an async task within the agent’s managed lifecycle.
self.worker.session_tasks.create(coroutine)
Use instead of: asyncio.create_task()
Examples
Launch Ability
def call(self, worker: AgentWorker):
    self.worker = worker
    self.capability_worker = CapabilityWorker(self)
    # Launch main Ability task
    self.worker.session_tasks.create(self.run())
session_tasks.sleep()
Pauses execution for the specified duration.
await self.worker.session_tasks.sleep(seconds: float)
Use instead of: asyncio.sleep()
Examples
Delay
await self.capability_worker.speak("I'll check back in 5 seconds.")
await self.worker.session_tasks.sleep(5.0)
await self.capability_worker.speak("Time's up!")
Use session_tasks.sleep() for delays and timers. It’s automatically cancelled if the user disconnects.
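A retry-with-backoff loop can be sketched as a helper that doubles the wait after each failure. retry_with_backoff() is hypothetical; the sleep function is injected so that inside an Ability you pass self.worker.session_tasks.sleep and the waits stay cancellable with the session.

```python
import asyncio


async def retry_with_backoff(operation, sleep, attempts=4, base_delay=1.0, max_delay=8.0):
    """Retry an async operation, doubling the wait after each failure.

    `sleep` is injected; in an Ability, pass self.worker.session_tasks.sleep.
    """
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception:  # In real code, catch the specific errors you expect
            if attempt == attempts - 1:
                raise  # Out of retries: let the caller handle it
            delay = min(base_delay * (2 ** attempt), max_delay)
            await sleep(delay)
```

For example: data = await retry_with_backoff(fetch_weather, self.worker.session_tasks.sleep), where fetch_weather is your own async function.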
User Connection Info
get_timezone()
Returns the user’s timezone string.
timezone = self.capability_worker.get_timezone()
Synchronous - Do NOT use await
Returns: Timezone string like "America/Chicago" or None
Example
timezone = self.capability_worker.get_timezone()
if timezone:
    self.worker.editor_logging_handler.info(f"User timezone: {timezone}")
    # Use for date/time formatting
else:
    self.worker.editor_logging_handler.warning("Timezone unavailable")
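When a timezone is available, Python's zoneinfo module can convert times before speaking them back. to_user_time() is a hypothetical helper that falls back to UTC when get_timezone() returns None or a name the system's time zone database doesn't recognize.

```python
from datetime import datetime, timezone
from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


def to_user_time(utc_dt: datetime, tz_name: Optional[str]) -> datetime:
    """Convert an aware UTC datetime to the user's zone, falling back to UTC."""
    if not tz_name:
        return utc_dt.astimezone(timezone.utc)
    try:
        return utc_dt.astimezone(ZoneInfo(tz_name))
    except (ZoneInfoNotFoundError, ValueError):
        # Unknown or malformed zone name: stay in UTC rather than crash
        return utc_dt.astimezone(timezone.utc)
```

For example: now = to_user_time(datetime.now(timezone.utc), self.capability_worker.get_timezone()), then speak now.strftime("%I:%M %p").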
user_socket.client.host
The user’s public IP address at connection time.
user_ip = self.worker.user_socket.client.host
Example
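user_ip = self.worker.user_socket.client.host
self.worker.editor_logging_handler.info(f"User connected from: {user_ip}")
# Use for IP geolocation. user_ip is a bare address, so look up its ISP first
# and skip cloud/datacenter ranges. geolocate() is a placeholder for your own
# IP-lookup helper, assumed here to return a dict with an "isp" field.
location = geolocate(user_ip)
isp = (location.get("isp") or "").lower()
if any(keyword in isp for keyword in ("amazon", "aws", "google cloud")):
    location = None  # Datacenter IP: the location is not meaningful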
Cloud/datacenter IPs won’t give useful location data. Check the ISP name for keywords like “amazon”, “aws”, “google cloud” before using for geolocation.
Music Mode
Use music mode during long-form audio playback: it tells the Agent to stop listening so the audio isn't interrupted.
Pattern
# 1. Enter music mode
self.worker.music_mode_event.set()
await self.capability_worker.send_data_over_websocket(
    "music-mode",
    {"mode": "on"}
)
# 2. Play audio
await self.capability_worker.play_audio(audio_bytes)
# 3. Exit music mode
await self.capability_worker.send_data_over_websocket(
    "music-mode",
    {"mode": "off"}
)
self.worker.music_mode_event.clear()
Example
async def play_music(self):
    try:
        # Enter music mode
        self.worker.music_mode_event.set()
        await self.capability_worker.send_data_over_websocket(
            "music-mode",
            {"mode": "on"}
        )
        # Play audio (a timeout keeps a slow server from hanging the Ability)
        audio = requests.get("https://example.com/song.mp3", timeout=30).content
        await self.capability_worker.play_audio(audio)
    finally:
        # Always exit music mode
        await self.capability_worker.send_data_over_websocket(
            "music-mode",
            {"mode": "off"}
        )
        self.worker.music_mode_event.clear()
        self.capability_worker.resume_normal_flow()
See Audio Playback for full details.
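The enter/exit pairing can also be wrapped in an async context manager so the exit step cannot be forgotten. music_mode() is a hypothetical helper built only from the calls shown above; in the test, simple stubs stand in for the real worker objects.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def music_mode(worker, capability_worker):
    """Enter music mode on entry and always leave it on exit, even on errors."""
    worker.music_mode_event.set()
    try:
        await capability_worker.send_data_over_websocket("music-mode", {"mode": "on"})
        yield
    finally:
        # Runs whether playback finished, failed, or was cancelled
        await capability_worker.send_data_over_websocket("music-mode", {"mode": "off"})
        worker.music_mode_event.clear()
```

Usage inside an Ability: `async with music_mode(self.worker, self.capability_worker): await self.capability_worker.play_audio(audio)`, followed by self.capability_worker.resume_normal_flow().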
Conversation History
get_full_message_history()
Returns the full conversation history from the current session.
history = self.capability_worker.get_full_message_history()
Synchronous - Do NOT use await
Returns: List of message objects with role and content
Example
history = self.capability_worker.get_full_message_history()
self.worker.editor_logging_handler.info(
    f"Conversation has {len(history)} messages"
)
# Use for context-aware responses
for msg in history[-3:]:  # Last 3 messages
    self.worker.editor_logging_handler.info(
        f"{msg['role']}: {msg['content']}"
    )
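To feed recent context into text_to_text_response(), you could flatten the last few messages into a short transcript. format_recent_history() is hypothetical and assumes each message is a dict with 'role' and 'content' keys, as the example above indexes them.

```python
from typing import Dict, List


def format_recent_history(history: List[Dict[str, str]], last_n: int = 6,
                          max_chars: int = 300) -> str:
    """Turn the last few messages into a compact transcript for an LLM prompt."""
    # Guard last_n <= 0: history[-0:] would return the whole list
    recent = history[-last_n:] if last_n > 0 else []
    lines = []
    for msg in recent:
        content = str(msg.get("content", "")).strip()
        if len(content) > max_chars:
            content = content[:max_chars].rstrip() + "..."  # Keep prompts short
        lines.append(f"{msg.get('role', 'unknown')}: {content}")
    return "\n".join(lines)
```

The result can be prepended to a prompt, e.g. f"Recent conversation:\n{context}\n\nAnswer the user's latest question in one sentence."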
See LLM Documentation for using history with text_to_text_response().
Complete Example
import json
from src.agent.capability import MatchingCapability
from src.main import AgentWorker
from src.agent.capability_worker import CapabilityWorker
EXIT_WORDS = {"stop", "exit", "quit", "done"}
class HelperAbility(MatchingCapability):
    worker: AgentWorker = None
    capability_worker: CapabilityWorker = None
    def call(self, worker: AgentWorker):
        # Initialize
        self.worker = worker
        self.capability_worker = CapabilityWorker(self)
        # Launch main task
        self.worker.session_tasks.create(self.run())
    async def run(self):
        # Log start
        self.worker.editor_logging_handler.info("HelperAbility started")
        try:
            # Greet user
            await self.capability_worker.speak(
                "Hi! Ask me anything, or say stop when done."
            )
            # Main loop
            while True:
                user_input = await self.capability_worker.user_response()
                if not user_input:
                    continue
                # Log input
                self.worker.editor_logging_handler.info(
                    f"User said: {user_input}"
                )
                # Check for exit
                if any(word in user_input.lower() for word in EXIT_WORDS):
                    await self.capability_worker.speak("Goodbye!")
                    break
                # Generate response
                response = self.capability_worker.text_to_text_response(
                    f"Give a helpful 1-sentence response: {user_input}"
                )
                # Log response
                self.worker.editor_logging_handler.info(
                    f"LLM response: {response}"
                )
                # Speak response
                await self.capability_worker.speak(response)
        except Exception as e:
            # Log error
            self.worker.editor_logging_handler.error(f"Error: {e}")
            await self.capability_worker.speak(
                "Sorry, something went wrong."
            )
        finally:
            # ALWAYS resume normal flow
            self.worker.editor_logging_handler.info("HelperAbility ending")
            self.capability_worker.resume_normal_flow()
Best Practices
Always Resume Normal Flow
# ✅ Good - called on all paths
async def run(self):
    try:
        await self.do_work()
    except Exception:
        await self.capability_worker.speak("Error occurred.")
    finally:
        self.capability_worker.resume_normal_flow()  # Always called
# ❌ Bad - forgotten on error path
async def run(self):
    try:
        await self.do_work()
        self.capability_worker.resume_normal_flow()
    except Exception:
        await self.capability_worker.speak("Error occurred.")  # Forgot resume!
Use session_tasks, Not asyncio
# ✅ Good
self.worker.session_tasks.create(self.background_task())
await self.worker.session_tasks.sleep(5.0)
# ❌ Bad - can leak tasks
asyncio.create_task(self.background_task())
await asyncio.sleep(5.0)
Log Everything Important
# ✅ Good
self.worker.editor_logging_handler.info("Calling API...")
result = api_call()
self.worker.editor_logging_handler.info(f"API returned: {result}")
# ❌ Bad - no visibility
result = api_call()
Listening: Always call resume_normal_flow() after input loops
LLM: Use get_full_message_history() for context