Overview
Scalable trace upload is critical for production agent observability. This guide covers patterns for uploading traces efficiently, handling parent-child relationships, and managing large volumes.

Complete Upload Implementation
Here’s a production-ready implementation (upload_traces.py) for uploading traces to LangSmith:
"""Load traces.json, shift timestamps to now, regenerate IDs, and upload via RunTree."""
import json
from collections import defaultdict
from datetime import datetime, timezone
from dotenv import load_dotenv
load_dotenv()
from langsmith import Client, uuid7
from langsmith.run_trees import RunTree
def parse_dt(s: str | None) -> datetime | None:
"""Parse ISO format datetime string."""
if s is None:
return None
dt = datetime.fromisoformat(s)
if dt.tzinfo is not None:
dt = dt.replace(tzinfo=None)
return dt
def main():
    """Load runs from JSON, remap IDs, shift timestamps, and upload to LangSmith.

    Each trace is rebuilt as a RunTree hierarchy (root first, then children)
    and posted to the target project; all pending writes are flushed at exit.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--project", default="default", help="Target project name")
    parser.add_argument("--input", default="synthetic_traces.json", help="Input file path")
    args = parser.parse_args()

    with open(args.input) as f:
        runs = json.load(f)
    print(f"Loaded {len(runs)} runs from {args.input}")

    if not runs:
        # Guard: max() below raises ValueError on an empty sequence.
        print("No runs to upload.")
        return

    # Calculate time shift so traces appear recent
    latest = max(parse_dt(r["start_time"]) for r in runs if r["start_time"])
    time_delta = datetime.now(timezone.utc).replace(tzinfo=None) - latest
    print(f"Shifting timestamps by: {time_delta}")

    # Build old-ID -> new-ID map (uuid7 keeps IDs time-ordered); fresh IDs
    # mean the same file can be re-uploaded without collisions.
    id_map = {}
    for run in runs:
        for field in ("id", "trace_id", "parent_run_id"):
            old_id = run.get(field)
            if old_id and old_id not in id_map:
                id_map[old_id] = str(uuid7())

    # Group runs by their (old) trace_id and transform in a single pass
    traces = defaultdict(list)
    for run in runs:
        traces[run["trace_id"]].append({
            "id": id_map[run["id"]],
            # .get on both sides: tolerate a missing key as well as a null parent
            "parent_run_id": id_map.get(run.get("parent_run_id")),
            "name": run["name"],
            "run_type": run["run_type"],
            "inputs": run["inputs"],
            "outputs": run.get("outputs"),
            "error": run.get("error"),
            "extra": run.get("extra"),
            "tags": run.get("tags"),
            "start_time": parse_dt(run["start_time"]) + time_delta,
            "end_time": parse_dt(run["end_time"]) + time_delta if run.get("end_time") else None,
        })

    client = Client()
    print(f"Uploading {len(traces)} traces to project '{args.project}'...")
    for i, trace_runs in enumerate(traces.values()):
        # Sort root first (no parent), then by start_time, so every parent
        # is in tree_map before its children are attached.
        trace_runs.sort(key=lambda r: (r["parent_run_id"] is not None, r["start_time"]))
        tree_map = {}
        root_tree = None
        for run in trace_runs:
            if run["parent_run_id"] is None:
                # Root run
                root_tree = RunTree(
                    id=run["id"],
                    name=run["name"],
                    run_type=run["run_type"],
                    inputs=run["inputs"],
                    start_time=run["start_time"],
                    extra=run.get("extra"),
                    tags=run.get("tags"),
                    project_name=args.project,
                    client=client,
                )
                tree_map[run["id"]] = root_tree
            else:
                # Child run; orphans (parent never seen) are skipped silently
                parent = tree_map.get(run["parent_run_id"])
                if parent:
                    child = parent.create_child(
                        name=run["name"],
                        run_type=run["run_type"],
                        run_id=run["id"],
                        inputs=run["inputs"],
                        start_time=run["start_time"],
                        extra=run.get("extra"),
                        tags=run.get("tags"),
                    )
                    tree_map[run["id"]] = child

        # End all runs (children first)
        for run in reversed(trace_runs):
            tree = tree_map.get(run["id"])
            if tree:
                tree.end(outputs=run.get("outputs"), error=run.get("error"), end_time=run["end_time"])

        # Posting the root with exclude_child_runs=False uploads the whole tree
        if root_tree:
            root_tree.post(exclude_child_runs=False)
        if (i + 1) % 10 == 0:
            print(f" Uploaded {i + 1}/{len(traces)} traces")

    # Wait for all background operations to complete
    print("Flushing...")
    client.flush()
    print("Done!")


if __name__ == "__main__":
    main()
Key Implementation Patterns
1. Timestamp Shifting
When uploading historical or synthetic traces, shift timestamps to make them appear recent:

# Find the latest timestamp in your dataset
latest = max(parse_dt(r["start_time"]) for r in runs if r["start_time"])
# Calculate offset to current time
time_delta = datetime.now(timezone.utc).replace(tzinfo=None) - latest
# Apply to all timestamps
start_time = parse_dt(run["start_time"]) + time_delta
end_time = parse_dt(run["end_time"]) + time_delta if run.get("end_time") else None
Ensure your datetime objects are timezone-aware or consistently naive. Mixing the two causes errors.
2. ID Mapping with UUID7
Preserve time-ordering while generating fresh IDs:

from langsmith import uuid7
# Build old-ID -> new-ID map
id_map = {}
for run in runs:
for field in ("id", "trace_id", "parent_run_id"):
old_id = run.get(field)
if old_id and old_id not in id_map:
id_map[old_id] = str(uuid7()) # Time-ordered UUIDs
# Remap all IDs
for run in runs:
run["id"] = id_map[run["id"]]
run["trace_id"] = id_map[run["trace_id"]]
if run.get("parent_run_id"):
run["parent_run_id"] = id_map[run["parent_run_id"]]
3. Trace Grouping and Sorting
Group runs by trace and sort to ensure parent runs are processed before children:

from collections import defaultdict
# Group by trace_id
traces = defaultdict(list)
for run in runs:
traces[run["trace_id"]].append(run)
# Sort each trace: root first, then by start_time
for trace_runs in traces.values():
trace_runs.sort(key=lambda r: (
r["parent_run_id"] is not None, # False (root) comes before True
r["start_time"]
))
4. Building the Run Tree
Construct the hierarchical trace structure:

tree_map = {}
root_tree = None
for run in trace_runs:
if run["parent_run_id"] is None:
# Create root run
root_tree = RunTree(
id=run["id"],
name=run["name"],
run_type=run["run_type"],
inputs=run["inputs"],
start_time=run["start_time"],
extra=run.get("extra"),
tags=run.get("tags"),
project_name="your-project",
client=client,
)
tree_map[run["id"]] = root_tree
else:
# Create child run
parent = tree_map.get(run["parent_run_id"])
if parent:
child = parent.create_child(
name=run["name"],
run_type=run["run_type"],
run_id=run["id"],
inputs=run["inputs"],
start_time=run["start_time"],
extra=run.get("extra"),
tags=run.get("tags"),
)
tree_map[run["id"]] = child
5. Ending Runs in Reverse Order
End child runs before parent runs:

# Process in reverse to end children first
for run in reversed(trace_runs):
tree = tree_map.get(run["id"])
if tree:
tree.end(
outputs=run.get("outputs"),
error=run.get("error"),
end_time=run["end_time"]
)
6. Uploading and Flushing
Post the complete trace tree and flush at the end:

if root_tree:
root_tree.post(exclude_child_runs=False) # Include all children
# After all traces uploaded
client.flush() # Critical: ensures all data is sent
Always call client.flush() before your script exits. Otherwise, traces may be lost because background operations have not completed.

Usage Example
# Upload synthetic traces to a specific project
python upload_traces.py --input synthetic_traces.json --project prod-agent-v2
# Output:
# Loaded 2000 runs from synthetic_traces.json
# Shifting timestamps by: 2 days, 3:24:15.123456
# Uploading 1000 traces to project 'prod-agent-v2'...
# Uploaded 10/1000 traces
# Uploaded 20/1000 traces
# ...
# Flushing...
# Done!
Production Considerations
Rate Limiting
Implement exponential backoff for API rate limits:

import time
from requests.exceptions import HTTPError
def upload_with_retry(root_tree, max_retries=3):
    """Post *root_tree* (including children), retrying HTTP 429 responses.

    Waits 2**attempt seconds between attempts (exponential backoff).
    Any non-429 HTTPError is re-raised immediately; exhausting all
    retries raises a plain Exception.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            root_tree.post(exclude_child_runs=False)
            return
        except HTTPError as err:
            # Anything other than a rate limit is not retryable here.
            if err.response.status_code != 429:
                raise
            wait_time = 2 ** attempt
            print(f"Rate limited. Retrying in {wait_time}s...")
            time.sleep(wait_time)
        attempt += 1
    raise Exception(f"Failed to upload after {max_retries} attempts")
