Skip to main content
Orchestrating AI agents requires different patterns than traditional ML pipelines. This guide covers the essential techniques for building robust, reproducible agent workflows.

Agent pipeline anatomy

A typical agent pipeline consists of three core components: a step that executes the agent, a step that formats its output, and a pipeline definition that wires the steps together:
from zenml import pipeline, step
from typing import Annotated, Dict, Any

@step
def run_agent(query: str) -> Annotated[Dict[str, Any], "agent_results"]:
    """Run the configured agent on *query* and package the outcome.

    Returns a plain dict — tracked by ZenML as the ``agent_results``
    artifact — holding the response text plus execution metadata
    (confidence, latency, token usage).
    """
    outcome = initialize_agent().process(query)
    return {
        "response": outcome.text,
        "confidence": outcome.confidence,
        "latency_ms": outcome.latency_ms,
        "tokens_used": outcome.tokens_used,
    }

@step
def format_response(
    results: Dict[str, Any]
) -> Annotated[str, "formatted_response"]:
    """Render an agent-result dict as a human-readable multi-line string.

    Expects the dict shape produced by ``run_agent`` (keys ``response``,
    ``confidence``, ``latency_ms``, ``tokens_used``).

    Raises:
        KeyError: if any of the expected keys is missing from *results*.
    """
    # NOTE(review): the blank line inside this template carries the
    # source indentation into the output — confirm that is intentional.
    return f"""Response: {results['response']}
    
Confidence: {results['confidence']:.2f}
Latency: {results['latency_ms']:.1f}ms
Tokens: {results['tokens_used']}"""

@pipeline
def agent_pipeline(query: str = "Default query") -> str:
    """Wire the agent step to the formatter and return the formatted text."""
    return format_response(run_agent(query))

Key principles

  1. Annotated outputs: Use Annotated[Type, "artifact_name"] to track all outputs as versioned artifacts
  2. Structured results: Return dictionaries or Pydantic models, not just strings
  3. Error handling: Wrap agent calls in try-except blocks with status tracking
  4. Metadata capture: Log execution details (latency, tokens, costs) for analysis

Deployment patterns

Local execution

Run pipelines locally for development and testing:
# Local entrypoint: execute the pipeline directly for development/testing.
if __name__ == "__main__":
    # Run with default parameters
    agent_pipeline()

    # Run with custom query
    agent_pipeline(query="What's the weather in Berlin?")

HTTP deployment

Deploy as a production HTTP service:
from zenml.config import DeploymentSettings, CORSConfig

# HTTP-deployment configuration for the agent service.
deployment_settings = DeploymentSettings(
    app_title="Customer Support Agent",
    # NOTE(review): allow_origins=["*"] disables CORS restrictions —
    # fine for a demo, but lock this down for production.
    cors=CORSConfig(allow_origins=["*"]),
    dashboard_files_path="ui",  # Serve web UI
)

@pipeline(
    settings={"deployment": deployment_settings},
    enable_cache=False
)
def agent_api(query: str) -> str:
    """Agent pipeline exposed as an HTTP endpoint.

    Caching is disabled because agent responses are non-deterministic.
    """
    return format_response(run_agent(query))
Deploy and invoke:
# Deploy the pipeline as an HTTP service named "support-agent"
zenml pipeline deploy agent_api --name support-agent

# Get endpoint URL and other deployment details
zenml deployment describe support-agent

# Invoke via CLI; flags map onto pipeline parameters
zenml deployment invoke support-agent --query="How do I return an item?"

# Invoke via HTTP; pipeline parameters go under the "parameters" key
curl -X POST http://localhost:8000/invoke \
  -H "Content-Type: application/json" \
  -d '{"parameters": {"query": "How do I return an item?"}}'

Docker configuration

Package agents with their dependencies:
from zenml.config import DockerSettings

# Container-build settings: package the agent with its dependencies.
docker_settings = DockerSettings(
    requirements="requirements.txt",
    python_package_installer="uv",  # Fast installs
    # NOTE(review): the "${VAR}" values look like placeholders resolved
    # from the host environment at build/run time — confirm against the
    # DockerSettings docs. Never hard-code real secrets here.
    environment={
        "OPENAI_API_KEY": "${OPENAI_API_KEY}",
        "LANGFUSE_PUBLIC_KEY": "${LANGFUSE_PUBLIC_KEY}",
        "LANGFUSE_SECRET_KEY": "${LANGFUSE_SECRET_KEY}",
    },
)

