Skip to main content

Connection Overview

Connections in the ACP Dart SDK manage the bidirectional JSON-RPC communication between clients and agents. The SDK provides two connection types:
  • AgentSideConnection: For agents connecting to clients
  • ClientSideConnection: For clients connecting to agents
    ┌─────────────────────────────────────────────┐
    │         Connection Architecture             │
    └─────────────────────────────────────────────┘

┌──────────────────┐              ┌──────────────────┐
│                  │              │                  │
│  ClientSideConn  │◄────────────►│  AgentSideConn   │
│                  │   AcpStream  │                  │
└──────────────────┘              └──────────────────┘
         │                                 │
         │ implements                      │ implements
         ▼                                 ▼
┌──────────────────┐              ┌──────────────────┐
│      Agent       │              │      Client      │
│    interface     │              │    interface     │
└──────────────────┘              └──────────────────┘
  (call agent)                      (call client)

Base Connection Class

Both connection types are built on the internal Connection class that handles:
  • JSON-RPC 2.0 protocol implementation
  • Request/response tracking
  • Notification delivery
  • Error handling and mapping
  • Request cancellation
class Connection {
  Connection(
    RequestHandler requestHandler,
    NotificationHandler notificationHandler,
    AcpStream stream,
  );
  
  Future<T> sendRequest<T>(String method, [dynamic params]);
  Future<void> sendNotification(String method, [dynamic params]);
  Future<bool> cancelPendingRequest(RequestId requestId, {Map<String, dynamic>? meta});
}

ACP Streams

Connections communicate over ACP streams, which are bidirectional message streams using NDJSON format:
class AcpStream {
  final Stream<Map<String, dynamic>> readable;
  final StreamSink<Map<String, dynamic>> writable;
}

Creating Streams

The SDK provides the ndJsonStream() helper function:
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

// Create stream from stdin/stdout (typical for agents)
final stream = ndJsonStream(stdin, stdout);

// Create stream from subprocess pipes (typical for clients)
final agentProcess = await Process.start('agent-binary', []);
final stream = ndJsonStream(agentProcess.stdout, agentProcess.stdin);

// With error handling
final stream = ndJsonStream(
  stdin,
  stdout,
  onParseError: (line, error) {
    stderr.writeln('Failed to parse: $line');
    stderr.writeln('Error: $error');
  },
);
ndJsonStream() automatically handles JSON parsing, encoding, and newline delimiters. Malformed lines are skipped, allowing the stream to continue processing valid messages.

AgentSideConnection

AgentSideConnection enables agents to communicate with clients.

Creating an Agent Connection

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

class MyAgent implements Agent {
  final AgentSideConnection _connection;
  
  MyAgent(this._connection);
  
  // Implement Agent interface methods...
}

void main() {
  final stream = ndJsonStream(stdin, stdout);
  
  // Factory function receives the connection
  final connection = AgentSideConnection(
    (conn) => MyAgent(conn),
    stream,
  );
  
  // Connection is now active and handling messages
}

Outbound Methods (Agent → Client)

The AgentSideConnection implements the Client interface for calling client methods:
// File operations
final content = await _connection.readTextFile(
  ReadTextFileRequest(
    sessionId: sessionId,
    path: '/project/config.json',
  ),
);

await _connection.writeTextFile(
  WriteTextFileRequest(
    sessionId: sessionId,
    path: '/output/result.txt',
    content: 'Generated output',
  ),
);

// Permission requests
final permission = await _connection.requestPermission(
  RequestPermissionRequest(
    sessionId: sessionId,
    options: options,
    toolCall: toolCallUpdate,
  ),
);

// Session updates (notification)
await _connection.sessionUpdate(
  SessionNotification(
    sessionId: sessionId,
    update: AgentMessageChunkSessionUpdate(
      content: TextContentBlock(text: 'Response text'),
    ),
  ),
);

// Terminal operations
final terminal = await _connection.createTerminal(
  CreateTerminalRequest(
    sessionId: sessionId,
    command: 'npm',
    args: ['test'],
  ),
);

final output = await _connection.terminalOutput(
  TerminalOutputRequest(
    sessionId: sessionId,
    terminalId: terminal.terminalId,
  ),
);

await _connection.releaseTerminal(
  ReleaseTerminalRequest(
    sessionId: sessionId,
    terminalId: terminal.terminalId,
  ),
);

// Extension methods
final result = await _connection.extMethod(
  '_custom/method',
  {'param': 'value'},
);

await _connection.extNotification(
  '_custom/notification',
  {'data': 'value'},
);

Inbound Request Handling

The connection automatically routes incoming requests to your Agent implementation:
  • initializeagent.initialize()
  • session/newagent.newSession()
  • session/promptagent.prompt()
  • session/cancelagent.cancel()
  • And all other agent methods…

Cancellation Support

// Send cancellation notification
await _connection.sendCancelRequest(
  CancelRequestNotification(requestId: requestId),
);

// Cancel and reject a pending outbound request
final wasCancelled = await _connection.cancelPendingRequest(
  requestId,
  meta: {'reason': 'Timeout'},
);

