This example shows how to integrate Composio with LangChain in Python, using the Model Context Protocol (MCP) to expose Composio tools to the agent.
Overview
In this example, you’ll learn how to:
Connect Composio to LangChain via MCP
Create a LangChain agent with Composio tools
Use async operations for better performance
Handle tool execution through the MCP client
Prerequisites
Install dependencies
pip install composio langchain langchain-mcp-adapters langchain-openai
Set up environment variables
Create a .env file with your API keys:
COMPOSIO_API_KEY=your_composio_api_key
OPENAI_API_KEY=your_openai_api_key
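Before running the example, it can help to confirm that both keys are actually visible to the Python process. The following is a minimal sketch using only the standard library; `missing_env_vars` is a hypothetical helper, not part of Composio or LangChain, and it assumes the variables have already been exported or loaded from the .env file by whatever mechanism your setup uses.

```python
import os

# The two variable names come from the .env file described above.
REQUIRED_VARS = ("COMPOSIO_API_KEY", "OPENAI_API_KEY")


def missing_env_vars(required=REQUIRED_VARS, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required API keys are set.")
```

Passing a plain dictionary as `env` makes the check easy to exercise without touching the real environment.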
Authenticate with services
Complete Example
import asyncio

from langchain.agents import create_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai.chat_models import ChatOpenAI
from composio import Composio

# Initialize Composio and create a session
composio = Composio()
session = composio.create(
    user_id="user_123",
)


async def main():
    try:
        # Create MCP client with Composio session
        mcp_client = MultiServerMCPClient(
            {
                "composio": {
                    "transport": "streamable_http",
                    "url": session.mcp.url,
                    "headers": session.mcp.headers,
                }
            }
        )
        # Get tools from MCP client
        tools = await mcp_client.get_tools()
        # Create LangChain agent with the tools
        agent = create_agent(
            tools=tools,
            model=ChatOpenAI(model="gpt-4o"),
        )
        # Execute the agent
        result = await agent.ainvoke(
            {
                "messages": [
                    {"role": "user", "content": "Fetch my last email and summarize?"}
                ]
            }
        )
        print(result)
    except Exception as e:
        print(e)


if __name__ == "__main__":
    asyncio.run(main())
How It Works
Initialize Composio Session
Create a Composio session that provides an MCP server endpoint for the user.
composio = Composio()
session = composio.create(user_id="user_123")
Create MCP Client
Initialize a MultiServerMCPClient that connects to Composio’s MCP server over the streamable HTTP transport.
mcp_client = MultiServerMCPClient({
    "composio": {
        "transport": "streamable_http",
        "url": session.mcp.url,
        "headers": session.mcp.headers,
    }
})
Fetch Tools
Retrieve all available tools from the MCP client. The adapter returns them as LangChain-compatible tool objects, so they can be passed to an agent directly.
Create Agent
Use LangChain’s create_agent function to build an agent with the MCP tools and your chosen language model.
Invoke Agent
Call the agent asynchronously with your query. The agent will automatically select and execute the appropriate tools.
MCP Client Configuration
transport
string
default: "streamable_http"
The transport protocol for MCP communication. Options:
streamable_http: HTTP-based streaming (recommended)
stdio: Standard input/output (for local processes)
url
The MCP server URL from your Composio session
headers
Authentication headers for the MCP server
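The three fields above can be assembled by a small helper that rejects obviously malformed URLs before the client is created. This is a sketch under stated assumptions: `build_http_server_config` is a hypothetical name, the URL and header values are placeholders, and only the streamable HTTP case is covered.

```python
from urllib.parse import urlparse


def build_http_server_config(url, headers=None):
    """Build one server entry for MultiServerMCPClient (HTTP transport only)."""
    parsed = urlparse(url)
    # Fail early on values that cannot be an HTTP endpoint.
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"expected an http(s) URL, got {url!r}")
    return {
        "transport": "streamable_http",
        "url": url,
        # Copy the headers so later changes by the caller do not leak in.
        "headers": dict(headers or {}),
    }


if __name__ == "__main__":
    # Placeholder values; in the example above these come from
    # session.mcp.url and session.mcp.headers.
    config = build_http_server_config(
        "https://example.invalid/mcp", {"x-api-key": "placeholder"}
    )
    print(config)
```

The returned dictionary has the same shape as the "composio" entry in the configuration shown earlier, so it could be used as `MultiServerMCPClient({"composio": config})`.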
Expected Output
{
    'messages': [
        HumanMessage(content='Fetch my last email and summarize?'),
        AIMessage(content='I\'ll fetch your last email and summarize it.', tool_calls=[...]),
        ToolMessage(content='{"from": "[email protected]", ...}'),
        AIMessage(content='Your last email was from John...\n\nSummary: ...')
    ],
    'output': 'Your last email was from John regarding the project update...'
}
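Printing the whole result is noisy when only the final answer matters. The sketch below pulls the text of the last message out of a result shaped like the output above. `final_message_text` is a hypothetical helper, and it assumes each message is either a dictionary with a "content" key or an object with a `content` attribute; `SimpleNamespace` stands in for real message objects in the demo.

```python
from types import SimpleNamespace


def final_message_text(result):
    """Return the content of the last entry in result["messages"], or ""."""
    messages = result.get("messages", [])
    if not messages:
        return ""
    last = messages[-1]
    if isinstance(last, dict):
        content = last.get("content", "")
    else:
        content = getattr(last, "content", "")
    # Some message types carry structured content; fall back to its string form.
    return content if isinstance(content, str) else str(content)


if __name__ == "__main__":
    # Stand-in messages for illustration only.
    demo = {
        "messages": [
            SimpleNamespace(content="Fetch my last email and summarize?"),
            SimpleNamespace(content="Your last email was from John..."),
        ]
    }
    print(final_message_text(demo))
```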
Working with Multiple MCP Servers
You can connect to multiple MCP servers simultaneously:
mcp_client = MultiServerMCPClient(
    {
        "composio": {
            "transport": "streamable_http",
            "url": composio_session.mcp.url,
            "headers": composio_session.mcp.headers,
        },
        "other_server": {
            "transport": "streamable_http",
            "url": "https://other-mcp-server.com",
            "headers": {"Authorization": "Bearer token"},
        },
    }
)
# Get tools from all servers
tools = await mcp_client.get_tools()
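When tools from several servers are merged into one list, two servers might expose a tool under the same name. Whether that happens depends on the servers involved, so the following is only a defensive sketch: `duplicate_tool_names` is a hypothetical helper that assumes each tool has a `name` attribute, as LangChain tools do, and the demo uses stand-in objects with made-up names.

```python
from collections import Counter
from types import SimpleNamespace


def duplicate_tool_names(tools):
    """Return a sorted list of tool names that appear more than once."""
    counts = Counter(getattr(tool, "name", None) for tool in tools)
    return sorted(
        name for name, seen in counts.items() if name is not None and seen > 1
    )


if __name__ == "__main__":
    # Stand-in tools with invented names, for illustration only.
    demo_tools = [
        SimpleNamespace(name="fetch_emails"),
        SimpleNamespace(name="search"),
        SimpleNamespace(name="search"),
    ]
    print(duplicate_tool_names(demo_tools))
```

Running the check right after `get_tools()` makes a name collision visible before the agent is created.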
LangGraph Integration
For more complex workflows, use LangGraph with MCP:
from langgraph.prebuilt import create_react_agent

# Create a ReAct agent with MCP tools
agent_executor = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=tools,
)
# Stream the agent's execution
async for chunk in agent_executor.astream(
    {"messages": [{"role": "user", "content": "Summarize my emails"}]}
):
    print(chunk)
Error Handling
async def main():
    mcp_client = None  # Defined up front so the finally block can always reference it
    try:
        mcp_client = MultiServerMCPClient({ ... })
        tools = await mcp_client.get_tools()
        # Create and run agent
        agent = create_agent(tools=tools, model=ChatOpenAI(model="gpt-4o"))
        result = await agent.ainvoke({ ... })
    except ConnectionError as e:
        print(f"Failed to connect to MCP server: {e}")
    except TimeoutError as e:
        print(f"MCP request timeout: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # Clean up the MCP client connection. Some langchain-mcp-adapters
        # releases may not expose close(), so check before calling it.
        if mcp_client is not None and hasattr(mcp_client, "close"):
            await mcp_client.close()
Streaming Responses
For real-time responses:
agent = create_agent(
    tools=tools,
    model=ChatOpenAI(model="gpt-4o", streaming=True),
)
async for event in agent.astream_events(
    {"messages": [{"role": "user", "content": "Fetch my emails"}]},
    version="v1",
):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="", flush=True)
Memory and State
Add conversation memory:
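from langgraph.checkpoint.memory import InMemorySaver

# Assumes LangChain 1.x, where create_agent returns a LangGraph graph.
# The legacy AgentExecutor and ConversationBufferMemory classes do not wrap
# that graph, so conversation state is kept with a checkpointer instead.
agent = create_agent(
    tools=tools,
    model=ChatOpenAI(model="gpt-4o"),
    checkpointer=InMemorySaver(),
)
# Calls that share a thread_id share conversation history
config = {"configurable": {"thread_id": "user_123"}}
# Multi-turn conversation
result1 = await agent.ainvoke(
    {"messages": [{"role": "user", "content": "What's my last email about?"}]},
    config,
)
result2 = await agent.ainvoke(
    {"messages": [{"role": "user", "content": "Reply to that email saying I'll review it tomorrow"}]},
    config,
)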
Best Practices
Use Async: Always use async/await for better performance with MCP
Clean Up Connections: Close MCP client connections when done
Handle Timeouts: Set appropriate timeouts for long-running tool operations
Stream Large Responses: Use streaming for better UX with long responses
Next Steps
CrewAI Example: Build multi-agent systems with CrewAI
Custom Tools: Create custom Python tools