@pipeline(settings={"docker": docker_settings})
def agent_pipeline(query: str) -> str:
    """Containerized variant of the agent pipeline (uses docker_settings)."""
    return format_response(run_agent(query))

Multi-agent orchestration

Routing pattern

Route queries to specialized agents:
@step
def classify_intent(
    query: str
) -> Annotated[str, "intent"]:
    """Route a query to a specialist based on keyword matches.

    Rules are checked in order; the first match wins, and anything
    unmatched falls through to general support.
    """
    routes = [
        ("returns_specialist", ("return", "refund")),
        ("billing_specialist", ("billing", "payment")),
        ("technical_support", ("technical", "setup")),
    ]
    lowered = query.lower()
    for route, keywords in routes:
        if any(keyword in lowered for keyword in keywords):
            return route
    return "general_support"

@step
def run_specialist(
    query: str, intent: str
) -> Annotated[Dict[str, Any], "specialist_response"]:
    """Dispatch *query* to the specialist agent registered for *intent*.

    Unknown intents fall back to the general-support agent.
    """
    registry = {
        "returns_specialist": ReturnsAgent(),
        "billing_specialist": BillingAgent(),
        "technical_support": TechnicalAgent(),
        "general_support": GeneralAgent(),
    }
    chosen = registry.get(intent, registry["general_support"])
    outcome = chosen.process(query)
    return {
        "response": outcome.text,
        "specialist": intent,
        "confidence": outcome.confidence,
    }

@pipeline
def multi_agent_pipeline(query: str) -> str:
    """Classify the query's intent, then answer via the matching specialist."""
    return format_response(run_specialist(query, classify_intent(query)))

CrewAI integration

Orchestrate agent crews:
from crewai import Agent, Crew, Task
from crewai.tools import tool

@tool("Weather Checker Tool")
def get_weather(city: str) -> str:
    """Report current conditions for *city* (stubbed demo data)."""
    report = f"Current weather in {city}: Sunny, 22°C"
    return report

# Specialist agent that resolves the weather via the get_weather tool.
# "{city}" placeholders are interpolated from crew.kickoff(inputs=...).
weather_checker = Agent(
    role="Weather Specialist",
    goal="Check weather conditions for {city}",
    backstory="Expert meteorologist",
    tools=[get_weather],
    verbose=True,
)

# Advisory agent that turns the weather report into travel advice (no tools).
travel_advisor = Agent(
    role="Travel Advisor",
    goal="Give travel advice based on weather for {city}",
    backstory="Experienced travel consultant",
    verbose=True,
)

# Task 1: produce the weather report.
check_weather_task = Task(
    description="Check weather in {city}",
    expected_output="Weather report",
    agent=weather_checker,
)

# Task 2: packing advice. context=[check_weather_task] feeds task 1's
# output into this task's prompt.
packing_advice_task = Task(
    description="Provide packing advice for {city}",
    expected_output="List of items to pack",
    agent=travel_advisor,
    context=[check_weather_task],
)

# The crew runs both tasks with both agents (presumably in listed order —
# confirm CrewAI's default process).
crew = Crew(
    agents=[weather_checker, travel_advisor],
    tasks=[check_weather_task, packing_advice_task],
    verbose=True,
)

@step
def run_crew(city: str) -> Annotated[Dict[str, Any], "crew_results"]:
    """Kick off the travel-planning crew for *city* and capture its output."""
    outcome = crew.kickoff(inputs={"city": city})
    return {"city": city, "result": str(outcome), "status": "success"}

@pipeline
def travel_planning_pipeline(city: str = "Berlin") -> str:
    """Run the CrewAI crew for *city* and format its results."""
    return format_travel_results(run_crew(city))

LangGraph workflows

Build stateful agent workflows:
from typing import List, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import END, START, StateGraph

class AgentState(TypedDict):
    """State for customer service workflow.

    A single dict instance is threaded through every graph node and
    mutated in place (see CustomerServiceAgent).
    """
    messages: List[BaseMessage]  # conversation history; last entry is the current query
    query_type: str              # intent label: "returns", "billing", or "general"
    confidence: float            # heuristic confidence score in [0, 1]
    response_text: str           # final answer text, filled by the generate node

