Overview
This guide covers advanced MCP protocol features that go beyond basic tool and resource handling. Mastering these features helps you build more robust, user-friendly, and production-ready MCP servers.
Progress notifications
Report progress for long-running operations
Request cancellation
Allow clients to cancel in-flight requests
Resource templates
Dynamic resource URIs with parameters
Server lifecycle
Proper initialization and shutdown handling
Logging control
Server-side logging level configuration
Error handling
Consistent, structured error responses
1. Progress notifications
For long-running operations, progress notifications keep users informed without blocking.
Client ──► Server: tools/call (long operation)
Server ──► Client: notification: progress 10%
Server ──► Client: notification: progress 50%
Server ──► Client: notification: progress 90%
Server ──► Client: result (complete)
import asyncio
import os

from mcp.server import Server
from mcp.types import ProgressNotification
app = Server("progress-server")
@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
    """Process a file in 8192-byte chunks, sending a progress update per chunk.

    Args:
        file_path: Path of the file to read in binary mode.
        ctx: Request context; this function uses its ``request_id`` attribute
            and awaits its ``send_notification`` method.

    Returns:
        A summary string containing the file size measured before reading.
    """
    # The size is sampled once, before reading, and used as the denominator.
    file_size = os.path.getsize(file_path)
    processed = 0
    with open(file_path, "rb") as f:
        while chunk := f.read(8192):
            await process_chunk(chunk)
            processed += len(chunk)
            # The file may grow after it was sized: avoid dividing by zero
            # and never report more than 100%.
            if file_size:
                progress = min(100.0, processed / file_size * 100)
            else:
                progress = 100.0
            await ctx.send_notification(
                ProgressNotification(
                    progressToken=ctx.request_id,
                    progress=progress,
                    total=100,
                    message=f"Processing: {progress:.1f}%",
                )
            )
    return f"Processed {file_size} bytes"
@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
    """Process each item in order and send one progress update per item.

    Args:
        items: Items handed one at a time to ``process_item``.
        ctx: Request context; this function uses its ``request_id`` attribute
            and awaits its ``send_notification`` method.

    Returns:
        A summary string with the number of items received.
    """
    outcomes = []
    item_count = len(items)
    # Count from 1 so the loop variable is the number of items completed.
    for position, entry in enumerate(items, start=1):
        outcomes.append(await process_item(entry))
        update = ProgressNotification(
            progressToken=ctx.request_id,
            progress=position,
            total=item_count,
            message=f"Processed {position}/{item_count}: {entry}",
        )
        await ctx.send_notification(update)
    return f"Completed {item_count} items"
Client-side progress handling
async def handle_progress(notification):
    """Print one line describing a progress notification.

    Args:
        notification: Object whose ``params`` attribute exposes ``progress``,
            ``total`` and ``message``.
    """
    update = notification.params
    done, total, note = update.progress, update.total, update.message
    print(f"Progress: {done}/{total} — {note}")
session.on_notification("notifications/progress", handle_progress)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})
2. Request cancellation
Allow clients to cancel requests that are no longer needed or have timed out.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio
app = Server("cancellable-server")
@app.tool()
async def long_running_search(query: str, ctx) -> str:
    """Search up to 100 pages, stopping early when the request is cancelled.

    Args:
        query: Search text passed to ``search_page``.
        ctx: Request context; its ``is_cancelled`` attribute is checked before
            each page is fetched.

    Returns:
        A summary of how many results were collected, worded differently
        depending on whether the search finished or was cancelled.
    """
    collected = []
    try:
        for page_number in range(100):
            if ctx.is_cancelled:
                raise CancelledError("Search cancelled by user")
            collected.extend(await search_page(query, page_number))
            # Yield control between pages so a cancellation can be observed.
            await asyncio.sleep(0.1)
    except CancelledError:
        return f"Cancelled. Found {len(collected)} results before cancellation."
    else:
        return f"Found {len(collected)} total results"
