Connectors are the data ingestion layer of the Engineering Knowledge Graph. They parse configuration files from various sources (Docker Compose, Kubernetes, Teams) and transform them into nodes and edges that populate the knowledge graph.
All connectors inherit from BaseConnector (connectors/base.py:30-115), which defines the standard interface:
connectors/base.py
class BaseConnector(ABC):
    """Base class for all connectors.

    Stores the connector's name, creates a logger named after it, and
    declares the abstract ``parse`` method that concrete connectors implement.
    """

    def __init__(self, name: str):
        self.name = name
        # One logger per connector, named "connector.<name>".
        self.logger = logging.getLogger(f"connector.{name}")

    @abstractmethod
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """
        Parse a configuration file and return nodes and edges.

        Args:
            file_path: Path to the configuration file

        Returns:
            Tuple of (nodes, edges)
        """
        pass
The parse method is the only required implementation for custom connectors. Everything else is optional helper functionality.
The base connector provides utility methods for common operations:
_create_node()
Creates a node with consistent ID formatting:
connectors/base.py
def _create_node(self, node_type: str, name: str, properties: Dict[str, Any] = None) -> Node:
    """Build a node whose ID follows the ``<type>:<name>`` convention.

    Args:
        node_type: Node category; also the prefix of the generated ID.
        name: Node name; also the suffix of the generated ID.
        properties: Optional property mapping. A missing or empty mapping
            is replaced by a fresh empty dict.

    Returns:
        The constructed node.
    """
    # Falsy input (None or an empty dict) always yields a new dict.
    node_properties = properties if properties else {}
    return Node(
        id=f"{node_type}:{name}",
        type=node_type,
        name=name,
        properties=node_properties,
    )
_create_edge()
Creates an edge with consistent ID formatting:
connectors/base.py
def _create_edge(self, edge_type: str, source: str, target: str, properties: Dict[str, Any] = None) -> Edge:
    """Build an edge whose ID follows ``edge:<source>-<type>-<target>``.

    Args:
        edge_type: Relationship kind; embedded in the generated ID.
        source: ID of the node the edge starts from.
        target: ID of the node the edge points to.
        properties: Optional property mapping. A missing or empty mapping
            is replaced by a fresh empty dict.

    Returns:
        The constructed edge.
    """
    edge_fields = {
        'id': f"edge:{source}-{edge_type}-{target}",
        'type': edge_type,
        'source': source,
        'target': target,
        # Falsy input (None or an empty dict) always yields a new dict.
        'properties': properties if properties else {},
    }
    return Edge(**edge_fields)
