Performance Tuning

This guide covers performance optimization strategies for Mimir AIP deployments, including worker scaling, queue management, storage optimization, and resource allocation.

Worker Scaling

Mimir AIP uses Kubernetes Jobs for dynamic worker scaling. The orchestrator monitors queue depth and spawns workers as needed.

Configuration

Set worker pool parameters in helm/mimir-aip/values.yaml:22:
orchestrator:
  minWorkers: 1          # Minimum workers (always running)
  maxWorkers: 10         # Maximum workers (cap)
  queueThreshold: 5      # Spawn worker when queue exceeds this
  workerNamespace: mimir-aip
  workerServiceAccount: mimir-worker

Scaling Logic

The orchestrator implements adaptive scaling:
  1. Queue Monitoring - Checks queue length every polling cycle
  2. Worker Demand - Calculates needed workers: ceil(queueLength / queueThreshold)
  3. Active Workers - Counts currently running Kubernetes Jobs
  4. Spawn Decision - Spawns workers up to maxWorkers limit
// From pkg/queue/queue.go:14
type Queue struct {
    mu             sync.RWMutex
    pq             *PriorityQueue
    workTasks      map[string]*models.WorkTask
}

func (q *Queue) QueueLength() (int64, error) {
    q.mu.RLock()
    defer q.mu.RUnlock()
    return int64(q.pq.Len()), nil
}
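
The scaling steps above reduce to a small clamp function. This is an illustrative sketch, not the orchestrator's actual code; the name desiredWorkers and its parameters are assumptions made for the example:

```go
package main

import (
	"fmt"
	"math"
)

// desiredWorkers computes how many workers the queue currently demands
// (ceil(queueLength / queueThreshold)), clamped to [minWorkers, maxWorkers].
func desiredWorkers(queueLength, queueThreshold, minWorkers, maxWorkers int) int {
	needed := int(math.Ceil(float64(queueLength) / float64(queueThreshold)))
	if needed < minWorkers {
		return minWorkers
	}
	if needed > maxWorkers {
		return maxWorkers
	}
	return needed
}

func main() {
	// 23 queued tasks with queueThreshold=5 -> ceil(23/5) = 5 workers.
	fmt.Println(desiredWorkers(23, 5, 1, 10)) // 5
	// Demand above the cap is clamped to maxWorkers.
	fmt.Println(desiredWorkers(200, 5, 1, 10)) // 10
	// An empty queue still keeps minWorkers running.
	fmt.Println(desiredWorkers(0, 5, 2, 10)) // 2
}
```

The spawn decision then becomes: spawn (desired - active) new Jobs, where active is the count of running worker Jobs.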

Priority Queue

Tasks are prioritized by a score based on creation time and priority level:
// From pkg/queue/queue.go:34
func (q *Queue) Enqueue(task *models.WorkTask) error {
    // Lower score = higher priority
    score := float64(time.Now().Unix()) / float64(task.Priority+1)
    
    item := &PriorityQueueItem{
        TaskID:   task.ID,
        Priority: score,
    }
    heap.Push(q.pq, item)
    // ...
}

Worker Resource Allocation

Configure resource requests and limits for workers:
# In Helm values
orchestrator:
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"
Workers inherit these resources when spawned as Kubernetes Jobs.

Autoscaling Strategy

Horizontal Scaling:
  • Workers scale horizontally based on queue depth
  • Each worker processes one task at a time
  • Scale from minWorkers to maxWorkers dynamically
Vertical Scaling:
  • Adjust per-worker CPU/memory for compute-intensive tasks
  • Monitor resource utilization: kubectl top pods -n mimir-aip
Optimal Configuration:
# For high-throughput pipelines
minWorkers: 5
maxWorkers: 50
queueThreshold: 10

# For ML training workloads
resources:
  requests:
    cpu: "2000m"
    memory: "4Gi"
  limits:
    cpu: "4000m"
    memory: "8Gi"

Queue Management

In-Memory Priority Queue

Mimir uses a heap-based priority queue for task management:
// From pkg/queue/queue.go:240
type PriorityQueueItem struct {
    TaskID   string
    Priority float64 // Lower = higher priority
    index    int
}

type PriorityQueue []*PriorityQueueItem

func (pq PriorityQueue) Less(i, j int) bool {
    return pq[i].Priority < pq[j].Priority
}

Task Retry Logic

Failed tasks are automatically requeued with a per-retry score penalty (60 seconds per retry) that pushes them later in the queue:
// From pkg/queue/queue.go:200
func (q *Queue) RequeueWithRetry(taskID string, reason string) error {
    task, ok := q.workTasks[taskID]
    if !ok {
        return fmt.Errorf("work task not found: %s", taskID)
    }

    if task.RetryCount < task.MaxRetries {
        task.RetryCount++
        task.Status = models.WorkTaskStatusQueued
        
        // Deprioritize retries
        score := float64(time.Now().Unix())/float64(task.Priority+1) + 
                 float64(task.RetryCount)*60
        
        heap.Push(q.pq, &PriorityQueueItem{
            TaskID:   task.ID,
            Priority: score,
        })
        return nil
    }

    // Exhausted retries
    task.Status = models.WorkTaskStatusFailed
    return nil
}

Queue Monitoring

Monitor queue metrics via API:
# Get queue length
curl http://localhost:8080/api/worktasks/queue/length

# List queued tasks
curl http://localhost:8080/api/worktasks?status=queued

# Check high-priority tasks
curl http://localhost:8080/api/worktasks/high-priority?min_priority=5

Queue Optimization

Priority Tuning:
{
  "priority": 10,  // 1-10, higher = more important
  "max_retries": 3
}
Batch Processing:
  • Group similar tasks into single pipeline with batch operations
  • Use array data in CIR for bulk storage operations
Concurrency Control:
// From pkg/queue/queue.go:166
func (q *Queue) CountActiveByType(taskType models.WorkTaskType) (int64, error) {
    var count int64
    for _, task := range q.workTasks {
        if task.Type == taskType &&
            (task.Status == models.WorkTaskStatusSpawned || 
             task.Status == models.WorkTaskStatusExecuting) {
            count++
        }
    }
    return count, nil
}

Pipeline Optimization

Plugin Compilation Caching

Plugins are compiled once and cached:
// From pkg/plugins/client.go:54
func (c *Client) isCached(path, version, commit, updated string) bool {
    if _, err := os.Stat(path); os.IsNotExist(err) {
        return false
    }
    
    metaPath := path + ".meta"
    data, err := os.ReadFile(metaPath)
    if err != nil {
        return false
    }
    
    expected := fmt.Sprintf("%s:%s:%s", version, commit, updated)
    return string(data) == expected
}
Cache Directory: /tmp/plugins on workers
Cache Invalidation: Automatic when commit hash changes

Pipeline Context Size Limits

Context size is capped to prevent memory issues:
// From cmd/worker/main.go:114
context := models.NewPipelineContext(10485760) // 10MB max size
Optimize by:
  • Storing large data in external storage (S3, filesystem)
  • Passing references instead of raw data
  • Cleaning up intermediate results
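
The reference-passing advice above can be illustrated with a minimal sketch. The context here is a plain map and the S3 URI is hypothetical; the real PipelineContext API and storage paths differ:

```go
package main

import "fmt"

func main() {
	payload := make([]byte, 50<<20) // a 50MB intermediate result

	// Anti-pattern: embedding raw bytes would blow past the 10MB context cap.
	// Instead, persist the payload externally and store only its reference.
	ref := "s3://example-bucket/run-42/embeddings.bin" // hypothetical URI
	ctx := map[string]any{"embeddings_ref": ref}

	fmt.Printf("context carries %d bytes instead of %d\n", len(ref), len(payload))
	_ = ctx
}
```

Downstream steps resolve the reference and stream the data from external storage, keeping the pipeline context well under the 10MB limit.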

Step Execution Efficiency

Avoid Redundant Lookups:
{
  "steps": [
    {
      "name": "load_once",
      "action": "read_file",
      "output": {"data": "$.result"}
    },
    {
      "name": "transform_1",
      "action": "transform",
      "parameters": {"input": "$.load_once.data"}
    },
    {
      "name": "transform_2",
      "action": "transform",
      "parameters": {"input": "$.load_once.data"}
    }
  ]
}
Minimize Plugin Switching:
  • Group actions from same plugin together
  • Reduces plugin lookup overhead

Storage Performance

Storage Plugin Selection

Use Case Matching:
Use Case               Recommended Plugin   Rationale
High write throughput  S3, Cassandra        Distributed, scalable
Complex queries        PostgreSQL, Neo4j    Rich query capabilities
Real-time analytics    Elasticsearch        Fast search and aggregation
Low latency            Redis, In-memory     Millisecond response times
Cost-sensitive         Filesystem, MinIO    No cloud costs

Connection Pooling

Storage plugins should implement connection pooling:
// Example from pkg/storage/plugins/s3.go:70
func (s *S3Plugin) Initialize(cfg *models.PluginConfig) error {
    awsCfg, err := config.LoadDefaultConfig(context.Background(),
        config.WithRegion(region),
    )
    
    // S3 client uses connection pooling internally
    s.client = s3.NewFromConfig(awsCfg)
    // ...
}

Batch Operations

Store multiple items in single call:
// From pkg/storage/plugins/filesystem.go:99
if arr, err := cir.GetDataAsArray(); err == nil {
    for _, item := range arr {
        if err := f.storeItem(entityDir, itemCIR); err != nil {
            return nil, err
        }
        affectedItems++
    }
}
API Usage:
curl -X POST http://localhost:8080/api/storage/store \
  -d '{
    "storage_id": "store-123",
    "cir_data": {
      "version": "1.0",
      "data": [
        {"id": 1, "name": "Item 1"},
        {"id": 2, "name": "Item 2"},
        {"id": 3, "name": "Item 3"}
      ]
    }
  }'

Query Optimization

Index Creation:
  • Create indexes on frequently queried attributes
  • Use storage-specific index types (B-tree, hash, GiST)
Filter Pushdown:
{
  "entity_type": "users",
  "filters": [
    {"attribute": "status", "operator": "eq", "value": "active"},
    {"attribute": "created_at", "operator": "gt", "value": "2024-01-01"}
  ],
  "limit": 1000
}
Projection:
  • Only retrieve needed attributes (future enhancement)
  • Reduces network transfer and memory usage

Storage Schema Design

Denormalization for Read Performance:
{
  "entities": [
    {
      "name": "orders",
      "attributes": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_name", "type": "string"},  // Denormalized
        {"name": "customer_email", "type": "string"},  // Denormalized
        {"name": "total", "type": "number"}
      ]
    }
  ]
}
Partitioning:
  • Partition large tables by date, region, or other keys
  • Improves query performance and maintenance

Resource Optimization

Orchestrator Tuning

Persistence Volume:
orchestrator:
  persistence:
    enabled: true
    size: 50Gi  # Increase for large metadata stores
    # storageClass: fast-ssd
Memory Allocation:
  • SQLite cache: ~500MB for metadata
  • Queue overhead: ~1MB per 1000 tasks
  • Plugin cache: Negligible (metadata only)
CPU Usage:
  • API endpoints: Lightweight; handle thousands of requests per second
  • Worker spawning: Occasional spikes during scale-out

Worker Resource Sizing

Pipeline Execution:
resources:
  requests:
    cpu: "500m"     # Typical pipeline
    memory: "1Gi"
ML Training:
resources:
  requests:
    cpu: "4000m"    # CPU-intensive
    memory: "8Gi"   # Large datasets
  limits:
    cpu: "8000m"
    memory: "16Gi"
ML Inference:
resources:
  requests:
    cpu: "1000m"    # Moderate compute
    memory: "2Gi"

Plugin Cache Sizing

Plugin .so files are cached on each worker node:
# Typical plugin sizes
Builtin plugins:    negligible (compiled into the binary)
Custom plugins:     5-50MB per plugin
Storage plugins:    10-100MB per plugin (with dependencies)
Cache Location: /tmp/plugins and /tmp/storage-plugins
Cleanup: Automatic on pod restart

Monitoring and Profiling

Metrics to Track

Queue Metrics:
  • Queue length over time
  • Task wait time (queued → executing)
  • Task execution time
  • Retry rate
Worker Metrics:
  • Active worker count
  • Worker spawn rate
  • Worker success/failure rate
  • Resource utilization per worker
Storage Metrics:
  • Operation latency (store, retrieve, update, delete)
  • Throughput (operations/second)
  • Connection pool utilization
  • Error rate

Profiling Tools

Kubernetes Metrics:
# Pod resource usage
kubectl top pods -n mimir-aip

# Node resource usage
kubectl top nodes

# Worker pod logs
kubectl logs -n mimir-aip -l app=mimir-worker --tail=100
Application Logs:
# Orchestrator logs
kubectl logs -n mimir-aip deployment/orchestrator | grep "queue"

# Worker execution time
kubectl logs -n mimir-aip -l app=mimir-worker | grep "completed"
API Response Times:
# Test API latency
curl -w "@curl-format.txt" -o /dev/null -s http://localhost:8080/api/projects

# curl-format.txt:
time_total: %{time_total}s

Multi-Cluster Scaling

For extreme scale, distribute workers across multiple Kubernetes clusters:

Configuration

# helm/mimir-aip/values.yaml:73
additionalClusters:
  - name: site-b
    orchestratorURL: http://192.168.10.5:8080
    maxWorkers: 50
    namespace: mimir-aip
    serviceAccount: mimir-worker
    kubeconfig: |
      apiVersion: v1
      kind: Config
      clusters:
      - cluster:
          server: https://site-b.example.com:6443
          certificate-authority-data: ...
        name: site-b
      # ...

Worker Distribution

Workers overflow to remote clusters when local cluster reaches capacity:
  1. Local cluster - Primary, workers spawn here first
  2. site-b - Overflow when local reaches maxWorkers
  3. site-c - Further overflow (if configured)

Network Optimization

  • Use regional storage replicas to reduce cross-cluster latency
  • Configure orchestratorURL with direct IP for worker callbacks
  • Enable worker authentication with workerAuthToken

Best Practices Summary

  1. Worker Scaling
    • Set queueThreshold to 2-5x average task duration (seconds)
    • Configure maxWorkers based on cluster capacity
    • Use resource limits to prevent node oversubscription
  2. Queue Management
    • Use priority for time-sensitive tasks
    • Monitor retry rates to detect systemic issues
    • Set reasonable max_retries (default: 3)
  3. Pipeline Optimization
    • Cache plugin compilations (automatic)
    • Minimize context size
    • Group operations by plugin
  4. Storage Performance
    • Choose appropriate storage plugin for workload
    • Implement connection pooling
    • Use batch operations
    • Create indexes on query attributes
  5. Resource Allocation
    • Size worker resources for task type
    • Monitor utilization and adjust
    • Use PV with fast storage class for orchestrator
  6. Monitoring
    • Track queue metrics
    • Monitor worker success rate
    • Profile slow operations
    • Set up alerts for queue depth and error rate
