Skip to main content

Integration Tool

The Integration tool manages user connections to third-party services and enables discovery of public integrations from the marketplace.

Overview

Integration capabilities:
  • List connected integrations
  • Search public marketplace integrations
  • Initiate OAuth connections
  • Check integration status
  • Manage custom MCP integrations
# Location: apps/api/app/agents/tools/integration_tool.py

Available Tools

List Integrations

@tool
@with_doc(LIST_INTEGRATIONS)
async def list_integrations(
    config: RunnableConfig,
    search_public_query: Annotated[
        Optional[str],
        "Search query to discover public integrations from marketplace"
    ] = None,
) -> ListIntegrationsResult | str:
    """
    List user integrations and optionally search for suggested public integrations.

    Returns structured data for LLM context and streams suggested integrations
    to the frontend for the 'Discover More' section.

    Args:
        config: Runtime configuration with user_id
        search_public_query: Natural language search (e.g., "email automation").
            When omitted, blank, or reduced to no searchable words (only
            stopwords / one-character tokens), no marketplace search is run
            and "suggested" stays empty.

    Returns:
        An error string when no user ID is configured, otherwise:
        {
            "connected": [{"id": "gmail", "name": "Gmail", ...}],
            "available": [{"id": "notion", "name": "Notion", ...}],
            "suggested": [{"id": "zapier", "name": "Zapier", ...}]
        }
        Each list is truncated to its MAX_*_FOR_LLM limit.
    """
    user_id = config.get("configurable", {}).get("user_id")
    if not user_id:
        return "Error: User ID not found in configuration."

    writer = get_stream_writer()

    # Fetch platform integrations with connection status (one batch lookup)
    platform_integrations = [i for i in OAUTH_INTEGRATIONS if i.available]
    status_map = await check_multiple_integrations_status(
        [i.id for i in platform_integrations], user_id
    )

    connected_list = []
    available_list = []

    for integration in platform_integrations:
        is_connected = status_map.get(integration.id, False)
        info = {
            "id": integration.id,
            "name": integration.name,
            "description": integration.description,
            "category": integration.category,
            "connected": is_connected,
        }

        if is_connected:
            connected_list.append(info)
        else:
            available_list.append(info)

    # Search for suggested public integrations
    suggested_list = []

    if search_public_query and search_public_query.strip():
        query = search_public_query.strip()
        logger.info(f"Searching public integrations with query: {query}")

        # Build flexible word-based search
        words = build_search_patterns(query)

        # Only query when at least one searchable word remains: MongoDB
        # rejects a filter whose "$or" array is empty.
        if words:
            # Platform integrations are already listed above; exclude them.
            existing_ids = {i["id"] for i in connected_list + available_list}

            # One case-insensitive regex per word and per searchable field
            word_conditions = []
            for word in words:
                escaped_word = re.escape(word)
                word_conditions.extend([
                    {"name": {"$regex": escaped_word, "$options": "i"}},
                    {"description": {"$regex": escaped_word, "$options": "i"}},
                    {"category": {"$regex": escaped_word, "$options": "i"}},
                ])

            search_filter = {
                "is_public": True,
                "integration_id": {"$nin": list(existing_ids)},
                "$or": word_conditions
            }

            docs_cursor = integrations_collection.find(search_filter).limit(MAX_SUGGESTED_FOR_LLM)

            async for doc in docs_cursor:
                iid = doc.get("integration_id")
                # "mcp_config" may be absent or stored as null; treat both as empty.
                mcp_config = doc.get("mcp_config") or {}
                suggested_list.append({
                    "id": iid,
                    "name": doc.get("name", ""),
                    "description": doc.get("description", ""),
                    "category": doc.get("category", "custom"),
                    "icon_url": doc.get("icon_url"),
                    "auth_type": mcp_config.get("auth_type"),
                    "relevance_score": 1.0,
                    "slug": generate_integration_slug(
                        name=doc.get("name", ""),
                        category=doc.get("category", "custom"),
                        integration_id=iid,
                    ),
                })

    # Stream suggested integrations to frontend
    if suggested_list:
        writer({
            "integration_list_data": {
                "hasSuggestions": True,
                "suggested": suggested_list[:MAX_SUGGESTED_FOR_LLM],
            }
        })

    # Return structured data for LLM (with limits)
    return {
        "connected": connected_list[:MAX_CONNECTED_FOR_LLM],
        "available": available_list[:MAX_AVAILABLE_FOR_LLM],
        "suggested": suggested_list[:MAX_SUGGESTED_FOR_LLM],
    }
Usage:
# List user's current integrations (without a query, no marketplace search is run)
result = await list_integrations(config)

# Search marketplace for new integrations; any matches are returned under
# "suggested" and also streamed to the frontend
result = await list_integrations(
    config,
    search_public_query="project management tools"
)

Suggest Integrations

@tool
async def suggest_integrations(
    query: Annotated[
        str,
        "Search query to find relevant public integrations"
    ],
    config: RunnableConfig,
) -> ListIntegrationsResult | str:
    """
    Search for and suggest public integrations from the marketplace.
    
    Use when user wants to discover new integrations, find alternatives,
    or explore what's available in a specific category.
    
    Examples:
    - "email tools"
    - "project management"
    - "social media"
    - "CRM systems"
    - "Slack alternatives"
    """
    # Thin wrapper: forward the query to list_integrations as its public
    # marketplace search and hand back whatever that tool returns.
    tool_input = {"search_public_query": query}
    return await list_integrations.ainvoke(tool_input, config=config)

Connect Integration

@tool
@with_doc(CONNECT_INTEGRATION)
async def connect_integration(
    integration_names: Annotated[
        List[str],
        "List of integration names or IDs to connect"
    ],
    config: RunnableConfig,
) -> str:
    """
    Start the OAuth connection flow for each requested integration.

    Args:
        integration_names: Names or IDs, e.g. ['gmail', 'notion', 'twitter'].
            A bare string is treated as a one-element list.
        config: Runtime configuration carrying the user_id

    Returns:
        One status line per requested integration, joined by newlines, or an
        error string when no user ID is configured.
    """
    user_id = config.get("configurable", {}).get("user_id")
    if not user_id:
        return "Error: User ID not found in configuration."

    # Normalise a single name into a list
    requested = (
        [integration_names]
        if isinstance(integration_names, str)
        else integration_names
    )

    writer = get_stream_writer()
    messages = []
    pending = []

    # Pass 1: resolve each name and sort it into "report now" or "connect".
    for requested_name in requested:
        needle = requested_name.lower().strip()

        # First integration whose id, name, or short_name equals the needle
        match = next(
            (
                candidate
                for candidate in OAUTH_INTEGRATIONS
                if candidate.id.lower() == needle
                or candidate.name.lower() == needle
                or (candidate.short_name and candidate.short_name.lower() == needle)
            ),
            None,
        )

        if not match:
            offered = [entry.name for entry in OAUTH_INTEGRATIONS if entry.available]
            messages.append(
                f"❌ '{requested_name}' not found. "
                f"Available: {', '.join(offered[:5])}..."
            )
        elif not match.available:
            messages.append(f"⏳ {match.name} is not available yet. Coming soon!")
        elif await check_single_integration_status(match.id, user_id):
            messages.append(f"✅ {match.name} is already connected!")
        else:
            pending.append(match)

    # Pass 2: emit the stream events that ask the frontend to start each flow.
    for target in pending:
        writer({"progress": f"Initiating {target.name} connection..."})

        payload = {
            "integration_id": target.id,
            "message": f"To use {target.name} features, please connect your account.",
        }
        writer({"integration_connection_required": payload})

        messages.append(
            f"🔗 Connection initiated for {target.name}. "
            f"Please follow the authentication flow."
        )

    if not messages:
        return "No integrations to connect."
    return "\n".join(messages)
Usage:
# Connect single integration (names are matched case-insensitively against
# each integration's id, name, or short_name)
await connect_integration(["gmail"], config)

# Connect multiple integrations; the result has one status line per name
await connect_integration(["gmail", "notion", "calendar"], config)

Check Integration Status

@tool
@with_doc(CHECK_INTEGRATIONS_STATUS)
async def check_integrations_status(
    integration_names: Annotated[
        List[str],
        "List of integration names or IDs to check"
    ],
    config: RunnableConfig,
) -> str:
    """
    Check connection status for multiple integrations.

    Args:
        integration_names: List like ['gmail', 'notion']. A bare string is
            treated as a one-element list.
        config: Runtime configuration

    Returns:
        One line per requested integration, joined by newlines:
        "<name>: ✅ Connected", "<name>: ⚪ Not Connected", or
        "❓ <requested name>: Not found". Returns an error string when no
        user ID is configured, and a short notice when nothing was requested.
    """
    user_id = config.get("configurable", {}).get("user_id")
    if not user_id:
        return "Error: User ID not found in configuration."

    # Ensure list format (same normalisation as connect_integration); without
    # it a bare string would be iterated character by character.
    if isinstance(integration_names, str):
        integration_names = [integration_names]

    results = []

    for integration_name in integration_names:
        search_name = integration_name.lower().strip()

        # Find integration by ID, name, or short name (case-insensitive)
        integration = next(
            (
                integ
                for integ in OAUTH_INTEGRATIONS
                if integ.id.lower() == search_name
                or integ.name.lower() == search_name
                or (integ.short_name and integ.short_name.lower() == search_name)
            ),
            None,
        )

        if not integration:
            results.append(f"❓ {integration_name}: Not found")
            continue

        # Check status
        is_connected = await check_single_integration_status(integration.id, user_id)
        status = "✅ Connected" if is_connected else "⚪ Not Connected"
        results.append(f"{integration.name}: {status}")

    return "\n".join(results) if results else "No integrations to check."

OAuth Configuration

# Location: apps/api/app/config/oauth_config.py
from pydantic import BaseModel

class OAuthIntegration(BaseModel):
    """Static description of one platform OAuth integration.

    Instances are collected in ``OAUTH_INTEGRATIONS`` and used by the
    integration tools to resolve a user-supplied name and to build the
    connected / available listings.
    """

    # Stable identifier; also the key used in connection-status lookups.
    id: str
    # Human-readable name shown in status messages.
    name: str
    # Optional alias accepted when resolving a name. Defaults to None so an
    # integration without an alias can omit it; the lookups already guard
    # with ``integ.short_name and ...``.
    short_name: Optional[str] = None
    description: str
    category: str
    # Integrations with available=False are hidden from listings and
    # reported as "not available yet" on connect.
    available: bool
    # OAuth scopes and endpoints for this provider.
    scopes: List[str]
    authorization_url: str
    token_url: str

# Registry of platform OAuth integrations. The integration tools iterate this
# list to resolve names/IDs and skip entries whose `available` flag is False.
OAUTH_INTEGRATIONS = [
    OAuthIntegration(
        id="gmail",
        name="Gmail",
        short_name="gmail",
        description="Email management and automation",
        category="communication",
        available=True,
        scopes=["https://www.googleapis.com/auth/gmail.modify"],
        authorization_url="https://accounts.google.com/o/oauth2/v2/auth",
        token_url="https://oauth2.googleapis.com/token",
    ),
    # ... more integrations
]

Integration Status Checking

# Location: apps/api/app/services/oauth/oauth_service.py
async def check_integration_status(
    integration_id: str,
    user_id: str,
) -> bool:
    """
    Report whether a user's stored connection for an integration is usable.

    Args:
        integration_id: Integration identifier
        user_id: User identifier

    Returns:
        False when no record exists or its status is not "connected".
        True when the record is connected and "expires_at" lies in the future.
        Otherwise the result of attempting a token refresh.
    """
    # NOTE(review): the tool functions call `check_single_integration_status`;
    # confirm whether that is this function under another name.
    record = await user_integrations_collection.find_one({
        "user_id": user_id,
        "integration_id": integration_id,
    })

    # Nothing stored, or stored but not marked as connected
    if not record or record.get("status") != "connected":
        return False

    # Still within the token's lifetime
    # NOTE(review): datetime.now() is naive local time; confirm "expires_at"
    # is stored the same way (not UTC or timezone-aware).
    expiry = record.get("expires_at")
    if expiry and datetime.now() < expiry:
        return True

    # Expired or no expiry recorded: fall back to a refresh attempt
    return await refresh_oauth_token(record)

async def check_multiple_integrations_status(
    integration_ids: List[str],
    user_id: str,
) -> Dict[str, bool]:
    """
    Look up the stored status of several integrations with a single query.

    Only the stored "status" field is read; unlike the single-integration
    check, token expiry is not inspected and no refresh is attempted.

    Returns:
        {"gmail": True, "notion": False, ...} with one entry per requested
        ID; IDs without a stored record map to False.
    """
    # One query for every requested ID
    records = user_integrations_collection.find({
        "user_id": user_id,
        "integration_id": {"$in": integration_ids},
    })

    # Default every requested ID to "not connected"
    statuses = dict.fromkeys(integration_ids, False)

    async for record in records:
        statuses[record["integration_id"]] = record.get("status") == "connected"

    return statuses

Search Patterns

# Filler words that carry no search meaning and are dropped from queries.
SEARCH_STOPWORDS = {
    "a", "an", "the", "to", "for", "with",
    "and", "or", "in", "on", "my"
}

def build_search_patterns(query: str) -> list[str]:
    """
    Extract individual words from query for flexible matching.

    The query is lower-cased and split on whitespace, commas, and semicolons.
    Stopwords and tokens shorter than two characters are dropped, and repeated
    words are collapsed (first occurrence wins) so callers do not build
    duplicate conditions.

    Example:
        "Render deployment" -> ["render", "deployment"]

    This allows matching "Render" when query is "Render deployment"

    Args:
        query: Free-text search query; an empty or missing query yields [].

    Returns:
        Unique search words in their original order; may be empty.
    """
    if not query:
        return []

    tokens = re.split(r"[\s,;]+", query.lower())
    meaningful = (
        token
        for token in tokens
        if len(token) >= 2 and token not in SEARCH_STOPWORDS
    )
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(meaningful))

Frontend Integration Flow

Custom MCP Integrations

Users can add custom MCP (Model Context Protocol) servers:
# Custom integration stored in MongoDB
{
    "integration_id": "custom_api_123",
    "name": "My Custom API",
    "description": "Internal company API",
    "category": "custom",
    "source": "custom",
    "mcp_config": {
        "command": "node",
        "args": ["/path/to/server.js"],
        "env": {"API_KEY": "..."},
        # Surfaced as "auth_type" on suggested integrations
        "auth_type": "api_key"
    },
    # Marketplace search only matches documents with is_public set to True,
    # so this integration is not suggested to other users
    "is_public": False,
    "created_by": "user_123"
}

Usage Examples

Discovering Integrations

# User: "What CRM tools are available?"
await suggest_integrations(
    query="CRM systems",
    config=config
)
# Example result (depends on what the marketplace contains):
# Salesforce, HubSpot, Pipedrive, etc.

Checking Before Tool Use

# Before using Gmail tool
is_connected = await check_integrations_status(["gmail"], config)

if "Not Connected" in is_connected:
    # Prompt user to connect
    await connect_integration(["gmail"], config)
else:
    # Proceed with Gmail operation
    await send_email(...)
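Integration search uses flexible word-based matching with case-insensitive MongoDB regex: a query matches when any of its words appears in an integration's name, description, or category. This tolerates extra words and variations in phrasing, but it does not correct typos.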

Best Practices

1. Check Status Before Operations

# Good: Check first
status = await check_integrations_status(["notion"], config)
if "Connected" in status:
    await notion_operation()

# Avoid: Assume connection
await notion_operation()  # May fail if not connected

2. Batch Status Checks

# Good: Single batch check (one database query for all IDs)
# Note: the batch helper only reads the stored status; it does not run the
# expiry check / token refresh that the single-integration check performs.
status_map = await check_multiple_integrations_status(
    ["gmail", "calendar", "notion"],
    user_id
)

# Avoid: Multiple individual checks (one database query per integration)
for integration in ["gmail", "calendar", "notion"]:
    await check_integration_status(integration, user_id)
# Good: Specific search
# Note: search terms are OR-ed, so each extra non-stopword term widens the
# set of matches rather than narrowing it.
await suggest_integrations("project management for software teams", config)

# Okay: Generic search
await suggest_integrations("project management", config)

Next Steps

Build docs developers (and LLMs) love