Inter-process communication between master and worker processes
Template Worker coordinates the master process and its worker processes through an inter-process communication (IPC) layer. This page documents the communication protocol, worker spawning, and the WorkerProcessHandle interface.
The WorkerProcessHandle is the master process’s interface to a worker process (src/worker/workerprocesshandle.rs:14-29):
    pub struct WorkerProcessHandle {
        /// Mesophyll server handle to communicate with the worker process
        mesophyll_server: MesophyllServer,
        /// The id of the worker process, used for routing
        id: usize,
        /// The total number of processes in the pool
        total: usize,
        /// Kill message channel
        kill_msg_tx: UnboundedSender<()>,
    }
All workers implement the WorkerLike trait, providing a uniform interface (src/worker/workerlike.rs:8-45):
    #[async_trait]
    pub trait WorkerLike: Send + Sync + 'static {
        /// Returns the worker's ID
        fn id(&self) -> usize;

        /// Runs a script with the given code and event
        async fn run_script(&self, id: Id, name: String, code: String, event: CreateEvent) -> Result<KhronosValue, crate::Error>;

        /// Kill the worker
        async fn kill(&self) -> Result<(), crate::Error>;

        /// Dispatch an event to the templates managed by this worker
        async fn dispatch_event(&self, id: Id, event: CreateEvent) -> Result<KhronosValue, crate::Error>;

        /// Dispatch an event without waiting for the result
        fn dispatch_event_nowait(&self, id: Id, event: CreateEvent) -> Result<(), crate::Error>;

        /// Drop a tenant from the worker
        async fn drop_tenant(&self, id: Id) -> Result<(), crate::Error>;
    }
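To illustrate what a uniform interface buys the caller, here is a minimal sketch under simplifying assumptions. `SimpleWorker`, `ProcessWorker`, `ThreadWorker`, and `broadcast` are hypothetical names, the trait is synchronous, and plain `String` values stand in for `CreateEvent`, `KhronosValue`, and `crate::Error`. It is not the project's implementation.

```rust
/// Simplified, synchronous stand-in for the `WorkerLike` trait (hypothetical).
pub trait SimpleWorker: Send + Sync + 'static {
    /// Returns the worker's ID
    fn id(&self) -> usize;
    /// Handles an event for a tenant and returns a textual result
    fn dispatch_event(&self, tenant: u64, event: &str) -> Result<String, String>;
}

/// Stand-in for a process-backed worker.
pub struct ProcessWorker {
    pub id: usize,
}

/// Stand-in for a thread-backed worker.
pub struct ThreadWorker {
    pub id: usize,
}

impl SimpleWorker for ProcessWorker {
    fn id(&self) -> usize {
        self.id
    }
    fn dispatch_event(&self, tenant: u64, event: &str) -> Result<String, String> {
        Ok(format!("process worker {} handled {} for tenant {}", self.id, event, tenant))
    }
}

impl SimpleWorker for ThreadWorker {
    fn id(&self) -> usize {
        self.id
    }
    fn dispatch_event(&self, tenant: u64, event: &str) -> Result<String, String> {
        Ok(format!("thread worker {} handled {} for tenant {}", self.id, event, tenant))
    }
}

/// Sends the same event to every worker through the shared trait interface.
pub fn broadcast(
    workers: &[Box<dyn SimpleWorker>],
    tenant: u64,
    event: &str,
) -> Vec<Result<String, String>> {
    workers.iter().map(|w| w.dispatch_event(tenant, event)).collect()
}
```

The caller holds `Box<dyn SimpleWorker>` values and never needs to know which backing implementation it is talking to.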
The handle supervises its worker process in a loop: it fetches the worker's token, spawns the process (backing off after spawn failures), and then waits for either the process to exit or a kill message to arrive. If the process exits on its own, the loop spawns it again:
    loop {
        // Get authentication token
        let Some(meso_token) = self.mesophyll_server.get_token_for_worker(self.id) else {
            log::error!("No ident found for worker process with ID: {}", self.id);
            return;
        };

        // Check failure threshold
        if consecutive_failures >= Self::MAX_CONSECUTIVE_FAILURES_BEFORE_CRASH {
            log::error!("Worker process with ID: {} has failed {} times in a row, crashing", self.id, consecutive_failures);
            std::process::abort();
        }

        let sleep_duration = Duration::from_secs(3 * std::cmp::min(failed_attempts, 5));

        // Spawn worker process
        let mut child = match command.spawn() {
            Ok(process) => process,
            Err(e) => {
                log::error!("Failed to spawn worker process: {}", e);
                failed_attempts += 1;
                consecutive_failures += 1;
                tokio::time::sleep(sleep_duration).await;
                continue;
            }
        };

        log::info!("Spawned worker process with ID: {} and pid {:?}", self.id, child.id());
        failed_attempts = 0;
        consecutive_failures = 0;

        // Wait for process or kill signal
        tokio::select! {
            resp = child.wait() => {
                match resp {
                    Ok(status) => {
                        log::warn!("Worker process with ID: {} exited with status: {}", self.id, status);
                    },
                    Err(e) => {
                        log::error!("Failed to wait for worker process with ID: {}: {}", self.id, e);
                    }
                }
            }
            _ = kill_msg_rx.recv() => {
                log::info!("Received kill message for worker process with ID: {}", self.id);
                child.kill().await?;
                return;
            }
        }
    }
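The retry arithmetic in this loop can be isolated and checked without spawning anything. The sketch below is a simplified, synchronous model: `backoff`, `supervise_once`, and `Outcome` are hypothetical names, the threshold value of 10 is an assumption (the value of `MAX_CONSECUTIVE_FAILURES_BEFORE_CRASH` is not shown here), delays are recorded rather than slept, and giving up is reported as a value instead of calling `std::process::abort()`.

```rust
use std::time::Duration;

/// Assumed threshold for illustration only; the real constant's value is not shown.
pub const MAX_CONSECUTIVE_FAILURES: u64 = 10;

/// Delay before retrying: 3 seconds per failed attempt, capped at 15 seconds.
pub fn backoff(failed_attempts: u64) -> Duration {
    Duration::from_secs(3 * std::cmp::min(failed_attempts, 5))
}

/// Result of one supervision pass, with the delays that would have been slept.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Spawned { delays: Vec<Duration> },
    GaveUp { delays: Vec<Duration> },
}

/// Retries `try_spawn` until it succeeds or the failure threshold is reached.
pub fn supervise_once<F>(mut try_spawn: F) -> Outcome
where
    F: FnMut() -> Result<(), String>,
{
    let mut failed_attempts = 0u64;
    let mut consecutive_failures = 0u64;
    let mut delays = Vec::new();

    loop {
        // The threshold is checked before every spawn attempt
        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
            return Outcome::GaveUp { delays };
        }

        // The delay is computed before the counters are incremented
        let sleep_duration = backoff(failed_attempts);

        match try_spawn() {
            Ok(()) => return Outcome::Spawned { delays },
            Err(_) => {
                failed_attempts += 1;
                consecutive_failures += 1;
                // Record the delay instead of sleeping
                delays.push(sleep_duration);
            }
        }
    }
}
```

With a spawner that fails twice and then succeeds, the recorded delays are 0 s and 3 s, because the delay for an attempt is derived from the number of failures before that attempt.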
The master dispatches events to workers via Mesophyll:
    #[async_trait]
    impl WorkerLike for WorkerProcessHandle {
        async fn dispatch_event(&self, id: Id, event: CreateEvent) -> Result<KhronosValue, crate::Error> {
            // Look up the live Mesophyll connection for this worker
            let r = self.mesophyll_server.get_connection(self.id)
                .ok_or_else(|| format!("No Mesophyll connection found for worker process with ID: {}", self.id))?;
            r.dispatch_event(id, event).await
        }
    }
Flow (src/worker/workerprocesshandle.rs:160-164):
1. Master calls dispatch_event() on the WorkerProcessHandle
2. Handle retrieves the Mesophyll connection for the worker
3. Mesophyll sends a DispatchEvent message over WebSocket
4. Worker receives the message and processes it
5. Worker sends back a DispatchResponse with the result
6. Master receives the response and returns it to the caller
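The request/response shape of this flow can be sketched with standard-library channels. This is an illustration under stated assumptions, not Mesophyll itself: an in-process channel replaces the WebSocket, a thread replaces the worker process, `String` replaces the event and result types, and `start_worker` and the message fields are hypothetical.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Stand-in for the DispatchEvent message sent from master to worker.
pub struct DispatchEvent {
    pub tenant: u64,
    pub event: String,
    /// Channel the worker uses to send its DispatchResponse back
    pub reply: Sender<DispatchResponse>,
}

/// Stand-in for the DispatchResponse message sent from worker to master.
#[derive(Debug, PartialEq)]
pub struct DispatchResponse {
    pub result: Result<String, String>,
}

/// Starts a worker thread and returns the sender the master uses to reach it.
pub fn start_worker() -> Sender<DispatchEvent> {
    let (tx, rx) = channel::<DispatchEvent>();
    thread::spawn(move || {
        // Worker side: receive each message, process it, send the response
        for msg in rx {
            let result = if msg.event.is_empty() {
                Err("empty event".to_string())
            } else {
                Ok(format!("{}:{}", msg.tenant, msg.event))
            };
            // The master may have stopped waiting; ignore a failed send
            let _ = msg.reply.send(DispatchResponse { result });
        }
    });
    tx
}

/// Master side: send the event, then block until the response arrives.
pub fn dispatch_event(
    conn: &Sender<DispatchEvent>,
    tenant: u64,
    event: &str,
) -> Result<String, String> {
    let (reply_tx, reply_rx) = channel();
    conn.send(DispatchEvent {
        tenant,
        event: event.to_string(),
        reply: reply_tx,
    })
    .map_err(|_| "worker disconnected".to_string())?;
    reply_rx
        .recv()
        .map_err(|_| "worker dropped the request".to_string())?
        .result
}
```

Giving each request its own reply channel keeps responses matched to their callers without a separate correlation ID. A protocol running over a single socket would need some other way to pair responses with requests.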
For fire-and-forget dispatch, dispatch_event_nowait sends the event without waiting for the result:
    fn dispatch_event_nowait(&self, id: Id, event: CreateEvent) -> Result<(), crate::Error> {
        let r = self.mesophyll_server.get_connection(self.id)
            .ok_or_else(|| format!("No Mesophyll connection found for worker process with ID: {}", self.id))?;
        r.dispatch_event_nowait(id, event)
    }
This is more efficient when you don’t need to wait for execution results.
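A small sketch of the fire-and-forget pattern, including the connection lookup that fails when a worker has no live connection. It assumes a hypothetical `Connections` table keyed by worker ID and uses standard-library channels in place of Mesophyll connections, so it shows the shape of the call rather than the real transport.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the server's table of live worker connections.
pub struct Connections {
    by_worker: HashMap<usize, Sender<(u64, String)>>,
}

impl Connections {
    pub fn new() -> Self {
        Self { by_worker: HashMap::new() }
    }

    /// Registers a worker and returns the receiving end its loop would read from.
    pub fn connect(&mut self, worker_id: usize) -> Receiver<(u64, String)> {
        let (tx, rx) = channel();
        self.by_worker.insert(worker_id, tx);
        rx
    }

    /// Queues the event and returns immediately; no reply channel is created.
    pub fn dispatch_event_nowait(
        &self,
        worker_id: usize,
        tenant: u64,
        event: &str,
    ) -> Result<(), String> {
        // A missing connection is reported as an error, as in the lookup above
        let conn = self.by_worker.get(&worker_id).ok_or_else(|| {
            format!("No connection found for worker process with ID: {}", worker_id)
        })?;
        // Only a failed send is reported; the worker's result is never observed
        conn.send((tenant, event.to_string()))
            .map_err(|_| format!("Worker {} disconnected", worker_id))
    }
}
```

Because nothing is awaited, the only failures the caller can observe are a missing connection or a closed channel. Errors raised while the worker handles the event do not reach the caller.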
Thread-based worker for development and testing (src/worker/workerthread.rs:38-158):
    pub struct WorkerThread {
        /// The tx channel for sending messages to the worker thread
        tx: UnboundedSender<WorkerThreadMessage>,
        /// The id of the worker thread, used for routing
        id: usize,
    }
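As a rough sketch of how a thread-backed worker of this shape can operate, the example below pairs a sender with a thread that drains a message queue. It is an assumption-laden illustration: the `WorkerThreadMessage` variants shown are hypothetical (the real enum is not shown here), `std::sync::mpsc` replaces the unbounded Tokio channel, the extra `handle` field exists only so the sketch can join the thread, and `String` replaces the event and result types.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message set for this sketch.
pub enum WorkerThreadMessage {
    Dispatch { tenant: u64, event: String, reply: Sender<String> },
    Kill,
}

pub struct WorkerThread {
    /// Sender for messages to the worker thread
    tx: Sender<WorkerThreadMessage>,
    /// The id of the worker thread
    id: usize,
    /// Join handle; the thread returns how many events it handled
    handle: JoinHandle<usize>,
}

impl WorkerThread {
    /// Spawns the thread and starts its message loop.
    pub fn spawn(id: usize) -> Self {
        let (tx, rx) = channel();
        let handle = thread::spawn(move || {
            let mut handled = 0usize;
            for msg in rx {
                match msg {
                    WorkerThreadMessage::Dispatch { tenant, event, reply } => {
                        handled += 1;
                        let _ = reply.send(format!(
                            "worker {} ran {} for tenant {}",
                            id, event, tenant
                        ));
                    }
                    // Leave the loop so the thread can finish
                    WorkerThreadMessage::Kill => break,
                }
            }
            handled
        });
        Self { tx, id, handle }
    }

    pub fn id(&self) -> usize {
        self.id
    }

    /// Sends an event and blocks until the thread replies.
    pub fn dispatch_event(&self, tenant: u64, event: &str) -> Result<String, String> {
        let (reply, reply_rx) = channel();
        self.tx
            .send(WorkerThreadMessage::Dispatch { tenant, event: event.to_string(), reply })
            .map_err(|_| "worker thread stopped".to_string())?;
        reply_rx.recv().map_err(|_| "worker thread dropped the request".to_string())
    }

    /// Stops the thread and returns the number of events it handled.
    pub fn kill(self) -> Result<usize, String> {
        self.tx
            .send(WorkerThreadMessage::Kill)
            .map_err(|_| "worker thread already stopped".to_string())?;
        self.handle.join().map_err(|_| "worker thread panicked".to_string())
    }
}
```

Running the worker in the same process avoids process spawning and network setup, which fits the development and testing use case described above.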
    let r = self.mesophyll_server.get_connection(self.id)
        .ok_or_else(|| format!("No Mesophyll connection found for worker process with ID: {}", self.id))?;
The Template Worker exposes several HTTP endpoints for internal management and monitoring. These endpoints are secured with internal authentication and should only be accessible to trusted services.
This endpoint is used by internal services and does not check if the bot is in the guild. It’s designed for administrative operations and system integrations.
Internal endpoints require the InternalEndpoint extractor which validates:
- Authorization header format: Must be Authorization: Internal {token}
- User ID extraction: Token contains the user ID of the caller
- Source verification: Requests should only come from trusted internal services
Implementation (src/api/extractors.rs):
    pub struct InternalEndpoint {
        pub user_id: String,
    }

    impl InternalEndpoint {
        // Validates internal authentication
        // Extracts user_id from the authorization token
    }
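The header check can be sketched independently of any web framework. The example below is hypothetical throughout: `parse_internal_header` and `authenticate` are illustrative names, the case-sensitive `Internal ` prefix and the no-whitespace rule are assumptions, and `resolve_user` is a caller-supplied callback because the way a token maps to a user ID is not described here.

```rust
/// Mirrors the shape of the extractor struct shown above.
#[derive(Debug, PartialEq)]
pub struct InternalEndpoint {
    pub user_id: String,
}

/// Extracts the token from an `Internal {token}` header value.
/// Assumes a case-sensitive scheme followed by exactly one space.
pub fn parse_internal_header(value: &str) -> Result<&str, String> {
    let token = value
        .strip_prefix("Internal ")
        .ok_or_else(|| "expected `Internal {token}`".to_string())?;
    // Reject empty tokens and tokens containing whitespace
    if token.is_empty() || token.contains(char::is_whitespace) {
        return Err("malformed token".to_string());
    }
    Ok(token)
}

/// Validates the header and resolves the caller's user ID.
/// `resolve_user` is a hypothetical lookup supplied by the caller.
pub fn authenticate<F>(
    header: Option<&str>,
    resolve_user: F,
) -> Result<InternalEndpoint, String>
where
    F: Fn(&str) -> Option<String>,
{
    let value = header.ok_or_else(|| "missing Authorization header".to_string())?;
    let token = parse_internal_header(value)?;
    let user_id = resolve_user(token).ok_or_else(|| "unknown token".to_string())?;
    Ok(InternalEndpoint { user_id })
}
```

Header parsing alone does not cover source verification. Restricting which hosts can reach these endpoints is a network-level concern that this sketch leaves out.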
The internal API is designed for service-to-service communication within the Anti-Raid infrastructure. It should not be exposed to the public internet.