Skip to main content

Custom Storage Backends

Mimir AIP uses a plugin-based storage architecture that allows you to integrate with any data store through the StoragePlugin interface. This guide covers creating, installing, and configuring custom storage backends.

Storage Plugin Architecture

Storage plugins provide bidirectional translation between Mimir’s Common Interchange Representation (CIR) format and storage-specific data models. The plugin loader compiles plugins from source at runtime, ensuring compatibility with the host environment.

Plugin Lifecycle

  1. Installation - Clone repository, compile plugin, persist metadata
  2. Loading - On startup, load cached .so files for registered plugins
  3. Initialization - Configure plugin with connection details and credentials
  4. Schema Creation - Map ontology to storage-specific schema
  5. Operations - Execute CRUD operations through plugin interface
  6. Health Checks - Validate storage connectivity and availability

StoragePlugin Interface

All storage plugins must implement this interface from pkg/models/storage.go:28:
// StoragePlugin is the contract every storage backend implements. Its methods
// translate between CIR values and the backend's own data model.
type StoragePlugin interface {
    // Initialize configures the plugin from the given PluginConfig
    // (connection string, credentials, and options).
    Initialize(config *PluginConfig) error

    // CreateSchema creates or updates the storage schema described by the
    // ontology definition.
    CreateSchema(ontology *OntologyDefinition) error

    // Store writes CIR data into the storage system and reports the outcome
    // in a StorageResult.
    Store(cir *CIR) (*StorageResult, error)

    // Retrieve runs the query against the storage system and returns the
    // matching data as CIR objects.
    Retrieve(query *CIRQuery) ([]*CIR, error)

    // Update applies the given updates to the CIR data selected by query.
    Update(query *CIRQuery, updates *CIRUpdate) (*StorageResult, error)

    // Delete removes the CIR data selected by query.
    Delete(query *CIRQuery) (*StorageResult, error)

    // GetMetadata returns storage-specific metadata about the backend.
    GetMetadata() (*StorageMetadata, error)

    // HealthCheck validates the connection and storage availability. The
    // boolean reports whether the backend is healthy.
    HealthCheck() (bool, error)
}

Implementation Guide

1. Plugin Structure

Create a new Git repository with this structure:
mimir-plugin-cassandra/
├── README.md
├── plugin.go           # Main plugin implementation
├── schema.go           # Schema management
├── query.go            # Query translation
└── go.mod              # Dependencies (removed during compilation)

2. Plugin Configuration

The PluginConfig structure provides connection and credential information:
// PluginConfig carries the connection and credential information handed to
// StoragePlugin.Initialize.
type PluginConfig struct {
    // ConnectionString identifies the storage endpoint; its format is
    // interpreted by the individual plugin.
    ConnectionString string                 `json:"connection_string"`
    // Credentials holds optional, plugin-specific secrets (for example the
    // "username" and "password" keys read by the Cassandra example).
    Credentials      map[string]interface{} `json:"credentials,omitempty"`
    // Options holds optional, plugin-specific settings (for example the
    // "keyspace" key read by the Cassandra example).
    Options          map[string]interface{} `json:"options,omitempty"`
}

3. Implement Initialize

package main

import (
    "fmt"
    "sort"
    "strings"

    "github.com/gocql/gocql"
    "github.com/mimir-aip/mimir-aip-go/pkg/models"
)

// CassandraPlugin is an example storage plugin backed by Apache Cassandra
// through the gocql driver. All fields are populated by Initialize.
type CassandraPlugin struct {
    // session is the gocql session used to execute every statement.
    session    *gocql.Session
    // keyspace qualifies every table name in generated CQL.
    keyspace   string
    // hosts records the hosts passed to gocql.NewCluster.
    hosts      []string
    // initialized is set once Initialize succeeds; the other methods refuse
    // to run while it is false.
    initialized bool
}

// NewCassandraPlugin returns a zero-valued CassandraPlugin. It is not usable
// until Initialize has been called on it.
func NewCassandraPlugin() *CassandraPlugin {
    plugin := new(CassandraPlugin)
    return plugin
}

