Skip to main content

Overview

Digital Twins are live in-memory graphs of entities and relationships, initialized from ontologies and synchronized from storage. They enable real-time queries via SPARQL, ML-powered predictions, what-if scenarios, and automated actions. Key capabilities:
  • Entity Management: Store and query entities with attributes and relationships
  • SPARQL Queries: Standard semantic queries on entity graphs
  • ML Predictions: Run model inference on entity data
  • What-If Scenarios: Test hypothetical changes
  • Automated Actions: Trigger pipelines based on conditions

Digital Twin Structure

pkg/models/digitaltwin.go:10
type DigitalTwin struct {
    ID          string
    ProjectID   string
    OntologyID  string              // Blueprint for entity types
    Name        string
    Description string
    Status      string              // active, syncing, error
    Config      *DigitalTwinConfig
    Metadata    map[string]interface{}
    CreatedAt   time.Time
    UpdatedAt   time.Time
    LastSyncAt  *time.Time
}

type DigitalTwinConfig struct {
    StorageIDs         []string // CIR data sources
    CacheTTL           int      // Cache TTL in seconds
    AutoSync           bool     // Auto-sync on storage changes
    SyncInterval       int      // Sync interval in seconds
    EnablePredictions  bool     // ML predictions enabled
    PredictionCacheTTL int      // Prediction cache TTL
    IndexingStrategy   string   // "lazy" or "eager"
}

Creating a Digital Twin

curl -X POST http://localhost:8080/api/digital-twins \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "proj-uuid-1234",
    "ontology_id": "ont-uuid-7890",
    "name": "customer-twin",
    "description": "Customer entity graph",
    "config": {
      "storage_ids": ["storage-uuid-1", "storage-uuid-2"],
      "cache_ttl": 300,
      "auto_sync": true,
      "sync_interval": 3600,
      "enable_predictions": true,
      "prediction_cache_ttl": 1800,
      "indexing_strategy": "lazy"
    }
  }'

Synchronizing with Storage

Sync entities from storage backends:
curl -X POST http://localhost:8080/api/digital-twins/twin-uuid-2222/sync
Sync process:
  1. Retrieve CIR data from configured storage backends
  2. Infer entity types from ontology classes
  3. Create or update entities with attributes
  4. Detect shared key fields across entity types
  5. Wire relationships based on matching key values
  6. Update last_sync_at timestamp

Entity Resolution

When the same logical entity appears in multiple storage sources (e.g., student_id=42 in both grades_db and attendance_db), records are merged into a single canonical entity:
pkg/digitaltwin/service.go:696
func (s *Service) syncFromStorage(twin *models.DigitalTwin, ont *models.Ontology, storageID string) error {
    // Retrieve CIR data
    cirs, err := s.storageService.Retrieve(storageID, &models.CIRQuery{})
    
    // Pre-load existing entities per type
    typeIndex := make(map[string]map[string]*models.Entity)
    
    for _, cir := range cirs {
        entityType := inferEntityTypeFromCIR(cir, classNames)
        attrs := extractAttributes(cir)
        
        // Attempt entity resolution by matching key fields
        keyFields := detectKeyFields(attrs)
        var resolved *models.Entity
        for _, kf := range keyFields {
            kv := keyValue(attrs[kf])
            if existing, ok := typeIndex[entityType][kf+":"+kv]; ok {
                resolved = existing
                break
            }
        }
        
        if resolved != nil {
            // Merge: add new attributes without overwriting
            for k, v := range attrs {
                if _, exists := resolved.Attributes[k]; !exists {
                    resolved.Attributes[k] = v
                }
            }
            resolved.UpdatedAt = time.Now()
            s.store.SaveEntity(resolved)
        } else {
            // Create new entity
            entity := &models.Entity{
                ID:            uuid.New().String(),
                DigitalTwinID: twin.ID,
                Type:          entityType,
                Attributes:    attrs,
                CreatedAt:     time.Now(),
            }
            s.store.SaveEntity(entity)
        }
    }
    
    return nil
}

