Advanced Patterns

Overview

These advanced patterns demonstrate production-ready implementations for real-world applications.

Tool Runner Hooks

Intercept and modify agent behavior at different stages:
examples/tool-runner-hooks/tool_runner_hooks.py
import asyncio

from fast_agent import FastAgent
from fast_agent.agents.agent_types import AgentConfig
from fast_agent.agents.tool_agent import ToolAgent
from fast_agent.agents.tool_runner import ToolRunnerHooks
from fast_agent.context import Context
from fast_agent.interfaces import ToolRunnerHookCapable
from fast_agent.types import PromptMessageExtended

def get_video_call_transcript(video_id: str) -> str:
    """Return the transcript for the given video call id.

    Stub implementation for the example: always returns the same canned
    transcript regardless of *video_id*, so no external service is needed.
    """
    transcript = (
        "Assistant: Hi, how can I assist you today?\n\n"
        "Customer: Hi, I wanted to ask you about last invoice I received..."
    )
    return transcript

class HookedToolAgent(ToolAgent, ToolRunnerHookCapable):
    """Tool agent that registers runner hooks to steer and observe the loop.

    Wires two hooks into the tool runner:
      * ``before_llm_call`` -> ``_add_style_hint`` (nudges response style once)
      * ``after_tool_call`` -> ``_log_tool_result`` (prints which tools returned)
    """

    def __init__(
        self,
        config: AgentConfig,
        context: Context | None = None,
    ):
        # Expose the transcript stub as the agent's only callable tool.
        tools = [get_video_call_transcript]
        super().__init__(config, tools, context)
        # Hooks are built once and surfaced via the property below.
        self._hooks = ToolRunnerHooks(
            before_llm_call=self._add_style_hint,
            after_tool_call=self._log_tool_result,
        )

    @property
    def tool_runner_hooks(self) -> ToolRunnerHooks | None:
        """Hooks consumed by the tool runner (ToolRunnerHookCapable contract)."""
        return self._hooks

    async def _add_style_hint(self, runner, messages: list[PromptMessageExtended]) -> None:
        """Before the first LLM call only, append a brevity instruction."""
        if runner.iteration == 0:
            runner.append_messages("Keep the answer to one short sentence.")

    async def _log_tool_result(self, runner, message: PromptMessageExtended) -> None:
        """After each tool call, print the names of tools that produced results."""
        if message.tool_results:
            tool_names = ", ".join(message.tool_results.keys())
            print(f"[hook] tool results received: {tool_names}")

# Application wiring: register the custom agent class with FastAgent.
fast = FastAgent("Example Tool Use Application (Hooks)")

@fast.custom(HookedToolAgent)
async def main() -> None:
    """Send one canned question through the hooked agent, then go interactive."""
    async with fast.run() as agent:
        await agent.default.generate(
            "What is the topic of the video call no.1234?",
        )
        await agent.interactive()

if __name__ == "__main__":
    asyncio.run(main())
Available hooks: before_llm_call, after_llm_call, before_tool_call, after_tool_call, on_error

Hook Use Cases

Track tool usage and performance:
async def log_tool_usage(runner, message: PromptMessageExtended) -> None:
    """Record each tool invocation's duration and bump its call counter."""
    if not message.tool_results:
        return
    for tool_name, result in message.tool_results.items():
        logger.info(f"Tool: {tool_name}, Duration: {result.duration}ms")
        metrics.increment(f"tool.{tool_name}.calls")
Sanitize or format responses:
async def sanitize_response(runner, messages: list[PromptMessageExtended]) -> None:
    """Scrub sensitive information from assistant messages, in place."""
    flagged = (m for m in messages if m.role == "assistant" and m.content)
    for m in flagged:
        m.content = sanitize(m.content)
Monitor token usage:
async def track_tokens(runner, message: PromptMessageExtended) -> None:
    """Compute, record, and log the dollar cost of a request's token usage."""
    if not hasattr(message, 'usage'):
        return
    cost = calculate_cost(message.usage)
    tracker.add_cost(cost)
    logger.info(f"Request cost: ${cost:.4f}")
