The upstream() method traverses the graph to find all nodes that depend on the starting node. This is critical for understanding impact: if you change or break this node, what else will be affected? The method performs a reverse traversal, following incoming edges to the starting node and respecting the maximum depth constraint.
# Basic usage: list everything that depends on a single node.
from graph.query import QueryEngine

# NOTE: `storage` is not defined in this snippet; it must already exist
# before the engine is constructed.
query_engine = QueryEngine(storage)

# Find all services that depend on a database
dependents = query_engine.upstream(
    node_id="database:users-db",
)

print(f"Services affected if users-db goes down: {len(dependents)}")
# Each result is indexed by 'name', 'type' and 'distance'.
for dep in dependents:
    print(f"{dep['name']} ({dep['type']}) - distance: {dep['distance']}")
# Impact check: run before making a breaking change to a library.
# Assumes `query_engine` is an already-constructed QueryEngine.
library_id = "library:auth-lib"
consumers = query_engine.upstream(node_id=library_id)

# An empty result means nothing depends on the library.
if consumers:
    print(f"WARNING: {len(consumers)} components depend on {library_id}")
    print("\nAffected components:")
    for consumer in consumers:
        print(f" - {consumer['name']} (distance: {consumer['distance']})")
else:
    print("Safe to modify - no consumers found")
# Only find services that directly DEPEND_ON this database
# (not just USE it for caching).
# Assumes `query_engine` is an already-constructed QueryEngine.
direct_dependents = query_engine.upstream(
    node_id="database:postgres-main",
    edge_types=["DEPENDS_ON"],
)

print(f"Critical dependents: {len(direct_dependents)}")
# Get only direct consumers (1 hop away) by capping the traversal depth.
# Assumes `query_engine` is an already-constructed QueryEngine.
immediate = query_engine.upstream(
    node_id="service:auth-service",
    max_depth=1,
)

print("Direct consumers:")
for node in immediate:
    print(f" - {node['name']}")
# Summarize the impact of a node by grouping its dependents by type.
# Assumes `query_engine` is an already-constructed QueryEngine.
affected = query_engine.upstream(node_id="database:analytics-db")

# Categorize by type: map each node type to the names of affected nodes.
by_type = {}
for node in affected:
    # setdefault creates the list the first time a type is seen.
    by_type.setdefault(node['type'], []).append(node['name'])

print("Impact by type:")
for node_type, names in by_type.items():
    print(f" {node_type}: {len(names)} affected")
    for name in names[:3]:  # Show first 3
        print(f" - {name}")