Relationship Wiring

After sync, entities with shared key field values are linked:
pkg/digitaltwin/service.go:859
func (s *Service) wireRelationships(twinID string) error {
    allEntities, _ := s.store.ListEntitiesByDigitalTwin(twinID)
    
    // Group by type
    byType := make(map[string][]*models.Entity)
    for _, e := range allEntities {
        byType[e.Type] = append(byType[e.Type], e)
    }
    
    // For each pair of entity types
    for i, typeA := range types {
        for j, typeB := range types[i+1:] {
            // Find common key fields
            sharedKeys := commonKeyFieldNames(byType[typeA], byType[typeB])
            
            // Build index for type B
            bIndex := make(map[string]map[string]*models.Entity)
            for _, e := range byType[typeB] {
                for _, kf := range sharedKeys {
                    kv := keyValue(e.Attributes[kf])
                    bIndex[kf][kv] = e
                }
            }
            
            // Match entities from type A
            for _, eA := range byType[typeA] {
                for _, kf := range sharedKeys {
                    kv := keyValue(eA.Attributes[kf])
                    if eB, ok := bIndex[kf][kv]; ok {
                        // Create bidirectional relationship
                        relType := "relatedBy" + toCamelCase(kf)
                        eA.Relationships = append(eA.Relationships, &models.EntityRelationship{
                            Type:       relType,
                            TargetID:   eB.ID,
                            TargetType: typeB,
                        })
                        eB.Relationships = append(eB.Relationships, &models.EntityRelationship{
                            Type:       relType,
                            TargetID:   eA.ID,
                            TargetType: typeA,
                        })
                    }
                }
            }
        }
    }
    
    return nil
}

SPARQL Queries

Query entities using standard SPARQL syntax:
curl -X POST http://localhost:8080/api/digital-twins/twin-uuid-2222/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "SELECT ?customer ?email ?orderCount WHERE { ?customer a :Customer ; :email ?email ; :hasOrder ?order . } GROUP BY ?customer ?email",
    "limit": 10
  }'
Response:
{
  "columns": ["customer", "email", "orderCount"],
  "rows": [
    {"customer": "entity-uuid-1", "email": "[email protected]", "orderCount": 12},
    {"customer": "entity-uuid-2", "email": "[email protected]", "orderCount": 8}
  ],
  "count": 2,
  "metadata": {"query_type": "sparql"}
}

SPARQL Examples

SELECT ?customer ?name ?email
WHERE {
  ?customer a :Customer ;
            :name ?name ;
            :email ?email .
}
LIMIT 10

Running Predictions

Execute ML model predictions on entity data:
curl -X POST http://localhost:8080/api/digital-twins/twin-uuid-2222/predict \
  -H "Content-Type: application/json" \
  -d '{
    "model_id": "model-uuid-1111",
    "entity_id": "entity-uuid-1",
    "use_cache": true
  }'
Response:
{
  "id": "pred-uuid-3333",
  "digital_twin_id": "twin-uuid-2222",
  "model_id": "model-uuid-1111",
  "entity_id": "entity-uuid-1",
  "prediction_type": "point",
  "output": 0.73,
  "confidence": 0.85,
  "cached_at": "2026-03-01T10:30:00Z",
  "expires_at": "2026-03-01T11:00:00Z"
}

Auto-Enrichment