// Initialize connects the plugin to a Cassandra cluster.
//
// Settings read from config:
//   - ConnectionString: parsed by parseHosts into contact points; when it is
//     empty, "localhost" is used.
//   - Options["keyspace"]: keyspace used to qualify every generated statement
//     (defaults to "mimir" when absent, empty, or not a string).
//   - Credentials["username"] / Credentials["password"]: password
//     authentication is enabled when a non-empty username is present.
//
// It returns an error when config is nil or the session cannot be created.
// Calling Initialize again replaces the current session and closes the old
// one.
func (p *CassandraPlugin) Initialize(config *models.PluginConfig) error {
    if config == nil {
        return fmt.Errorf("plugin config is nil")
    }

    // Parse connection string
    hosts := []string{"localhost"}
    if config.ConnectionString != "" {
        hosts = parseHosts(config.ConnectionString)
    }

    // Get keyspace from options. An empty name would produce invalid CQL
    // (".table"), so it falls back to the default as well.
    keyspace := "mimir"
    if ks, ok := config.Options["keyspace"].(string); ok && ks != "" {
        keyspace = ks
    }

    // Get credentials. A failed type assertion leaves the zero value, and
    // reading from a nil map is safe.
    username, _ := config.Credentials["username"].(string)
    password, _ := config.Credentials["password"].(string)

    // Create cluster configuration.
    //
    // cluster.Keyspace is intentionally left unset: the driver switches to
    // that keyspace while connecting, so session creation would fail on a
    // cluster where the keyspace does not exist yet, and CreateSchema (which
    // issues CREATE KEYSPACE IF NOT EXISTS) could never run. Every statement
    // built by this plugin is qualified with p.keyspace instead.
    cluster := gocql.NewCluster(hosts...)
    if username != "" {
        cluster.Authenticator = gocql.PasswordAuthenticator{
            Username: username,
            Password: password,
        }
    }

    // Connect
    session, err := cluster.CreateSession()
    if err != nil {
        return fmt.Errorf("failed to connect to Cassandra: %w", err)
    }

    // Release the previous session, if any, so re-initialization does not
    // leak connections.
    if p.session != nil {
        p.session.Close()
    }

    p.session = session
    p.keyspace = keyspace
    p.hosts = hosts
    p.initialized = true

    return nil
}

4. Implement CreateSchema

Translate ontology definitions to storage-specific schema:
// CreateSchema translates an ontology into Cassandra schema objects: it
// creates the plugin's keyspace if needed and then one table per entity, with
// one column per attribute (types mapped by mapTypeToCQL).
//
// The table's primary key is the first entry of entity.PrimaryKey when that
// name matches an attribute, otherwise "id". The chosen column must be
// declared as an attribute.
//
// It returns an error when the plugin is not initialized, when ontology is
// nil, when an entity does not declare its primary-key column, or when a CQL
// statement fails.
//
// NOTE(review): keyspace, table, and column names are interpolated into the
// CQL text as-is; they must be trusted, valid CQL identifiers.
func (p *CassandraPlugin) CreateSchema(ontology *models.OntologyDefinition) error {
    if !p.initialized {
        return fmt.Errorf("plugin not initialized")
    }
    if ontology == nil {
        return fmt.Errorf("ontology definition is nil")
    }

    // Create keyspace if not exists
    createKeyspace := fmt.Sprintf(`
        CREATE KEYSPACE IF NOT EXISTS %s
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
    `, p.keyspace)
    if err := p.session.Query(createKeyspace).Exec(); err != nil {
        return fmt.Errorf("failed to create keyspace: %w", err)
    }

    // Create table for each entity
    for _, entity := range ontology.Entities {
        tableName := entity.Name

        // Build column definitions
        columns := make([]string, 0, len(entity.Attributes))
        declared := make(map[string]bool, len(entity.Attributes))
        primaryKey := "id"

        for _, attr := range entity.Attributes {
            cqlType := mapTypeToCQL(attr.Type)
            columns = append(columns, fmt.Sprintf("%s %s", attr.Name, cqlType))
            declared[attr.Name] = true

            // Use first primary key attribute, or default to id
            if len(entity.PrimaryKey) > 0 && attr.Name == entity.PrimaryKey[0] {
                primaryKey = attr.Name
            }
        }

        // Fail early with a clear message instead of sending CQL that
        // references an undeclared column (this also covers entities with no
        // attributes at all).
        if !declared[primaryKey] {
            return fmt.Errorf("failed to create table %s: primary key column %q is not declared as an attribute", tableName, primaryKey)
        }

        // Create table
        createTable := fmt.Sprintf(`
            CREATE TABLE IF NOT EXISTS %s.%s (
                %s,
                PRIMARY KEY (%s)
            )
        `, p.keyspace, tableName, strings.Join(columns, ", "), primaryKey)

        if err := p.session.Query(createTable).Exec(); err != nil {
            return fmt.Errorf("failed to create table %s: %w", tableName, err)
        }
    }

    return nil
}

