Skip to main content

Overview

The Pipeline is the high-level packet processing engine that matches packets against rules and applies configured transforms. It manages flow state, statistics, and supports hot-reloading configuration.

Pipeline

Constructor

pub fn new(config: Config, stats: Arc<Stats>) -> Result<Self>
Creates a new pipeline with the given configuration and statistics collector.
config
Config
required
Configuration with rules and transform parameters
stats
Arc<Stats>
required
Shared statistics collector for monitoring
Pipeline
Result<Pipeline>
New pipeline instance or error if configuration is invalid
Example:
use turkey_dpi_engine::{Config, Pipeline, Stats};
use std::sync::Arc;

let config = Config::load_from_file("config.toml")?;
let stats = Arc::new(Stats::new());
let pipeline = Pipeline::new(config, stats)?;

Methods

process

pub fn process(&self, key: FlowKey, data: BytesMut) -> Result<PipelineOutput>
Processes a packet through the pipeline, matching rules and applying transforms.
key
FlowKey
required
Flow identifier (5-tuple: src/dst IP/port + protocol)
data
BytesMut
required
Packet payload to process
PipelineOutput
Result<PipelineOutput>
Processed packet(s) with metadata and delays
Example:
use turkey_dpi_engine::{FlowKey, Protocol};
use bytes::BytesMut;
use std::net::IpAddr;
use std::thread;

// Identify the flow by its 5-tuple. Spelling out `IpAddr` on `parse` makes the
// target type visible to the reader instead of leaving it to inference.
let key = FlowKey::new(
    "192.168.1.100".parse::<IpAddr>()?,
    "1.1.1.1".parse::<IpAddr>()?,
    54321,
    443,
    Protocol::Tcp,
);

let packet_data = BytesMut::from(&b"TLS handshake..."[..]);
let output = pipeline.process(key, packet_data)?;

if !output.dropped {
    // Send primary packet; propagate send failures instead of discarding them
    if let Some(primary) = output.primary {
        send_packet(&primary)?;
    }

    // Send additional packets (fragments, decoys)
    for packet in output.additional {
        send_packet(&packet)?;
    }

    // Apply delay if requested
    if let Some(delay) = output.delay {
        thread::sleep(delay);
    }
}

reload_config

pub fn reload_config(&self, new_config: Config) -> Result<()>
Hot-reloads configuration without dropping connections.
new_config
Config
required
New configuration to apply
Example:
// Watch for config file changes
let new_config = Config::load_from_file("config.toml")?;
pipeline.reload_config(new_config)?;
println!("Configuration reloaded");

config

pub fn config(&self) -> Arc<Config>
Returns a shared handle (`Arc<Config>`) to the current configuration. Example:
let current_config = pipeline.config();
println!("Engine enabled: {}", current_config.global.enabled);
println!("Active rules: {}", current_config.rules.len());

flow_cache

pub fn flow_cache(&self) -> &FlowCache
Returns a reference to the flow cache for inspection. Example:
let cache = pipeline.flow_cache();
let stats = cache.stats();
println!("Tracked flows: {}/{}", stats.size, stats.max_size);
println!("Hit rate: {:.2}%", stats.hit_rate() * 100.0);

stats

pub fn stats(&self) -> &Arc<Stats>
Returns a reference to the shared statistics collector. Example:
let stats = pipeline.stats();
let snapshot = stats.snapshot();
println!("Packets in: {}", snapshot.packets_in);
println!("Packets out: {}", snapshot.packets_out);
println!("Matches: {}", snapshot.packets_matched);

cleanup

pub fn cleanup(&self) -> usize
Removes expired flows from the cache and returns the number of flows evicted. Example:
// Call periodically (e.g., every 60 seconds)
let evicted = pipeline.cleanup();
if evicted > 0 {
    println!("Cleaned up {} expired flows", evicted);
}

FlowKey

Structure

pub struct FlowKey {
    pub src_ip: IpAddr,
    pub dst_ip: IpAddr,
    pub src_port: u16,
    pub dst_port: u16,
    pub protocol: Protocol,
}

Constructor

pub fn new(
    src_ip: IpAddr,
    dst_ip: IpAddr,
    src_port: u16,
    dst_port: u16,
    protocol: Protocol,
) -> Self

Fields

src_ip
IpAddr
required
Source IP address (IPv4 or IPv6)
dst_ip
IpAddr
required
Destination IP address
src_port
u16
required
Source port number
dst_port
u16
required
Destination port number
protocol
Protocol
required
Protocol: Tcp, Udp, or Icmp