Implement custom retry behavior:
async def retry_on_error(runner, error: Exception) -> None:
    """Absorb rate-limit errors (triggering an automatic retry); re-raise the rest."""
    if not isinstance(error, RateLimitError):
        raise error
    # Back off briefly; the runner retries automatically once this hook returns.
    await asyncio.sleep(1)

RAG Integration

Integrate Retrieval-Augmented Generation with Google Vertex AI:
examples/rag/vertex-rag.py
import asyncio
import google.auth
import vertexai
from vertexai import rag
from fast_agent import FastAgent
from fast_agent.config import get_settings

# Load Vertex AI connection settings from the local secrets file.
CONFIG_PATH = "fastagent.secrets.yaml"
_settings = get_settings(CONFIG_PATH)
# NOTE(review): assumes ``google.vertex_ai`` is dict-like (``.get`` is used
# below) — confirm against the settings schema.
_vertex_ai = getattr(_settings.google, "vertex_ai", {}) if _settings.google else {}
PROJECT_ID = _vertex_ai.get("project_id")
LOCATION = _vertex_ai.get("location")
EMBEDDING_MODEL = "text-embedding-005"

def mini_rag(query: str, drive_id: str, top_k: int) -> object:
    """Query RAG corpus for relevant documents.

    Reuses an existing corpus whose display name contains *drive_id*;
    otherwise creates one and imports the matching Google Drive folder into
    it. Finally runs a retrieval query against that corpus.

    Args:
        query: Natural-language retrieval query text.
        drive_id: Google Drive folder id; doubles as the marker embedded in
            the corpus display name.
        top_k: Maximum number of chunks to retrieve.

    Returns:
        The Vertex AI retrieval response object.
    """
    vertexai.init(project=PROJECT_ID, location=LOCATION)
    
    # Look for an existing corpus already tagged with this drive id.
    paths = [f"https://drive.google.com/drive/folders/{drive_id}"]
    existing_corpus = None
    for corpus in rag.list_corpora():
        if corpus.display_name and drive_id in corpus.display_name:
            existing_corpus = corpus
            break
    
    if not existing_corpus:
        # Create a new corpus and import files from the Drive folder.
        embedding_model_config = rag.RagEmbeddingModelConfig(
            vertex_prediction_endpoint=rag.VertexPredictionEndpoint(
                publisher_model=f"publishers/google/models/{EMBEDDING_MODEL}"
            )
        )
        rag_corpus = rag.create_corpus(
            display_name=f"Corpus {drive_id}",
            backend_config=rag.RagVectorDbConfig(
                rag_embedding_model_config=embedding_model_config
            ),
        )
        # NOTE(review): if import_files is a long-running operation, the first
        # queries may run against a partially built corpus — confirm.
        rag.import_files(
            rag_corpus.name,
            paths,
            transformation_config=rag.TransformationConfig(
                chunking_config=rag.ChunkingConfig(
                    chunk_size=512,
                    chunk_overlap=100,
                ),
            ),
        )
    else:
        # Reuse the found corpus; files are NOT re-imported on this path.
        rag_corpus = existing_corpus
    
    # Query the corpus, dropping chunks beyond the vector-distance threshold.
    rag_retrieval_config = rag.RagRetrievalConfig(
        top_k=top_k,
        filter=rag.Filter(vector_distance_threshold=0.5),
    )
    return rag.retrieval_query(
        rag_resources=[rag.RagResource(rag_corpus=rag_corpus.name)],
        text=query,
        rag_retrieval_config=rag_retrieval_config,
    )

# Register the RAG helper as a callable tool on the agent.
fast = FastAgent("Google Vertex RAG")

@fast.agent(
    name="vertex_rag",
    function_tools=[mini_rag],
)
async def main():
    """Ask a question that drives the agent to call ``mini_rag``."""
    async with fast.run() as agent:
        result = await agent(
            "Produce a short top 5 prioritized list about customer pain points. From RAG, select 50 relevant chunks."
        )
        print(result)

