Connectors implement the BaseConnector interface to load infrastructure data into the knowledge graph. This guide shows how to create custom connectors for your infrastructure tools.
Connector Architecture
All connectors inherit from BaseConnector and implement a parse() method that returns nodes and edges.
BaseConnector Interface
The base connector provides helper methods and defines the interface:

import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from pydantic import BaseModel
class Node(BaseModel):
"""Represents a node in the knowledge graph."""
id: str # Unique identifier (format: "type:name")
type: str # Node type (service, database, cache, team)
name: str # Human-readable name
properties: Dict[str, Any] = {} # Additional properties
class Edge(BaseModel):
"""Represents an edge in the knowledge graph."""
id: str # Unique identifier
type: str # Edge type (calls, uses, depends_on, owns)
source: str # Source node ID
target: str # Target node ID
properties: Dict[str, Any] = {} # Additional properties
class BaseConnector(ABC):
"""Base class for all connectors."""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"connector.{name}")
@abstractmethod
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
"""Parse a configuration file and return nodes and edges."""
pass
Helper Methods
The BaseConnector provides helper methods for common operations:
Creating Nodes
def _create_node(self, node_type: str, name: str, properties: Dict[str, Any] = None) -> Node:
"""Helper to create a node with consistent ID format."""
node_id = f"{node_type}:{name}"
return Node(
id=node_id,
type=node_type,
name=name,
properties=properties or {}
)
# Create a service node
service_node = self._create_node(
'service',
'api-gateway',
{'port': 8080, 'team': 'platform-team'}
)
# Result: Node(id='service:api-gateway', type='service', name='api-gateway', ...)
Creating Edges
def _create_edge(self, edge_type: str, source: str, target: str, properties: Dict[str, Any] = None) -> Edge:
"""Helper to create an edge with consistent ID format."""
edge_id = f"edge:{source}-{edge_type}-{target}"
return Edge(
id=edge_id,
type=edge_type,
source=source,
target=target,
properties=properties or {}
)
# Create a dependency edge
edge = self._create_edge(
'uses',
'service:order-service',
'database:orders-db'
)
# Result: Edge(id='edge:service:order-service-uses-database:orders-db', type='uses', ...)
Extracting Dependencies
def _extract_service_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
"""Extract service dependencies from environment variables."""
dependencies = []
for key, value in env_vars.items():
if key.endswith('_URL') or key.endswith('_SERVICE_URL'):
# Extract service name from URL like http://payment-service:8083
if '://' in value:
url_part = value.split('://')[1]
if ':' in url_part:
service_name = url_part.split(':')[0]
dependencies.append(service_name)
return dependencies
def _extract_database_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
"""Extract database dependencies from environment variables."""
dependencies = []
for key, value in env_vars.items():
if key == 'DATABASE_URL':
# Extract database name from URL like postgresql://user:pass@db-name:5432/dbname
if '@' in value and ':' in value:
parts = value.split('@')[1].split(':')
if parts:
db_name = parts[0]
dependencies.append(db_name)
elif key == 'REDIS_URL':
# Extract redis name from URL like redis://redis-main:6379
if '://' in value:
url_part = value.split('://')[1]
if ':' in url_part:
redis_name = url_part.split(':')[0]
dependencies.append(redis_name)
return dependencies
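For example, given environment blocks with hypothetical values:

service_env = {'PAYMENT_SERVICE_URL': 'http://payment-service:8083'}
self._extract_service_dependencies_from_env(service_env)
# -> ['payment-service']

datastore_env = {
    'DATABASE_URL': 'postgresql://user:pass@orders-db:5432/orders',
    'REDIS_URL': 'redis://redis-main:6379',
}
self._extract_database_dependencies_from_env(datastore_env)
# -> ['orders-db', 'redis-main']

Note that DATABASE_URL and REDIS_URL also end in _URL, so the service extractor matches them as well; keep the two kinds of variables separate or filter its output when both appear in the same environment block.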
Example: Docker Compose Connector
Let’s examine the built-in Docker Compose connector as a reference:

import yaml
from typing import Dict, List, Any
from .base import BaseConnector, Node, Edge
class DockerComposeConnector(BaseConnector):
"""Connector for parsing Docker Compose files."""
def __init__(self):
super().__init__("docker_compose")
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
"""Parse docker-compose.yml file."""
self.logger.info(f"Parsing Docker Compose file: {file_path}")
try:
with open(file_path, 'r') as f:
compose_data = yaml.safe_load(f)
except Exception as e:
self.logger.error(f"Failed to parse {file_path}: {e}")
return [], []
nodes = []
edges = []
services = compose_data.get('services', {})
for service_name, service_config in services.items():
# Create service node
service_node = self._create_service_node(service_name, service_config)
nodes.append(service_node)
# Create edges for explicit dependencies
depends_on = service_config.get('depends_on', [])
for dependency in depends_on:
edge = self._create_edge(
'depends_on',
service_node.id,
f"service:{dependency}"
)
edges.append(edge)
# Create edges from environment variables
env_vars = self._extract_env_vars(service_config)
service_deps = self._extract_service_dependencies_from_env(env_vars)
for dep_service in service_deps:
edge = self._create_edge(
'calls',
service_node.id,
f"service:{dep_service}"
)
edges.append(edge)
self.logger.info(f"Parsed {len(nodes)} nodes and {len(edges)} edges")
return nodes, edges
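The excerpt calls two connector-specific helpers, _create_service_node and _extract_env_vars, that are not shown above. A minimal sketch of what they might look like, assuming the standard Compose fields image, ports, and environment:

def _create_service_node(self, name: str, config: Dict[str, Any]) -> Node:
    """Build a service node from a Compose service definition."""
    properties = {
        'image': config.get('image'),
        'ports': config.get('ports', []),
    }
    return self._create_node('service', name,
                             {k: v for k, v in properties.items() if v})

def _extract_env_vars(self, config: Dict[str, Any]) -> Dict[str, str]:
    """Normalize Compose 'environment' (dict or 'KEY=VALUE' list) to a dict."""
    env = config.get('environment', {})
    if isinstance(env, list):
        env = dict(item.split('=', 1) for item in env if '=' in item)
    return env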
Creating a Custom Connector
Let’s create a custom connector for Terraform configurations.

Implement the connector class
import json
from typing import Dict, List, Any
from .base import BaseConnector, Node, Edge
class TerraformConnector(BaseConnector):
"""Connector for parsing Terraform state files."""
def __init__(self):
super().__init__("terraform")
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
"""Parse terraform.tfstate file."""
self.logger.info(f"Parsing Terraform state: {file_path}")
try:
with open(file_path, 'r') as f:
tfstate = json.load(f)
except Exception as e:
self.logger.error(f"Failed to parse {file_path}: {e}")
return [], []
nodes = []
edges = []
# Parse resources from Terraform state
resources = tfstate.get('resources', [])
for resource in resources:
resource_type = resource.get('type')
resource_name = resource.get('name')
instances = resource.get('instances', [])
for instance in instances:
attributes = instance.get('attributes', {})
# Handle different resource types
if resource_type == 'aws_instance':
node = self._parse_ec2_instance(resource_name, attributes)
nodes.append(node)
elif resource_type == 'aws_rds_cluster':
node = self._parse_rds_cluster(resource_name, attributes)
nodes.append(node)
elif resource_type == 'aws_elasticache_cluster':
node = self._parse_elasticache(resource_name, attributes)
nodes.append(node)
# Extract dependencies from Terraform state
dependencies = resource.get('dependencies', [])
for dep in dependencies:
edge = self._create_edge(
'depends_on',
f"resource:{resource_name}",
f"resource:{dep}"
)
edges.append(edge)
self.logger.info(f"Parsed {len(nodes)} nodes and {len(edges)} edges")
return nodes, edges
def _parse_ec2_instance(self, name: str, attrs: Dict) -> Node:
"""Parse EC2 instance resource."""
return self._create_node(
'service',
name,
{
'instance_type': attrs.get('instance_type'),
'availability_zone': attrs.get('availability_zone'),
'private_ip': attrs.get('private_ip'),
'tags': attrs.get('tags', {})
}
)
def _parse_rds_cluster(self, name: str, attrs: Dict) -> Node:
"""Parse RDS cluster resource."""
return self._create_node(
'database',
name,
{
'engine': attrs.get('engine'),
'engine_version': attrs.get('engine_version'),
'endpoint': attrs.get('endpoint'),
'database_name': attrs.get('database_name')
}
)
def _parse_elasticache(self, name: str, attrs: Dict) -> Node:
"""Parse ElastiCache cluster resource."""
return self._create_node(
'cache',
name,
{
'engine': attrs.get('engine'),
'node_type': attrs.get('node_type'),
'num_cache_nodes': attrs.get('num_cache_nodes')
}
)
Register the connector
Add the connector to connectors/__init__.py:

from .base import BaseConnector, Node, Edge
from .docker_compose import DockerComposeConnector
from .teams import TeamsConnector
from .kubernetes import KubernetesConnector
from .terraform import TerraformConnector # Add this line
__all__ = [
'BaseConnector',
'Node',
'Edge',
'DockerComposeConnector',
'TeamsConnector',
'KubernetesConnector',
'TerraformConnector', # Add this line
]
Use the connector
Add the connector to the data loading process in chat/app.py:

from connectors import (
DockerComposeConnector,
TeamsConnector,
KubernetesConnector,
TerraformConnector # Add import
)
async def load_configuration_data():
# ... existing code ...
# Load Terraform data
terraform_file = data_dir / "terraform.tfstate"
if terraform_file.exists():
connector = TerraformConnector()
nodes, edges = connector.parse(str(terraform_file))
all_nodes.extend(nodes)
all_edges.extend(edges)
logger.info(f"Loaded {len(nodes)} nodes and {len(edges)} edges from Terraform")
Test the connector
Create a test file:
import pytest
from connectors.terraform import TerraformConnector
def test_terraform_connector():
connector = TerraformConnector()
nodes, edges = connector.parse('data/terraform.tfstate')
assert len(nodes) > 0
assert len(edges) >= 0  # edges may legitimately be empty
# Verify node format
for node in nodes:
assert node.id.startswith('service:') or \
node.id.startswith('database:') or \
node.id.startswith('cache:')
assert node.type in ['service', 'database', 'cache']
assert node.name
Run the test:

pytest tests/test_terraform_connector.py -v
Example: API Connector
Connectors don’t have to parse files—they can also fetch data from APIs:

import os
import requests
from typing import Dict, List, Any, Optional
from .base import BaseConnector, Node, Edge
class PagerDutyConnector(BaseConnector):
"""Connector for fetching data from PagerDuty API."""
def __init__(self, api_token: Optional[str] = None):
super().__init__("pagerduty")
self.api_token = api_token or os.getenv('PAGERDUTY_API_TOKEN')
self.base_url = "https://api.pagerduty.com"
def parse(self, file_path: Optional[str] = None) -> tuple[List[Node], List[Edge]]:
"""Fetch data from PagerDuty API."""
self.logger.info("Fetching data from PagerDuty API")
nodes = []
edges = []
# Fetch services
services = self._fetch_services()
for service in services:
service_id = service['id']
service_name = service['name']
# Create service node
node = self._create_node(
'service',
service_name,
{
'pagerduty_id': service_id,
'status': service.get('status'),
'escalation_policy': service.get('escalation_policy', {}).get('summary')
}
)
nodes.append(node)
# Get oncall for this service
oncalls = self._fetch_oncalls(service_id)
for oncall in oncalls:
team_name = oncall.get('escalation_policy', {}).get('summary')
if team_name:
edge = self._create_edge(
'oncall_for',
f"team:{team_name}",
f"service:{service_name}"
)
edges.append(edge)
self.logger.info(f"Fetched {len(nodes)} nodes and {len(edges)} edges")
return nodes, edges
def _fetch_services(self) -> List[Dict]:
"""Fetch services from PagerDuty."""
headers = {
'Authorization': f'Token token={self.api_token}',
'Accept': 'application/vnd.pagerduty+json;version=2'
}
response = requests.get(
f"{self.base_url}/services",
headers=headers
)
response.raise_for_status()
return response.json().get('services', [])
def _fetch_oncalls(self, service_id: str) -> List[Dict]:
"""Fetch oncall schedules for a service."""
headers = {
'Authorization': f'Token token={self.api_token}',
'Accept': 'application/vnd.pagerduty+json;version=2'
}
response = requests.get(
f"{self.base_url}/oncalls",
headers=headers,
params={'service_ids[]': service_id}
)
response.raise_for_status()
return response.json().get('oncalls', [])
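Because parse() ignores its file_path argument here, the connector can be invoked directly once a token is available:

connector = PagerDutyConnector()  # reads PAGERDUTY_API_TOKEN from the environment
nodes, edges = connector.parse()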
Best Practices
Use consistent node IDs
Always use the format type:name for node IDs:

# Good
node_id = f"service:{service_name}"
# Bad
node_id = service_name
Handle errors gracefully
Return empty lists on parse errors instead of raising exceptions:
try:
data = parse_file(file_path)
except Exception as e:
self.logger.error(f"Parse error: {e}")
return [], [] # Empty nodes and edges
Log parsing progress
Use the built-in logger to track progress:
self.logger.info(f"Parsing {file_path}")
self.logger.debug(f"Found {len(services)} services")
self.logger.warning(f"Skipping invalid service: {name}")
Extract properties
Include relevant metadata as node properties:
properties = {
'team': labels.get('team'),
'oncall': labels.get('oncall'),
'port': port,
'environment': 'production'
}
# Filter out None values
properties = {k: v for k, v in properties.items() if v is not None}
Node and Edge Types
Standard Node Types
- service: Application services
- database: Databases (PostgreSQL, MySQL, etc.)
- cache: Cache systems (Redis, Memcached)
- team: Engineering teams
- resource: Generic infrastructure resources
Standard Edge Types
- calls: Service-to-service calls (REST, gRPC)
- uses: Service uses a database or cache
- depends_on: Explicit dependency
- owns: Team owns a service or resource
- deploys: CI/CD relationship
You can define custom node and edge types for your domain. The query engine treats all types uniformly.
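For example, a messaging-focused connector might introduce a queue node type and a publishes_to edge type (hypothetical names):

queue_node = self._create_node('queue', 'order-events', {'broker': 'kafka'})
edge = self._create_edge(
    'publishes_to',
    'service:order-service',
    'queue:order-events'
)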
Registering Multiple File Formats
A single connector can handle multiple file formats:

class InfrastructureConnector(BaseConnector):
"""Connector that handles multiple formats."""
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
"""Parse file based on extension."""
if file_path.endswith('.json'):
return self._parse_json(file_path)
elif file_path.endswith('.yaml') or file_path.endswith('.yml'):
return self._parse_yaml(file_path)
elif file_path.endswith('.toml'):
return self._parse_toml(file_path)
else:
self.logger.error(f"Unsupported file format: {file_path}")
return [], []
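The format-specific parsers are ordinary methods on the connector. A minimal sketch of _parse_json, assuming a hypothetical {"services": [{"name": ...}]} schema:

import json  # module-level import in the real connector

def _parse_json(self, file_path: str) -> tuple[List[Node], List[Edge]]:
    """Parse a JSON inventory file into service nodes."""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
    except Exception as e:
        self.logger.error(f"Failed to parse {file_path}: {e}")
        return [], []
    nodes = [self._create_node('service', svc['name'])
             for svc in data.get('services', []) if 'name' in svc]
    return nodes, []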
Next Steps
- Review the Docker Compose connector source code
- Check the Teams connector for a simpler example
- Learn about querying to test your connector’s output
- Read the data sources guide for integration patterns