Batch Processing
For large datasets, process in batches:

BATCH_SIZE = 100
for i in range(0, len(traces), BATCH_SIZE):
batch = list(traces.values())[i:i + BATCH_SIZE]
upload_batch(batch, client)
client.flush() # Flush after each batch
time.sleep(1) # Rate limiting
Error Recovery
Log failed traces for retry:

import logging
failed_traces = []
for trace_runs in traces.values():
try:
upload_trace(trace_runs, client)
except Exception as e:
logging.error(f"Failed to upload trace: {e}")
failed_traces.append(trace_runs)
# Write failed traces to disk for manual inspection
if failed_traces:
with open("failed_traces.json", "w") as f:
json.dump(failed_traces, f)
Performance Optimization
Parallel Upload
Use thread pools for concurrent uploads:

from concurrent.futures import ThreadPoolExecutor, as_completed
def upload_trace(trace_runs, project_name):
client = Client() # Thread-local client
# ... upload logic ...
client.flush()
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(upload_trace, trace_runs, args.project)
for trace_runs in traces.values()
]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
logging.error(f"Upload failed: {e}")
Be cautious with parallelization. Too many concurrent uploads can trigger rate limits or exhaust connection pools.
Next Steps
Online Evaluation
Set up continuous evaluation on uploaded traces
Production Overview
Learn more about production deployment strategies