Client-side cancellation with timeout
async def search_with_timeout(session, query, timeout=30):
    """Call the ``long_running_search`` tool, giving up after ``timeout`` seconds.

    Args:
        session: Client session; this function awaits its ``call_tool`` and
            ``send_notification`` methods.
        query: Search text forwarded to the tool.
        timeout: Seconds to wait before abandoning the call.

    Returns:
        The tool result, or the string ``"Search timed out"`` on timeout.
    """
    task = asyncio.create_task(
        session.call_tool("long_running_search", {"query": query})
    )
    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the local task; the notification is
        # what tells the server to stop working on the request.
        # A plain asyncio.Task has no ``request_id`` attribute, so reading it
        # directly raises AttributeError. Only notify when an id is available.
        # NOTE(review): confirm how the session exposes the request id.
        request_id = getattr(task, "request_id", None)
        if request_id is not None:
            await session.send_notification({
                "method": "notifications/cancelled",
                "params": {"requestId": request_id, "reason": "Timeout"},
            })
        return "Search timed out"
3. Resource templates
Resource templates enable dynamic URI construction with parameters — useful for parameterized databases or APIs.
from mcp.server import Server
from mcp.types import ResourceTemplate
app = Server("template-server")
@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
    """Return the three resource templates this server advertises.

    Returns:
        Templates for user profiles, weather data and file content, in that
        order.
    """
    # One row per template: (uriTemplate, name, description, mimeType).
    specs = (
        (
            "db://users/{user_id}",
            "User Profile",
            "Fetch user profile by ID",
            "application/json",
        ),
        (
            "api://weather/{city}/{date}",
            "Weather Data",
            "Historical weather for city and date",
            "application/json",
        ),
        (
            "file://{path}",
            "File Content",
            "Read file at given path",
            "text/plain",
        ),
    )
    return [
        ResourceTemplate(
            uriTemplate=uri_template,
            name=title,
            description=summary,
            mimeType=mime,
        )
        for uri_template, title, summary, mime in specs
    ]
@app.read_resource()
async def read_resource(uri: str) -> str:
    """Dispatch a resource URI to the matching fetch helper.

    Args:
        uri: A URI of the form ``db://users/{user_id}``,
            ``api://weather/{city}/{date}`` or ``file://{path}``.

    Returns:
        Whatever the matching helper (``fetch_user``, ``fetch_weather`` or
        ``read_file``) returns.

    Raises:
        ValueError: If the URI matches no known scheme, or matches one but
            does not carry the expected parameters.
    """
    users_prefix = "db://users/"
    weather_prefix = "api://weather/"
    file_prefix = "file://"

    if uri.startswith(users_prefix):
        # Strip only the leading prefix; a nested path is rejected rather than
        # silently treating its last segment as the user id.
        user_id = uri.removeprefix(users_prefix)
        if not user_id or "/" in user_id:
            raise ValueError(f"Malformed user URI: {uri}")
        return await fetch_user(user_id)

    if uri.startswith(weather_prefix):
        parts = uri.removeprefix(weather_prefix).split("/")
        # The template has exactly two parameters; fail with a clear message
        # instead of an IndexError when one is missing.
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"Malformed weather URI: {uri}")
        city, date = parts
        return await fetch_weather(city, date)

    if uri.startswith(file_prefix):
        # SECURITY NOTE(review): the path comes straight from the client and is
        # handed to read_file unchecked. Confirm read_file confines access to
        # an allowed directory before exposing this template.
        path = uri.removeprefix(file_prefix)
        return await read_file(path)

    raise ValueError(f"Unknown resource URI: {uri}")
4. Server lifecycle events
Proper startup and shutdown handling ensures clean resource management and prevents connection leaks.
from mcp.server import Server
from contextlib import asynccontextmanager
db_connection = None
cache = None
@asynccontextmanager
async def lifespan(server: Server):
    """Open the database connection and cache on startup; close them on shutdown.

    The module-level ``db_connection`` and ``cache`` globals are assigned here
    so that tools can use them while the server is running.

    Args:
        server: The server instance this lifespan is attached to (unused here).

    Yields:
        Nothing; control returns to the server while both resources are open.
    """
    global db_connection, cache
    # Startup
    print("Server starting...")
    db_connection = await create_database_connection()
    try:
        cache = await create_cache_client()
        try:
            yield  # Server runs here
        finally:
            # Shutdown: runs even when the server body raised.
            print("Server shutting down...")
            await cache.close()
    finally:
        # Reached whether the cache failed to open, failed to close, or all
        # went well, so the database connection is never leaked.
        await db_connection.close()