// mapTypeToCQL converts a CIR attribute type name into the CQL column type
// used for it. "number" becomes double, "boolean" stays boolean, and "date"
// becomes timestamp; "string", "json", and every unrecognized name are stored
// as text.
func mapTypeToCQL(cirType string) string {
    switch cirType {
    case "number":
        return "double"
    case "boolean":
        return "boolean"
    case "date":
        return "timestamp"
    }
    // "string", "json", and anything unknown share the text fallback.
    return "text"
}

5. Implement Store

// Store validates a CIR and inserts its data into the table named by the
// CIR's "entity_type" parameter ("default" when the parameter is missing,
// empty, or not a string, matching the fallback used by Retrieve and Delete).
//
// When the CIR data can be read as an array, every element is inserted in
// order; otherwise the data is read as a single map and inserted once.
// Insertion stops at the first failure; items written before the failure are
// not rolled back.
//
// It returns an error when the plugin is not initialized, when cir is nil or
// fails validation, when the data can be read neither as an array nor as a
// map, or when an insert fails.
func (p *CassandraPlugin) Store(cir *models.CIR) (*models.StorageResult, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }
    if cir == nil {
        return nil, fmt.Errorf("invalid CIR: nil")
    }

    // Validate CIR
    if err := cir.Validate(); err != nil {
        return nil, fmt.Errorf("invalid CIR: %w", err)
    }

    // Determine entity type
    entityType := "default"
    if et, ok := cir.GetParameter("entity_type"); ok {
        if name, ok := et.(string); ok && name != "" {
            entityType = name
        }
    }

    // Handle array data
    affectedItems := 0
    if arr, err := cir.GetDataAsArray(); err == nil {
        for i, item := range arr {
            if err := p.storeItem(entityType, item); err != nil {
                return nil, fmt.Errorf("failed to store item %d into %s: %w", i, entityType, err)
            }
            affectedItems++
        }
    } else {
        // Single item
        dataMap, err := cir.GetDataAsMap()
        if err != nil {
            return nil, fmt.Errorf("failed to read CIR data: %w", err)
        }
        if err := p.storeItem(entityType, dataMap); err != nil {
            return nil, fmt.Errorf("failed to store item into %s: %w", entityType, err)
        }
        affectedItems = 1
    }

    return &models.StorageResult{
        Success:       true,
        AffectedItems: affectedItems,
    }, nil
}

