The langsmith.utils module provides utility functions and classes for working with the LangSmith SDK.
UUID generation
uuid7
Generate a UUID version 7 (time-ordered).
from langsmith import uuid7
# Generate a new UUID7
trace_id = uuid7()
print(trace_id) # UUID('01234567-89ab-cdef-0123-456789abcdef')
UUID7 is time-ordered, making it suitable for distributed systems and database indexing. LangSmith uses it internally for trace and run IDs.
uuid7_from_datetime
Generate a UUID7 from a specific datetime.
from langsmith import uuid7_from_datetime
from datetime import datetime, timezone
# Generate UUID7 for a specific timestamp
dt = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
trace_id = uuid7_from_datetime(dt)
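Useful for backfilling traces or for tests that need IDs whose timestamp portion is known in advance (only the time-derived prefix of a UUID7 is determined by the datetime).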
ContextThreadPoolExecutor
Thread pool executor that preserves context variables across threads.
from langsmith.utils import ContextThreadPoolExecutor
from langsmith import tracing_context, traceable
@traceable
def process_item(item):
# This function is traced even when called from thread pool
return item * 2
with tracing_context(project_name="parallel-processing"):
with ContextThreadPoolExecutor(max_workers=4) as executor:
items = [1, 2, 3, 4, 5]
results = list(executor.map(process_item, items))
# Each thread inherits the tracing context
Unlike concurrent.futures.ThreadPoolExecutor, this preserves contextvars (including LangSmith tracing context) when executing functions in worker threads.
Constructor
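max_workers: Maximum number of worker threads. If omitted, the default of concurrent.futures.ThreadPoolExecutor applies: min(32, (os.cpu_count() or 1) + 4) on Python 3.8 and later; older Python versions used (cpu_count() or 1) * 5.
thread_name_prefix: Prefix for worker thread names. Default is "".
initializer: Function to run in each worker thread on startup.
initargs: Arguments for the initializer function.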
Methods
Inherits all methods from concurrent.futures.ThreadPoolExecutor:
executor = ContextThreadPoolExecutor(max_workers=4)
# Submit a single task
future = executor.submit(my_function, arg1, arg2)
result = future.result()
# Map over iterables
results = executor.map(my_function, items)
# Shutdown
executor.shutdown(wait=True)
Environment variable utilities
get_env_var
Safely get environment variables with defaults.
from langsmith.utils import get_env_var
api_key = get_env_var("LANGSMITH_API_KEY", default=None)
project = get_env_var("LANGCHAIN_PROJECT", default="default")
Internal utility used by the SDK for configuration.
tracing_is_enabled
Check if tracing is currently enabled based on context and environment.
from langsmith.utils import tracing_is_enabled
if tracing_is_enabled():
print("Tracing is active")
else:
print("Tracing is disabled")
Returns one of:
- True: Tracing is enabled and uploading to LangSmith
- False: Tracing is disabled
- "local": Tracing is enabled but only storing locally
Checks in order:
- Current context value from tracing_context()
- Active run tree
- Global configuration from configure()
- Environment variables (LANGCHAIN_TRACING_V2, LANGCHAIN_TRACING)
Error classes
Custom exception types for LangSmith operations.
LangSmithError
Base exception for all LangSmith errors.
from langsmith.utils import LangSmithError
try:
client.create_run(...)
except LangSmithError as e:
print(f"LangSmith error: {e}")
LangSmithAPIError
Raised when the LangSmith API returns a 500 error.
from langsmith.utils import LangSmithAPIError
try:
client.read_dataset("non-existent")
except LangSmithAPIError as e:
print(f"API error: {e}")
LangSmithAuthError
Raised on authentication failures (401 errors).
from langsmith.utils import LangSmithAuthError
try:
client = Client(api_key="invalid-key")
client.list_datasets()
except LangSmithAuthError:
print("Invalid API key")
LangSmithNotFoundError
Raised when a resource is not found (404 errors).
from langsmith.utils import LangSmithNotFoundError
try:
dataset = client.read_dataset(dataset_name="does-not-exist")
except LangSmithNotFoundError:
print("Dataset not found")
LangSmithRateLimitError
Raised when rate limits are exceeded (429 errors).
from langsmith.utils import LangSmithRateLimitError
import time
while True:
try:
client.create_run(...)
break
except LangSmithRateLimitError:
print("Rate limited, retrying...")
time.sleep(1)
LangSmithConflictError
Raised when a resource already exists (409 errors).
from langsmith.utils import LangSmithConflictError
try:
client.create_dataset(dataset_name="existing-dataset")
except LangSmithConflictError:
print("Dataset already exists")
LangSmithConnectionError
Raised on network connection failures.
from langsmith.utils import LangSmithConnectionError
try:
client.list_datasets()
except LangSmithConnectionError:
print("Could not connect to LangSmith API")
Prompt identifier utilities
get_invalid_prompt_identifier_msg
Get a formatted error message for invalid prompt identifiers.
from langsmith.utils import get_invalid_prompt_identifier_msg
try:
parse_identifier("invalid@format")
except ValueError:
print(get_invalid_prompt_identifier_msg("invalid@format"))
# Invalid prompt identifier format: "invalid@format".
# Expected one of:
# - "prompt-name" (for private prompts)
# - "owner/prompt-name" (for prompts with explicit owner)
# - "prompt-name:commit-hash" (with commit reference)
# - "owner/prompt-name:commit-hash" (with owner and commit)
Used internally for consistent error messages.
Warning classes
LangSmithWarning
Base warning class for LangSmith.
import warnings
from langsmith.utils import LangSmithWarning
warnings.filterwarnings("ignore", category=LangSmithWarning)
LangSmithMissingAPIKeyWarning
Warning emitted when the API key is missing.
from langsmith.utils import LangSmithMissingAPIKeyWarning
import warnings
warnings.filterwarnings("ignore", category=LangSmithMissingAPIKeyWarning)
Example: Error handling
from langsmith import Client
from langsmith.utils import (
LangSmithAuthError,
LangSmithNotFoundError,
LangSmithRateLimitError,
LangSmithConnectionError,
)
import time
client = Client()
def safe_read_dataset(name: str, retries: int = 3):
"""Read dataset with error handling and retries."""
for attempt in range(retries):
try:
return client.read_dataset(dataset_name=name)
except LangSmithAuthError:
print("Authentication failed. Check your API key.")
raise
except LangSmithNotFoundError:
print(f"Dataset '{name}' not found.")
return None
except LangSmithRateLimitError:
if attempt < retries - 1:
wait_time = 2 ** attempt
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
else:
raise
except LangSmithConnectionError:
if attempt < retries - 1:
print(f"Connection failed. Retrying...")
time.sleep(1)
else:
raise
dataset = safe_read_dataset("my-dataset")
Example: Context-preserving parallelization
from langsmith import traceable, tracing_context
from langsmith.utils import ContextThreadPoolExecutor
@traceable
def process_document(doc_id: str) -> dict:
# Complex processing
return {"doc_id": doc_id, "result": "processed"}
def batch_process(doc_ids: list[str]) -> list[dict]:
"""Process documents in parallel while preserving trace context."""
with tracing_context(
project_name="batch-processing",
tags=["parallel"],
metadata={"batch_size": len(doc_ids)}
):
with ContextThreadPoolExecutor(max_workers=4) as executor:
# Each worker thread inherits the tracing context
# All process_document calls are traced under the same parent
results = list(executor.map(process_document, doc_ids))
return results
# Process 100 documents in parallel, all traced properly
results = batch_process([f"doc_{i}" for i in range(100)])