Kubeflow Pipelines

Kubeflow Pipelines (KFP) is a platform for building and deploying portable, scalable ML workflows based on Docker containers. It provides native support for ML artifacts, experiment tracking, and component reusability.

Why Kubeflow Pipelines?

Kubeflow Pipelines offers several advantages for ML workflows:
  • Component-based architecture: Reusable, containerized pipeline components
  • Native artifact tracking: Input/Output artifacts with lineage tracking
  • Pipeline versioning: Track and compare different pipeline versions
  • Kubernetes-native: Built for cloud-native ML workloads
  • Experiment management: Organize runs into experiments

Installation & Setup

Step 1: Deploy Kubeflow Pipelines

Install Kubeflow Pipelines on your Kubernetes cluster:
export PIPELINE_VERSION=2.2.0

# W&B settings used later by the training and inference pipeline scripts
export WANDB_PROJECT=your-project
export WANDB_API_KEY=your-key

# Install cluster-scoped resources
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"

# Wait for CRDs to be established
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io

# Install pipeline components
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"

Step 2: Access UI and Storage

Forward ports to access the UI and MinIO storage:
# MinIO object storage (artifact store)
kubectl port-forward --address=0.0.0.0 svc/minio-service 9000:9000 -n kubeflow

# Kubeflow Pipelines UI
kubectl port-forward --address=0.0.0.0 svc/ml-pipeline-ui 3000:80 -n kubeflow
Access the UI at http://0.0.0.0:3000

Step 3: Install Python SDK

Install the Kubeflow Pipelines SDK:
pip install kfp==2.2.0

Training Pipeline

The training pipeline uses Kubeflow’s component-based architecture with typed inputs and outputs.

Pipeline Components

from kfp import dsl
from kfp.dsl import Dataset, Output

IMAGE = "ghcr.io/kyryl-opens-ml/classic-example:main"

@dsl.component(base_image=IMAGE)
def load_data(
    train_data: Output[Dataset], 
    val_data: Output[Dataset], 
    test_data: Output[Dataset]
):
    import shutil
    from pathlib import Path
    from classic_example.data import load_sst2_data

    # Load SST-2 sentiment dataset
    load_sst2_data(Path("/app/data"))

    # Move to KFP output artifacts
    shutil.move(Path("/app/data") / "train.csv", train_data.path)
    shutil.move(Path("/app/data") / "val.csv", val_data.path)
    shutil.move(Path("/app/data") / "test.csv", test_data.path)
Key Features:
  • Output[Dataset]: Declares typed output artifacts
  • Kubeflow automatically tracks artifact lineage
  • Artifacts stored in MinIO and passed between components
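
The train_model component consumed by the pipeline below is not reproduced in this section. A simplified, hypothetical sketch of how such a component receives the datasets through Input[Dataset] and exposes trained files through typed outputs might look like this (the helper logic, paths, and reduced output list are assumptions, not the project's actual code):

from kfp import dsl
from kfp.dsl import Artifact, Dataset, Input, Model, Output

@dsl.component(base_image=IMAGE)
def train_model(
    train_data: Input[Dataset],
    val_data: Input[Dataset],
    test_data: Input[Dataset],
    model: Output[Model],
    config: Output[Artifact],
):
    import shutil
    from pathlib import Path

    # Upstream artifacts are materialized by KFP and exposed via .path
    data_dir = Path("/tmp/data")
    data_dir.mkdir(exist_ok=True)
    shutil.copy(train_data.path, data_dir / "train.csv")
    shutil.copy(val_data.path, data_dir / "val.csv")
    shutil.copy(test_data.path, data_dir / "test.csv")

    # ... run training against data_dir; assume it writes its outputs to /tmp/model/ ...

    # Export the trained files as typed output artifacts
    shutil.move("/tmp/model/model.safetensors", model.path)
    shutil.move("/tmp/model/config.json", config.path)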

Pipeline Definition

kubeflow_pipelines/kfp_training_pipeline.py
import os
import uuid

import kfp
from kfp import dsl

# W&B settings for experiment tracking, read from the environment variables exported during setup
WANDB_PROJECT = os.environ["WANDB_PROJECT"]
WANDB_API_KEY = os.environ["WANDB_API_KEY"]

@dsl.pipeline
def training_pipeline():
    # Step 1: Load datasets
    load_data_task = load_data()

    # Step 2: Train model with loaded data
    train_model_task = train_model(
        train_data=load_data_task.outputs["train_data"],
        val_data=load_data_task.outputs["val_data"],
        test_data=load_data_task.outputs["test_data"],
    )
    # Set environment variables for W&B tracking
    train_model_task = train_model_task.set_env_variable(
        name="WANDB_PROJECT", value=WANDB_PROJECT
    )
    train_model_task = train_model_task.set_env_variable(
        name="WANDB_API_KEY", value=WANDB_API_KEY
    )

    # Step 3: Upload model artifacts to registry
    upload_model_task = upload_model(
        config=train_model_task.outputs["config"],
        model=train_model_task.outputs["model"],
        tokenizer=train_model_task.outputs["tokenizer"],
        tokenizer_config=train_model_task.outputs["tokenizer_config"],
        model_card=train_model_task.outputs["model_card"],
        special_tokens_map=train_model_task.outputs["special_tokens_map"],
    )
    upload_model_task = upload_model_task.set_env_variable(
        name="WANDB_PROJECT", value=WANDB_PROJECT
    )
    upload_model_task = upload_model_task.set_env_variable(
        name="WANDB_API_KEY", value=WANDB_API_KEY
    )
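
Note that set_env_variable with a literal value bakes the W&B key into the compiled pipeline spec. On a shared cluster you may prefer mounting it from a Kubernetes Secret; a minimal sketch using the optional kfp-kubernetes extension (the Secret name and key below are assumptions):

# Requires: pip install kfp-kubernetes
from kfp import kubernetes

# Inside training_pipeline(), instead of set_env_variable for the API key
# (assumes: kubectl create secret generic wandb-secret -n kubeflow --from-literal=api-key=<your-key>)
kubernetes.use_secret_as_env(
    train_model_task,
    secret_name="wandb-secret",
    secret_key_to_env={"api-key": "WANDB_API_KEY"},
)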

Compiling and Deploying

def compile_pipeline() -> str:
    path = "/tmp/training_pipeline.yaml"
    kfp.compiler.Compiler().compile(training_pipeline, path)
    return path

def create_pipeline(client: kfp.Client, namespace: str):
    print("Creating experiment")
    client.create_experiment("training", namespace=namespace)

    print("Uploading pipeline")
    name = "classic-example-training"
    if client.get_pipeline_id(name) is not None:
        # Upload new version if pipeline exists
        pipeline_prev_version = client.get_pipeline(client.get_pipeline_id(name))
        version_name = f"{name}-{uuid.uuid4()}"
        pipeline = client.upload_pipeline_version(
            pipeline_package_path=compile_pipeline(),
            pipeline_version_name=version_name,
            pipeline_id=pipeline_prev_version.pipeline_id,
        )
    else:
        # Create new pipeline
        pipeline = client.upload_pipeline(
            pipeline_package_path=compile_pipeline(), 
            pipeline_name=name
        )
    print(f"pipeline {pipeline.pipeline_id}")

Inference Pipeline

The inference pipeline loads a trained model and runs predictions.

Pipeline Components

from kfp.dsl import Artifact, Dataset, Input, Model, Output

@dsl.component(base_image=IMAGE)
def load_model(
    config: Output[Artifact],
    model: Output[Model],
    tokenizer: Output[Artifact],
    tokenizer_config: Output[Artifact],
    model_card: Output[Artifact],
    special_tokens_map: Output[Artifact],
):
    import shutil
    from pathlib import Path
    from classic_example.utils import load_from_registry

    model_path = Path("/tmp/model")
    model_path.mkdir(exist_ok=True)
    
    # Download from W&B registry
    load_from_registry(model_name="kfp-pipeline:latest", model_path=model_path)

    # Export as KFP artifacts
    shutil.move(model_path / "config.json", config.path)
    shutil.move(model_path / "model.safetensors", model.path)
    shutil.move(model_path / "tokenizer.json", tokenizer.path)
    shutil.move(model_path / "tokenizer_config.json", tokenizer_config.path)
    shutil.move(model_path / "special_tokens_map.json", model_card.path)
    shutil.move(model_path / "README.md", special_tokens_map.path)

Pipeline Definition

kubeflow_pipelines/kfp_inference_pipeline.py
@dsl.pipeline
def inference_pipeline():
    # Load test data
    load_data_task = load_data()

    # Load trained model from registry
    load_model_task = load_model()
    load_model_task = load_model_task.set_env_variable(
        name="WANDB_PROJECT", value=WANDB_PROJECT
    )
    load_model_task = load_model_task.set_env_variable(
        name="WANDB_API_KEY", value=WANDB_API_KEY
    )

    # Run inference (depends on both data and model)
    run_inference(
        config=load_model_task.outputs["config"],
        model=load_model_task.outputs["model"],
        tokenizer=load_model_task.outputs["tokenizer"],
        tokenizer_config=load_model_task.outputs["tokenizer_config"],
        model_card=load_model_task.outputs["model_card"],
        special_tokens_map=load_model_task.outputs["special_tokens_map"],
        test_data=load_data_task.outputs["test_data"],
    )

Running Pipelines

Step 1: Deploy Training Pipeline

python ./kubeflow_pipelines/kfp_training_pipeline.py http://0.0.0.0:3000
This compiles and uploads the training pipeline to Kubeflow.

Step 2: Deploy Inference Pipeline

python ./kubeflow_pipelines/kfp_inference_pipeline.py http://0.0.0.0:3000

Step 3: Trigger Runs via UI

  1. Navigate to http://0.0.0.0:3000
  2. Go to Pipelines → Select your pipeline
  3. Click Create run
  4. Configure run parameters (if any)
  5. Click Start
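
Runs can also be triggered programmatically instead of through the UI; a minimal sketch using the SDK (the run name is an assumption):

import kfp

client = kfp.Client(host="http://0.0.0.0:3000")

# Submit the compiled pipeline package as a new run in the "training" experiment
client.create_run_from_pipeline_package(
    "/tmp/training_pipeline.yaml",
    arguments={},
    run_name="training-run-from-sdk",
    experiment_name="training",
)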

Step 4: Monitor Execution

  • View Graph for component dependencies
  • Click components to see logs and artifacts
  • Check Input/Output tab for artifact lineage

Artifact Management

Kubeflow Pipelines v2 supports typed artifacts:
Type      | Description                 | Use Case
----------|-----------------------------|-------------------------
Dataset   | Tabular or structured data  | CSVs, DataFrames
Model     | ML model artifacts          | Trained models
Artifact  | Generic files               | Configs, logs, metadata
Metrics   | Evaluation metrics          | Accuracy, loss

from kfp import dsl
from kfp.dsl import Dataset, Model, Artifact, Input, Output

@dsl.component
def my_component(
    input_data: Input[Dataset],
    output_model: Output[Model],
    config: Output[Artifact],
):
    import pandas as pd

    # Read the input artifact via its .path attribute
    df = pd.read_csv(input_data.path)

    # ... train a model on df; `model` below stands in for the trained object ...

    # Save each output to its artifact's .path
    model.save(output_model.path)
Kubeflow automatically tracks:
  • Which component produced each artifact
  • Which components consumed the artifact
  • Artifact versions across pipeline runs
  • Storage location in MinIO
View lineage in the UI under the Artifacts tab.
Download artifacts from MinIO:
# Install MinIO client
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc

# Configure
./mc alias set kubeflow http://localhost:9000 minio minio123

# List artifacts
./mc ls kubeflow/mlpipeline/

# Download artifact
./mc cp kubeflow/mlpipeline/artifacts/... ./local-path/

Best Practices

Component Design

  • Keep components focused and single-purpose
  • Use typed inputs/outputs for clarity
  • Document component parameters
  • Make components reusable across pipelines
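
As an illustration of these guidelines, a small, single-purpose component with typed I/O and a documented parameter (hypothetical, not part of the example project):

from kfp import dsl
from kfp.dsl import Dataset, Input, Output

@dsl.component(packages_to_install=["pandas"])
def split_data(
    input_data: Input[Dataset],
    train_split: Output[Dataset],
    test_split: Output[Dataset],
    test_size: float = 0.2,
):
    """Split a CSV dataset into train/test partitions.

    Args:
        input_data: CSV file with one example per row.
        test_size: Fraction of rows placed in the test split.
    """
    import pandas as pd

    df = pd.read_csv(input_data.path)
    test_df = df.sample(frac=test_size, random_state=42)
    train_df = df.drop(test_df.index)
    train_df.to_csv(train_split.path, index=False)
    test_df.to_csv(test_split.path, index=False)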

Pipeline Versioning

  • Upload new versions instead of overwriting
  • Tag pipeline versions semantically
  • Test pipelines in separate experiments
  • Document breaking changes between versions

Resource Management

  • Set resource limits on components (see the sketch after this list)
  • Use node selectors for GPU workloads
  • Enable autoscaling for variable workloads
  • Monitor MinIO storage usage
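
For example, CPU/memory limits and a GPU request can be attached to a task inside the pipeline definition (the values below are placeholders):

# Inside the pipeline definition, after creating train_model_task:
train_model_task.set_cpu_limit("4")
train_model_task.set_memory_limit("16G")

# Request a GPU for the training step (node scheduling details are cluster-specific)
train_model_task.set_accelerator_type("nvidia.com/gpu")
train_model_task.set_accelerator_limit(1)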

Artifact Storage

  • Use appropriate artifact types
  • Compress large artifacts (models, datasets)
  • Clean up old experiments periodically
  • Back up MinIO for production

Troubleshooting

If pipeline upload fails:
# Check KFP API server
kubectl get pods -n kubeflow | grep ml-pipeline

# Check logs
kubectl logs -n kubeflow deployment/ml-pipeline

# Verify port-forward is active
curl http://localhost:3000/apis/v2beta1/pipelines
Debug component failures:
  1. Click failed component in UI
  2. View Logs tab for error messages
  3. Check Input/Output for artifact issues
  4. Verify base image has required dependencies
  5. Test the component's logic locally, e.g. by calling its underlying Python function
     (load_data.python_func) with test inputs instead of submitting a cluster run

If artifacts aren’t passed between components:
  • Verify component output names match pipeline inputs
  • Check MinIO is running: kubectl get pods -n kubeflow | grep minio
  • Ensure components write outputs to each artifact's .path attribute
  • Verify network policies allow pod communication

Next Steps

Explore Dagster

Learn asset-centric orchestration with built-in data quality checks
