Overview

Pipelines are ordered sequences of processing steps executed asynchronously by workers. They enable data ingestion, transformation, and output operations through a plugin-based architecture. Pipeline types:
  • Ingestion: Fetch data from external sources (APIs, databases, files)
  • Processing: Transform, enrich, or analyze data
  • Output: Write results to storage backends or external systems

Pipeline Structure

pkg/models/pipeline.go:24
type Pipeline struct {
    ID          string
    ProjectID   string
    Name        string
    Type        PipelineType   // ingestion, processing, output
    Description string
    Steps       []PipelineStep
    Status      PipelineStatus // active, inactive, draft
    CreatedAt   time.Time
    UpdatedAt   time.Time
}

type PipelineStep struct {
    Name       string
    Plugin     string                 // Execution plugin name
    Action     string                 // Action to perform
    Parameters map[string]interface{} // Step configuration
    Output     map[string]string      // Output variable mappings
}

Creating a Pipeline

curl -X POST http://localhost:8080/api/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": "proj-uuid-1234",
    "name": "customer-import",
    "type": "ingestion",
    "description": "Import customers from the customers API",
    "steps": [
      {
        "name": "fetch-data",
        "plugin": "default",
        "action": "http_request",
        "parameters": {
          "url": "https://api.example.com/customers",
          "method": "GET",
          "headers": {
            "Authorization": "Bearer {{context._parameters.api_key}}"
          }
        },
        "output": {
          "data": "{{context.fetch-data.response.body}}"
        }
      },
      {
        "name": "parse-json",
        "plugin": "default",
        "action": "parse_json",
        "parameters": {
          "data": "{{context.fetch-data.data}}"
        }
      }
    ]
  }'

Built-in Actions

The default plugin provides the following actions.

http_request

Makes HTTP requests to external APIs.
Parameters:
  • url (required): Request URL
  • method: HTTP method (default: GET)
  • headers: Request headers
  • body: Request body (for POST/PUT)
Output:
  • response.status_code: HTTP status code
  • response.body: Response body as string
  • response.headers: Response headers
- name: api-call
  plugin: default
  action: http_request
  parameters:
    url: "https://api.example.com/data"
    method: GET
    headers:
      Authorization: "Bearer token"
  output:
    data: "{{context.api-call.response.body}}"

parse_json

Parses JSON strings into objects.
Parameters:
  • data (required): JSON string to parse
Output:
  • parsed: Parsed JSON object
- name: parse
  plugin: default
  action: parse_json
  parameters:
    data: "{{context.previous-step.output}}"
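
if_else

Chooses between two values based on a condition. It returns the chosen value; it does not jump to another step (use goto for that).
Parameters: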
  • condition (required): Value to evaluate (truthiness check)
  • if_true: Result if condition is true
  • if_false: Result if condition is false
Output:
  • result: Chosen branch result
  • condition: Boolean evaluation result
- name: check-status
  plugin: default
  action: if_else
  parameters:
    condition: "{{context.api-call.response.status_code}}"
    if_true: "success"
    if_false: "failure"

set_context

Stores a value in the execution context.
Parameters:
  • key (required): Variable name
  • value (required): Value to store
  • step: Context namespace (default: _global)
- name: set-var
  plugin: default
  action: set_context
  parameters:
    key: "customer_count"
    value: "42"
    step: "_global"

get_context

Retrieves a value from the execution context.
Parameters:
  • key (required): Variable name
  • step: Context namespace (default: _global)
Output:
  • exists: Boolean indicating if value exists
  • value: Retrieved value
- name: get-var
  plugin: default
  action: get_context
  parameters:
    key: "customer_count"
    step: "_global"

goto

Jumps to another step, typically to implement a loop.
Parameters:
  • target (required): Target step name
- name: loop-check
  plugin: default
  action: goto
  parameters:
    target: "process-batch"

Template Variables

Pipelines support dynamic value substitution using {{}} syntax.

Context Access

{{context.step_name.key}}
Accesses values from a specific step’s output. Example:
parameters:
  url: "https://api.example.com/users/{{context.fetch-id.user_id}}"

Initial Parameters

Parameters passed during execution are available in _parameters:
{{context._parameters.api_key}}

Nested Access

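Additional path segments after the key descend into nested objects. Because http_request returns response.body as a string, parse it with parse_json first, then index into the parsed output:
{{context.parse.parsed.data.items}}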
pkg/pipeline/plugin.go:237
// templatePattern matches {{ ... }} expressions; compiled once rather than on every call.
var templatePattern = regexp.MustCompile(`\{\{([^}]+)\}\}`)

func (p *DefaultPlugin) ResolveTemplates(input string, ctx *models.PipelineContext) string {
    return templatePattern.ReplaceAllStringFunc(input, func(match string) string {
        expr := strings.TrimSpace(match[2 : len(match)-2])
        parts := strings.Split(expr, ".")

        // Require at least context.<step>.<key>; anything else is left verbatim.
        if len(parts) < 3 || parts[0] != "context" {
            return match
        }

        stepName, key := parts[1], parts[2]
        value, exists := ctx.GetStepData(stepName, key)
        if !exists {
            return match
        }

        // Handle nested access: each remaining segment must index into a map.
        for _, part := range parts[3:] {
            m, ok := value.(map[string]interface{})
            if !ok {
                return match // cannot descend into a non-object value
            }
            if value, exists = m[part]; !exists {
                return match
            }
        }

        return fmt.Sprintf("%v", value)
    })
}
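For experimenting with template expressions outside the service, here is a self-contained sketch of the same resolution scheme: `context.<step>.<key>`, then nested map segments. `stepData` is a hypothetical map-based stand-in for `models.PipelineContext`. In this version, any expression whose path cannot be followed is left verbatim.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

var templatePattern = regexp.MustCompile(`\{\{([^}]+)\}\}`)

// stepData is a hypothetical stand-in for models.PipelineContext: step -> key -> value.
type stepData map[string]map[string]interface{}

// resolveTemplates substitutes {{context.<step>.<key>[.<field>...]}} expressions.
func resolveTemplates(input string, ctx stepData) string {
	return templatePattern.ReplaceAllStringFunc(input, func(match string) string {
		parts := strings.Split(strings.TrimSpace(match[2:len(match)-2]), ".")
		if len(parts) < 3 || parts[0] != "context" {
			return match
		}
		value, ok := ctx[parts[1]][parts[2]]
		if !ok {
			return match
		}
		for _, part := range parts[3:] {
			m, isMap := value.(map[string]interface{})
			if !isMap {
				return match
			}
			if value, ok = m[part]; !ok {
				return match
			}
		}
		return fmt.Sprintf("%v", value)
	})
}

func main() {
	ctx := stepData{
		"_parameters": {"base_url": "https://api.example.com"},
		"fetch-id":    {"user_id": 7},
		"parse":       {"parsed": map[string]interface{}{"data": map[string]interface{}{"total": 3}}},
	}
	fmt.Println(resolveTemplates("{{context._parameters.base_url}}/users/{{context.fetch-id.user_id}}", ctx))
	fmt.Println(resolveTemplates("total={{ context.parse.parsed.data.total }}", ctx))
	fmt.Println(resolveTemplates("{{context.missing.key}} stays as-is", ctx))
}
```

Leaving unresolved expressions in place makes typos visible in the output, rather than silently replacing them with empty strings.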

Executing a Pipeline

curl -X POST http://localhost:8080/api/pipelines/pipe-uuid-5678/execute \
  -H "Content-Type: application/json" \
  -d '{
    "trigger_type": "manual",
    "triggered_by": "user-123",
    "parameters": {
      "api_key": "secret-key",
      "batch_size": 100
    }
  }'

Execution Status

  • pending — Queued for execution
  • running — Currently executing
  • completed — Finished successfully
  • failed — Execution failed (see error field)
pkg/pipeline/service.go:142
func (s *Service) Execute(pipelineID string, req *models.PipelineExecutionRequest) (*models.PipelineExecution, error) {
    pipeline, err := s.store.GetPipeline(pipelineID)
    if err != nil {
        return nil, err
    }
    
    execution := &models.PipelineExecution{
        ID:          uuid.New().String(),
        PipelineID:  pipelineID,
        ProjectID:   pipeline.ProjectID,
        Status:      "running",
        StartedAt:   time.Now(),
        Context:     models.NewPipelineContext(DefaultContextMaxSize),
        TriggerType: req.TriggerType,
        TriggeredBy: req.TriggeredBy,
    }
    
    // Execute steps sequentially
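    for _, step := range pipeline.Steps {
        // Fail the execution instead of panicking on an unregistered plugin.
        plugin, ok := s.plugins[step.Plugin]
        if !ok {
            err := fmt.Errorf("step %s: unknown plugin %q", step.Name, step.Plugin)
            execution.Status = "failed"
            execution.Error = err.Error()
            return execution, err
        }
        result, err := plugin.Execute(step.Action, step.Parameters, execution.Context)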
        if err != nil {
            execution.Status = "failed"
            execution.Error = fmt.Sprintf("step %s failed: %v", step.Name, err)
            return execution, err
        }
        
        // Store results in context
        for key, value := range result {
            execution.Context.SetStepData(step.Name, key, value)
        }
    }
    
    execution.Status = "completed"
    now := time.Now()
    execution.CompletedAt = &now
    
    return execution, nil
}
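The Execute excerpt sets statuses as plain strings. A client polling an execution mainly needs to know which states are terminal. Here is a minimal sketch, assuming a lifecycle in which an execution goes from pending to running and then to completed or failed. `ExecutionStatus`, `canTransition` and `IsTerminal` are hypothetical names.

```go
package main

import "fmt"

// ExecutionStatus mirrors the documented status strings.
type ExecutionStatus string

const (
	StatusPending   ExecutionStatus = "pending"
	StatusRunning   ExecutionStatus = "running"
	StatusCompleted ExecutionStatus = "completed"
	StatusFailed    ExecutionStatus = "failed"
)

// allowed encodes the assumed lifecycle: pending to running, then completed or failed.
var allowed = map[ExecutionStatus][]ExecutionStatus{
	StatusPending: {StatusRunning},
	StatusRunning: {StatusCompleted, StatusFailed},
}

// canTransition reports whether moving from one status to another is legal.
func canTransition(from, to ExecutionStatus) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

// IsTerminal reports whether a poller can stop waiting.
func (s ExecutionStatus) IsTerminal() bool {
	return s == StatusCompleted || s == StatusFailed
}

func main() {
	status := StatusPending
	for _, next := range []ExecutionStatus{StatusRunning, StatusCompleted} {
		if !canTransition(status, next) {
			panic(fmt.Sprintf("illegal transition %s -> %s", status, next))
		}
		status = next
	}
	fmt.Println(status, "terminal:", status.IsTerminal())
}
```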

Example Pipelines

{
  "name": "github-stars",
  "type": "ingestion",
  "steps": [
    {
      "name": "fetch-repos",
      "plugin": "default",
      "action": "http_request",
      "parameters": {
        "url": "https://api.github.com/users/octocat/repos",
        "method": "GET",
        "headers": {
          "Accept": "application/vnd.github.v3+json"
        }
      },
      "output": {
        "repos": "{{context.fetch-repos.response.body}}"
      }
    },
    {
      "name": "parse-response",
      "plugin": "default",
      "action": "parse_json",
      "parameters": {
        "data": "{{context.fetch-repos.repos}}"
      },
      "output": {
        "parsed": "{{context.parse-response.parsed}}"
      }
    }
  ]
}

Validation Rules

Name

  • Must be non-empty
  • No uniqueness requirement (multiple pipelines can have the same name)

Type

Must be one of:
  • ingestion
  • processing
  • output
pkg/pipeline/service.go:263
if req.Type != models.PipelineTypeIngestion &&
    req.Type != models.PipelineTypeProcessing &&
    req.Type != models.PipelineTypeOutput {
    return fmt.Errorf("invalid pipeline type")
}

Steps

  • Must have at least one step
  • Step names must be unique within the pipeline
  • Each step must specify name, plugin, and action
pkg/pipeline/service.go:271
if len(req.Steps) == 0 {
    return fmt.Errorf("pipeline must have at least one step")
}

stepNames := make(map[string]bool)
for _, step := range req.Steps {
    if stepNames[step.Name] {
        return fmt.Errorf("duplicate step name: %s", step.Name)
    }
    stepNames[step.Name] = true
}

Best Practices

Error Handling

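Use if_else to record whether a check passed, then act on its result in later steps. if_else only returns the chosen value in result; it does not jump to the error-handler step by itself. Because condition is a truthiness check rather than a comparison, a raw status code such as 500 may also count as true.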
- name: check-status
  plugin: default
  action: if_else
  parameters:
    condition: "{{context.api-call.response.status_code}}"
    if_true: "success"
    if_false: "error-handler"

Logging

Store intermediate results in context for debugging:
- name: log-response
  plugin: default
  action: set_context
  parameters:
    key: "api_response"
    value: "{{context.api-call.response}}"

Parameterization

Use initial parameters for configurable values:
parameters:
  url: "{{context._parameters.base_url}}/data"
  api_key: "{{context._parameters.api_key}}"

Modularity

Break complex workflows into multiple pipelines and chain them with schedules.

Next Steps

Schedules

Automate pipeline execution with cron schedules.

Storage

Store pipeline results in backend systems.

Ontologies

Structure your data with ontologies.