if __name__ == "__main__":
    asyncio.run(main())
Configure Vertex AI credentials in fastagent.secrets.yaml:
google:
  vertex_ai:
    enabled: true
    project_id: your-project-id
    location: us-central1

FastAPI Integration

Simple Integration

Basic FastAPI integration with agent lifecycle management:
examples/fastapi/fastapi-simple.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fast_agent import FastAgent

# Create FastAgent without parsing CLI args (uvicorn owns the command line).
fast = FastAgent("fast-agent demo", parse_cli_args=False, quiet=True)

@fast.agent(name="helper", instruction="You are a helpful AI Agent.", default=True)
async def decorator():
    """No-op body: the decorator call registers the agent; nothing runs here."""
    pass

# Keep FastAgent running for the app lifetime
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the agent runtime on app startup and keep it open until shutdown."""
    async with fast.run() as agents:
        # Stash the live agent handle where request handlers can reach it.
        app.state.agents = agents
        yield

app = FastAPI(lifespan=lifespan)

class AskRequest(BaseModel):
    """Request body for ``POST /ask``."""
    # User message forwarded verbatim to the agent.
    message: str

class AskResponse(BaseModel):
    """Response body for ``POST /ask``."""
    # The agent's plain-text reply.
    response: str

@app.post("/ask", response_model=AskResponse)
async def ask(req: AskRequest) -> AskResponse:
    """Send the user's message to the default agent and return its reply.

    Raises:
        HTTPException: 500 wrapping any failure from the agent call.
    """
    try:
        result = await app.state.agents.send(req.message)
        return AskResponse(response=result)
    except Exception as e:
        # Chain the cause so server logs keep the original traceback (ruff B904).
        raise HTTPException(status_code=500, detail=str(e)) from e
Run with:
uvicorn fastapi-simple:app --reload

Advanced Integration

Manual lifecycle control with Core API:
examples/fastapi/fastapi-advanced.py
from contextlib import asynccontextmanager
from fastapi import Body, FastAPI, HTTPException
from fast_agent import PromptMessageExtended
from fast_agent.agents import McpAgent
from fast_agent.agents.agent_types import AgentConfig
from fast_agent.core import Core
from fast_agent.core.direct_factory import get_model_factory

# One Core instance owns configuration, context, and shared services.
core = Core(name="fast-agent core")

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manually initialize the core and one agent; tear both down on shutdown."""
    # Manual lifecycle control
    await core.initialize()
    
    cfg = AgentConfig(
        name="core_agent",
        instruction="You are a helpful AI Agent.",
    )
    
    agent = McpAgent(config=cfg, context=core.context)
    await agent.initialize()
    
    # Build an LLM from the configured model and attach it to the agent.
    llm_factory = get_model_factory(core.context, model=cfg.model)
    await agent.attach_llm(llm_factory)
    
    app.state.agent = agent
    try:
        yield
    finally:
        # Shut the agent down first; core.cleanup runs even if that fails.
        try:
            await agent.shutdown()
        finally:
            await core.cleanup()

app = FastAPI(lifespan=lifespan)

@app.post("/ask", response_model=PromptMessageExtended)
async def ask(body: str = Body(..., media_type="text/plain")) -> PromptMessageExtended:
    """Send a plain-text prompt to the agent and return the full multipart reply.

    Raises:
        HTTPException: 500 wrapping any failure from the agent call.
    """
    try:
        # Call generate() to return the full multipart message
        result = await app.state.agent.generate(body)
        return result
    except Exception as e:
        # Chain the cause so server logs keep the original traceback (ruff B904).
        raise HTTPException(status_code=500, detail=str(e)) from e
The advanced pattern gives you full control over agent initialization and cleanup, useful for custom configurations.

OpenTelemetry Integration