Methods

pub fn reverse(&self) -> Self  // Swap src/dst
pub fn is_tcp(&self) -> bool    // Check if TCP
pub fn is_udp(&self) -> bool    // Check if UDP
Example:
use turkey_dpi_engine::{FlowKey, Protocol};
use std::net::IpAddr;

let key = FlowKey::new(
    "192.168.1.100".parse()?,
    "8.8.8.8".parse()?,
    54321,
    443,
    Protocol::Tcp,
);

assert!(key.is_tcp());

let reverse = key.reverse();
assert_eq!(reverse.src_ip, key.dst_ip);
assert_eq!(reverse.dst_ip, key.src_ip);

PipelineOutput

Structure

pub struct PipelineOutput {
    pub primary: Option<BytesMut>,
    pub additional: Vec<BytesMut>,
    pub delay: Option<Duration>,
    pub dropped: bool,
    pub matched_rule: Option<String>,
}

Fields

primary
Option<BytesMut>
Primary packet to send (None if dropped)
additional
Vec<BytesMut>
Additional packets (fragments, decoys, etc.)
delay
Option<Duration>
Requested delay before sending next packet
dropped
bool
Whether packet was dropped by a transform
matched_rule
Option<String>
Name of the rule that matched this packet

Methods

pub fn dropped() -> Self             // Create a dropped packet result
pub fn passthrough(data: BytesMut) -> Self  // Create passthrough result
pub fn all_packets(self) -> Vec<BytesMut>   // Get all packets (primary + additional)
Example:
let output = pipeline.process(key, data)?;

if output.dropped {
    println!("Packet dropped");
    return Ok(());
}

// Borrow the rule name instead of moving it out of `output`: `all_packets`
// below takes `self`, so `output` must still be fully intact at that point.
if let Some(rule) = &output.matched_rule {
    println!("Matched rule: {}", rule);
}

// Send all packets (primary + additional)
for packet in output.all_packets() {
    send_packet(&packet)?;
}

FlowState

Structure

pub struct FlowState {
    pub key: FlowKey,
    pub created_at: Instant,
    pub last_seen: Instant,
    pub packet_count: u64,
    pub byte_count: u64,
    pub matched_rule: Option<String>,
    pub direction: FlowDirection,
    pub tcp_state: Option<TcpFlowState>,
    pub transform_state: TransformState,
}
Automatically tracked by the pipeline for each flow.
packet_count
u64
Total packets seen in this flow
byte_count
u64
Total bytes seen in this flow
matched_rule
Option<String>
Last rule that matched this flow

Methods

pub fn update(&mut self, size: usize)           // Update counters
pub fn is_expired(&self, timeout: Duration) -> bool  // Check if expired
pub fn age(&self) -> Duration                    // Time since creation
pub fn idle_time(&self) -> Duration              // Time since last packet

Complete Example

use turkey_dpi_engine::{Config, Pipeline, FlowKey, Protocol, Stats};
use bytes::BytesMut;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Walks a single TLS ClientHello through the pipeline: builds the pipeline from
/// `config.toml`, processes one packet, forwards whatever comes back, then prints
/// statistics and evicts expired flows.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging
    tracing_subscriber::fmt::init();

    // Configuration first, then the statistics collector. `main` keeps its own
    // handle to the collector so it can take a snapshot after processing.
    let config = Config::load_from_file("config.toml")?;
    let stats = Arc::new(Stats::new());
    let pipeline = Pipeline::new(config, Arc::clone(&stats))?;

    // Flow under test: 192.168.1.100:54321 -> 104.244.42.1:443 over TCP
    let key = FlowKey::new(
        "192.168.1.100".parse()?,
        "104.244.42.1".parse()?, // Twitter IP
        54321,
        443,
        Protocol::Tcp,
    );

    // TLS ClientHello packet, run through the pipeline
    let client_hello = BytesMut::from(&b"\x16\x03\x01..."[..]);
    let output = pipeline.process(key, client_hello)?;

    // Nothing to forward for a dropped packet
    if output.dropped {
        println!("Packet was dropped");
        return Ok(());
    }

    if let Some(rule_name) = &output.matched_rule {
        println!("Matched rule: {}", rule_name);
    }

    // Primary packet
    if let Some(first) = output.primary.as_ref() {
        println!("Sending primary packet: {} bytes", first.len());
        send_packet(first)?;
    }

    // Additional packets (fragments, decoys)
    for (index, extra) in output.additional.iter().enumerate() {
        println!("Sending additional packet {}: {} bytes", index, extra.len());
        send_packet(extra)?;
    }

    // Requested delay
    if let Some(pause) = output.delay {
        println!("Applying delay: {:?}", pause);
        thread::sleep(pause);
    }

    // Statistics
    let totals = stats.snapshot();
    println!("\nStatistics:");
    println!("  Packets in:  {}", totals.packets_in);
    println!("  Packets out: {}", totals.packets_out);
    println!("  Matched:     {}", totals.packets_matched);

    // Flow cache statistics
    let cache = pipeline.flow_cache();
    let cache_stats = cache.stats();
    println!("\nFlow Cache:");
    println!("  Active flows: {}/{}", cache_stats.size, cache_stats.max_size);
    println!("  Hit rate:     {:.2}%", cache_stats.hit_rate() * 100.0);

    // Evict expired flows; report only when something was actually removed
    match pipeline.cleanup() {
        0 => {}
        evicted => println!("\nCleaned up {} expired flows", evicted),
    }

    Ok(())
}

