Overview
The BaseConnector class is an abstract base class that defines the interface for all connectors in the Engineering Knowledge Graph. Connectors parse configuration files from different sources (Docker Compose, Kubernetes, Teams) and transform them into a unified graph structure of nodes and edges.
Class Definition
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from connectors.base import BaseConnector, Node, Edge
class BaseConnector(ABC):
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"connector.{name}")
Source: connectors/base.py:30-35
Data Models
Node
Represents a node in the knowledge graph.
class Node(BaseModel):
"""Represents a node in the knowledge graph."""
id: str
type: str
name: str
properties: Dict[str, Any] = {}
Source: connectors/base.py:13-18
Unique identifier for the node, typically formatted as {type}:{name}
Type of the node (e.g., “service”, “database”, “cache”, “team”, “deployment”)
Human-readable name of the node
properties
Dict[str, Any]
default:"{}"
Additional metadata and attributes for the node
Edge
Represents a relationship between two nodes in the knowledge graph.
class Edge(BaseModel):
"""Represents an edge in the knowledge graph."""
id: str
type: str
source: str
target: str
properties: Dict[str, Any] = {}
Source: connectors/base.py:21-27
Unique identifier for the edge, formatted as edge:{source}-{type}-{target}
Type of relationship (e.g., “depends_on”, “calls”, “uses”, “owns”, “exposes”)
properties
Dict[str, Any]
default:"{}"
Additional metadata for the relationship
Abstract Methods
parse()
Parse a configuration file and return nodes and edges.
@abstractmethod
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
"""
Parse a configuration file and return nodes and edges.
Args:
file_path: Path to the configuration file
Returns:
Tuple of (nodes, edges)
"""
pass
Source: connectors/base.py:37-48
Path to the configuration file to parse
Returns: tuple[List[Node], List[Edge]]
A tuple containing:
- List of
Node objects extracted from the configuration
- List of
Edge objects representing relationships
Helper Methods
_create_node()
Helper method to create a node with consistent ID format.
def _create_node(self, node_type: str, name: str, properties: Dict[str, Any] = None) -> Node:
"""Helper to create a node with consistent ID format."""
node_id = f"{node_type}:{name}"
return Node(
id=node_id,
type=node_type,
name=name,
properties=properties or {}
)
Source: connectors/base.py:50-58
Type of the node (e.g., “service”, “database”, “team”)
properties
Dict[str, Any]
default:"None"
Optional dictionary of node properties
Returns: Node
_create_edge()
Helper method to create an edge with consistent ID format.
def _create_edge(self, edge_type: str, source: str, target: str, properties: Dict[str, Any] = None) -> Edge:
"""Helper to create an edge with consistent ID format."""
edge_id = f"edge:{source}-{edge_type}-{target}"
return Edge(
id=edge_id,
type=edge_type,
source=source,
target=target,
properties=properties or {}
)
Source: connectors/base.py:60-69
Type of relationship (e.g., “depends_on”, “calls”, “uses”)
properties
Dict[str, Any]
default:"None"
Optional dictionary of edge properties
Returns: Edge
Extract service dependencies from environment variables.
def _extract_service_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
"""Extract service dependencies from environment variables."""
dependencies = []
for key, value in env_vars.items():
if key.endswith('_URL') or key.endswith('_SERVICE_URL'):
# Extract service name from URL like http://payment-service:8083
if '://' in value:
url_part = value.split('://')[1]
if ':' in url_part:
service_name = url_part.split(':')[0]
dependencies.append(service_name)
return dependencies
Source: connectors/base.py:71-84
Dictionary of environment variable key-value pairs
Returns: List[str] - List of service names extracted from URLs
Identifies service dependencies by looking for environment variables ending in _URL or _SERVICE_URL and parsing the service name from URLs like http://payment-service:8083.
Extract database and cache dependencies from environment variables.
def _extract_database_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
"""Extract database dependencies from environment variables."""
dependencies = []
for key, value in env_vars.items():
if key == 'DATABASE_URL':
# Extract database name from URL like postgresql://user:pass@db-name:5432/dbname
if '@' in value and ':' in value:
parts = value.split('@')[1].split(':')
if parts:
db_name = parts[0]
dependencies.append(db_name)
elif key == 'REDIS_URL':
# Extract redis name from URL like redis://redis-main:6379
if '://' in value:
url_part = value.split('://')[1]
if ':' in url_part:
redis_name = url_part.split(':')[0]
dependencies.append(redis_name)
return dependencies
Source: connectors/base.py:86-106
Dictionary of environment variable key-value pairs
Returns: List[str] - List of database/cache names
Supports:
- PostgreSQL URLs:
postgresql://user:pass@db-name:5432/dbname
- Redis URLs:
redis://redis-main:6379
_get_dependency_type()
Determine the type of dependency based on name and environment key.
def _get_dependency_type(self, dependency_name: str, env_key: str) -> str:
"""Determine the type of dependency based on name and environment key."""
if env_key == 'REDIS_URL' or 'redis' in dependency_name.lower():
return 'cache'
elif env_key == 'DATABASE_URL' or any(db_type in dependency_name.lower() for db_type in ['db', 'database', 'postgres', 'mysql', 'mongo']):
return 'database'
else:
return 'database' # Default to database
Source: connectors/base.py:108-115
Returns: str - Either “cache” or “database”
Usage Example
from connectors.base import BaseConnector, Node, Edge
from typing import List, Dict, Any
class CustomConnector(BaseConnector):
def __init__(self):
super().__init__("custom")
def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
nodes = []
edges = []
# Create a service node
service_node = self._create_node(
"service",
"my-service",
{"port": 8080, "team": "platform"}
)
nodes.append(service_node)
# Create a database node
db_node = self._create_node("database", "postgres-db")
nodes.append(db_node)
# Create relationship
edge = self._create_edge(
"uses",
service_node.id,
db_node.id
)
edges.append(edge)
return nodes, edges
# Usage
connector = CustomConnector()
nodes, edges = connector.parse("config.yaml")