app = Server("lifecycle-server", lifespan=lifespan)
@app.tool()
async def query_database(sql: str) -> str:
    """Run ``sql`` on the module-level ``db_connection`` and return the result as text.

    Args:
        sql: Statement passed unchanged to ``db_connection.execute``.

    Returns:
        ``str()`` of whatever ``execute`` returns.
    """
    # SECURITY NOTE(review): the statement is executed exactly as received from
    # the caller. Confirm the connection is restricted to what callers may run.
    return str(await db_connection.execute(sql))
5. Logging control
MCP supports server-side logging levels that clients can control at runtime.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging
app = Server("logging-server")
# Maps MCP logging levels to the same-named constants of the ``logging`` module.
# NOTE(review): only these four levels are mapped; confirm whether LoggingLevel
# defines others that should be handled too.
LEVEL_MAP = {
    getattr(LoggingLevel, level_name): getattr(logging, level_name)
    for level_name in ("DEBUG", "INFO", "WARNING", "ERROR")
}
logger = logging.getLogger("mcp-server")
@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
    """Apply a client-requested logging level to the module logger.

    Args:
        level: Requested level; one that is missing from ``LEVEL_MAP`` falls
            back to ``logging.INFO``.
    """
    logger.setLevel(LEVEL_MAP.get(level, logging.INFO))
    # Emitted at INFO, so it is filtered out when the new level is above INFO.
    logger.info(f"Logging level set to {level}")
@app.tool()
async def debug_operation(data: str, ctx) -> str:
    """Run ``process`` on ``data``, logging locally and to the client.

    Args:
        data: Input handed to ``process``.
        ctx: Request context; this function awaits its ``send_log`` method.

    Returns:
        The value returned by ``process``.

    Raises:
        Exception: Any exception raised inside the ``try`` block is logged on
            the server and then re-raised unchanged.
    """
    logger.debug(f"Processing data: {data}")

    # Mirror the start of the operation to the client as well.
    start_message = f"Starting operation with input: {data}"
    await ctx.send_log(level="info", message=start_message)

    try:
        outcome = process(data)
        logger.info(f"Successfully processed: {outcome}")
    except Exception as exc:
        logger.error(f"Processing failed: {exc}")
        raise
    return outcome
6. Error handling patterns
Structured error classes
from mcp.types import McpError, ErrorCode
class ValidationError(McpError):
    """Error for invalid tool input, reported with ``ErrorCode.INVALID_PARAMS``."""

    def __init__(self, message: str):
        """Create the error with ``message`` as its text."""
        code = ErrorCode.INVALID_PARAMS
        super().__init__(code, message)
class NotFoundError(McpError):
    """Error for a missing resource, reported with ``ErrorCode.INVALID_REQUEST``."""

    def __init__(self, resource: str):
        """Create the error; ``resource`` is embedded in the message text."""
        text = f"Not found: {resource}"
        super().__init__(ErrorCode.INVALID_REQUEST, text)
# NOTE(review): this name shadows Python's built-in PermissionError for the rest
# of the module; confirm that is intended before catching "PermissionError" here.
class PermissionError(McpError):
    """Error for a denied action, reported with ``ErrorCode.INVALID_REQUEST``."""

    def __init__(self, action: str):
        """Create the error; ``action`` is embedded in the message text."""
        text = f"Permission denied: {action}"
        super().__init__(ErrorCode.INVALID_REQUEST, text)
class InternalError(McpError):
    """Error for server-side failures, reported with ``ErrorCode.INTERNAL_ERROR``."""

    def __init__(self, message: str):
        """Create the error with ``message`` as its text."""
        code = ErrorCode.INTERNAL_ERROR
        super().__init__(code, message)
