Skip to main content

Overview

Scalable trace upload is critical for production agent observability. This guide covers patterns for uploading traces efficiently, handling parent-child relationships, and managing large volumes.

Complete Upload Implementation

Here’s a production-ready implementation for uploading traces to LangSmith:
upload_traces.py
"""Load traces.json, shift timestamps to now, regenerate IDs, and upload via RunTree."""

import json
from collections import defaultdict
from datetime import datetime, timezone

from dotenv import load_dotenv
load_dotenv()

from langsmith import Client, uuid7
from langsmith.run_trees import RunTree


def parse_dt(s: str | None) -> datetime | None:
    """Parse ISO format datetime string."""
    if s is None:
        return None
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is not None:
        dt = dt.replace(tzinfo=None)
    return dt


def main():
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--project", default="default", help="Target project name")
    parser.add_argument("--input", default="synthetic_traces.json", help="Input file path")
    args = parser.parse_args()

    with open(args.input) as f:
        runs = json.load(f)

    print(f"Loaded {len(runs)} runs from {args.input}")

    # Calculate time shift so traces appear recent
    latest = max(parse_dt(r["start_time"]) for r in runs if r["start_time"])
    time_delta = datetime.now(timezone.utc).replace(tzinfo=None) - latest
    print(f"Shifting timestamps by: {time_delta}")

    # Build ID map (uuid7 for time-ordering)
    id_map = {}
    for run in runs:
        for field in ("id", "trace_id", "parent_run_id"):
            old_id = run.get(field)
            if old_id and old_id not in id_map:
                id_map[old_id] = str(uuid7())

    # Group runs by trace and transform
    traces = defaultdict(list)
    for run in runs:
        traces[run["trace_id"]].append({
            "id": id_map[run["id"]],
            "parent_run_id": id_map.get(run["parent_run_id"]),
            "name": run["name"],
            "run_type": run["run_type"],
            "inputs": run["inputs"],
            "outputs": run.get("outputs"),
            "error": run.get("error"),
            "extra": run.get("extra"),
            "tags": run.get("tags"),
            "start_time": parse_dt(run["start_time"]) + time_delta,
            "end_time": parse_dt(run["end_time"]) + time_delta if run.get("end_time") else None,
        })

    client = Client()
    print(f"Uploading {len(traces)} traces to project '{args.project}'...")

    for i, trace_runs in enumerate(traces.values()):
        # Sort by start_time, root first (no parent)
        trace_runs.sort(key=lambda r: (r["parent_run_id"] is not None, r["start_time"]))

        tree_map = {}
        root_tree = None

        for run in trace_runs:
            if run["parent_run_id"] is None:
                # Root run
                root_tree = RunTree(
                    id=run["id"],
                    name=run["name"],
                    run_type=run["run_type"],
                    inputs=run["inputs"],
                    start_time=run["start_time"],
                    extra=run.get("extra"),
                    tags=run.get("tags"),
                    project_name=args.project,
                    client=client,
                )
                tree_map[run["id"]] = root_tree
            else:
                # Child run
                parent = tree_map.get(run["parent_run_id"])
                if parent:
                    child = parent.create_child(
                        name=run["name"],
                        run_type=run["run_type"],
                        run_id=run["id"],
                        inputs=run["inputs"],
                        start_time=run["start_time"],
                        extra=run.get("extra"),
                        tags=run.get("tags"),
                    )
                    tree_map[run["id"]] = child

        # End all runs (children first)
        for run in reversed(trace_runs):
            tree = tree_map.get(run["id"])
            if tree:
                tree.end(outputs=run.get("outputs"), error=run.get("error"), end_time=run["end_time"])

        if root_tree:
            root_tree.post(exclude_child_runs=False)

        if (i + 1) % 10 == 0:
            print(f"  Uploaded {i + 1}/{len(traces)} traces")

    # Wait for all background operations to complete
    print("Flushing...")
    client.flush()
    print("Done!")


if __name__ == "__main__":
    main()

Key Implementation Patterns

1. Timestamp Shifting

When uploading historical or synthetic traces, shift timestamps to make them appear recent:
# Find the latest timestamp in your dataset
latest = max(parse_dt(r["start_time"]) for r in runs if r["start_time"])

# Calculate offset to current time
time_delta = datetime.now(timezone.utc).replace(tzinfo=None) - latest

# Apply to all timestamps
start_time = parse_dt(run["start_time"]) + time_delta
end_time = parse_dt(run["end_time"]) + time_delta if run.get("end_time") else None
Ensure your datetime objects are timezone-aware or consistently naive. Mixing the two causes errors.

2. ID Mapping with UUID7

Preserve time-ordering while generating fresh IDs:
from langsmith import uuid7