// storeItem inserts one record into <keyspace>.<table>, using the map keys as
// column names and binding the map values as statement parameters.
//
// Columns are emitted in sorted order so the same set of fields always
// produces the same statement text, regardless of Go's random map iteration
// order. An empty map is rejected because it would produce an invalid
// "INSERT ... () VALUES ()" statement.
//
// NOTE(review): table and column names are interpolated into the CQL text
// as-is (identifiers cannot be bound as parameters); they must be trusted,
// valid CQL identifiers.
func (p *CassandraPlugin) storeItem(table string, data map[string]interface{}) error {
    if len(data) == 0 {
        return fmt.Errorf("cannot insert into %s.%s: item has no fields", p.keyspace, table)
    }

    // Collect and sort the column names for a deterministic statement.
    columns := make([]string, 0, len(data))
    for key := range data {
        columns = append(columns, key)
    }
    sort.Strings(columns)

    // Build placeholders and values in the same order as the columns.
    placeholders := make([]string, len(columns))
    values := make([]interface{}, len(columns))
    for i, column := range columns {
        placeholders[i] = "?"
        values[i] = data[column]
    }

    query := fmt.Sprintf(
        "INSERT INTO %s.%s (%s) VALUES (%s)",
        p.keyspace,
        table,
        strings.Join(columns, ", "),
        strings.Join(placeholders, ", "),
    )

    return p.session.Query(query, values...).Exec()
}

6. Implement Retrieve

// Retrieve runs a SELECT against <keyspace>.<entity type> and returns one CIR
// (version "1.0") per row, with the row's column map as its data.
//
// An empty query.EntityType selects from the "default" table. Each filter
// becomes an "<attribute> <operator> ?" condition (operators mapped by
// mapOperatorToCQL), conditions are joined with AND, and the filter values
// are bound as statement parameters. A positive query.Limit adds a LIMIT
// clause. query.OrderBy and query.Offset are not applied by this method.
//
// It returns an error when the plugin is not initialized, when query is nil,
// or when the query fails.
func (p *CassandraPlugin) Retrieve(query *models.CIRQuery) ([]*models.CIR, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }
    if query == nil {
        return nil, fmt.Errorf("query is nil")
    }

    entityType := query.EntityType
    if entityType == "" {
        entityType = "default"
    }

    // Build SELECT query
    cql := fmt.Sprintf("SELECT * FROM %s.%s", p.keyspace, entityType)

    // Add WHERE clause from filters. values is declared outside the branch
    // so it can be bound when the statement is executed.
    var values []interface{}
    if len(query.Filters) > 0 {
        conditions := make([]string, 0, len(query.Filters))
        values = make([]interface{}, 0, len(query.Filters))

        for _, filter := range query.Filters {
            op := mapOperatorToCQL(filter.Operator)
            conditions = append(conditions, fmt.Sprintf("%s %s ?", filter.Attribute, op))
            values = append(values, filter.Value)
        }

        cql += " WHERE " + strings.Join(conditions, " AND ")
    }

    // Add LIMIT
    if query.Limit > 0 {
        cql += fmt.Sprintf(" LIMIT %d", query.Limit)
    }

    // Execute query with the filter values bound to the placeholders.
    iter := p.session.Query(cql, values...).Iter()

    results := []*models.CIR{}
    row := make(map[string]interface{})

    for iter.MapScan(row) {
        cir := &models.CIR{
            Version: "1.0",
            Data:    row,
        }
        cir.UpdateSize()
        results = append(results, cir)
        row = make(map[string]interface{}) // Fresh map so results do not share rows
    }

    // Close reports any error hit while iterating, so it is called exactly
    // once here and its result is checked.
    if err := iter.Close(); err != nil {
        return nil, fmt.Errorf("query iteration failed: %w", err)
    }

    return results, nil
}

// mapOperatorToCQL converts a CIR filter operator name into the CQL operator
// placed between the column name and the "?" placeholder.
//
// It covers the operator names listed for CIRCondition: eq, neq, gt, gte, lt,
// lte, in, and like. "in" and "like" map to the IN and LIKE keywords rather
// than silently degrading to an equality test. Any other name, including the
// empty string, falls back to "=".
func mapOperatorToCQL(op string) string {
    switch op {
    case "eq":
        return "="
    case "neq":
        return "!="
    case "gt":
        return ">"
    case "gte":
        return ">="
    case "lt":
        return "<"
    case "lte":
        return "<="
    case "in":
        return "IN"
    case "like":
        return "LIKE"
    default:
        return "="
    }
}

