Skip to main content

Overview

Event pipelines provide a declarative way to process events through a series of filter and action steps. Pipelines enable complex event processing logic without writing custom handlers. Location: crates/oneclaw-core/src/event_bus/pipeline.rs

Pipeline Concept

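A pipeline is an ordered series of steps. Each step pairs a filter with a list of actions; actions such as SetField and SetPriority play the "transform" role, while EmitEvent produces new events: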
Event → [Filter] → [Action] → [Filter] → [Action] → Emitted Events
Each step:
  1. Filter: Check whether the event matches the step's criteria
  2. Actions: If the filter matched, modify the event and/or emit new events
  3. Pass the (possibly modified) event on to the next step

Pipeline Structure

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:89
/// A named, ordered sequence of filter/action steps applied to events.
pub struct Pipeline {
    /// Identifier for this pipeline (e.g. "high-temp-detector").
    pub name: String,
    /// Topic pattern this pipeline is associated with (e.g. "sensor.*");
    /// `matches_topic` tests a topic against it.
    pub topic_pattern: String,
    /// The pipeline's steps; each pairs a filter with the actions to run when it matches.
    pub steps: Vec<PipelineStep>,
}

PipelineStep

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:78
/// One stage of a `Pipeline`: a filter plus the actions it guards.
pub struct PipelineStep {
    /// Identifier for this step (e.g. "check-threshold").
    pub name: String,
    /// Criteria the event must match for this step's actions to run.
    pub filter: FilterOp,
    /// Actions that modify the event or emit new events.
    pub actions: Vec<PipelineAction>,
}

Filter Operations

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:10
/// Predicate evaluated against an event to decide whether a step's actions run.
///
/// NOTE(review): how the field-based variants treat a missing or non-numeric
/// field is not shown on this page — confirm against pipeline.rs.
pub enum FilterOp {
    /// Passes when the named field exists on the event.
    HasField(String),
    /// Passes when the named field (first value) equals the given value (second value).
    FieldEquals(String, String),
    /// Passes when the named numeric field is greater than the threshold.
    FieldGreaterThan(String, f64),
    /// Passes when the named numeric field is less than the threshold.
    FieldLessThan(String, f64),
    /// Passes when the event's topic matches the given pattern (e.g. "sensor.*").
    TopicMatches(String),
    /// Always passes.
    Always,
}

Filter Examples

// Check if temperature field exists
FilterOp::HasField("temperature".into())

// Passes when the "device_type" field equals "sensor"
FilterOp::FieldEquals("device_type".into(), "sensor".into())

// Passes when the numeric "temperature" field is greater than 38.0
FilterOp::FieldGreaterThan("temperature".into(), 38.0)

// Passes when the numeric "humidity" field is less than 30.0
FilterOp::FieldLessThan("humidity".into(), 30.0)

// Passes when the event topic matches the pattern "sensor.*"
FilterOp::TopicMatches("sensor.*".into())

// Passes for every event, so the step's actions always run
FilterOp::Always

Pipeline Actions

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:55
/// Operation performed by a step once its filter has matched.
pub enum PipelineAction {
    /// Emit a new event on `topic` with the given `priority`.
    EmitEvent { topic: String, priority: EventPriority },
    /// Add the field `key`, or update it if it already exists, with `value`.
    SetField { key: String, value: String },
    /// Change the priority of the event being processed.
    SetPriority(EventPriority),
    /// Log the given message (intended for debugging).
    Log(String),
}

Action Examples

// Emit a new alert event
PipelineAction::EmitEvent {
    topic: "alert.high_temp".into(),
    priority: EventPriority::Critical,
}

// Add the "alert_type" field, or overwrite it if it is already present
PipelineAction::SetField {
    key: "alert_type".into(),
    value: "threshold_exceeded".into(),
}

// Change the priority of the event being processed to High
PipelineAction::SetPriority(EventPriority::High)

// Log a fixed message for debugging
PipelineAction::Log("High temperature detected".into())

Building Pipelines

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:100
// Build a pipeline named "high-temp-detector" with topic pattern "sensor.temperature*".
let pipeline = Pipeline::new("high-temp-detector", "sensor.temperature*")
    .add_step(PipelineStep {
        name: "check-threshold".into(),
        // The actions below run only when the numeric "value" field is above 100.0.
        filter: FilterOp::FieldGreaterThan("value".into(), 100.0),
        actions: vec![
            // Tag the event with the kind of alert.
            PipelineAction::SetField {
                key: "alert_type".into(),
                value: "high_temperature".into(),
            },
            // Escalate the event being processed.
            PipelineAction::SetPriority(EventPriority::Critical),
            // Emit a new event on "alert.high_temp".
            PipelineAction::EmitEvent {
                topic: "alert.high_temp".into(),
                priority: EventPriority::Critical,
            },
        ],
    });

Processing Events

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:115
// Build an event on "sensor.temperature" carrying two data fields.
// NOTE(review): "value" is supplied as the string "105.3"; numeric filters such as
// FieldGreaterThan presumably parse it — confirm against pipeline.rs.
let event = Event::new("sensor.temperature", "temp-sensor")
    .with_data("device", "device_01")
    .with_data("value", "105.3");

// Run the event through the pipeline; the event itself is only borrowed.
let emitted_events = pipeline.process(&event);
// Returns: Vec<Event> with any events emitted by EmitEvent actions

Filter-Transform-Route Patterns

Threshold Detection

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:212
// Single-step pattern: emit a critical alert for any "sensor.*" event whose
// numeric "value" field is above 100.0.
Pipeline::new("threshold-alert", "sensor.*")
    .add_step(PipelineStep {
        name: "detect-high-value".into(),
        filter: FilterOp::FieldGreaterThan("value".into(), 100.0),
        actions: vec![
            PipelineAction::EmitEvent {
                topic: "alert.threshold".into(),
                priority: EventPriority::Critical,
            },
        ],
    })

Multi-Step Processing

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:265
// Two-step pattern: the first step tags the event, the second step filters on that tag.
Pipeline::new("pressure-monitor", "sensor.pressure*")
    // Step 1: Tag high-pressure events (numeric "value" above 1000.0) with risk = "high"
    .add_step(PipelineStep {
        name: "tag-high-pressure".into(),
        filter: FilterOp::FieldGreaterThan("value".into(), 1000.0),
        actions: vec![
            PipelineAction::SetField {
                key: "risk".into(),
                value: "high".into(),
            },
        ],
    })
    // Step 2: Emit alert if tagged as risky. This depends on step 1's SetField
    // being visible here, i.e. on steps receiving the modified event.
    .add_step(PipelineStep {
        name: "emit-alert-if-risky".into(),
        filter: FilterOp::FieldEquals("risk".into(), "high".into()),
        actions: vec![
            PipelineAction::EmitEvent {
                topic: "alert.high_pressure".into(),
                priority: EventPriority::High,
            },
        ],
    })

Event Enrichment

// Enrichment pattern: unconditionally stamp two constant metadata fields on the event.
Pipeline::new("enrich-sensor-data", "sensor.*")
    .add_step(PipelineStep {
        name: "add-metadata".into(),
        // Always passes, so both SetField actions run for every processed event.
        filter: FilterOp::Always,
        actions: vec![
            PipelineAction::SetField {
                key: "processed_by".into(),
                value: "pipeline".into(),
            },
            PipelineAction::SetField {
                key: "pipeline_version".into(),
                value: "1.0".into(),
            },
        ],
    })

Routing by Type

// Routing pattern: one step per value of the "type" field, each emitting to its own topic.
Pipeline::new("alert-router", "sensor.*")
    // type == "temperature" -> emit on "telemetry.temperature"
    .add_step(PipelineStep {
        name: "route-temperature".into(),
        filter: FilterOp::FieldEquals("type".into(), "temperature".into()),
        actions: vec![
            PipelineAction::EmitEvent {
                topic: "telemetry.temperature".into(),
                priority: EventPriority::Normal,
            },
        ],
    })
    // type == "motion" -> emit on "telemetry.motion"
    .add_step(PipelineStep {
        name: "route-motion".into(),
        filter: FilterOp::FieldEquals("type".into(), "motion".into()),
        actions: vec![
            PipelineAction::EmitEvent {
                topic: "telemetry.motion".into(),
                priority: EventPriority::Normal,
            },
        ],
    })

Topic Matching

Location: crates/oneclaw-core/src/event_bus/pipeline.rs:154
// The second argument to Pipeline::new is the topic pattern that matches_topic tests against.
let pipeline = Pipeline::new("test", "sensor.*");

assert!(pipeline.matches_topic("sensor.temp"));      // matches_topic returns true
assert!(pipeline.matches_topic("sensor.motion"));    // matches_topic returns true
assert!(!pipeline.matches_topic("alert.critical")); // matches_topic returns false (note the `!`)

Pipeline Configuration Examples

Temperature Monitoring

// Three-step pipeline: log every reading, then raise separate alerts for
// high (> 80.0) and low (< 5.0) values of the numeric "value" field.
let temp_pipeline = Pipeline::new("temperature-monitor", "sensor.temperature*")
    // Step 1: unconditional logging.
    .add_step(PipelineStep {
        name: "log-all".into(),
        filter: FilterOp::Always,
        actions: vec![
            PipelineAction::Log("Temperature reading received".into()),
        ],
    })
    // Step 2: critical alert when "value" is above 80.0.
    .add_step(PipelineStep {
        name: "high-temp-alert".into(),
        filter: FilterOp::FieldGreaterThan("value".into(), 80.0),
        actions: vec![
            PipelineAction::SetPriority(EventPriority::Critical),
            PipelineAction::SetField {
                key: "alert_reason".into(),
                value: "temperature_critical".into(),
            },
            PipelineAction::EmitEvent {
                topic: "alert.critical.temperature".into(),
                priority: EventPriority::Critical,
            },
        ],
    })
    // Step 3: high-priority alert when "value" is below 5.0. A single reading
    // cannot satisfy both this filter and step 2's.
    .add_step(PipelineStep {
        name: "low-temp-alert".into(),
        filter: FilterOp::FieldLessThan("value".into(), 5.0),
        actions: vec![
            PipelineAction::SetPriority(EventPriority::High),
            PipelineAction::EmitEvent {
                topic: "alert.low_temperature".into(),
                priority: EventPriority::High,
            },
        ],
    });

Device Status Monitoring

// Two-step pipeline keyed on the "status" field of "device.status*" events.
let status_pipeline = Pipeline::new("device-status", "device.status*")
    // status == "offline": tag the event, then emit a high-priority alert.
    .add_step(PipelineStep {
        name: "offline-detection".into(),
        filter: FilterOp::FieldEquals("status".into(), "offline".into()),
        actions: vec![
            PipelineAction::SetField {
                key: "alert_type".into(),
                value: "device_offline".into(),
            },
            PipelineAction::EmitEvent {
                topic: "alert.device_offline".into(),
                priority: EventPriority::High,
            },
        ],
    })
    // status == "error": emit a critical alert (no tagging in this step).
    .add_step(PipelineStep {
        name: "error-detection".into(),
        filter: FilterOp::FieldEquals("status".into(), "error".into()),
        actions: vec![
            PipelineAction::EmitEvent {
                topic: "alert.device_error".into(),
                priority: EventPriority::Critical,
            },
        ],
    });

Integration with EventBus

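Pipelines can be run from inside an EventBus subscription handler. In this example the handler returns the first event the pipeline emitted, if any: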
let bus = DefaultEventBus::new();
// No steps are added here for brevity; add them with `add_step`, otherwise
// there are no EmitEvent actions and nothing is emitted.
let pipeline = Pipeline::new("my-pipeline", "sensor.*");

// `move` transfers ownership of `pipeline` into the handler closure.
// The trailing `?` propagates a failure from `subscribe` to the caller.
bus.subscribe("sensor.*", Box::new(move |event| {
    let emitted = pipeline.process(event);
    // Publish first emitted event as response (None when nothing was emitted);
    // any further emitted events are dropped here.
    emitted.into_iter().next()
}))?;

Benefits of Pipelines

  1. Declarative: Define processing logic as data, not code
  2. Composable: Chain multiple steps for complex logic
  3. Testable: Easy to test filter and action combinations
  4. Configurable: Pipelines can be loaded from config files
  5. Maintainable: Clear separation of concerns

Use Cases

  • Alert generation: Threshold violations, anomaly detection
  • Event routing: Route events to different topics based on content
  • Data enrichment: Add metadata, timestamps, computed fields
  • Priority adjustment: Escalate/de-escalate based on conditions
  • Logging and debugging: Conditional logging of events
  • Event transformation: Normalize event formats

Build docs developers (and LLMs) love