Skip to main content

Overview

This guide covers advanced MCP protocol features that go beyond basic tool and resource handling. Mastering these features helps you build more robust, user-friendly, and production-ready MCP servers.

Progress notifications

Report progress for long-running operations

Request cancellation

Allow clients to cancel in-flight requests

Resource templates

Dynamic resource URIs with parameters

Server lifecycle

Proper initialization and shutdown handling

Logging control

Server-side logging level configuration

Error handling

Consistent, structured error responses

1. Progress notifications

For long-running operations, progress notifications keep users informed without blocking.
Client ──► Server: tools/call (long operation)
Server ──► Client: notification: progress 10%
Server ──► Client: notification: progress 50%
Server ──► Client: notification: progress 90%
Server ──► Client: result (complete)
from mcp.server import Server
from mcp.types import ProgressNotification
import asyncio

app = Server("progress-server")

@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
    """Process a large file with progress updates."""
    file_size = os.path.getsize(file_path)
    processed = 0

    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            await process_chunk(chunk)
            processed += len(chunk)

            progress = (processed / file_size) * 100
            await ctx.send_notification(
                ProgressNotification(
                    progressToken=ctx.request_id,
                    progress=progress,
                    total=100,
                    message=f"Processing: {progress:.1f}%"
                )
            )

    return f"Processed {file_size} bytes"


@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
    """Process multiple items with per-item progress."""
    results = []
    total   = len(items)

    for i, item in enumerate(items):
        result = await process_item(item)
        results.append(result)

        await ctx.send_notification(
            ProgressNotification(
                progressToken=ctx.request_id,
                progress=i + 1,
                total=total,
                message=f"Processed {i + 1}/{total}: {item}"
            )
        )

    return f"Completed {total} items"

Client-side progress handling

async def handle_progress(notification):
    params = notification.params
    print(f"Progress: {params.progress}/{params.total}{params.message}")

session.on_notification("notifications/progress", handle_progress)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})

2. Request cancellation

Allow clients to cancel requests that are no longer needed or have timed out.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio

app = Server("cancellable-server")

@app.tool()
async def long_running_search(query: str, ctx) -> str:
    """Search that can be cancelled mid-execution."""
    results = []

    try:
        for page in range(100):
            if ctx.is_cancelled:
                raise CancelledError("Search cancelled by user")

            page_results = await search_page(query, page)
            results.extend(page_results)
            await asyncio.sleep(0.1)  # Yields control for cancellation check

    except CancelledError:
        return f"Cancelled. Found {len(results)} results before cancellation."

    return f"Found {len(results)} total results"

Client-side cancellation with timeout

async def search_with_timeout(session, query, timeout=30):
    task = asyncio.create_task(
        session.call_tool("long_running_search", {"query": query})
    )

    try:
        result = await asyncio.wait_for(task, timeout=timeout)
        return result
    except asyncio.TimeoutError:
        await session.send_notification({
            "method": "notifications/cancelled",
            "params": {"requestId": task.request_id, "reason": "Timeout"}
        })
        return "Search timed out"

3. Resource templates

Resource templates enable dynamic URI construction with parameters — useful for parameterized databases or APIs.
from mcp.server import Server
from mcp.types import ResourceTemplate

app = Server("template-server")

@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
    return [
        ResourceTemplate(
            uriTemplate="db://users/{user_id}",
            name="User Profile",
            description="Fetch user profile by ID",
            mimeType="application/json"
        ),
        ResourceTemplate(
            uriTemplate="api://weather/{city}/{date}",
            name="Weather Data",
            description="Historical weather for city and date",
            mimeType="application/json"
        ),
        ResourceTemplate(
            uriTemplate="file://{path}",
            name="File Content",
            description="Read file at given path",
            mimeType="text/plain"
        )
    ]

@app.read_resource()
async def read_resource(uri: str) -> str:
    if uri.startswith("db://users/"):
        user_id = uri.split("/")[-1]
        return await fetch_user(user_id)

    elif uri.startswith("api://weather/"):
        parts      = uri.replace("api://weather/", "").split("/")
        city, date = parts[0], parts[1]
        return await fetch_weather(city, date)

    elif uri.startswith("file://"):
        path = uri.replace("file://", "")
        return await read_file(path)

    raise ValueError(f"Unknown resource URI: {uri}")

