Connection Overview
Connections in the ACP Dart SDK manage the bidirectional JSON-RPC communication between clients and agents. The SDK provides two connection types:
AgentSideConnection: For agents connecting to clients
ClientSideConnection: For clients connecting to agents
┌────────────────────────────────────────────────────┐
│              Connection Architecture               │
└────────────────────────────────────────────────────┘
┌──────────────────┐              ┌──────────────────┐
│                  │              │                  │
│  ClientSideConn  │◄────────────►│  AgentSideConn   │
│                  │  AcpStream   │                  │
└──────────────────┘              └──────────────────┘
         │                                 │
         │ implements                      │ implements
         ▼                                 ▼
┌──────────────────┐              ┌──────────────────┐
│      Agent       │              │      Client      │
│    interface     │              │    interface     │
└──────────────────┘              └──────────────────┘
    (call agent)                     (call client)
Base Connection Class
Both connection types are built on the internal Connection class that handles:
JSON-RPC 2.0 protocol implementation
Request/response tracking
Notification delivery
Error handling and mapping
Request cancellation
class Connection {
  Connection(
    RequestHandler requestHandler,
    NotificationHandler notificationHandler,
    AcpStream stream,
  );

  Future<T> sendRequest<T>(String method, [dynamic params]);
  Future<void> sendNotification(String method, [dynamic params]);
  Future<bool> cancelPendingRequest(RequestId requestId, {Map<String, dynamic>? meta});
}
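To make the request/response tracking responsibility concrete, here is a minimal sketch of how a connection might pair outgoing request IDs with the futures callers are awaiting. `PendingRequests`, `send`, and `resolve` are hypothetical names chosen for illustration and are not part of the acp_dart API; the real Connection class may be structured differently.

```dart
import 'dart:async';

/// Hypothetical helper (not part of acp_dart): pairs outgoing request IDs
/// with the futures that callers are awaiting.
class PendingRequests {
  int _nextId = 0;
  final Map<int, Completer<Object?>> _pending = {};

  /// Number of requests still waiting for a response.
  int get length => _pending.length;

  /// Builds a JSON-RPC request envelope, hands it to [write], and returns a
  /// future that completes when the matching response is resolved.
  Future<Object?> send(
    String method,
    Object? params,
    void Function(Map<String, Object?> message) write,
  ) {
    final id = _nextId++;
    final completer = Completer<Object?>();
    _pending[id] = completer;
    write({
      'jsonrpc': '2.0',
      'id': id,
      'method': method,
      if (params != null) 'params': params,
    });
    return completer.future;
  }

  /// Completes the future that matches the response ID.
  /// Returns false when no request with that ID is pending.
  bool resolve(Map<String, Object?> response) {
    final completer = _pending.remove(response['id']);
    if (completer == null) return false;
    if (response.containsKey('error')) {
      completer.completeError(StateError('${response['error']}'));
    } else {
      completer.complete(response['result']);
    }
    return true;
  }
}
```

The key design point is that the ID is the only link between a request and its response, so a response with an unknown ID can simply be ignored.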
ACP Streams
Connections communicate over ACP streams, which are bidirectional message streams that use the NDJSON (newline-delimited JSON) format:
class AcpStream {
  final Stream<Map<String, dynamic>> readable;
  final StreamSink<Map<String, dynamic>> writable;
}
Creating Streams
The SDK provides the ndJsonStream() helper function:
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

// Create stream from stdin/stdout (typical for agents)
final stream = ndJsonStream(stdin, stdout);

// Create stream from subprocess pipes (typical for clients)
final agentProcess = await Process.start('agent-binary', []);
final stream = ndJsonStream(agentProcess.stdout, agentProcess.stdin);

// With error handling
final stream = ndJsonStream(
  stdin,
  stdout,
  onParseError: (line, error) {
    stderr.writeln('Failed to parse: $line');
    stderr.writeln('Error: $error');
  },
);
ndJsonStream() automatically handles JSON parsing, encoding, and newline delimiters. Malformed lines are skipped, allowing the stream to continue processing valid messages.
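The skip-and-continue behavior described above can be sketched with `dart:convert` alone. This is an illustrative approximation, not the SDK's implementation: `decodeNdJson` and `encodeNdJson` are hypothetical helpers, and the real ndJsonStream() works on byte streams rather than a complete string.

```dart
import 'dart:convert';

/// Hypothetical decoder: parses NDJSON text one line at a time and skips
/// lines that are not valid JSON objects instead of aborting.
List<Map<String, dynamic>> decodeNdJson(
  String input, {
  void Function(String line, Object error)? onParseError,
}) {
  final messages = <Map<String, dynamic>>[];
  for (final line in const LineSplitter().convert(input)) {
    if (line.trim().isEmpty) continue; // Ignore blank lines.
    try {
      final decoded = jsonDecode(line);
      if (decoded is Map<String, dynamic>) {
        messages.add(decoded);
      } else {
        onParseError?.call(
          line,
          const FormatException('Expected a JSON object'),
        );
      }
    } on FormatException catch (error) {
      // Report the malformed line and keep processing the rest.
      onParseError?.call(line, error);
    }
  }
  return messages;
}

/// Hypothetical encoder: one JSON document per line.
String encodeNdJson(Map<String, dynamic> message) =>
    '${jsonEncode(message)}\n';
```

Because each message occupies exactly one line, a single bad line can be dropped without losing framing for the messages that follow it.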
AgentSideConnection
AgentSideConnection enables agents to communicate with clients.
Creating an Agent Connection
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

class MyAgent implements Agent {
  final AgentSideConnection _connection;

  MyAgent(this._connection);

  // Implement Agent interface methods...
}

void main() {
  final stream = ndJsonStream(stdin, stdout);

  // Factory function receives the connection
  final connection = AgentSideConnection(
    (conn) => MyAgent(conn),
    stream,
  );
  // Connection is now active and handling messages
}
Outbound Methods (Agent → Client)
The AgentSideConnection implements the Client interface for calling client methods:
// File operations
final content = await _connection.readTextFile(
  ReadTextFileRequest(
    sessionId: sessionId,
    path: '/project/config.json',
  ),
);
await _connection.writeTextFile(
  WriteTextFileRequest(
    sessionId: sessionId,
    path: '/output/result.txt',
    content: 'Generated output',
  ),
);

// Permission requests
final permission = await _connection.requestPermission(
  RequestPermissionRequest(
    sessionId: sessionId,
    options: options,
    toolCall: toolCallUpdate,
  ),
);

// Session updates (notification)
await _connection.sessionUpdate(
  SessionNotification(
    sessionId: sessionId,
    update: AgentMessageChunkSessionUpdate(
      content: TextContentBlock(text: 'Response text'),
    ),
  ),
);

// Terminal operations
final terminal = await _connection.createTerminal(
  CreateTerminalRequest(
    sessionId: sessionId,
    command: 'npm',
    args: ['test'],
  ),
);
final output = await _connection.terminalOutput(
  TerminalOutputRequest(
    sessionId: sessionId,
    terminalId: terminal.terminalId,
  ),
);
await _connection.releaseTerminal(
  ReleaseTerminalRequest(
    sessionId: sessionId,
    terminalId: terminal.terminalId,
  ),
);

// Extension methods
final result = await _connection.extMethod(
  '_custom/method',
  {'param': 'value'},
);
await _connection.extNotification(
  '_custom/notification',
  {'data': 'value'},
);
Inbound Request Handling
The connection automatically routes incoming requests to your Agent implementation:
initialize → agent.initialize()
session/new → agent.newSession()
session/prompt → agent.prompt()
session/cancel → agent.cancel()
And all other agent methods…
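The routing above amounts to a lookup from method name to handler. The sketch below illustrates that idea with a plain map; `MethodRouter`, `Handler`, and `dispatch` are hypothetical names, and the SDK's internal routing may be implemented differently (for example with a switch over method constants).

```dart
import 'dart:async';

/// Hypothetical handler signature: receives the raw JSON-RPC params.
typedef Handler = FutureOr<Object?> Function(Map<String, dynamic> params);

/// Hypothetical router: looks up a handler by JSON-RPC method name.
class MethodRouter {
  MethodRouter(this._handlers);

  final Map<String, Handler> _handlers;

  /// Whether a handler is registered for [method].
  bool canHandle(String method) => _handlers.containsKey(method);

  /// Invokes the handler registered for [method].
  /// Throws [UnsupportedError] when the method is unknown.
  Future<Object?> dispatch(String method, Map<String, dynamic> params) async {
    final handler = _handlers[method];
    if (handler == null) {
      throw UnsupportedError('Method not found: $method');
    }
    return await handler(params);
  }
}

/// Example wiring that mirrors the method names listed above. The handler
/// bodies are placeholders standing in for calls into an Agent.
MethodRouter buildAgentRouter() => MethodRouter({
      'initialize': (params) => {'protocolVersion': 1},
      'session/new': (params) => {'sessionId': 'example-session'},
      'session/prompt': (params) => {'stopReason': 'end_turn'},
    });
```

An unknown method is the natural place to produce a "method not found" error, which is why `dispatch` throws rather than returning null.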
Cancellation Support
// Send cancellation notification
await _connection.sendCancelRequest(
  CancelRequestNotification(requestId: requestId),
);

// Cancel and reject a pending outbound request
final wasCancelled = await _connection.cancelPendingRequest(
  requestId,
  meta: {'reason': 'Timeout'},
);
ClientSideConnection
ClientSideConnection enables clients to communicate with agents.
Creating a Client Connection
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

class MyClient implements Client {
  @override
  Future<RequestPermissionResponse> requestPermission(
    RequestPermissionRequest params,
  ) async {
    // Handle permission request (placeholder until a real response is built)
    throw UnimplementedError('requestPermission is not implemented yet');
  }

  @override
  Future<void> sessionUpdate(SessionNotification params) async {
    // Handle session update
  }

  // Implement other Client interface methods...
}

Future<void> main() async {
  // Spawn agent process
  final agentProcess = await Process.start('my-agent', []);

  // Create stream from process pipes
  final stream = ndJsonStream(agentProcess.stdout, agentProcess.stdin);

  // Create connection
  final client = MyClient();
  final connection = ClientSideConnection((conn) => client, stream);

  // Use the connection
  await connection.initialize(InitializeRequest(protocolVersion: 1));
}
Outbound Methods (Client → Agent)
The ClientSideConnection implements the Agent interface for calling agent methods:
// Initialize connection
final initResponse = await connection.initialize(
  InitializeRequest(
    protocolVersion: 1,
    clientCapabilities: ClientCapabilities(
      fs: FileSystemCapability(readTextFile: true, writeTextFile: true),
      terminal: true,
    ),
  ),
);

// Authenticate if needed
if (initResponse.authMethods.isNotEmpty) {
  await connection.authenticate(
    AuthenticateRequest(methodId: initResponse.authMethods.first.id),
  );
}

// Create session
final session = await connection.newSession(
  NewSessionRequest(
    cwd: '/workspace',
    mcpServers: [],
  ),
);

// Send prompt
final result = await connection.prompt(
  PromptRequest(
    sessionId: session.sessionId,
    prompt: [TextContentBlock(text: 'Hello')],
  ),
);

// Cancel ongoing work in the session
await connection.cancel(
  CancelNotification(sessionId: session.sessionId),
);

// Session management
await connection.setSessionMode(
  SetSessionModeRequest(sessionId: session.sessionId, modeId: 'architect'),
);
final sessions = await connection.unstableListSessions(
  ListSessionsRequest(),
);
final forked = await connection.unstableForkSession(
  ForkSessionRequest(sessionId: session.sessionId, cwd: '/workspace'),
);
Inbound Request Handling
The connection automatically routes incoming requests to your Client implementation:
session/update → client.sessionUpdate()
session/request_permission → client.requestPermission()
fs/read_text_file → client.readTextFile()
fs/write_text_file → client.writeTextFile()
terminal/* → client.createTerminal(), etc.
Request/Response Flow
The connection handles the JSON-RPC request/response cycle automatically:
// 1. Client sends request
final request = PromptRequest(sessionId: '123', prompt: [...]);
final responseFuture = connection.prompt(request);

// 2. Connection serializes and sends:
// {
//   "jsonrpc": "2.0",
//   "id": 0,
//   "method": "session/prompt",
//   "params": {...}
// }

// 3. Agent receives, processes, and responds:
// {
//   "jsonrpc": "2.0",
//   "id": 0,
//   "result": {"stopReason": "end_turn", ...}
// }

// 4. Connection deserializes and completes future
final response = await responseFuture;
print('Stop reason: ${response.stopReason}');
Notification Flow
Notifications are fire-and-forget messages with no response:
// Agent sends notification
await connection.sessionUpdate(
  SessionNotification(
    sessionId: '123',
    update: AgentMessageChunkSessionUpdate(...),
  ),
);

// Connection serializes and sends:
// {
//   "jsonrpc": "2.0",
//   "method": "session/update",
//   "params": {...}
// }
// (note: no "id" field)

// Client receives and handles immediately:
// client.sessionUpdate(params) is called
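The difference between the two flows comes down to which fields a JSON-RPC 2.0 message carries. As a small illustration, the sketch below classifies a decoded message by checking for `id`, `method`, `result`, and `error`; `MessageKind` and `classify` are hypothetical names and not part of the SDK.

```dart
/// Hypothetical classification of a decoded JSON-RPC 2.0 message.
enum MessageKind { request, notification, response, invalid }

/// Classifies [message] by the fields it carries:
/// - method + id      -> request (a response is expected)
/// - method, no id    -> notification (fire-and-forget)
/// - id + result/error -> response
MessageKind classify(Map<String, dynamic> message) {
  final hasId = message.containsKey('id');
  final hasMethod = message['method'] is String;
  if (hasMethod) {
    return hasId ? MessageKind.request : MessageKind.notification;
  }
  final hasOutcome =
      message.containsKey('result') || message.containsKey('error');
  return hasId && hasOutcome ? MessageKind.response : MessageKind.invalid;
}
```

Since a notification has no `id`, the receiver has nothing to reply to, which is what makes it fire-and-forget.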
Error Handling
The connection automatically maps Dart errors to JSON-RPC error codes:
Automatic Error Mapping
// Parameter validation errors → -32602 (Invalid params)
@override
Future<PromptResponse> prompt(PromptRequest params) async {
  if (params.prompt.isEmpty) {
    // Automatically mapped to InvalidParams error
    throw ArgumentError('Prompt cannot be empty');
  }
}

// Unexpected errors → -32603 (Internal error)
@override
Future<PromptResponse> prompt(PromptRequest params) async {
  // Any unexpected exception becomes Internal error
  throw Exception('Something went wrong');
}
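A rough sketch of this mapping follows. It assumes, as the examples above suggest, that `ArgumentError` is treated as a parameter validation failure and that anything else becomes an internal error. `toJsonRpcError` and `runHandler` are hypothetical names; the SDK's own mapping may recognize more error types.

```dart
/// Hypothetical mapping from a thrown Dart object to a JSON-RPC error body.
/// -32602 and -32603 are the standard JSON-RPC 2.0 codes for
/// "Invalid params" and "Internal error".
Map<String, Object?> toJsonRpcError(Object error) {
  if (error is ArgumentError) {
    return {
      'code': -32602,
      'message': 'Invalid params',
      'data': '${error.message}',
    };
  }
  return {
    'code': -32603,
    'message': 'Internal error',
    'data': error.toString(),
  };
}

/// Runs [handler] and wraps its outcome in a JSON-RPC response envelope.
Map<String, Object?> runHandler(Object? id, Object? Function() handler) {
  try {
    return {'jsonrpc': '2.0', 'id': id, 'result': handler()};
  } catch (error) {
    // Errors never escape: they are reported to the peer instead.
    return {'jsonrpc': '2.0', 'id': id, 'error': toJsonRpcError(error)};
  }
}
```

Catching at the boundary like this means a failing handler produces an error response for that one request rather than tearing down the whole connection.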
Explicit Error Responses
import 'package:acp_dart/acp_dart.dart';

// Throw RequestError for specific error codes
throw RequestError.methodNotFound('unknown/method');
throw RequestError.invalidParams({'reason': 'Invalid format'});
throw RequestError.authRequired('Please authenticate first');
throw RequestError.resourceNotFound('/missing/file.txt');
throw RequestError.requestCancelled();

// Custom error codes
throw RequestError(-32001, 'Custom error', {'details': 'info'});
Error Response Structure
class ErrorResponse {
  final int code; // Error code
  final String message; // Error message
  final dynamic data; // Optional error data
}
Request Cancellation
Protocol-Level Cancellation
ACP supports the $/cancel_request notification for cancelling specific requests:
// requestId is the JSON-RPC ID of the outbound request you want to cancel.
// The connection assigns these IDs internally (its _nextRequestId counter is
// private), so capture the ID through whatever mechanism your code uses to
// track pending requests.

// Send a long-running request
final promptFuture = connection.prompt(request);

// Later, cancel it
await connection.sendCancelRequest(
  CancelRequestNotification(requestId: requestId),
);

// Or use the convenience method
final cancelled = await connection.cancelPendingRequest(
  requestId,
  meta: {'reason': 'User cancelled'},
);
if (cancelled) {
  // Request was pending and has been cancelled
  // The future will complete with error code -32800
}
Handling Cancellation
Implement ProtocolCancellationHandler to handle incoming cancellation:
class MyAgent implements Agent, ProtocolCancellationHandler {
  @override
  Future<void> cancelRequest(CancelRequestNotification params) async {
    // Find and cancel the request
    final requestId = params.requestId;
    // Cancel associated operations...
  }
}
Cancellation is best-effort. The receiver may have already completed the request before receiving the cancellation notification.
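One way to picture best-effort cancellation is a registry of in-flight work that a cancellation handler consults. In the sketch below, cancelling a request that has already finished is a harmless no-op that reports `false`. `CancellationRegistry` and its methods are hypothetical and only illustrate the pattern; they are not SDK types.

```dart
import 'dart:async';

/// Hypothetical registry of in-flight work, keyed by request ID.
class CancellationRegistry {
  final Map<Object, Completer<void>> _active = {};

  /// Registers [requestId] and returns a probe that the worker can poll
  /// between steps to find out whether it has been cancelled.
  bool Function() start(Object requestId) {
    final completer = Completer<void>();
    _active[requestId] = completer;
    return () => completer.isCompleted;
  }

  /// Marks the request as finished so later cancellations become no-ops.
  void finish(Object requestId) {
    _active.remove(requestId);
  }

  /// Cancels the request. Returns true only if it was still in flight.
  bool cancel(Object requestId) {
    final completer = _active.remove(requestId);
    if (completer == null) return false; // Already done or unknown.
    completer.complete();
    return true;
  }
}
```

A worker would call `start` before beginning, check the returned probe between units of work, and call `finish` in a `finally` block so the registry never leaks entries.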
Write Queue
Connections serialize outgoing messages to prevent interleaving:
// These calls are queued internally
await connection.sessionUpdate(update1);
await connection.sessionUpdate(update2);
await connection.sessionUpdate(update3);

// Messages are sent in order:
// 1. update1 completes
// 2. update2 completes
// 3. update3 completes
This ensures message ordering is preserved even when sending multiple notifications rapidly.
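A common way to get this ordering guarantee is to chain every write onto the future of the previous one. The following is a minimal sketch of that technique under the assumption that each write is an asynchronous callback; `WriteQueue` and `enqueue` are hypothetical names, and the SDK's actual queue may differ.

```dart
import 'dart:async';

/// Hypothetical serial write queue: each write starts only after the
/// previous one has finished, so messages cannot interleave.
class WriteQueue {
  Future<void> _tail = Future<void>.value();

  /// Schedules [write] to run after all previously enqueued writes.
  /// The returned future completes (or fails) with that write.
  Future<void> enqueue(Future<void> Function() write) {
    final result = _tail.then((_) => write());
    // Swallow errors on the internal chain so that one failed write
    // does not block the writes queued behind it.
    _tail = result.then<void>((_) {}, onError: (Object _) {});
    return result;
  }
}
```

Callers do not need to await each write before issuing the next: even if the first write is slow, the second one still runs after it.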
Method Name Mapping
The SDK provides constants for protocol method names:
// Agent methods (client → agent)
const agentMethods = {
  'initialize': 'initialize',
  'authenticate': 'authenticate',
  'sessionNew': 'session/new',
  'sessionLoad': 'session/load',
  'sessionPrompt': 'session/prompt',
  'sessionCancel': 'session/cancel',
  'sessionSetMode': 'session/set_mode',
  'sessionSetConfigOption': 'session/set_config_option',
  'sessionList': 'session/list',
  'sessionFork': 'session/fork',
  'sessionResume': 'session/resume',
  'modelSelect': 'session/set_model',
};

// Client methods (agent → client)
const clientMethods = {
  'sessionUpdate': 'session/update',
  'sessionRequestPermission': 'session/request_permission',
  'fsReadTextFile': 'fs/read_text_file',
  'fsWriteTextFile': 'fs/write_text_file',
  'terminalCreate': 'terminal/create',
  'terminalOutput': 'terminal/output',
  'terminalWaitForExit': 'terminal/wait_for_exit',
  'terminalKill': 'terminal/kill',
  'terminalRelease': 'terminal/release',
};

// Protocol-level methods
const protocolMethods = {
  'cancelRequest': r'$/cancel_request',
};
Connection Lifecycle
// 1. Create stream
final stream = ndJsonStream(stdin, stdout);

// 2. Create connection (automatically starts listening)
final connection = AgentSideConnection(
  (conn) => MyAgent(conn),
  stream,
);

// 3. Connection is now active
//    - Receiving messages from stream
//    - Routing to agent/client handlers
//    - Sending responses and notifications

// 4. Connection remains active until stream closes
//    When stdin/stdout close, the connection ends
Connections automatically start processing messages upon creation. There’s no explicit “start” or “connect” method to call.
Best Practices
Handle parse errors: Use the onParseError callback in ndJsonStream() to log malformed messages without crashing.
Track pending requests: Store request IDs if you need to cancel them later.
Implement cancellation handlers: If your operations can take a long time, implement ProtocolCancellationHandler to handle cancellation gracefully.
Use typed errors: Throw RequestError instead of generic exceptions so the receiving side gets a specific error code.
Clean up resources: When using terminals or other resources, ensure they’re properly released even if connections are interrupted.
Example: Robust Connection
import 'dart:async';
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

Future<void> main() async {
  final stream = ndJsonStream(
    stdin,
    stdout,
    onParseError: (line, error) {
      stderr.writeln('[PARSE ERROR] $error');
      stderr.writeln('[LINE] $line');
    },
  );
  final connection = AgentSideConnection(
    (conn) => RobustAgent(conn),
    stream,
  );
  // Connection handles everything automatically.
  // The isolate stays alive while the connection is listening on stdin,
  // so no explicit wait is needed here.
}

class RobustAgent implements Agent, ProtocolCancellationHandler {
  RobustAgent(this._connection);

  final AgentSideConnection _connection;
  final Map<RequestId, Completer> _activeRequests = {};

  @override
  Future<void> cancelRequest(CancelRequestNotification params) async {
    final completer = _activeRequests.remove(params.requestId);
    completer?.complete();
  }

  @override
  Future<PromptResponse> prompt(PromptRequest params) async {
    final completer = Completer<void>();
    final requestId = /* track somehow */;
    _activeRequests[requestId] = completer;
    try {
      // Do work, checking completer.isCompleted periodically
      return PromptResponse(stopReason: StopReason.endTurn);
    } finally {
      _activeRequests.remove(requestId);
    }
  }

  // Implement other Agent interface methods...
}
Next Steps
Protocol Overview: Learn about the overall protocol
Sessions: Understand session management