
Plugin Development

Mimir AIP’s plugin system allows you to extend both pipeline capabilities and storage backends. Plugins are compiled as Go shared objects (.so) and loaded dynamically at runtime.

Plugin Architecture

Mimir AIP supports two types of plugins:
  1. Pipeline Plugins - Custom actions for data transformation pipelines
  2. Storage Plugins - Custom storage backends for CIR data
Plugins installed from a repository are:
  • Cloned from Git repositories
  • Compiled at runtime with the host module’s dependencies
  • Cached to avoid recompilation on restarts
  • Hot-loadable without orchestrator restarts

Pipeline Plugin Development

Plugin Interface

Pipeline plugins must implement the Plugin interface from pkg/pipeline:
type Plugin interface {
    Execute(action string, params map[string]interface{}, context *PipelineContext) (map[string]interface{}, error)
}

Plugin Structure

Your plugin repository should have this structure:
my-plugin/
├── plugin.yaml          # Plugin metadata
├── actions/             # Action implementations (optional)
│   ├── transform.go
│   └── validate.go
└── go.mod               # Will be removed during compilation

plugin.yaml Definition

name: my-custom-plugin
version: 1.0.0
description: Custom data transformation plugin
author: Your Name
actions:
  - name: custom_transform
    description: Applies custom transformation logic
    parameters:
      - name: input_field
        type: string
        required: true
        description: Field to transform
      - name: mode
        type: string
        required: false
        description: Transformation mode
    returns:
      - name: transformed_value
        type: any
        description: The transformed result
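
The parameter declarations above can also be checked in code before an action runs. The following is a minimal sketch, assuming a hypothetical `ParamSpec` type that mirrors one `parameters` entry; the type names `number` and `boolean` are assumptions, since the example above only shows `string` and `any`, and Mimir AIP may perform its own validation.

```go
package main

import "fmt"

// ParamSpec mirrors one entry under "parameters" in plugin.yaml.
// Hypothetical type for illustration; it is not part of Mimir AIP.
type ParamSpec struct {
	Name     string
	Type     string // "string" and "any" appear in the example; others are assumed
	Required bool
}

// validateParams checks that required parameters are present and that
// every supplied parameter matches its declared type.
func validateParams(specs []ParamSpec, params map[string]interface{}) error {
	for _, s := range specs {
		v, present := params[s.Name]
		if !present {
			if s.Required {
				return fmt.Errorf("missing required parameter %q", s.Name)
			}
			continue
		}
		if !matchesType(v, s.Type) {
			return fmt.Errorf("parameter %q: expected %s, got %T", s.Name, s.Type, v)
		}
	}
	return nil
}

// matchesType reports whether v is acceptable for the declared type name.
func matchesType(v interface{}, typ string) bool {
	switch typ {
	case "string":
		_, ok := v.(string)
		return ok
	case "number": // assumed type name
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	case "boolean": // assumed type name
		_, ok := v.(bool)
		return ok
	default: // "any" and unrecognized type names are not checked
		return true
	}
}
```

Calling `validateParams` at the top of `Execute` keeps type assertions out of the individual action handlers.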

Implementation Example

Create actions/transform.go:
package main

import (
    "fmt"
    "strings"

    "github.com/mimir-aip/mimir-aip-go/pkg/models"
)

type MyPlugin struct{}

func (p *MyPlugin) Execute(action string, params map[string]interface{}, context *models.PipelineContext) (map[string]interface{}, error) {
    switch action {
    case "custom_transform":
        return p.customTransform(params, context)
    default:
        return nil, fmt.Errorf("unknown action: %s", action)
    }
}

func (p *MyPlugin) customTransform(params map[string]interface{}, context *models.PipelineContext) (map[string]interface{}, error) {
    inputField, ok := params["input_field"].(string)
    if !ok {
        return nil, fmt.Errorf("input_field is required and must be a string")
    }

    // Get data from context
    value := context.GetStepData("previous_step", inputField)
    
    // Apply transformation
    mode := "uppercase" // default
    if m, ok := params["mode"].(string); ok {
        mode = m
    }

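    // Only string values can be transformed; report anything else
    // instead of silently returning a nil result
    strVal, ok := value.(string)
    if !ok {
        return nil, fmt.Errorf("field %q: expected a string, got %T", inputField, value)
    }

    var result string
    switch mode {
    case "uppercase":
        result = strings.ToUpper(strVal)
    case "lowercase":
        result = strings.ToLower(strVal)
    default:
        result = strVal // unknown modes pass the value through unchanged
    }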

    return map[string]interface{}{
        "transformed_value": result,
    }, nil
}

// Export the plugin instance
var Plugin MyPlugin

Installation

Install your plugin via the API:
curl -X POST http://localhost:8080/api/plugins/install \
  -H "Content-Type: application/json" \
  -d '{
    "repository_url": "https://github.com/yourorg/my-plugin",
    "git_ref": "main"
  }'
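
The same request can be issued from Go. This is a minimal sketch that only builds the request; the endpoint path and JSON field names come from the curl call above, while `installRequest` and `newInstallRequest` are hypothetical names introduced for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"net/http"
)

// installRequest mirrors the JSON body of the curl example.
type installRequest struct {
	RepositoryURL string `json:"repository_url"`
	GitRef        string `json:"git_ref"`
}

// newInstallRequest builds a POST to <baseURL>/api/plugins/install.
func newInstallRequest(baseURL, repoURL, gitRef string) (*http.Request, error) {
	body, err := json.Marshal(installRequest{RepositoryURL: repoURL, GitRef: gitRef})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/api/plugins/install", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}
```

Send the result with `http.DefaultClient.Do(req)` and check the status code; the shape of the response body is not documented here, so the sketch does not decode it.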

Using in Pipelines

Reference your plugin in pipeline definitions:
{
  "name": "Data Processing Pipeline",
  "steps": [
    {
      "name": "load_data",
      "plugin": "builtin",
      "action": "read_file",
      "parameters": {"path": "/data/input.json"}
    },
    {
      "name": "transform",
      "plugin": "my-custom-plugin",
      "action": "custom_transform",
      "parameters": {
        "input_field": "$.load_data.content",
        "mode": "uppercase"
      }
    }
  ]
}
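
The `input_field` value above looks like a reference of the form `$.<step>.<field>`. This page does not say whether the orchestrator resolves such references before calling `Execute` or passes them through verbatim. If a plugin has to interpret them itself, a parser might look like the sketch below; the format rules are an assumption based on the single example, and `parseStepRef` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// parseStepRef splits a reference such as "$.load_data.content" into the
// step name ("load_data") and the field path ("content").
// Assumed format: "$." + step name + "." + field path.
func parseStepRef(ref string) (step, field string, err error) {
	if !strings.HasPrefix(ref, "$.") {
		return "", "", fmt.Errorf("%q is not a step reference: missing \"$.\" prefix", ref)
	}
	// Everything after the first dot is kept as the field path.
	step, field, found := strings.Cut(strings.TrimPrefix(ref, "$."), ".")
	if !found || step == "" || field == "" {
		return "", "", fmt.Errorf("%q must look like $.<step>.<field>", ref)
	}
	return step, field, nil
}
```

With this helper, a plugin could pass the parsed step and field to `context.GetStepData` instead of a hard-coded step name.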

Storage Plugin Development

StoragePlugin Interface

Storage plugins implement the StoragePlugin interface from pkg/models:
type StoragePlugin interface {
    // Initialize the plugin with configuration
    Initialize(config *PluginConfig) error

    // CreateSchema creates storage schema from ontology
    CreateSchema(ontology *OntologyDefinition) error

    // Store CIR data
    Store(cir *CIR) (*StorageResult, error)

    // Retrieve data using queries
    Retrieve(query *CIRQuery) ([]*CIR, error)

    // Update existing data
    Update(query *CIRQuery, updates *CIRUpdate) (*StorageResult, error)

    // Delete data
    Delete(query *CIRQuery) (*StorageResult, error)

    // GetMetadata returns storage capabilities
    GetMetadata() (*StorageMetadata, error)

    // HealthCheck validates storage availability
    HealthCheck() (bool, error)
}

Implementation Example

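Create a custom storage plugin for a hypothetical database. CustomDBClient, ConnectToCustomDB, and the createEntityTable and convertQuery helpers stand in for your database driver and are not shown: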
package main

import (
    "fmt"

    "github.com/mimir-aip/mimir-aip-go/pkg/models"
)

type CustomStoragePlugin struct {
    client      *CustomDBClient
    database    string
    initialized bool
}

func NewCustomStoragePlugin() *CustomStoragePlugin {
    return &CustomStoragePlugin{}
}

func (p *CustomStoragePlugin) Initialize(config *models.PluginConfig) error {
    // Extract connection details
    connectionString := config.ConnectionString
    
    // Get optional configuration
    database := "mimir"
    if db, ok := config.Options["database"].(string); ok {
        database = db
    }

    // Initialize client
    client, err := ConnectToCustomDB(connectionString, database)
    if err != nil {
        return fmt.Errorf("failed to connect: %w", err)
    }

    p.client = client
    p.database = database
    p.initialized = true
    return nil
}

func (p *CustomStoragePlugin) CreateSchema(ontology *models.OntologyDefinition) error {
    if !p.initialized {
        return fmt.Errorf("plugin not initialized")
    }

    // Create tables/collections for each entity
    for _, entity := range ontology.Entities {
        if err := p.createEntityTable(entity); err != nil {
            return fmt.Errorf("failed to create schema for %s: %w", entity.Name, err)
        }
    }

    return nil
}

func (p *CustomStoragePlugin) Store(cir *models.CIR) (*models.StorageResult, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }

    // Validate CIR
    if err := cir.Validate(); err != nil {
        return nil, fmt.Errorf("invalid CIR: %w", err)
    }

    // Extract entity type and data
    dataMap, err := cir.GetDataAsMap()
    if err != nil {
        return nil, err
    }

    // Insert into storage
    if err := p.client.Insert(dataMap); err != nil {
        return nil, err
    }

    return &models.StorageResult{
        Success:       true,
        AffectedItems: 1,
    }, nil
}

func (p *CustomStoragePlugin) Retrieve(query *models.CIRQuery) ([]*models.CIR, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }

    // Convert CIRQuery to native query format
    nativeQuery := p.convertQuery(query)

    // Execute query
    rows, err := p.client.Query(nativeQuery)
    if err != nil {
        return nil, err
    }

    // Convert results to CIR format
    results := make([]*models.CIR, 0)
    for _, row := range rows {
        cir := &models.CIR{
            Version: "1.0",
            Data:    row,
        }
        cir.UpdateSize()
        results = append(results, cir)
    }

    return results, nil
}

func (p *CustomStoragePlugin) Update(query *models.CIRQuery, updates *models.CIRUpdate) (*models.StorageResult, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }

    // Translate the query, then apply the updates in the backing store
    count, err := p.client.Update(p.convertQuery(query), updates.Updates)
    if err != nil {
        return nil, err
    }

    return &models.StorageResult{
        Success:       true,
        AffectedItems: count,
    }, nil
}

func (p *CustomStoragePlugin) Delete(query *models.CIRQuery) (*models.StorageResult, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }

    count, err := p.client.Delete(p.convertQuery(query))
    if err != nil {
        return nil, err
    }

    return &models.StorageResult{
        Success:       true,
        AffectedItems: count,
    }, nil
}

func (p *CustomStoragePlugin) GetMetadata() (*models.StorageMetadata, error) {
    return &models.StorageMetadata{
        StorageType: "customdb",
        Version:     "1.0.0",
        Capabilities: []string{
            "store",
            "retrieve",
            "update",
            "delete",
            "schema_creation",
            "transactions",
        },
    }, nil
}

func (p *CustomStoragePlugin) HealthCheck() (bool, error) {
    if !p.initialized {
        return false, fmt.Errorf("not initialized")
    }

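    // Surface the ping error so callers can see why the check failed
    if err := p.client.Ping(); err != nil {
        return false, fmt.Errorf("ping failed: %w", err)
    }
    return true, nil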
}

// Export plugin instance
var Plugin = NewCustomStoragePlugin()
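
The example calls `Insert`, `Query`, `Update`, `Delete`, and `Ping` on its client without defining them. One way to make that dependency explicit and unit-testable is a small interface with an in-memory fake, sketched below. Every name and signature here (`dbClient`, `memClient`, `row`, predicate-style queries, `int` counts) is an assumption inferred from how the example uses the client, not a real driver API.

```go
package main

import "errors"

// row is one stored record (hypothetical shape).
type row = map[string]interface{}

// dbClient lists the calls the example makes on p.client.
// The signatures are inferred from that usage, not from a real driver.
type dbClient interface {
	Insert(data row) error
	Query(match func(row) bool) ([]row, error)
	Update(match func(row) bool, updates row) (int, error)
	Delete(match func(row) bool) (int, error)
	Ping() error
}

var errClosed = errors.New("client is closed")

// memClient keeps rows in a slice; it is meant for unit tests only.
type memClient struct {
	rows   []row
	closed bool
}

// Compile-time check that memClient satisfies dbClient.
var _ dbClient = (*memClient)(nil)

func (c *memClient) Ping() error {
	if c.closed {
		return errClosed
	}
	return nil
}

func (c *memClient) Insert(data row) error {
	if err := c.Ping(); err != nil {
		return err
	}
	c.rows = append(c.rows, data)
	return nil
}

func (c *memClient) Query(match func(row) bool) ([]row, error) {
	if err := c.Ping(); err != nil {
		return nil, err
	}
	var out []row
	for _, r := range c.rows {
		if match(r) {
			out = append(out, r)
		}
	}
	return out, nil
}

// Update merges updates into every matching row and returns the match count.
func (c *memClient) Update(match func(row) bool, updates row) (int, error) {
	if err := c.Ping(); err != nil {
		return 0, err
	}
	count := 0
	for _, r := range c.rows {
		if match(r) {
			for k, v := range updates {
				r[k] = v
			}
			count++
		}
	}
	return count, nil
}

// Delete removes every matching row and returns how many were removed.
func (c *memClient) Delete(match func(row) bool) (int, error) {
	if err := c.Ping(); err != nil {
		return 0, err
	}
	kept := c.rows[:0]
	for _, r := range c.rows {
		if !match(r) {
			kept = append(kept, r)
		}
	}
	removed := len(c.rows) - len(kept)
	c.rows = kept
	return removed, nil
}
```

Declaring the plugin's `client` field as an interface of this kind lets tests swap in the fake, and the `var _ dbClient = ...` line turns a missing method into a compile error rather than a load-time failure.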

Storage Plugin Installation

curl -X POST http://localhost:8080/api/storage-plugins \
  -H "Content-Type: application/json" \
  -d '{
    "repository_url": "https://github.com/yourorg/customdb-plugin",
    "git_ref": "v1.0.0"
  }'

Configuration

Configure storage for a project:
curl -X POST http://localhost:8080/api/storage/configs \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "project-123",
    "plugin_type": "customdb",
    "config": {
      "connection_string": "customdb://localhost:5432",
      "options": {
        "database": "my_project",
        "pool_size": 10
      }
    }
  }'
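
Numeric options such as `pool_size` need care inside `Initialize`. When Go's `encoding/json` decodes a JSON number into an `interface{}` value, the result is a `float64`, so an `.(int)` assertion on it fails. The sketch below assumes the options arrive as a `map[string]interface{}` decoded this way; `intOption` and `decodeOptions` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeOptions parses a JSON object the way encoding/json does by default:
// numbers become float64, strings become string.
func decodeOptions(raw string) (map[string]interface{}, error) {
	var opts map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &opts); err != nil {
		return nil, err
	}
	return opts, nil
}

// intOption reads an integer option, returning def when the key is absent.
func intOption(options map[string]interface{}, key string, def int) (int, error) {
	v, ok := options[key]
	if !ok {
		return def, nil
	}
	switch n := v.(type) {
	case float64: // JSON numbers decoded into interface{}
		if n != float64(int(n)) {
			return 0, fmt.Errorf("option %q: %v is not a whole number", key, n)
		}
		return int(n), nil
	case int: // options built directly in Go code
		return n, nil
	default:
		return 0, fmt.Errorf("option %q: expected a number, got %T", key, v)
	}
}
```

In `Initialize`, a call such as `intOption(config.Options, "pool_size", 5)` would then accept the value from the request above; the default of 5 is arbitrary.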

Plugin Compilation Process

The plugin loader performs these steps:
  1. Clone - Git repository is cloned to a temporary directory
  2. Flatten - Files in the actions/ subdirectory are moved to the repository root
  3. Remove go.mod - The plugin’s go.mod is removed so the plugin builds against the host module
  4. Move to host - Plugin source is moved to ${appDir}/plugins/${name} or ${appDir}/storage-plugins/${name}
  5. Compile - Built with go build -buildmode=plugin -trimpath
  6. Cache - Compiled .so is cached with commit hash metadata
  7. Load - Plugin symbol is resolved and registered
See pkg/plugins/client.go:99 and pkg/storage/dynamic_loader.go:76 for implementation details.
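
One consequence of steps 2 and 3 is that files under `actions/` end up next to the root files in a single package, so their names must not clash. The sketch below previews the resulting layout from a list of repository-relative paths. It illustrates the described behavior and is not the loader's code; `flattenLayout` is a hypothetical helper, and how the real loader treats name collisions is not documented here.

```go
package main

import (
	"fmt"
	"path"
	"sort"
	"strings"
)

// flattenLayout returns the file names a plugin repository would have
// after actions/ is flattened into the root and go.mod is dropped.
// It reports an error if two files would land on the same name.
func flattenLayout(files []string) ([]string, error) {
	origin := make(map[string]string) // destination -> source
	for _, f := range files {
		src := path.Clean(f)
		if src == "go.mod" {
			continue // step 3: the plugin's go.mod is removed
		}
		dst := strings.TrimPrefix(src, "actions/") // step 2: flatten
		if prev, taken := origin[dst]; taken {
			return nil, fmt.Errorf("%s and %s would both become %s", prev, src, dst)
		}
		origin[dst] = src
	}
	out := make([]string, 0, len(origin))
	for dst := range origin {
		out = append(out, dst)
	}
	sort.Strings(out)
	return out, nil
}
```

For the repository layout shown earlier on this page, the preview contains `plugin.yaml`, `transform.go`, and `validate.go`.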

Best Practices

Version Control

  • Tag releases with semantic versioning
  • Use git_ref to pin to specific versions in production
  • Test plugins thoroughly before deploying

Error Handling

  • Return descriptive errors with context
  • Validate all input parameters
  • Handle edge cases gracefully

Performance

  • Minimize context lookups in hot paths
  • Cache expensive computations
  • Use efficient data structures
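
Caching an expensive computation can be as simple as a mutex-guarded map. The sketch below is one possible shape, with hypothetical names `memo` and `newMemo`. The mutex is there because this page does not state whether `Execute` may be called concurrently on the same plugin instance, so the sketch assumes it can be.

```go
package main

import "sync"

// memo caches the result of compute per key.
type memo struct {
	mu      sync.Mutex
	values  map[string]string
	compute func(key string) string
}

func newMemo(compute func(key string) string) *memo {
	return &memo{values: make(map[string]string), compute: compute}
}

// get returns the cached value for key, computing it on first use.
// The lock is held during compute, which serializes callers; that keeps
// the sketch simple at the cost of throughput under contention.
func (m *memo) get(key string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	if v, ok := m.values[key]; ok {
		return v
	}
	v := m.compute(key)
	m.values[key] = v
	return v
}
```

The cache grows without bound, so it suits a small, stable set of keys; anything larger needs an eviction policy.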

Dependencies

  • Keep external dependencies minimal
  • Use the same Go version as Mimir AIP
  • Test with the host module’s dependency versions

Security

  • Validate and sanitize all inputs
  • Avoid executing arbitrary code
  • Keep credentials out of connection strings; pass them through config.Credentials
  • Never log sensitive data
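
If a connection string does carry a password, for example one supplied by an older configuration, it can be masked before logging with `URL.Redacted` from Go's standard `net/url` package. This is a minimal sketch, and `redactConnectionString` is a hypothetical helper name.

```go
package main

import "net/url"

// redactConnectionString returns conn with any password replaced by
// "xxxxx". If conn cannot be parsed as a URL, a fixed placeholder is
// returned; the parse error is not included because it can quote the
// original string.
func redactConnectionString(conn string) string {
	u, err := url.Parse(conn)
	if err != nil {
		return "<unparseable connection string>"
	}
	return u.Redacted()
}
```

Only the password in the user-info part is masked. Secrets passed as query parameters are left untouched, so those still need separate handling.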

Debugging Plugins

Compilation Errors

Check orchestrator logs for compilation output:
kubectl logs -n mimir-aip deployment/orchestrator | grep "plugin loader"

Runtime Errors

Check worker logs for execution errors:
kubectl logs -n mimir-aip -l app=mimir-worker

Testing Locally

Compile and test plugins locally before deployment:
go build -buildmode=plugin -o test.so ./
go run test-plugin.go
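
The contents of `test-plugin.go` are not shown on this page. A loader of that kind could start from the sketch below, which uses Go's standard `plugin` package to open the compiled file and resolve the exported `Plugin` symbol. `describePluginSymbol` is a hypothetical helper; it only reports the symbol's dynamic type and makes no assumption about the host's interface types.

```go
package main

import (
	"fmt"
	"plugin"
)

// describePluginSymbol opens the shared object at soPath, looks up the
// exported symbol named "Plugin", and returns its dynamic type as text.
// For an exported variable, Lookup yields a pointer to that variable.
func describePluginSymbol(soPath string) (string, error) {
	p, err := plugin.Open(soPath)
	if err != nil {
		return "", fmt.Errorf("open %s: %w", soPath, err)
	}
	sym, err := p.Lookup("Plugin")
	if err != nil {
		return "", fmt.Errorf("lookup Plugin in %s: %w", soPath, err)
	}
	return fmt.Sprintf("%T", sym), nil
}
```

Call it from a `main` function with the path to `test.so`. Go requires a plugin and the program that loads it to be built with the same toolchain version and matching versions of shared packages, so a local check like this confirms that the plugin compiles and exports `Plugin`, not that it will load in the orchestrator.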

Built-in Plugins

Mimir AIP includes several built-in storage plugins:
  • filesystem - Local file storage (pkg/storage/plugins/filesystem.go:22)
  • s3 - AWS S3 and S3-compatible storage (pkg/storage/plugins/s3.go:22)
  • postgresql - PostgreSQL database
  • mysql - MySQL/MariaDB database
  • mongodb - MongoDB database
  • redis - Redis key-value store
  • neo4j - Neo4j graph database
  • elasticsearch - Elasticsearch search engine
Refer to the source code in pkg/storage/plugins/ for implementation examples.
