Method Signature

def upstream(
    node_id: str,
    max_depth: int = 10,
    edge_types: List[str] = None
) -> List[Dict[str, Any]]

Description

The upstream() method traverses the graph to find every node that depends on the starting node, directly or transitively. This is the basis for impact analysis: if you change or break this node, what else is affected? The method performs a reverse traversal, following incoming edges outward from the starting node and stopping once max_depth hops have been reached.
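The reverse traversal described above can be pictured as a breadth-first search over incoming edges. The following is a minimal sketch under stated assumptions, not the library's implementation: the EDGES list, the upstream_sketch function, and the shape of the returned dictionaries are all hypothetical and exist only for illustration.

```python
from collections import deque
from typing import Any, Dict, List, Tuple

# Hypothetical edge list: (source, edge_type, target) means "source depends on target".
EDGES: List[Tuple[str, str, str]] = [
    ("service:auth-service", "DEPENDS_ON", "database:users-db"),
    ("service:api-gateway", "CALLS", "service:auth-service"),
    ("application:web-frontend", "CALLS", "service:api-gateway"),
]


def upstream_sketch(node_id: str, max_depth: int = 10) -> List[Dict[str, Any]]:
    """Breadth-first walk over incoming edges, stopping after max_depth hops."""
    # Index incoming edges: target -> list of sources that point at it.
    incoming: Dict[str, List[str]] = {}
    for source, _edge_type, target in EDGES:
        incoming.setdefault(target, []).append(source)

    visited = {node_id}  # guards against cycles
    queue = deque([(node_id, 0)])
    found: List[Dict[str, Any]] = []
    while queue:
        current, distance = queue.popleft()
        if distance == max_depth:
            continue  # do not expand beyond the depth limit
        for source in incoming.get(current, []):
            if source in visited:
                continue
            visited.add(source)
            node_type, _, name = source.partition(":")
            found.append({"name": name, "type": node_type, "distance": distance + 1})
            queue.append((source, distance + 1))
    return found
```

Because breadth-first search visits nodes level by level, each node is recorded with its shortest distance from the starting node, and the visited set keeps the walk finite even when the graph contains cycles.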

Parameters

node_id
str
required
The unique identifier of the starting node. Should be in the format type:name (e.g., database:users-db, library:auth-lib).
max_depth
int
default: 10
Maximum traversal depth to prevent infinite loops. Controls how far up the dependency chain to search.
edge_types
List[str]
default: None
Optional list of edge types to follow during traversal. If provided, only edges matching these types will be followed in reverse. Common edge types:
  • DEPENDS_ON
  • USES
  • IMPORTS
  • CALLS
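To make the parameter conventions concrete, here is a small sketch of how the type:name identifier format and the optional edge-type filter could be handled. Both helpers (split_node_id and edge_allowed) are hypothetical names introduced for illustration, and the sketch assumes that leaving edge_types as None means every edge type is followed.

```python
from typing import List, Optional, Tuple


def split_node_id(node_id: str) -> Tuple[str, str]:
    """Split 'type:name' into (type, name); raise if either part is missing."""
    node_type, sep, name = node_id.partition(":")
    if not sep or not node_type or not name:
        raise ValueError(f"expected 'type:name', got {node_id!r}")
    return node_type, name


def edge_allowed(edge_type: str, edge_types: Optional[List[str]] = None) -> bool:
    """Assumption: edge_types=None places no restriction on the edge type."""
    return edge_types is None or edge_type in edge_types
```

A traversal would call edge_allowed on each incoming edge before following it, so passing edge_types=["DEPENDS_ON"] skips USES, IMPORTS, and CALLS edges entirely.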

Returns

dependents
List[Dict[str, Any]]
A list of dependent nodes (consumers). The examples on this page read the name, type, and distance keys from each entry.
Results are ordered by distance (closest first), then alphabetically by node name.
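The ordering rule can be expressed as a sort on a (distance, name) tuple. The snippet below is only a sketch of that rule applied to hand-written sample data; it does not call the library, and the sample entries are illustrative.

```python
# Sample entries in arbitrary order, using the keys the examples on this page read.
results = [
    {"name": "web-frontend", "type": "application", "distance": 3},
    {"name": "payment-api", "type": "service", "distance": 1},
    {"name": "api-gateway", "type": "service", "distance": 2},
    {"name": "user-profile-api", "type": "service", "distance": 1},
    {"name": "auth-service", "type": "service", "distance": 1},
]

# Closest first, then alphabetical by name within the same distance.
ordered = sorted(results, key=lambda node: (node["distance"], node["name"]))

for node in ordered:
    print(f"{node['name']} ({node['type']}) - distance: {node['distance']}")
```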

Examples

Basic Usage

from graph.query import QueryEngine

query_engine = QueryEngine(storage)

# Find all services that depend on a database
dependents = query_engine.upstream(
    node_id="database:users-db"
)

print(f"Services affected if users-db goes down: {len(dependents)}")

for dep in dependents:
    print(f"{dep['name']} ({dep['type']}) - distance: {dep['distance']}")

Output Example

Services affected if users-db goes down: 5

auth-service (service) - distance: 1
payment-api (service) - distance: 1
user-profile-api (service) - distance: 1
api-gateway (service) - distance: 2
web-frontend (application) - distance: 3

Change Impact Analysis

# Before making a breaking change to a library
library_id = "library:auth-lib"
consumers = query_engine.upstream(node_id=library_id)

if len(consumers) > 0:
    print(f"WARNING: {len(consumers)} components depend on {library_id}")
    print("\nAffected components:")
    for consumer in consumers:
        print(f"  - {consumer['name']} (distance: {consumer['distance']})")
else:
    print("Safe to modify - no consumers found")

Filter by Edge Type

# Only find services connected to this database through DEPENDS_ON edges
# (not services that merely have a USES edge, e.g. for caching)
direct_dependents = query_engine.upstream(
    node_id="database:postgres-main",
    edge_types=["DEPENDS_ON"]
)

print(f"Critical dependents: {len(direct_dependents)}")

Immediate Dependents Only

# Get only direct consumers (1 hop away)
immediate = query_engine.upstream(
    node_id="service:auth-service",
    max_depth=1
)

print("Direct consumers:")
for node in immediate:
    print(f"  - {node['name']}")

Group by Node Type

affected = query_engine.upstream(node_id="database:analytics-db")

# Categorize by type
by_type = {}
for node in affected:
    node_type = node['type']
    if node_type not in by_type:
        by_type[node_type] = []
    by_type[node_type].append(node['name'])

print("Impact by type:")
for node_type, names in by_type.items():
    print(f"  {node_type}: {len(names)} affected")
    for name in names[:3]:  # Show first 3
        print(f"    - {name}")

Use Cases

  • Change impact analysis: Understand what breaks if you modify this component
  • Deprecation planning: Find all consumers before deprecating an API or service
  • Database maintenance: Identify services affected by database downtime
  • Library upgrades: Find all applications that need updating
  • Incident response: Quickly identify affected services during outages
  • API versioning: Track API consumers for version migrations

Comparison with downstream()

Method        | Direction               | Question Answered
downstream()  | Follows outgoing edges  | "What does this depend on?"
upstream()    | Follows incoming edges  | "What depends on this?"
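The difference between the two directions is easiest to see on a tiny graph. This is an illustrative sketch only: the DEPENDS_ON mapping and the direct_downstream and direct_upstream helpers are hypothetical, cover a single hop, and are not part of the QueryEngine API.

```python
from typing import Dict, List

# Hypothetical graph: each key depends on the nodes in its list (outgoing edges).
DEPENDS_ON: Dict[str, List[str]] = {
    "service:auth-service": ["database:users-db", "library:auth-lib"],
    "service:payment-api": ["database:users-db"],
}


def direct_downstream(node_id: str) -> List[str]:
    """Follow outgoing edges: what does this node depend on?"""
    return sorted(DEPENDS_ON.get(node_id, []))


def direct_upstream(node_id: str) -> List[str]:
    """Follow incoming edges: which nodes depend on this one?"""
    return sorted(src for src, targets in DEPENDS_ON.items() if node_id in targets)
```

Here direct_downstream("service:auth-service") lists the database and library that the service needs, while direct_upstream("database:users-db") lists the two services that would be affected if the database changed.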
