
Overview

The Agent-to-Agent (A2A) protocol enables standardized communication between agents built with different frameworks and technologies. The Microsoft Agent Framework provides full support for both hosting A2A-compliant agents and consuming external A2A agents.
Learn more about the A2A protocol at a2a-protocol.org.

Key Concepts

Agent Discovery

A2A agents expose an AgentCard at /.well-known/agent.json that describes:
  • Agent name and description
  • Available skills and capabilities
  • Supported input/output modes
  • API endpoints

Communication Modes

  • Non-streaming: Request-response pattern for simple interactions
  • Streaming: Server-Sent Events (SSE) for real-time updates
  • Background tasks: Long-running operations with continuation tokens

Task States

A2A tasks progress through states:
  • working: Task is being processed
  • completed: Task finished successfully
  • failed: Task encountered an error
  • cancelled: Task was cancelled
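As a mental model, these states map naturally onto an enum. The framework exports its own TaskState (used later in this document); the simplified stand-in below just illustrates the terminal/non-terminal split that a polling loop relies on:

```python
from enum import Enum

class TaskState(str, Enum):
    """Simplified stand-in for the A2A task states listed above."""
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

# Once a task reaches any of these, its state will not change again.
TERMINAL_STATES = {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED}

def is_terminal(state: TaskState) -> bool:
    """A polling loop can stop as soon as the task reaches a terminal state."""
    return state in TERMINAL_STATES

print(is_terminal(TaskState.WORKING))    # False
print(is_terminal(TaskState.COMPLETED))  # True
```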

Consuming A2A Agents

Connect to external A2A-compliant agents:
import asyncio
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent

async def main():
    a2a_host = "https://example-agent.com"
    
    # 1. Discover agent capabilities
    async with httpx.AsyncClient(timeout=60.0) as http_client:
        resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_host)
        agent_card = await resolver.get_agent_card()
        print(f"Found agent: {agent_card.name}")
        print(f"Description: {agent_card.description}")
        print(f"Skills: {[s.name for s in agent_card.skills]}")
    
    # 2. Create A2A agent instance
    async with A2AAgent(
        name=agent_card.name,
        description=agent_card.description,
        agent_card=agent_card,
        url=a2a_host,
    ) as agent:
        # 3. Simple request/response
        response = await agent.run("What can you do?")
        print(f"Response: {response.text}")
        
        # 4. Streaming response
        async with agent.run("Explain AI", stream=True) as stream:
            async for update in stream:
                for content in update.contents:
                    if content.text:
                        print(content.text, end="")
            
            final = await stream.get_final_response()
            print(f"\n\nComplete. {len(final.messages)} message(s) received.")

asyncio.run(main())

Hosting A2A Agents

Expose your agents via the A2A protocol:

Using Azure Functions

The AgentFunctionApp automatically exposes A2A-compliant endpoints:
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient
from azure.identity import AzureCliCredential

agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="MyAgent",
    instructions="You are a helpful assistant.",
    description="A general-purpose AI assistant"
)

app = AgentFunctionApp(agents=[agent])

# Automatically available:
# - /.well-known/agent.json (AgentCard)
# - /api/agents/MyAgent/run (invoke endpoint)
# - /api/agents/MyAgent/task/{taskId} (task status)

Custom A2A Server

Implement A2A protocol manually for custom hosting:
from agent_framework.a2a import A2AServer
from agent_framework import Agent
from fastapi import FastAPI

app = FastAPI()
agent = ...  # create your agent

# Create A2A server
a2a_server = A2AServer(agent)

# Expose endpoints
@app.get("/.well-known/agent.json")
async def get_agent_card():
    return a2a_server.get_agent_card()

@app.post("/api/run")
async def run_agent(request: dict):
    return await a2a_server.handle_request(request)

@app.get("/api/task/{task_id}")
async def get_task_status(task_id: str):
    return await a2a_server.get_task_status(task_id)

AgentCard Structure

The AgentCard describes your agent’s capabilities:
{
  "@type": "AgentCard",
  "name": "MyAgent",
  "description": "A helpful AI assistant",
  "version": "1.0",
  "url": "https://example.com",
  "skills": [
    {
      "name": "general_assistance",
      "description": "Answer questions and help with tasks",
      "tags": ["qa", "assistant"],
      "examples": [
        "What is the weather?",
        "Explain quantum computing"
      ],
      "inputModes": ["text/plain"],
      "outputModes": ["text/plain", "application/json"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "backgroundTasks": true,
    "multimodal": false
  }
}
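A client can read this card with nothing more than the standard library. The sketch below parses a hypothetical card matching the structure above and pulls out the fields a client typically checks before choosing a communication mode:

```python
import json

# Hypothetical card mirroring the AgentCard structure shown above.
card_json = '''{
  "@type": "AgentCard",
  "name": "MyAgent",
  "description": "A helpful AI assistant",
  "version": "1.0",
  "url": "https://example.com",
  "skills": [
    {"name": "general_assistance",
     "description": "Answer questions and help with tasks",
     "inputModes": ["text/plain"],
     "outputModes": ["text/plain", "application/json"]}
  ],
  "capabilities": {"streaming": true, "backgroundTasks": true, "multimodal": false}
}'''

card = json.loads(card_json)

# Check capabilities before deciding between streaming and request/response.
supports_streaming = card["capabilities"].get("streaming", False)
skill_names = [skill["name"] for skill in card["skills"]]

print(supports_streaming)  # True
print(skill_names)         # ['general_assistance']
```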

A2A Skills as Tools

Use A2A agent skills as tools for another agent:
from agent_framework.a2a import A2AAgent, a2a_skills_as_tools

# Discover remote agent
a2a_agent = await A2AAgent.from_url("https://specialist-agent.com")

# Convert skills to tools
tools = a2a_skills_as_tools(a2a_agent)

# Use in main agent ("client" is a chat client, e.g. AzureOpenAIChatClient)
main_agent = client.as_agent(
    name="MainAgent",
    instructions="You coordinate with specialist agents.",
    tools=tools
)

# Main agent can now call A2A agent skills
response = await main_agent.run(
    "Use the specialist to analyze this data: {...}"
)

Background Tasks

Handle long-running operations with continuation tokens:
from agent_framework.a2a import A2AAgent

# Start background task
agent = A2AAgent(
    name="LongRunningAgent",
    agent_card=agent_card,
    url=a2a_host,
    background=True  # Don't wait for completion
)

response = await agent.run(
    "Perform a long computation",
    background=True
)

# Get continuation token
task_id = response.task_id
continuation_token = response.continuation_token

# Later: poll for status
status = await agent.get_task_status(task_id)
print(f"Status: {status.state}")  # working, completed, failed

if status.state == "completed":
    final_response = status.result
    print(f"Result: {final_response.text}")

# Or: resume streaming from continuation token
if continuation_token:
    async with agent.resume_stream(task_id, continuation_token) as stream:
        async for update in stream:
            print(f"Update: {update.text}")

Polling for Completion

Implement polling with exponential backoff:
import asyncio
from agent_framework.a2a import A2AAgent, TaskState

async def poll_until_complete(
    agent: A2AAgent,
    task_id: str,
    max_attempts: int = 60,
    initial_delay: float = 1.0
):
    """Poll task status until completion."""
    delay = initial_delay
    
    for attempt in range(max_attempts):
        status = await agent.get_task_status(task_id)
        
        if status.state == TaskState.COMPLETED:
            return status.result
        elif status.state == TaskState.FAILED:
            raise Exception(f"Task failed: {status.error}")
        elif status.state == TaskState.CANCELLED:
            raise Exception("Task was cancelled")
        
        # Exponential backoff
        await asyncio.sleep(delay)
        delay = min(delay * 1.5, 30.0)  # Cap at 30 seconds
    
    raise TimeoutError(f"Task did not complete within {max_attempts} attempts")

# Usage
response = await agent.run("Long task", background=True)
result = await poll_until_complete(agent, response.task_id)
print(f"Final result: {result.text}")

Multi-Agent Communication

Orchestrate multiple A2A agents:
import asyncio
from agent_framework.a2a import A2AAgent

async def orchestrate_agents():
    """Coordinate multiple A2A agents."""
    
    # Discover agents
    research_agent = await A2AAgent.from_url("https://research-agent.com")
    writing_agent = await A2AAgent.from_url("https://writing-agent.com")
    review_agent = await A2AAgent.from_url("https://review-agent.com")
    
    # Sequential workflow
    topic = "quantum computing"
    
    # Step 1: Research
    research = await research_agent.run(f"Research {topic}")
    print(f"Research complete: {len(research.text)} chars")
    
    # Step 2: Write draft
    draft = await writing_agent.run(
        f"Write an article based on this research: {research.text}"
    )
    print(f"Draft complete: {len(draft.text)} chars")
    
    # Step 3: Review and revise
    final = await review_agent.run(
        f"Review and improve this article: {draft.text}"
    )
    print(f"Final article: {final.text}")
    
    return final.text

# Parallel execution
async def parallel_agents():
    """Run multiple agents concurrently."""
    
    analyst1 = await A2AAgent.from_url("https://analyst1.com")
    analyst2 = await A2AAgent.from_url("https://analyst2.com")
    analyst3 = await A2AAgent.from_url("https://analyst3.com")
    
    # Gather results concurrently
    results = await asyncio.gather(
        analyst1.run("Analyze market trends"),
        analyst2.run("Analyze competitor data"),
        analyst3.run("Analyze customer feedback")
    )
    
    # Aggregate results
    combined = "\n\n".join([r.text for r in results])
    return combined

Error Handling

Handle A2A protocol errors gracefully:
from agent_framework.a2a import (
    A2AAgent,
    A2AConnectionError,
    A2AAuthenticationError,
    A2ATaskError,
    TaskState
)

async def robust_a2a_call(agent: A2AAgent, message: str):
    """Call A2A agent with comprehensive error handling."""
    
    try:
        response = await agent.run(message)
        return response
        
    except A2AConnectionError as e:
        print(f"Connection failed: {e}")
        # Retry with exponential backoff
        
    except A2AAuthenticationError as e:
        print(f"Authentication failed: {e}")
        # Refresh credentials
        
    except A2ATaskError as e:
        print(f"Task failed: {e}")
        if e.state == TaskState.FAILED:
            print(f"Error details: {e.error_message}")
        # Handle task-specific errors
        
    except TimeoutError:
        print("Request timed out")
        # Handle timeout
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        # General error handling

Testing A2A Agents

Using .NET Sample Server

For quick testing, use the .NET A2A sample server:
# Clone repository
git clone https://github.com/microsoft/agent-framework.git
cd agent-framework/dotnet/samples/05-end-to-end/A2AClientServer

# Set environment variables
export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4o-mini"

# Run server
dotnet run

# Server available at http://localhost:5001/
Then test with Python client:
# Set A2A host
export A2A_AGENT_HOST="http://localhost:5001/"

# Run Python sample
python agent_with_a2a.py

Mock A2A Server

Create a mock server for testing:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AgentCard(BaseModel):
    name: str
    description: str
    skills: list

@app.get("/.well-known/agent.json")
async def get_agent_card():
    return {
        "@type": "AgentCard",
        "name": "MockAgent",
        "description": "A mock agent for testing",
        "version": "1.0",
        "skills": [
            {
                "name": "echo",
                "description": "Echo input back",
                "inputModes": ["text/plain"],
                "outputModes": ["text/plain"]
            }
        ]
    }

@app.post("/api/run")
async def run_agent(request: dict):
    return {
        "status": "completed",
        "result": {
            "text": f"Echo: {request.get('message')}"
        }
    }

# Run: uvicorn mock_server:app --port 5001

Best Practices

  • Clear descriptions: Make skills discoverable with detailed descriptions
  • Relevant tags: Use tags for categorization and searchability
  • Provide examples: Include usage examples for each skill
  • Specify modes: Declare supported input/output MIME types
  • Retry logic: Implement exponential backoff for transient errors
  • Timeout management: Set appropriate timeouts for long-running tasks
  • Graceful degradation: Handle agent unavailability gracefully
  • Detailed errors: Return meaningful error messages in task failures
  • Connection pooling: Reuse HTTP connections for multiple requests
  • Parallel execution: Use asyncio.gather() for concurrent agent calls
  • Caching: Cache AgentCards to avoid repeated discovery
  • Streaming: Use streaming for long responses to reduce latency
  • Authentication: Implement OAuth 2.0 or API key authentication
  • HTTPS: Always use HTTPS in production
  • Rate limiting: Protect against abuse with rate limits
  • Input validation: Validate all inputs before processing
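The caching recommendation can be as simple as an in-memory map keyed by base URL with a time-to-live, so repeated calls to the same agent skip rediscovery. The sketch below is illustrative, not a framework API; the class and parameter names are assumptions:

```python
import time

class AgentCardCache:
    """Minimal in-memory AgentCard cache with a TTL (illustrative sketch)."""

    def __init__(self, ttl_seconds: float = 300.0):
        self._ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}

    def get(self, base_url: str):
        """Return a cached card, or None if missing or expired."""
        entry = self._entries.get(base_url)
        if entry is None:
            return None
        fetched_at, card = entry
        if time.monotonic() - fetched_at > self._ttl:
            del self._entries[base_url]  # expired; force rediscovery
            return None
        return card

    def put(self, base_url: str, card: dict) -> None:
        self._entries[base_url] = (time.monotonic(), card)

cache = AgentCardCache(ttl_seconds=300.0)
cache.put("https://example-agent.com", {"name": "MyAgent"})
print(cache.get("https://example-agent.com"))  # {'name': 'MyAgent'}
print(cache.get("https://unknown.com"))        # None
```

On a cache miss, fall back to the A2ACardResolver discovery flow shown earlier and store the result.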

Troubleshooting

Error: Cannot retrieve AgentCard at /.well-known/agent.json
Solutions:
  • Verify URL is correct and accessible
  • Check CORS configuration if accessing from browser
  • Ensure AgentCard endpoint returns valid JSON
  • Test with curl: curl https://agent.com/.well-known/agent.json
Error: Request times out when calling A2A agent
Solutions:
  • Increase timeout: httpx.AsyncClient(timeout=120.0)
  • Use background tasks for long operations
  • Implement polling instead of waiting for completion
  • Check network connectivity and firewall rules
Error: Task remains in working state indefinitely
Solutions:
  • Implement timeout in polling loop
  • Check A2A agent logs for errors
  • Verify task is actually processing (check metrics)
  • Consider using continuation tokens to resume
Error: Streaming response disconnects prematurely
Solutions:
  • Use continuation tokens to resume from last position
  • Implement retry logic with exponential backoff
  • Check load balancer timeout settings
  • Verify SSE configuration on both client and server

Protocol Compliance

Ensure your A2A implementation follows the specification:
  • ✅ AgentCard at /.well-known/agent.json
  • ✅ Proper content negotiation (Accept headers)
  • ✅ Task state transitions (working → completed/failed)
  • ✅ Continuation token support for resumption
  • ✅ SSE format for streaming responses
  • ✅ Error responses with proper status codes
Validate compliance using the A2A validation tool.
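The state-transition rule can also be enforced mechanically. This sketch encodes the transitions described above (working is the only non-terminal state; cancelled is reachable from working, per the Task States section) and is an illustration rather than part of any validation tool:

```python
# Allowed A2A task state transitions, per the model described above.
ALLOWED_TRANSITIONS = {
    "working": {"working", "completed", "failed", "cancelled"},
    "completed": set(),   # terminal
    "failed": set(),      # terminal
    "cancelled": set(),   # terminal
}

def is_valid_transition(current: str, new: str) -> bool:
    """Return True if a task may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())

print(is_valid_transition("working", "completed"))  # True
print(is_valid_transition("completed", "working"))  # False
```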

Next Steps

Azure Functions

Host A2A agents on Azure Functions

DurableTask

Orchestrate A2A agents with Durable Functions