Monitor your agents with OpenTelemetry:
fastagent.config.yaml
observability:
  otel:
    enabled: true
    endpoint: "http://localhost:4318"
    service_name: "fast-agent"
    traces:
      enabled: true
      sample_rate: 1.0
    metrics:
      enabled: true
Instrument your code:
from fast_agent.observability import get_tracer, get_meter

# Module-level tracer/meter, created once per module (standard OTel pattern).
tracer = get_tracer(__name__)
meter = get_meter(__name__)

# Counter metric incremented once per agent request.
request_counter = meter.create_counter(
    "agent.requests",
    description="Number of agent requests"
)

@fast.agent(name="monitored_agent")
async def main():
    """Send one message inside a traced span, counting the request."""
    async with fast.run() as agent:
        with tracer.start_as_current_span("agent.process"):
            request_counter.add(1)
            result = await agent.send("Hello")
            return result

Agent-to-Agent Communication

Connect agents across different processes or machines:
examples/a2a/server.py
import asyncio
from fast_agent import FastAgent

fast = FastAgent("A2A Server")

@fast.agent(
    name="worker",
    instruction="You are a worker agent that processes tasks.",
    servers=["filesystem"],
)
async def main():
    """Expose the worker agent to remote callers over the A2A protocol."""
    # Start A2A server
    await fast.start_a2a_server(host="0.0.0.0", port=8765)

if __name__ == "__main__":
    asyncio.run(main())
Connect from another agent:
examples/a2a/agent_executor.py
import asyncio
from fast_agent import FastAgent

fast = FastAgent("A2A Client")

@fast.agent(
    name="coordinator",
    instruction="You coordinate tasks with remote workers.",
    # Remote agents reachable over websockets; each appears as a tool.
    a2a_connections=[
        {"name": "worker", "url": "ws://localhost:8765"}
    ],
)
async def main():
    """Ask the coordinator to delegate work to the remote worker agent."""
    async with fast.run() as agent:
        # Call remote agent as a tool
        result = await agent.coordinator.send(
            "Process this file using the worker agent"
        )
        print(result)

if __name__ == "__main__":
    asyncio.run(main())
A2A communication is experimental and subject to change. Use in production with caution.

Custom Model Providers

Integrate custom LLM providers:
from fast_agent.llm.base import BaseLLM, LLMConfig
from fast_agent.types import PromptMessageExtended

class CustomLLM(BaseLLM):
    """Skeleton LLM provider backed by a hypothetical ``CustomClient``."""

    def __init__(self, config: LLMConfig):
        super().__init__(config)
        # Initialize your custom provider's API client from the shared config.
        self.client = CustomClient(api_key=config.api_key)
    
    async def generate(
        self,
        messages: list[PromptMessageExtended],
        **kwargs
    ) -> PromptMessageExtended:
        """Complete *messages* via the custom backend and normalize the reply."""
        # Implement your generation logic
        response = await self.client.complete(messages)
        # _format_response is presumably inherited from BaseLLM — confirm.
        return self._format_response(response)

# Register the provider under the "custom" prefix used in model strings.
from fast_agent.llm.model_factory import ModelFactory

ModelFactory.register_provider("custom", CustomLLM)

# Use in your agent via "<provider>:<model-name>" model syntax.
@fast.agent(model="custom:my-model")
async def main():
    async with fast.run() as agent:
        await agent.interactive()

Production Best Practices

Implement comprehensive error handling:
from fast_agent.exceptions import AgentError, ToolError, LLMError

# NOTE(review): if ToolError/LLMError subclass AgentError, the specific
# handlers must stay before the AgentError clause — confirm the hierarchy.
try:
    result = await agent.send(message)
except ToolError as e:
    logger.error(f"Tool failed: {e.tool_name} - {e.message}")
    # Handle tool failure
except LLMError as e:
    logger.error(f"LLM error: {e}")
    # Handle LLM failure
except AgentError as e:
    logger.error(f"Agent error: {e}")
    # Handle general agent error
Implement rate limiting for API calls:
from aiolimiter import AsyncLimiter