class CustomerServiceAgent:
    """LangGraph-based agent.

    Builds a strictly linear four-node StateGraph over ``AgentState``
    (analyze -> classify -> generate -> validate) and exposes it through
    :meth:`process`.
    """

    def __init__(self):
        # Compile the graph once per instance; process() reuses it.
        self.graph = self._build_graph()

    def _build_graph(self):
        """Wire the four nodes in sequence and compile the workflow."""
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("analyze_query", self._analyze_query)
        workflow.add_node("classify_intent", self._classify_intent)
        workflow.add_node("generate_response", self._generate_response)
        workflow.add_node("validate_response", self._validate_response)

        # Add edges: linear START -> ... -> END, no conditional branching.
        workflow.add_edge(START, "analyze_query")
        workflow.add_edge("analyze_query", "classify_intent")
        workflow.add_edge("classify_intent", "generate_response")
        workflow.add_edge("generate_response", "validate_response")
        workflow.add_edge("validate_response", END)

        return workflow.compile()

    def _analyze_query(self, state: AgentState) -> AgentState:
        """Analyze query complexity.

        Heuristic: queries under 10 words get confidence 0.9, longer
        ones 0.8.
        """
        query = state["messages"][-1].content
        state["confidence"] = 0.9 if len(query.split()) < 10 else 0.8
        return state

    def _classify_intent(self, state: AgentState) -> AgentState:
        """Classify customer intent by keyword match on the last message."""
        query = state["messages"][-1].content.lower()
        if "return" in query or "refund" in query:
            state["query_type"] = "returns"
        elif "billing" in query or "payment" in query:
            state["query_type"] = "billing"
        else:
            state["query_type"] = "general"
        return state

    def _generate_response(self, state: AgentState) -> AgentState:
        """Generate response based on intent."""
        query = state["messages"][-1].content
        query_type = state["query_type"]

        # Call LLM or use fallback logic
        # NOTE(review): generate_response_for_type is defined elsewhere —
        # confirm its contract is (query, query_type) -> str.
        response = generate_response_for_type(query, query_type)
        state["response_text"] = response
        return state

    def _validate_response(self, state: AgentState) -> AgentState:
        """Validate response quality.

        Responses under 20 characters are replaced with a clarification
        prompt and the confidence is lowered to 0.6.
        """
        if len(state["response_text"]) < 20:
            state["response_text"] = "Could you provide more details?"
            state["confidence"] = 0.6
        return state

    def process(self, query: str) -> Dict[str, Any]:
        """Process query through workflow.

        Wraps *query* in a HumanMessage, invokes the compiled graph, and
        returns the response text, classified query type, and confidence.
        """
        initial_state = AgentState(
            messages=[HumanMessage(content=query)],
            query_type="",
            confidence=0.8,
            response_text="",
        )
        final_state = self.graph.invoke(initial_state)
        return {
            "response": final_state["response_text"],
            "query_type": final_state["query_type"],
            "confidence": final_state["confidence"],
        }

@step
def run_langgraph_agent(
    query: str
) -> Annotated[Dict[str, Any], "langgraph_results"]:
    """Build a fresh CustomerServiceAgent and push *query* through its graph."""
    return CustomerServiceAgent().process(query)

@pipeline
def langgraph_pipeline(query: str) -> str:
    """Run the LangGraph agent step, then format its result dict."""
    return format_response(run_langgraph_agent(query))

Hybrid architectures

Combine traditional ML with LLM agents:
@step
def train_classifier(
    queries: List[str], labels: List[str]
) -> Annotated[Any, "intent_classifier"]:
    """Fit a TF-IDF + multinomial naive-Bayes intent model.

    Returns the fitted sklearn Pipeline, stored as the
    ``intent_classifier`` artifact.
    """
    # Imported locally so the step carries its own dependency.
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.pipeline import Pipeline

    model = Pipeline([
        ('tfidf', TfidfVectorizer(max_features=100)),
        ('clf', MultinomialNB()),
    ])
    model.fit(queries, labels)
    return model

