Skip to main content
Aurora’s connector architecture allows you to integrate with any observability platform or cloud provider. This guide shows you how to build custom connectors.

Connector Architecture

Connectors are organized under server/connectors/ with this structure:
server/connectors/
├── your_connector/
│   ├── __init__.py          # Connector module
│   ├── auth.py              # Authentication logic
│   ├── client.py            # API client
│   └── routes.py            # Flask routes (optional)

Types of Connectors

1. Cloud Provider Connectors

Provide infrastructure access for incident investigation. Examples:
  • gcp_connector - Google Cloud Platform
  • aws_connector - Amazon Web Services
  • azure_connector - Microsoft Azure
  • ovh_connector - OVH Cloud
  • scaleway_connector - Scaleway

2. Observability Connectors

Receive alerts and provide monitoring data. Examples:
  • datadog_connector - Datadog
  • grafana_connector - Grafana
  • dynatrace_connector - Dynatrace
  • splunk_connector - Splunk

3. Collaboration Connectors

Integrate with communication and documentation tools. Examples:
  • slack_connector - Slack
  • pagerduty_connector - PagerDuty
  • confluence_connector - Confluence

4. Code Repository Connectors

Access code and create automated fixes. Examples:
  • github_connector - GitHub
  • bitbucket_connector - Bitbucket

Building a Cloud Provider Connector

Let’s build a connector for a fictional cloud provider “CloudX”.
Step 1: Create the connector module

# Create the connector package directory and its (initially empty) module files.
mkdir -p server/connectors/cloudx_connector
touch server/connectors/cloudx_connector/__init__.py
touch server/connectors/cloudx_connector/auth.py
touch server/connectors/cloudx_connector/client.py
# routes.py is optional: create it only if the connector exposes HTTP endpoints.
Step 2: Implement authentication

Create server/connectors/cloudx_connector/auth.py:
import os
from typing import Optional
import requests
from utils.auth.token_management import get_token_data, store_token_data

CLOUDX_API_BASE = "https://api.cloudx.com"

def get_cloudx_credentials(user_id: str) -> Optional[dict]:
    """Look up the CloudX credentials stored in Vault for ``user_id``.

    Returns:
        A dict with ``api_key``, ``api_secret`` and ``region`` (``region``
        falls back to ``"us-west-1"`` when none was stored), or ``None`` when
        nothing is stored for the user.
    """
    stored = get_token_data(user_id, "cloudx")
    if not stored:
        return None
    creds = {field: stored.get(field) for field in ("api_key", "api_secret")}
    creds["region"] = stored.get("region", "us-west-1")
    return creds

def validate_cloudx_credentials(api_key: str, api_secret: str) -> bool:
    """Check a CloudX API key/secret pair against the CloudX account endpoint.

    Sends ``GET {CLOUDX_API_BASE}/v1/account`` with the pair as headers.

    Returns:
        True if CloudX answered HTTP 200. False for any other status, and
        also for transport-level failures (connection error, timeout,
        invalid header value, ...). Callers cannot distinguish "CloudX
        unreachable" from "credentials rejected".
    """
    try:
        response = requests.get(
            f"{CLOUDX_API_BASE}/v1/account",
            headers={"X-API-Key": api_key, "X-API-Secret": api_secret},
            timeout=10,
        )
    except requests.RequestException:
        # Only request failures mean "could not validate". Anything else is a
        # programming error and should surface rather than masquerade as
        # "invalid credentials".
        return False
    return response.status_code == 200

def store_cloudx_credentials(user_id: str, api_key: str, api_secret: str, region: str = "us-west-1"):
    """Validate a CloudX key pair and, if CloudX accepts it, save it in Vault.

    Raises:
        ValueError: when ``validate_cloudx_credentials`` rejects the pair;
            nothing is stored in that case.
    """
    credentials_ok = validate_cloudx_credentials(api_key, api_secret)
    if credentials_ok:
        record = {"api_key": api_key, "api_secret": api_secret, "region": region}
        store_token_data(user_id, "cloudx", record)
        return
    raise ValueError("Invalid CloudX credentials")
Step 3: Create the API client

Create server/connectors/cloudx_connector/client.py:
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests
from .auth import get_cloudx_credentials, CLOUDX_API_BASE

logger = logging.getLogger(__name__)


class CloudXClient:
    """API client for CloudX, authenticated as a single Aurora user.

    Credentials are loaded once at construction and sent as headers on a
    shared ``requests.Session``. The read methods are best-effort: on any
    failure they log the error and return an empty result (``[]`` / ``{}``)
    instead of raising. An empty result can therefore also mean the call failed.
    """

    # Timeout in seconds for each data request.
    REQUEST_TIMEOUT = 30

    def __init__(self, user_id: str):
        """Load ``user_id``'s CloudX credentials and prepare an authenticated session.

        Raises:
            ValueError: if no CloudX credentials are stored for the user.
        """
        self.user_id = user_id
        self.credentials = get_cloudx_credentials(user_id)
        if not self.credentials:
            raise ValueError(f"No CloudX credentials for user {user_id}")

        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": self.credentials["api_key"],
            "X-API-Secret": self.credentials["api_secret"],
            "Content-Type": "application/json"
        })
        self.region = self.credentials["region"]

    def _get(self, path: str, params: Dict) -> Any:
        """GET ``CLOUDX_API_BASE + path`` and return the decoded JSON body.

        Raises ``requests.RequestException`` on transport or HTTP-status
        errors, and a ``ValueError`` subclass if the body is not valid JSON.
        """
        response = self.session.get(
            f"{CLOUDX_API_BASE}{path}", params=params, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def list_instances(self) -> List[Dict]:
        """List compute instances in this client's region.

        Returns the response's ``instances`` list, or ``[]`` on any failure.
        """
        try:
            return self._get("/v1/compute/instances", {"region": self.region}).get("instances", [])
        except Exception as e:
            # Deliberately broad: this client degrades to an empty result.
            logger.error("Failed to list CloudX instances: %s", e)
            return []

    def get_instance_logs(self, instance_id: str, limit: int = 100) -> List[str]:
        """Fetch up to ``limit`` log lines for ``instance_id``.

        ``instance_id`` may come from untrusted input (for example an agent
        tool call). It is therefore percent-encoded as a single path segment.
        "." and ".." are rejected because URL normalization would let them
        escape the ``/instances/`` prefix.

        Returns the response's ``logs`` list, or ``[]`` on any failure.
        """
        segment = quote(str(instance_id), safe="")
        if segment in ("", ".", ".."):
            logger.error("Refusing to fetch logs for invalid instance id %r", instance_id)
            return []
        try:
            return self._get(
                f"/v1/compute/instances/{segment}/logs", {"limit": limit}
            ).get("logs", [])
        except Exception as e:
            logger.error("Failed to fetch logs for instance %s: %s", instance_id, e)
            return []

    def get_metrics(self, instance_id: str, metric_name: str, start_time: str, end_time: str) -> Dict:
        """Fetch one metric series for an instance.

        ``start_time`` / ``end_time`` are forwarded as-is. Presumably they use
        whatever timestamp format the CloudX API expects (TODO confirm).

        Returns the decoded response body, or ``{}`` on any failure.
        """
        try:
            return self._get(
                "/v1/monitoring/metrics",
                {
                    "instance_id": instance_id,
                    "metric": metric_name,
                    "start": start_time,
                    "end": end_time,
                },
            )
        except Exception as e:
            logger.error("Failed to fetch metrics: %s", e)
            return {}
Step 4: Add Flask routes (optional)

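Create server/connectors/cloudx_connector/routes.py for the connector's HTTP endpoints. This example exposes credential setup and instance listing; OAuth callbacks or webhook receivers belong in the same file: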
import logging
from flask import Blueprint, request, jsonify
from utils.auth.stateless_auth import get_user_id_from_request
from .auth import store_cloudx_credentials
from .client import CloudXClient

logger = logging.getLogger(__name__)
cloudx_bp = Blueprint("cloudx", __name__)

@cloudx_bp.route("/api/cloudx/connect", methods=["POST"])
def connect_cloudx():
    """Connect a CloudX account for the requesting user.

    Expects a JSON object body with ``api_key`` and ``api_secret``, plus an
    optional ``region`` (``"us-west-1"`` when absent, null or empty). The key
    pair is validated against CloudX before being stored.

    Responses:
        200 on success.
        400 when the user is unknown, the body is not a JSON object, a key
            field is missing, or the credentials are rejected.
        500 on any other failure.
    """
    user_id = get_user_id_from_request()
    if not user_id:
        return jsonify({"error": "Missing user_id"}), 400

    # silent=True yields None for a missing/malformed body, so it gets the same
    # JSON 400 as the other errors here. The isinstance check rejects
    # non-object JSON (list, string, null) instead of raising AttributeError
    # on .get() below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    api_key = data.get("api_key")
    api_secret = data.get("api_secret")
    # `or` rather than a .get() default, so an explicit null/"" region also
    # falls back to the default.
    region = data.get("region") or "us-west-1"

    if not api_key or not api_secret:
        return jsonify({"error": "Missing api_key or api_secret"}), 400

    try:
        store_cloudx_credentials(user_id, api_key, api_secret, region)
        return jsonify({"success": True, "message": "CloudX connected"}), 200
    except ValueError as e:
        # store_cloudx_credentials raises ValueError when validation fails.
        return jsonify({"error": str(e)}), 400
    except Exception:
        logger.exception("Failed to connect CloudX account")
        return jsonify({"error": "Internal error"}), 500

@cloudx_bp.route("/api/cloudx/instances", methods=["GET"])
def list_instances():
    """Return the requesting user's CloudX instances as ``{"instances": [...]}``.

    Responds 400 when the caller cannot be identified or a ``ValueError`` is
    raised (for example ``CloudXClient`` finding no stored credentials).
    Responds 500 on any other failure.
    """
    user_id = get_user_id_from_request()
    if user_id:
        try:
            cloudx = CloudXClient(user_id)
            body = {"instances": cloudx.list_instances()}
            return jsonify(body), 200
        except ValueError as err:
            return jsonify({"error": str(err)}), 400
        except Exception:
            logger.exception("Failed to list CloudX instances")
            return jsonify({"error": "Internal error"}), 500
    return jsonify({"error": "Missing user_id"}), 400
Step 5: Register the connector

Add to server/main_compute.py:
# Register the CloudX blueprint on the Flask app so its /api/cloudx/* routes are served.
from connectors.cloudx_connector.routes import cloudx_bp
app.register_blueprint(cloudx_bp)
Step 6: Create LangGraph tools

Make CloudX accessible to the RCA agent. Create server/chat/backend/agent/tools/cloudx_tools.py:
import json

from langchain.tools import tool
from connectors.cloudx_connector.client import CloudXClient

# NOTE: @tool uses each function's docstring as the tool description shown to
# the agent, so keep the docstrings accurate and concise.


@tool
def list_cloudx_instances(user_id: str) -> str:
    """List all CloudX compute instances for investigation.
    
    Args:
        user_id: User ID for credential lookup
    
    Returns:
        JSON string of instances with IDs, names, status, and IPs
    """
    try:
        client = CloudXClient(user_id)
        # list_instances() returns [] when the CloudX call fails, so "[]" may
        # mean an API error; the details are in the server log.
        instances = client.list_instances()
        return json.dumps(instances, indent=2)
    except Exception as e:
        # Report the failure as the tool's text result rather than raising.
        return f"Error listing instances: {str(e)}"


@tool
def get_cloudx_instance_logs(user_id: str, instance_id: str, limit: int = 100) -> str:
    """Fetch recent logs from a CloudX instance.
    
    Args:
        user_id: User ID for credential lookup
        instance_id: CloudX instance ID
        limit: Maximum number of log lines to return
    
    Returns:
        Log lines as a string
    """
    try:
        client = CloudXClient(user_id)
        # As with list_instances(), an empty result may also mean the call failed.
        logs = client.get_instance_logs(instance_id, limit)
        return "\n".join(logs)
    except Exception as e:
        return f"Error fetching logs: {str(e)}"
Register tools in server/chat/backend/agent/tools/__init__.py:
# Import the @tool-decorated CloudX functions and add them to AVAILABLE_TOOLS
# so they are registered alongside the existing agent tools.
from .cloudx_tools import list_cloudx_instances, get_cloudx_instance_logs

AVAILABLE_TOOLS = [
    # ... existing tools ...
    list_cloudx_instances,
    get_cloudx_instance_logs,
]

Building an Observability Connector

Let’s build a connector for a fictional monitoring tool “MetricsX”.
Step 1: Create the webhook endpoint

Create server/connectors/metricsx_connector/routes.py:
import json
import logging
from datetime import datetime
from flask import Blueprint, request, jsonify
from utils.db.connection_pool import db_pool
from chat.background.task import run_background_chat

logger = logging.getLogger(__name__)
metricsx_bp = Blueprint("metricsx", __name__)

# MetricsX severity -> Aurora incident severity; any other value maps to "medium".
SEVERITY_MAP = {"critical": "high", "warning": "medium", "info": "low"}


@metricsx_bp.route("/api/metricsx/webhook", methods=["POST"])
def metricsx_webhook():
    """Receive an alert from MetricsX, record it, and open an incident.

    Stores the raw payload in ``metricsx_alerts`` and creates the matching
    ``incidents`` row in one transaction, then queues a background RCA run.
    A repeated delivery of an ``alert_id`` that is already stored is
    acknowledged with 200 and ignored. Webhook retries therefore neither
    fail on the UNIQUE(alert_id) constraint nor create duplicate incidents.

    Responses:
        200 with ``incident_id`` on success, or with ``duplicate: true``.
        400 if the body is not a JSON object or lacks ``alert_id``.
        500 on any other failure.
    """
    # NOTE(review): this endpoint accepts any caller. Verify a shared secret or
    # request signature from MetricsX before exposing it; otherwise anyone who
    # can reach it can create incidents and trigger RCA runs.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    alert_id = payload.get("alert_id")
    if not alert_id:
        # metricsx_alerts.alert_id is NOT NULL: reject up front instead of
        # failing inside the database.
        return jsonify({"error": "Missing alert_id"}), 400

    alert_title = payload.get("alert_name", "Unknown Alert")
    # str(... or "medium") tolerates a null or non-string severity.
    raw_severity = str(payload.get("severity") or "medium").lower()
    severity = SEVERITY_MAP.get(raw_severity, "medium")
    service = payload.get("service", "unknown")
    environment = payload.get("environment", "production")

    incident_id = None
    try:
        # One connection and one commit: the stored alert and its incident are
        # written together, so a failure cannot leave an alert without an incident.
        with db_pool.get_admin_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """INSERT INTO metricsx_alerts (alert_id, payload, received_at)
                       VALUES (%s, %s, %s)
                       ON CONFLICT (alert_id) DO NOTHING
                       RETURNING id""",
                    # Serialize explicitly. The driver is presumably psycopg,
                    # which does not adapt a plain dict to JSONB by default.
                    (alert_id, json.dumps(payload), datetime.utcnow())
                )
                row = cursor.fetchone()
                if row is not None:
                    cursor.execute(
                        """INSERT INTO incidents (
                               user_id, source_type, source_alert_id, status, severity,
                               alert_title, alert_service, alert_environment, started_at
                           ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                           RETURNING id""",
                        (
                            "system",  # Or extract from payload
                            "metricsx",
                            str(row[0]),
                            "investigating",
                            severity,
                            alert_title,
                            service,
                            environment,
                            datetime.utcnow()
                        )
                    )
                    incident_id = str(cursor.fetchone()[0])
            conn.commit()

        if incident_id is None:
            # ON CONFLICT DO NOTHING returned no row: this alert_id was already stored.
            logger.info("Ignoring duplicate MetricsX alert %s", alert_id)
            return jsonify({"success": True, "duplicate": True}), 200

        # Trigger RCA investigation
        run_background_chat.delay(
            user_id="system",
            incident_id=incident_id,
            trigger_metadata={"source": "metricsx", "alert_id": alert_id}
        )

        logger.info("Created incident %s from MetricsX alert %s", incident_id, alert_id)
        return jsonify({"success": True, "incident_id": incident_id}), 200

    except Exception:
        logger.exception("Failed to process MetricsX webhook")
        return jsonify({"error": "Internal error"}), 500
Step 2: Create the database table

Add a migration for the MetricsX alerts table:
-- Raw MetricsX alert payloads, at most one row per MetricsX alert_id.
CREATE TABLE metricsx_alerts (
    id SERIAL PRIMARY KEY,
    alert_id VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    received_at TIMESTAMP NOT NULL DEFAULT NOW(),
    -- PostgreSQL backs a UNIQUE constraint with a unique B-tree index, which
    -- also serves lookups by alert_id.
    UNIQUE(alert_id)
);

-- No separate index on alert_id: the UNIQUE constraint above already creates one.
CREATE INDEX idx_metricsx_alerts_received_at ON metricsx_alerts(received_at DESC);
Step 3: Register the blueprint

In server/main_compute.py:
# Register the MetricsX blueprint so POST /api/metricsx/webhook is served.
from connectors.metricsx_connector.routes import metricsx_bp
app.register_blueprint(metricsx_bp)

Credential Storage with Vault

All connectors should use Vault for secrets:
from utils.auth.token_management import store_token_data, get_token_data

# Store credentials. The provider name scopes the secret under the user (see
# the Vault path pattern below); read it back with the same provider name.
store_token_data(user_id, "your_provider", {
    "api_key": "...",
    "api_secret": "...",
    "additional_field": "..."
})

# Retrieve credentials. Check the result before use: presumably nothing
# (a falsy value) comes back when the user has not connected this provider.
creds = get_token_data(user_id, "your_provider")
if creds:
    api_key = creds.get("api_key")
Vault paths follow this pattern:
vault:kv/data/aurora/users/{user_id}/{provider}

Testing Your Connector

Step 1: Unit tests

Create server/tests/connectors/test_cloudx_connector.py:
from unittest.mock import MagicMock, patch

import pytest
import requests

from connectors.cloudx_connector.client import CloudXClient
from connectors.cloudx_connector.auth import validate_cloudx_credentials

# Stand-in for what get_cloudx_credentials would return from Vault.
FAKE_CREDS = {"api_key": "test_key", "api_secret": "test_secret", "region": "us-west-1"}


def test_validate_credentials():
    """validate_cloudx_credentials maps the account endpoint's status to a bool.

    requests.get is patched, so the test never calls the real CloudX API.
    """
    with patch("connectors.cloudx_connector.auth.requests.get") as mock_get:
        mock_get.return_value.status_code = 200
        assert validate_cloudx_credentials("valid_key", "valid_secret") is True

        mock_get.return_value.status_code = 401
        assert validate_cloudx_credentials("invalid", "invalid") is False

        # Network failures are reported as "not valid", not raised.
        mock_get.side_effect = requests.ConnectionError("unreachable")
        assert validate_cloudx_credentials("valid_key", "valid_secret") is False


def test_list_instances():
    """list_instances returns the API's ``instances`` list for the stored region.

    Both the Vault lookup and the HTTP session are patched out.
    """
    with patch("connectors.cloudx_connector.client.get_cloudx_credentials", return_value=FAKE_CREDS):
        client = CloudXClient("test_user_id")

    fake_response = MagicMock()
    fake_response.json.return_value = {"instances": [{"id": "i-123"}]}
    with patch.object(client.session, "get", return_value=fake_response) as mock_get:
        instances = client.list_instances()

    assert instances == [{"id": "i-123"}]
    assert mock_get.call_args.kwargs["params"] == {"region": "us-west-1"}
Step 2: Integration tests

Test the full flow:
# Start Aurora
make dev

# Connect a test account.
# NOTE: as written these requests carry no user identity. The routes answer
# 400 "Missing user_id" unless you add whatever header/cookie
# get_user_id_from_request() reads in your deployment.
# The connect call also validates the key pair against the CloudX API, so
# "test"/"test" is rejected with 400 "Invalid CloudX credentials" unless
# that API accepts it. Use real CloudX keys for a successful run.
curl -X POST http://localhost:5080/api/cloudx/connect \
  -H "Content-Type: application/json" \
  -d '{"api_key": "test", "api_secret": "test", "region": "us-west-1"}'

# List instances (same user-identity caveat as above)
curl http://localhost:5080/api/cloudx/instances
Step 3: Test with the RCA agent

Create a test incident and verify the LangGraph agent can use your tools:
  1. Create an incident manually or via webhook
  2. Watch Celery worker logs for tool invocations
  3. Verify tool output appears in citations

Best Practices

Always handle API errors gracefully:
# Body of a client method: separate timeouts and HTTP errors from unexpected
# failures, and degrade to an empty result instead of raising.
try:
    response = self.session.get(url, timeout=30)
    # Raises requests.HTTPError for 4xx/5xx responses.
    response.raise_for_status()
    return response.json()
except requests.Timeout:
    logger.error("API timeout")
    return {}
except requests.HTTPError as e:
    # e.response is the failed response, so its status code can be logged.
    logger.error(f"HTTP error: {e.response.status_code}")
    return {}
except Exception as e:
    # Catch-all goes last; logger.exception also records the traceback.
    logger.exception("Unexpected error")
    return {}
Respect API rate limits:
import time
from functools import wraps

def rate_limit(calls: int, period: float):
    """Throttle a function to at most ``calls`` invocations per ``period`` seconds.

    Calls are spaced evenly. Each call waits until ``period / calls`` seconds
    have passed since the previous call was let through, so bursts are not
    allowed. The timestamp lives in the decorator's closure. It is therefore
    shared by every caller of the decorated function in this process,
    including all instances when a method is decorated. No locking is done,
    so concurrent threads are not strictly spaced.

    Raises:
        ValueError: if ``calls`` is not positive or ``period`` is negative.
    """
    if calls <= 0 or period < 0:
        raise ValueError("calls must be positive and period non-negative")
    min_interval = period / calls

    def decorator(func):
        # Monotonic time of the previous permitted call; None until the first call.
        last_called = None

        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal last_called
            if last_called is not None:
                # time.monotonic() cannot jump with wall-clock changes, so a
                # clock adjustment never produces a huge or negative wait.
                wait = min_interval - (time.monotonic() - last_called)
                if wait > 0:
                    time.sleep(wait)
            last_called = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(calls=10, period=1)  # 10 calls per second
def api_call(self):
    """Example throttled method: successive calls start at least 1/10 s apart."""
    pass
Use Redis to cache expensive API calls:
from utils.cache.redis_client import get_redis_client
import json

def get_instances_cached(self, user_id: str, ttl: int = 300):
    """Return ``self.list_instances()`` through a Redis cache (TTL 5 minutes by default).

    The cache key includes ``self.region`` because the instance listing is
    region-scoped. Empty results are not cached: ``list_instances`` also
    returns ``[]`` when the API call fails, and caching that would hide
    instances for ``ttl`` seconds after a transient error.
    """
    redis = get_redis_client()
    cache_key = f"cloudx:instances:{user_id}:{self.region}"

    cached = redis.get(cache_key)
    if cached is not None:
        return json.loads(cached)

    instances = self.list_instances()
    if instances:
        redis.setex(cache_key, ttl, json.dumps(instances))
    return instances
Use structured logging for debugging:
# Pass context through `extra` instead of formatting it into the message, so
# handlers/formatters can read each value as a separate LogRecord attribute.
# Keys in `extra` must not clash with built-in LogRecord attributes
# (e.g. "message", "asctime", "name", "module"); logging raises KeyError if they do.
logger.info(
    "Fetched CloudX instances",
    extra={
        "user_id": user_id,
        "region": self.region,
        "instance_count": len(instances)
    }
)

Example Connectors

Study these existing connectors for reference:
  • GCP Connector: server/connectors/gcp_connector/ - OAuth2 flow, API client patterns
  • Slack Connector: server/connectors/slack_connector/ - OAuth, webhook handling
  • Datadog Connector: server/connectors/datadog_connector/ - Event ingestion, alert correlation
  • Confluence Connector: server/connectors/confluence_connector/ - Search service, API client

Next Steps

First Investigation

Test your connector with a real incident

Backup & Restore

Ensure your connector data is backed up

Build docs developers (and LLMs) love