Practice

This module includes practical assignments to help you master monitoring and observability for ML systems in production.

Overview

You’ll complete three pull requests (PRs) integrating monitoring tools into your application, plus document your monitoring strategy.
These assignments build on previous modules. You should have a working ML application deployed in a pipeline (Airflow, Kubeflow, or Dagster) from Module 4.

Homework 13: Monitoring and Observability

Key Tasks

1. Integrate SigNoz: Instrument your application with SigNoz for distributed tracing and observability
2. Create Grafana Dashboard: Build a custom dashboard showing key metrics for your application
3. Add Drift Detection: Implement drift detection logic in your ML pipeline
4. Document Monitoring Plan: Write a comprehensive monitoring strategy in your Google Doc

PR1: SigNoz Integration

Objective: Add SigNoz monitoring to your application

Requirements:
  1. Install SigNoz on your Kubernetes cluster using Helm
  2. Instrument your application with OpenTelemetry or OpenLLMetry
  3. Configure tracing to send data to SigNoz
  4. Verify traces are appearing in the SigNoz UI
  5. Add custom spans to track important operations
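Before wiring in OpenTelemetry, it can help to see what a span actually is. The toy tracer below is plain Python, not the OpenTelemetry API: it only illustrates the three things a custom span gives you (a name, a duration, and attributes). All names here are made up for illustration.

```python
import time
from contextlib import contextmanager

spans = []  # stand-in for an exported trace

@contextmanager
def span(name, **attributes):
    """Record a named, timed operation with arbitrary attributes."""
    start = time.perf_counter()
    try:
        yield
    finally:
        spans.append({
            "name": name,
            "duration_s": time.perf_counter() - start,
            "attributes": attributes,
        })

with span("preprocess", rows=100):
    time.sleep(0.01)  # pretend work

print(spans[0]["name"])  # preprocess
```

In the assignment you would create the equivalent with OpenTelemetry's tracer (or Traceloop's decorators, as shown below) so the span is exported to SigNoz instead of a local list.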
Implementation Guide:

```python
# app.py
import os

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task

# Configure SigNoz endpoint
os.environ["TRACELOOP_BASE_URL"] = "http://localhost:4318"

# Initialize tracing
Traceloop.init(app_name="my-ml-app")

@workflow(name="prediction_pipeline")
def run_prediction(data):
    """Main prediction workflow."""
    preprocessed = preprocess_data(data)
    prediction = model_inference(preprocessed)
    result = postprocess_result(prediction)
    return result

@task(name="preprocess")
def preprocess_data(data):
    # Your preprocessing logic goes here
    return data

@task(name="inference")
def model_inference(data):
    # Replace with your model's prediction call, e.g. model.predict(data)
    return data

@task(name="postprocess")
def postprocess_result(prediction):
    # Format the result for the caller
    return {"prediction": prediction}
```
```yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-app
spec:
  template:
    spec:
      containers:
      - name: app
        image: my-ml-app:latest
        env:
        - name: TRACELOOP_BASE_URL
          value: "http://my-release-signoz-otel-collector.default:4318"
        - name: OTEL_SERVICE_NAME
          value: "ml-app-production"
```
```python
# main.py
import os

from fastapi import FastAPI
from pydantic import BaseModel
from traceloop.sdk import Traceloop

os.environ["TRACELOOP_BASE_URL"] = "http://localhost:4318"
Traceloop.init(app_name="fastapi-ml-service")

app = FastAPI()

class PredictionRequest(BaseModel):
    data: list[float]

@app.post("/predict")
async def predict(request: PredictionRequest):
    # FastAPI routes are traced automatically once Traceloop is initialized.
    # `model` is your loaded model object (loading omitted for brevity).
    result = model.predict(request.data)
    return {"prediction": result}
```
Acceptance Criteria:
  • SigNoz is installed and running in your cluster
  • Application sends traces to SigNoz
  • At least 3 custom spans are instrumented
  • Traces are visible in SigNoz UI
  • Screenshots included in PR description

PR2: Grafana Dashboard

Objective: Create a Grafana dashboard for your application

Requirements:
  1. Install Prometheus and Grafana using kube-prometheus-stack
  2. Expose metrics from your application (e.g., using prometheus_client)
  3. Create a ServiceMonitor to scrape your metrics
  4. Build a custom dashboard with at least 5 panels
  5. Export and commit dashboard JSON
Implementation Guide:

```python
# metrics.py
from prometheus_client import Counter, Histogram, start_http_server

# Define metrics
prediction_count = Counter(
    'ml_predictions_total',
    'Total number of predictions',
    ['model', 'status']
)

prediction_duration = Histogram(
    'ml_prediction_duration_seconds',
    'Time spent on prediction',
    ['model']
)

input_feature_values = Histogram(
    'ml_input_feature_value',
    'Distribution of input feature values',
    ['feature_name']
)

# Start metrics server on port 8000 (the /metrics endpoint)
start_http_server(8000)

# Use in your code
def predict(data):
    with prediction_duration.labels(model='v1').time():
        try:
            result = model.predict(data)
            prediction_count.labels(model='v1', status='success').inc()
            return result
        except Exception:
            prediction_count.labels(model='v1', status='error').inc()
            raise
```
```yaml
# servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: ml-app-metrics
  labels:
    release: monitoring  # Must match Prometheus selector
spec:
  selector:
    matchLabels:
      app: ml-app
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics
```
Your dashboard should include:
  1. Request Rate: `rate(ml_predictions_total[5m])`
  2. Error Rate: `sum(rate(ml_predictions_total{status="error"}[5m])) / sum(rate(ml_predictions_total[5m])) * 100`
  3. Latency Percentiles: `histogram_quantile(0.95, rate(ml_prediction_duration_seconds_bucket[5m]))`
  4. Model Distribution: Pie chart of predictions by model
  5. Resource Usage: CPU and memory from Kubernetes metrics
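To make the latency query less opaque: Prometheus histograms export cumulative per-bucket counts, and `histogram_quantile` linearly interpolates the requested percentile inside the bucket where it falls. A simplified Python sketch of that interpolation (bucket bounds and counts are invented for illustration):

```python
def histogram_quantile(q, buckets):
    """buckets: sorted list of (upper_bound, cumulative_count), like le="..." series."""
    total = buckets[-1][1]
    rank = q * total  # the observation we are looking for
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if count == prev_count:  # empty bucket; no interpolation possible
                return bound
            # Linear interpolation within the bucket, as Prometheus does
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# Cumulative counts: 60 requests took <= 0.1s, 90 took <= 0.5s, 100 took <= 1.0s
buckets = [(0.1, 60), (0.5, 90), (1.0, 100)]
print(histogram_quantile(0.95, buckets))  # 0.75
```

This is also why bucket boundaries matter: a p95 that lands in a wide bucket is only as precise as that bucket.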
Acceptance Criteria:
  • Prometheus and Grafana are installed
  • Application exposes metrics on /metrics endpoint
  • ServiceMonitor successfully scrapes metrics
  • Dashboard has at least 5 meaningful panels
  • Dashboard JSON is committed to repository
  • Screenshots of dashboard included in PR

PR3: Drift Detection

Objective: Add drift detection to your ML pipeline

Requirements:
  1. Choose a drift detection method:
    • Evidently for Python-based detection
    • Seldon Core with Alibi Detect for Kubernetes-native solution
  2. Implement drift checking in your pipeline (Airflow/Kubeflow/Dagster)
  3. Define reference data (baseline distribution)
  4. Configure alerts when drift is detected
  5. Log drift metrics to monitoring system
Implementation Guide:

```python
# airflow_dag.py
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from evidently.test_suite import TestSuite
from evidently.tests import TestColumnDrift

def check_drift(**context):
    """Check for data drift."""
    # Load reference data
    reference_df = pd.read_parquet("s3://bucket/reference_data.parquet")

    # Load recent production data
    current_df = pd.read_parquet("s3://bucket/production_data_latest.parquet")

    # Define tests
    tests = TestSuite(tests=[
        TestColumnDrift(column_name="age"),
        TestColumnDrift(column_name="income"),
        TestColumnDrift(column_name="education"),
    ])

    # Run tests
    tests.run(reference_data=reference_df, current_data=current_df)

    # Check results
    results = tests.as_dict()
    if not results["summary"]["all_passed"]:
        # Send alert (implement send_slack_alert for your workspace)
        send_slack_alert("Drift detected in production data!")

        # Save report
        tests.save_html(f"drift_report_{datetime.now().isoformat()}.html")

        raise AirflowException("Drift detected - manual review required")

with DAG(
    "ml_pipeline_with_drift",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    drift_check = PythonOperator(
        task_id="check_drift",
        python_callable=check_drift,
    )

    retrain = PythonOperator(
        task_id="retrain_model",
        python_callable=retrain_model,  # your retraining function
    )

    drift_check >> retrain
```
```yaml
# pipeline.yaml
apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: ml-pipeline-with-drift
spec:
  steps:
    - name: preprocessor
    - name: model
      inputs:
      - preprocessor
    - name: drift-detector
      inputs:
      - preprocessor
      batch:
        size: 100  # Check drift every 100 requests
  output:
    steps:
    - model
    - drift-detector.outputs.drift_score
---
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: drift-detector
spec:
  storageUri: "gs://my-bucket/drift-detector"
  requirements:
    - mlserver
    - alibi-detect
```
```python
# drift_component.py
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Metrics

@component(
    base_image="python:3.10",
    packages_to_install=["evidently", "pandas"]
)
def check_drift(
    reference_data: Input[Dataset],
    current_data: Input[Dataset],
    metrics: Output[Metrics],
) -> float:
    """Check for drift between datasets and return the drift score."""
    import pandas as pd
    from evidently.report import Report
    from evidently.metric_preset import DataDriftPreset

    ref_df = pd.read_parquet(reference_data.path)
    curr_df = pd.read_parquet(current_data.path)

    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=ref_df, current_data=curr_df)

    result = report.as_dict()
    # Share of drifted columns reported by the drift preset
    drift_score = result["metrics"][0]["result"]["share_of_drifted_columns"]

    metrics.log_metric("drift_score", drift_score)
    return drift_score

# Use in pipeline
# (load_reference_data, load_production_data, retrain_model are components you define)
@dsl.pipeline(name="ml-pipeline-with-drift-check")
def ml_pipeline():
    drift_task = check_drift(
        reference_data=load_reference_data().output,
        current_data=load_production_data().output,
    )

    # Retrain only when drift exceeds the threshold
    with dsl.Condition(drift_task.output > 0.1):
        retrain_model()
```
Acceptance Criteria:
  • Drift detection is integrated into pipeline
  • Reference data is defined and stored
  • Drift checks run automatically on schedule or trigger
  • Alerts are sent when drift exceeds threshold
  • Drift metrics are logged/visualized
  • Documentation explains how to update reference data

Monitoring Plan Document

Objective: Design and document your monitoring strategy

Requirements: Update your Google Doc with sections covering:
Document your approach to infrastructure monitoring:
  • Metrics tracked: Latency, throughput, error rates, resource usage
  • Dashboards: Link to Grafana dashboards
  • Alerts: List alert conditions and thresholds
  • On-call procedures: How to respond to incidents
Document your approach to model monitoring:
  • Input monitoring: Feature distributions, data quality checks
  • Output monitoring: Prediction distributions, confidence scores
  • Performance monitoring: Metrics when ground truth is available
  • Drift detection: Methods used, thresholds, remediation actions
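As a tool-agnostic illustration of univariate drift scoring, the sketch below computes the two-sample Kolmogorov-Smirnov statistic (the maximum distance between the two empirical CDFs), which is the family of per-column tests libraries like Evidently apply to numeric features. The data and threshold are illustrative, not recommendations.

```python
import bisect

def ks_statistic(reference, current):
    """Two-sample KS statistic: max |ECDF_ref(x) - ECDF_cur(x)| over observed values."""
    ref = sorted(reference)
    cur = sorted(current)

    def ecdf(sample, x):
        # Fraction of sample values <= x
        return bisect.bisect_right(sample, x) / len(sample)

    points = sorted(set(ref) | set(cur))
    return max(abs(ecdf(ref, v) - ecdf(cur, v)) for v in points)

reference = list(range(1, 11))
shifted = [x + 5 for x in reference]  # simulated drifted production data
print(ks_statistic(reference, reference))  # 0.0 -- identical distributions
print(ks_statistic(reference, shifted))    # 0.5 -- clear shift
```

Whatever method you choose, the documentation should state the per-feature test, the threshold, and what happens when it fires.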
Explain how you collect labels for validation:
  • Collection method: User feedback, manual labeling, delayed outcomes, etc.
  • Frequency: How often labels are collected
  • Coverage: Percentage of predictions that get labeled
  • Labeling process: Tools and workflows used
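One common sampling policy you could document: always send low-confidence predictions for labeling and randomly sample the rest. A minimal sketch, assuming a hypothetical `confidence` field on each prediction; the 5% base rate and 0.6 cutoff are placeholder numbers.

```python
import random

def select_for_labeling(predictions, base_rate=0.05, low_conf=0.6, seed=0):
    """Pick predictions to label: all low-confidence ones, plus a random sample."""
    rng = random.Random(seed)
    selected = []
    for p in predictions:
        # Low-confidence predictions are always labeled; the rest at base_rate
        if p["confidence"] < low_conf or rng.random() < base_rate:
            selected.append(p)
    return selected

preds = [{"id": i, "confidence": 0.5 if i % 10 == 0 else 0.95} for i in range(100)]
batch = select_for_labeling(preds)
print(len([p for p in batch if p["confidence"] < 0.6]))  # 10
```

The coverage figure in your document is then roughly `base_rate` plus the share of low-confidence traffic.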
List all alerts with details:

| Alert Name | Condition | Severity | Action |
| --- | --- | --- | --- |
| High Error Rate | error_rate > 5% | Critical | Page on-call |
| Drift Detected | drift_score > 0.1 | Warning | Investigate data |
| High Latency | p95_latency > 2s | Warning | Check resources |
| Low Accuracy | accuracy < 0.85 | Critical | Trigger retraining |
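The alert table can be read as a rule set: each row is a condition over current metrics plus a severity. A hypothetical sketch of evaluating it against a metrics snapshot (metric names and the snapshot values are invented for illustration):

```python
# Each rule mirrors a row of the alert table: name, condition, severity.
ALERT_RULES = [
    ("High Error Rate", lambda m: m["error_rate"] > 0.05, "critical"),
    ("Drift Detected", lambda m: m["drift_score"] > 0.1, "warning"),
    ("High Latency", lambda m: m["p95_latency_s"] > 2.0, "warning"),
    ("Low Accuracy", lambda m: m["accuracy"] < 0.85, "critical"),
]

def evaluate_alerts(metrics):
    """Return the (name, severity) pairs of all rules that currently fire."""
    return [(name, sev) for name, cond, sev in ALERT_RULES if cond(metrics)]

snapshot = {"error_rate": 0.02, "drift_score": 0.15, "p95_latency_s": 1.1, "accuracy": 0.9}
print(evaluate_alerts(snapshot))  # [('Drift Detected', 'warning')]
```

In practice these rules live in Prometheus Alertmanager or your monitoring tool, but writing them out this way forces each threshold to be explicit and justified.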
Document your incident response process:
  1. Detection: How are issues discovered?
  2. Triage: Who investigates and assigns priority?
  3. Diagnosis: What tools and data are used?
  4. Remediation: Common fixes and rollback procedures
  5. Post-mortem: How are incidents documented and learnings shared?
Acceptance Criteria:
  • Document includes all required sections
  • Monitoring plan is specific to your application
  • Alert thresholds are justified based on requirements
  • Ground truth collection strategy is clearly defined
  • Incident response procedures are documented

Evaluation Criteria

Your work will be evaluated based on:
  1. Completeness: All 3 PRs are merged
  2. Functionality: Monitoring tools work as intended
  3. Code quality: Clean, well-documented code
  4. Documentation: Monitoring plan is thorough and clear
  5. Integration: Monitoring is integrated into existing pipeline

Homework 14: Tools, LLMs, and Data Moat

Key Tasks

1. Use Managed Monitoring: Integrate a managed model monitoring tool (e.g., Arize, Evidently Cloud)
2. LLM Monitoring: Add LLM-specific monitoring with LangSmith or similar tools
3. Close the Loop: Create a dataset for labeling from your monitoring solution
4. Data Moat Strategy: Document how you'll use production data to build future models

PR1: Managed Model Monitoring

Objective: Use a managed monitoring service (e.g., Arize or Evidently Cloud)

Requirements:
  1. Sign up for a managed monitoring service
  2. Integrate your application to send data
  3. Configure dashboards and alerts
  4. Document the integration process

PR2: LLM Monitoring

Objective: Add specialized monitoring for LLM applications

Requirements:
  1. Integrate LangSmith, AgentOps, or similar tool
  2. Track token usage and costs
  3. Monitor prompt-response pairs
  4. Set up alerts for high costs or latency
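For cost tracking, the core arithmetic is just token counts times per-token prices, split by input and output. A back-of-the-envelope sketch; the prices below are placeholders, not any provider's real rates:

```python
# Assumed prices per 1K tokens -- look up your provider's current pricing.
PRICE_PER_1K = {"input": 0.0005, "output": 0.0015}

def request_cost(input_tokens, output_tokens):
    """Estimated cost of one LLM request in dollars."""
    return (input_tokens / 1000) * PRICE_PER_1K["input"] \
         + (output_tokens / 1000) * PRICE_PER_1K["output"]

cost = request_cost(input_tokens=1200, output_tokens=400)
print(round(cost, 6))  # 0.0012
```

Tools like LangSmith record token counts per trace for you; aggregating this per user or per feature is what makes a cost alert threshold defensible.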

PR3: Close the Loop

Objective: Create a labeling pipeline from production data

Requirements:
  1. Sample production predictions for labeling
  2. Create a dataset in a labeling tool (e.g., Argilla, Label Studio)
  3. Document the labeling workflow
  4. Show how labeled data feeds back into training

Data Moat Strategy Document

Objective: Plan how to build a competitive advantage with data

Requirements: Document in your Google Doc:
  • Data collection: What production data do you collect?
  • Data enrichment: How do you improve data quality over time?
  • Feedback loops: How do users provide corrections/labels?
  • Model improvement: How is new data used for retraining?
  • Competitive advantage: Why is your data unique and valuable?

Reading Materials

Review these resources to deepen your understanding:

Homework 13 Readings

Homework 14 Readings

Tips for Success

Start Early

Monitoring setup can be time-consuming. Begin with simple instrumentation and iterate.

Use Examples

Reference the module examples (sql_app.py, reviewer.py) when implementing observability.

Test Thoroughly

Verify traces and metrics are appearing before submitting PRs.

Document Well

Clear documentation helps reviewers understand your approach and makes maintenance easier.

Getting Help

If you’re stuck:
  1. Review the module documentation and examples
  2. Check tool documentation (SigNoz, Grafana, Evidently, Seldon)
  3. Ask in the Discord community
  4. Look at past student submissions (if available)

Next Steps

Module 8: Cloud Platforms

Learn about deploying ML systems on cloud platforms and buy vs. build decisions
