Overview
Pipelines are ordered sequences of processing steps executed asynchronously by workers. They enable data ingestion, transformation, and output operations through a plugin-based architecture. Pipeline types:- Ingestion: Fetch data from external sources (APIs, databases, files)
- Processing: Transform, enrich, or analyze data
- Output: Write results to storage backends or external systems
Pipeline Structure
pkg/models/pipeline.go:24
Creating a Pipeline
- curl
- Response
Built-in Actions
Thedefault plugin provides common operations:
http_request
http_request
Makes HTTP requests to external APIs.Parameters:
url(required): Request URLmethod: HTTP method (default:GET)headers: Request headersbody: Request body (for POST/PUT)
response.status_code: HTTP status coderesponse.body: Response body as stringresponse.headers: Response headers
parse_json
parse_json
Parses JSON strings into objects.Parameters:
data(required): JSON string to parse
parsed: Parsed JSON object
if_else
if_else
Conditional logic for branching.Parameters:
condition(required): Value to evaluate (truthiness check)if_true: Result if condition is trueif_false: Result if condition is false
result: Chosen branch resultcondition: Boolean evaluation result
set_context
set_context
Stores values in execution context.Parameters:
key(required): Variable namevalue(required): Value to storestep: Context namespace (default:_global)
get_context
get_context
Retrieves values from execution context.Parameters:
key(required): Variable namestep: Context namespace (default:_global)
exists: Boolean indicating if value existsvalue: Retrieved value
goto
goto
Jumps to another step for loops.Parameters:
target(required): Target step name
Template Variables
Pipelines support dynamic value substitution using{{}} syntax.
Context Access
Initial Parameters
Parameters passed during execution are available in_parameters:
Nested Access
Access nested JSON objects:pkg/pipeline/plugin.go:237
Executing a Pipeline
- curl
- Response
Execution Status
pending— Queued for executionrunning— Currently executingcompleted— Finished successfullyfailed— Execution failed (seeerrorfield)
pkg/pipeline/service.go:142
Example Pipelines
- API Ingestion
- Conditional Logic
- Loop Example
Validation Rules
Pipeline Name
Pipeline Name
- Must be non-empty
- No uniqueness requirement (multiple pipelines can have the same name)
Pipeline Type
Pipeline Type
Must be one of:
ingestionprocessingoutput
pkg/pipeline/service.go:263
Steps
Steps
- Must have at least one step
- Step names must be unique within the pipeline
- Each step must specify
name,plugin, andaction
pkg/pipeline/service.go:271
Best Practices
Error Handling
Use
if_else to check HTTP status codes and handle errors gracefully.Logging
Store intermediate results in context for debugging:
Parameterization
Use initial parameters for configurable values:
Modularity
Break complex workflows into multiple pipelines and chain them with schedules.
Next Steps
Schedules
Automate pipeline execution with cron schedules.
Storage
Store pipeline results in backend systems.
Ontologies
Structure your data with ontologies.