Skip to main content
ZenML integrates seamlessly with all major agent frameworks. This guide shows integration patterns for 12 popular frameworks, complete with working code examples.

Integration patterns

All frameworks follow consistent patterns:
from zenml import pipeline, step
from typing import Annotated, Dict, Any

@step
def run_agent(query: str) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute framework-specific agent.

    Template step: `initialize_framework_agent` and `extract_response`
    are placeholders to be replaced with framework-specific code.

    Args:
        query: Natural-language input forwarded to the agent.

    Returns:
        Dict with the agent's response and a status flag, stored by
        ZenML as the artifact named "agent_results".
    """
    # Initialize agent (framework-specific)
    agent = initialize_framework_agent()
    
    # Run agent (framework-specific invocation)
    result = agent.invoke(query)
    
    # Return structured results
    return {
        "response": extract_response(result),
        "status": "success",
    }

@pipeline
def agent_pipeline(query: str = "Default query") -> str:
    """Framework-agnostic pipeline structure.

    Wires the agent step to a formatting step; `format_response` is a
    placeholder for a project-defined step.
    """
    results = run_agent(query)
    formatted = format_response(results)
    return formatted

LangGraph

Type: Graph-based workflows with state management
Best for: Complex multi-step agent workflows, ReAct patterns

Integration

from langgraph.graph import StateGraph, END, START
from langchain_core.messages import HumanMessage, BaseMessage
from typing import TypedDict, List

class AgentState(TypedDict):
    """Shared state dict passed between LangGraph nodes."""

    # Conversation history (LangChain message objects)
    messages: List[BaseMessage]
    # Classification produced by the analyze node
    query_type: str
    # Final answer produced by the respond node
    response_text: str

def build_langgraph_agent():
    """Compile and return a two-node LangGraph workflow.

    The compiled graph runs START -> analyze -> respond -> END over an
    AgentState dict; both node bodies are placeholders.
    """

    def analyze_node(state: AgentState) -> AgentState:
        # Analysis logic
        return state

    def respond_node(state: AgentState) -> AgentState:
        # Response generation
        return state

    graph = StateGraph(AgentState)
    graph.add_node("analyze", analyze_node)
    graph.add_node("respond", respond_node)
    graph.add_edge(START, "analyze")
    graph.add_edge("analyze", "respond")
    graph.add_edge("respond", END)
    return graph.compile()

@step
def run_langgraph_agent(
    query: str
) -> Annotated[Dict[str, Any], "langgraph_results"]:
    """Run the compiled LangGraph workflow on a single query."""
    compiled = build_langgraph_agent()

    # Seed the graph state with just the user's message.
    state_in = AgentState(
        messages=[HumanMessage(content=query)],
        query_type="",
        response_text="",
    )
    state_out = compiled.invoke(state_in)

    return {
        "response": state_out["response_text"],
        "query_type": state_out["query_type"],
        "status": "success",
    }

Requirements

langgraph>=0.2.0
langchain-core>=0.3.0
langchain-openai>=0.2.0

LangChain

Type: Composable chains and agents
Best for: Chain-based workflows, tool integration

Integration

from langchain.agents import create_agent
from langchain_core.runnables import Runnable
from langchain_openai import ChatOpenAI

def get_weather_tool(city: str) -> str:
    """Demo weather tool: report fixed sunny conditions for *city*."""
    report = f"Sunny, 22°C in {city}"
    return report

@step
def run_langchain_agent(
    query: str
) -> Annotated[Dict[str, Any], "langchain_results"]:
    """Execute LangChain agent.

    Builds a tool-calling agent around gpt-4 with one weather tool and
    returns the final message content.

    NOTE(review): `create_agent` and the "openai:gpt-4" model string
    follow the newer LangChain agent API — confirm they exist in the
    pinned langchain release.
    """
    agent = create_agent(
        model="openai:gpt-4",
        tools=[get_weather_tool],
        system_prompt="You are a helpful assistant",
    )
    
    result = agent.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    
    # Last message in the returned transcript is the agent's answer.
    return {
        "response": result["messages"][-1].content,
        "status": "success",
    }

Requirements

langchain>=0.3.0
langchain-openai>=0.2.0
langchain-community>=0.3.0

CrewAI

Type: Multi-agent crews with task delegation
Best for: Role-based collaboration, complex workflows

Integration

from crewai import Agent, Crew, Task
from crewai.tools import tool

@tool("Weather Tool")
def get_weather(city: str) -> str:
    """Get weather information."""
    return f"Weather in {city}: Sunny, 22°C"

# Agent that gathers information; has access to the weather tool.
researcher = Agent(
    role="Researcher",
    goal="Research {topic}",
    backstory="Expert researcher",
    tools=[get_weather],
    verbose=True,
)

# Agent that turns research output into prose; needs no tools.
writer = Agent(
    role="Writer",
    goal="Write about {topic}",
    backstory="Professional writer",
    verbose=True,
)

# First task: research. {topic} is filled from kickoff inputs.
research_task = Task(
    description="Research {topic}",
    expected_output="Research findings",
    agent=researcher,
)

# Second task: writing; `context` feeds it the research task's output.
writing_task = Task(
    description="Write article about {topic}",
    expected_output="Article",
    agent=writer,
    context=[research_task],
)

# Crew executing both tasks with the two agents.
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True,
)

@step
def run_crewai_crew(
    topic: str
) -> Annotated[Dict[str, Any], "crew_results"]:
    """Kick off the module-level crew for the given topic."""
    # {topic} placeholders in agent goals/tasks are filled from inputs.
    crew_output = crew.kickoff(inputs={"topic": topic})
    return {"result": str(crew_output), "status": "success"}

Requirements

crewai>=0.80.0
crewai-tools>=0.12.0

LlamaIndex

Type: Function-based agents with async support
Best for: RAG workflows, document processing

Integration

from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import asyncio

def get_weather(city: str) -> str:
    """Demo tool: always reports sun in *city*."""
    forecast = f"Sunny in {city}"
    return forecast

# Wrap the plain function as a LlamaIndex tool (metadata taken from the signature).
weather_tool = FunctionTool.from_defaults(fn=get_weather)

@step
def run_llamaindex_agent(
    query: str
) -> Annotated[Dict[str, Any], "llamaindex_results"]:
    """Execute LlamaIndex agent.

    NOTE(review): depending on the pinned llama-index release, the
    agent may need to be built via `ReActAgent.from_tools(...)` and
    queried with `achat` rather than `aquery` — verify.
    """
    llm = OpenAI(model="gpt-4o-mini")
    agent = ReActAgent(
        tools=[weather_tool],
        llm=llm,
        verbose=True,
    )
    
    # LlamaIndex agents are async
    response = asyncio.run(agent.aquery(query))
    
    return {
        "response": str(response),
        "status": "success",
    }

Requirements

llama-index>=0.11.0
llama-index-llms-openai>=0.2.0

PydanticAI

Type: Type-safe agents with validation
Best for: Structured outputs, type safety

Integration

from pydantic_ai import Agent
from pydantic import BaseModel

class WeatherResponse(BaseModel):
    """Structured weather answer validated by PydanticAI."""

    city: str  # city the report refers to
    temperature: float  # numeric temperature (units not fixed here)
    conditions: str  # e.g. "Sunny"

# Agent whose final answer is validated against WeatherResponse.
# NOTE(review): newer pydantic-ai releases rename `result_type` to
# `output_type` — confirm against the pinned version.
agent = Agent(
    "openai:gpt-4",
    system_prompt="Provide weather information",
    result_type=WeatherResponse,
)

# NOTE(review): in current pydantic-ai, `@agent.tool` functions take a
# RunContext first argument; context-free tools use `@agent.tool_plain`.
@agent.tool
def get_current_weather(city: str) -> dict:
    """Get current weather."""
    # Canned payload; replace with a real lookup.
    return {"city": city, "temp": 22, "conditions": "Sunny"}

@step
def run_pydanticai_agent(
    query: str
) -> Annotated[Dict[str, Any], "pydanticai_results"]:
    """Execute PydanticAI agent."""
    result = agent.run_sync(query)
    
    # NOTE(review): `result.data` is the pre-1.0 accessor; newer
    # pydantic-ai exposes the validated output as `result.output`.
    return {
        "response": result.data.model_dump(),
        "status": "success",
    }

Requirements

pydantic-ai>=0.0.14
pydantic>=2.0.0

Haystack

Type: RAG pipelines with components
Best for: Document retrieval, search-based agents

Integration

from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder

@step
def run_haystack_agent(
    query: str
) -> Annotated[Dict[str, Any], "haystack_results"]:
    """Execute a minimal Haystack prompt -> LLM pipeline.

    Args:
        query: Question inserted into the prompt template.

    Returns:
        Dict with the first generated reply and a status flag.
    """
    # Named `haystack_pipeline` so it does not shadow ZenML's
    # `pipeline` decorator imported at module level.
    haystack_pipeline = Pipeline()

    prompt_builder = PromptBuilder(
        template="Answer this question: {{query}}"
    )
    generator = OpenAIGenerator(model="gpt-4")

    haystack_pipeline.add_component("prompt", prompt_builder)
    haystack_pipeline.add_component("llm", generator)
    # Feed the rendered prompt into the generator's `prompt` input.
    haystack_pipeline.connect("prompt.prompt", "llm.prompt")

    result = haystack_pipeline.run({"prompt": {"query": query}})

    return {
        "response": result["llm"]["replies"][0],
        "status": "success",
    }

Requirements

haystack-ai>=2.7.0

OpenAI Agents SDK

Type: Official OpenAI agent implementation
Best for: OpenAI-native workflows, function calling

Integration

from openai import OpenAI
from openai.agents import Agent as OpenAIAgent

# Shared OpenAI client (API key resolved by the SDK, typically from OPENAI_API_KEY).
client = OpenAI()

@step
def run_openai_agent(
    query: str
) -> Annotated[Dict[str, Any], "openai_results"]:
    """Execute OpenAI agent.

    NOTE(review): `openai.agents` is not part of the published openai
    package, and in the Assistants API a run is created with
    `assistant_id=...`, not `agent_id=...` — verify this example
    against the pinned SDK before shipping.
    """
    agent = OpenAIAgent(
        name="Assistant",
        instructions="You are a helpful assistant",
        tools=[{"type": "code_interpreter"}],
        model="gpt-4",
    )
    
    # Assistants-style flow: thread -> user message -> run -> read reply.
    thread = client.beta.threads.create()
    message = client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=query,
    )
    
    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        agent_id=agent.id,
    )
    
    # messages.list returns newest first; take the latest reply's text.
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    response = messages.data[0].content[0].text.value
    
    return {
        "response": response,
        "status": "success",
    }

Requirements

openai>=1.54.0

Semantic Kernel

Type: Microsoft’s plugin-based architecture
Best for: Enterprise workflows, Azure integration

Integration

from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.functions import kernel_function
import asyncio

class WeatherPlugin:
    """Semantic Kernel plugin exposing a single weather function."""

    @kernel_function(name="get_weather")
    def get_weather(self, city: str) -> str:
        """Get weather for a city."""
        # Canned demo response.
        return f"Sunny in {city}"

@step
def run_semantic_kernel_agent(
    query: str
) -> Annotated[Dict[str, Any], "sk_results"]:
    """Execute Semantic Kernel agent.

    Registers an OpenAI chat service and the WeatherPlugin, then
    invokes the plugin's `get_weather` function.

    Args:
        query: User query (currently unused by the hard-coded demo
            invocation — see NOTE below).

    Returns:
        Dict with the function result and a status flag.
    """
    import os  # fix: `os` was used below without being imported

    kernel = Kernel()

    service = OpenAIChatCompletion(
        ai_model_id="gpt-4",
        api_key=os.getenv("OPENAI_API_KEY"),
    )
    kernel.add_service(service)
    kernel.add_plugin(WeatherPlugin(), "WeatherPlugin")

    async def run_agent():
        # Kernel.invoke addresses a function by plugin + function name;
        # extra kwargs become the function's arguments. (The original
        # passed a non-existent `service_id` kwarg instead.)
        # NOTE(review): the demo ignores `query` and hard-codes Berlin.
        result = await kernel.invoke(
            plugin_name="WeatherPlugin",
            function_name="get_weather",
            city="Berlin",
        )
        return str(result)

    response = asyncio.run(run_agent())

    return {
        "response": response,
        "status": "success",
    }

Requirements

semantic-kernel>=1.14.0

Autogen

Type: Conversational multi-agent systems
Best for: Agent-to-agent conversations, code execution

Integration

from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

@step
def run_autogen_agents(
    task: str
) -> Annotated[Dict[str, Any], "autogen_results"]:
    """Execute Autogen multi-agent conversation.

    Args:
        task: Instruction sent from the user proxy to the assistant.

    Returns:
        Dict with the assistant's last message and a status flag.
    """
    import os  # fix: `os` was used below without being imported

    config_list = [{"model": "gpt-4", "api_key": os.getenv("OPENAI_API_KEY")}]

    assistant = AssistantAgent(
        name="Assistant",
        llm_config={"config_list": config_list},
    )

    # "NEVER" keeps the run fully automated — the proxy never prompts a human.
    user_proxy = UserProxyAgent(
        name="UserProxy",
        human_input_mode="NEVER",
        code_execution_config={"use_docker": False},
    )

    user_proxy.initiate_chat(assistant, message=task)

    return {
        "response": assistant.last_message()["content"],
        "status": "success",
    }

Requirements

autogen>=0.2.0

AWS Strands

Type: Simple agent execution on AWS
Best for: AWS Bedrock integration, simple agents

Integration

from aws_agents import Agent as AWSAgent
import boto3

@step
def run_aws_strand_agent(
    query: str
) -> Annotated[Dict[str, Any], "aws_results"]:
    """Execute AWS Strands agent.

    NOTE(review): the published Strands SDK is the `strands-agents`
    package (`from strands import Agent`), not `aws_agents` — verify
    the import and constructor against the pinned SDK.
    """
    # Bedrock runtime client used to call the hosted model.
    bedrock = boto3.client("bedrock-runtime")
    
    agent = AWSAgent(
        model_id="anthropic.claude-3-sonnet",
        client=bedrock,
    )
    
    # The agent object is callable: one call runs one turn on `query`.
    response = agent(query)
    
    return {
        "response": str(response),
        "status": "success",
    }

Requirements

aws-agents>=0.1.0
boto3>=1.34.0

Qwen-Agent

Type: Function calling with Qwen models
Best for: Qwen model integration, MCP tools

Integration

from qwen_agent import Agent as QwenAgent
from qwen_agent.tools import BaseTool

class WeatherTool(BaseTool):
    """Qwen-Agent tool returning canned weather for a city."""

    # Tool metadata advertised to the model.
    name = "get_weather"
    description = "Get weather for a city"
    
    def call(self, params: dict) -> str:
        """Return the weather string; reads params["city"] if present."""
        # Fall back to "Unknown" when no city was supplied.
        city = params.get("city", "Unknown")
        return f"Sunny in {city}"

@step
def run_qwen_agent(
    query: str
) -> Annotated[Dict[str, Any], "qwen_results"]:
    """Execute Qwen agent with the weather tool registered."""
    agent = QwenAgent(
        function_list=[WeatherTool()],
        llm_config={"model": "qwen-turbo"},
    )
    
    response = agent.run(query)
    
    # The run yields a message list; the last entry is the final answer.
    return {
        "response": response[-1]["content"],
        "status": "success",
    }

Requirements

qwen-agent>=0.0.10

Google ADK

Type: Gemini-powered agents
Best for: Google Cloud integration, Gemini models

Integration

from google_adk import Agent as GoogleAgent
import google.generativeai as genai

@step
def run_google_adk_agent(
    query: str
) -> Annotated[Dict[str, Any], "google_results"]:
    """Execute Google ADK agent.

    Args:
        query: Prompt forwarded to the Gemini-backed agent.

    Returns:
        Dict with the stringified agent response and a status flag.
    """
    import os  # fix: `os` was used below without being imported

    # Configure the Gemini SDK with the key from the environment.
    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

    agent = GoogleAgent(
        model="gemini-2.0-flash-exp",
        system_instruction="You are a helpful assistant",
    )

    response = agent.run(query)

    return {
        "response": str(response),
        "status": "success",
    }

Requirements

google-adk>=0.1.0
google-generativeai>=0.8.0

Framework comparison

| Framework       | Complexity | Async Support | Best Use Case       |
| --------------- | ---------- | ------------- | ------------------- |
| LangGraph       | High       | Yes           | Complex workflows   |
| LangChain       | Medium     | Yes           | Chain composition   |
| CrewAI          | Medium     | No            | Multi-agent teams   |
| LlamaIndex      | Medium     | Yes           | RAG workflows       |
| PydanticAI      | Low        | Yes           | Type-safe agents    |
| Haystack        | Medium     | No            | Document search     |
| OpenAI SDK      | Low        | No            | OpenAI-native       |
| Semantic Kernel | Medium     | Yes           | Enterprise/Azure    |
| Autogen         | High       | No            | Agent conversations |
| AWS Strands     | Low        | No            | AWS Bedrock         |
| Qwen-Agent      | Low        | No            | Qwen models         |
| Google ADK      | Low        | No            | Gemini models       |

Deployment configuration

All frameworks can be deployed with Docker settings:
from zenml.config import DockerSettings

# Container build settings shared by all framework examples:
# install pinned requirements with uv and forward LLM/tracing secrets.
docker_settings = DockerSettings(
    requirements="requirements.txt",
    python_package_installer="uv",
    environment={
        # ${VAR} placeholders are resolved from the deploying environment.
        "OPENAI_API_KEY": "${OPENAI_API_KEY}",
        "LANGFUSE_PUBLIC_KEY": "${LANGFUSE_PUBLIC_KEY}",
        "LANGFUSE_SECRET_KEY": "${LANGFUSE_SECRET_KEY}",
    },
)

@pipeline(settings={"docker": docker_settings})
def agent_pipeline(query: str) -> str:
    """Pipeline wired to the Docker settings defined above."""
    results = run_agent(query)
    return format_response(results)

Next steps

Orchestrating agents

Patterns for building production agent workflows

Agent evaluation

Systematic evaluation of agent performance

Framework integrations

Complete working examples for all 12 frameworks

Deploying agents

Deploy agents as HTTP services

Build docs developers (and LLMs) love