7. Implement Update and Delete

// Update applies updates.Updates to every record matched by query.
//
// Cassandra requires the primary key for updates, so the method:
//  1. retrieves the matching records with Retrieve,
//  2. overlays the updated fields on each record,
//  3. re-inserts the record with storeItem.
//
// Records are written back to the same table Retrieve read from ("default"
// when query.EntityType is empty). Every record is attempted even if an
// earlier one fails; if any record could not be read or written, an error
// wrapping the first failure is returned and reports how many records were
// written.
//
// NOTE(review): because the write is an INSERT, changing a primary-key column
// presumably creates a new row rather than modifying the old one - confirm
// before relying on it.
func (p *CassandraPlugin) Update(query *models.CIRQuery, updates *models.CIRUpdate) (*models.StorageResult, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }
    if query == nil {
        return nil, fmt.Errorf("query is nil")
    }
    if updates == nil {
        return nil, fmt.Errorf("updates is nil")
    }

    items, err := p.Retrieve(query)
    if err != nil {
        return nil, err
    }

    // Mirror Retrieve's fallback so records go back to the table they came from.
    entityType := query.EntityType
    if entityType == "" {
        entityType = "default"
    }

    var firstErr error
    affectedItems := 0
    for _, item := range items {
        dataMap, err := item.GetDataAsMap()
        if err == nil {
            // Apply updates
            for key, value := range updates.Updates {
                dataMap[key] = value
            }

            // Re-insert
            err = p.storeItem(entityType, dataMap)
        }
        if err != nil {
            // Keep going (best effort), but remember the failure so it is
            // reported instead of being silently dropped.
            if firstErr == nil {
                firstErr = err
            }
            continue
        }
        affectedItems++
    }

    if firstErr != nil {
        return nil, fmt.Errorf("updated %d of %d items in %s: %w", affectedItems, len(items), entityType, firstErr)
    }

    return &models.StorageResult{
        Success:       true,
        AffectedItems: affectedItems,
    }, nil
}

// Delete removes the rows of <keyspace>.<entity type> selected by the query's
// filters ("default" table when query.EntityType is empty).
//
// Each filter becomes an "<attribute> <operator> ?" condition, using the same
// operator mapping as Retrieve (mapOperatorToCQL); conditions are joined with
// AND and the values are bound as statement parameters.
//
// At least one filter is required: a DELETE without a WHERE clause is never
// sent, and the call fails instead of reporting success for work that was
// not done.
//
// On success AffectedItems is always 1, because the driver call used here
// does not report how many rows were removed.
func (p *CassandraPlugin) Delete(query *models.CIRQuery) (*models.StorageResult, error) {
    if !p.initialized {
        return nil, fmt.Errorf("plugin not initialized")
    }
    if query == nil {
        return nil, fmt.Errorf("query is nil")
    }
    if len(query.Filters) == 0 {
        return nil, fmt.Errorf("delete requires at least one filter")
    }

    entityType := query.EntityType
    if entityType == "" {
        entityType = "default"
    }

    // Build DELETE statement
    conditions := make([]string, 0, len(query.Filters))
    values := make([]interface{}, 0, len(query.Filters))
    for _, filter := range query.Filters {
        op := mapOperatorToCQL(filter.Operator)
        conditions = append(conditions, fmt.Sprintf("%s %s ?", filter.Attribute, op))
        values = append(values, filter.Value)
    }

    cql := fmt.Sprintf(
        "DELETE FROM %s.%s WHERE %s",
        p.keyspace,
        entityType,
        strings.Join(conditions, " AND "),
    )

    if err := p.session.Query(cql, values...).Exec(); err != nil {
        return nil, fmt.Errorf("failed to delete from %s.%s: %w", p.keyspace, entityType, err)
    }

    return &models.StorageResult{
        Success:       true,
        AffectedItems: 1, // Cassandra doesn't return affected count
    }, nil
}

8. Implement Metadata and Health Check

