
Overview

Connectors are the data ingestion layer of the Engineering Knowledge Graph. They parse configuration files from various sources (Docker Compose, Kubernetes, team definitions) and transform them into the nodes and edges that populate the knowledge graph.

Connector Architecture

BaseConnector Interface

All connectors inherit from BaseConnector (connectors/base.py:30-115), which defines the standard interface:
connectors/base.py
class BaseConnector(ABC):
    """Base class for all connectors."""
    
    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(f"connector.{name}")
    
    @abstractmethod
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """
        Parse a configuration file and return nodes and edges.
        
        Args:
            file_path: Path to the configuration file
            
        Returns:
            Tuple of (nodes, edges)
        """
        pass
The parse method is the only required implementation for custom connectors. Everything else is optional helper functionality.

Helper Methods

The base connector provides utility methods for common operations:
_create_node creates a node with consistent ID formatting:
connectors/base.py
def _create_node(self, node_type: str, name: str, 
                 properties: Dict[str, Any] = None) -> Node:
    """Helper to create a node with consistent ID format."""
    node_id = f"{node_type}:{name}"
    return Node(
        id=node_id,
        type=node_type,
        name=name,
        properties=properties or {}
    )
_create_edge creates an edge with consistent ID formatting:
connectors/base.py
def _create_edge(self, edge_type: str, source: str, target: str,
                 properties: Dict[str, Any] = None) -> Edge:
    """Helper to create an edge with consistent ID format."""
    edge_id = f"edge:{source}-{edge_type}-{target}"
    return Edge(
        id=edge_id,
        type=edge_type,
        source=source,
        target=target,
        properties=properties or {}
    )
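Because both helpers derive IDs purely from their inputs, the same input always yields the same ID. A minimal sketch of just the formatting rules (plain functions standing in for the helpers above; the payment-service and team names are illustrative):

```python
def make_node_id(node_type: str, name: str) -> str:
    # Same "type:name" format as BaseConnector._create_node
    return f"{node_type}:{name}"

def make_edge_id(edge_type: str, source: str, target: str) -> str:
    # Same "edge:source-type-target" format as BaseConnector._create_edge
    return f"edge:{source}-{edge_type}-{target}"

print(make_node_id("service", "payment-service"))
# service:payment-service
print(make_edge_id("owns", "team:payments", "service:payment-service"))
# edge:team:payments-owns-service:payment-service
```

Deterministic IDs are what make re-ingestion idempotent: parsing the same file twice produces identical node and edge IDs, which the storage layer can deduplicate instead of accumulating duplicates.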
_extract_service_dependencies_from_env extracts service dependencies from environment variables:
connectors/base.py
def _extract_service_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
    """Extract service dependencies from environment variables."""
    dependencies = []
    
    for key, value in env_vars.items():
        if key.endswith('_URL') or key.endswith('_SERVICE_URL'):
            # Extract service name from URL like http://payment-service:8083
            if '://' in value:
                url_part = value.split('://')[1]
                if ':' in url_part:
                    service_name = url_part.split(':')[0]
                    dependencies.append(service_name)
    
    return dependencies
_extract_database_dependencies_from_env extracts database and cache dependencies from environment variables:
connectors/base.py
def _extract_database_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
    """Extract database dependencies from environment variables."""
    dependencies = []
    
    for key, value in env_vars.items():
        if key == 'DATABASE_URL':
            # Extract database name from URL
            if '@' in value and ':' in value:
                parts = value.split('@')[1].split(':')
                if parts:
                    db_name = parts[0]
                    dependencies.append(db_name)
        elif key == 'REDIS_URL':
            # Extract redis name from URL
            if '://' in value:
                url_part = value.split('://')[1]
                if ':' in url_part:
                    redis_name = url_part.split(':')[0]
                    dependencies.append(redis_name)
    
    return dependencies
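A standalone sketch that mirrors both extractors on a hypothetical service environment (the service and database names are illustrative, not taken from the real system):

```python
from typing import Dict, List

def service_deps(env_vars: Dict[str, str]) -> List[str]:
    # Mirrors _extract_service_dependencies_from_env
    deps = []
    for key, value in env_vars.items():
        if key.endswith('_URL') or key.endswith('_SERVICE_URL'):
            if '://' in value:
                url_part = value.split('://')[1]
                if ':' in url_part:
                    deps.append(url_part.split(':')[0])
    return deps

def database_deps(env_vars: Dict[str, str]) -> List[str]:
    # Mirrors the DATABASE_URL branch of _extract_database_dependencies_from_env
    deps = []
    for key, value in env_vars.items():
        if key == 'DATABASE_URL' and '@' in value and ':' in value:
            deps.append(value.split('@')[1].split(':')[0])
    return deps

print(service_deps({'PAYMENT_SERVICE_URL': 'http://payment-service:8083',
                    'LOG_LEVEL': 'debug'}))
# ['payment-service']
print(database_deps({'DATABASE_URL': 'postgres://shop:secret@orders-db:5432/orders'}))
# ['orders-db']
```

Note that DATABASE_URL itself ends in _URL, so the service extractor as written will also match it and pull out the URL's username portion; callers that use both helpers may want to exclude database keys before extracting service dependencies.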

Built-in Connectors

DockerComposeConnector

Parses docker-compose.yml files to extract services, databases, and their relationships. File: connectors/docker_compose.py:10
connectors/docker_compose.py
class DockerComposeConnector(BaseConnector):
    """Connector for parsing Docker Compose files."""
    
    def __init__(self):
        super().__init__("docker_compose")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse docker-compose.yml file."""
        with open(file_path, 'r') as f:
            compose_data = yaml.safe_load(f)
        
        nodes = []
        edges = []
        
        services = compose_data.get('services', {})
        
        for service_name, service_config in services.items():
            # Create service node
            service_node = self._create_service_node(service_name, service_config)
            nodes.append(service_node)
            
            # Create edges for explicit dependencies
            depends_on = service_config.get('depends_on', [])
            for dependency in depends_on:
                edge = self._create_edge(
                    'depends_on',
                    service_node.id,
                    f"service:{dependency}"
                )
                edges.append(edge)
        
        return nodes, edges
The connector creates service nodes with properties extracted from the Docker Compose configuration:
connectors/docker_compose.py
def _create_service_node(self, service_name: str, service_config: Dict[str, Any]) -> Node:
    labels = service_config.get('labels', {})
    ports = service_config.get('ports', [])
    
    # Extract port number if available
    port = None
    if ports:
        port_mapping = ports[0]
        if isinstance(port_mapping, str) and ':' in port_mapping:
            port = int(port_mapping.split(':')[0])
    
    properties = {
        'team': labels.get('team'),
        'oncall': labels.get('oncall'),
        'port': port,
        'image': service_config.get('image'),
        'build': service_config.get('build')
    }
    
    return self._create_node('service', service_name, properties)
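For example, here is how the port and label extraction behaves on a typical compose service entry (a standalone sketch; the service definition is illustrative and assumes the team/oncall label convention shown above):

```python
service_config = {
    'image': 'shop/payment-service:1.4',
    'ports': ['8083:8083'],
    'labels': {'team': 'payments', 'oncall': 'payments-oncall'},
}

labels = service_config.get('labels', {})
ports = service_config.get('ports', [])

# Host port from the first "host:container" mapping, as in _create_service_node
port = None
if ports:
    port_mapping = ports[0]
    if isinstance(port_mapping, str) and ':' in port_mapping:
        port = int(port_mapping.split(':')[0])

properties = {
    'team': labels.get('team'),
    'oncall': labels.get('oncall'),
    'port': port,
    'image': service_config.get('image'),
    'build': service_config.get('build'),
}
print(properties['port'], properties['team'])
# 8083 payments
```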

TeamsConnector

Parses teams.yaml files to create team nodes and ownership relationships. File: connectors/teams.py:10
connectors/teams.py
class TeamsConnector(BaseConnector):
    """Connector for parsing teams configuration files."""
    
    def __init__(self):
        super().__init__("teams")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse teams.yaml file."""
        with open(file_path, 'r') as f:
            teams_data = yaml.safe_load(f)
        
        nodes = []
        edges = []
        
        teams = teams_data.get('teams', [])
        
        for team_config in teams:
            team_name = team_config.get('name')
            
            # Create team node
            team_node = self._create_team_node(team_config)
            nodes.append(team_node)
            
            # Create ownership edges
            owned_services = team_config.get('owns', [])
            for service_name in owned_services:
                service_type = self._infer_service_type(service_name)
                
                edge = self._create_edge(
                    'owns',
                    team_node.id,
                    f"{service_type}:{service_name}"
                )
                edges.append(edge)
        
        return nodes, edges
Team Node Properties:
connectors/teams.py
properties = {
    'lead': team_config.get('lead'),
    'slack_channel': team_config.get('slack_channel'),
    'pagerduty_schedule': team_config.get('pagerduty_schedule')
}
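A teams.yaml entry that produces these properties might look like this (field names taken from the code above; the team itself is illustrative):

```yaml
teams:
  - name: payments
    lead: jane.doe
    slack_channel: "#team-payments"
    pagerduty_schedule: payments-primary
    owns:
      - payment-service
      - orders-db
```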

KubernetesConnector

Parses Kubernetes deployment YAML files to extract deployments and services. File: connectors/kubernetes.py:10
connectors/kubernetes.py
class KubernetesConnector(BaseConnector):
    """Connector for parsing Kubernetes deployment files."""
    
    def __init__(self):
        super().__init__("kubernetes")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse k8s-deployments.yaml file."""
        with open(file_path, 'r') as f:
            # Parse multiple YAML documents
            k8s_docs = list(yaml.safe_load_all(f))
        
        nodes = []
        edges = []
        deployments = {}
        services = {}
        
        # First pass: collect all resources
        for doc in k8s_docs:
            if not doc or 'kind' not in doc:
                continue
            
            kind = doc['kind']
            metadata = doc.get('metadata', {})
            name = metadata.get('name')
            
            if kind == 'Deployment':
                deployments[name] = doc
            elif kind == 'Service':
                services[name] = doc
Kubernetes deployments are converted to nodes with container and replica information:
connectors/kubernetes.py
def _create_deployment_node(self, deployment: Dict[str, Any]) -> Node:
    metadata = deployment.get('metadata', {})
    spec = deployment.get('spec', {})
    labels = metadata.get('labels', {})
    replicas = spec.get('replicas', 1)
    
    template = spec.get('template', {})
    pod_spec = template.get('spec', {})
    containers = pod_spec.get('containers', [])
    
    container_info = {}
    if containers:
        container = containers[0]
        container_info = {
            'image': container.get('image'),
            'resources': container.get('resources', {})
        }
    
    properties = {
        'namespace': metadata.get('namespace'),
        'team': labels.get('team'),
        'replicas': replicas,
        **container_info
    }
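Here is the same extraction logic applied to an in-memory Deployment document (a standalone sketch; the deployment itself is illustrative):

```python
deployment = {
    'metadata': {'name': 'payment-service', 'namespace': 'prod',
                 'labels': {'team': 'payments'}},
    'spec': {
        'replicas': 3,
        'template': {'spec': {'containers': [{
            'image': 'shop/payment-service:1.4',
            'resources': {'limits': {'cpu': '500m'}},
        }]}},
    },
}

metadata = deployment.get('metadata', {})
labels = metadata.get('labels', {})
spec = deployment.get('spec', {})
containers = spec.get('template', {}).get('spec', {}).get('containers', [])

properties = {
    'namespace': metadata.get('namespace'),
    'team': labels.get('team'),
    'replicas': spec.get('replicas', 1),
    'image': containers[0].get('image') if containers else None,
}
print(properties)
# {'namespace': 'prod', 'team': 'payments', 'replicas': 3, 'image': 'shop/payment-service:1.4'}
```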

Connector Pluggability

The connector architecture is designed for extensibility. You can create custom connectors for any configuration source.

Creating a Custom Connector

Step 1: Create Connector Class

Inherit from BaseConnector and implement the parse method:
from connectors.base import BaseConnector, Node, Edge
from typing import List, Dict, Any

class CustomConnector(BaseConnector):
    """Connector for custom configuration format."""
    
    def __init__(self):
        super().__init__("custom")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse custom configuration file."""
        nodes = []
        edges = []
        
        # Your parsing logic here
        
        return nodes, edges
Step 2: Use Helper Methods

Leverage base class helpers for consistent node/edge creation:
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
    nodes = []
    edges = []
    
    # Create a service node
    service_node = self._create_node(
        'service',
        'my-service',
        {'team': 'platform', 'port': 8080}
    )
    nodes.append(service_node)
    
    # Create an ownership edge
    ownership_edge = self._create_edge(
        'owns',
        'team:platform',
        service_node.id
    )
    edges.append(ownership_edge)
    
    return nodes, edges
Step 3: Register Connector

Add your connector to the initialization flow in main.py:
from connectors import CustomConnector

# Load custom configuration
custom_file = data_dir / "custom-config.yaml"
if custom_file.exists():
    connector = CustomConnector()
    nodes, edges = connector.parse(str(custom_file))
    all_nodes.extend(nodes)
    all_edges.extend(edges)

Example: Terraform Connector

Here’s a conceptual example of a Terraform state connector:
import json

class TerraformConnector(BaseConnector):
    """Connector for Terraform state files."""
    
    def __init__(self):
        super().__init__("terraform")
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            tf_state = json.load(f)
        
        nodes = []
        edges = []
        
        # Extract resources
        for resource in tf_state.get('resources', []):
            resource_type = resource.get('type')
            resource_name = resource.get('name')
            
            if resource_type == 'aws_rds_cluster':
                # Create database node
                node = self._create_node(
                    'database',
                    resource_name,
                    {
                        'engine': resource['values'].get('engine'),
                        'provider': 'aws'
                    }
                )
                nodes.append(node)
        
        return nodes, edges

Connector Execution

Connectors are executed during system initialization (main.py:99-129):
main.py
all_nodes = []
all_edges = []

# Load Docker Compose
connector = DockerComposeConnector()
nodes, edges = connector.parse(str(docker_compose_file))
all_nodes.extend(nodes)
all_edges.extend(edges)

# Load Teams
connector = TeamsConnector()
nodes, edges = connector.parse(str(teams_file))
all_nodes.extend(nodes)
all_edges.extend(edges)

# Load Kubernetes (optional)
connector = KubernetesConnector()
nodes, edges = connector.parse(str(k8s_file))
all_nodes.extend(nodes)
all_edges.extend(edges)

# Populate graph
storage.clear_graph()
storage.add_nodes(all_nodes)
storage.add_edges(all_edges)

Best Practices

Idempotent Parsing

Connectors should produce the same output for the same input. Use consistent ID generation.

Error Handling

Handle malformed files gracefully. Log warnings but continue processing valid entries.
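One way to follow this practice is to wrap per-entry parsing in a try/except, log a warning, and move on (a sketch of the pattern, not the project's actual error handling):

```python
import logging

logger = logging.getLogger("connector.example")

def parse_entries(entries):
    """Skip malformed entries instead of aborting the whole file."""
    parsed = []
    for entry in entries:
        try:
            parsed.append({'name': entry['name'],
                           'type': entry.get('type', 'service')})
        except (KeyError, TypeError) as exc:
            # Malformed entry: log and continue with the rest of the file
            logger.warning("Skipping malformed entry %r: %s", entry, exc)
    return parsed

print(parse_entries([{'name': 'payment-service'}, 'not-a-mapping']))
# [{'name': 'payment-service', 'type': 'service'}]
```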

Property Filtering

Filter out None values from properties to keep the graph clean.
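A dict comprehension is enough for this; for example, applied to the kind of properties dict the compose connector builds:

```python
properties = {'team': 'payments', 'oncall': None, 'port': 8083,
              'image': None, 'build': None}

# Drop keys whose value is None before attaching properties to a node
clean = {k: v for k, v in properties.items() if v is not None}
print(clean)
# {'team': 'payments', 'port': 8083}
```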

Type Inference

Use labels, names, and other hints to infer node types when explicit type information is unavailable.
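The TeamsConnector above calls an _infer_service_type helper whose body is not shown here; a plausible name-based heuristic (an assumption for illustration, not the actual implementation) looks like:

```python
def infer_service_type(name: str) -> str:
    """Guess a node type from naming conventions (hypothetical heuristic)."""
    if name.endswith('-db') or 'postgres' in name or 'mysql' in name:
        return 'database'
    if 'redis' in name or 'cache' in name:
        return 'cache'
    return 'service'

print(infer_service_type('orders-db'))        # database
print(infer_service_type('session-cache'))    # cache
print(infer_service_type('payment-service'))  # service
```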
Connectors should not modify the graph directly. They only parse and return data structures that are then loaded by the storage layer.

Next Steps

Knowledge Graph

Understand the graph data model and storage layer.

Query Engine

Learn how to query and traverse the populated graph.