When entity_id is provided, the service automatically enriches input features from related entities:
pkg/digitaltwin/service.go:386
func (s *Service) enrichPredictionInput(req *models.PredictionRequest) error {
    entity, _ := s.store.GetEntity(req.EntityID)
    
    // Auto-populate from entity attributes
    if len(req.Input) == 0 {
        req.Input = make(map[string]interface{})
        for k, v := range entity.Attributes {
            req.Input[k] = v
        }
    }
    
    // Merge related entity attributes with type prefix
    for _, rel := range entity.Relationships {
        related, _ := s.store.GetEntity(rel.TargetID)
        prefix := strings.ToLower(rel.TargetType) + "."
        for k, v := range related.Attributes {
            key := prefix + k
            if _, exists := req.Input[key]; !exists {
                req.Input[key] = v
            }
        }
    }
    
    return nil
}
Example: A Student entity with a related AttendanceRecord produces keys like:
  • avg_grade → from Student
  • attendancerecord.days_absent → from related AttendanceRecord

What-If Scenarios

Create hypothetical scenarios to test predictions:
curl -X POST http://localhost:8080/api/digital-twins/twin-uuid-2222/scenarios \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Price Increase Impact",
    "description": "Test 10% price increase",
    "base_state": "current",
    "modifications": [
      {
        "entity_type": "Product",
        "entity_id": "entity-uuid-5",
        "attribute": "price",
        "original_value": 29.99,
        "new_value": 32.99,
        "rationale": "10% increase"
      }
    ],
    "run_predictions": true
  }'
Response:
{
  "id": "scenario-uuid-4444",
  "digital_twin_id": "twin-uuid-2222",
  "name": "Price Increase Impact",
  "base_state": "current",
  "modifications": [...],
  "predictions": [
    {
      "model_id": "model-uuid-1111",
      "model_name": "demand-forecast",
      "prediction": 0.82,
      "confidence": 0.78,
      "impact": "Expected 18% decrease in demand"
    }
  ],
  "status": "active",
  "created_at": "2026-03-01T10:30:00Z"
}

Automated Actions

Trigger pipelines when conditions are met:
curl -X POST http://localhost:8080/api/digital-twins/twin-uuid-2222/actions \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Low Stock Alert",
    "description": "Trigger restock pipeline when inventory low",
    "enabled": true,
    "condition": {
      "attribute": "stock_level",
      "operator": "lt",
      "threshold": 10,
      "entity_type": "Product"
    },
    "trigger": {
      "pipeline_id": "pipe-restock-5678",
      "parameters": {
        "urgency": "high"
      }
    }
  }'
Condition operators:
  • gt / gte — Greater than / or equal
  • lt / lte — Less than / or equal
  • eq / ne — Equals / not equals
Actions are evaluated after predictions. When a condition is met, the specified pipeline is automatically enqueued.

Updating Entities

Modify entity attributes (stored as delta modifications):
curl -X PATCH http://localhost:8080/api/digital-twins/entities/entity-uuid-1 \
  -H "Content-Type: application/json" \
  -d '{
    "attributes": {
      "age": 33,
      "email": "[email protected]"
    }
  }'
Modifications are stored separately from source data, allowing rollback and audit trails.

Listing Entities

# All entities in digital twin
curl http://localhost:8080/api/digital-twins/twin-uuid-2222/entities

# Specific entity
curl http://localhost:8080/api/digital-twins/entities/entity-uuid-1

Best Practices

Auto-sync for real-time twins:
{"auto_sync": true, "sync_interval": 3600}
Manual sync for controlled updates:
{"auto_sync": false}
Trigger sync via API when data changes.
Set TTLs based on data volatility:
  • High volatility (1-5 min): Real-time sensor data
  • Medium volatility (5-30 min): Customer profiles
  • Low volatility (1+ hour): Product catalogs
{
  "cache_ttl": 300,
  "prediction_cache_ttl": 1800
}
  • Lazy: Build indexes on first query (faster startup)
  • Eager: Build indexes on sync (faster queries)
Use eager for frequently queried twins.
Archive old scenarios to prevent clutter:
curl -X DELETE /api/digital-twins/scenarios/scenario-uuid-4444
Keep only active what-if analyses.

Next Steps

ML Models

Train models for digital twin predictions.

Ontologies

Define entity types and relationships.

Storage

Configure data sources for sync.

Build docs developers (and LLMs) love