@app.tool()
async def safe_operation(input: str) -> str:
    """Validate ``input``, check permission, run the operation, and map failures.

    Args:
        input: Text to operate on; must be non-empty and at most 10000
            characters.

    Returns:
        The value returned by ``perform_operation``.

    Raises:
        ValidationError: If ``input`` is empty or longer than 10000 characters.
        PermissionError: If ``check_permission`` returns a falsy value.
        NotFoundError: If ``perform_operation`` returns ``None``.
        InternalError: If a ``ConnectionError`` or ``TimeoutError`` occurs; the
            original exception is attached as the cause.
    """
    if not input:
        raise ValidationError("Input cannot be empty")
    if len(input) > 10000:
        raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")

    try:
        if not await check_permission(input):
            raise PermissionError(f"read {input}")
        result = await perform_operation(input)
        if result is None:
            raise NotFoundError(input)
        return result
    except ConnectionError as e:
        # Chain with "from e" so the original traceback is kept for debugging.
        raise InternalError(f"Database connection failed: {e}") from e
    except TimeoutError as e:
        raise InternalError(f"Operation timed out: {e}") from e
TypeScript error handling
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
// Tool-call handler: validates the arguments, runs the operation and returns
// the JSON-serialised result as text content. Failures leave as McpError.
// NOTE(review): the import above brings in only McpError and ErrorCode, and
// the SDK's exported schema is `CallToolRequestSchema`. Confirm where
// `CallToolSchema` and `NotFoundError` come from.
server.setRequestHandler(CallToolSchema, async (request) => {
  try {
    const args = request.params.arguments;
    validateInput(args);
    const outcome = await performOperation(args);
    const text = JSON.stringify(outcome);
    return { content: [{ type: "text", text }] };
  } catch (error) {
    // Already protocol-shaped: pass it through untouched.
    if (error instanceof McpError) {
      throw error;
    }
    if (error instanceof NotFoundError) {
      throw new McpError(ErrorCode.InvalidRequest, error.message);
    }
    // Anything else is logged server-side; the client gets a generic message.
    console.error("Unexpected error:", error);
    throw new McpError(ErrorCode.InternalError, "An unexpected error occurred");
  }
});
Experimental features (MCP 2025-11-25)
The following features are marked experimental in the MCP specification and may change before stabilization.
Tasks — long-running operations
@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
    """Train for 100 epochs, reporting status after each one.

    Args:
        model_id: Identifier passed to ``train_epoch`` and used in the result.
        data_path: Training data location passed to ``train_epoch``.
        ctx: Task context; this function awaits its ``report_status`` method.

    Returns:
        A confirmation string naming the trained model.
    """
    total_epochs = 100
    await ctx.report_status("running", "Initializing training...")

    # ``completed`` counts finished epochs (1-based); train_epoch receives the
    # zero-based epoch index.
    for completed in range(1, total_epochs + 1):
        await train_epoch(model_id, data_path, completed - 1)
        status_text = f"Training epoch {completed}/{total_epochs}"
        await ctx.report_status(
            "running",
            status_text,
            progress=completed,
            total=total_epochs,
        )

    await ctx.report_status("completed", "Training finished")
    return f"Model {model_id} trained successfully"
Tool annotations
# Annotations attached to the ``safe_query`` tool.
# NOTE(review): the MCP specification names its tool hints readOnlyHint,
# destructiveHint, idempotentHint and openWorldHint. Confirm these keys are
# what the server framework in use expects.
_SAFE_QUERY_ANNOTATIONS = {
    "destructive": False,  # Does not modify data
    "idempotent": True,  # Safe to retry
    "timeout_seconds": 30,  # Expected max duration
    "requires_approval": False,  # No user approval needed
}


@app.tool(annotations=_SAFE_QUERY_ANNOTATIONS)
async def safe_query(query: str) -> str:
    """A read-only database query tool."""
    rows = await execute_read_query(query)
    return rows