/// Placeholder transmit hook used by the examples.
///
/// Takes the raw packet bytes and currently does nothing with them; it always
/// returns `Ok(())`. Replace the body with the real send path.
fn send_packet(data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    // Implementation: send via raw socket, packet filter, etc.
    // The stub does not use the payload yet; acknowledge it explicitly so the
    // example builds without an unused-variable warning.
    let _ = data;
    Ok(())
}

Background Maintenance

use std::time::Duration;

// Spawn background thread for maintenance
let pipeline = Arc::new(pipeline);
let maintenance_pipeline = pipeline.clone();

thread::spawn(move || {
    loop {
        thread::sleep(Duration::from_secs(60));
        
        // Clean up expired flows
        let evicted = maintenance_pipeline.cleanup();
        if evicted > 0 {
            println!("[Maintenance] Evicted {} flows", evicted);
        }
        
        // Print statistics
        let snapshot = maintenance_pipeline.stats().snapshot();
        println!("[Maintenance] Packets: in={}, out={}, matched={}",
            snapshot.packets_in,
            snapshot.packets_out,
            snapshot.packets_matched,
        );
    }
});

Hot-Reloading Configuration

use notify::{DebouncedEvent, RecursiveMode, Watcher, watcher};
use std::sync::mpsc::channel;
use std::time::Duration;

// Debounced watcher (notify 4.x API): events for the same path are coalesced
// over the 2-second window and delivered on `rx` as `DebouncedEvent` values.
let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(2))?;
watcher.watch("config.toml", RecursiveMode::NonRecursive)?;
// NOTE(review): `watcher` has to stay alive for as long as reloads are wanted;
// dropping it is expected to stop event delivery — confirm against the notify
// version in use.

let reload_pipeline = pipeline.clone();
thread::spawn(move || {
    // The loop ends once the sending side of the channel is dropped.
    for event in rx {
        match event {
            // The file was written or (re)created: try to apply the new config.
            DebouncedEvent::Create(_) | DebouncedEvent::Write(_) => {
                println!("Config file changed, reloading...");
                match Config::load_from_file("config.toml") {
                    Ok(new_config) => {
                        if let Err(e) = reload_pipeline.reload_config(new_config) {
                            eprintln!("Failed to reload config: {}", e);
                        } else {
                            println!("Configuration reloaded successfully");
                        }
                    }
                    // Keep running on the previous configuration if the file is invalid.
                    Err(e) => eprintln!("Failed to load config: {}", e),
                }
            }
            DebouncedEvent::Error(e, _) => eprintln!("Watch error: {}", e),
            // Notices, chmod, remove, rename and rescan events do not trigger a reload.
            _ => {}
        }
    }
});

Error Handling

use turkey_dpi_engine::EngineError;

match pipeline.process(key, data) {
    Ok(output) => {
        // Handle output
    }
    Err(EngineError::FlowLimitExceeded { max, current }) => {
        eprintln!("Flow limit exceeded: {}/{}", current, max);
        // Maybe clean up flows or increase limit
    }
    Err(EngineError::Transform { transform, message }) => {
        eprintln!("Transform '{}' failed: {}", transform, message);
        // Log and continue
    }
    Err(e) => {
        eprintln!("Pipeline error: {}", e);
        return Err(e.into());
    }
}

See Also

  • Config - Configuration format and options
  • BypassEngine - Simpler alternative for basic use cases

Build docs developers (and LLMs) love