
Overview

Flow control methods manage the lifecycle of your Ability and signal the Agent when your Ability is done.
CRITICAL: Every Ability MUST call resume_normal_flow() before exiting. Without it, the Agent goes silent and the user must restart the conversation.

resume_normal_flow()

Returns control to the Agent. You MUST call this when your Ability is done.

Signature

self.capability_worker.resume_normal_flow()
Synchronous - Do NOT use await

When to Call

Call resume_normal_flow() on EVERY exit path:
  • ✅ End of main logic (happy path)
  • ✅ After every break in a loop
  • ✅ Inside except blocks (error fallback)
  • ✅ After timeout
  • ✅ After user says “exit”/“stop”/“quit”
Think of resume_normal_flow() like return in a function - it signals “I’m done, give control back.”

Examples

async def run(self):
    await self.capability_worker.speak("Hello!")
    
    # ALWAYS call resume_normal_flow at the end
    self.capability_worker.resume_normal_flow()

Pre-Launch Checklist

Before shipping any Ability, verify:
  • Called after the main flow completes?
  • Called after every break statement?
  • Called in every except block that ends the ability?
  • Called after timeout logic?
  • Called after user exit detection (“stop”, “quit”, etc.)?
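One way to check this list before shipping is to run the Ability against a stub. The sketch below assumes nothing beyond the speak(), user_response(), and resume_normal_flow() calls documented on this page. FakeCapabilityWorker, EchoAbility, and resume_count are hypothetical names for a local test and are not part of the OpenHome SDK. Calling asyncio.run() is fine in a local test script; inside an Ability, use session_tasks instead.

```python
import asyncio


class FakeCapabilityWorker:
    """Hypothetical stand-in for CapabilityWorker, for local testing only."""

    def __init__(self, replies):
        self.replies = list(replies)  # scripted user inputs, consumed in order
        self.spoken = []              # everything the Ability "said"
        self.resume_calls = 0         # how many times resume_normal_flow() ran

    async def speak(self, text):
        self.spoken.append(text)

    async def user_response(self):
        if not self.replies:
            raise RuntimeError("no more scripted replies")
        return self.replies.pop(0)

    def resume_normal_flow(self):
        self.resume_calls += 1


class EchoAbility:
    """Minimal Ability body that covers every exit path with try/finally."""

    def __init__(self, capability_worker):
        self.capability_worker = capability_worker

    async def run(self):
        try:
            while True:
                user_input = await self.capability_worker.user_response()
                if "stop" in user_input.lower():
                    break
                await self.capability_worker.speak("Got it.")
        except Exception:
            await self.capability_worker.speak("Sorry, something went wrong.")
        finally:
            self.capability_worker.resume_normal_flow()


def resume_count(replies):
    """Run EchoAbility against scripted replies and return the resume count."""
    worker = FakeCapabilityWorker(replies)
    asyncio.run(EchoAbility(worker).run())
    return worker.resume_calls
```

Each scripted reply list drives a different exit path (a break on "stop", or an exception when the replies run out), and both should report exactly one resume_normal_flow() call.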
Common mistake:
# ❌ WRONG - resume_normal_flow inside loop
while True:
    user_input = await self.capability_worker.user_response()
    await self.capability_worker.speak("Got it.")
    self.capability_worker.resume_normal_flow()  # Will exit after 1 turn!

# ✅ CORRECT - resume_normal_flow after loop
while True:
    user_input = await self.capability_worker.user_response()
    if "stop" in user_input.lower():
        break
    await self.capability_worker.speak("Got it.")

self.capability_worker.resume_normal_flow()  # After loop ends

send_interrupt_signal()

Sends an interrupt event to stop the current assistant output (speech/audio) and immediately return to user input.

Signature

await self.capability_worker.send_interrupt_signal()
Async - Use await

When to Use

  • User says “stop” during long output
  • Emergency cutoff during audio playback
  • Manual override to return to listening state

Example

async def run(self):
    # Start playing long audio
    await self.capability_worker.speak(
        "I'm about to tell you a very long story..."
    )
    
    # Stop any remaining output and return to the listening state
    await self.capability_worker.send_interrupt_signal()
    
    # Now listening for user input
    user_input = await self.capability_worker.user_response()
    
    if "skip" in user_input.lower():
        await self.capability_worker.speak("Skipping ahead.")
    
    self.capability_worker.resume_normal_flow()
Most Abilities don’t need send_interrupt_signal(). Use it only when you need manual control over stopping output.

exec_local_command()

Executes a command on the user’s local device (Mac, Raspberry Pi, etc.) via WebSocket.

Signature

response = await self.capability_worker.exec_local_command(command)
Async - Use await

Parameters:
  • command (str): Shell command to execute on the local device
Returns: dict with command output or error information

When to Use

Use exec_local_command() when you need to:
  • Execute terminal commands on the user’s Mac/PC
  • Control local applications or services
  • Access files on the user’s device
  • Run scripts that require local resources
Abilities run in OpenHome’s cloud sandbox. exec_local_command() bridges the cloud → device gap via WebSocket.

Example

async def run(self):
    # Get user's request
    await self.capability_worker.speak("What command would you like to run?")
    user_input = await self.capability_worker.user_response()
    
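    # Ask the LLM for a single command, or 'UNSAFE' if the request is risky
    prompt = (
        f"Convert this request to a safe Mac terminal command: {user_input}. "
        "Return only the command. If it is dangerous, return 'UNSAFE'."
    )
    command = self.capability_worker.text_to_text_response(prompt).strip()

    if command == "UNSAFE":
        await self.capability_worker.speak("Sorry, I can't run that command.")
        self.capability_worker.resume_normal_flow()
        return
    
    # Execute on local device
    await self.capability_worker.speak(f"Running: {command}")
    result = await self.capability_worker.exec_local_command(command)
    
    # Log the raw result; it may hold command output or error information
    self.worker.editor_logging_handler.info(f"Command result: {result}")
    await self.capability_worker.speak("Command finished.")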
    self.capability_worker.resume_normal_flow()
Security: Always validate commands before executing. Never pass user input directly to exec_local_command() without sanitization.
# ❌ DANGEROUS - User could say "rm -rf /"
command = user_input
await self.capability_worker.exec_local_command(command)

# ✅ SAFE - Use LLM to validate and transform
safe_command = self.capability_worker.text_to_text_response(
    f"Convert to safe terminal command: {user_input}. If dangerous, return 'UNSAFE'"
).strip()  # ignore stray whitespace around the LLM's reply
if safe_command != "UNSAFE":
    await self.capability_worker.exec_local_command(safe_command)
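An LLM reply is not a guarantee, so the 'UNSAFE' check can be backed by a deterministic allowlist. This is a minimal sketch using Python's standard shlex module; ALLOWED_PROGRAMS and is_allowed_command are hypothetical, and the sketch assumes each command is one program call with no shell features. This page doesn't say whether exec_local_command() runs the string through a shell, so rejecting shell metacharacters is the conservative choice.

```python
import shlex

# Hypothetical allowlist: adjust to the programs your Ability actually needs.
ALLOWED_PROGRAMS = {"ls", "pwd", "date", "uptime", "df"}

# Characters that let a shell chain, pipe, redirect, or substitute commands.
FORBIDDEN_CHARS = set(";&|<>`$\n")


def is_allowed_command(command: str) -> bool:
    """Return True only for a single call to an allowlisted program."""
    command = command.strip()
    if not command or command == "UNSAFE":
        return False
    if any(ch in FORBIDDEN_CHARS for ch in command):
        return False
    try:
        parts = shlex.split(command)
    except ValueError:  # e.g. unbalanced quotes
        return False
    return bool(parts) and parts[0] in ALLOWED_PROGRAMS
```

With this helper, the final check in the snippet above becomes `if is_allowed_command(safe_command):`.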

send_email()

Sends an email via SMTP. Useful for notifications, reports, or automated emails.

Signature

status = self.capability_worker.send_email(
    host="smtp.gmail.com",
    port=465,
    sender_email="you@example.com",
    sender_password="your-app-password",
    receiver_email="recipient@example.com",
    cc_emails=[],
    subject="Subject Line",
    body="Email body text",
    attachment_paths=[]
)
Synchronous - Do NOT use await

Parameters:
  • host (str): SMTP server hostname (e.g., "smtp.gmail.com")
  • port (int): SMTP port (465 for SSL, 587 for TLS)
  • sender_email (str): Sender email address
  • sender_password (str): SMTP password or app password
  • receiver_email (str): Recipient email address
  • cc_emails (list): CC recipients (optional, default [])
  • subject (str): Email subject line
  • body (str): Email body content
  • attachment_paths (list): List of filenames in Ability folder (optional)
Returns: bool - True if sent successfully, False otherwise
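This page doesn't document how send_email() is implemented. As a rough sketch of what an SMTP send with these parameters involves, here is an equivalent built on Python's standard smtplib and email packages. build_message and send_via_smtp are hypothetical names, and treating every port other than 465 as STARTTLS is an assumption.

```python
import mimetypes
import smtplib
import ssl
from email.message import EmailMessage
from pathlib import Path


def build_message(sender_email, receiver_email, subject, body,
                  cc_emails=None, attachment_paths=None):
    """Assemble an EmailMessage with optional CC recipients and attachments."""
    msg = EmailMessage()
    msg["From"] = sender_email
    msg["To"] = receiver_email
    if cc_emails:
        msg["Cc"] = ", ".join(cc_emails)
    msg["Subject"] = subject
    msg.set_content(body)
    for path in attachment_paths or []:
        data = Path(path).read_bytes()
        mime, _ = mimetypes.guess_type(path)
        maintype, subtype = (mime or "application/octet-stream").split("/", 1)
        msg.add_attachment(data, maintype=maintype, subtype=subtype,
                           filename=Path(path).name)
    return msg


def send_via_smtp(host, port, sender_email, sender_password, msg) -> bool:
    """Send msg; port 465 uses implicit SSL, other ports upgrade via STARTTLS."""
    context = ssl.create_default_context()
    try:
        if port == 465:
            with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)  # also delivers to Cc recipients
        else:
            with smtplib.SMTP(host, port, timeout=30) as server:
                server.starttls(context=context)
                server.login(sender_email, sender_password)
                server.send_message(msg)
        return True
    except (smtplib.SMTPException, OSError):
        return False
```

Like send_email(), send_via_smtp() reports failure as False rather than raising, so the caller can speak a fallback message.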

Example

async def run(self):
    await self.capability_worker.speak("Sending your daily report...")
    
    # Generate report content
    report = self.capability_worker.text_to_text_response(
        "Generate a brief daily summary"
    )
    
    # Send email
    success = self.capability_worker.send_email(
        host="smtp.gmail.com",
        port=465,
        sender_email="you@example.com",
        sender_password="app-password-here",
        receiver_email="recipient@example.com",
        cc_emails=[],
        subject="Daily Report",
        body=report,
        attachment_paths=[]
    )
    
    if success:
        await self.capability_worker.speak("Report sent successfully.")
    else:
        await self.capability_worker.speak("Failed to send report.")
    
    self.capability_worker.resume_normal_flow()
Gmail Users: You must use an App Password, not your regular Gmail password.
Security: Never hardcode credentials. Store them in environment variables or use the file storage system to prompt users for configuration on first run.
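The environment-variable option could look like the sketch below. It assumes your deployment can set environment variables for the Ability, and the variable names (SMTP_HOST, SMTP_PORT, SMTP_SENDER, SMTP_APP_PASSWORD), MissingCredentials, and load_smtp_settings are all hypothetical.

```python
import os


class MissingCredentials(Exception):
    """Raised when a required SMTP setting is not configured."""


def load_smtp_settings(env=os.environ):
    """Read SMTP settings from environment variables (names are hypothetical)."""
    required = ("SMTP_HOST", "SMTP_SENDER", "SMTP_APP_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise MissingCredentials(f"Missing settings: {', '.join(missing)}")
    return {
        "host": env["SMTP_HOST"],
        "port": int(env.get("SMTP_PORT", "465")),  # default to SSL port
        "sender_email": env["SMTP_SENDER"],
        "sender_password": env["SMTP_APP_PASSWORD"],
    }
```

The returned keys match send_email()'s parameter names, so a call can read `self.capability_worker.send_email(**load_smtp_settings(), receiver_email=..., cc_emails=[], subject="Daily Report", body=report, attachment_paths=[])`. Catch MissingCredentials, speak a short explanation, and still call resume_normal_flow().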

AgentWorker Reference

Access AgentWorker via self.worker for logging, session management, and user connection info.

Initialization

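from src.agent.capability import MatchingCapability
from src.main import AgentWorker
from src.agent.capability_worker import CapabilityWorker

class MyAbility(MatchingCapability):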
    worker: AgentWorker = None
    capability_worker: CapabilityWorker = None
    
    def call(self, worker: AgentWorker):
        self.worker = worker  # Store AgentWorker reference
        self.capability_worker = CapabilityWorker(self)
        self.worker.session_tasks.create(self.run())

Logging

editor_logging_handler

Always use this. Never use print().
self.worker.editor_logging_handler.info("Something happened")
self.worker.editor_logging_handler.error("Something broke")
self.worker.editor_logging_handler.warning("Something suspicious")
self.worker.editor_logging_handler.debug("Debugging info")

Why Not print()?

  • print() doesn’t appear in the Live Editor logs
  • Logs are critical for debugging deployed Abilities
  • Logging provides timestamps and severity levels

Examples

self.worker.editor_logging_handler.info("Calling weather API...")

response = requests.get(weather_url, timeout=10)

self.worker.editor_logging_handler.info(
    f"Weather API returned: {response.status_code}"
)
Log before and after API calls so you can track when/where things go wrong in production.
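The before/after pattern can be folded into a small helper. This sketch assumes only that editor_logging_handler exposes info() and error() as shown above; Python's standard logging.Logger has the same methods, which makes the helper easy to try locally. logged_call is a hypothetical name.

```python
import logging
import time


def logged_call(logger, label, func, *args, **kwargs):
    """Call func(*args, **kwargs), logging before, after, and on failure."""
    logger.info(f"{label}: starting")
    start = time.monotonic()
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        # Log the failure with elapsed time, then let the caller handle it
        logger.error(f"{label}: failed after {time.monotonic() - start:.2f}s: {e}")
        raise
    logger.info(f"{label}: done in {time.monotonic() - start:.2f}s")
    return result
```

In an Ability, the weather example would read `response = logged_call(self.worker.editor_logging_handler, "Weather API", requests.get, weather_url, timeout=10)`.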

Session Tasks

OpenHome’s managed task system ensures async work gets properly cancelled when sessions end.
Never use raw asyncio methods. Raw asyncio tasks can outlive a session: if the user disconnects, your task keeps running as a ghost process.

session_tasks.create()

Launches an async task within the agent’s managed lifecycle.
self.worker.session_tasks.create(coroutine)
Use instead of: asyncio.create_task()

Examples

def call(self, worker: AgentWorker):
    self.worker = worker
    self.capability_worker = CapabilityWorker(self)
    
    # Launch main Ability task
    self.worker.session_tasks.create(self.run())

session_tasks.sleep()

Pauses execution for the specified duration.
await self.worker.session_tasks.sleep(seconds: float)
Use instead of: asyncio.sleep()

Examples

await self.capability_worker.speak("I'll check back in 5 seconds.")
await self.worker.session_tasks.sleep(5.0)
await self.capability_worker.speak("Time's up!")
Use session_tasks.sleep() for delays and timers. It’s automatically cancelled if the user disconnects.
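To illustrate why a managed task set matters, here is a conceptual sketch of the idea: every task is registered with the session, and all of them are cancelled together when the session ends. It is built on plain asyncio and is not OpenHome's session_tasks implementation; SessionTasks, cancel_all, and demo are hypothetical. Inside an Ability, keep using self.worker.session_tasks.

```python
import asyncio


class SessionTasks:
    """Conceptual sketch of a session-scoped task set (not OpenHome's code)."""

    def __init__(self):
        self._tasks = set()

    def create(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)  # forget finished tasks
        return task

    async def sleep(self, seconds: float):
        # Cancellable because the calling task is tracked by this session
        await asyncio.sleep(seconds)

    async def cancel_all(self):
        """Run when the session ends: cancel and await every live task."""
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    session = SessionTasks()
    ticks = []

    async def background():
        while True:
            ticks.append("tick")
            await session.sleep(0.01)

    task = session.create(background())
    await asyncio.sleep(0.05)   # let the background loop run a few times
    await session.cancel_all()  # simulate the user disconnecting
    return task.cancelled(), len(ticks) > 0
```

Without cancel_all(), the background loop would keep ticking after the session ended, which is the "ghost process" described above.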

User Connection Info

get_timezone()

Returns the user’s timezone string.
timezone = self.capability_worker.get_timezone()
Synchronous - Do NOT use await

Returns: Timezone string like "America/Chicago" or None

Example

timezone = self.capability_worker.get_timezone()

if timezone:
    self.worker.editor_logging_handler.info(f"User timezone: {timezone}")
    # Use for date/time formatting
else:
    self.worker.editor_logging_handler.warning("Timezone unavailable")
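For date/time formatting, a name like "America/Chicago" is an IANA time zone key, which Python's standard zoneinfo module accepts. This sketch falls back to UTC when get_timezone() returns None or an unknown name; local_time_text is a hypothetical helper.

```python
from datetime import datetime, timezone as dt_timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


def local_time_text(tz_name, now=None):
    """Format the time in tz_name as e.g. '6:30 PM', falling back to UTC."""
    now = now or datetime.now(dt_timezone.utc)
    try:
        zone = ZoneInfo(tz_name) if tz_name else dt_timezone.utc
    except (ZoneInfoNotFoundError, ValueError):  # unknown or malformed name
        zone = dt_timezone.utc
    local = now.astimezone(zone)
    return local.strftime("%I:%M %p").lstrip("0")  # drop leading zero of hour
```

In an Ability: `await self.capability_worker.speak(f"It's {local_time_text(self.capability_worker.get_timezone())}.")`.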

user_socket.client.host

The user’s public IP address at connection time.
user_ip = self.worker.user_socket.client.host

Example

user_ip = self.worker.user_socket.client.host
self.worker.editor_logging_handler.info(f"User connected from: {user_ip}")

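# Use for IP geolocation, but skip cloud/datacenter IPs.
# geolocate() is a placeholder for your own IP lookup that returns the ISP name.
CLOUD_KEYWORDS = ("amazon", "aws", "google cloud")
info = geolocate(user_ip)
isp = (info.get("isp") or "").lower()
location = None if any(k in isp for k in CLOUD_KEYWORDS) else info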
Cloud/datacenter IPs won’t give useful location data. Check the ISP name for keywords like “amazon”, “aws”, “google cloud” before using for geolocation.

Music Mode

Signal the Agent that long-form audio is playing, so it stops listening and doesn't interrupt playback.

Pattern

# 1. Enter music mode
self.worker.music_mode_event.set()
await self.capability_worker.send_data_over_websocket(
    "music-mode",
    {"mode": "on"}
)

# 2. Play audio
await self.capability_worker.play_audio(audio_bytes)

# 3. Exit music mode
await self.capability_worker.send_data_over_websocket(
    "music-mode",
    {"mode": "off"}
)
self.worker.music_mode_event.clear()

Example

async def play_music(self):
    try:
        # Enter music mode
        self.worker.music_mode_event.set()
        await self.capability_worker.send_data_over_websocket(
            "music-mode",
            {"mode": "on"}
        )
        
        # Play audio
        audio = requests.get("https://example.com/song.mp3", timeout=30).content
        await self.capability_worker.play_audio(audio)
        
    finally:
        # Always exit music mode
        await self.capability_worker.send_data_over_websocket(
            "music-mode",
            {"mode": "off"}
        )
        self.worker.music_mode_event.clear()
    
    self.capability_worker.resume_normal_flow()
See Audio Playback for full details.
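Because the on/off steps must always pair up, they can be wrapped in an async context manager. This sketch uses contextlib.asynccontextmanager and only the calls shown in the pattern above; music_mode is a hypothetical helper name.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def music_mode(worker, capability_worker):
    """Enter music mode on entry and always leave it on exit, even on errors."""
    try:
        worker.music_mode_event.set()
        await capability_worker.send_data_over_websocket("music-mode", {"mode": "on"})
        yield
    finally:
        await capability_worker.send_data_over_websocket("music-mode", {"mode": "off"})
        worker.music_mode_event.clear()
```

Inside an Ability this would read `async with music_mode(self.worker, self.capability_worker):` followed by `await self.capability_worker.play_audio(audio)` in the indented block, with resume_normal_flow() after it.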

Conversation History

get_full_message_history()

Returns the full conversation history from the current session.
history = self.capability_worker.get_full_message_history()
Synchronous - Do NOT use await

Returns: List of message objects with role and content

Example

history = self.capability_worker.get_full_message_history()

self.worker.editor_logging_handler.info(
    f"Conversation has {len(history)} messages"
)

# Use for context-aware responses
for msg in history[-3:]:  # Last 3 messages
    self.worker.editor_logging_handler.info(
        f"{msg['role']}: {msg['content']}"
    )
See LLM Documentation for using history with text_to_text_response().
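A simple way to give text_to_text_response() recent context is to fold the last few messages into the prompt string. This sketch assumes messages are dicts with "role" and "content" keys, as in the example above; recent_context is a hypothetical helper, and the LLM docs may describe a more direct way to pass history.

```python
def recent_context(history, limit=3):
    """Render the last `limit` messages as 'role: content' lines for a prompt."""
    recent = history[-limit:] if limit > 0 else []
    lines = []
    for msg in recent:
        role = msg.get("role", "unknown")
        content = str(msg.get("content", "")).strip()
        if content:  # skip empty or whitespace-only messages
            lines.append(f"{role}: {content}")
    return "\n".join(lines)
```

Usage: `prompt = f"Conversation so far:\n{recent_context(history)}\n\nReply to the user's last message in one sentence."`, then pass `prompt` to text_to_text_response().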

Complete Example

main.py
import json
from src.agent.capability import MatchingCapability
from src.main import AgentWorker
from src.agent.capability_worker import CapabilityWorker

EXIT_WORDS = {"stop", "exit", "quit", "done"}

class HelperAbility(MatchingCapability):
    worker: AgentWorker = None
    capability_worker: CapabilityWorker = None
    
    def call(self, worker: AgentWorker):
        # Initialize
        self.worker = worker
        self.capability_worker = CapabilityWorker(self)
        
        # Launch main task
        self.worker.session_tasks.create(self.run())
    
    async def run(self):
        # Log start
        self.worker.editor_logging_handler.info("HelperAbility started")
        
        try:
            # Greet user
            await self.capability_worker.speak(
                "Hi! Ask me anything, or say stop when done."
            )
            
            # Main loop
            while True:
                user_input = await self.capability_worker.user_response()
                
                if not user_input:
                    continue
                
                # Log input
                self.worker.editor_logging_handler.info(
                    f"User said: {user_input}"
                )
                
                # Check for exit
                if any(word in user_input.lower() for word in EXIT_WORDS):
                    await self.capability_worker.speak("Goodbye!")
                    break
                
                # Generate response
                response = self.capability_worker.text_to_text_response(
                    f"Give a helpful 1-sentence response: {user_input}"
                )
                
                # Log response
                self.worker.editor_logging_handler.info(
                    f"LLM response: {response}"
                )
                
                # Speak response
                await self.capability_worker.speak(response)
        
        except Exception as e:
            # Log error
            self.worker.editor_logging_handler.error(f"Error: {e}")
            await self.capability_worker.speak(
                "Sorry, something went wrong."
            )
        
        finally:
            # ALWAYS resume normal flow
            self.worker.editor_logging_handler.info("HelperAbility ending")
            self.capability_worker.resume_normal_flow()
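The exit check above is a substring test, so it also matches exit words inside longer words: "set a stopwatch" or "I abandoned it" would end the loop. A whole-word variant using Python's re module is sketched below; wants_to_exit is a hypothetical helper that could replace the any(...) condition.

```python
import re

EXIT_WORDS = {"stop", "exit", "quit", "done"}


def wants_to_exit(user_input):
    """True if an exit word appears as a whole word, not inside another word."""
    words = re.findall(r"[a-z']+", user_input.lower())
    return any(word in EXIT_WORDS for word in words)
```

In the loop, the check becomes `if wants_to_exit(user_input):`.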

Best Practices

Always Resume Normal Flow

# ✅ Good - called on all paths
async def run(self):
    try:
        await self.do_work()
    except Exception:
        await self.capability_worker.speak("Error occurred.")
    
    self.capability_worker.resume_normal_flow()  # Always called

# ❌ Bad - forgotten on error path
async def run(self):
    try:
        await self.do_work()
        self.capability_worker.resume_normal_flow()
    except Exception:
        await self.capability_worker.speak("Error occurred.")  # Forgot resume!

Use session_tasks, Not asyncio

# ✅ Good
self.worker.session_tasks.create(self.background_task())
await self.worker.session_tasks.sleep(5.0)

# ❌ Bad - can leak tasks
asyncio.create_task(self.background_task())
await asyncio.sleep(5.0)

Log Everything Important

# ✅ Good
self.worker.editor_logging_handler.info("Calling API...")
result = api_call()
self.worker.editor_logging_handler.info(f"API returned: {result}")

# ❌ Bad - no visibility
result = api_call()

Listening

Always call resume_normal_flow() after input loops

LLM

Use get_full_message_history() for context
