AgentContext
AgentContext is the unified execution context used to pass state between agents and their components. It bundles state storage, an interrupt signal, an event bus, and execution-level configuration.
Location: mofa-kernel/src/agent/context.rs
Type Definition
#[derive(Clone)]
pub struct AgentContext<S: Clone = serde_json::Value> {
pub execution_id: String,
pub session_id: Option<String>,
// Internal fields (private)
}
Design Principles
As a kernel primitive, AgentContext is limited to the following capabilities:
- Basic state storage - Key-value store for execution state
- Interrupt signals - For graceful task cancellation
- Event bus - For publishing and subscribing to events
- Configuration - Execution-level settings
- Parent-child relationships - For nested task execution
Business logic (metrics collection, output logging) should be implemented in the foundation layer’s RichAgentContext, not in the kernel AgentContext.
Fields
- execution_id (String) - Unique identifier for this execution instance. Used to track individual task executions.
- session_id (Option<String>) - Optional session identifier for multi-turn conversations. Multiple executions can share the same session.
Constructor Methods
new
pub fn new(execution_id: impl Into<String>) -> Self
Creates a new context with the given execution ID.
use mofa_sdk::kernel::AgentContext;
let ctx = AgentContext::new("exec-123");
with_session
pub fn with_session(
execution_id: impl Into<String>,
session_id: impl Into<String>
) -> Self
Creates a context with both execution and session IDs.
let ctx = AgentContext::with_session("exec-123", "session-456");
child
pub fn child(&self, execution_id: impl Into<String>) -> Self
Creates a child context for sub-task execution. The child shares the parent’s interrupt signal and event bus.
let parent_ctx = AgentContext::new("parent-exec");
let child_ctx = parent_ctx.child("child-exec");
// Child inherits parent's session ID
assert_eq!(child_ctx.session_id, parent_ctx.session_id);
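To make the sharing behaviour concrete, here is a minimal standalone sketch of how a child could reuse its parent's interrupt signal. It is not the mofa implementation: `SketchContext` and the `Arc<AtomicBool>` flag are assumptions chosen for illustration, and the event bus is left out.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a context; NOT the real `AgentContext`.
#[derive(Clone)]
pub struct SketchContext {
    pub execution_id: String,
    pub session_id: Option<String>,
    // Assumption: the signal is a shared atomic flag, so every holder
    // of this Arc observes the same value.
    interrupt: Arc<AtomicBool>,
}

impl SketchContext {
    pub fn new(execution_id: impl Into<String>) -> Self {
        Self {
            execution_id: execution_id.into(),
            session_id: None,
            interrupt: Arc::new(AtomicBool::new(false)),
        }
    }

    /// The child gets its own execution ID but reuses the parent's
    /// session ID and interrupt flag.
    pub fn child(&self, execution_id: impl Into<String>) -> Self {
        Self {
            execution_id: execution_id.into(),
            session_id: self.session_id.clone(),
            interrupt: Arc::clone(&self.interrupt),
        }
    }

    pub fn trigger_interrupt(&self) {
        self.interrupt.store(true, Ordering::SeqCst);
    }

    pub fn is_interrupted(&self) -> bool {
        self.interrupt.load(Ordering::SeqCst)
    }
}
```

In this sketch, calling `trigger_interrupt()` on the parent is immediately visible through `is_interrupted()` on every child, which is what allows one cancellation request to stop a whole tree of sub-tasks.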
with_config
pub fn with_config(self, config: ContextConfig<S>) -> Self
Sets the context configuration.
use mofa_sdk::kernel::{AgentContext, ContextConfig};
use std::collections::HashMap;
let config = ContextConfig {
    timeout_ms: Some(5000),
    max_retries: 3,
    enable_tracing: true,
    custom: HashMap::new(),
};
let ctx = AgentContext::new("exec-123")
    .with_config(config);
State Management
get
pub async fn get(&self, key: &str) -> Option<S>
Retrieves a value from the context state.
// With the default state type, values are serde_json::Value
let value: Option<serde_json::Value> = ctx.get("user_id").await;
set
pub async fn set(&self, key: &str, value: S)
Stores a value in the context state.
// Every value must have the context's state type S (serde_json::Value by default)
ctx.set("user_id", json!("user-789")).await;
ctx.set("count", json!(42)).await;
remove
pub async fn remove(&self, key: &str) -> Option<S>
Removes and returns a value from the context state.
let old_value = ctx.remove("temp_data").await;
contains
pub async fn contains(&self, key: &str) -> bool
Checks if a key exists in the context state.
if ctx.contains("user_id").await {
println!("User ID is set");
}
keys
pub async fn keys(&self) -> Vec<String>
Returns all keys in the context state.
let all_keys = ctx.keys().await;
for key in all_keys {
println!("Key: {}", key);
}
find
pub async fn find(&self, key: &str) -> Option<S>
Recursively searches for a value in the current context and parent contexts.
let parent_ctx = AgentContext::new("parent");
parent_ctx.set("config", json!("production")).await;
let child_ctx = parent_ctx.child("child");
// Child can find parent's values
let value = child_ctx.find("config").await;
assert_eq!(value, Some(json!("production")));
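The recursive lookup can be pictured as a walk up a chain of scopes. The following standalone sketch is synchronous and uses `String` values to stay dependency-free; `ScopedState` and all its methods are hypothetical, and the rule that a local value takes precedence over a parent's value is an assumption rather than documented behaviour.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical scope with an optional parent; NOT the real `AgentContext`.
pub struct ScopedState {
    values: Mutex<HashMap<String, String>>,
    parent: Option<Arc<ScopedState>>,
}

impl ScopedState {
    pub fn root() -> Arc<Self> {
        Arc::new(Self { values: Mutex::new(HashMap::new()), parent: None })
    }

    pub fn child(parent: &Arc<Self>) -> Arc<Self> {
        Arc::new(Self {
            values: Mutex::new(HashMap::new()),
            parent: Some(Arc::clone(parent)),
        })
    }

    pub fn set(&self, key: &str, value: &str) {
        self.values.lock().unwrap().insert(key.to_string(), value.to_string());
    }

    /// Looks only at this scope.
    pub fn get(&self, key: &str) -> Option<String> {
        self.values.lock().unwrap().get(key).cloned()
    }

    /// Looks at this scope first, then recurses into the parent chain.
    pub fn find(&self, key: &str) -> Option<String> {
        if let Some(value) = self.get(key) {
            return Some(value);
        }
        self.parent.as_ref().and_then(|p| p.find(key))
    }
}
```

The difference between the two lookups is the point of the sketch: `get` on a child misses a key that only the parent holds, while `find` falls through to the parent and returns it.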
Interrupt Handling
is_interrupted
pub fn is_interrupted(&self) -> bool
Checks if an interrupt signal has been triggered.
while !ctx.is_interrupted() {
    // Continue processing; `process_next_item` is a placeholder for application-specific work
    process_next_item().await;
}
trigger_interrupt
pub fn trigger_interrupt(&self)
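Sets the interrupt signal. Code that polls is_interrupted(), including child contexts that share this signal, sees it on the next check.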
clear_interrupt
pub fn clear_interrupt(&self)
Clears the interrupt signal so that is_interrupted() returns false again.
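The trigger, check, and clear cycle is a form of cooperative cancellation: the worker is never stopped forcibly, it simply notices the flag at its next check. Here is a rough standalone illustration using a thread instead of an async task; `InterruptFlag` and `run_until_interrupted` are hypothetical names, and the atomic flag is an assumed implementation detail.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical cancellation flag; clones share the same underlying bool.
#[derive(Clone, Default)]
pub struct InterruptFlag(Arc<AtomicBool>);

impl InterruptFlag {
    pub fn trigger(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    pub fn clear(&self) {
        self.0.store(false, Ordering::SeqCst);
    }

    pub fn is_set(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Starts a polling worker, triggers the flag after a short delay, and
/// returns how many loop iterations the worker completed before stopping.
pub fn run_until_interrupted(flag: InterruptFlag) -> u64 {
    let worker_flag = flag.clone();
    let worker = thread::spawn(move || {
        let mut iterations = 0u64;
        // The worker decides when to stop: it checks the flag between work items.
        while !worker_flag.is_set() {
            iterations += 1;
            thread::sleep(Duration::from_millis(1));
        }
        iterations
    });

    thread::sleep(Duration::from_millis(20));
    flag.trigger();
    worker.join().expect("worker thread panicked")
}
```

Because the flag stays set until it is cleared, a context that will be reused for another task needs an explicit `clear()` first; otherwise the next polling loop would exit immediately.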
Event Bus
emit_event
pub async fn emit_event(&self, event: AgentEvent<S>)
Publishes an event to the event bus.
use mofa_sdk::kernel::AgentEvent;
use serde_json::json;
let event = AgentEvent::new("task_started", json!({
    "task_id": "task-123",
    "timestamp": 1234567890
}));
ctx.emit_event(event).await;
subscribe
pub async fn subscribe(&self, event_type: &str) -> EventReceiver<S>
Subscribes to events of a specific type. Use "*" to subscribe to all events.
let mut rx = ctx.subscribe("task_completed").await;
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
println!("Received event: {:?}", event);
}
});
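One way to picture type-based subscription with a "*" wildcard is a map from event type to a list of channel senders. This is only a sketch built on `std::sync::mpsc`, not the real event bus: `SketchBus`, `Event`, and the delivery count returned by `emit` are all assumptions made for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, simplified event; the payload is a plain String here.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub event_type: String,
    pub data: String,
}

/// Hypothetical bus: subscribers are grouped by the event type they asked for.
#[derive(Default)]
pub struct SketchBus {
    subscribers: HashMap<String, Vec<Sender<Event>>>,
}

impl SketchBus {
    /// Registers a subscriber for one event type, or for "*" to receive everything.
    pub fn subscribe(&mut self, event_type: &str) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.subscribers.entry(event_type.to_string()).or_default().push(tx);
        rx
    }

    /// Sends the event to exact-type subscribers and to "*" subscribers.
    /// Returns the number of successful deliveries.
    pub fn emit(&self, event: Event) -> usize {
        let mut delivered = 0;
        for key in [event.event_type.as_str(), "*"] {
            if let Some(senders) = self.subscribers.get(key) {
                for tx in senders {
                    // A send fails only if the receiver was dropped; skip those.
                    if tx.send(event.clone()).is_ok() {
                        delivered += 1;
                    }
                }
            }
        }
        delivered
    }
}
```

In this sketch a "task_completed" event reaches both the "task_completed" subscriber and the "*" subscriber, while a "task_started" event reaches only the wildcard one.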
Configuration
config
pub fn config(&self) -> &ContextConfig<S>
Returns the context configuration.
let cfg = ctx.config();
if let Some(timeout) = cfg.timeout_ms {
println!("Timeout: {}ms", timeout);
}
parent
pub fn parent(&self) -> Option<&Arc<AgentContext<S>>>
Returns a reference to the parent context (if this is a child context).
if let Some(parent) = ctx.parent() {
println!("Parent execution ID: {}", parent.execution_id);
}
ContextConfig
#[derive(Debug, Clone)]
pub struct ContextConfig<S = serde_json::Value> {
pub timeout_ms: Option<u64>,
pub max_retries: u32,
pub enable_tracing: bool,
pub custom: HashMap<String, S>,
}
Fields
- timeout_ms (Option<u64>) - Execution timeout in milliseconds
- max_retries (u32) - Maximum number of retry attempts
- enable_tracing (bool) - Whether to enable distributed tracing
- custom (HashMap<String, S>, default: {}) - Custom configuration key-value pairs
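This page does not describe how the kernel consumes these settings, so the following is only a sketch of how calling code might honour `timeout_ms` and `max_retries`. `SketchConfig` and `run_with_policy` are hypothetical, and reading `max_retries` as "retries after the first attempt" is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical mirror of the two fields used below; NOT the real `ContextConfig`.
pub struct SketchConfig {
    pub timeout_ms: Option<u64>,
    pub max_retries: u32,
}

/// Calls `op` once, then retries while it fails, until either the retry
/// budget or the optional deadline is exhausted. `op` receives the
/// zero-based attempt number.
pub fn run_with_policy<T, E>(
    config: &SketchConfig,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    // No timeout configured means no deadline at all.
    let deadline = config
        .timeout_ms
        .map(|ms| Instant::now() + Duration::from_millis(ms));
    let mut attempt = 0u32;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) => {
                let out_of_retries = attempt >= config.max_retries;
                let out_of_time = deadline.map_or(false, |d| Instant::now() >= d);
                if out_of_retries || out_of_time {
                    return Err(err);
                }
                attempt += 1;
            }
        }
    }
}
```

Under these assumptions, `max_retries: 3` allows at most four calls in total, and a deadline that has already passed stops the loop after the first failure regardless of the remaining retry budget.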
AgentEvent
#[derive(Debug, Clone)]
pub struct AgentEvent<S = serde_json::Value> {
pub event_type: String,
pub data: S,
pub timestamp_ms: u64,
pub source: Option<String>,
}
Fields
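- event_type (String) - Type/name of the event (e.g., “task_started”, “error_occurred”)
- data (S) - Event payload
- timestamp_ms (u64) - Event timestamp in milliseconds since epoch
- source (Option<String>) - Optional source identifier (agent ID, component name, etc.)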
Constructor
impl<S> AgentEvent<S> {
pub fn new(event_type: impl Into<String>, data: S) -> Self;
pub fn with_source(self, source: impl Into<String>) -> Self;
}
let event = AgentEvent::new("task_completed", json!({"status": "success"}))
.with_source("agent-123");
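The two signatures above form a small builder: `new` fills the required fields and `with_source` consumes the value and returns it with the source set. A plausible standalone reconstruction is sketched below; `SketchEvent` is a hypothetical type, and stamping `timestamp_ms` from the system clock inside `new` is an assumption, since the page does not say where the timestamp comes from.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical mirror of the documented fields; NOT the real `AgentEvent`.
#[derive(Debug, Clone)]
pub struct SketchEvent<S> {
    pub event_type: String,
    pub data: S,
    pub timestamp_ms: u64,
    pub source: Option<String>,
}

impl<S> SketchEvent<S> {
    /// Builds an event with no source.
    /// Assumption: the timestamp is taken from the wall clock at creation.
    pub fn new(event_type: impl Into<String>, data: S) -> Self {
        let timestamp_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0); // clock set before 1970: fall back to 0
        Self {
            event_type: event_type.into(),
            data,
            timestamp_ms,
            source: None,
        }
    }

    /// Consumes the event and returns it with the source filled in,
    /// which is what makes the chained call style possible.
    pub fn with_source(mut self, source: impl Into<String>) -> Self {
        self.source = Some(source.into());
        self
    }
}
```

Taking `self` by value in `with_source` keeps the chained form `SketchEvent::new(...).with_source(...)` free of intermediate mutable bindings.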
Example Usage
Basic State Management
use mofa_sdk::kernel::AgentContext;
use serde_json::json;
let ctx = AgentContext::new("exec-001");
// Store state
ctx.set("user_id", json!("user-123")).await;
ctx.set("request_count", json!(5)).await;
// Retrieve state
let user_id = ctx.get("user_id").await;
assert_eq!(user_id, Some(json!("user-123")));
// Check existence
if ctx.contains("user_id").await {
println!("User ID is set");
}
Event-Driven Communication
use mofa_sdk::kernel::{AgentContext, AgentEvent};
use serde_json::json;
let ctx = AgentContext::new("exec-002");
// Subscribe to events
let mut rx = ctx.subscribe("progress_update").await;
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
println!("Progress: {:?}", event.data);
}
});
// Emit events
for i in 0..10 {
ctx.emit_event(AgentEvent::new(
"progress_update",
json!({"percent": i * 10})
)).await;
}
Interrupt Handling
use mofa_sdk::kernel::AgentContext;
use std::time::Duration;
let ctx = AgentContext::new("exec-003");
// Spawn interrupt handler
let ctx_clone = ctx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
ctx_clone.trigger_interrupt();
});
// Long-running task with interrupt checks
while !ctx.is_interrupted() {
    // `process_item` is a placeholder for application-specific work
    process_item().await;
    tokio::time::sleep(Duration::from_millis(100)).await;
}
println!("Task interrupted!");
Parent-Child Context
use mofa_sdk::kernel::AgentContext;
use serde_json::json;
// Parent context
let parent = AgentContext::with_session("parent-exec", "session-123");
parent.set("global_config", json!("production")).await;
// Child context inherits session and can access parent state
let child = parent.child("child-exec-001");
assert_eq!(child.session_id, Some("session-123".to_string()));
// Child can find parent's values
let config = child.find("global_config").await;
assert_eq!(config, Some(json!("production")));
// Child's own state doesn't affect parent
child.set("local_data", json!("child-specific")).await;
assert!(parent.get("local_data").await.is_none());
With Configuration
use mofa_sdk::kernel::{AgentContext, ContextConfig};
use serde_json::json;
use std::collections::HashMap;
let config = ContextConfig {
timeout_ms: Some(10000), // 10 seconds
max_retries: 5,
enable_tracing: true,
custom: HashMap::from([(
"log_level".to_string(),
json!("debug")
)]),
};
let ctx = AgentContext::new("exec-004")
.with_config(config);
let cfg = ctx.config();
println!("Timeout: {:?}ms", cfg.timeout_ms);
println!("Max retries: {}", cfg.max_retries);