Overview
Workers are processes that connect to the iii-engine over WebSocket and register functions. Every AgentOS capability—agents, tools, memory, security, workflows—is implemented as a worker.
Worker = Process + Functions + Triggers A worker registers functions and binds them to triggers (HTTP, queue, cron, pubsub). Functions call each other via trigger() regardless of language.
Architecture
All workers follow this pattern:
┌─────────────────────────────────────────────────┐
│ iii-engine │
│ WebSocket: ws://localhost:49134 │
│ REST API: http://localhost:3111 │
└─────────────────────────────────────────────────┘
▲ ▲ ▲
│ │ │
┌────┴───┐ ┌───┴────┐ ┌─┴───────┐
│ Rust │ │ TypeS │ │ Python │
│ Worker │ │ Worker │ │ Worker │
└────────┘ └────────┘ └─────────┘
Workers register functions like agent::chat, tool::file_read, memory::store and the engine routes calls between them.
Creating a Rust Worker
Rust workers are used for performance-critical operations (agent core, security, memory, LLM routing).
Example: Agent Core Worker
From crates/agent-core/src/main.rs:
crates/my-worker/src/main.rs
use iii_sdk :: iii :: III ;
use iii_sdk :: error :: IIIError ;
use serde_json :: {json, Value };
#[tokio :: main]
async fn main () -> Result <(), Box < dyn std :: error :: Error >> {
tracing_subscriber :: fmt :: init ();
// 1. Connect to iii-engine
let iii = III :: new ( "ws://localhost:49134" );
// 2. Register function: agent::chat
let iii_clone = iii . clone ();
iii . register_function_with_description (
"agent::chat" ,
"Process a message through the agent loop" ,
move | input : Value | {
let iii = iii_clone . clone ();
async move {
let req : ChatRequest = serde_json :: from_value ( input )
. map_err ( | e | IIIError :: Handler ( e . to_string ())) ? ;
agent_chat ( & iii , req ) . await
}
},
);
// 3. Register function: agent::list_tools
let iii_clone = iii . clone ();
iii . register_function_with_description (
"agent::list_tools" ,
"List tools available to an agent" ,
move | input : Value | {
let iii = iii_clone . clone ();
async move {
let agent_id = input [ "agentId" ] . as_str () . unwrap_or ( "default" );
list_tools ( & iii , agent_id ) . await
}
},
);
// 4. Register trigger: bind agent::chat to queue
iii . register_trigger (
"queue" ,
"agent::chat" ,
json! ({ "topic" : "agent.inbox" })
) ? ;
tracing :: info! ( "agent-core worker started" );
tokio :: signal :: ctrl_c () . await ? ;
iii . shutdown_async () . await ;
Ok (())
}
async fn agent_chat ( iii : & III , req : ChatRequest ) -> Result < Value , IIIError > {
// Call other functions
let config : Value = iii . trigger ( "state::get" , json! ({
"scope" : "agents" ,
"key" : req . agent_id,
})) . await ? ;
let memories : Value = iii . trigger ( "memory::recall" , json! ({
"agentId" : req . agent_id,
"query" : req . message,
"limit" : 20 ,
})) . await . unwrap_or ( json! ([]));
// Process message...
Ok ( json! ({ "content" : "response" }))
}
Cargo.toml
crates/my-worker/Cargo.toml
[ package ]
name = "agentos-my-worker"
version = "0.0.1"
edition = "2024"
[ dependencies ]
iii-sdk = "0.4"
tokio = { version = "1" , features = [ "full" ] }
serde = { version = "1" , features = [ "derive" ] }
serde_json = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
Add to Workspace
In root Cargo.toml:
[ workspace ]
members = [
"crates/agent-core" ,
"crates/my-worker" , # Add here
# ...
]
Run the Worker
cargo run --release -p agentos-my-worker
Creating a TypeScript Worker
TypeScript workers are used for rapid iteration (API, tools, workflows, channels).
From src/tools.ts:
import { init } from "iii-sdk" ;
const ENGINE_URL = "ws://localhost:49134" ;
// 1. Initialize worker
const { registerFunction , registerTrigger , trigger , triggerVoid } = init (
ENGINE_URL ,
{ workerName: "my-worker" }
);
// 2. Register function: tool::file_read
registerFunction (
{
id: "tool::file_read" ,
description: "Read file contents with path containment" ,
metadata: { category: "tool" },
},
async ({ path , maxBytes } : { path : string ; maxBytes ?: number }) => {
const resolved = resolve ( WORKSPACE_ROOT , path );
assertPathContained ( resolved );
const content = await readFile ( resolved , "utf-8" );
const limited = maxBytes ? content . slice ( 0 , maxBytes ) : content ;
return { content: limited , path: resolved , size: content . length };
}
);
// 3. Register function: tool::file_write
registerFunction (
{
id: "tool::file_write" ,
description: "Write file with path containment" ,
metadata: { category: "tool" },
},
async ({ path , content } : { path : string ; content : string }) => {
const resolved = resolve ( WORKSPACE_ROOT , path );
assertPathContained ( resolved );
await writeFile ( resolved , content , "utf-8" );
return { written: true , path: resolved , size: content . length };
}
);
// 4. Call other functions
async function processFile ( path : string ) {
// Call file_read
const data : any = await trigger ( "tool::file_read" , { path }, 30_000 );
// Call security scan
const scan : any = await trigger ( "security::scan_content" , {
content: data . content
}, 10_000 );
if ( scan . safe ) {
return data ;
} else {
throw new Error ( "Content failed security scan" );
}
}
console . log ( "my-worker started" );
package.json
Ensure iii-sdk is in dependencies:
{
"name" : "agentos" ,
"type" : "module" ,
"dependencies" : {
"iii-sdk" : "^0.4.0"
},
"devDependencies" : {
"tsx" : "^4.0.0" ,
"typescript" : "^5.0.0"
}
}
Run the Worker
# Development (with watch mode)
npx tsx --watch src/my-worker.ts
# Production
node --loader tsx src/my-worker.ts
Creating a Python Worker
Python workers are used for ML operations (embeddings, inference).
Example: Embedding Worker
From workers/embedding/main.py:
workers/my-worker/main.py
import asyncio
import os
from iii_sdk import III
# 1. Initialize worker
iii = III(
"ws://localhost:49134" ,
worker_name = "my-worker" ,
)
model = None
def get_model ():
global model
if model is None :
try :
from sentence_transformers import SentenceTransformer
model_name = os.getenv( "EMBEDDING_MODEL" , "all-MiniLM-L6-v2" )
model = SentenceTransformer(model_name)
except ImportError :
model = "fallback"
return model
# 2. Register function: embedding::generate
@iii.function ( id = "embedding::generate" , description = "Generate text embeddings" )
async def generate_embedding ( input ):
text = input .get( "text" , "" )
batch = input .get( "batch" )
m = get_model()
if m == "fallback" :
if batch:
return { "embeddings" : [_hash_embed(t) for t in batch], "dim" : 128 }
return { "embedding" : _hash_embed(text), "dim" : 128 }
if batch:
embeddings = m.encode(batch, normalize_embeddings = True )
return {
"embeddings" : [e.tolist() for e in embeddings],
"dim" : embeddings.shape[ 1 ],
}
embedding = m.encode([text], normalize_embeddings = True )[ 0 ]
return { "embedding" : embedding.tolist(), "dim" : len (embedding)}
# 3. Register function: embedding::similarity
@iii.function ( id = "embedding::similarity" , description = "Compute cosine similarity" )
async def compute_similarity ( input ):
a = input .get( "a" , [])
b = input .get( "b" , [])
if len (a) != len (b) or not a:
return { "similarity" : 0.0 }
dot = sum (x * y for x, y in zip (a, b))
norm_a = sum (x * x for x in a) ** 0.5
norm_b = sum (x * x for x in b) ** 0.5
denom = norm_a * norm_b
return { "similarity" : dot / denom if denom > 0 else 0.0 }
def _hash_embed ( text : str , dim : int = 128 ) -> list :
import math
words = text.lower().split()
vec = [ 0.0 ] * dim
for word in words:
h = hash (word) & 0x FFFFFFFF
for i in range (dim):
vec[i] += math.sin(h * (i + 1 )) / max ( len (words), 1 )
norm = sum (v * v for v in vec) ** 0.5
if norm > 0 :
vec = [v / norm for v in vec]
return vec
async def main ():
print ( "my-worker started" )
try :
await asyncio.Event().wait()
except KeyboardInterrupt :
await iii.shutdown()
if __name__ == "__main__" :
asyncio.run(main())
requirements.txt
workers/my-worker/requirements.txt
iii-sdk>=0.4.0
sentence-transformers>=2.0.0
torch>=2.0.0
Run the Worker
pip install -r workers/my-worker/requirements.txt
python workers/my-worker/main.py
Worker Registration Patterns
Pattern 1: Simple Function
iii . register_function_with_description (
"my::simple" ,
"A simple function" ,
| input : Value | async move {
Ok ( json! ({ "result" : "success" }))
},
);
Pattern 2: Function with State
let state = Arc :: new ( Mutex :: new ( HashMap :: new ()));
let state_clone = state . clone ();
iii . register_function_with_description (
"my::with_state" ,
"Function with shared state" ,
move | input : Value | {
let state = state_clone . clone ();
async move {
let mut s = state . lock () . unwrap ();
s . insert ( "key" , "value" );
Ok ( json! ({ "state" : s . len () }))
}
},
);
Pattern 3: Function Calling Other Functions
let iii_clone = iii . clone ();
iii . register_function_with_description (
"my::composite" ,
"Calls other functions" ,
move | input : Value | {
let iii = iii_clone . clone ();
async move {
let result1 = iii . trigger ( "tool::file_read" , json! ({ "path" : "/tmp/test" })) . await ? ;
let result2 = iii . trigger ( "security::scan" , json! ({ "content" : result1 })) . await ? ;
Ok ( result2 )
}
},
);
Trigger Types
HTTP Trigger
Expose a function via REST API:
iii . register_trigger (
"http" ,
"my::http_handler" ,
json! ({
"api_path" : "my/endpoint" ,
"http_method" : "POST"
})
) ? ;
Accessible at: POST http://localhost:3111/my/endpoint
Queue Trigger
Process messages from a queue:
iii . register_trigger (
"queue" ,
"my::queue_handler" ,
json! ({ "topic" : "my.queue" })
) ? ;
Cron Trigger
Run on a schedule:
iii . register_trigger (
"cron" ,
"my::scheduled" ,
json! ({ "expression" : "0 */5 * * * *" }) // Every 5 minutes
) ? ;
PubSub Trigger
Subscribe to events:
iii . register_trigger (
"pubsub" ,
"my::event_handler" ,
json! ({ "topic" : "agent.lifecycle" })
) ? ;
Testing Workers
#[cfg(test)]
mod tests {
use super ::* ;
#[tokio :: test]
async fn test_function () {
let input = json! ({ "key" : "value" });
let result = my_function ( input ) . await . unwrap ();
assert_eq! ( result [ "status" ], "ok" );
}
}
cargo test -p agentos-my-worker
// src/__tests__/my-worker.test.ts
import { describe , it , expect } from "vitest" ;
import { myFunction } from "../my-worker.js" ;
describe ( "my-worker" , () => {
it ( "should process input" , async () => {
const result = await myFunction ({ key: "value" });
expect ( result . status ). toBe ( "ok" );
});
});
npx vitest --run src/__tests__/my-worker.test.ts
# workers/my-worker/test_main.py
import pytest
from main import generate_embedding
@pytest.mark.asyncio
async def test_generate_embedding ():
result = await generate_embedding({ "text" : "hello world" })
assert "embedding" in result
assert result[ "dim" ] > 0
python3 -m pytest workers/my-worker/
Best Practices
Use Descriptive IDs
Function IDs should follow namespace::action format:
agent::chat, agent::create, agent::delete
tool::file_read, tool::file_write
memory::store, memory::recall
Handle Errors Gracefully
Always return structured errors: try {
const result = await riskyOperation ();
return { success: true , data: result };
} catch ( err : any ) {
return { success: false , error: err . message };
}
Set Timeouts
Always specify timeouts when calling other functions: // Good
await trigger ( "tool::web_fetch" , { url }, 30_000 );
// Bad (no timeout)
await trigger ( "tool::web_fetch" , { url });
Add Metadata
Include metadata for categorization: registerFunction (
{
id: "my::function" ,
description: "Clear description" ,
metadata: { category: "my-category" , version: "1.0" }
},
handler
);
Log Important Events
Use structured logging: tracing :: info! ( "agent-core worker started" );
tracing :: warn! ( "rate limit exceeded" , agent_id = % agent_id );
Next Steps
Creating Tools Build tools that agents can call
Creating Agents Create agent templates
API Reference Full worker API documentation