// GetMetadata describes this backend: storage type "cassandra", plugin
// version "1.0.0", and the list of capabilities it advertises. The values are
// static, so the returned error is always nil.
func (p *CassandraPlugin) GetMetadata() (*models.StorageMetadata, error) {
    capabilities := []string{
        "store",
        "retrieve",
        "update",
        "delete",
        "schema_creation",
        "distributed",
        "high_availability",
    }

    metadata := &models.StorageMetadata{
        StorageType:  "cassandra",
        Version:      "1.0.0",
        Capabilities: capabilities,
    }
    return metadata, nil
}

// HealthCheck verifies connectivity by running a trivial query against
// system.local. It returns (true, nil) when the query succeeds, and false
// with an error when the plugin is not initialized or the query fails.
func (p *CassandraPlugin) HealthCheck() (bool, error) {
    if !p.initialized {
        return false, fmt.Errorf("plugin not initialized")
    }

    // Simple query to verify connectivity. The statement is a constant, so
    // no formatting call is needed.
    const probe = "SELECT now() FROM system.local"
    if err := p.session.Query(probe).Exec(); err != nil {
        return false, fmt.Errorf("health check failed: %w", err)
    }

    return true, nil
}

// Plugin is the exported plugin instance. It is created uninitialized at
// package load time; Initialize must be called before it is used.
// NOTE(review): presumably the plugin loader looks this symbol up by name in
// the compiled .so - confirm against the dynamic loader.
var Plugin = NewCassandraPlugin()

Installation and Configuration

Install Plugin

curl -X POST http://localhost:8080/api/storage-plugins \
  -H "Content-Type: application/json" \
  -d '{
    "repository_url": "https://github.com/yourorg/mimir-plugin-cassandra",
    "git_ref": "v1.0.0"
  }'

Configure for Project

curl -X POST http://localhost:8080/api/storage/configs \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "project-123",
    "plugin_type": "cassandra",
    "config": {
      "connection_string": "cassandra://cassandra-cluster:9042",
      "credentials": {
        "username": "mimir",
        "password": "secure-password"
      },
      "options": {
        "keyspace": "my_project",
        "consistency": "quorum"
      }
    }
  }'

Initialize Schema

curl -X POST http://localhost:8080/api/storage/${STORAGE_ID}/initialize \
  -H "Content-Type: application/json" \
  -d '{
    "ontology": {
      "entities": [
        {
          "name": "users",
          "attributes": [
            {"name": "id", "type": "string"},
            {"name": "email", "type": "string"},
            {"name": "created_at", "type": "date"}
          ],
          "primary_key": ["id"]
        }
      ]
    }
  }'

Query Translation

CIRQuery Format

// CIRQuery is the storage-independent query that plugins translate into their
// backend's native query language.
type CIRQuery struct {
    // EntityType names the entity (table/collection) to query.
    EntityType string          `json:"entity_type,omitempty"`
    // Filters lists the conditions a record must satisfy.
    Filters    []CIRCondition  `json:"filters,omitempty"`
    // OrderBy lists the requested sort clauses.
    OrderBy    []OrderByClause `json:"order_by,omitempty"`
    // Limit is the requested maximum number of results.
    Limit      int             `json:"limit,omitempty"`
    // Offset is the requested number of results to skip.
    Offset     int             `json:"offset,omitempty"`
}

// CIRCondition is a single filter: Attribute compared to Value using Operator.
type CIRCondition struct {
    // Attribute is the name of the attribute being tested.
    Attribute string      `json:"attribute"`
    // Operator is the comparison to apply.
    Operator  string      `json:"operator"` // eq, neq, gt, gte, lt, lte, in, like
    // Value is the operand the attribute is compared against.
    Value     interface{} `json:"value"`
}

Example Translation

CIR Query:
{
  "entity_type": "users",
  "filters": [
    {"attribute": "age", "operator": "gt", "value": 18},
    {"attribute": "status", "operator": "eq", "value": "active"}
  ],
  "limit": 100
}
Translated CQL (values are shown inline for readability; the plugin binds them as `?` parameters):
SELECT * FROM users 
WHERE age > 18 AND status = 'active' 
LIMIT 100

