Overview
The Pipeline is the high-level packet processing engine that matches packets against rules and applies configured transforms. It manages flow state, statistics, and supports hot-reloading configuration.
Pipeline
Constructor
pub fn new(config: Config, stats: Arc<Stats>) -> Result<Self>
Creates a new pipeline with the given configuration and statistics collector.
Configuration with rules and transform parameters
Shared statistics collector for monitoring
New pipeline instance or error if configuration is invalid
Example:
use turkey_dpi_engine::{Config, Pipeline, Stats};
use std::sync::Arc;
let config = Config::load_from_file("config.toml")?;
let stats = Arc::new(Stats::new());
let pipeline = Pipeline::new(config, stats)?;
Methods
process
pub fn process(&self, key: FlowKey, data: BytesMut) -> Result<PipelineOutput>
Processes a packet through the pipeline, matching rules and applying transforms.
Flow identifier (5-tuple: src/dst IP/port + protocol)
Packet payload to process
Processed packet(s) with metadata and delays
Example:
use turkey_dpi_engine::{FlowKey, Protocol};
use bytes::BytesMut;
use std::net::IpAddr;
let key = FlowKey::new(
"192.168.1.100".parse()?,
"1.1.1.1".parse()?,
54321,
443,
Protocol::Tcp,
);
let packet_data = BytesMut::from(&b"TLS handshake..."[..]);
let output = pipeline.process(key, packet_data)?;
if !output.dropped {
// Send primary packet
if let Some(primary) = output.primary {
send_packet(&primary);
}
// Send additional packets (fragments, decoys)
for packet in output.additional {
send_packet(&packet);
}
// Apply delay if requested
if let Some(delay) = output.delay {
thread::sleep(delay);
}
}
reload_config
pub fn reload_config(&self, new_config: Config) -> Result<()>
Hot-reloads configuration without dropping connections.
New configuration to apply
Example:
// Watch for config file changes
let new_config = Config::load_from_file("config.toml")?;
pipeline.reload_config(new_config)?;
println!("Configuration reloaded");
config
pub fn config(&self) -> Arc<Config>
Returns a reference to the current configuration.
Example:
let current_config = pipeline.config();
println!("Engine enabled: {}", current_config.global.enabled);
println!("Active rules: {}", current_config.rules.len());
flow_cache
pub fn flow_cache(&self) -> &FlowCache
Returns reference to the flow cache for inspection.
Example:
let cache = pipeline.flow_cache();
let stats = cache.stats();
println!("Tracked flows: {}/{}", stats.size, stats.max_size);
println!("Hit rate: {:.2}%", stats.hit_rate() * 100.0);
stats
pub fn stats(&self) -> &Arc<Stats>
Returns reference to the statistics collector.
Example:
let stats = pipeline.stats();
let snapshot = stats.snapshot();
println!("Packets in: {}", snapshot.packets_in);
println!("Packets out: {}", snapshot.packets_out);
println!("Matches: {}", snapshot.packets_matched);
cleanup
pub fn cleanup(&self) -> usize
Removes expired flows from cache. Returns number of flows evicted.
Example:
// Call periodically (e.g., every 60 seconds)
let evicted = pipeline.cleanup();
if evicted > 0 {
println!("Cleaned up {} expired flows", evicted);
}
FlowKey
Structure
pub struct FlowKey {
pub src_ip: IpAddr,
pub dst_ip: IpAddr,
pub src_port: u16,
pub dst_port: u16,
pub protocol: Protocol,
}
Constructor
pub fn new(
src_ip: IpAddr,
dst_ip: IpAddr,
src_port: u16,
dst_port: u16,
protocol: Protocol,
) -> Self
Fields
Source IP address (IPv4 or IPv6)
Protocol: Tcp, Udp, or Icmp
Methods
pub fn reverse(&self) -> Self // Swap src/dst
pub fn is_tcp(&self) -> bool // Check if TCP
pub fn is_udp(&self) -> bool // Check if UDP
Example:
use turkey_dpi_engine::{FlowKey, Protocol};
use std::net::IpAddr;
let key = FlowKey::new(
"192.168.1.100".parse()?,
"8.8.8.8".parse()?,
54321,
443,
Protocol::Tcp,
);
assert!(key.is_tcp());
let reverse = key.reverse();
assert_eq!(reverse.src_ip, key.dst_ip);
assert_eq!(reverse.dst_ip, key.src_ip);
PipelineOutput
Structure
pub struct PipelineOutput {
pub primary: Option<BytesMut>,
pub additional: Vec<BytesMut>,
pub delay: Option<Duration>,
pub dropped: bool,
pub matched_rule: Option<String>,
}
Fields
Primary packet to send (None if dropped)
Additional packets (fragments, decoys, etc.)
Requested delay before sending next packet
Whether packet was dropped by a transform
Name of the rule that matched this packet
Methods
pub fn dropped() -> Self // Create a dropped packet result
pub fn passthrough(data: BytesMut) -> Self // Create passthrough result
pub fn all_packets(self) -> Vec<BytesMut> // Get all packets (primary + additional)
Example:
let output = pipeline.process(key, data)?;
if output.dropped {
println!("Packet dropped");
return Ok(());
}
if let Some(rule) = output.matched_rule {
println!("Matched rule: {}", rule);
}
// Send all packets
for packet in output.all_packets() {
send_packet(&packet)?;
}
FlowState
Structure
pub struct FlowState {
pub key: FlowKey,
pub created_at: Instant,
pub last_seen: Instant,
pub packet_count: u64,
pub byte_count: u64,
pub matched_rule: Option<String>,
pub direction: FlowDirection,
pub tcp_state: Option<TcpFlowState>,
pub transform_state: TransformState,
}
Automatically tracked by the pipeline for each flow.
Total packets seen in this flow
Total bytes seen in this flow
Last rule that matched this flow
Methods
pub fn update(&mut self, size: usize) // Update counters
pub fn is_expired(&self, timeout: Duration) -> bool // Check if expired
pub fn age(&self) -> Duration // Time since creation
pub fn idle_time(&self) -> Duration // Time since last packet
Complete Example
use turkey_dpi_engine::{Config, Pipeline, FlowKey, Protocol, Stats};
use bytes::BytesMut;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
tracing_subscriber::fmt::init();
// Load configuration
let config = Config::load_from_file("config.toml")?;
// Create statistics collector
let stats = Arc::new(Stats::new());
// Create pipeline
let pipeline = Pipeline::new(config, stats.clone())?;
// Simulate packet processing
let key = FlowKey::new(
"192.168.1.100".parse()?,
"104.244.42.1".parse()?, // Twitter IP
54321,
443,
Protocol::Tcp,
);
// TLS ClientHello packet
let tls_packet = BytesMut::from(&b"\x16\x03\x01..."[..]);
// Process through pipeline
let output = pipeline.process(key, tls_packet)?;
if output.dropped {
println!("Packet was dropped");
return Ok(());
}
if let Some(rule_name) = output.matched_rule {
println!("Matched rule: {}", rule_name);
}
// Handle primary packet
if let Some(primary) = output.primary {
println!("Sending primary packet: {} bytes", primary.len());
send_packet(&primary)?;
}
// Handle additional packets (fragments, decoys)
for (i, packet) in output.additional.iter().enumerate() {
println!("Sending additional packet {}: {} bytes", i, packet.len());
send_packet(packet)?;
}
// Apply requested delay
if let Some(delay) = output.delay {
println!("Applying delay: {:?}", delay);
thread::sleep(delay);
}
// Print statistics
let snapshot = stats.snapshot();
println!("\nStatistics:");
println!(" Packets in: {}", snapshot.packets_in);
println!(" Packets out: {}", snapshot.packets_out);
println!(" Matched: {}", snapshot.packets_matched);
// Print flow cache statistics
let cache_stats = pipeline.flow_cache().stats();
println!("\nFlow Cache:");
println!(" Active flows: {}/{}", cache_stats.size, cache_stats.max_size);
println!(" Hit rate: {:.2}%", cache_stats.hit_rate() * 100.0);
// Cleanup expired flows
let evicted = pipeline.cleanup();
if evicted > 0 {
println!("\nCleaned up {} expired flows", evicted);
}
Ok(())
}
fn send_packet(data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
// Implementation: send via raw socket, packet filter, etc.
Ok(())
}
Background Maintenance
use std::time::Duration;
// Spawn background thread for maintenance
let pipeline = Arc::new(pipeline);
let maintenance_pipeline = pipeline.clone();
thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(60));
// Clean up expired flows
let evicted = maintenance_pipeline.cleanup();
if evicted > 0 {
println!("[Maintenance] Evicted {} flows", evicted);
}
// Print statistics
let snapshot = maintenance_pipeline.stats().snapshot();
println!("[Maintenance] Packets: in={}, out={}, matched={}",
snapshot.packets_in,
snapshot.packets_out,
snapshot.packets_matched,
);
}
});
Hot-Reloading Configuration
use notify::{Watcher, RecursiveMode, watcher};
use std::sync::mpsc::channel;
use std::time::Duration;
let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(2))?;
watcher.watch("config.toml", RecursiveMode::NonRecursive)?;
let reload_pipeline = pipeline.clone();
thread::spawn(move || {
for event in rx {
match event {
Ok(_) => {
println!("Config file changed, reloading...");
match Config::load_from_file("config.toml") {
Ok(new_config) => {
if let Err(e) = reload_pipeline.reload_config(new_config) {
eprintln!("Failed to reload config: {}", e);
} else {
println!("Configuration reloaded successfully");
}
}
Err(e) => eprintln!("Failed to load config: {}", e),
}
}
Err(e) => eprintln!("Watch error: {}", e),
}
}
});
Error Handling
use turkey_dpi_engine::EngineError;
match pipeline.process(key, data) {
Ok(output) => {
// Handle output
}
Err(EngineError::FlowLimitExceeded { max, current }) => {
eprintln!("Flow limit exceeded: {}/{}", current, max);
// Maybe clean up flows or increase limit
}
Err(EngineError::Transform { transform, message }) => {
eprintln!("Transform '{}' failed: {}", transform, message);
// Log and continue
}
Err(e) => {
eprintln!("Pipeline error: {}", e);
return Err(e.into());
}
}
See Also
- Config - Configuration format and options
- BypassEngine - Simpler alternative for basic use cases