Skip to main content

AgentContext

The AgentContext provides a unified execution context for passing state between agents and their components. It includes state storage, interrupt signals, an event bus, and configuration. Location: mofa-kernel/src/agent/context.rs

Type Definition

#[derive(Clone)]
pub struct AgentContext<S: Clone = serde_json::Value> {
    pub execution_id: String,
    pub session_id: Option<String>,
    // Internal fields (private)
}

Design Principles

The AgentContext follows kernel primitive principles:
  1. Basic state storage - Key-value store for execution state
  2. Interrupt signals - For graceful task cancellation
  3. Event bus - For publishing and subscribing to events
  4. Configuration - Execution-level settings
  5. Parent-child relationships - For nested task execution
Business logic (metrics collection, output logging) should be implemented in the foundation layer’s RichAgentContext, not in the kernel AgentContext.

Fields

execution_id
String
required
Unique identifier for this execution instance. Used to track individual task executions.
session_id
Option<String>
Optional session identifier for multi-turn conversations. Multiple executions can share the same session.

Constructor Methods

new

pub fn new(execution_id: impl Into<String>) -> Self
Creates a new context with the given execution ID.
use mofa_sdk::kernel::AgentContext;

let ctx = AgentContext::new("exec-123");

with_session

pub fn with_session(
    execution_id: impl Into<String>, 
    session_id: impl Into<String>
) -> Self
Creates a context with both execution and session IDs.
let ctx = AgentContext::with_session("exec-123", "session-456");

child

pub fn child(&self, execution_id: impl Into<String>) -> Self
Creates a child context for sub-task execution. The child inherits the parent’s session ID and shares the parent’s interrupt signal and event bus.
let parent_ctx = AgentContext::new("parent-exec");
let child_ctx = parent_ctx.child("child-exec");

// Child inherits parent's session ID
assert_eq!(child_ctx.session_id, parent_ctx.session_id);

with_config

pub fn with_config(self, config: ContextConfig<S>) -> Self
Sets the context configuration.
let config = ContextConfig {
    timeout_ms: Some(5000),
    max_retries: 3,
    enable_tracing: true,
    custom: HashMap::new(),
};

let ctx = AgentContext::new("exec-123")
    .with_config(config);

State Management

get

pub async fn get(&self, key: &str) -> Option<S>
Retrieves a value from the context state.
let value: Option<serde_json::Value> = ctx.get("user_id").await;

set

pub async fn set(&self, key: &str, value: S)
Stores a value in the context state.
ctx.set("user_id", json!("user-789")).await;
ctx.set("count", json!(42)).await;

remove

pub async fn remove(&self, key: &str) -> Option<S>
Removes and returns a value from the context state.
let old_value = ctx.remove("temp_data").await;

contains

pub async fn contains(&self, key: &str) -> bool
Checks if a key exists in the context state.
if ctx.contains("user_id").await {
    println!("User ID is set");
}

keys

pub async fn keys(&self) -> Vec<String>
Returns all keys in the context state.
let all_keys = ctx.keys().await;
for key in all_keys {
    println!("Key: {}", key);
}

find

pub async fn find(&self, key: &str) -> Option<S>
Recursively searches for a value in the current context and parent contexts.
let parent_ctx = AgentContext::new("parent");
parent_ctx.set("config", json!("production")).await;

let child_ctx = parent_ctx.child("child");

// Child can find parent's values
let value = child_ctx.find("config").await;
assert_eq!(value, Some(json!("production")));

Interrupt Handling

is_interrupted

pub fn is_interrupted(&self) -> bool
Checks if an interrupt signal has been triggered.
while !ctx.is_interrupted() {
    // Continue processing
    process_next_item().await;
}

trigger_interrupt

pub fn trigger_interrupt(&self)
Triggers an interrupt signal.
ctx.trigger_interrupt();

clear_interrupt

pub fn clear_interrupt(&self)
Clears the interrupt status.
ctx.clear_interrupt();

Event Bus

emit_event

pub async fn emit_event(&self, event: AgentEvent<S>)
Publishes an event to the event bus.
use mofa_sdk::kernel::AgentEvent;

let event = AgentEvent::new("task_started", json!({
    "task_id": "task-123",
    "timestamp": 1234567890
}));

ctx.emit_event(event).await;

subscribe

pub async fn subscribe(&self, event_type: &str) -> EventReceiver<S>
Subscribes to events of a specific type. Use "*" to subscribe to all events.
let mut rx = ctx.subscribe("task_completed").await;

tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        println!("Received event: {:?}", event);
    }
});

Configuration

config

