
Overview

Engineering Knowledge Graph uses a layered architecture that separates concerns and makes the system extensible:

  • Connectors: parse raw config files into nodes and edges
  • Graph storage: persist and query the knowledge graph in Neo4j
  • Query engine: execute graph traversals and analysis
  • Chat interface: translate natural language to graph queries

Data flow

Here’s how a query flows through the system:
1. User asks a question: "What breaks if redis-main goes down?"
2. LLM parses intent: Gemini identifies a blast_radius query for cache:redis-main
3. Query engine executes: runs Cypher queries to find upstream/downstream dependencies
4. Results formatted: Gemini converts raw graph data into a natural language response
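The four steps can be sketched end to end with stand-in functions for each layer. Everything here is illustrative; the real implementations live in the layers described below:

```python
def parse_intent(question: str) -> dict:
    """Step 2 stand-in: the real system asks Gemini for this JSON."""
    return {"query_type": "blast_radius",
            "parameters": {"node_id": "cache:redis-main"}}

def run_query(intent: dict) -> list:
    """Step 3 stand-in: the real system runs Cypher via the query engine."""
    return ["service:order-service", "service:api-gateway"]

def format_answer(question: str, result: list) -> str:
    """Step 4 stand-in: the real system asks Gemini to narrate the result."""
    return f"{len(result)} services would be affected: " + ", ".join(result)

question = "What breaks if redis-main goes down?"
print(format_answer(question, run_query(parse_intent(question))))
# 2 services would be affected: service:order-service, service:api-gateway
```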

Layer 1: Connectors

Connectors parse configuration files and convert them into graph nodes and edges.

Base connector interface

All connectors implement the BaseConnector abstract class:
connectors/base.py
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from pydantic import BaseModel

class Node(BaseModel):
    """Represents a node in the knowledge graph."""
    id: str          # Format: "type:name" (e.g., "service:payment-service")
    type: str        # Node type: service, database, cache, team, etc.
    name: str        # Human-readable name
    properties: Dict[str, Any] = {}  # Arbitrary metadata

class Edge(BaseModel):
    """Represents an edge in the knowledge graph."""
    id: str          # Format: "edge:source-type-target"
    type: str        # Relationship type: calls, uses, owns, depends_on
    source: str      # Source node ID
    target: str      # Target node ID
    properties: Dict[str, Any] = {}

class BaseConnector(ABC):
    """Base class for all connectors."""
    
    @abstractmethod
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse a configuration file and return nodes and edges."""
        pass
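The ID conventions above ("type:name" for nodes, "edge:source-type-target" for edges) are worth centralizing so connectors can't drift apart. A minimal sketch of hypothetical helpers (these names are not part of the codebase shown here):

```python
def node_id(node_type: str, name: str) -> str:
    """Build a node ID in the "type:name" convention."""
    return f"{node_type}:{name}"

def edge_id(source: str, edge_type: str, target: str) -> str:
    """Build an edge ID in the "edge:source-type-target" convention."""
    return f"edge:{source}-{edge_type}-{target}"

print(node_id("service", "payment-service"))  # service:payment-service
print(edge_id("service:a", "calls", "service:b"))
```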

Docker Compose connector

The DockerComposeConnector extracts services, dependencies, and ownership:
connectors/docker_compose.py
class DockerComposeConnector(BaseConnector):
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            compose_data = yaml.safe_load(f)
        
        nodes = []
        edges = []
        
        for service_name, service_config in compose_data.get('services', {}).items():
            # Create service node
            service_node = self._create_service_node(service_name, service_config)
            nodes.append(service_node)
            
            # Extract dependencies from depends_on
            for dependency in service_config.get('depends_on', []):
                edges.append(self._create_edge(
                    'depends_on',
                    f"service:{service_name}",
                    f"service:{dependency}"
                ))
            
            # Extract dependencies from environment variables
            env_vars = self._extract_env_vars(service_config)
            
            # SERVICE_URL patterns imply service-to-service calls
            for dep in self._extract_service_dependencies_from_env(env_vars):
                edges.append(self._create_edge(
                    'calls',
                    f"service:{service_name}",
                    f"service:{dep}"
                ))
            
            # DATABASE_URL / REDIS_URL patterns imply datastore usage
            for dep in self._extract_database_dependencies_from_env(env_vars):
                edges.append(self._create_edge(
                    'uses',
                    f"service:{service_name}",
                    f"database:{dep}"
                ))
        
        return nodes, edges
The connector parses environment variables to find implicit dependencies:
def _extract_database_dependencies_from_env(self, env_vars: Dict[str, str]) -> List[str]:
    dependencies = []
    
    for key, value in env_vars.items():
        if key == 'DATABASE_URL':
            # Parse: postgresql://user:pass@db-name:5432/dbname
            if '@' in value:
                db_name = value.split('@')[1].split(':')[0]
                dependencies.append(db_name)
        elif key == 'REDIS_URL':
            # Parse: redis://redis-main:6379
            if '://' in value:
                redis_name = value.split('://')[1].split(':')[0]
                dependencies.append(redis_name)
    
    return dependencies
For example, this environment variable:
DATABASE_URL=postgresql://postgres:secret@orders-db:5432/orders
Creates an edge: service:order-service --uses--> database:orders-db
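The split-based parsing above can be exercised standalone. A quick sketch, re-declaring the helper as a free function purely for illustration:

```python
from typing import Dict, List

def extract_database_dependencies(env_vars: Dict[str, str]) -> List[str]:
    """Same split-based logic as _extract_database_dependencies_from_env."""
    dependencies = []
    for key, value in env_vars.items():
        if key == 'DATABASE_URL' and '@' in value:
            # postgresql://user:pass@host:5432/dbname -> host
            dependencies.append(value.split('@')[1].split(':')[0])
        elif key == 'REDIS_URL' and '://' in value:
            # redis://host:6379 -> host
            dependencies.append(value.split('://')[1].split(':')[0])
    return dependencies

env = {
    'DATABASE_URL': 'postgresql://postgres:secret@orders-db:5432/orders',
    'REDIS_URL': 'redis://redis-main:6379',
}
print(extract_database_dependencies(env))  # ['orders-db', 'redis-main']
```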

Teams connector

The TeamsConnector extracts ownership information:
connectors/teams.py
class TeamsConnector(BaseConnector):
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            teams_data = yaml.safe_load(f)
        
        nodes = []
        edges = []
        
        for team in teams_data.get('teams', []):
            # Create team node
            team_node = self._create_node(
                'team',
                team['name'],
                {'contact': team.get('contact')}
            )
            nodes.append(team_node)
            
            # Create OWNS edges to assets
            for asset_name in team.get('owns', []):
                # Infer asset type from name patterns
                asset_type = self._infer_asset_type(asset_name)
                edges.append(self._create_edge(
                    'owns',
                    team_node.id,
                    f"{asset_type}:{asset_name}"
                ))
        
        return nodes, edges
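The _infer_asset_type helper is referenced above but not shown. One plausible sketch, where the name patterns are assumptions rather than the project's actual rules:

```python
def infer_asset_type(asset_name: str) -> str:
    """Guess a node type from naming conventions (hypothetical rules)."""
    if asset_name.endswith('-db') or 'postgres' in asset_name:
        return 'database'
    if 'redis' in asset_name or 'cache' in asset_name:
        return 'cache'
    return 'service'  # default when no pattern matches

print(infer_asset_type('orders-db'))      # database
print(infer_asset_type('redis-main'))     # cache
print(infer_asset_type('order-service'))  # service
```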

Adding a custom connector

To add a new data source (e.g., Terraform):
1. Create the connector class

connectors/terraform.py
from typing import List

from .base import BaseConnector, Node, Edge
import hcl2  # HashiCorp Configuration Language parser (python-hcl2)

class TerraformConnector(BaseConnector):
    def __init__(self):
        self.name = "terraform"  # BaseConnector (as shown) takes no init args
    
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            tf_data = hcl2.load(f)
        
        nodes = []
        edges = []
        
        # Parse resources
        for resource in tf_data.get('resource', []):
            # Extract RDS instances, S3 buckets, etc.
            # Create appropriate nodes and edges
            pass
        
        return nodes, edges
2. Register in __init__.py

connectors/__init__.py
from .terraform import TerraformConnector

__all__ = ['TerraformConnector', ...]
3. Use in main.py

main.py
from connectors import TerraformConnector

tf_file = data_dir / "main.tf"
if tf_file.exists():
    connector = TerraformConnector()
    nodes, edges = connector.parse(str(tf_file))
    all_nodes.extend(nodes)
    all_edges.extend(edges)
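The resource-parsing stub in step 1 can be fleshed out once you know the shape hcl2 produces: resources arrive as a list of {tf_type: {name: config}} dicts. A sketch of the extraction (the resource-type mapping here is an assumption for illustration):

```python
from typing import Any, Dict, List, Tuple

def extract_tf_resources(tf_data: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (node_type, name) pairs for a few known resource kinds."""
    type_map = {'aws_db_instance': 'database', 'aws_s3_bucket': 'bucket'}
    found = []
    for resource in tf_data.get('resource', []):
        for tf_type, instances in resource.items():
            node_type = type_map.get(tf_type)
            if node_type is None:
                continue  # skip resource kinds we don't model
            for name in instances:
                found.append((node_type, name))
    return found

tf_data = {'resource': [{'aws_db_instance': {'orders': {'engine': 'postgres'}}}]}
print(extract_tf_resources(tf_data))  # [('database', 'orders')]
```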

Layer 2: Graph storage

The GraphStorage class handles all Neo4j interactions with a clean interface.

Storage interface

graph/storage.py
import os

from neo4j import GraphDatabase

class GraphStorage:
    def __init__(self, uri: str = None, user: str = None, password: str = None):
        self.uri = uri or os.getenv('NEO4J_URI', 'bolt://localhost:7687')
        self.user = user or os.getenv('NEO4J_USER', 'neo4j')
        self.password = password or os.getenv('NEO4J_PASSWORD', 'password')
        
        self.driver = GraphDatabase.driver(
            self.uri,
            auth=(self.user, self.password)
        )
    
    def add_node(self, node: Node):
        """Add or update a node using MERGE for upsert behavior."""
        with self.driver.session() as session:
            query = """
            MERGE (n {id: $id})
            SET n.type = $type,
                n.name = $name,
                n += $properties
            RETURN n
            """
            session.run(query, {
                'id': node.id,
                'type': node.type,
                'name': node.name,
                'properties': node.properties
            })
    
    def add_edge(self, edge: Edge):
        """Add or update an edge with dynamic relationship type."""
        with self.driver.session() as session:
            # Ensure both nodes exist
            session.run("MERGE (n {id: $id})", {'id': edge.source})
            session.run("MERGE (n {id: $id})", {'id': edge.target})
            
            # Create relationship
            query = f"""
            MATCH (source {{id: $source}})
            MATCH (target {{id: $target}})
            MERGE (source)-[r:{edge.type.upper()}]->(target)
            SET r += $properties
            RETURN r
            """
            session.run(query, {
                'source': edge.source,
                'target': edge.target,
                'properties': edge.properties
            })
Why MERGE? Using MERGE instead of CREATE provides upsert behavior - if a node or edge already exists, it’s updated rather than duplicated. This is critical when loading data from multiple sources that might reference the same entities.
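One caveat with add_edge: edge.type is interpolated directly into the Cypher string, because relationship types cannot be parameterized in Cypher. If edge types can ever come from untrusted input, they should be validated first. A hedged sketch of a whitelist-style check (this is an assumption about hardening, not existing project code):

```python
import re

# Only plain identifier-style relationship types are allowed
VALID_REL_TYPE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def safe_rel_type(edge_type: str) -> str:
    """Uppercase an edge type for Cypher, rejecting anything that could
    break out of the relationship-type position in the query string."""
    rel = edge_type.upper()
    if not VALID_REL_TYPE.match(rel):
        raise ValueError(f"Unsafe relationship type: {edge_type!r}")
    return rel

print(safe_rel_type('depends_on'))  # DEPENDS_ON
```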

Cypher execution

For complex queries, you can execute raw Cypher:
def execute_cypher(self, query: str, parameters: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """Execute a custom Cypher query."""
    with self.driver.session() as session:
        result = session.run(query, parameters or {})
        return [dict(record) for record in result]
Example usage:
# Find all services owned by a specific team
result = storage.execute_cypher("""
    MATCH (team {type: 'team', name: $team_name})-[:OWNS]->(asset)
    RETURN asset.name as asset_name, asset.type as asset_type
    ORDER BY asset.type, asset.name
""", {'team_name': 'platform-team'})

Layer 3: Query engine

The QueryEngine provides high-level methods for common graph analysis patterns.

Core query methods

def downstream(self, node_id: str, max_depth: int = 10) -> List[Dict[str, Any]]:
    """Get all transitive dependencies (what this node depends on)."""
    query = f"""
    MATCH path = (start {{id: $node_id}})-[r*1..{max_depth}]->(dependency)
    WITH dependency, min(length(path)) as distance
    RETURN dependency, distance
    ORDER BY distance, dependency.name
    """
    
    result = self.storage.execute_cypher(query, {'node_id': node_id})
    return [
        {**dict(record['dependency']), 'distance': record['distance']}
        for record in result
    ]
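blast_radius (used by the chat layer) is the mirror image of downstream: it walks incoming edges to find everything that transitively depends on a node. Over an in-memory edge list the semantics look like a reverse BFS. This is an illustrative sketch, not the actual Cypher-backed implementation:

```python
from collections import deque
from typing import Dict, List, Tuple

def blast_radius(edges: List[Tuple[str, str]], node_id: str,
                 max_depth: int = 10) -> Dict[str, int]:
    """Return {dependent_node: distance} for everything that reaches node_id."""
    # Reverse adjacency: target -> [sources that point at it]
    incoming: Dict[str, List[str]] = {}
    for source, target in edges:
        incoming.setdefault(target, []).append(source)

    distances: Dict[str, int] = {}
    queue = deque([(node_id, 0)])
    while queue:
        current, depth = queue.popleft()
        if depth >= max_depth:
            continue
        for dependent in incoming.get(current, []):
            if dependent not in distances:  # shortest path wins; cycles ignored
                distances[dependent] = depth + 1
                queue.append((dependent, depth + 1))
    return distances

edges = [
    ("service:order-service", "cache:redis-main"),
    ("service:api-gateway", "service:order-service"),
]
print(blast_radius(edges, "cache:redis-main"))
# {'service:order-service': 1, 'service:api-gateway': 2}
```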

Cycle prevention

How cycles are handled: Cypher's variable-length pattern [r*1..{max_depth}] never traverses the same relationship twice within a single path, so traversals terminate even in cyclic graphs. The configurable depth limit (default 10) bounds the search further, and min(length(path)) keeps only the shortest distance to each node.

Layer 4: Chat interface

The chat interface uses Google Gemini to parse natural language queries and format responses.

LLM integration

chat/llm.py
import json
import os

from google import genai

class GeminiLLM:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv('GEMINI_API_KEY')
        self.client = genai.Client(api_key=self.api_key)
    
    def parse_query_intent(self, user_query: str) -> Dict[str, Any]:
        """Parse user query to determine intent and extract parameters."""
        system_prompt = """
        You are an expert at parsing natural language queries about engineering
        infrastructure and converting them to structured queries.
        
        Query types: get_node, get_nodes, downstream, upstream, blast_radius,
                     path, get_owner, get_team_assets
        
        Node ID format: "type:name" (e.g., "service:order-service")
        
        Respond with JSON only:
        {
          "query_type": "...",
          "parameters": {...},
          "confidence": 0.0-1.0
        }
        """
        
        response = self.client.models.generate_content(
            model='gemini-2.5-flash',
            contents=system_prompt + f"\n\nUser query: {user_query}"
        )
        
        return json.loads(response.text.strip())
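json.loads(response.text.strip()) assumes the model returns bare JSON, but in practice LLMs sometimes wrap output in Markdown fences. A defensive extraction helper, sketched as an assumption about that failure mode rather than existing project code:

```python
import json
from typing import Any, Dict

def extract_json(text: str) -> Dict[str, Any]:
    """Parse LLM output as JSON, tolerating ```json ... ``` fences."""
    cleaned = text.strip()
    if cleaned.startswith('```'):
        # Drop the opening fence (with optional language tag)...
        cleaned = cleaned.split('\n', 1)[1]
        # ...and the closing fence
        cleaned = cleaned.rsplit('```', 1)[0]
    return json.loads(cleaned)

print(extract_json('```json\n{"query_type": "downstream"}\n```'))
```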

Query parser

The QueryParser connects the LLM to the query engine:
chat/query_parser.py
class QueryParser:
    def __init__(self, query_engine: QueryEngine, llm: GeminiLLM):
        self.query_engine = query_engine
        self.llm = llm
    
    def process_query(self, user_query: str, session_id: str = "default") -> Dict[str, Any]:
        # Parse intent with LLM
        intent = self.llm.parse_query_intent(user_query)
        
        # Execute graph query
        query_type = intent['query_type']
        params = intent['parameters']
        
        if query_type == 'downstream':
            result = self.query_engine.downstream(params['node_id'])
        elif query_type == 'upstream':
            result = self.query_engine.upstream(params['node_id'])
        elif query_type == 'blast_radius':
            result = self.query_engine.blast_radius(params['node_id'])
        # ... handle other query types
        
        # Format response with LLM
        response = self.llm.format_response(result, user_query, query_type)
        
        return {
            'response': response,
            'query_type': query_type,
            'confidence': intent['confidence'],
            'raw_result': result
        }
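The if/elif chain grows with every query type. A dispatch dict keeps one place to register handlers and rejects unknown types cleanly. A sketch against a stub engine (the stub is illustrative only):

```python
from typing import Any, Callable, Dict

class StubEngine:
    """Stand-in for QueryEngine, for illustration only."""
    def downstream(self, node_id: str):
        return [f"{node_id} dependency"]
    def upstream(self, node_id: str):
        return [f"{node_id} dependent"]

def dispatch(engine: StubEngine, query_type: str,
             params: Dict[str, Any]) -> Any:
    handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {
        'downstream': lambda p: engine.downstream(p['node_id']),
        'upstream': lambda p: engine.upstream(p['node_id']),
        # register new query types here
    }
    if query_type not in handlers:
        raise ValueError(f"Unknown query type: {query_type}")
    return handlers[query_type](params)

print(dispatch(StubEngine(), 'downstream', {'node_id': 'cache:redis-main'}))
# ['cache:redis-main dependency']
```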

FastAPI web app

The web interface exposes the query system via REST API:
chat/app.py
from typing import Any

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Engineering Knowledge Graph")

class QueryRequest(BaseModel):
    query: str
    session_id: str = "default"

class QueryResponse(BaseModel):
    response: str
    query_type: str
    confidence: float
    raw_result: Any = None

@app.post("/api/query", response_model=QueryResponse)
async def process_query(query_request: QueryRequest):
    result = query_parser.process_query(
        query_request.query,
        query_request.session_id
    )
    
    return QueryResponse(
        response=result['response'],
        query_type=result['query_type'],
        confidence=result.get('confidence', 0.0),
        raw_result=result.get('raw_result')
    )

@app.get("/api/health")
async def health_check():
    return {
        "status": "healthy",
        "components": {
            "storage": storage is not None,
            "query_engine": query_engine is not None,
            "query_parser": query_parser is not None,
            "neo4j": test_neo4j_connection()
        }
    }

Deployment architecture

The Docker Compose setup runs two services:
docker-compose.yml
version: '3.8'

services:
  neo4j:
    image: neo4j:5.15
    environment:
      - NEO4J_AUTH=neo4j/password
      - NEO4J_PLUGINS=["apoc"]
    ports:
      - "7474:7474"  # Web UI
      - "7687:7687"  # Bolt protocol
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    healthcheck:
      test: ["CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1"]
      interval: 10s
      timeout: 5s
      retries: 5

  ekg-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - NEO4J_URI=bolt://neo4j:7687
      - NEO4J_USER=neo4j
      - NEO4J_PASSWORD=password
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    depends_on:
      neo4j:
        condition: service_healthy
    volumes:
      - ./data:/app/data
    command: ["python", "-m", "uvicorn", "chat.app:app", "--host", "0.0.0.0", "--port", "8000"]

volumes:
  neo4j_data:
  neo4j_logs:
Production considerations: This is a prototype. For production use:
  • Add authentication/authorization
  • Use managed Neo4j (Aura) or proper clustering
  • Implement rate limiting on the API
  • Add caching for query results
  • Use async Neo4j driver for better concurrency
  • Monitor Gemini API usage and costs

Design decisions

Neo4j was chosen for:
  • Mature Cypher query language for complex graph traversals
  • ACID transactions and reliability
  • Rich ecosystem with Python driver and Docker support
  • Graph algorithms built-in (shortest path, centrality)
  • Visualization tools for debugging and exploration
Alternatives considered:
  • AWS Neptune: Vendor lock-in, more complex setup
  • ArangoDB: Multi-model but less graph-focused tooling
  • NetworkX: Python-native but no persistence or concurrent access
Gemini 2.5 Flash provides:
  • Fast response times (< 1 second)
  • Generous free tier for prototyping
  • Good structured output (JSON) capabilities
  • Lower cost than GPT-4
The system could easily swap to OpenAI, Anthropic, or open-source LLMs by changing the GeminiLLM class.
The current approach uses a full reload via storage.clear_graph() and re-parsing all files. For production, implement:
  • File watching with checksums to detect changes
  • Incremental updates using node/edge timestamps
  • Merge strategy for conflicting data from multiple sources
  • API endpoint /api/reload for manual refresh
Bottlenecks at 10K+ nodes:
  1. LLM API calls - Rate limits and latency for query parsing
  2. Complex Cypher queries - Deep traversals without proper indexing
  3. Memory usage - Loading large result sets into Python objects
  4. Web interface - Rendering large result lists
Solutions:
  • Query result pagination
  • Neo4j indexing on id and type properties
  • Async LLM calls with request batching
  • Result caching (Redis or in-memory)
  • Query optimization (limit depth, use specific edge types)

Next steps

  • API reference: explore all available endpoints and query methods
  • Custom connectors: build custom connectors and contribute to EKG