ClientSideConnection

ClientSideConnection enables clients to communicate with agents.

Creating a Client Connection

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

class MyClient implements Client {
  @override
  Future<RequestPermissionResponse> requestPermission(
    RequestPermissionRequest params,
  ) async {
    // Handle permission request
  }
  
  @override
  Future<void> sessionUpdate(SessionNotification params) async {
    // Handle session update
  }
  
  // Implement other Client interface methods...
}

Future<void> main() async {
  // Spawn agent process
  final agentProcess = await Process.start('my-agent', []);
  
  // Create stream from process pipes
  final stream = ndJsonStream(agentProcess.stdout, agentProcess.stdin);
  
  // Create connection
  final client = MyClient();
  final connection = ClientSideConnection((conn) => client, stream);
  
  // Use the connection
  await connection.initialize(InitializeRequest(protocolVersion: 1));
}

Outbound Methods (Client → Agent)

The ClientSideConnection implements the Agent interface for calling agent methods:
// Initialize connection
final initResponse = await connection.initialize(
  InitializeRequest(
    protocolVersion: 1,
    clientCapabilities: ClientCapabilities(
      fs: FileSystemCapability(readTextFile: true, writeTextFile: true),
      terminal: true,
    ),
  ),
);

// Authenticate if needed
if (initResponse.authMethods.isNotEmpty) {
  await connection.authenticate(
    AuthenticateRequest(methodId: initResponse.authMethods.first.id),
  );
}

// Create session
final session = await connection.newSession(
  NewSessionRequest(
    cwd: '/workspace',
    mcpServers: [],
  ),
);

// Send prompt
final result = await connection.prompt(
  PromptRequest(
    sessionId: session.sessionId,
    prompt: [TextContentBlock(text: 'Hello')],
  ),
);

// Cancel session
await connection.cancel(
  CancelNotification(sessionId: session.sessionId),
);

// Session management
await connection.setSessionMode(
  SetSessionModeRequest(sessionId: sessionId, modeId: 'architect'),
);

final sessions = await connection.unstableListSessions(
  ListSessionsRequest(),
);

final forked = await connection.unstableForkSession(
  ForkSessionRequest(sessionId: sessionId, cwd: '/workspace'),
);

Inbound Request Handling

