Agent framework integrations
This collection demonstrates ZenML integration patterns for 12 popular agent frameworks. Each example follows consistent patterns while showcasing framework-specific features.

Overview

All examples are available at:
https://github.com/zenml-io/zenml/tree/main/examples/agent_framework_integrations

Quick start

Run any framework example:
cd agent_framework_integrations/<framework-name>/

# Create virtual environment
uv venv --python 3.11
source .venv/bin/activate

# Install dependencies
uv pip install -r requirements.txt

# Set API key
export OPENAI_API_KEY=sk-xxx

# Run the example
python run.py

Framework examples

LangGraph

Stateful graph-based workflows
# langgraph/langgraph_agent.py
from langchain.agents import create_agent

def get_weather(city: str) -> str:
    """Return a stubbed weather report for *city* (always sunny)."""
    return "It's always sunny in {}!".format(city)

# Build the agent once at import time so the ZenML step below can reuse it
# across invocations.
# NOTE(review): `create_agent` is imported from `langchain.agents` above —
# confirm the pinned langchain version exposes this constructor.
agent = create_agent(
    model="openai:gpt-4",
    tools=[get_weather],
    system_prompt="You are a helpful assistant",
)

# langgraph/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from langgraph_agent import agent

@step
def run_langgraph_agent(
    query: str
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute LangGraph agent."""
    # Invoke with a chat-style message list; the agent returns the full
    # conversation under the "messages" key.
    result = agent.invoke(
        {"messages": [{"role": "user", "content": query}]}
    )
    
    # The last message in the history is the agent's final reply.
    return {
        "response": result["messages"][-1].content,
        "status": "success",
    }

@step
def format_response(
    results: Dict[str, Any]
) -> Annotated[str, "formatted_response"]:
    """Render the agent's reply dict as a single labeled string."""
    reply = results["response"]
    return "Response: {}".format(reply)

@pipeline
def agent_pipeline(query: str = "What's the weather in SF?") -> str:
    """LangGraph agent pipeline."""
    # Chain the two steps; ZenML tracks each intermediate artifact.
    results = run_langgraph_agent(query)
    formatted = format_response(results)
    return formatted

if __name__ == "__main__":
    print("Running LangGraph agent pipeline...")
    # Triggers a tracked pipeline run using the default query.
    agent_pipeline()
Key features:
  • Graph-based workflow definition
  • State management across nodes
  • ReAct pattern support
  • Complex multi-step reasoning
Requirements:
langgraph>=0.2.0
langchain-core>=0.3.0
langchain-openai>=0.2.0
zenml>=0.75.0

CrewAI

Multi-agent crews with role-based collaboration
# crewai/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from crewai import Agent, Crew, Task
from crewai.tools import tool

@tool("Weather Checker Tool")
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    # Stubbed response so the example runs without a real weather API.
    return f"Current weather in {city}: Sunny, 22°C"

# Define agents
# The "{city}" placeholders are interpolated from the `inputs` dict passed
# to `crew.kickoff(...)` at run time.
weather_checker = Agent(
    role="Weather Specialist",
    goal="Check weather conditions for {city}",
    backstory="Expert meteorologist",
    tools=[get_weather],
    verbose=True,
)

travel_advisor = Agent(
    role="Travel Advisor",
    goal="Give travel advice based on weather for {city}",
    backstory="Experienced travel consultant",
    verbose=True,
)

# Define tasks
check_weather_task = Task(
    description="Check weather in {city}",
    expected_output="Weather report",
    agent=weather_checker,
)

packing_advice_task = Task(
    description="Provide packing advice for {city}",
    expected_output="List of items to pack",
    agent=travel_advisor,
    # `context` feeds the weather task's output into this task.
    context=[check_weather_task],
)

# Create crew
# With the default (sequential) process, tasks run in list order.
crew = Crew(
    agents=[weather_checker, travel_advisor],
    tasks=[check_weather_task, packing_advice_task],
    verbose=True,
)

@step
def run_crewai_crew(
    city: str
) -> Annotated[Dict[str, Any], "crew_results"]:
    """Execute CrewAI crew."""
    # kickoff() interpolates `city` into agent goals and task descriptions.
    result = crew.kickoff(inputs={"city": city})
    # CrewOutput stringifies to the final task's output text.
    return {"city": city, "result": str(result), "status": "success"}

@step
def format_travel_results(
    crew_data: Dict[str, Any]
) -> Annotated[str, "formatted_results"]:
    """Format crew results."""
    return f"""Travel Planning for {crew_data['city'].upper()}
{'='*50}

{crew_data['result']}
"""

@pipeline
def travel_planning_pipeline(city: str = "Berlin") -> str:
    """CrewAI travel planning pipeline."""
    # Two-step flow: run the crew, then format its output artifact.
    crew_results = run_crewai_crew(city)
    formatted = format_travel_results(crew_results)
    return formatted

if __name__ == "__main__":
    print("Running CrewAI travel planning pipeline...")
    travel_planning_pipeline()
Key features:
  • Role-based agent collaboration
  • Task dependencies and context sharing
  • Built-in tool decorators
  • Sequential task execution
Requirements:
crewai>=0.80.0
crewai-tools>=0.12.0
zenml>=0.75.0

LangChain

Composable chains and runnables
# langchain/langchain_agent.py
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

def load_docs(input_dict: dict) -> str:
    """Fetch the page at ``input_dict["url"]`` and return its text.

    Returns a human-readable error string (rather than raising) when the
    URL is missing or the fetch fails, so the downstream chain still runs.
    """
    url = input_dict.get("url")
    if not url:
        return "No URL provided."

    try:
        pages = WebBaseLoader(url).load()
        return "\n".join(doc.page_content for doc in pages)
    except Exception as err:
        return f"Error loading URL: {str(err)}"

# Define LLM and prompt
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template(
    "Write a concise summary of the following text:\n\n{text}"
)

# Create runnable chain
chain = (
    RunnableLambda(load_docs)
    | RunnableLambda(lambda text: {"text": text})
    | prompt
    | llm
    | StrOutputParser()
)

# langchain/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from langchain_agent import chain

@step
def run_langchain_chain(
    url: str
) -> Annotated[Dict[str, Any], "chain_results"]:
    """Execute LangChain chain."""
    try:
        result = chain.invoke({"url": url})
        return {"summary": result, "status": "success"}
    except Exception as e:
        # Keep the step from failing the run; surface the error in the artifact.
        return {"summary": "", "status": "error", "error": str(e)}

@pipeline
def summarization_pipeline(
    url: str = "https://docs.zenml.io"
) -> str:
    """LangChain document summarization pipeline."""
    results = run_langchain_chain(url)
    # NOTE(review): inside a @pipeline, `results` is a step artifact rather
    # than a plain dict — confirm the pinned ZenML version supports
    # subscripting step outputs at composition time.
    return results["summary"]

if __name__ == "__main__":
    print("Running LangChain summarization pipeline...")
    summarization_pipeline()
Key features:
  • Composable runnable chains
  • Document loaders and processors
  • Output parsers for structured data
  • Tool integration via decorators
Requirements:
langchain>=0.3.0
langchain-openai>=0.2.0
langchain-community>=0.3.0
zenml>=0.75.0

LlamaIndex

Function-based agents with async support
# llama_index/agent.py
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI

def get_weather(city: str) -> str:
    """Return a canned always-sunny weather string for *city*."""
    return "It's always sunny in " + city + "!"

# Create tool from function
weather_tool = FunctionTool.from_defaults(fn=get_weather)

# Create agent
llm = OpenAI(model="gpt-4o-mini")
agent = ReActAgent(tools=[weather_tool], llm=llm, verbose=True)

# llama_index/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from agent import agent
import asyncio

@step
def run_llamaindex_agent(
    query: str
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute LlamaIndex agent."""
    # LlamaIndex agents are async
    # NOTE(review): confirm ReActAgent exposes `aquery` in the pinned
    # llama-index version; some releases use `achat`/`run` instead.
    response = asyncio.run(agent.aquery(query))
    
    return {
        "response": str(response),
        "status": "success",
    }

@pipeline
def agent_pipeline(query: str = "What's the weather in Tokyo?") -> str:
    """LlamaIndex agent pipeline."""
    results = run_llamaindex_agent(query)
    return results["response"]

if __name__ == "__main__":
    print("Running LlamaIndex agent pipeline...")
    agent_pipeline()
Key features:
  • ReAct agent pattern
  • Async execution support
  • Function-based tools
  • RAG capabilities
Requirements:
llama-index>=0.11.0
llama-index-llms-openai>=0.2.0
zenml>=0.75.0

PydanticAI

Type-safe agents with structured outputs
# pydanticai/pydanticai_agent.py
from pydantic_ai import Agent
from pydantic import BaseModel

class WeatherResponse(BaseModel):
    # Structured output schema the agent's final answer is validated against.
    city: str
    temperature: float
    conditions: str

# NOTE(review): newer pydantic-ai versions rename `result_type` to
# `output_type` — confirm against the pinned version.
agent = Agent(
    "openai:gpt-4",
    system_prompt="Provide weather information",
    result_type=WeatherResponse,
)

# NOTE(review): `@agent.tool` conventionally takes a RunContext first
# parameter; context-free tools use `@agent.tool_plain` — confirm this
# signature is accepted by the pinned pydantic-ai version.
@agent.tool
def get_current_weather(city: str) -> dict:
    """Get current weather."""
    # Stubbed data; the model maps it onto WeatherResponse.
    return {"city": city, "temp": 22, "conditions": "Sunny"}

# pydanticai/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from pydanticai_agent import agent

@step
def run_pydanticai_agent(
    query: str
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute PydanticAI agent."""
    # run_sync wraps the agent's async event loop for synchronous callers.
    result = agent.run_sync(query)
    
    # NOTE(review): `.data` is deprecated in newer pydantic-ai in favor of
    # `.output` — confirm against the pinned version.
    return {
        "response": result.data.model_dump(),
        "status": "success",
    }

@pipeline
def agent_pipeline(
    query: str = "What's the weather in Paris?"
) -> Dict[str, Any]:
    """PydanticAI agent pipeline."""
    results = run_pydanticai_agent(query)
    return results

if __name__ == "__main__":
    print("Running PydanticAI agent pipeline...")
    # NOTE(review): calling a @pipeline typically returns a run handle, not
    # the step's dict — confirm the printed value is what readers expect.
    result = agent_pipeline()
    print(f"Result: {result}")
Key features:
  • Type-safe with Pydantic models
  • Structured output validation
  • Built-in tool decorators
  • Sync and async support
Requirements:
pydantic-ai>=0.0.14
pydantic>=2.0.0
zenml>=0.75.0

Common patterns

Pipeline structure

All examples follow this pattern:
from zenml import pipeline, step
from typing import Annotated, Dict, Any

@step
def run_agent(query: str) -> Annotated[Dict[str, Any], "results"]:
    """Execute framework-specific agent."""
    # Initialize agent
    # (placeholder — each framework example supplies its own constructor)
    agent = initialize_framework_agent()
    
    # Run agent
    result = agent.invoke(query)
    
    # Return structured results
    # A plain dict artifact keeps the step output framework-agnostic.
    return {
        "response": extract_response(result),
        "status": "success",
    }

@step
def format_response(
    results: Dict[str, Any]
) -> Annotated[str, "formatted"]:
    """Format agent output."""
    return f"Response: {results['response']}"

@pipeline
def agent_pipeline(query: str = "Default query") -> str:
    """Framework-agnostic pipeline."""
    results = run_agent(query)
    formatted = format_response(results)
    return formatted

Error handling

@step
def run_agent_with_error_handling(
    query: str
) -> Annotated[Dict[str, Any], "results"]:
    """Execute agent with comprehensive error handling."""
    try:
        agent = initialize_agent()
        result = agent.process(query)
        
        return {
            "response": result.text,
            "status": "success",
        }
    except Exception as e:
        # Return a degraded-but-valid artifact instead of failing the run;
        # downstream steps can branch on the "status" field.
        return {
            "response": "I apologize, but I'm experiencing technical difficulties.",
            "status": "error",
            "error_message": str(e),
        }

Deployment

All examples support deployment:
# Deploy any framework pipeline
zenml pipeline deploy agent_pipeline --name my-agent

# Invoke deployed agent
zenml deployment invoke my-agent --query="Your question here"

Framework-specific notes

Async frameworks

LlamaIndex and Semantic Kernel require async handling:
import asyncio

@step
def run_async_agent(query: str) -> Annotated[Dict[str, Any], "results"]:
    """Execute async agent."""
    # ZenML steps are synchronous; bridge into the async agent API with
    # asyncio.run, which creates and tears down an event loop per call.
    async def run():
        return await agent.aquery(query)
    
    result = asyncio.run(run())
    return {"response": str(result), "status": "success"}

Response extraction

Different frameworks return different types:
# LangGraph/LangChain: Message objects
response = result["messages"][-1].content

# CrewAI: CrewOutput objects
response = str(result)

# LlamaIndex: Response objects
response = str(result)

# PydanticAI: Pydantic models
response = result.data.model_dump()

# OpenAI SDK: Message content
response = messages.data[0].content[0].text.value

Additional frameworks

The collection includes 7 more frameworks:
  • Haystack: RAG pipelines with retrieval
  • OpenAI Agents SDK: Official OpenAI implementation
  • Semantic Kernel: Microsoft’s plugin architecture
  • Autogen: Conversational multi-agent systems
  • AWS Strands: AWS Bedrock integration
  • Qwen-Agent: Qwen model integration
  • Google ADK: Gemini-powered agents
See the Agent Frameworks page for detailed integration patterns.

Running all examples

Test all frameworks:
cd agent_framework_integrations/

# Run all examples
for dir in */; do
    echo "Testing $dir"
    cd "$dir"
    uv pip install -r requirements.txt
    python run.py
    cd ..
done

Next steps

Agent frameworks

Detailed integration patterns for all 12 frameworks

Deploying agents

Deploy agents as production HTTP services

Agent comparison

Systematic evaluation of agent architectures

Orchestrating agents

Production agent orchestration patterns

Build docs developers (and LLMs) love