pub fn config(&self) -> &ContextConfig<S>
Returns the context configuration.
let cfg = ctx.config();
if let Some(timeout) = cfg.timeout_ms {
    println!("Timeout: {}ms", timeout);
}

parent

pub fn parent(&self) -> Option<&Arc<AgentContext<S>>>
Returns a reference to the parent context (if this is a child context).
if let Some(parent) = ctx.parent() {
    println!("Parent execution ID: {}", parent.execution_id);
}

ContextConfig

#[derive(Debug, Clone)]
pub struct ContextConfig<S = serde_json::Value> {
    pub timeout_ms: Option<u64>,
    pub max_retries: u32,
    pub enable_tracing: bool,
    pub custom: HashMap<String, S>,
}

Fields

timeout_ms
Option<u64>
Execution timeout in milliseconds
max_retries
u32
Default: 3
Maximum number of retry attempts
enable_tracing
bool
Default: false
Whether to enable distributed tracing
custom
HashMap<String, S>
Default: {} (empty map)
Custom configuration key-value pairs

AgentEvent

#[derive(Debug, Clone)]
pub struct AgentEvent<S = serde_json::Value> {
    pub event_type: String,
    pub data: S,
    pub timestamp_ms: u64,
    pub source: Option<String>,
}

Fields

event_type
String
required
Type/name of the event (e.g., “task_started”, “error_occurred”)
data
S
required
Event payload data
timestamp_ms
u64
required
Event timestamp in milliseconds since epoch
source
Option<String>
Optional source identifier (agent ID, component name, etc.)

Constructor

impl<S> AgentEvent<S> {
    pub fn new(event_type: impl Into<String>, data: S) -> Self;
    pub fn with_source(self, source: impl Into<String>) -> Self;
}
let event = AgentEvent::new("task_completed", json!({"status": "success"}))
    .with_source("agent-123");

Example Usage

Basic State Management

use mofa_sdk::kernel::{AgentContext, AgentEvent};
use serde_json::json;

let ctx = AgentContext::new("exec-001");

// Store state
ctx.set("user_id", json!("user-123")).await;
ctx.set("request_count", json!(5)).await;

// Retrieve state
let user_id = ctx.get("user_id").await;
assert_eq!(user_id, Some(json!("user-123")));

// Check existence
if ctx.contains("user_id").await {
    println!("User ID is set");
}

Event-Driven Communication

use mofa_sdk::kernel::{AgentContext, AgentEvent};
use serde_json::json;

let ctx = AgentContext::new("exec-002");

// Subscribe to events
let mut rx = ctx.subscribe("progress_update").await;

tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        println!("Progress: {:?}", event.data);
    }
});

// Emit events
for i in 0..10 {
    ctx.emit_event(AgentEvent::new(
        "progress_update",
        json!({"percent": i * 10})
    )).await;
}

Interrupt Handling

use mofa_sdk::kernel::AgentContext;
use std::time::Duration;

let ctx = AgentContext::new("exec-003");

// Spawn interrupt handler
let ctx_clone = ctx.clone();
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_secs(5)).await;
    ctx_clone.trigger_interrupt();
});

// Long-running task with interrupt checks
while !ctx.is_interrupted() {
    process_item().await;
    tokio::time::sleep(Duration::from_millis(100)).await;
}

println!("Task interrupted!");

Parent-Child Context

use mofa_sdk::kernel::AgentContext;
use serde_json::json;

// Parent context
let parent = AgentContext::with_session("parent-exec", "session-123");
parent.set("global_config", json!("production")).await;

// Child context inherits session and can access parent state
let child = parent.child("child-exec-001");
assert_eq!(child.session_id, Some("session-123".to_string()));

// Child can find parent's values
let config = child.find("global_config").await;
assert_eq!(config, Some(json!("production")));

// Child's own state doesn't affect parent
child.set("local_data", json!("child-specific")).await;
assert!(parent.get("local_data").await.is_none());

With Configuration

use mofa_sdk::kernel::{AgentContext, ContextConfig};
use serde_json::json;
use std::collections::HashMap;

let config = ContextConfig {
    timeout_ms: Some(10000), // 10 seconds
    max_retries: 5,
    enable_tracing: true,
    custom: HashMap::from([(
        "log_level".to_string(),
        json!("debug")
    )]),
};

let ctx = AgentContext::new("exec-004")
    .with_config(config);

let cfg = ctx.config();
println!("Timeout: {:?}ms", cfg.timeout_ms);
println!("Max retries: {}", cfg.max_retries);

See Also

Build docs developers (and LLMs) love