# Build bidirectional ID map
id_map = {}
for run in runs:
    for field in ("id", "trace_id", "parent_run_id"):
        old_id = run.get(field)
        if old_id and old_id not in id_map:
            id_map[old_id] = str(uuid7())  # Time-ordered UUIDs

# Remap all IDs
for run in runs:
    run["id"] = id_map[run["id"]]
    run["trace_id"] = id_map[run["trace_id"]]
    if run.get("parent_run_id"):
        run["parent_run_id"] = id_map[run["parent_run_id"]]
Why uuid7? Unlike uuid4, uuid7 preserves temporal ordering, making trace analysis and debugging easier.

3. Trace Grouping and Sorting

Group runs by trace and sort to ensure parent runs are processed before children:
from collections import defaultdict

# Group by trace_id
traces = defaultdict(list)
for run in runs:
    traces[run["trace_id"]].append(run)

# Sort each trace: root first, then by start_time
for trace_runs in traces.values():
    trace_runs.sort(key=lambda r: (
        r["parent_run_id"] is not None,  # False (root) comes before True
        r["start_time"]
    ))

4. Building the Run Tree

Construct the hierarchical trace structure:
tree_map = {}
root_tree = None

for run in trace_runs:
    if run["parent_run_id"] is None:
        # Create root run
        root_tree = RunTree(
            id=run["id"],
            name=run["name"],
            run_type=run["run_type"],
            inputs=run["inputs"],
            start_time=run["start_time"],
            extra=run.get("extra"),
            tags=run.get("tags"),
            project_name="your-project",
            client=client,
        )
        tree_map[run["id"]] = root_tree
    else:
        # Create child run
        parent = tree_map.get(run["parent_run_id"])
        if parent:
            child = parent.create_child(
                name=run["name"],
                run_type=run["run_type"],
                run_id=run["id"],
                inputs=run["inputs"],
                start_time=run["start_time"],
                extra=run.get("extra"),
                tags=run.get("tags"),
            )
            tree_map[run["id"]] = child

5. Ending Runs in Reverse Order

End child runs before parent runs:
# Process in reverse to end children first
for run in reversed(trace_runs):
    tree = tree_map.get(run["id"])
    if tree:
        tree.end(
            outputs=run.get("outputs"),
            error=run.get("error"),
            end_time=run["end_time"]
        )

6. Uploading and Flushing

Post the complete trace tree and flush at the end:
if root_tree:
    root_tree.post(exclude_child_runs=False)  # Include all children

# After all traces uploaded
client.flush()  # Critical: ensures all data is sent
Always call client.flush() before your script exits. Otherwise, traces may be lost due to background operations not completing.

Usage Example

# Upload synthetic traces to a specific project
python upload_traces.py --input synthetic_traces.json --project prod-agent-v2

# Output:
# Loaded 2000 runs from synthetic_traces.json
# Shifting timestamps by: 2 days, 3:24:15.123456
# Uploading 1000 traces to project 'prod-agent-v2'...
#   Uploaded 10/1000 traces
#   Uploaded 20/1000 traces
#   ...
# Flushing...
# Done!

Production Considerations

Rate Limiting

Implement exponential backoff for API rate limits:
import time
from requests.exceptions import HTTPError

def upload_with_retry(root_tree, max_retries=3):
    for attempt in range(max_retries):
        try:
            root_tree.post(exclude_child_runs=False)
            return
        except HTTPError as e:
            if e.response.status_code == 429:  # Rate limit
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
    raise Exception(f"Failed to upload after {max_retries} attempts")

Batch Processing

For large datasets, process in batches:
BATCH_SIZE = 100

for i in range(0, len(traces), BATCH_SIZE):
    batch = list(traces.values())[i:i + BATCH_SIZE]
    upload_batch(batch, client)
    client.flush()  # Flush after each batch
    time.sleep(1)  # Rate limiting

Error Recovery

Log failed traces for retry:
import logging

failed_traces = []

for trace_runs in traces.values():
    try:
        upload_trace(trace_runs, client)
    except Exception as e:
        logging.error(f"Failed to upload trace: {e}")
        failed_traces.append(trace_runs)

# Write failed traces to disk for manual inspection
if failed_traces:
    with open("failed_traces.json", "w") as f:
        json.dump(failed_traces, f)

Performance Optimization

Parallel Upload

Use thread pools for concurrent uploads:
from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_trace(trace_runs, project_name):
    client = Client()  # Thread-local client
    # ... upload logic ...
    client.flush()

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(upload_trace, trace_runs, args.project)
        for trace_runs in traces.values()
    ]
    
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            logging.error(f"Upload failed: {e}")
Be cautious with parallelization. Too many concurrent uploads can trigger rate limits or exhaust connection pools.

Next Steps

Online Evaluation

Set up continuous evaluation on uploaded traces

Production Overview

Learn more about production deployment strategies

Build docs developers (and LLMs) love