# Shared limiter: at most 10 acquisitions per 1-second window.
rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 requests per second

async def rate_limited_send(agent, message):
    """Send *message* through *agent*, waiting until the limiter allows it."""
    async with rate_limiter:
        return await agent.send(message)
Use environment variables and secret managers:
fastagent.secrets.yaml
llm:
  anthropic:
    api_key: "${ANTHROPIC_API_KEY}"
  openai:
    api_key: "${OPENAI_API_KEY}"

mcp:
  servers:
    brave-search:
      env:
        BRAVE_API_KEY: "${BRAVE_API_KEY}"
Load from AWS Secrets Manager:
import boto3
import json

def load_secrets():
    """Fetch and decode the production secret bundle from AWS Secrets Manager."""
    sm = boto3.client('secretsmanager')
    secret = sm.get_secret_value(SecretId='fast-agent/prod')
    return json.loads(secret['SecretString'])
Implement health check endpoints:
@app.get("/health")
async def health_check():
    """Liveness probe: round-trip a trivial message to prove the agent responds.

    Raises:
        HTTPException: 503 (so load balancers drop the instance) on any failure.
    """
    try:
        # Check agent is responsive
        await app.state.agent.send("ping")
        return {"status": "healthy", "agent": "ready"}
    except Exception as e:
        # Chain the cause so server logs keep the original traceback (ruff B904).
        raise HTTPException(status_code=503, detail=str(e)) from e

@app.get("/ready")
async def readiness_check():
    """Readiness probe: report whether the agent has been initialized.

    Raises:
        HTTPException: 503 when the agent is missing or not yet set.
    """
    # getattr guards the window before lifespan sets app.state.agent —
    # a bare attribute access would raise AttributeError (HTTP 500) instead.
    if not getattr(app.state, "agent", None):
        raise HTTPException(status_code=503, detail="Agent not initialized")
    return {"status": "ready"}
Handle shutdown gracefully:
import signal

# Event set by the signal handler to request an orderly shutdown.
shutdown_event = asyncio.Event()

def signal_handler(sig, frame):
    """Mark the shutdown event; the main coroutine exits once it observes it."""
    # NOTE(review): calling Event.set() from a signal handler is only safe when
    # the event loop runs in the main thread — consider loop.add_signal_handler.
    logger.info("Shutting down gracefully...")
    shutdown_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

async def main():
    """Run the agent until a shutdown signal arrives, then log the cleanup."""
    async with fast.run() as agent:
        try:
            await shutdown_event.wait()
        finally:
            logger.info("Cleanup complete")

Performance Optimization

Streaming Responses

@app.post("/stream")
async def stream_response(req: AskRequest):
    """Stream agent output as Server-Sent Events (one ``data:`` line per chunk)."""
    async def generate():
        async for chunk in app.state.agent.stream(req.message):
            yield f"data: {chunk}\n\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")

Caching

from functools import lru_cache
import hashlib

class CachedAgent:
    """Wrap an agent with an in-memory response cache keyed by message hash."""

    def __init__(self, agent):
        self.agent = agent
        self.cache = {}

    async def send(self, message: str) -> str:
        """Return the cached reply for *message*, querying the agent on a miss."""
        # md5 of the message keeps keys short; contents are not security-sensitive.
        key = hashlib.md5(message.encode()).hexdigest()

        if key in self.cache:
            return self.cache[key]

        reply = await self.agent.send(message)
        self.cache[key] = reply
        return reply

Connection Pooling

@fast.agent(
    name="pooled_agent",
    servers=["fetch"],
    connection_persistence=True,  # Reuse MCP connections
)
async def main():
    """Interactive session over persistent (pooled) MCP server connections."""
    async with fast.run() as agent:
        # Connections are pooled and reused
        await agent.interactive()

Next Steps

Configuration

Learn about all configuration options

Basic Examples

Back to basic usage examples

API Reference

Explore the complete API

GitHub

Contribute to Fast Agent on GitHub

Build docs developers (and LLMs) love