Testing

Unit Tests

Create plugin_test.go:
package main

import (
    "testing"
    "github.com/mimir-aip/mimir-aip-go/pkg/models"
)

// TestCassandraPlugin_Initialize checks that Initialize connects to a
// Cassandra node on localhost:9042 and marks the plugin as initialized.
//
// The test needs a running Cassandra node, so it is skipped under
// `go test -short`. The session is closed when the test finishes so the test
// does not leak connections.
func TestCassandraPlugin_Initialize(t *testing.T) {
    if testing.Short() {
        t.Skip("requires a Cassandra node on localhost:9042")
    }

    plugin := NewCassandraPlugin()

    config := &models.PluginConfig{
        ConnectionString: "localhost:9042",
        Options: map[string]interface{}{
            "keyspace": "test",
        },
    }

    if err := plugin.Initialize(config); err != nil {
        t.Fatalf("Initialize failed: %v", err)
    }
    // Initialize succeeded, so the session is set; release it at test end.
    t.Cleanup(plugin.session.Close)

    if !plugin.initialized {
        t.Error("Plugin should be initialized")
    }
}

Integration Tests

Test against a real Cassandra instance:
docker run -d --name cassandra -p 9042:9042 cassandra:4.1
go test -v

Performance Optimization

Connection Pooling

// Initialize (connection-pooling excerpt) shows the pool-related settings to
// add to the cluster configuration before the session is created. It is an
// abbreviated fragment: hosts is assumed to have been derived from config as
// in the full Initialize, and the rest of the body is elided.
func (p *CassandraPlugin) Initialize(config *models.PluginConfig) error {
    cluster := gocql.NewCluster(hosts...)
    
    // Configure connection pool
    // NOTE(review): NumConns is presumably the number of connections per
    // host - confirm against the gocql ClusterConfig documentation.
    cluster.NumConns = 4
    // Select hosts with a token-aware policy that wraps a round-robin policy.
    cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
        gocql.RoundRobinHostPolicy(),
    )
    
    session, err := cluster.CreateSession()
    // ... (error handling and field assignment elided)
}

Batch Operations

// StoreBatch queues one insert per item on a single logged batch and executes
// the batch in one call. The statement text comes from buildInsertQuery and
// the bound values from extractValues; the result of ExecuteBatch is returned
// unchanged.
func (p *CassandraPlugin) StoreBatch(items []map[string]interface{}) error {
    pending := p.session.NewBatch(gocql.LoggedBatch)

    for i := range items {
        stmt := buildInsertQuery(items[i])
        args := extractValues(items[i])
        pending.Query(stmt, args...)
    }

    err := p.session.ExecuteBatch(pending)
    return err
}

Query Caching

Cache prepared statements for repeated queries:
// CassandraPlugin (query-caching excerpt) adds a statement cache to the
// plugin. Only the fields relevant to caching are shown.
type CassandraPlugin struct {
    // session is the gocql session the cached queries belong to.
    session        *gocql.Session
    // preparedStmts maps a string key to a previously built query.
    // NOTE(review): presumably keyed by CQL statement text - confirm. The
    // map must be created (make) before the first write.
    preparedStmts  map[string]*gocql.Query
    // ... (remaining fields elided)
}

Best Practices

  1. Connection Management - Reuse connections, implement proper cleanup
  2. Error Handling - Return descriptive errors with storage-specific context
  3. Type Mapping - Handle CIR type conversions carefully
  4. Transactions - Use native transaction support when available
  5. Security - Never log credentials, use secure connections
  6. Testing - Test against real storage instances
  7. Documentation - Document configuration options and limitations

Reference Implementations

See built-in plugins for examples:
  • S3 Plugin: pkg/storage/plugins/s3.go:22
  • Filesystem Plugin: pkg/storage/plugins/filesystem.go:22
  • Dynamic Loader: pkg/storage/dynamic_loader.go:15

Build docs developers (and LLMs) love