## Overview

Engineering Knowledge Graph uses a layered architecture that separates concerns and makes the system extensible:

1. **Connectors**: parse raw config files into nodes and edges
2. **Graph storage**: persist and query the knowledge graph in Neo4j
3. **Query engine**: execute graph traversals and analysis
4. **Chat interface**: translate natural language to graph queries
## Data flow

Here's how a query flows through the system:

1. **User asks a question**: "What breaks if redis-main goes down?"
2. **LLM parses intent**: Gemini identifies a `blast_radius` query for `cache:redis-main`
3. **Query engine executes**: runs Cypher queries to find upstream/downstream dependencies
4. **Results formatted**: Gemini converts raw graph data into a natural-language response
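The four steps above can be sketched end to end with stubbed components. The function names here are illustrative, not the project's actual API:

```python
# A minimal sketch of the query flow, with each layer stubbed out.

def parse_intent(question: str) -> dict:
    """Step 2: the LLM would map the question to a structured intent."""
    return {"query_type": "blast_radius",
            "parameters": {"node_id": "cache:redis-main"}}

def run_graph_query(intent: dict) -> list:
    """Step 3: the query engine would run Cypher against Neo4j."""
    return [{"id": "service:checkout", "distance": 1}]

def format_answer(question: str, result: list) -> str:
    """Step 4: the LLM would turn raw rows into prose."""
    return f"{len(result)} component(s) are affected if redis-main goes down."

def answer(question: str) -> str:
    intent = parse_intent(question)         # step 2
    result = run_graph_query(intent)        # step 3
    return format_answer(question, result)  # step 4

print(answer("What breaks if redis-main goes down?"))
```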
## Layer 1: Connectors

Connectors parse configuration files and convert them into graph nodes and edges.

### Base connector interface

All connectors implement the `BaseConnector` abstract class:
```python
from abc import ABC, abstractmethod
from typing import Dict, List, Any

from pydantic import BaseModel


class Node(BaseModel):
    """Represents a node in the knowledge graph."""
    id: str                          # Format: "type:name" (e.g., "service:payment-service")
    type: str                        # Node type: service, database, cache, team, etc.
    name: str                        # Human-readable name
    properties: Dict[str, Any] = {}  # Arbitrary metadata


class Edge(BaseModel):
    """Represents an edge in the knowledge graph."""
    id: str                          # Format: "edge:source-type-target"
    type: str                        # Relationship type: calls, uses, owns, depends_on
    source: str                      # Source node ID
    target: str                      # Target node ID
    properties: Dict[str, Any] = {}


class BaseConnector(ABC):
    """Base class for all connectors."""

    @abstractmethod
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        """Parse a configuration file and return nodes and edges."""
```
### Docker Compose connector

The `DockerComposeConnector` extracts services, dependencies, and ownership:
```python
# connectors/docker_compose.py
import yaml


class DockerComposeConnector(BaseConnector):
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            compose_data = yaml.safe_load(f)

        nodes = []
        edges = []

        for service_name, service_config in compose_data.get('services', {}).items():
            # Create service node
            service_node = self._create_service_node(service_name, service_config)
            nodes.append(service_node)

            # Extract dependencies from depends_on
            for dependency in service_config.get('depends_on', []):
                edges.append(self._create_edge(
                    'depends_on',
                    f"service:{service_name}",
                    f"service:{dependency}"
                ))

            # Extract dependencies from environment variables
            env_vars = self._extract_env_vars(service_config)
            # Parse DATABASE_URL, REDIS_URL, SERVICE_URL patterns
            for dep in self._extract_service_dependencies_from_env(env_vars):
                edges.append(self._create_edge(
                    'calls',
                    f"service:{service_name}",
                    f"service:{dep}"
                ))

        return nodes, edges
```
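The env-var helper referenced above, `_extract_service_dependencies_from_env`, is not shown in this document. A plausible sketch of how such a heuristic could pull hostnames out of `*_URL` variables (the real implementation may differ):

```python
import re
from typing import Dict, List

# Matches "scheme://[user:pass@]host" and captures the host.
URL_PATTERN = re.compile(r'^(?:[a-z][a-z0-9+.-]*)://(?:[^@/]+@)?([^:/]+)')

def extract_service_dependencies_from_env(env_vars: Dict[str, str]) -> List[str]:
    """Pull likely service names out of *_URL environment variables.

    e.g. DATABASE_URL=postgres://user:pass@orders-db:5432/orders -> "orders-db"
    """
    deps = []
    for key, value in env_vars.items():
        if not key.endswith('_URL'):
            continue
        match = URL_PATTERN.match(value)
        if match:
            deps.append(match.group(1))
    return deps

print(extract_service_dependencies_from_env({
    'DATABASE_URL': 'postgres://user:pass@orders-db:5432/orders',
    'REDIS_URL': 'redis://redis-main:6379',
    'LOG_LEVEL': 'info',  # ignored: not a *_URL variable
}))  # ['orders-db', 'redis-main']
```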
### Teams connector

The `TeamsConnector` extracts ownership information:
```python
class TeamsConnector(BaseConnector):
    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            teams_data = yaml.safe_load(f)

        nodes = []
        edges = []

        for team in teams_data.get('teams', []):
            # Create team node
            team_node = self._create_node(
                'team',
                team['name'],
                {'contact': team.get('contact')}
            )
            nodes.append(team_node)

            # Create OWNS edges to assets
            for asset_name in team.get('owns', []):
                # Infer asset type from name patterns
                asset_type = self._infer_asset_type(asset_name)
                edges.append(self._create_edge(
                    'owns',
                    team_node.id,
                    f"{asset_type}:{asset_name}"
                ))

        return nodes, edges
```
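The `_infer_asset_type` helper is likewise not shown. A minimal sketch of name-pattern inference, with hypothetical heuristics (the real rules may differ):

```python
def infer_asset_type(asset_name: str) -> str:
    """Guess a node type from common naming conventions (illustrative)."""
    name = asset_name.lower()
    if name.startswith('redis-') or name.endswith('-cache'):
        return 'cache'
    if name.endswith('-db') or 'postgres' in name or 'mysql' in name:
        return 'database'
    return 'service'  # default: assume it's a service

print(infer_asset_type('redis-main'))       # cache
print(infer_asset_type('orders-db'))        # database
print(infer_asset_type('payment-service'))  # service
```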
### Adding a custom connector

To add a new data source (e.g., Terraform):

1. **Create the connector class**
```python
from typing import List

from .base import BaseConnector, Node, Edge
import hcl2  # HashiCorp Configuration Language parser


class TerraformConnector(BaseConnector):
    def __init__(self):
        super().__init__("terraform")

    def parse(self, file_path: str) -> tuple[List[Node], List[Edge]]:
        with open(file_path, 'r') as f:
            tf_data = hcl2.load(f)

        nodes = []
        edges = []

        # Parse resources
        for resource in tf_data.get('resource', []):
            # Extract RDS instances, S3 buckets, etc.
            # Create appropriate nodes and edges
            pass

        return nodes, edges
```
2. **Register in `__init__.py`**

```python
from .terraform import TerraformConnector

__all__ = ['TerraformConnector', ...]
```
3. **Use in `main.py`**

```python
from connectors import TerraformConnector

tf_file = data_dir / "main.tf"
if tf_file.exists():
    connector = TerraformConnector()
    nodes, edges = connector.parse(str(tf_file))
    all_nodes.extend(nodes)
    all_edges.extend(edges)
```
## Layer 2: Graph storage

The `GraphStorage` class handles all Neo4j interactions behind a clean interface.

### Storage interface
```python
import os

from neo4j import GraphDatabase


class GraphStorage:
    def __init__(self, uri: str = None, user: str = None, password: str = None):
        self.uri = uri or os.getenv('NEO4J_URI', 'bolt://localhost:7687')
        self.user = user or os.getenv('NEO4J_USER', 'neo4j')
        self.password = password or os.getenv('NEO4J_PASSWORD', 'password')
        self.driver = GraphDatabase.driver(
            self.uri,
            auth=(self.user, self.password)
        )

    def add_node(self, node: Node):
        """Add or update a node using MERGE for upsert behavior."""
        with self.driver.session() as session:
            query = """
                MERGE (n {id: $id})
                SET n.type = $type,
                    n.name = $name,
                    n += $properties
                RETURN n
            """
            session.run(query, {
                'id': node.id,
                'type': node.type,
                'name': node.name,
                'properties': node.properties
            })

    def add_edge(self, edge: Edge):
        """Add or update an edge with a dynamic relationship type."""
        with self.driver.session() as session:
            # Ensure both nodes exist
            session.run("MERGE (n {id: $id})", {'id': edge.source})
            session.run("MERGE (n {id: $id})", {'id': edge.target})

            # Create the relationship
            query = f"""
                MATCH (source {{id: $source}})
                MATCH (target {{id: $target}})
                MERGE (source)-[r:{edge.type.upper()}]->(target)
                SET r += $properties
                RETURN r
            """
            session.run(query, {
                'source': edge.source,
                'target': edge.target,
                'properties': edge.properties
            })
```
> **Why MERGE?** Using `MERGE` instead of `CREATE` provides upsert behavior: if a node or edge already exists, it is updated rather than duplicated. This is critical when loading data from multiple sources that might reference the same entities.
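One caveat with `add_edge`: `edge.type` is interpolated directly into the Cypher string, because Neo4j cannot parameterize relationship types. If edge types ever come from untrusted input, it is worth validating them before interpolation. A minimal guard, shown here as an illustrative addition rather than part of the project's code:

```python
import re

# Relationship types must look like identifiers; anything else could
# break out of the Cypher string built in add_edge.
VALID_REL_TYPE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def safe_rel_type(edge_type: str) -> str:
    """Upper-case a relationship type, rejecting unsafe characters."""
    if not VALID_REL_TYPE.match(edge_type):
        raise ValueError(f"unsafe relationship type: {edge_type!r}")
    return edge_type.upper()

print(safe_rel_type('depends_on'))  # DEPENDS_ON
```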
### Cypher execution

For complex queries, you can execute raw Cypher:
```python
def execute_cypher(self, query: str, parameters: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """Execute a custom Cypher query."""
    with self.driver.session() as session:
        result = session.run(query, parameters or {})
        return [dict(record) for record in result]
```
Example usage:

```python
# Find all services owned by a specific team
result = storage.execute_cypher("""
    MATCH (team {type: 'team', name: $team_name})-[:OWNS]->(asset)
    RETURN asset.name AS asset_name, asset.type AS asset_type
    ORDER BY asset.type, asset.name
""", {'team_name': 'platform-team'})
```
## Layer 3: Query engine

The `QueryEngine` provides high-level methods for common graph analysis patterns.

### Core query methods

- `downstream()`
- `upstream()`
- `blast_radius()`
- `path()`
```python
def downstream(self, node_id: str, max_depth: int = 10) -> List[Dict[str, Any]]:
    """Get all transitive dependencies (what this node depends on)."""
    query = f"""
        MATCH path = (start {{id: $node_id}})-[r*1..{max_depth}]->(dependency)
        WITH dependency, min(length(path)) AS distance
        RETURN dependency, distance
        ORDER BY distance, dependency.name
    """
    result = self.storage.execute_cypher(query, {'node_id': node_id})
    return [
        {**dict(record['dependency']), 'distance': record['distance']}
        for record in result
    ]
```
### Cycle prevention

**How cycles are handled:** the Cypher queries use `[r*1..{max_depth}]` syntax with a configurable depth limit (default 10). Neo4j never traverses the same relationship twice within a single variable-length match, so traversal cannot loop forever on a cycle. The `min(length(path))` aggregation then keeps only the shortest path to each node.
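The same semantics can be illustrated without a database: a plain-Python BFS over an adjacency dict visits each node at most once, which both prevents infinite loops on cycles and naturally yields the shortest distance, mirroring what `min(length(path))` does in the Cypher version:

```python
from collections import deque
from typing import Dict, List, Tuple

def downstream_bfs(graph: Dict[str, List[str]], start: str,
                   max_depth: int = 10) -> List[Tuple[str, int]]:
    """Breadth-first traversal with a depth limit; each node is
    recorded at its shortest distance from start."""
    seen = {start}
    queue = deque([(start, 0)])
    out = []
    while queue:
        node, dist = queue.popleft()
        if node != start:
            out.append((node, dist))
        if dist == max_depth:
            continue  # depth limit reached; don't expand further
        for nxt in graph.get(node, []):
            if nxt not in seen:  # skip visited nodes -> no infinite loops
                seen.add(nxt)
                queue.append((nxt, dist + 1))
    return out

# A graph with a cycle: a -> b -> c -> a, plus a direct edge a -> c.
g = {'a': ['b', 'c'], 'b': ['c'], 'c': ['a']}
print(downstream_bfs(g, 'a'))  # [('b', 1), ('c', 1)]: c at distance 1, not 2
```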
## Layer 4: Chat interface

The chat interface uses Google Gemini to parse natural-language queries and format responses.

### LLM integration
```python
import json
import os

import google.genai as genai


class GeminiLLM:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.getenv('GEMINI_API_KEY')
        self.client = genai.Client(api_key=self.api_key)

    def parse_query_intent(self, user_query: str) -> Dict[str, Any]:
        """Parse a user query to determine intent and extract parameters."""
        system_prompt = """
        You are an expert at parsing natural language queries about engineering
        infrastructure and converting them to structured queries.

        Query types: get_node, get_nodes, downstream, upstream, blast_radius,
        path, get_owner, get_team_assets

        Node ID format: "type:name" (e.g., "service:order-service")

        Respond with JSON only:
        {
          "query_type": "...",
          "parameters": {...},
          "confidence": 0.0-1.0
        }
        """
        response = self.client.models.generate_content(
            model='gemini-2.5-flash',
            contents=system_prompt + f"\n\nUser query: {user_query}"
        )
        return json.loads(response.text.strip())
```
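One fragility in `parse_query_intent`: models often wrap JSON in Markdown code fences, which would make the bare `json.loads` call fail. A defensive parsing helper (an addition, not shown in the project) could strip an optional fence first:

```python
import json
import re

def parse_llm_json(text: str) -> dict:
    """Parse JSON from an LLM response, tolerating a ```json fence."""
    text = text.strip()
    fenced = re.match(r'^```(?:json)?\s*(.*?)\s*```$', text, re.DOTALL)
    if fenced:
        text = fenced.group(1)  # keep only the fenced body
    return json.loads(text)

raw = '```json\n{"query_type": "blast_radius", "confidence": 0.9}\n```'
print(parse_llm_json(raw)['query_type'])  # blast_radius
```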
### Query parser

The `QueryParser` connects the LLM to the query engine:
```python
class QueryParser:
    def __init__(self, query_engine: QueryEngine, llm: GeminiLLM):
        self.query_engine = query_engine
        self.llm = llm

    def process_query(self, user_query: str, session_id: str = "default") -> Dict[str, Any]:
        # Parse intent with the LLM
        intent = self.llm.parse_query_intent(user_query)

        # Execute the graph query
        query_type = intent['query_type']
        params = intent['parameters']

        if query_type == 'downstream':
            result = self.query_engine.downstream(params['node_id'])
        elif query_type == 'upstream':
            result = self.query_engine.upstream(params['node_id'])
        elif query_type == 'blast_radius':
            result = self.query_engine.blast_radius(params['node_id'])
        # ... handle other query types

        # Format the response with the LLM
        response = self.llm.format_response(result, user_query, query_type)

        return {
            'response': response,
            'query_type': query_type,
            'confidence': intent['confidence'],
            'raw_result': result
        }
```
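The `if`/`elif` chain grows with every query type; one alternative is a dispatch dict mapping query types to handlers. A sketch with a stubbed engine (names and return values are illustrative only):

```python
# Stand-in for QueryEngine, with canned results for the sketch.
class FakeEngine:
    def downstream(self, node_id): return [('db', 1)]
    def upstream(self, node_id): return []
    def blast_radius(self, node_id): return {'up': [], 'down': [('db', 1)]}

engine = FakeEngine()

# One entry per query type; adding a type is one line, not a new elif.
HANDLERS = {
    'downstream':   lambda p: engine.downstream(p['node_id']),
    'upstream':     lambda p: engine.upstream(p['node_id']),
    'blast_radius': lambda p: engine.blast_radius(p['node_id']),
}

intent = {'query_type': 'downstream', 'parameters': {'node_id': 'service:api'}}
handler = HANDLERS.get(intent['query_type'])
result = handler(intent['parameters']) if handler else None
print(result)  # [('db', 1)]
```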
### FastAPI web app

The web interface exposes the query system via a REST API:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Engineering Knowledge Graph")


class QueryRequest(BaseModel):
    query: str
    session_id: str = "default"


class QueryResponse(BaseModel):
    response: str
    query_type: str
    confidence: float
    raw_result: Any = None


@app.post("/api/query", response_model=QueryResponse)
async def process_query(query_request: QueryRequest):
    result = query_parser.process_query(
        query_request.query,
        query_request.session_id
    )
    return QueryResponse(
        response=result['response'],
        query_type=result['query_type'],
        confidence=result.get('confidence', 0.0),
        raw_result=result.get('raw_result')
    )


@app.get("/api/health")
async def health_check():
    return {
        "status": "healthy",
        "components": {
            "storage": storage is not None,
            "query_engine": query_engine is not None,
            "query_parser": query_parser is not None,
            "neo4j": test_neo4j_connection()
        }
    }
```
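A hypothetical request/response pair for `/api/query` (the response text comes from the LLM, so treat these payloads as illustrative of the shape only). Request body:

```json
{ "query": "What breaks if redis-main goes down?", "session_id": "default" }
```

Response:

```json
{
  "response": "If redis-main goes down, the services that call it are affected...",
  "query_type": "blast_radius",
  "confidence": 0.92,
  "raw_result": null
}
```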
## Deployment architecture

The Docker Compose setup runs two services:
```yaml
version: '3.8'

services:
  neo4j:
    image: neo4j:5.15
    environment:
      - NEO4J_AUTH=neo4j/password
      - NEO4J_PLUGINS=["apoc"]
    ports:
      - "7474:7474"   # Web UI
      - "7687:7687"   # Bolt protocol
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs
    healthcheck:
      test: ["CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1"]
      interval: 10s
      timeout: 5s
      retries: 5

  ekg-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - NEO4J_URI=bolt://neo4j:7687
      - NEO4J_USER=neo4j
      - NEO4J_PASSWORD=password
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    depends_on:
      neo4j:
        condition: service_healthy
    volumes:
      - ./data:/app/data
    command: ["python", "-m", "uvicorn", "chat.app:app", "--host", "0.0.0.0", "--port", "8000"]

volumes:
  neo4j_data:
  neo4j_logs:
```
> **Production considerations:** this is a prototype. For production use:
>
> - Add authentication/authorization
> - Use managed Neo4j (Aura) or proper clustering
> - Implement rate limiting on the API
> - Add caching for query results
> - Use the async Neo4j driver for better concurrency
> - Monitor Gemini API usage and costs
## Design decisions

### Why Neo4j over other graph databases?

Neo4j was chosen for:

- A mature Cypher query language for complex graph traversals
- ACID transactions and reliability
- A rich ecosystem with a Python driver and Docker support
- Built-in graph algorithms (shortest path, centrality)
- Visualization tools for debugging and exploration

Alternatives considered:

- **AWS Neptune**: vendor lock-in, more complex setup
- **ArangoDB**: multi-model, but less graph-focused tooling
- **NetworkX**: Python-native, but no persistence or concurrent access
### Why Gemini over other LLMs?

Gemini 2.5 Flash provides:

- Fast response times (under 1 second)
- A generous free tier for prototyping
- Good structured-output (JSON) capabilities
- Lower cost than GPT-4

The system could easily swap to OpenAI, Anthropic, or open-source LLMs by changing the `GeminiLLM` class.
### How does the system handle stale data?

The current approach is a full reload: `storage.clear_graph()` followed by re-parsing all files. For production, consider:

- File watching with checksums to detect changes
- Incremental updates using node/edge timestamps
- A merge strategy for conflicting data from multiple sources
- An `/api/reload` endpoint for manual refresh
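The checksum idea can be sketched in a few lines (illustrative, not the project's implementation): hash each config file, compare against the previous scan, and re-parse only what changed.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Dict, Iterable, List

def file_checksums(paths: Iterable[Path]) -> Dict[str, str]:
    """SHA-256 of each file's contents, keyed by path."""
    return {str(p): hashlib.sha256(Path(p).read_bytes()).hexdigest()
            for p in paths}

def changed_files(old: Dict[str, str], new: Dict[str, str]) -> List[str]:
    """Files that are new or whose contents changed since the last scan."""
    return [path for path, digest in new.items() if old.get(path) != digest]

with tempfile.TemporaryDirectory() as d:
    compose = Path(d) / 'docker-compose.yml'
    compose.write_text('services: {}')
    before = file_checksums([compose])

    compose.write_text('services:\n  web:\n    image: nginx\n')
    after = file_checksums([compose])

    # Only the files returned here would need re-parsing.
    print(changed_files(before, after))
```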
### How does it scale?

Bottlenecks appear at 10K+ nodes:

- **LLM API calls**: rate limits and latency for query parsing
- **Complex Cypher queries**: deep traversals without proper indexing
- **Memory usage**: loading large result sets into Python objects
- **Web interface**: rendering large result lists

Solutions:

- Query result pagination
- Neo4j indexes on the `id` and `type` properties
- Async LLM calls with request batching
- Result caching (Redis or in-memory)
- Query optimization (limit depth, use specific edge types)
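On indexing, one detail to note: Neo4j indexes are defined per label, while the `MERGE` statements shown earlier create unlabeled nodes. Assuming the loader also assigns a common label such as `:Asset` (a hypothetical name, not in the code above), the index statements might look like:

```cypher
CREATE INDEX asset_id IF NOT EXISTS FOR (n:Asset) ON (n.id);
CREATE INDEX asset_type IF NOT EXISTS FOR (n:Asset) ON (n.type);
```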
## Next steps

- **API reference**: explore all available endpoints and query methods
- **Custom connectors**: build custom connectors and contribute to EKG