Overview

Workers are processes that connect to the iii-engine over WebSocket and register functions. Every AgentOS capability—agents, tools, memory, security, workflows—is implemented as a worker.
Worker = Process + Functions + Triggers
A worker registers functions and binds them to triggers (HTTP, queue, cron, pubsub). Functions call each other via trigger() regardless of language.

Architecture

All workers follow this pattern:
┌─────────────────────────────────────────────────┐
│              iii-engine                         │
│   WebSocket: ws://localhost:49134               │
│   REST API: http://localhost:3111               │
└─────────────────────────────────────────────────┘
         ▲          ▲          ▲
         │          │          │
    ┌────┴───┐  ┌───┴────┐  ┌─┴───────┐
    │ Rust   │  │ TypeS  │  │ Python  │
    │ Worker │  │ Worker │  │ Worker  │
    └────────┘  └────────┘  └─────────┘
Workers register functions such as agent::chat, tool::file_read, and memory::store, and the engine routes calls between them by function ID.
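The routing idea can be pictured with a small in-process stand-in for the engine. This is only a sketch under stated assumptions: `ToyEngine`, `register`, and its `trigger` method are hypothetical names that mimic ID-based dispatch, not the iii-sdk API, and no WebSocket is involved.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[dict], Awaitable[Any]]


class ToyEngine:
    """Hypothetical in-memory stand-in for the engine: maps function IDs to handlers."""

    def __init__(self) -> None:
        self._functions: Dict[str, Handler] = {}

    def register(self, function_id: str, handler: Handler) -> None:
        self._functions[function_id] = handler

    async def trigger(self, function_id: str, payload: dict) -> Any:
        # Callers only know the ID; they never see which worker owns the handler.
        if function_id not in self._functions:
            raise KeyError(f"no function registered for {function_id!r}")
        return await self._functions[function_id](payload)


engine = ToyEngine()


async def memory_store(payload: dict) -> dict:
    return {"stored": payload["value"]}


async def agent_chat(payload: dict) -> dict:
    # One function calls another purely by ID, as workers do through the engine.
    saved = await engine.trigger("memory::store", {"value": payload["message"]})
    return {"content": f"remembered: {saved['stored']}"}


engine.register("memory::store", memory_store)
engine.register("agent::chat", agent_chat)

reply = asyncio.run(engine.trigger("agent::chat", {"message": "hello"}))
print(reply)
```

In the real system the two handlers could live in different processes and languages; the caller's code would look the same because only the function ID is shared.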

Creating a Rust Worker

Rust workers are used for performance-critical operations (agent core, security, memory, LLM routing).

Example: Agent Core Worker

From crates/agent-core/src/main.rs:
crates/my-worker/src/main.rs
use iii_sdk::iii::III;
use iii_sdk::error::IIIError;
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // 1. Connect to iii-engine
    let iii = III::new("ws://localhost:49134");

    // 2. Register function: agent::chat
    let iii_clone = iii.clone();
    iii.register_function_with_description(
        "agent::chat",
        "Process a message through the agent loop",
        move |input: Value| {
            let iii = iii_clone.clone();
            async move {
                let req: ChatRequest = serde_json::from_value(input)
                    .map_err(|e| IIIError::Handler(e.to_string()))?;
                agent_chat(&iii, req).await
            }
        },
    );

    // 3. Register function: agent::list_tools
    let iii_clone = iii.clone();
    iii.register_function_with_description(
        "agent::list_tools",
        "List tools available to an agent",
        move |input: Value| {
            let iii = iii_clone.clone();
            async move {
                let agent_id = input["agentId"].as_str().unwrap_or("default");
                list_tools(&iii, agent_id).await
            }
        },
    );

    // 4. Register trigger: bind agent::chat to queue
    iii.register_trigger(
        "queue",
        "agent::chat",
        json!({ "topic": "agent.inbox" })
    )?;

    tracing::info!("agent-core worker started");
    tokio::signal::ctrl_c().await?;
    iii.shutdown_async().await;
    Ok(())
}

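// ChatRequest and list_tools are defined elsewhere in the crate and are not shown here.
// ChatRequest must implement serde::Deserialize and expose agent_id and message fields.
async fn agent_chat(iii: &III, req: ChatRequest) -> Result<Value, IIIError> {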
    // Call other functions
    let config: Value = iii.trigger("state::get", json!({
        "scope": "agents",
        "key": req.agent_id,
    })).await?;

    let memories: Value = iii.trigger("memory::recall", json!({
        "agentId": req.agent_id,
        "query": req.message,
        "limit": 20,
    })).await.unwrap_or(json!([]));

    // Process message...
    Ok(json!({ "content": "response" }))
}

Cargo.toml

crates/my-worker/Cargo.toml
[package]
name = "agentos-my-worker"
version = "0.0.1"
edition = "2024"

[dependencies]
iii-sdk = "0.4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tracing = "0.1"
tracing-subscriber = "0.3"

Add to Workspace

In root Cargo.toml:
Cargo.toml
[workspace]
members = [
    "crates/agent-core",
    "crates/my-worker",  # Add here
    # ...
]

Run the Worker

cargo run --release -p agentos-my-worker

Creating a TypeScript Worker

TypeScript workers are used for rapid iteration (API, tools, workflows, channels).

Example: Tools Worker

From src/tools.ts:
src/my-worker.ts
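import { readFile, writeFile } from "node:fs/promises";
import { resolve } from "node:path";
import { init } from "iii-sdk";

// WORKSPACE_ROOT and assertPathContained are assumed to be defined elsewhere (not shown).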

const ENGINE_URL = "ws://localhost:49134";

// 1. Initialize worker
const { registerFunction, registerTrigger, trigger, triggerVoid } = init(
  ENGINE_URL,
  { workerName: "my-worker" }
);

// 2. Register function: tool::file_read
registerFunction(
  {
    id: "tool::file_read",
    description: "Read file contents with path containment",
    metadata: { category: "tool" },
  },
  async ({ path, maxBytes }: { path: string; maxBytes?: number }) => {
    const resolved = resolve(WORKSPACE_ROOT, path);
    assertPathContained(resolved);

    const content = await readFile(resolved, "utf-8");
    const limited = maxBytes ? content.slice(0, maxBytes) : content;
    return { content: limited, path: resolved, size: content.length };
  }
);

// 3. Register function: tool::file_write
registerFunction(
  {
    id: "tool::file_write",
    description: "Write file with path containment",
    metadata: { category: "tool" },
  },
  async ({ path, content }: { path: string; content: string }) => {
    const resolved = resolve(WORKSPACE_ROOT, path);
    assertPathContained(resolved);

    await writeFile(resolved, content, "utf-8");
    return { written: true, path: resolved, size: content.length };
  }
);

// 4. Call other functions
async function processFile(path: string) {
  // Call file_read
  const data: any = await trigger("tool::file_read", { path }, 30_000);
  
  // Call security scan
  const scan: any = await trigger("security::scan_content", {
    content: data.content
  }, 10_000);
  
  if (scan.safe) {
    return data;
  } else {
    throw new Error("Content failed security scan");
  }
}

console.log("my-worker started");
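The example calls assertPathContained without showing it. A containment check usually resolves the candidate path and verifies that the result still lies under the workspace root. The idea is language-independent; here is a minimal Python sketch. The root value and the function name `assert_path_contained` are hypothetical, and the actual helper in the project may differ.

```python
from pathlib import Path

# Hypothetical workspace root; the real worker presumably reads this from configuration.
WORKSPACE_ROOT = Path("/srv/workspace")


def assert_path_contained(candidate: str, root: Path = WORKSPACE_ROOT) -> Path:
    """Resolve `candidate` against `root` and reject anything that escapes it."""
    root_resolved = root.resolve()
    resolved = (root_resolved / candidate).resolve()
    # is_relative_to compares whole path components, so "/srv/workspace-evil"
    # is not mistaken for a child of "/srv/workspace".
    if not resolved.is_relative_to(root_resolved):
        raise PermissionError(f"path escapes workspace: {candidate}")
    return resolved


inside = assert_path_contained("notes/todo.txt")

try:
    assert_path_contained("../../etc/passwd")
    escaped = True
except PermissionError:
    escaped = False
```

Comparing path components rather than string prefixes is the important design choice: a plain `startswith` check would accept sibling directories whose names merely begin with the root's name.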

package.json

Ensure iii-sdk is in dependencies:
package.json
{
  "name": "agentos",
  "type": "module",
  "dependencies": {
    "iii-sdk": "^0.4.0"
  },
  "devDependencies": {
    "tsx": "^4.0.0",
    "typescript": "^5.0.0"
  }
}

Run the Worker

# Development (with watch mode)
npx tsx --watch src/my-worker.ts

# Production
node --import tsx src/my-worker.ts

Creating a Python Worker

Python workers are used for ML operations (embeddings, inference).

Example: Embedding Worker

From workers/embedding/main.py:
workers/my-worker/main.py
import asyncio
import os
from iii_sdk import III

# 1. Initialize worker
iii = III(
    "ws://localhost:49134",
    worker_name="my-worker",
)

model = None

def get_model():
    global model
    if model is None:
        try:
            from sentence_transformers import SentenceTransformer
            model_name = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
            model = SentenceTransformer(model_name)
        except ImportError:
            model = "fallback"
    return model

# 2. Register function: embedding::generate
@iii.function(id="embedding::generate", description="Generate text embeddings")
async def generate_embedding(input):
    text = input.get("text", "")
    batch = input.get("batch")

    m = get_model()

    if m == "fallback":
        if batch:
            return {"embeddings": [_hash_embed(t) for t in batch], "dim": 128}
        return {"embedding": _hash_embed(text), "dim": 128}

    if batch:
        embeddings = m.encode(batch, normalize_embeddings=True)
        return {
            "embeddings": [e.tolist() for e in embeddings],
            "dim": embeddings.shape[1],
        }

    embedding = m.encode([text], normalize_embeddings=True)[0]
    return {"embedding": embedding.tolist(), "dim": len(embedding)}

# 3. Register function: embedding::similarity
@iii.function(id="embedding::similarity", description="Compute cosine similarity")
async def compute_similarity(input):
    a = input.get("a", [])
    b = input.get("b", [])

    if len(a) != len(b) or not a:
        return {"similarity": 0.0}

    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(x * x for x in b) ** 0.5
    denom = norm_a * norm_b

    return {"similarity": dot / denom if denom > 0 else 0.0}

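def _hash_embed(text: str, dim: int = 128) -> list:
    import math
    import zlib
    words = text.lower().split()
    vec = [0.0] * dim
    for word in words:
        # zlib.crc32 is stable across runs; the built-in hash() is salted per process for str
        h = zlib.crc32(word.encode("utf-8"))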
        for i in range(dim):
            vec[i] += math.sin(h * (i + 1)) / max(len(words), 1)
    norm = sum(v * v for v in vec) ** 0.5
    if norm > 0:
        vec = [v / norm for v in vec]
    return vec

async def main():
    print("my-worker started")
    try:
        await asyncio.Event().wait()
    finally:
        # Runs on Ctrl+C as well: asyncio.run cancels this task rather than
        # raising KeyboardInterrupt inside it.
        await iii.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
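To make the embedding::similarity arithmetic concrete, the same formula can be applied to a few tiny vectors. This is a standalone copy of the calculation for illustration; it does not use the SDK, and the function name `cosine_similarity` is chosen here, not taken from the worker.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Same formula as the handler above: dot(a, b) / (|a| * |b|)."""
    if len(a) != len(b) or not a:
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    denom = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / denom if denom > 0 else 0.0


same_direction = cosine_similarity([1.0, 2.0], [2.0, 4.0])  # parallel vectors
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])      # perpendicular vectors
mismatched = cosine_similarity([1.0, 2.0], [1.0])           # length mismatch
```

Parallel vectors score approximately 1, perpendicular vectors score 0, and mismatched lengths fall back to 0 instead of raising.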

requirements.txt

workers/my-worker/requirements.txt
iii-sdk>=0.4.0
sentence-transformers>=2.0.0
torch>=2.0.0

Run the Worker

pip install -r workers/my-worker/requirements.txt
python workers/my-worker/main.py

Worker Registration Patterns

Pattern 1: Simple Function

iii.register_function_with_description(
    "my::simple",
    "A simple function",
    |input: Value| async move {
        Ok(json!({ "result": "success" }))
    },
);

Pattern 2: Function with State

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

let state = Arc::new(Mutex::new(HashMap::new()));
let state_clone = state.clone();

iii.register_function_with_description(
    "my::with_state",
    "Function with shared state",
    move |input: Value| {
        let state = state_clone.clone();
        async move {
            let mut s = state.lock().unwrap();
            s.insert("key", "value");
            Ok(json!({ "state": s.len() }))
        }
    },
);

Pattern 3: Function Calling Other Functions

let iii_clone = iii.clone();
iii.register_function_with_description(
    "my::composite",
    "Calls other functions",
    move |input: Value| {
        let iii = iii_clone.clone();
        async move {
            let result1 = iii.trigger("tool::file_read", json!({ "path": "/tmp/test" })).await?;
            let result2 = iii.trigger("security::scan", json!({ "content": result1 })).await?;
            Ok(result2)
        }
    },
);

Trigger Types

HTTP Trigger

Expose a function via REST API:
iii.register_trigger(
    "http",
    "my::http_handler",
    json!({
        "api_path": "my/endpoint",
        "http_method": "POST"
    })
)?;
Accessible at: POST http://localhost:3111/my/endpoint
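A client could then call the endpoint with any HTTP library. The sketch below uses only the Python standard library and assumes the route accepts and returns JSON, which the text above does not specify; the payload is hypothetical.

```python
import json
import urllib.request

ENGINE_REST = "http://localhost:3111"  # REST address from the architecture diagram


def build_request(api_path: str, payload: dict) -> urllib.request.Request:
    """Build a POST request for a function exposed through an HTTP trigger."""
    return urllib.request.Request(
        url=f"{ENGINE_REST}/{api_path.lstrip('/')}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_endpoint(api_path: str, payload: dict, timeout: float = 10.0) -> dict:
    """Send the request; needs a running engine with the trigger registered."""
    with urllib.request.urlopen(build_request(api_path, payload), timeout=timeout) as resp:
        # Assumption: the response body is JSON.
        return json.loads(resp.read().decode("utf-8"))


request = build_request("my/endpoint", {"key": "value"})  # hypothetical payload
```

Only `build_request` runs here; `call_endpoint` is left uncalled because it requires a live engine.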

Queue Trigger

Process messages from a queue:
iii.register_trigger(
    "queue",
    "my::queue_handler",
    json!({ "topic": "my.queue" })
)?;

Cron Trigger

Run on a schedule:
iii.register_trigger(
    "cron",
    "my::scheduled",
    json!({ "expression": "0 */5 * * * *" })  // Six fields, seconds first: every 5 minutes
)?;
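The expression above has six fields, one more than classic five-field cron. Assuming the conventional order for six-field cron (second, minute, hour, day of month, month, day of week), a simplified matcher shows why it fires every five minutes at second zero. The functions below are hypothetical and support only `*`, `*/n`, and plain integers; the engine's own parser may behave differently.

```python
from datetime import datetime


def field_matches(spec: str, value: int) -> bool:
    """Match one cron field; supports only '*', '*/n', and a plain integer."""
    if spec == "*":
        return True
    if spec.startswith("*/"):
        return value % int(spec[2:]) == 0
    return value == int(spec)


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check a six-field expression: sec min hour day-of-month month day-of-week."""
    fields = expression.split()
    if len(fields) != 6:
        raise ValueError("expected six fields")
    # Cron counts Sunday as 0, while isoweekday() returns 7 for Sunday.
    values = [
        moment.second, moment.minute, moment.hour,
        moment.day, moment.month, moment.isoweekday() % 7,
    ]
    return all(field_matches(s, v) for s, v in zip(fields, values))


EXPR = "0 */5 * * * *"
on_schedule = cron_matches(EXPR, datetime(2024, 1, 1, 12, 10, 0))  # 12:10:00
off_minute = cron_matches(EXPR, datetime(2024, 1, 1, 12, 11, 0))   # 12:11:00
off_second = cron_matches(EXPR, datetime(2024, 1, 1, 12, 10, 30))  # 12:10:30
```

The leading `0` pins the seconds field, so the job runs once per matching minute rather than on every second of it.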

PubSub Trigger

Subscribe to events:
iii.register_trigger(
    "pubsub",
    "my::event_handler",
    json!({ "topic": "agent.lifecycle" })
)?;

Testing Workers

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_function() {
        let input = json!({ "key": "value" });
        let result = my_function(input).await.unwrap();
        assert_eq!(result["status"], "ok");
    }
}
cargo test -p agentos-my-worker
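The same approach carries over to Python workers: keep the handler a plain async function and call it directly in the test, with no engine running. A minimal sketch using the standard library's unittest; `my_function` here is a hypothetical handler, not one from the codebase.

```python
import unittest


async def my_function(input: dict) -> dict:
    """Hypothetical handler: reports success and echoes the keys it received."""
    return {"status": "ok", "received": sorted(input)}


class MyFunctionTest(unittest.IsolatedAsyncioTestCase):
    async def test_reports_ok(self) -> None:
        result = await my_function({"key": "value"})
        self.assertEqual(result["status"], "ok")

    async def test_echoes_keys_in_sorted_order(self) -> None:
        result = await my_function({"b": 1, "a": 2})
        self.assertEqual(result["received"], ["a", "b"])


# Run the suite programmatically so the outcome is available as a value.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(MyFunctionTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

In a project, the last two lines would normally be replaced by running `python -m unittest` from the command line.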

Best Practices

1. Use Descriptive IDs

Function IDs should follow namespace::action format:
  • agent::chat, agent::create, agent::delete
  • tool::file_read, tool::file_write
  • memory::store, memory::recall
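A naming convention like this is easy to enforce mechanically. The check below is a sketch that assumes IDs are exactly two lowercase snake_case segments joined by `::`, which matches the examples above but is not a rule stated by the SDK.

```python
import re

# Assumed convention: lowercase snake_case segments separated by "::".
FUNCTION_ID = re.compile(r"[a-z][a-z0-9_]*::[a-z][a-z0-9_]*")


def is_valid_function_id(function_id: str) -> bool:
    """Return True when the whole ID matches namespace::action."""
    return FUNCTION_ID.fullmatch(function_id) is not None


checks = {
    fid: is_valid_function_id(fid)
    for fid in ["agent::chat", "tool::file_read", "fileRead", "agent::", "Agent::Chat"]
}
```

Such a check could run in a unit test over every ID a worker registers, so naming drift is caught before deployment.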
2. Handle Errors Gracefully

Always return structured errors:
try {
  const result = await riskyOperation();
  return { success: true, data: result };
} catch (err: any) {
  return { success: false, error: err.message };
}
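In a Python worker the same shape can be applied once, as a decorator, instead of repeating try/except in every handler. This is a sketch; `structured_errors` and `divide` are hypothetical names, and the `success`/`data`/`error` keys simply mirror the example above.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def structured_errors(handler: Callable[[dict], Awaitable[Any]]):
    """Wrap a handler so failures come back as data instead of raised exceptions."""

    @functools.wraps(handler)
    async def wrapper(payload: dict) -> dict:
        try:
            return {"success": True, "data": await handler(payload)}
        except Exception as err:  # deliberately broad: any failure becomes a result
            return {"success": False, "error": str(err)}

    return wrapper


@structured_errors
async def divide(payload: dict) -> float:
    # Hypothetical handler used only to exercise both outcomes.
    return payload["a"] / payload["b"]


ok = asyncio.run(divide({"a": 6, "b": 3}))
failed = asyncio.run(divide({"a": 1, "b": 0}))
```

The first call yields a success result carrying the quotient; the second yields a failure result whose `error` field holds the exception message.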
3. Set Timeouts

Always specify timeouts when calling other functions:
// Good
await trigger("tool::web_fetch", { url }, 30_000);

// Bad (no timeout)
await trigger("tool::web_fetch", { url });
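Whether the Python SDK's trigger call accepts a timeout argument is not shown in this guide. If it does not, the wait can still be bounded from the caller's side with asyncio.wait_for. The sketch below uses a stand-in coroutine, `slow_call`, in place of a real remote call; both function names are hypothetical.

```python
import asyncio


async def slow_call(delay: float) -> str:
    # Stand-in for a remote function call that takes `delay` seconds.
    await asyncio.sleep(delay)
    return "done"


async def call_with_timeout(delay: float, timeout_s: float) -> dict:
    """Bound the wait and report a timeout as a structured result."""
    try:
        value = await asyncio.wait_for(slow_call(delay), timeout=timeout_s)
        return {"success": True, "data": value}
    except asyncio.TimeoutError:
        return {"success": False, "error": f"timed out after {timeout_s}s"}


fast = asyncio.run(call_with_timeout(delay=0.01, timeout_s=1.0))
slow = asyncio.run(call_with_timeout(delay=1.0, timeout_s=0.05))
```

Note that asyncio.wait_for cancels the local wait only; whether the remote function keeps running after the caller gives up depends on the engine.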
4. Add Metadata

Include metadata for categorization:
registerFunction(
  {
    id: "my::function",
    description: "Clear description",
    metadata: { category: "my-category", version: "1.0" }
  },
  handler
);
5. Log Important Events

Use structured logging:
tracing::info!("agent-core worker started");
tracing::warn!(agent_id = %agent_id, "rate limit exceeded");

Next Steps

  • Creating Tools: Build tools that agents can call
  • Creating Agents: Create agent templates
  • Testing: Test your workers
  • API Reference: Full worker API documentation
