Skip to main content
This example demonstrates how to build a RAG (Retrieval-Augmented Generation) pipeline with DSPy and instrument it with OpenInference tracing.

Prerequisites

  • Python 3.9+
  • OpenAI API key
  • Phoenix or another OpenTelemetry collector

Installation

1

Install dependencies

pip install dspy-ai \
  openinference-instrumentation-dspy \
  opentelemetry-sdk \
  opentelemetry-exporter-otlp \
  python-dotenv
2

Set environment variables

export OPENAI_API_KEY="your-api-key"
export COLLECTOR_ENDPOINT="http://localhost:6006/v1/traces"

Instrumentation Setup

Create an instrumentation module:
import os

from dotenv import load_dotenv
from openinference.instrumentation.dspy import DSPyInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

load_dotenv()

collector_endpoint = os.getenv("COLLECTOR_ENDPOINT", "http://localhost:6006/v1/traces")


def instrument():
    resource = Resource(attributes={})
    tracer_provider = trace_sdk.TracerProvider(resource=resource)
    span_exporter = OTLPSpanExporter(endpoint=collector_endpoint)
    span_processor = SimpleSpanProcessor(span_exporter=span_exporter)
    tracer_provider.add_span_processor(span_processor=span_processor)
    trace_api.set_tracer_provider(tracer_provider=tracer_provider)
    DSPyInstrumentor().instrument()

Basic DSPy RAG Module

import dspy
from instrument import instrument

# Initialize instrumentation
instrument()

# Configure DSPy with OpenAI
lm = dspy.OpenAI(model="gpt-3.5-turbo", max_tokens=300)
dspy.settings.configure(lm=lm)

# Define a simple RAG signature
class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""
    
    context = dspy.InputField(desc="may contain relevant facts")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="often between 1 and 5 words")

# Create a RAG module
class RAG(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
    
    def forward(self, question):
        context = self.retrieve(question).passages
        prediction = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=prediction.answer)

# Use the RAG module
rag = RAG()
response = rag(question="What is the capital of France?")
print(f"Answer: {response.answer}")
print(f"Context: {response.context}")

Complete FastAPI Example

Here’s a production-ready FastAPI application with DSPy:
import os
from dotenv import load_dotenv
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import dspy

from instrument import instrument

load_dotenv()

# Initialize instrumentation
do_not_instrument = os.getenv("INSTRUMENT_DSPY", "true") == "false"
if not do_not_instrument:
    instrument()

# Configure DSPy
lm = dspy.OpenAI(model="gpt-3.5-turbo", max_tokens=300)
rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")
dspy.settings.configure(lm=lm, rm=rm)

app = FastAPI(title="DSPy x FastAPI")

environment = os.getenv("ENVIRONMENT", "dev")

if environment == "dev":
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

# Define request/response models
class MessageData(BaseModel):
    question: str
    num_passages: int = 3

class RAGResponse(BaseModel):
    answer: str
    context: list[str]

# RAG signature and module
class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""
    context = dspy.InputField(desc="may contain relevant facts")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="often between 1 and 5 words")

class RAG(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
    
    def forward(self, question):
        context = self.retrieve(question).passages
        prediction = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=prediction.answer)

# Initialize module
rag_module = None

@app.on_event("startup")
async def startup_event():
    global rag_module
    rag_module = RAG()

@app.post("/api/rag/query", response_model=RAGResponse)
async def query(payload: MessageData):
    response = rag_module(question=payload.question)
    return RAGResponse(answer=response.answer, context=response.context)

@app.get("/api/rag/healthcheck")
async def healthcheck():
    return {"message": "All systems go."}

if __name__ == "__main__":
    uvicorn.run(app="main:app", host="0.0.0.0", port=8000, reload=True)

Optimizing with DSPy Compiler

DSPy’s key feature is automatic optimization:
import dspy
from dspy.teleprompt import BootstrapFewShot

# Define training examples
trainset = [
    dspy.Example(question="What is the capital of France?", answer="Paris").with_inputs("question"),
    dspy.Example(question="Who wrote Romeo and Juliet?", answer="Shakespeare").with_inputs("question"),
    dspy.Example(question="What is the largest planet?", answer="Jupiter").with_inputs("question"),
]

# Define validation metric
def validate_answer(example, pred, trace=None):
    answer_match = example.answer.lower() in pred.answer.lower()
    return answer_match

# Compile the RAG module
compiler = BootstrapFewShot(metric=validate_answer, max_bootstrapped_demos=2)
compiled_rag = compiler.compile(RAG(), trainset=trainset)

# Use the optimized module
response = compiled_rag(question="What is the capital of Germany?")
print(response.answer)

Key Features

Automatic Module Tracing

DSPy instrumentation captures:
  • Module execution: All DSPy module forwards
  • LM calls: Language model predictions with prompts
  • Retrieval: Document retrieval operations
  • Optimization: Compiler operations and few-shot selection

Signature Tracking

The instrumentation records:
  • Input and output fields
  • Field descriptions and constraints
  • Type annotations

Compilation Observability

When using DSPy optimizers:
  • Bootstrap demonstration selection
  • Metric evaluations
  • Prompt evolution

Next Steps

Build docs developers (and LLMs) love