Building Custom Connectors
EKG’s connector architecture allows you to add new data sources by implementing the BaseConnector interface. This guide shows how to create custom connectors for your infrastructure tools.

Connector Architecture

All connectors inherit from BaseConnector and implement a parse() method that returns nodes and edges.

BaseConnector Interface

The base connector provides helper methods and defines the interface:
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Any

from pydantic import BaseModel

class Node(BaseModel):
    """Represents a node in the knowledge graph.

    Nodes are identified by a "type:name" ID (see BaseConnector._create_node),
    which lets connectors from different sources reference the same entity.
    """
    id: str              # Unique identifier (format: "type:name", e.g. "service:api-gateway")
    type: str            # Node type (service, database, cache, team)
    name: str            # Human-readable name (the part after the colon in `id`)
    properties: Dict[str, Any] = {}  # Free-form metadata (team, port, tags, ...)

class Edge(BaseModel):
    """Represents a directed edge (relationship) in the knowledge graph.

    `source` and `target` hold node IDs in the "type:name" format; the edge's
    own ID is derived from them (see BaseConnector._create_edge).
    """
    id: str              # Unique identifier (format: "edge:<source>-<type>-<target>")
    type: str            # Edge type (calls, uses, depends_on, owns)
    source: str          # Source node ID ("type:name" format)
    target: str          # Target node ID ("type:name" format)
    properties: Dict[str, Any] = {}  # Free-form metadata about the relationship

class BaseConnector(ABC):
    """Base class for all connectors.

    A connector turns one data source (a config file, a state file, or an
    API) into graph nodes and edges. Subclasses must implement parse();
    helper methods on this class provide consistent node/edge construction.
    """
    
    def __init__(self, name: str):
        # Short connector name; also namespaces the logger as "connector.<name>".
        self.name = name
        self.logger = logging.getLogger(f"connector.{name}")
    
    @abstractmethod
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse a configuration file and return nodes and edges.

        Args:
            file_path: Path to the source file to parse (API-backed
                connectors may ignore this).

        Returns:
            A (nodes, edges) tuple describing the parsed graph fragment.
        """
        pass

Helper Methods

The BaseConnector provides helper methods for common operations:

Creating Nodes

def _create_node(self, node_type: str, name: str, properties: Dict[str, Any] = None) -> Node:
    """Build a Node whose ID follows the canonical "type:name" convention.

    Args:
        node_type: Category of the node (service, database, cache, team, ...).
        name: Human-readable name; combined with the type to form the ID.
        properties: Optional extra metadata to attach to the node.

    Returns:
        A fully-populated Node instance.
    """
    return Node(
        id=f"{node_type}:{name}",
        type=node_type,
        name=name,
        properties=properties or {},
    )
Usage:
# Create a service node
service_node = self._create_node(
    'service', 
    'api-gateway',
    {'port': 8080, 'team': 'platform-team'}
)
# Result: Node(id='service:api-gateway', type='service', name='api-gateway', ...)

Creating Edges

def _create_edge(self, edge_type: str, source: str, target: str, properties: Dict[str, Any] = None) -> Edge:
    """Build an Edge with the canonical "edge:<source>-<type>-<target>" ID.

    Args:
        edge_type: Relationship kind (calls, uses, depends_on, owns, ...).
        source: ID of the node the edge starts from.
        target: ID of the node the edge points to.
        properties: Optional extra metadata to attach to the edge.

    Returns:
        A fully-populated Edge instance.
    """
    return Edge(
        id=f"edge:{source}-{edge_type}-{target}",
        type=edge_type,
        source=source,
        target=target,
        properties=properties or {},
    )
Usage:
# Create a dependency edge
edge = self._create_edge(
    'uses',
    'service:order-service',
    'database:orders-db'
)
# Result: Edge(id='edge:service:order-service-uses-database:orders-db', type='uses', ...)

Extracting Dependencies

def _extract_service_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
    """Extract service dependencies from environment variables."""
    dependencies = []
    
    for key, value in env_vars.items():
        if key.endswith('_URL') or key.endswith('_SERVICE_URL'):
            # Extract service name from URL like http://payment-service:8083
            if '://' in value:
                url_part = value.split('://')[1]
                if ':' in url_part:
                    service_name = url_part.split(':')[0]
                    dependencies.append(service_name)
    
    return dependencies

def _extract_database_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
    """Extract database dependencies from environment variables."""
    dependencies = []
    
    for key, value in env_vars.items():
        if key == 'DATABASE_URL':
            # Extract database name from URL like postgresql://user:pass@db-name:5432/dbname
            if '@' in value and ':' in value:
                parts = value.split('@')[1].split(':')
                if parts:
                    db_name = parts[0]
                    dependencies.append(db_name)
        elif key == 'REDIS_URL':
            # Extract redis name from URL like redis://redis-main:6379
            if '://' in value:
                url_part = value.split('://')[1]
                if ':' in url_part:
                    redis_name = url_part.split(':')[0]
                    dependencies.append(redis_name)
    
    return dependencies

Example: Docker Compose Connector

Let’s examine the built-in Docker Compose connector as a reference:
import yaml
from typing import Dict, List, Any
from .base import BaseConnector, Node, Edge

class DockerComposeConnector(BaseConnector):
    """Connector for parsing Docker Compose files."""
    
    def __init__(self):
        super().__init__("docker_compose")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse a docker-compose.yml file into graph nodes and edges.

        Creates one service node per Compose service, a 'depends_on' edge
        for each explicit dependency, and a 'calls' edge for each service
        referenced via *_URL environment variables.

        Args:
            file_path: Path to the docker-compose.yml file.

        Returns:
            (nodes, edges) tuple; both empty if the file cannot be parsed.
        """
        self.logger.info(f"Parsing Docker Compose file: {file_path}")
        
        try:
            with open(file_path, 'r') as f:
                # safe_load returns None for an empty file; normalize to {}
                # so the .get() calls below don't blow up.
                compose_data = yaml.safe_load(f) or {}
        except Exception as e:
            self.logger.error(f"Failed to parse {file_path}: {e}")
            return [], []
        
        nodes = []
        edges = []
        
        # A bare 'services:' key with no body also parses to None.
        services = compose_data.get('services') or {}
        
        for service_name, service_config in services.items():
            # Create service node
            service_node = self._create_service_node(service_name, service_config)
            nodes.append(service_node)
            
            # depends_on may be a list (short form) or a mapping (long form
            # with conditions); iterating either yields the dependency names.
            depends_on = service_config.get('depends_on', [])
            for dependency in depends_on:
                edge = self._create_edge(
                    'depends_on',
                    service_node.id,
                    f"service:{dependency}"
                )
                edges.append(edge)
            
            # Create edges from environment variables
            env_vars = self._extract_env_vars(service_config)
            service_deps = self._extract_service_dependencies_from_env(env_vars)
            
            for dep_service in service_deps:
                edge = self._create_edge(
                    'calls',
                    service_node.id,
                    f"service:{dep_service}"
                )
                edges.append(edge)
        
        self.logger.info(f"Parsed {len(nodes)} nodes and {len(edges)} edges")
        return nodes, edges

