Template Worker uses a sophisticated inter-process communication system to coordinate between the master process and multiple worker processes. This page documents the communication protocols, worker spawning, and the WorkerProcessHandle interface.

Overview

The worker communication system enables:
  • Process isolation: Each worker runs in a separate OS process
  • Event dispatching: Master can dispatch events to specific workers
  • Load distribution: Multiple workers share the workload based on tenant routing
  • Fault tolerance: Automatic worker restart on failures
  • Coordinated state: Workers maintain synchronized state with the master
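The routing idea behind load distribution can be sketched as follows. This is an illustrative stand-in (a stable hash of the tenant id, taken modulo the pool size), not the project's actual routing code:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick the worker responsible for a tenant by hashing its id and taking
/// the result modulo the pool size. Deterministic: the same tenant always
/// lands on the same worker while the pool size is unchanged.
fn worker_for_tenant(tenant_id: u64, total_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    tenant_id.hash(&mut hasher);
    (hasher.finish() as usize) % total_workers
}
```

Because the mapping is a pure function of the tenant id and pool size, both the master and any observer can compute it without coordination.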

Architecture

WorkerProcessHandle

The WorkerProcessHandle is the master process’s interface to a worker process (src/worker/workerprocesshandle.rs:14-29):
pub struct WorkerProcessHandle {
    /// Mesophyll server handle to communicate with the worker process
    mesophyll_server: MesophyllServer,

    /// The id of the worker process, used for routing
    id: usize,
    
    /// The total number of processes in the pool
    total: usize,

    /// Kill message channel
    kill_msg_tx: UnboundedSender<()>,
}

WorkerLike Trait

All workers implement the WorkerLike trait, providing a uniform interface (src/worker/workerlike.rs:8-45):
#[async_trait]
pub trait WorkerLike: Send + Sync + 'static {
    /// Returns the worker's ID
    fn id(&self) -> usize;

    /// Runs a script with the given code and event
    async fn run_script(&self, id: Id, name: String, code: String, event: CreateEvent) 
        -> Result<KhronosValue, crate::Error>;

    /// Kill the worker
    async fn kill(&self) -> Result<(), crate::Error>;

    /// Dispatch an event to the templates managed by this worker
    async fn dispatch_event(&self, id: Id, event: CreateEvent) 
        -> Result<KhronosValue, crate::Error>;

    /// Dispatch an event without waiting for the result
    fn dispatch_event_nowait(&self, id: Id, event: CreateEvent) 
        -> Result<(), crate::Error>;

    /// Drop a tenant from the worker
    async fn drop_tenant(&self, id: Id) -> Result<(), crate::Error>;
}

Worker Spawning

The master process spawns worker processes using the current executable:

Spawn Process

From src/worker/workerprocesshandle.rs:74-109:
let current_exe = std::env::current_exe()?;

let mut command = Command::new(current_exe);

command.arg("--worker-type");
command.arg("processpoolworker");
command.arg("--worker-id");
command.arg(self.id.to_string());
command.arg("--process-workers");
command.arg(self.total.to_string());
command.env("MESOPHYLL_CLIENT_TOKEN", meso_token);
command.kill_on_drop(true);

let mut child = command.spawn()?;

Command-Line Arguments

Workers are spawned with these arguments:
  • --worker-type processpoolworker - Identifies this as a worker process
  • --worker-id N - Unique worker ID (0 to total-1)
  • --process-workers N - Total number of workers in the pool
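On the worker side, these flags could be recovered with a small hand-rolled parser. The struct and function below are illustrative sketches, not the crate's actual argument-parsing code:

```rust
/// Identity a spawned worker derives from its command line.
struct WorkerArgs {
    worker_type: String,
    worker_id: usize,
    process_workers: usize,
}

/// Look up the value following each flag; returns None if any flag or
/// value is missing or fails to parse.
fn parse_worker_args(args: &[String]) -> Option<WorkerArgs> {
    let get = |flag: &str| {
        args.iter()
            .position(|a| a == flag)
            .and_then(|i| args.get(i + 1))
    };
    Some(WorkerArgs {
        worker_type: get("--worker-type")?.clone(),
        worker_id: get("--worker-id")?.parse().ok()?,
        process_workers: get("--process-workers")?.parse().ok()?,
    })
}
```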

Environment Variables

  • MESOPHYLL_CLIENT_TOKEN - Authentication token for connecting to Mesophyll server

Worker Lifecycle

Initialization

  1. Master spawns worker process with arguments (src/worker/workerprocesshandle.rs:86-109)
  2. Worker reads environment variables and connects to Mesophyll (src/mesophyll/client.rs:20-40)
  3. Worker establishes WebSocket connection for bidirectional communication
  4. Worker sends heartbeats to indicate health

Automatic Restart

Workers automatically restart on failure, with a restart delay that grows linearly and is capped at 15 seconds:
const MAX_CONSECUTIVE_FAILURES_BEFORE_CRASH: usize = 10;

let sleep_duration = Duration::from_secs(3 * std::cmp::min(failed_attempts, 5));
From src/worker/workerprocesshandle.rs:53-136:
loop {
    // Get authentication token
    let Some(meso_token) = self.mesophyll_server.get_token_for_worker(self.id) else {
        log::error!("No ident found for worker process with ID: {}", self.id);
        return;
    };

    // Check failure threshold
    if consecutive_failures >= Self::MAX_CONSECUTIVE_FAILURES_BEFORE_CRASH {
        log::error!("Worker process with ID: {} has failed {} times in a row, crashing", 
                   self.id, consecutive_failures);
        std::process::abort();
    }

    let sleep_duration = Duration::from_secs(3 * std::cmp::min(failed_attempts, 5));

    // Spawn worker process
    let mut child = match command.spawn() {
        Ok(process) => process,
        Err(e) => {
            log::error!("Failed to spawn worker process: {}", e);
            failed_attempts += 1;
            consecutive_failures += 1;
            tokio::time::sleep(sleep_duration).await;
            continue;
        }
    };

    log::info!("Spawned worker process with ID: {} and pid {:?}", self.id, child.id());
    failed_attempts = 0;
    consecutive_failures = 0;

    // Wait for process or kill signal
    tokio::select! {
        resp = child.wait() => {
            match resp {
                Ok(status) => {
                    log::warn!("Worker process with ID: {} exited with status: {}", self.id, status);
                },
                Err(e) => {
                    log::error!("Failed to wait for worker process with ID: {}: {}", self.id, e);
                }
            }
        }
        _ = kill_msg_rx.recv() => {
            log::info!("Received kill message for worker process with ID: {}", self.id);
            let _ = child.kill().await; // best-effort: shutdown proceeds even if kill fails
            return;
        }
    }
}
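The delay computation in the loop above can be isolated for clarity. This sketch reproduces the formula: a linear ramp of 3 seconds per failed attempt, capped at 15 seconds:

```rust
use std::time::Duration;

/// Restart delay after `failed_attempts` consecutive spawn failures:
/// 0s, 3s, 6s, 9s, 12s, then a flat 15s from the fifth failure on.
fn restart_backoff(failed_attempts: u64) -> Duration {
    Duration::from_secs(3 * std::cmp::min(failed_attempts, 5))
}
```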

Shutdown

Workers can be gracefully shut down:
async fn kill(&self) -> Result<(), crate::Error> {
    self.kill_msg_tx.send(())
        .map_err(|e| format!("Failed to send kill message: {}", e).into())
}

Communication Protocols

Event Dispatching

The master dispatches events to workers via Mesophyll:
impl WorkerLike for WorkerProcessHandle {
    async fn dispatch_event(&self, id: Id, event: CreateEvent) -> Result<KhronosValue, crate::Error> {
        let r = self.mesophyll_server.get_connection(self.id)
            .ok_or_else(|| format!("No Mesophyll connection found for worker process with ID: {}", self.id))?;
        r.dispatch_event(id, event).await
    }
}
Flow (src/worker/workerprocesshandle.rs:160-164):
  1. Master calls dispatch_event() on the WorkerProcessHandle
  2. Handle retrieves the Mesophyll connection for the worker
  3. Mesophyll sends a DispatchEvent message over WebSocket
  4. Worker receives the message and processes it
  5. Worker sends back a DispatchResponse with the result
  6. Master receives the response and returns it to the caller

Fire-and-Forget Dispatch

For scenarios where a response is not needed:
fn dispatch_event_nowait(&self, id: Id, event: CreateEvent) -> Result<(), crate::Error> {
    let r = self.mesophyll_server.get_connection(self.id)
        .ok_or_else(|| format!("No Mesophyll connection found for worker process with ID: {}", self.id))?;
    r.dispatch_event_nowait(id, event)
}
This is more efficient when you don’t need to wait for execution results.

Script Execution

For internal tooling and debugging, arbitrary Luau code can be executed:
async fn run_script(&self, id: Id, name: String, code: String, event: CreateEvent) 
    -> Result<KhronosValue, crate::Error> {
    let r = self.mesophyll_server.get_connection(self.id)
        .ok_or_else(|| format!("No Mesophyll connection found: {}", self.id))?;
    r.run_script(id, name, code, event).await
}
Used by the internal API endpoint /i/dispatch-event/{guild_id} (src/api/internal_api.rs:32-55).

Tenant Management

Workers can be instructed to drop a tenant (remove its VM):
async fn drop_tenant(&self, id: Id) -> Result<(), crate::Error> {
    let r = self.mesophyll_server.get_connection(self.id)
        .ok_or_else(|| format!("No Mesophyll connection found: {}", self.id))?;
    r.drop_tenant(id).await
}
This is useful for:
  • Freeing memory when a tenant is inactive
  • Forcing VM reload after configuration changes
  • Handling tenant migration between workers

Worker Types

Template Worker supports multiple worker implementations:

WorkerProcessHandle

Process-based worker for production deployments:
  • Full process isolation
  • Automatic restart on crash
  • Communication via Mesophyll WebSockets

WorkerThread

Thread-based worker for development and testing (src/worker/workerthread.rs:38-158):
pub struct WorkerThread {
    /// The tx channel for sending messages to the worker thread
    tx: UnboundedSender<WorkerThreadMessage>,
    /// The id of the worker thread, used for routing
    id: usize,
}
Advantages:
  • Faster startup time
  • Lower memory overhead
  • Easier debugging
  • No IPC overhead
Disadvantages:
  • No process isolation
  • Crash affects entire master process

WorkerThread Implementation

For comparison, here’s how WorkerThread implements similar functionality:

Message Passing

WorkerThread uses Tokio channels for communication:
enum WorkerThreadMessage {
    Kill {
        tx: OneShotSender<Result<(), crate::Error>>,
    },
    DropTenant {
        id: Id,
        tx: OneShotSender<Result<(), crate::Error>>,
    },
    RunScript {
        id: Id,
        name: String,
        code: String,
        event: CreateEvent,
        tx: OneShotSender<Result<KhronosValue, crate::Error>>,
    },
    DispatchEvent {
        id: Id,
        event: CreateEvent,
        tx: Option<OneShotSender<Result<KhronosValue, crate::Error>>>,
    },
}

Thread Creation

From src/worker/workerthread.rs:62-112:
std::thread::Builder::new()
    .name(format!("lua-vm-threadpool-{id}"))
    .stack_size(MAX_VM_THREAD_STACK_SIZE)
    .spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build_local(tokio::runtime::LocalOptions::default())?;

        rt.block_on(async move {
            let state = WorkerState::new(state, id).await?;
            let worker = Worker::new(state);

            // Listen to messages and handle them
            while let Some(msg) = rx.recv().await {
                match msg {
                    WorkerThreadMessage::Kill { tx } => {
                        log::info!("Killing worker thread with ID: {}", id);
                        let _ = tx.send(Ok(()));
                        return;
                    }
                    WorkerThreadMessage::DispatchEvent { id, event, tx } => {
                        let res = worker.dispatch.dispatch_event(id, event).await;
                        if let Some(tx) = tx {
                            let _ = tx.send(res);
                        }
                    }
                    WorkerThreadMessage::RunScript { id, name, code, event, tx } => {
                        let res = worker.dispatch.run_script(id, name, code, event).await;
                        let _ = tx.send(res);
                    }
                    WorkerThreadMessage::DropTenant { id, tx } => {
                        let res = worker.vm_manager.remove_vm_for(id);
                        let _ = tx.send(res);
                    }
                }
            }
        });
    })?;
Key differences from WorkerProcessHandle:
  • Direct function calls instead of WebSocket messages
  • Same-process communication via channels
  • No reconnection logic needed
  • Immediate message delivery

Worker Pooling

Both WorkerProcessHandle and WorkerThread implement the Poolable trait for use in worker pools:
pub trait Poolable: WorkerLike {
    type ExtState;

    fn new(id: usize, total: usize, ext_state: &Self::ExtState) -> Result<Self, crate::Error>
    where Self: Sized;
}
This enables:
  • Creating multiple workers with sequential IDs
  • Load distribution based on tenant ID
  • Horizontal scaling by adding more workers
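Pool construction under this trait can be sketched as below. `PoolableSketch` and `DummyWorker` are simplified stand-ins for the real `Poolable` trait and its implementors, using `String` in place of `crate::Error`:

```rust
/// Simplified stand-in for the `Poolable` trait.
trait PoolableSketch: Sized {
    type ExtState;
    fn new(id: usize, total: usize, ext_state: &Self::ExtState) -> Result<Self, String>;
}

/// Build a pool of `total` workers with sequential IDs 0..total, all
/// sharing the same external state. Fails if any single worker fails.
fn build_pool<W: PoolableSketch>(total: usize, ext: &W::ExtState) -> Result<Vec<W>, String> {
    (0..total).map(|id| W::new(id, total, ext)).collect()
}

/// Trivial worker used only to demonstrate pool construction.
struct DummyWorker {
    id: usize,
    total: usize,
}

impl PoolableSketch for DummyWorker {
    type ExtState = ();
    fn new(id: usize, total: usize, _ext: &()) -> Result<Self, String> {
        Ok(DummyWorker { id, total })
    }
}
```

Tenant routing can then map an id onto `0..total` to select a worker from the returned `Vec`.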

WorkerProcessHandle Pooling

From src/worker/workerprocesshandle.rs:193-201:
impl Poolable for WorkerProcessHandle {
    type ExtState = WorkerProcessHandleCreateOpts;

    fn new(id: usize, total: usize, ext_state: &Self::ExtState) -> Result<Self, crate::Error>
    where Self: Sized 
    {
        Self::new(id, total, ext_state.mesophyll_server.clone())
    }
}

pub struct WorkerProcessHandleCreateOpts {
    pub(super) mesophyll_server: MesophyllServer,
}

WorkerThread Pooling

From src/worker/workerthread.rs:161-168:
impl Poolable for WorkerThread {
    type ExtState = CreateWorkerState;
    
    fn new(id: usize, _total: usize, ext_state: &Self::ExtState) -> Result<Self, crate::Error>
    where Self: Sized {
        Self::new(ext_state.clone(), id)
    }
}

Internal API

The master process exposes internal HTTP endpoints for worker management (src/api/internal_api.rs):

Dispatch Event

POST /i/dispatch-event/{guild_id}
Dispatches a custom event to a specific guild’s worker:
pub async fn dispatch_event(
    State(AppData { data, .. }): State<AppData>,
    InternalEndpoint { user_id }: InternalEndpoint,
    Path(guild_id): Path<GuildId>,
    Json(req): Json<PublicLuauExecute>,
) -> ApiResponse<KhronosValue> {
    let event = CreateEvent::new_khronos_value(req.name, Some(user_id.to_string()), req.data);
    
    let resp = data.worker.dispatch_event(
        Id::Guild(guild_id),
        event,
    ).await?;

    Ok(Json(resp))
}

Get Thread Count

GET /i/threads-count
Returns the number of active workers in the pool:
pub async fn get_threads_count(
    State(AppData { data, .. }): State<AppData>,
    InternalEndpoint { .. }: InternalEndpoint,
) -> ApiResponse<usize> {
    Ok(Json(data.worker.len()))
}

Guilds Exist

POST /i/guilds-exist
Checks which guilds exist in cache:
pub async fn guilds_exist(
    State(AppData { data, .. }): State<AppData>,
    InternalEndpoint { .. }: InternalEndpoint,
    Json(guilds): Json<Vec<GuildId>>,
) -> ApiResponse<Vec<u8>> {
    let guilds_exist = data.sandwich.has_guilds(&guilds).await?;
    Ok(Json(guilds_exist))
}
Returns a vector where 1 means the guild exists and 0 means it doesn’t.
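The 0/1 response can be produced with a straightforward map over the query list. The names below are illustrative; the real implementation delegates the cache lookup to Sandwich:

```rust
use std::collections::HashSet;

/// For each queried guild id, emit 1 if it is present in the cache and
/// 0 otherwise, preserving query order.
fn guilds_exist(cached: &HashSet<u64>, query: &[u64]) -> Vec<u8> {
    query.iter().map(|g| u8::from(cached.contains(g))).collect()
}
```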

Kill Worker

POST /i/kill-worker
Kills the worker pool (useful for debugging):
pub async fn kill_worker(
    State(AppData { data, .. }): State<AppData>,
    InternalEndpoint { .. }: InternalEndpoint,
) -> ApiResponse<()> {
    data.worker.kill().await?;
    Ok(Json(()))
}
Killing the worker pool will terminate all worker processes and cause service disruption. This endpoint is only for debugging purposes.

Error Handling

Connection Errors

When a Mesophyll connection is not available:
let r = self.mesophyll_server.get_connection(self.id)
    .ok_or_else(|| format!("No Mesophyll connection found for worker process with ID: {}", self.id))?;
This can happen when:
  • Worker process hasn’t connected yet
  • Worker process crashed and is restarting
  • Network issues prevent WebSocket connection

Spawn Failures

Worker spawn failures are logged and retried with a linearly increasing, capped backoff:
let mut child = match command.spawn() {
    Ok(process) => process,
    Err(e) => {
        log::error!("Failed to spawn worker process: {}", e);
        failed_attempts += 1;
        consecutive_failures += 1;
        tokio::time::sleep(sleep_duration).await;
        continue;
    }
};
After 10 consecutive failures, the master process aborts to prevent infinite retry loops.

Worker Crashes

When a worker process exits unexpectedly:
tokio::select! {
    resp = child.wait() => {
        match resp {
            Ok(status) => {
                log::warn!("Worker process with ID: {} exited with status: {}", self.id, status);
            },
            Err(e) => {
                log::error!("Failed to wait for worker process with ID: {}: {}", self.id, e);
            }
        }
    }
}
The worker is automatically restarted in the next loop iteration.

Performance Considerations

Process vs Thread Workers

WorkerProcessHandle (Processes):
  • Pros: Process isolation, fault tolerance, resource limits
  • Cons: Higher memory usage, slower startup, IPC overhead
  • Use for: Production deployments
WorkerThread (Threads):
  • Pros: Fast startup, low overhead, easier debugging
  • Cons: No isolation, crash affects master, harder to resource-limit
  • Use for: Development and testing

Communication Overhead

  • WebSocket communication adds ~1-2ms latency per message
  • MessagePack serialization is efficient but still has overhead
  • Fire-and-forget dispatches avoid round-trip time
  • Connection pooling amortizes connection setup costs

Scaling

Worker count should be based on:
  • CPU core count
  • Memory availability
  • Expected concurrent load
  • Tenant distribution
Typical configuration: 1 worker per 2-4 CPU cores.
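As a starting point, the "1 worker per 2-4 CPU cores" guideline can be expressed as a simple heuristic. The divisor below is an assumption from within that range, not a project default:

```rust
/// Suggest a worker count from the available core count: one worker per
/// two cores, never fewer than one worker.
fn suggested_worker_count(cpu_cores: usize) -> usize {
    std::cmp::max(1, cpu_cores / 2)
}
```

Memory availability and tenant distribution should still be measured before settling on a final count.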

Best Practices

  1. Use fire-and-forget when possible: If you don’t need the result, use dispatch_event_nowait() to reduce latency
  2. Monitor connection health: Check Mesophyll connection status before critical operations
  3. Handle connection failures gracefully: Implement retry logic at the application level for important operations
  4. Set appropriate failure thresholds: Adjust MAX_CONSECUTIVE_FAILURES_BEFORE_CRASH based on your reliability requirements
  5. Use WorkerThread for development: Faster iteration and easier debugging
  6. Use WorkerProcessHandle for production: Better isolation and fault tolerance
  7. Monitor worker restarts: Frequent restarts indicate underlying issues that need investigation

Internal API Endpoints

The Template Worker exposes several HTTP endpoints for internal management and monitoring. These endpoints are secured with internal authentication and should only be accessible to trusted services.

Dispatch Event (Internal)

Dispatch an event directly to a guild, bypassing the standard Web* event name restriction. Endpoint: POST /i/dispatch-event/{guild_id}
curl -X POST "http://localhost:60000/i/dispatch-event/123456789012345678" \
  -H "Authorization: Internal YOUR_INTERNAL_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "CustomInternalEvent",
    "data": {
      "type": "Text",
      "value": "Internal event data"
    }
  }'
Parameters:
  • guild_id (string, required): The Discord guild ID to dispatch the event to
  • name (string, required): Event name (the Web* prefix restriction does not apply)
  • data (KhronosValue, required): Event data as a KhronosValue object
This endpoint is used by internal services and does not check if the bot is in the guild. It’s designed for administrative operations and system integrations.

Get Thread Count

Returns the number of active worker threads/processes. Endpoint: GET /i/threads-count
curl -X GET "http://localhost:60000/i/threads-count" \
  -H "Authorization: Internal YOUR_INTERNAL_TOKEN"
Response: an integer count of active workers, e.g. 8.
Useful for monitoring worker pool size and detecting if workers have crashed.

Check Guilds Existence

Check which guilds from a given list exist in the bot’s cache. Endpoint: POST /i/guilds-exist
curl -X POST "http://localhost:60000/i/guilds-exist" \
  -H "Authorization: Internal YOUR_INTERNAL_TOKEN" \
  -H "Content-Type: application/json" \
  -d '["123456789012345678", "987654321098765432"]'
Request body: Array of guild ID strings.
Response: Array of integers where 0 = guild not in cache and 1 = guild exists, e.g. [1, 0].
This endpoint is used by Sandwich integration to quickly check bot presence across multiple guilds.

Kill Worker

Forcefully kills a worker or the entire worker pool. Use with extreme caution. Endpoint: POST /i/kill-worker
curl -X POST "http://localhost:60000/i/kill-worker" \
  -H "Authorization: Internal YOUR_INTERNAL_TOKEN"
This is a destructive operation intended for debugging and emergency situations. Killing a WorkerPool will terminate all workers in the pool.
When used with WorkerProcessHandle, workers will automatically restart after being killed. With WorkerThread, the threads are permanently stopped.

Internal API Authentication

Internal endpoints require the InternalEndpoint extractor which validates:
  1. Authorization header format: Must be Authorization: Internal {token}
  2. User ID extraction: Token contains the user ID of the caller
  3. Source verification: Requests should only come from trusted internal services
Implementation (src/api/extractors.rs):
pub struct InternalEndpoint {
    pub user_id: String,
}

impl InternalEndpoint {
    // Validates internal authentication
    // Extracts user_id from the authorization token
}
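The header check in step 1 can be sketched as a small pure function. This is an illustrative helper, not the actual extractor code:

```rust
/// Accept only `Authorization: Internal {token}` values, returning the
/// token portion; anything else (wrong scheme, empty token) is rejected.
fn parse_internal_auth(header_value: &str) -> Option<&str> {
    header_value
        .strip_prefix("Internal ")
        .filter(|token| !token.is_empty())
}
```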
The internal API is designed for service-to-service communication within the Anti-Raid infrastructure. It should not be exposed to the public internet.

Next Steps

Mesophyll Protocol

Learn about the WebSocket coordination layer

System API

Explore public monitoring endpoints
