This example demonstrates how to integrate Composio with LangChain in Python using the Model Context Protocol (MCP) for seamless tool access.

Overview

In this example, you’ll learn how to:
  • Connect Composio to LangChain via MCP
  • Create a LangChain agent with Composio tools
  • Use async operations for better performance
  • Handle tool execution through MCP client

Prerequisites

1. Install dependencies

pip install composio langchain langchain-mcp-adapters langchain-openai
2. Set up environment variables

Create a .env file with your API keys:
COMPOSIO_API_KEY=your_composio_api_key
OPENAI_API_KEY=your_openai_api_key
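Before creating any clients, it can help to fail fast when a key is missing. A minimal sketch (`check_env` is a hypothetical helper, not part of the Composio SDK; the variable names match the `.env` file above):

```python
import os

def check_env(required=("COMPOSIO_API_KEY", "OPENAI_API_KEY")):
    """Return the names of required environment variables that are unset."""
    return [key for key in required if not os.environ.get(key)]

missing = check_env()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```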
3. Authenticate with services

composio add gmail

Complete Example

import asyncio
from langchain.agents import create_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai.chat_models import ChatOpenAI
from composio import Composio

# Initialize Composio and create a session
composio = Composio()
session = composio.create(
    user_id="user_123",
)

async def main():
    try:
        # Create MCP client with Composio session
        mcp_client = MultiServerMCPClient(
            {
                "composio": {
                    "transport": "streamable_http",
                    "url": session.mcp.url,
                    "headers": session.mcp.headers,
                }
            }
        )

        # Get tools from MCP client
        tools = await mcp_client.get_tools()

        # Create LangChain agent with the tools
        agent = create_agent(
            tools=tools,
            model=ChatOpenAI(model="gpt-4o"),
        )

        # Execute the agent
        result = await agent.ainvoke(
            {
                "messages": [
                    {"role": "user", "content": "Fetch my last email and summarize it."}
                ]
            }
        )

        print(result)
    except Exception as e:
        print(e)

if __name__ == "__main__":
    asyncio.run(main())

How It Works

1. Initialize Composio Session

Create a Composio session that provides an MCP server endpoint for the user.
composio = Composio()
session = composio.create(user_id="user_123")
2. Create MCP Client

Initialize a MultiServerMCPClient that connects to Composio’s MCP server using HTTP streaming.
mcp_client = MultiServerMCPClient({
    "composio": {
        "transport": "streamable_http",
        "url": session.mcp.url,
        "headers": session.mcp.headers,
    }
})
3. Fetch Tools

Retrieve all available tools from the MCP client. These are automatically formatted for LangChain.
4. Create Agent

Use LangChain’s create_agent function to build an agent with the MCP tools and your chosen language model.
5. Invoke Agent

Call the agent asynchronously with your query. The agent will automatically select and execute the appropriate tools.

MCP Client Configuration

transport (string, default: "streamable_http")
The transport protocol for MCP communication. Options:
  • streamable_http: HTTP-based streaming (recommended)
  • stdio: standard input/output (for local processes)

url (string, required)
The MCP server URL from your Composio session.

headers (dict, required)
Authentication headers for the MCP server.

Expected Output

{
  'messages': [
    HumanMessage(content='Fetch my last email and summarize it.'),
    AIMessage(content='I\'ll fetch your last email and summarize it.', tool_calls=[...]),
    ToolMessage(content='{"from": "john@example.com", ...}'),
    AIMessage(content='Your last email was from John...\n\nSummary: ...')
  ]
}
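If you only need the final answer, it lives in the last entry of result['messages']. A minimal sketch (`final_answer` is a hypothetical helper; plain dicts stand in for LangChain message objects here):

```python
def final_answer(result):
    """Return the content of the last message in an agent result (hypothetical helper)."""
    messages = result.get("messages", [])
    if not messages:
        return None
    last = messages[-1]
    # LangChain message objects expose .content; dict-shaped messages use ["content"]
    return last.content if hasattr(last, "content") else last["content"]

# Dict stand-ins for the message objects shown above
result = {
    "messages": [
        {"role": "user", "content": "Fetch my last email and summarize it."},
        {"role": "assistant", "content": "Your last email was from John regarding the project update..."},
    ]
}
print(final_answer(result))  # prints the final assistant message content
```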

Working with Multiple MCP Servers

You can connect to multiple MCP servers simultaneously:
mcp_client = MultiServerMCPClient(
    {
        "composio": {
            "transport": "streamable_http",
            "url": composio_session.mcp.url,
            "headers": composio_session.mcp.headers,
        },
        "other_server": {
            "transport": "streamable_http",
            "url": "https://other-mcp-server.com",
            "headers": {"Authorization": "Bearer token"},
        },
    }
)

# Get tools from all servers
tools = await mcp_client.get_tools()

LangGraph Integration

For more complex workflows, use LangGraph with MCP:
from langgraph.prebuilt import create_react_agent

# Create a ReAct agent with MCP tools
agent_executor = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=tools,
)

# Stream the agent's execution
async for chunk in agent_executor.astream(
    {"messages": [{"role": "user", "content": "Summarize my emails"}]}
):
    print(chunk)

Error Handling

async def main():
    mcp_client = None
    try:
        mcp_client = MultiServerMCPClient({...})
        tools = await mcp_client.get_tools()

        # Create and run the agent
        agent = create_agent(tools=tools, model=ChatOpenAI(model="gpt-4o"))
        result = await agent.ainvoke({...})

    except ConnectionError as e:
        print(f"Failed to connect to MCP server: {e}")
    except TimeoutError as e:
        print(f"MCP request timed out: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # Only close the client if it was successfully created,
        # otherwise mcp_client would be None here
        if mcp_client is not None:
            await mcp_client.close()

Streaming Responses

For real-time responses:
agent = create_agent(
    tools=tools,
    model=ChatOpenAI(model="gpt-4o", streaming=True),
)

async for event in agent.astream_events(
    {"messages": [{"role": "user", "content": "Fetch my emails"}]},
    version="v1",
):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="", flush=True)

Memory and State

Agents built with create_agent are LangGraph runnables, so the legacy ConversationBufferMemory/AgentExecutor combination does not apply to them. Instead, attach a checkpointer and reuse the same thread_id to carry conversation state across turns:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

# In-memory checkpointer; use a persistent backend in production
agent_executor = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=tools,
    checkpointer=MemorySaver(),
)

# The same thread_id ties both turns to one conversation
config = {"configurable": {"thread_id": "user_123"}}

# Multi-turn conversation
result1 = await agent_executor.ainvoke(
    {"messages": [{"role": "user", "content": "What's my last email about?"}]},
    config,
)

result2 = await agent_executor.ainvoke(
    {"messages": [{"role": "user", "content": "Reply to that email saying I'll review it tomorrow"}]},
    config,
)

Best Practices

  • Use Async: Always use async/await for better performance with MCP
  • Clean Up Connections: Close MCP client connections when done
  • Handle Timeouts: Set appropriate timeouts for long-running tool operations
  • Stream Large Responses: Use streaming for better UX with long responses
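The timeout practice above can be sketched with asyncio.wait_for (here `slow_tool_call` is a hypothetical stand-in for any awaitable MCP or tool operation):

```python
import asyncio

async def slow_tool_call():
    # Stand-in for a long-running MCP tool call (hypothetical)
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        # Cap the operation at 1 second; wait_for raises TimeoutError past that
        result = await asyncio.wait_for(slow_tool_call(), timeout=1.0)
        print(result)
    except asyncio.TimeoutError:
        print("Tool call timed out")

asyncio.run(main())  # prints "Tool call timed out" after about 1 second
```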

Next Steps

CrewAI Example

Build multi-agent systems with CrewAI

Custom Tools

Create custom Python tools
