
Overview

The Microsoft Agent Framework integrates with Durable Functions and the Durable Task Framework to enable long-running, stateful agent workflows. This integration provides checkpoint-and-resume capabilities, parallel execution, human-in-the-loop patterns, and reliable state management for complex agent orchestrations.

Key Features

  • Durable orchestrations: Coordinate multiple agent calls with automatic checkpointing
  • State persistence: Agent conversation state survives restarts and failures
  • Parallel execution: Run multiple agents concurrently and aggregate results
  • External events: Human-in-the-loop and external approval workflows
  • Long-running tools: Agents can invoke durable activities for extended operations
  • Reliable streaming: Resumable streaming with cursor-based resumption

Architecture Patterns

Worker-Client Architecture

DurableTask uses a distributed worker-client pattern:
┌─────────────┐         ┌─────────────────────┐         ┌──────────────┐
│   Client    │────────›│ DurableTask Server  │‹────────│    Worker    │
│             │         │  (Orchestrator)     │         │  (Executor)  │
│ - Schedule  │         │ - Task Hub          │         │ - Agents     │
│ - Query     │         │ - State Storage     │         │ - Activities │
│ - Events    │         │ - Event Queue       │         │ - Entities   │
└─────────────┘         └─────────────────────┘         └──────────────┘
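The division of responsibilities above can be illustrated with a toy, in-memory stand-in (ToyTaskHub, ToyClient, and ToyWorker are hypothetical illustrations, not SDK types): the client only schedules and queries, the server holds the queue and state storage, and the worker pulls tasks and executes them.

```python
from queue import Queue

class ToyTaskHub:
    """Stands in for the DurableTask server: a task queue plus state storage."""
    def __init__(self):
        self.queue = Queue()
        self.state = {}  # instance_id -> result

class ToyClient:
    """Schedules work and queries results; never executes anything itself."""
    def __init__(self, hub):
        self.hub = hub
    def schedule(self, instance_id, prompt):
        self.hub.queue.put((instance_id, prompt))
    def query(self, instance_id):
        return self.hub.state.get(instance_id)

class ToyWorker:
    """Pulls tasks from the hub and runs the agent, writing results back."""
    def __init__(self, hub, agent_fn):
        self.hub, self.agent_fn = hub, agent_fn
    def process_one(self):
        instance_id, prompt = self.hub.queue.get()
        self.hub.state[instance_id] = self.agent_fn(prompt)

hub = ToyTaskHub()
client = ToyClient(hub)
worker = ToyWorker(hub, agent_fn=lambda p: f"echo: {p}")

client.schedule("i1", "hello")
worker.process_one()
print(client.query("i1"))  # echo: hello
```

Because the client and worker only share the hub, either side can restart independently: the queue and state survive in the server, which is what makes the real system durable.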

Hosting Options

  1. Azure Functions: Fully managed with auto-scaling
  2. Console Apps: Self-hosted for development or on-premises
  3. ASP.NET: Embedded in web applications

Quick Start

Prerequisites

Start the Durable Task Scheduler emulator:
docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
The DTS dashboard is available at http://localhost:8082.

Python Setup

1. Install Package

pip install agent-framework-durabletask
2. Create Worker

Create worker.py:
from agent_framework import Agent
from agent_framework.azure import AzureOpenAIChatClient, DurableAIAgentWorker
from azure.identity import AzureCliCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
import os

# Create agent
agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="Assistant",
    instructions="You are a helpful assistant."
)

# Create DurableTask worker
dts_worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)

# Wrap with agent worker
agent_worker = DurableAIAgentWorker(dts_worker)
agent_worker.add_agent(agent)

# Start worker
print("Worker started. Press Ctrl+C to stop.")
dts_worker.start()
3. Create Client

Create client.py:
import asyncio
from agent_framework_durabletask import DurableAIAgentClient
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def main():
    # Create DurableTask client
    dts_client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )
    
    # Wrap with agent client
    agent_client = DurableAIAgentClient(dts_client)
    agent = agent_client.get_agent("Assistant")
    
    # Run agent
    response = await agent.run("What is machine learning?")
    print(f"Response: {response.text}")

asyncio.run(main())
4. Run

In one terminal:
python worker.py
In another terminal:
python client.py

C# Setup

1. Add Package

dotnet add package Microsoft.Agents.AI.Hosting.AzureFunctions
dotnet add package Microsoft.Azure.Functions.Worker.Extensions.DurableTask
2. Create Program.cs

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Hosting.AzureFunctions;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Hosting;
using OpenAI.Chat;

string endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
    ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
string deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") 
    ?? "gpt-4o-mini";

var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential());
AIAgent agent = client.GetChatClient(deploymentName)
    .AsAIAgent("You are a helpful assistant.", "Assistant");

using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableAgents(options => options.AddAIAgent(agent))
    .Build();

app.Run();
3. Configure Settings

Create local.settings.json:
{
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
    "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/",
    "AZURE_OPENAI_DEPLOYMENT_NAME": "gpt-4o-mini"
  }
}
4. Start Function

func start

Single Agent Pattern

Host a single durable agent with persistent state:
from agent_framework.azure import AzureOpenAIChatClient, DurableAIAgentWorker
from azure.identity import AzureCliCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# Create agent
agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="Joker",
    instructions="You are good at telling jokes."
)

# Setup worker
dts_worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)

agent_worker = DurableAIAgentWorker(dts_worker)
agent_worker.add_agent(agent)

print("Worker ready. Waiting for requests...")
dts_worker.start()
Query the agent from a client:
import asyncio
from agent_framework_durabletask import DurableAIAgentClient
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def main():
    dts_client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )
    
    agent_client = DurableAIAgentClient(dts_client)
    joker = agent_client.get_agent("Joker")
    
    # Multi-turn conversation with persistent state
    response1 = await joker.run("Tell me a joke about clouds")
    print(f"Joke 1: {response1.text}")
    
    response2 = await joker.run("Tell me another one")
    print(f"Joke 2: {response2.text}")

asyncio.run(main())

Multi-Agent Pattern

Host multiple specialized agents:
from agent_framework.azure import AzureOpenAIChatClient, DurableAIAgentWorker
from azure.identity import AzureCliCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

client = AzureOpenAIChatClient(credential=AzureCliCredential())

# Create specialized agents
physicist = client.as_agent(
    name="Physicist",
    instructions="You are a physicist expert. Answer physics questions."
)

chemist = client.as_agent(
    name="Chemist",
    instructions="You are a chemistry expert. Answer chemistry questions."
)

# Register all agents
dts_worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)

agent_worker = DurableAIAgentWorker(dts_worker)
agent_worker.add_agent(physicist)
agent_worker.add_agent(chemist)

print(f"Registered agents: {physicist.name}, {chemist.name}")
dts_worker.start()
Query the agents from a client:
import asyncio
from agent_framework_durabletask import DurableAIAgentClient
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def main():
    dts_client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )
    
    agent_client = DurableAIAgentClient(dts_client)
    
    # Get specific agents
    physicist = agent_client.get_agent("Physicist")
    chemist = agent_client.get_agent("Chemist")
    
    # Query different agents
    physics_answer = await physicist.run("Explain quantum entanglement")
    chemistry_answer = await chemist.run("Explain chemical bonds")
    
    print(f"Physicist: {physics_answer.text}")
    print(f"Chemist: {chemistry_answer.text}")

asyncio.run(main())

Orchestration Patterns

Sequential Chaining

Chain agent calls with shared session:
from azure.durable_functions import DurableOrchestrationContext
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient
from azure.identity import AzureCliCredential

WRITER_AGENT_NAME = "WriterAgent"

agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name=WRITER_AGENT_NAME,
    instructions="""You refine text. Given an initial sentence you enhance it;
    given an improved sentence you polish it further."""
)

app = AgentFunctionApp(agents=[agent])

@app.orchestration_trigger(context_name="context")
def writer_orchestration(context: DurableOrchestrationContext):
    """Sequential refinement with shared conversation."""
    writer = app.get_agent(context, WRITER_AGENT_NAME)
    session = writer.create_session()
    
    # First pass
    initial = yield writer.run(
        messages="Write a sentence about learning.",
        session=session
    )
    
    # Second pass with context
    refined = yield writer.run(
        messages=f"Improve this: {initial.text}",
        session=session
    )
    
    return refined.text

Parallel Execution

Run multiple agents concurrently:
from azure.durable_functions import DurableOrchestrationContext
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient

# physicist, chemist, and mathematician are agents created as in the multi-agent example
app = AgentFunctionApp(agents=[physicist, chemist, mathematician])

@app.orchestration_trigger(context_name="context")
def parallel_orchestration(context: DurableOrchestrationContext):
    """Run multiple agents concurrently and aggregate results."""
    
    physicist = app.get_agent(context, "Physicist")
    chemist = app.get_agent(context, "Chemist")
    mathematician = app.get_agent(context, "Mathematician")
    
    topic = context.get_input()
    
    # Schedule all agents concurrently
    physics_task = physicist.run(f"Explain {topic} from a physics perspective")
    chemistry_task = chemist.run(f"Explain {topic} from a chemistry perspective")
    math_task = mathematician.run(f"Explain {topic} from a mathematics perspective")
    
    # Wait for all to complete
    results = yield [physics_task, chemistry_task, math_task]
    
    # Aggregate results
    combined = "\n\n".join([
        f"Physics: {results[0].text}",
        f"Chemistry: {results[1].text}",
        f"Mathematics: {results[2].text}"
    ])
    
    return combined

Conditional Routing

Route to different agents based on conditions:
from azure.durable_functions import DurableOrchestrationContext
from pydantic import BaseModel

class Classification(BaseModel):
    is_spam: bool
    confidence: float

@app.orchestration_trigger(context_name="context")
def conditional_orchestration(context: DurableOrchestrationContext):
    """Route based on spam detection."""
    
    spam_detector = app.get_agent(context, "SpamDetector")
    email_assistant = app.get_agent(context, "EmailAssistant")
    
    email = context.get_input()
    
    # Step 1: Detect spam
    classification = yield spam_detector.run(
        messages=f"Classify this email: {email}",
        response_format=Classification
    )
    
    # Step 2: Conditional routing
    if classification.is_spam and classification.confidence > 0.8:
        return {"action": "blocked", "reason": "spam detected"}
    else:
        # Process legitimate email
        response = yield email_assistant.run(
            messages=f"Draft a reply to: {email}"
        )
        return {"action": "replied", "draft": response.text}

Human-in-the-Loop (HITL)

Incorporate human approval in workflows:
from datetime import timedelta
from azure.durable_functions import DurableOrchestrationContext

@app.orchestration_trigger(context_name="context")
def hitl_orchestration(context: DurableOrchestrationContext):
    """Request human approval before proceeding."""
    
    writer = app.get_agent(context, "WriterAgent")
    
    # Step 1: Generate draft
    draft = yield writer.run("Write a blog post about AI")
    
    # Step 2: Wait for human approval
    approval_event = context.wait_for_external_event("approval_received")
    
    timeout_event = context.create_timer(
        context.current_utc_datetime + timedelta(hours=24)
    )
    
    winner = yield context.task_any([approval_event, timeout_event])
    
    if winner == approval_event:
        # Approved: publish
        approval_data = approval_event.result
        if approval_data.get("approved"):
            return {"status": "published", "content": draft.text}
        else:
            # Revise based on feedback
            revised = yield writer.run(
                f"Revise this based on feedback '{approval_data.get('feedback')}': {draft.text}"
            )
            return {"status": "revised", "content": revised.text}
    else:
        # Timeout: escalate
        return {"status": "escalated", "reason": "approval timeout"}
Send approval:
# Approve
curl -X POST http://localhost:7071/api/approval/{instanceId} \
  -H "Content-Type: application/json" \
  -d '{"approved": true}'

# Request revision
curl -X POST http://localhost:7071/api/approval/{instanceId} \
  -H "Content-Type: application/json" \
  -d '{"approved": false, "feedback": "Add more technical details"}'

Reliable Streaming

Implement resumable streaming with Redis:
import redis.asyncio as redis
from agent_framework_durabletask import AgentResponseCallbackProtocol, AgentCallbackContext
from typing import AsyncIterator

class RedisStreamResponseHandler(AgentResponseCallbackProtocol):
    """Reliable streaming using Redis Streams."""
    
    def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds
    
    async def handle_agent_response(
        self,
        context: AgentCallbackContext,
        response_stream: AsyncIterator[str]
    ):
        """Stream agent responses to Redis."""
        stream_key = f"agent:response:{context.correlation_id}"
        
        async for chunk in response_stream:
            # Write to Redis Stream
            await self.redis.xadd(
                stream_key,
                {"chunk": chunk, "timestamp": context.timestamp}
            )
        
        # Set expiration
        await self.redis.expire(stream_key, self.ttl)
        
        # Mark as complete
        await self.redis.xadd(
            stream_key,
            {"status": "complete"}
        )
Consume stream with cursor:
import redis.asyncio as redis

async def consume_stream(correlation_id: str, cursor: str = "0"):
    """Resume streaming from cursor position."""
    r = redis.from_url("redis://localhost:6379")
    stream_key = f"agent:response:{correlation_id}"
    
    current_cursor = cursor
    
    while True:
        # Read from cursor position
        entries = await r.xread(
            {stream_key: current_cursor},
            count=10,
            block=1000
        )
        
        if not entries:
            break
        
        for stream, messages in entries:
            for message_id, data in messages:
                if b"chunk" in data:
                    print(data[b"chunk"].decode(), end="")
                    current_cursor = message_id
                elif b"status" in data and data[b"status"] == b"complete":
                    return None  # Stream finished; nothing left to resume
    
    # Stream stalled: return the cursor so the caller can resume later
    return current_cursor

Configuration

Connection Settings

from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from azure.identity import DefaultAzureCredential

# Local development
worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)

# Azure production
worker = DurableTaskSchedulerWorker(
    host_address="https://my-dts.azurewebsites.net",
    secure_channel=True,
    taskhub="production",
    token_credential=DefaultAzureCredential()
)

Task Hub Configuration

Task hubs provide isolation between environments:
# Development
export TASKHUB="dev"

# Staging
export TASKHUB="staging"

# Production
export TASKHUB="production"
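A worker or client can then pick up the task hub name from that variable; a minimal sketch (resolve_taskhub is a hypothetical helper, not part of the SDK):

```python
import os

def resolve_taskhub(default: str = "default") -> str:
    """Read the task hub name from the TASKHUB environment variable,
    falling back to a default for local development."""
    return os.environ.get("TASKHUB", default)

# The resolved name would then be passed to the worker and client, e.g.:
#   DurableTaskSchedulerWorker(host_address=..., taskhub=resolve_taskhub())
os.environ["TASKHUB"] = "staging"
print(resolve_taskhub())  # staging
```

Keeping the name in the environment means the same code runs unchanged across dev, staging, and production while staying isolated per environment.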

Monitoring

DTS Dashboard

View orchestration status at http://localhost:8082:
  • Orchestrations: List all running/completed orchestrations
  • Instance details: View inputs, outputs, and execution history
  • Timeline: See checkpoints and state transitions

Query Orchestration Status

import asyncio
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def get_status(instance_id: str):
    client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )

    # Get instance metadata
    metadata = await client.get_instance_metadata(instance_id)
    print(f"Status: {metadata.runtime_status}")
    print(f"Input: {metadata.serialized_input}")
    print(f"Output: {metadata.serialized_output}")

asyncio.run(get_status("your-instance-id"))

Best Practices

Orchestrations may replay. Ensure activities are idempotent:
@app.activity_trigger(input_name="data")
def process_data(data: dict) -> dict:
    # Use idempotency key
    idempotency_key = data["id"]
    
    # Check if already processed
    if already_processed(idempotency_key):
        return get_cached_result(idempotency_key)
    
    # Process and cache
    result = do_work(data)
    cache_result(idempotency_key, result)
    return result
Keep orchestration input/output small to reduce storage costs:
# Bad: Large payload flows through the orchestration history
large_data = context.get_input()  # 10 MB
result = yield context.call_activity("process_data", large_data)

# Good: Pass a reference; the activity loads the data itself
blob_url = context.get_input()  # Just a URL
result = yield context.call_activity("process_data", blob_url)
Set appropriate timeouts for external events:
from datetime import timedelta

# Wait with timeout
approval = context.wait_for_external_event("approval")
timeout = context.create_timer(
    context.current_utc_datetime + timedelta(hours=24)
)

winner = yield context.task_any([approval, timeout])

if winner == timeout:
    # Handle timeout
    yield context.call_activity("send_escalation_email")
Separate environments with different task hubs:
  • Development: dev
  • Staging: staging
  • Production: production

Troubleshooting

Worker not processing tasks

Symptoms: Client schedules tasks but the worker doesn’t process them.
Solutions:
  • Verify worker and client use same task hub name
  • Check DTS connection strings match
  • Ensure worker called start() and is running
  • Check DTS dashboard for pending tasks
Orchestration stuck

Symptoms: Orchestration shows as “Running” indefinitely.
Solutions:
  • Check worker logs for errors
  • Verify all activities are registered
  • Look for unhandled exceptions in orchestration code
  • Check if waiting for external event that never arrives
Determinism errors

Symptoms: “Non-deterministic behavior detected” errors.
Solutions:
  • Don’t use random numbers or current time in orchestrations
  • Don’t make non-deterministic API calls
  • Use context.current_utc_datetime instead of datetime.now()
  • Generate GUIDs with context.new_uuid()
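The replay rule can be seen with a toy stand-in for the orchestration context (FakeContext is a hypothetical illustration, not an SDK type): the real runtime records values such as context.current_utc_datetime on first execution and replays the recorded value later, which is exactly the guarantee a raw datetime.now() call cannot provide.

```python
from datetime import datetime, timezone

class FakeContext:
    """Toy model of replay: record each value on first execution,
    then return the recorded value on subsequent replays."""
    def __init__(self):
        self._history = []
        self._cursor = 0

    def current_utc_datetime(self):
        if self._cursor < len(self._history):
            value = self._history[self._cursor]    # replay: reuse recorded value
        else:
            value = datetime.now(timezone.utc)     # first execution: record it
            self._history.append(value)
        self._cursor += 1
        return value

    def restart(self):
        self._cursor = 0  # simulate the framework replaying the orchestration

ctx = FakeContext()
first = ctx.current_utc_datetime()
ctx.restart()
replayed = ctx.current_utc_datetime()
assert first == replayed  # deterministic across replays, unlike datetime.now()
```

Calling datetime.now() directly inside an orchestration would produce a different value on each replay, so the orchestrator's decisions would diverge from its recorded history, triggering the non-determinism error.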

Next Steps

Azure Functions

Deploy durable agents to Azure Functions

A2A Protocol

Integrate with A2A agents in orchestrations
