
Overview

Callbacks are the primary mechanism for customizing agent behavior in Solace Agent Mesh. They intercept execution at critical points, allowing you to modify requests, responses, and tool outputs.

Callback Architecture

Callbacks are registered with the ADK agent and executed at specific lifecycle events:
from typing import Optional

from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse

def my_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
) -> Optional[LlmResponse]:
    """Callback executed before LLM call."""
    # Modify request or return early response
    return None  # Continue to LLM

Built-in Callbacks

Solace Agent Mesh includes several built-in callbacks:

repair_history_callback

When: Before every LLM call
Purpose: Repair conversation history with dangling tool calls
def repair_history_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
) -> Optional[LlmResponse]:
    """
    Proactively checks for and repairs dangling tool calls
    in the conversation history before sending to the LLM.
    """
    if not llm_request.contents:
        return None
    
    # Index-based loop so inserting a repair entry doesn't
    # invalidate the iteration.
    i = 0
    while i < len(llm_request.contents):
        content = llm_request.contents[i]
        function_calls = [
            p.function_call for p in content.parts
            if p.function_call
        ]
        
        if function_calls:
            # Check whether the next content is a valid tool response
            next_content = (
                llm_request.contents[i + 1]
                if i + 1 < len(llm_request.contents)
                else None
            )
            
            if not next_content or not has_function_response(next_content):
                # Dangling tool call - insert a synthetic response
                repair_content = create_error_response(function_calls)
                llm_request.contents.insert(i + 1, repair_content)
        i += 1
    
    return None  # Continue with the repaired history
Use Case: Prevents LLM errors when tool calls weren’t completed due to crashes or cancellations.
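The helper functions referenced above (`has_function_response`, `create_error_response`) are not shown in the callback. A minimal sketch of what they might look like, using lightweight stand-ins for the ADK `Content`/`Part` types so the example is self-contained:

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Lightweight stand-ins for the ADK Content/Part types, for illustration only.
@dataclass
class Part:
    text: Optional[str] = None
    function_call: Optional[dict] = None
    function_response: Optional[dict] = None

@dataclass
class Content:
    role: str
    parts: List[Part] = field(default_factory=list)

def has_function_response(content: Content) -> bool:
    """True if any part of this content carries a tool response."""
    return any(p.function_response for p in content.parts)

def create_error_response(function_calls: List[dict]) -> Content:
    """Build a synthetic tool-response turn for each dangling call."""
    parts = [
        Part(function_response={
            "name": call.get("name"),
            "response": {"error": "Tool call did not complete."},
        })
        for call in function_calls
    ]
    return Content(role="tool", parts=parts)
```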

process_artifact_blocks_callback

When: After every LLM response
Purpose: Parse and process fenced artifact blocks in streaming responses
async def process_artifact_blocks_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: SamAgentComponent,
) -> Optional[LlmResponse]:
    """
    Orchestrates parsing of fenced artifact blocks from LLM stream.
    Handles streaming artifact creation and progress updates.
    """
    parser = callback_context.state.get('fenced_block_parser')
    if parser is None:
        parser = FencedBlockStreamParser()
        callback_context.state['fenced_block_parser'] = parser
    
    if llm_response.partial:
        # Process streaming chunks
        for part in llm_response.content.parts:
            if part.text:
                result = parser.process_chunk(part.text)
                
                # Handle parser events
                for event in result.events:
                    if isinstance(event, BlockStartedEvent):
                        await send_artifact_progress(
                            filename=event.params['filename'],
                            status='in-progress',
                        )
                    elif isinstance(event, BlockCompletedEvent):
                        # Save artifact
                        artifact = await save_artifact(
                            filename=event.params['filename'],
                            content=event.content,
                        )
                        await send_artifact_progress(
                            filename=event.params['filename'],
                            status='completed',
                            version=artifact.version,
                        )
    
    return None  # Continue; parser state persists across streaming chunks
Artifact Block Format:
«««save_artifact: filename="data.json" mime_type="application/json" description="Analysis results"
{
  "result": "success",
  "data": [1, 2, 3]
}
»»»
Features:
  • Streaming artifact creation with progress updates
  • Early embed resolution (math, datetime, uuid)
  • Automatic versioning
  • Error handling for malformed blocks
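To illustrate the block format, here is a minimal, non-streaming sketch of the fence parsing. The real `FencedBlockStreamParser` works chunk by chunk and emits events; this hypothetical version assumes the full text is available:

```python
import re

# Matches «««save_artifact: ...params...\n...content...\n»»» blocks.
BLOCK_RE = re.compile(
    r"«««save_artifact:\s*(?P<params>[^\n]*)\n(?P<content>.*?)\n»»»",
    re.DOTALL,
)
PARAM_RE = re.compile(r'(\w+)="([^"]*)"')

def extract_artifact_blocks(text: str):
    """Return a (params, content) pair for each fenced artifact block."""
    return [
        (dict(PARAM_RE.findall(m.group("params"))), m.group("content"))
        for m in BLOCK_RE.finditer(text)
    ]
```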

manage_large_mcp_tool_responses_callback

When: After MCP tool execution
Purpose: Handle large MCP responses by saving as artifacts
async def manage_large_mcp_tool_responses_callback(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    tool_response: Any,
    host_component: SamAgentComponent,
) -> Optional[Dict[str, Any]]:
    """
    Manages large or non-textual responses from MCP tools.
    
    Strategy:
    1. Save as artifact if response > threshold
    2. Truncate for LLM if response > LLM max
    3. Return artifact reference instead of full content
    """
    if not isinstance(tool, MCPTool):
        return tool_response
    
    # Convert response to dict
    mcp_response_dict = tool_response.model_dump(exclude_none=True)
    
    # Get configuration
    save_threshold = host_component.get_config(
        'mcp_tool_response_save_threshold_bytes', 2048
    )
    llm_max_bytes = host_component.get_config(
        'mcp_tool_llm_return_max_bytes', 4096
    )
    
    # Calculate size
    response_size = len(json.dumps(mcp_response_dict).encode('utf-8'))
    
    needs_saving = response_size > save_threshold
    needs_truncation = response_size > llm_max_bytes
    
    if needs_saving:
        # Save as artifact
        save_result = await save_mcp_response_as_artifact_intelligent(
            tool, tool_context, host_component,
            mcp_response_dict, args
        )
        
        if needs_truncation:
            # All-or-nothing approach: withhold partial data
            return {
                'status': 'processed_saved_artifact_only',
                'message_to_llm': (
                    f"Response too large ({response_size} bytes). "
                    f"Saved as artifact '{save_result.artifacts_saved[0].data_filename}'. "
                    f"Use template_liquid or load_artifact to access data."
                ),
                'saved_mcp_response_artifact_details': save_result.model_dump(),
            }
        else:
            return {
                'status': 'processed_and_saved',
                'message_to_llm': (
                    f"Response saved as artifact '{save_result.artifacts_saved[0].data_filename}'."
                ),
                'mcp_tool_output': mcp_response_dict,
                'saved_mcp_response_artifact_details': save_result.model_dump(),
            }
    
    return mcp_response_dict
Configuration:
app_config:
  mcp_tool_response_save_threshold_bytes: 2048
  mcp_tool_llm_return_max_bytes: 4096
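The two thresholds form a simple decision matrix based on the UTF-8 size of the serialized response. The sketch below mirrors the callback's sizing logic in isolation:

```python
import json

def classify_response(mcp_response_dict: dict,
                      save_threshold: int = 2048,
                      llm_max_bytes: int = 4096) -> dict:
    """Mirror the callback's size checks: save above the first
    threshold, withhold the inline payload above the second."""
    size = len(json.dumps(mcp_response_dict).encode("utf-8"))
    return {
        "size": size,
        "needs_saving": size > save_threshold,
        "needs_truncation": size > llm_max_bytes,
    }
```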
Intelligent Artifact Saving: The callback uses save_mcp_response_as_artifact_intelligent to handle different content types appropriately:
async def save_mcp_response_as_artifact_intelligent(
    tool: MCPTool,
    tool_context: ToolContext,
    host_component: SamAgentComponent,
    mcp_response_dict: Dict[str, Any],
    tool_args: Dict[str, Any],
) -> McpSaveResult:
    """
    Intelligently saves MCP response content:
    - Text content → text artifacts
    - Image content → image artifacts
    - Resource content → typed artifacts
    - Mixed content → multiple artifacts
    """
    artifacts_saved = []
    
    for content_item in mcp_response_dict.get('content', []):
        if content_item['type'] == 'text':
            artifact = await save_text_artifact(
                filename=f"{tool.name}_response.txt",
                content=content_item['text'],
            )
            artifacts_saved.append(artifact)
        
        elif content_item['type'] == 'image':
            # Derive a file extension from the MIME type
            # (e.g. "image/png" -> "png")
            ext = content_item['mimeType'].split('/')[-1]
            artifact = await save_binary_artifact(
                filename=f"{tool.name}_image.{ext}",
                content=base64.b64decode(content_item['data']),
                mime_type=content_item['mimeType'],
            )
            artifacts_saved.append(artifact)
        
        elif content_item['type'] == 'resource':
            artifact = await save_resource_artifact(
                filename=content_item['resource']['uri'].split('/')[-1],
                content=content_item['resource']['blob'],
                mime_type=content_item['resource']['mimeType'],
            )
            artifacts_saved.append(artifact)
    
    return McpSaveResult(
        status=McpSaveStatus.SUCCESS,
        artifacts_saved=artifacts_saved,
    )

Custom Callbacks

Creating Custom Callbacks

You can create custom callbacks for specific agent behaviors:
from datetime import datetime
from typing import Any, Dict, Optional

from google.adk.agents.callback_context import CallbackContext
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.tools import BaseTool, ToolContext
from google.genai.types import Part

class MyCustomCallback:
    def __init__(self, config: dict):
        self.config = config
    
    async def before_model_call(
        self,
        callback_context: CallbackContext,
        llm_request: LlmRequest,
    ) -> Optional[LlmResponse]:
        """
        Inject dynamic instructions before LLM call.
        """
        # Get current time
        current_time = datetime.now().isoformat()
        
        # Inject into system message
        system_instruction = f"""
        Current time: {current_time}
        
        Additional context: {self.config.get('context')}
        """
        
        # Add to first user message
        if llm_request.contents:
            first_content = llm_request.contents[0]
            first_content.parts.insert(0, Part(text=system_instruction))
        
        return None  # Continue to LLM
    
    async def after_model_call(
        self,
        callback_context: CallbackContext,
        llm_response: LlmResponse,
    ) -> Optional[LlmResponse]:
        """
        Post-process LLM response.
        """
        # Extract metadata from response
        if llm_response.content:
            for part in llm_response.content.parts:
                if part.text and "[METADATA]" in part.text:
                    # Extract and store metadata
                    metadata = self.extract_metadata(part.text)
                    callback_context.state['metadata'] = metadata
        
        return None  # Continue with the response
    
    async def after_tool_call(
        self,
        tool: BaseTool,
        args: Dict[str, Any],
        tool_context: ToolContext,
        tool_response: Any,
    ) -> Optional[Dict[str, Any]]:
        """
        Post-process tool execution results.
        """
        # Log tool usage
        self.log_tool_usage(tool.name, args, tool_response)
        
        # Transform response if needed
        if tool.name == 'search' and isinstance(tool_response, list):
            # Limit search results
            max_results = self.config.get('max_search_results', 10)
            return tool_response[:max_results]
        
        return tool_response

Registering Custom Callbacks

Callbacks are registered during agent setup:
from solace_agent_mesh.agent.sac.component import SamAgentComponent

class CustomAgentComponent(SamAgentComponent):
    def setup_callbacks(self):
        """Register custom callbacks."""
        super().setup_callbacks()  # Register built-in callbacks
        
        # Register custom callback
        custom_callback = MyCustomCallback(config=self.config)
        
        self.adk_agent.register_callback(
            'before_model_call',
            custom_callback.before_model_call,
        )
        
        self.adk_agent.register_callback(
            'after_tool_call',
            custom_callback.after_tool_call,
        )

Callback Context

Callbacks receive a CallbackContext object with access to:
class CallbackContext:
    state: Dict[str, Any]  # Mutable state shared across callbacks
    _invocation_context: InvocationContext  # ADK invocation context

Accessing Services

async def my_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
) -> Optional[LlmResponse]:
    # Access artifact service
    inv_context = callback_context._invocation_context
    artifact_service = inv_context.artifact_service
    
    # Load artifact
    artifact = await artifact_service.load(
        app_name='my_agent',
        user_id='user123',
        filename='data.json',
    )
    
    # Inject artifact content into request
    llm_request.contents[0].parts.append(
        Part(text=f"Context: {artifact.content}")
    )
    
    return None

Storing State

# Store state for use in later callbacks
callback_context.state['processed_count'] = (
    callback_context.state.get('processed_count', 0) + 1
)

# Retrieve state
count = callback_context.state.get('processed_count', 0)

Callback Return Values

Continuing Execution

Return None to continue normal execution:
def my_callback(...) -> Optional[LlmResponse]:
    # Do preprocessing
    return None  # Continue to LLM

Short-Circuiting

Return a response to skip LLM call:
def cached_response_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
) -> Optional[LlmResponse]:
    # Check cache
    cache_key = generate_cache_key(llm_request)
    cached = get_from_cache(cache_key)
    
    if cached:
        # Return cached response (skip LLM)
        return LlmResponse(
            content=Content(parts=[Part(text=cached)]),
            partial=False,
            turn_complete=True,
        )
    
    return None  # Cache miss, continue to LLM
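`generate_cache_key` above is left undefined; one simple approach is to hash the serialized conversation. For a self-contained sketch, the version below operates on a plain list of message dicts rather than an `LlmRequest`:

```python
import hashlib
import json

def generate_cache_key(messages: list) -> str:
    """Hash the serialized conversation into a stable cache key."""
    # sort_keys makes the key independent of dict key order
    serialized = json.dumps(messages, sort_keys=True)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
```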

Callback Ordering

Callbacks execute in registration order:
# These execute in order:
1. repair_history_callback
2. inject_context_callback  
3. custom_preprocessing_callback
# → LLM call
4. process_artifact_blocks_callback
5. custom_postprocessing_callback

Error Handling

Callbacks should handle errors gracefully:
async def safe_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
) -> Optional[LlmResponse]:
    try:
        # Callback logic
        result = await risky_operation()
        return None
    except Exception as e:
        log.error(f"Callback error: {e}", exc_info=True)
        # Continue execution despite error
        return None

Performance Considerations

Async Operations

Callbacks can be async for I/O operations:
async def my_callback(...):
    # Async database query
    context = await db.get_user_context(user_id)
    # Continue processing

Caching

Cache expensive operations:
class CachedCallback:
    def __init__(self):
        # LRUCache from e.g. the cachetools package
        self.cache = LRUCache(maxsize=1000)
    
    async def callback(self, callback_context, llm_request):
        cache_key = generate_key(llm_request)
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        result = await expensive_operation()
        self.cache[cache_key] = result
        return result
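If you prefer not to add a dependency for the cache, a minimal stand-in for such an `LRUCache` can be built on `collections.OrderedDict`:

```python
from collections import OrderedDict

class LRUCache:
    """Minimal LRU cache (stand-in for e.g. cachetools.LRUCache)."""

    def __init__(self, maxsize: int = 1000):
        self.maxsize = maxsize
        self._data = OrderedDict()

    def __contains__(self, key) -> bool:
        return key in self._data

    def __getitem__(self, key):
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict least recently used
```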

Best Practices

  1. Keep callbacks focused: Each callback should have one clear responsibility
  2. Handle errors gracefully: Don’t crash the agent on callback errors
  3. Use async when needed: I/O operations should be async
  4. Document behavior: Clearly document what each callback does
  5. Test thoroughly: Callbacks are critical path code