4. Server lifecycle events

Proper startup and shutdown handling ensures clean resource management and prevents connection leaks.
from mcp.server import Server
from contextlib import asynccontextmanager

db_connection = None
cache         = None

@asynccontextmanager
async def lifespan(server: Server):
    global db_connection, cache

    # Startup
    print("Server starting...")
    db_connection = await create_database_connection()
    cache         = await create_cache_client()

    yield  # Server runs here

    # Shutdown
    print("Server shutting down...")
    await db_connection.close()
    await cache.close()

app = Server("lifecycle-server", lifespan=lifespan)

@app.tool()
async def query_database(sql: str) -> str:
    result = await db_connection.execute(sql)
    return str(result)

5. Logging control

MCP supports server-side logging levels that clients can control at runtime.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging

app = Server("logging-server")

LEVEL_MAP = {
    LoggingLevel.DEBUG:   logging.DEBUG,
    LoggingLevel.INFO:    logging.INFO,
    LoggingLevel.WARNING: logging.WARNING,
    LoggingLevel.ERROR:   logging.ERROR,
}

logger = logging.getLogger("mcp-server")

@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
    python_level = LEVEL_MAP.get(level, logging.INFO)
    logger.setLevel(python_level)
    logger.info(f"Logging level set to {level}")

@app.tool()
async def debug_operation(data: str, ctx) -> str:
    logger.debug(f"Processing data: {data}")

    # Send log to client as well
    await ctx.send_log(
        level="info",
        message=f"Starting operation with input: {data}"
    )

    try:
        result = process(data)
        logger.info(f"Successfully processed: {result}")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        raise

6. Error handling patterns

Structured error classes

from mcp.types import McpError, ErrorCode

class ValidationError(McpError):
    def __init__(self, message: str):
        super().__init__(ErrorCode.INVALID_PARAMS, message)

class NotFoundError(McpError):
    def __init__(self, resource: str):
        super().__init__(ErrorCode.INVALID_REQUEST, f"Not found: {resource}")

class PermissionError(McpError):
    def __init__(self, action: str):
        super().__init__(ErrorCode.INVALID_REQUEST, f"Permission denied: {action}")

class InternalError(McpError):
    def __init__(self, message: str):
        super().__init__(ErrorCode.INTERNAL_ERROR, message)


@app.tool()
async def safe_operation(input: str) -> str:
    if not input:
        raise ValidationError("Input cannot be empty")

    if len(input) > 10000:
        raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")

    try:
        if not await check_permission(input):
            raise PermissionError(f"read {input}")

        result = await perform_operation(input)

        if result is None:
            raise NotFoundError(input)

        return result

    except ConnectionError as e:
        raise InternalError(f"Database connection failed: {e}")
    except TimeoutError as e:
        raise InternalError(f"Operation timed out: {e}")

TypeScript error handling

import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";

server.setRequestHandler(CallToolSchema, async (request) => {
    try {
        validateInput(request.params.arguments);
        const result = await performOperation(request.params.arguments);
        return { content: [{ type: "text", text: JSON.stringify(result) }] };

    } catch (error) {
        if (error instanceof McpError) throw error;

        if (error instanceof NotFoundError)
            throw new McpError(ErrorCode.InvalidRequest, error.message);

        console.error("Unexpected error:", error);
        throw new McpError(ErrorCode.InternalError, "An unexpected error occurred");
    }
});

Experimental features (MCP 2025-11-25)

The following features are marked experimental in the MCP specification and may change before stabilization.

Tasks — long-running operations

@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
    await ctx.report_status("running", "Initializing training...")

    for epoch in range(100):
        await train_epoch(model_id, data_path, epoch)
        await ctx.report_status(
            "running",
            f"Training epoch {epoch + 1}/100",
            progress=epoch + 1,
            total=100
        )

    await ctx.report_status("completed", "Training finished")
    return f"Model {model_id} trained successfully"

Tool annotations

@app.tool(
    annotations={
        "destructive":        False,  # Does not modify data
        "idempotent":         True,   # Safe to retry
        "timeout_seconds":    30,     # Expected max duration
        "requires_approval":  False   # No user approval needed
    }
)
async def safe_query(query: str) -> str:
    """A read-only database query tool."""
    return await execute_read_query(query)

Build docs developers (and LLMs) love