Creating a Custom Connector

Let’s create a custom connector for Terraform configurations.
Step 1

Create connector file

Create a new file in the connectors/ directory:
touch connectors/terraform.py
Step 2

Implement the connector class

import json
from typing import Dict, List, Any
from .base import BaseConnector, Node, Edge

class TerraformConnector(BaseConnector):
    """Connector for parsing Terraform state files."""
    
    def __init__(self):
        super().__init__("terraform")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse a terraform.tfstate file into graph nodes and edges.

        Maps aws_instance -> service nodes, aws_rds_cluster -> database
        nodes, aws_elasticache_cluster -> cache nodes, and emits one
        'depends_on' edge per entry in each resource's dependency list.

        Args:
            file_path: Path to the terraform.tfstate file.

        Returns:
            (nodes, edges) tuple; both empty if the file cannot be parsed.
        """
        self.logger.info(f"Parsing Terraform state: {file_path}")
        
        try:
            with open(file_path, 'r') as f:
                tfstate = json.load(f)
        except Exception as e:
            self.logger.error(f"Failed to parse {file_path}: {e}")
            return [], []
        
        nodes = []
        edges = []
        
        # Guard against a file containing JSON `null`.
        resources = (tfstate or {}).get('resources', [])
        
        for resource in resources:
            resource_type = resource.get('type')
            resource_name = resource.get('name')
            
            for instance in resource.get('instances', []):
                attributes = instance.get('attributes', {})
                
                # Handle the resource types we know how to model; anything
                # else contributes no node.
                if resource_type == 'aws_instance':
                    nodes.append(self._parse_ec2_instance(resource_name, attributes))
                elif resource_type == 'aws_rds_cluster':
                    nodes.append(self._parse_rds_cluster(resource_name, attributes))
                elif resource_type == 'aws_elasticache_cluster':
                    nodes.append(self._parse_elasticache(resource_name, attributes))
            
            # BUG FIX: the dependency list lives on the resource, not on each
            # instance. Extracting it inside the instance loop (as before)
            # produced one duplicate edge per instance.
            for dep in resource.get('dependencies', []):
                edges.append(self._create_edge(
                    'depends_on',
                    f"resource:{resource_name}",
                    f"resource:{dep}"
                ))
        
        self.logger.info(f"Parsed {len(nodes)} nodes and {len(edges)} edges")
        return nodes, edges
    
    def _parse_ec2_instance(self, name: str, attrs: Dict) -> Node:
        """Map an EC2 instance resource to a 'service' node."""
        return self._create_node(
            'service',
            name,
            {
                'instance_type': attrs.get('instance_type'),
                'availability_zone': attrs.get('availability_zone'),
                'private_ip': attrs.get('private_ip'),
                'tags': attrs.get('tags', {})
            }
        )
    
    def _parse_rds_cluster(self, name: str, attrs: Dict) -> Node:
        """Map an RDS cluster resource to a 'database' node."""
        return self._create_node(
            'database',
            name,
            {
                'engine': attrs.get('engine'),
                'engine_version': attrs.get('engine_version'),
                'endpoint': attrs.get('endpoint'),
                'database_name': attrs.get('database_name')
            }
        )
    
    def _parse_elasticache(self, name: str, attrs: Dict) -> Node:
        """Map an ElastiCache cluster resource to a 'cache' node."""
        return self._create_node(
            'cache',
            name,
            {
                'engine': attrs.get('engine'),
                'node_type': attrs.get('node_type'),
                'num_cache_nodes': attrs.get('num_cache_nodes')
            }
        )
Step 3

Register the connector

Add the connector to connectors/__init__.py:
from .base import BaseConnector, Node, Edge
from .docker_compose import DockerComposeConnector
from .teams import TeamsConnector
from .kubernetes import KubernetesConnector
from .terraform import TerraformConnector  # Add this line

__all__ = [
    'BaseConnector',
    'Node',
    'Edge',
    'DockerComposeConnector',
    'TeamsConnector',
    'KubernetesConnector',
    'TerraformConnector',  # Add this line
]
Step 4

Use the connector

Add the connector to the data loading process in chat/app.py:
from connectors import (
    DockerComposeConnector, 
    TeamsConnector, 
    KubernetesConnector,
    TerraformConnector  # Add import
)

async def load_configuration_data():
    # ... existing code ...
    
    # Load Terraform data. The state file is optional: when it is absent
    # this step is skipped entirely.
    terraform_file = data_dir / "terraform.tfstate"
    if terraform_file.exists():
        connector = TerraformConnector()
        # parse() returns ([], []) on any parse failure, so it never raises here.
        nodes, edges = connector.parse(str(terraform_file))
        all_nodes.extend(nodes)
        all_edges.extend(edges)
        logger.info(f"Loaded {len(nodes)} nodes and {len(edges)} edges from Terraform")
Step 5

Test the connector

Create a test file:
import pytest
from connectors.terraform import TerraformConnector

def test_terraform_connector():
    """The Terraform connector should yield well-formed, typed nodes."""
    connector = TerraformConnector()
    nodes, edges = connector.parse('data/terraform.tfstate')
    
    assert len(nodes) > 0
    assert len(edges) >= 0
    
    known_types = ('service', 'database', 'cache')
    id_prefixes = tuple(f"{t}:" for t in known_types)
    for node in nodes:
        # Every node must use the "type:name" ID convention.
        assert node.id.startswith(id_prefixes)
        assert node.type in known_types
        assert node.name
Run the test:
pytest tests/test_terraform_connector.py -v

Example: API Connector

Connectors don’t have to parse files—they can also fetch data from APIs:
import requests
from typing import Dict, List, Any
from .base import BaseConnector, Node, Edge
import os

class PagerDutyConnector(BaseConnector):
    """Connector for fetching data from PagerDuty API."""
    
    # Fail fast instead of hanging forever when the API is unresponsive;
    # requests.get without a timeout can block indefinitely.
    REQUEST_TIMEOUT = 30
    
    def __init__(self, api_token: str = None):
        super().__init__("pagerduty")
        # Fall back to the conventional environment variable when no token
        # is passed explicitly.
        self.api_token = api_token or os.getenv('PAGERDUTY_API_TOKEN')
        self.base_url = "https://api.pagerduty.com"
    
    def parse(self, file_path: str = None) -> tuple[List[Node], List[Edge]]:
        """Fetch services and on-call assignments from the PagerDuty API.

        Args:
            file_path: Ignored; kept for interface compatibility with
                file-based connectors.

        Returns:
            (nodes, edges) tuple: one service node per PagerDuty service
            and one 'oncall_for' edge per team currently on call for it.
        """
        self.logger.info("Fetching data from PagerDuty API")
        
        nodes = []
        edges = []
        
        # Fetch services
        services = self._fetch_services()
        
        for service in services:
            service_id = service['id']
            service_name = service['name']
            
            # Create service node
            node = self._create_node(
                'service',
                service_name,
                {
                    'pagerduty_id': service_id,
                    'status': service.get('status'),
                    'escalation_policy': service.get('escalation_policy', {}).get('summary')
                }
            )
            nodes.append(node)
            
            # Get oncall for this service
            oncalls = self._fetch_oncalls(service_id)
            for oncall in oncalls:
                team_name = oncall.get('escalation_policy', {}).get('summary')
                if team_name:
                    edge = self._create_edge(
                        'oncall_for',
                        f"team:{team_name}",
                        f"service:{service_name}"
                    )
                    edges.append(edge)
        
        self.logger.info(f"Fetched {len(nodes)} nodes and {len(edges)} edges")
        return nodes, edges
    
    def _headers(self) -> Dict[str, str]:
        """Common auth/accept headers for every PagerDuty API call."""
        return {
            'Authorization': f'Token token={self.api_token}',
            'Accept': 'application/vnd.pagerduty+json;version=2'
        }
    
    def _fetch_services(self) -> List[Dict]:
        """Fetch services from PagerDuty.

        Raises:
            requests.HTTPError: On a non-2xx API response.
        """
        response = requests.get(
            f"{self.base_url}/services",
            headers=self._headers(),
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        
        return response.json().get('services', [])
    
    def _fetch_oncalls(self, service_id: str) -> List[Dict]:
        """Fetch on-call entries for a single service.

        Raises:
            requests.HTTPError: On a non-2xx API response.
        """
        response = requests.get(
            f"{self.base_url}/oncalls",
            headers=self._headers(),
            params={'service_ids[]': service_id},
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        
        return response.json().get('oncalls', [])

Best Practices

1

Use consistent node IDs

Always use the format type:name for node IDs:
# Good
node_id = f"service:{service_name}"

# Bad
node_id = service_name
2

Handle errors gracefully

Return empty lists on parse errors instead of raising exceptions:
try:
    data = parse_file(file_path)
except Exception as e:
    self.logger.error(f"Parse error: {e}")
    return [], []  # Empty nodes and edges
3

Log parsing progress

Use the built-in logger to track progress:
self.logger.info(f"Parsing {file_path}")
self.logger.debug(f"Found {len(services)} services")
self.logger.warning(f"Skipping invalid service: {name}")
4

Extract properties

Include relevant metadata as node properties:
properties = {
    'team': labels.get('team'),
    'oncall': labels.get('oncall'),
    'port': port,
    'environment': 'production'
}
# Filter out None values
properties = {k: v for k, v in properties.items() if v is not None}
5

Write tests

Test connectors with sample data:
def test_connector_with_sample_file():
    connector = MyConnector()
    nodes, edges = connector.parse('tests/fixtures/sample.yaml')
    
    assert len(nodes) == 5
    assert nodes[0].id == 'service:my-service'
    assert edges[0].type == 'depends_on'

Node and Edge Types

Standard Node Types

  • service: Application services
  • database: Databases (PostgreSQL, MySQL, etc.)
  • cache: Cache systems (Redis, Memcached)
  • team: Engineering teams
  • resource: Generic infrastructure resources

Standard Edge Types

  • calls: Service-to-service calls (REST, gRPC)
  • uses: Service uses database/cache
  • depends_on: Explicit dependency
  • owns: Team owns service/resource
  • deploys: CI/CD relationship
You can define custom node and edge types for your domain. The query engine treats all types uniformly.

Registering Multiple File Formats

A single connector can handle multiple file formats:
class InfrastructureConnector(BaseConnector):
    """Connector that handles multiple formats."""
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Dispatch to the format-specific parser chosen by file extension.

        Args:
            file_path: Path whose extension selects the parser.

        Returns:
            (nodes, edges) from the matching parser, or ([], []) when the
            extension is not recognized.
        """
        if file_path.endswith('.json'):
            return self._parse_json(file_path)
        if file_path.endswith(('.yaml', '.yml')):
            return self._parse_yaml(file_path)
        if file_path.endswith('.toml'):
            return self._parse_toml(file_path)
        self.logger.error(f"Unsupported file format: {file_path}")
        return [], []

Next Steps

Build docs developers (and LLMs) love