Skip to main content

Overview

The Connection class is the foundational building block for all ACP communication. It manages bidirectional JSON-RPC 2.0 message exchange over ACP streams, handling request/response matching, notifications, and cancellation semantics.

Class Definition

class Connection
This class provides:
  • Bidirectional JSON-RPC 2.0 communication
  • Request/response correlation using unique IDs
  • Notification handling (one-way messages)
  • Request cancellation support
  • Automatic message queuing and delivery

Constructor

Connection

Creates a new connection instance.
Connection(
  RequestHandler requestHandler,
  NotificationHandler notificationHandler,
  AcpStream stream,
)
requestHandler
RequestHandler
required
A function that handles incoming requests. Signature: Future<dynamic> Function(String method, dynamic params)
notificationHandler
NotificationHandler
required
A function that handles incoming notifications. Signature: Future<void> Function(String method, dynamic params)
stream
AcpStream
required
The bidirectional message stream for communication

Type Aliases

RequestHandler

Type alias for request handler functions.
typedef RequestHandler = Future<dynamic> Function(String method, dynamic params)

NotificationHandler

Type alias for notification handler functions.
typedef NotificationHandler = Future<void> Function(String method, dynamic params)

Methods

sendRequest

Sends a request and returns a future that completes with the response.
Future<T> sendRequest<T>(String method, [dynamic params])
The method automatically:
  • Generates a unique request ID
  • Sends the JSON-RPC request message
  • Tracks the pending response
  • Returns a future that resolves when the response arrives
method
String
required
The JSON-RPC method name
params
dynamic
Optional parameters for the request (typically a Map or List)
result
T
The response result, typed according to the generic parameter
Example:
final response = await connection.sendRequest<Map<String, dynamic>>(
  'session/new',
  {'mcpServers': []},
);
print('Session ID: ${response['sessionId']}');

sendNotification

Sends a notification (no response expected).
Future<void> sendNotification(String method, [dynamic params])
Notifications are one-way messages that do not expect a response. They are fire-and-forget.
method
String
required
The JSON-RPC method name
params
dynamic
Optional parameters for the notification
Example:
await connection.sendNotification('session/update', {
  'sessionId': 'abc123',
  'message': {'role': 'assistant', 'content': 'Hello'},
});

sendCancelRequestNotification

Sends the protocol-level cancellation notification $/cancel_request.
Future<void> sendCancelRequestNotification(CancelRequestNotification params)
params
CancelRequestNotification
required
The cancellation notification with request ID and optional metadata
Example:
await connection.sendCancelRequestNotification(
  CancelRequestNotification(
    requestId: 42,
    meta: {'reason': 'user_cancelled'},
  ),
);

cancelPendingRequest

Cancels a pending outbound request and notifies the peer.
Future<bool> cancelPendingRequest(
  RequestId requestId, {
  Map<String, dynamic>? meta,
})
Returns true if the request ID was still pending locally. This method:
  1. Removes the pending request from tracking
  2. Rejects the request’s future with a cancellation error
  3. Sends a $/cancel_request notification to the peer
requestId
RequestId
required
The ID of the request to cancel
meta
Map<String, dynamic>
Optional metadata to include in the cancellation notification
result
bool
true if the request was pending and was cancelled, false if it was already completed or not found
Example:
final requestId = 42;
final wasPending = await connection.cancelPendingRequest(
  requestId,
  meta: {'reason': 'timeout'},
);
if (wasPending) {
  print('Request $requestId was cancelled');
}

Message Processing

The Connection class automatically processes incoming messages and routes them appropriately:

Request Handling

Incoming requests (messages with both method and id) are:
  1. Validated and parsed
  2. Passed to the requestHandler function
  3. Responded to with either a success result or error

Notification Handling

Incoming notifications (messages with method but no id) are:
  1. Validated and parsed
  2. Passed to the notificationHandler function
  3. No response is sent

Response Handling

Incoming responses (messages with id but no method) are:
  1. Matched to pending requests by ID
  2. Used to resolve or reject the corresponding request future
  3. Removed from pending request tracking

Error Handling

The Connection class provides robust error handling:

RequestError Class

Represents an error in ACP/JSON-RPC communication.
class RequestError implements Exception {
  final int code;
  final String message;
  final dynamic data;

  RequestError(this.code, this.message, [this.data]);
}
code
int
required
The JSON-RPC error code
message
String
required
Human-readable error message
data
dynamic
Additional error data

Standard Error Factories

// Parse error (-32700)
RequestError.parseError([dynamic data])

// Invalid request (-32600)
RequestError.invalidRequest([dynamic data])

// Method not found (-32601)
RequestError.methodNotFound(String method)

// Invalid parameters (-32602)
RequestError.invalidParams([dynamic data])

// Internal error (-32603)
RequestError.internalError([dynamic data])

// Authentication required (-32000)
RequestError.authRequired([dynamic data])

// Resource not found (-32002)
RequestError.resourceNotFound([String? uri])

// Request cancelled (-32800)
RequestError.requestCancelled([dynamic data])

Error Response

JSON-RPC 2.0 error response structure.
class ErrorResponse {
  final int code;
  final String message;
  final dynamic data;

  ErrorResponse({required this.code, required this.message, this.data});
}

Usage Example

import 'package:acp_dart/acp_dart.dart';

void main() async {
  // Define request handler
  Future<dynamic> handleRequest(String method, dynamic params) async {
    switch (method) {
      case 'echo':
        return {'echo': params};
      case 'add':
        final a = params['a'] as int;
        final b = params['b'] as int;
        return {'result': a + b};
      default:
        throw RequestError.methodNotFound(method);
    }
  }

  // Define notification handler
  Future<void> handleNotification(String method, dynamic params) async {
    print('Notification: $method with params: $params');
  }

  // Create stream (in real usage, this would be stdio or network)
  final stream = createAcpStream();

  // Create connection
  final connection = Connection(
    handleRequest,
    handleNotification,
    stream,
  );

  // Send a request
  try {
    final result = await connection.sendRequest('add', {
      'a': 5,
      'b': 3,
    });
    print('Result: ${result['result']}'); // Output: Result: 8
  } catch (error) {
    print('Request failed: $error');
  }

  // Send a notification
  await connection.sendNotification('log', {
    'level': 'info',
    'message': 'Operation completed',
  });

  // Cancel a pending request
  final requestId = 42;
  final cancelled = await connection.cancelPendingRequest(requestId);
  print('Cancellation successful: $cancelled');
}

Advanced Features

Request Cancellation

The Connection class implements the ACP cancellation protocol:
  1. Local cancellation tracking: Maintains a set of locally cancelled request IDs
  2. Notification sending: Sends $/cancel_request to notify the peer
  3. Response filtering: Ignores responses for cancelled requests

Message Queuing

All outbound messages are queued to ensure ordered delivery:
Future<void> _writeQueue = Future.value();

Future<void> _sendMessage(Map<String, dynamic> message) {
  _writeQueue = _writeQueue.then((_) async {
    try {
      _stream.writable.add(message);
    } catch (error) {
      print('Error sending message: $error');
    }
  });
  return _writeQueue;
}
This ensures messages are sent in order, even when multiple requests are initiated concurrently.

See Also

Build docs developers (and LLMs) love