The connection automatically routes incoming requests to your Client implementation:
  • session/updateclient.sessionUpdate()
  • session/request_permissionclient.requestPermission()
  • fs/read_text_fileclient.readTextFile()
  • fs/write_text_fileclient.writeTextFile()
  • terminal/*client.createTerminal(), etc.

Request/Response Flow

The connection handles the JSON-RPC request/response cycle automatically:
// 1. Client sends request
final request = PromptRequest(sessionId: '123', prompt: [...]);
final responseFuture = connection.prompt(request);

// 2. Connection serializes and sends:
// {
//   "jsonrpc": "2.0",
//   "id": 0,
//   "method": "session/prompt",
//   "params": {...}
// }

// 3. Agent receives, processes, and responds:
// {
//   "jsonrpc": "2.0",
//   "id": 0,
//   "result": {"stopReason": "end_turn", ...}
// }

// 4. Connection deserializes and completes future
final response = await responseFuture;
print('Stop reason: ${response.stopReason}');

Notification Flow

Notifications are fire-and-forget messages with no response:
// Agent sends notification
await connection.sessionUpdate(
  SessionNotification(
    sessionId: '123',
    update: AgentMessageChunkSessionUpdate(...),
  ),
);

// Connection serializes and sends:
// {
//   "jsonrpc": "2.0",
//   "method": "session/update",
//   "params": {...}
// }
// (note: no "id" field)

// Client receives and handles immediately:
// client.sessionUpdate(params) is called

Error Handling

The connection automatically maps Dart errors to JSON-RPC error codes:

Automatic Error Mapping

// Parameter validation errors → -32602 (Invalid params)
@override
Future<PromptResponse> prompt(PromptRequest params) async {
  if (params.prompt.isEmpty) {
    throw ArgumentError('Prompt cannot be empty');
  }
  // Automatically mapped to InvalidParams error
}

// Unexpected errors → -32603 (Internal error)
@override
Future<PromptResponse> prompt(PromptRequest params) async {
  // Any unexpected exception becomes Internal error
  throw Exception('Something went wrong');
}

Explicit Error Responses

import 'package:acp_dart/acp_dart.dart';

// Throw RequestError for specific error codes
throw RequestError.methodNotFound('unknown/method');
throw RequestError.invalidParams({'reason': 'Invalid format'});
throw RequestError.authRequired('Please authenticate first');
throw RequestError.resourceNotFound('/missing/file.txt');
throw RequestError.requestCancelled();

// Custom error codes
throw RequestError(-32001, 'Custom error', {'details': 'info'});

Error Response Structure

class ErrorResponse {
  final int code;         // Error code
  final String message;   // Error message
  final dynamic data;     // Optional error data
}

Request Cancellation

Protocol-Level Cancellation

ACP supports the $/cancel_request notification for cancelling specific requests:
// Track request IDs
final requestIdCompleter = Completer<int>();
connection._nextRequestId; // Internal, but tracked

// Send a long-running request
final promptFuture = connection.prompt(request);

// Later, cancel it
await connection.sendCancelRequest(
  CancelRequestNotification(requestId: requestId),
);

// Or use the convenience method
final cancelled = await connection.cancelPendingRequest(
  requestId,
  meta: {'reason': 'User cancelled'},
);

if (cancelled) {
  // Request was pending and has been cancelled
  // The future will complete with error code -32800
}

Handling Cancellation

Implement ProtocolCancellationHandler to handle incoming cancellation:
class MyAgent implements Agent, ProtocolCancellationHandler {
  @override
  Future<void> cancelRequest(CancelRequestNotification params) async {
    // Find and cancel the request
    final requestId = params.requestId;
    // Cancel associated operations...
  }
}
Cancellation is best-effort. The receiver may have already completed the request before receiving the cancellation notification.

Write Queue

Connections serialize outgoing messages to prevent interleaving:
// These calls are queued internally
await connection.sessionUpdate(update1);
await connection.sessionUpdate(update2);
await connection.sessionUpdate(update3);

// Messages are sent in order:
// 1. update1 completes
// 2. update2 completes
// 3. update3 completes
This ensures message ordering is preserved even when sending multiple notifications rapidly.

Method Name Mapping

The SDK provides constants for protocol method names:
// Agent methods (client → agent)
const agentMethods = {
  'initialize': 'initialize',
  'authenticate': 'authenticate',
  'sessionNew': 'session/new',
  'sessionLoad': 'session/load',
  'sessionPrompt': 'session/prompt',
  'sessionCancel': 'session/cancel',
  'sessionSetMode': 'session/set_mode',
  'sessionSetConfigOption': 'session/set_config_option',
  'sessionList': 'session/list',
  'sessionFork': 'session/fork',
  'sessionResume': 'session/resume',
  'modelSelect': 'session/set_model',
};

// Client methods (agent → client)
const clientMethods = {
  'sessionUpdate': 'session/update',
  'sessionRequestPermission': 'session/request_permission',
  'fsReadTextFile': 'fs/read_text_file',
  'fsWriteTextFile': 'fs/write_text_file',
  'terminalCreate': 'terminal/create',
  'terminalOutput': 'terminal/output',
  'terminalWaitForExit': 'terminal/wait_for_exit',
  'terminalKill': 'terminal/kill',
  'terminalRelease': 'terminal/release',
};

// Protocol-level methods
const protocolMethods = {
  'cancelRequest': r'$/cancel_request',
};

Connection Lifecycle

// 1. Create stream
final stream = ndJsonStream(stdin, stdout);

// 2. Create connection (automatically starts listening)
final connection = AgentSideConnection(
  (conn) => MyAgent(conn),
  stream,
);

// 3. Connection is now active
//    - Receiving messages from stream
//    - Routing to agent/client handlers
//    - Sending responses and notifications

// 4. Connection remains active until stream closes
//    When stdin/stdout close, the connection ends
Connections automatically start processing messages upon creation. There’s no explicit “start” or “connect” method to call.

Best Practices

Handle parse errors: Use the onParseError callback in ndJsonStream() to log malformed messages without crashing.
Track pending requests: Store request IDs if you need to cancel them later.
Implement cancellation handlers: If your operations can take a long time, implement ProtocolCancellationHandler to handle cancellation gracefully.
Use typed errors: Throw RequestError instead of generic exceptions for better error handling on the receiving side.
Clean up resources: When using terminals or other resources, ensure they’re properly released even if connections are interrupted.

Example: Robust Connection

Future<void> main() async {
  final stream = ndJsonStream(
    stdin,
    stdout,
    onParseError: (line, error) {
      stderr.writeln('[PARSE ERROR] $error');
      stderr.writeln('[LINE] $line');
    },
  );

  final connection = AgentSideConnection(
    (conn) => RobustAgent(conn),
    stream,
  );

  // Connection handles everything automatically
  // Wait for stdin to close
  await stdin.done;
}

class RobustAgent implements Agent, ProtocolCancellationHandler {
  final Map<RequestId, Completer> _activeRequests = {};

  @override
  Future<void> cancelRequest(CancelRequestNotification params) async {
    final completer = _activeRequests.remove(params.requestId);
    completer?.complete();
  }

  @override
  Future<PromptResponse> prompt(PromptRequest params) async {
    final completer = Completer<void>();
    final requestId = /* track somehow */;
    _activeRequests[requestId] = completer;

    try {
      // Do work, checking completer.isCompleted periodically
      return PromptResponse(stopReason: StopReason.endTurn);
    } finally {
      _activeRequests.remove(requestId);
    }
  }
}

Next Steps

Protocol Overview

Learn about the overall protocol

Sessions

Understand session management

Build docs developers (and LLMs) love