@step
def hybrid_agent(
    query: str, classifier: Any
) -> Annotated[Dict[str, Any], "hybrid_response"]:
    """Answer *query* with fast ML classification plus LLM fallback.

    The traditional classifier handles confident predictions via
    templates; only low-confidence queries (< 0.7) hit the LLM.
    """
    predicted_intent = classifier.predict([query])[0]
    certainty = classifier.predict_proba([query]).max()

    # Decide the path once; the same flag is reported in the output.
    needs_llm = certainty < 0.7
    if needs_llm:
        answer = call_llm(query, predicted_intent)
    else:
        answer = get_template_response(predicted_intent)

    return {
        "response": answer,
        "intent": predicted_intent,
        "confidence": certainty,
        "used_llm": needs_llm,
    }

@pipeline
def hybrid_pipeline(query: str = "I want to return my order") -> str:
    """Hybrid ML + LLM architecture.

    Trains a lightweight intent classifier, then routes *query* through
    the hybrid agent (classifier first, LLM only for low confidence).

    Args:
        query: The user query to answer. Defaults to the previously
            hard-coded demo query, so existing callers are unaffected.

    Returns:
        The formatted agent response.
    """
    # Train classifier once per run
    queries, labels = load_training_data()
    classifier = train_classifier(queries, labels)

    # Use for inference on the (now parameterized) query
    results = hybrid_agent(query, classifier)
    return format_response(results)

Error handling

Robust error handling for production:
@step
def run_agent_with_retry(
    query: str, max_retries: int = 3
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute agent with retry logic and exponential backoff.

    Args:
        query: User query forwarded to the agent.
        max_retries: Total number of attempts. Values below 1 are
            clamped to a single attempt so the step always returns a
            dict (the original fell through the loop and returned None).

    Returns:
        On success: response text, "status": "success", and the 1-based
        attempt number. If every attempt raises: a fallback apology with
        "status": "error" and the last exception's message.
    """
    import time

    # Clamp so the declared Dict return type always holds.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            agent = initialize_agent()
            result = agent.process(query)

            return {
                "response": result.text,
                "status": "success",
                "attempt": attempt + 1,
            }
        except Exception as e:
            if attempt == attempts - 1:
                # Last attempt failed: surface a graceful error payload.
                return {
                    "response": "I apologize, but I'm experiencing technical difficulties.",
                    "status": "error",
                    "error_message": str(e),
                    "attempt": attempt + 1,
                }
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

Observability integration

Track agent performance with Langfuse:
import os

from langfuse import Langfuse

# Module-level Langfuse client; credentials come from the environment so
# they never live in source control.
# Fix: the original snippet called os.getenv without importing os.
langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
)

@step
def run_agent_with_tracing(
    query: str
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Run the agent under a Langfuse trace and report usage metadata.

    Opens one trace per execution with a nested span around the agent
    call, then attaches token and latency metadata to the span and the
    final text to the trace.
    """
    trace = langfuse.trace(
        name="agent_execution",
        metadata={"query_length": len(query)},
    )

    # The span brackets just the agent work; the outcome outlives it.
    with trace.span(name="agent_processing") as span:
        outcome = initialize_agent().process(query)
        span.update(
            metadata={
                "tokens_used": outcome.tokens_used,
                "latency_ms": outcome.latency_ms,
            }
        )

    trace.update(output=outcome.text)

    return {
        "response": outcome.text,
        "trace_id": trace.id,
        "tokens_used": outcome.tokens_used,
    }

Best practices

  1. Use artifacts: Store all agent outputs as versioned artifacts with Annotated
  2. Capture metadata: Log latency, tokens, costs, and confidence scores
  3. Handle errors gracefully: Return status fields and fallback responses
  4. Enable caching carefully: Set enable_cache=False for non-deterministic agents
  5. Structure outputs: Use Pydantic models or dicts, not raw strings
  6. Deploy with Docker: Package dependencies with DockerSettings
  7. Monitor production: Integrate observability tools like Langfuse
  8. Test systematically: Build evaluation pipelines to compare architectures

Next steps

Agent frameworks

Integration guides for 12+ agent frameworks

Agent evaluation

Build systematic evaluation pipelines

Deploying agents

Complete deployment example with web UI

Agent comparison

Compare multiple agent architectures

Build docs developers (and LLMs) love