_extract_service_dependencies_from_env()
Extracts service dependencies from environment variables:
connectors/base.py
def _extract_service_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]: """Extract service dependencies from environment variables.""" dependencies = [] for key, value in env_vars.items(): if key.endswith('_URL') or key.endswith('_SERVICE_URL'): # Extract service name from URL like http://payment-service:8083 if '://' in value: url_part = value.split('://')[1] if ':' in url_part: service_name = url_part.split(':')[0] dependencies.append(service_name) return dependencies
_extract_database_dependencies_from_env()
Extracts database/cache dependencies from environment variables:
connectors/base.py
def _extract_database_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]: """Extract database dependencies from environment variables.""" dependencies = [] for key, value in env_vars.items(): if key == 'DATABASE_URL': # Extract database name from URL if '@' in value and ':' in value: parts = value.split('@')[1].split(':') if parts: db_name = parts[0] dependencies.append(db_name) elif key == 'REDIS_URL': # Extract redis name from URL if '://' in value: url_part = value.split('://')[1] if ':' in url_part: redis_name = url_part.split(':')[0] dependencies.append(redis_name) return dependencies
Parses docker-compose.yml files to extract services, databases, and their relationships. File: connectors/docker_compose.py:10
connectors/docker_compose.py
class DockerComposeConnector(BaseConnector):
    """Connector for parsing Docker Compose files."""

    def __init__(self):
        super().__init__("docker_compose")

    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse docker-compose.yml file.

        Loads the YAML document at ``file_path`` and walks its ``services``
        mapping, creating one node per service and one ``depends_on`` edge
        per explicit dependency.
        """
        with open(file_path, 'r') as f:
            compose_data = yaml.safe_load(f)

        nodes = []
        edges = []

        services = compose_data.get('services', {})
        for service_name, service_config in services.items():
            # Create service node
            service_node = self._create_service_node(service_name, service_config)
            nodes.append(service_node)

            # Create edges for explicit dependencies
            depends_on = service_config.get('depends_on', [])
            for dependency in depends_on:
                # The edge target is always addressed as "service:<name>".
                edge = self._create_edge(
                    'depends_on',
                    service_node.id,
                    f"service:{dependency}"
                )
                edges.append(edge)
            # NOTE(review): this excerpt stops inside the loop; the rest of
            # parse(), including its return statement, is not shown here.
Node Extraction
Edge Extraction
Type Detection
The connector creates service nodes with properties extracted from the Docker Compose configuration:
connectors/docker_compose.py
def _create_service_node(self, service_name: str, service_config: Dict[str, Any]) -> Node:
    """Build the graph node for one Docker Compose service.

    Args:
        service_name: Key of the service in the compose ``services`` mapping.
        service_config: The service's configuration mapping.

    Returns:
        A node typed by ``_determine_service_type`` carrying the ``team``,
        ``oncall``, ``port``, ``image`` and ``build`` properties. ``port`` is
        the published host port of the first port mapping, or ``None`` when
        it cannot be read as a single integer.
    """
    labels = service_config.get('labels') or {}
    if isinstance(labels, list):
        # Compose also accepts the list form: ["team=payments", "oncall=alice"].
        labels = dict(str(item).partition('=')[::2] for item in labels)
    ports = service_config.get('ports') or []

    # Extract port number if available
    port = None
    if ports:
        port_mapping = ports[0]
        if isinstance(port_mapping, str) and ':' in port_mapping:
            # Short syntax is [HOST_IP:]HOST_PORT:CONTAINER_PORT[/PROTOCOL], so
            # the host port is the second field from the end. Ranges or an
            # empty field are left as None rather than raising ValueError.
            host_port = port_mapping.split(':')[-2]
            if host_port.isdigit():
                port = int(host_port)

    properties = {
        'team': labels.get('team'),
        'oncall': labels.get('oncall'),
        'port': port,
        'image': service_config.get('image'),
        'build': service_config.get('build')
    }

    # The node type was referenced but never assigned; derive it from the
    # service configuration via the connector's own type detection.
    service_type = self._determine_service_type(service_config)
    return self._create_node(service_type, service_name, properties)
Three types of edges are created:
depends_on: Explicit Docker Compose dependencies
calls: Service-to-service calls inferred from environment variables
uses: Database/cache usage inferred from connection strings
connectors/docker_compose.py
# Create edges for service-to-service callsenv_vars = self._extract_env_vars(service_config)service_deps = self._extract_service_dependencies_from_env(env_vars)for dep_service in service_deps: edge = self._create_edge( 'calls', service_node.id, f"service:{dep_service}" ) edges.append(edge)# Create edges for database/cache dependenciesdb_deps = self._extract_database_dependencies_from_env(env_vars)for db_name in db_deps: edge = self._create_edge( 'uses', service_node.id, f"{target_type}:{db_name}" ) edges.append(edge)
Service types are inferred from images or labels:
connectors/docker_compose.py
def _determine_service_type(self, service_config: Dict[str, Any]) -> str: """Determine the type of service based on configuration.""" labels = service_config.get('labels', {}) image = service_config.get('image', '') # Check labels first if labels.get('type'): return labels['type'] # Infer from image name if 'postgres' in image.lower(): return 'database' elif 'redis' in image.lower(): return 'cache' elif 'mysql' in image.lower() or 'mongodb' in image.lower(): return 'database' else: return 'service'
Dependencies are extracted from environment variables in container specs:
connectors/kubernetes.py
def _extract_k8s_service_dependencies(self, env_vars: Dict[str, str]) -> List[str]: dependencies = [] for key, value in env_vars.items(): if key.endswith('_SERVICE_URL') or key.endswith('_URL'): # Format: http://service-name.namespace.svc.cluster.local:port if 'svc.cluster.local' in value: parts = value.split('://') if len(parts) > 1: host_part = parts[1].split('.')[0] dependencies.append(host_part) return dependencies