Overview
This guide covers advanced patterns and techniques for building robust ACP implementations, including extension methods, request cancellation, managing multiple sessions, and sophisticated error handling.Extension Methods
Extension methods allow you to add custom functionality beyond the ACP specification.Implementing Extension Methods
class MyAgent implements Agent {
@override
Future<Map<String, dynamic>>? extMethod(
String method,
Map<String, dynamic> params,
) async {
switch (method) {
case '_analytics/track':
return await handleAnalytics(params);
case '_config/get':
return await getCustomConfig(params);
case '_debug/status':
return {
'activeSessions': _sessions.length,
'uptime': DateTime.now().difference(_startTime).inSeconds,
'memoryUsage': ProcessInfo.currentRss,
};
default:
throw RequestError.methodNotFound(method);
}
}
@override
Future<void>? extNotification(
String method,
Map<String, dynamic> params,
) async {
switch (method) {
case '_telemetry/event':
await logTelemetryEvent(params);
break;
case '_cache/clear':
await clearInternalCache();
break;
default:
// Unknown notifications can be safely ignored
break;
}
}
}
Calling Extension Methods
// Call agent extension method
final debugInfo = await connection.extMethod(
'_debug/status',
{},
);
print('Active sessions: ${debugInfo['activeSessions']}');
print('Uptime: ${debugInfo['uptime']}s');
// Send extension notification
await connection.extNotification(
'_cache/clear',
{},
);
Extension methods must start with underscore (
_). The ACP specification reserves this prefix for extensions.Protocol-Level Request Cancellation
Beyond session cancellation, ACP supports protocol-level request cancellation using the$/cancel_request notification.
Implementing Cancellation Handler
import 'package:acp_dart/acp_dart.dart';
class CancellableAgent implements Agent, ProtocolCancellationHandler {
final Map<RequestId, Completer<void>> _pendingRequests = {};
@override
Future<void> cancelRequest(CancelRequestNotification params) async {
final requestId = params.requestId;
final completer = _pendingRequests.remove(requestId);
if (completer != null && !completer.isCompleted) {
completer.complete();
print('Cancelled request: $requestId');
}
}
Future<T> _cancellableOperation<T>(
RequestId requestId,
Future<T> Function(Stream<void> cancelSignal) operation,
) async {
final completer = Completer<void>();
_pendingRequests[requestId] = completer;
try {
final result = await operation(completer.future.asStream());
return result;
} finally {
_pendingRequests.remove(requestId);
}
}
@override
Future<PromptResponse> prompt(PromptRequest params) async {
return await _cancellableOperation(
generateRequestId(),
(cancelSignal) async {
// Your long-running operation here
await processWithCancellation(cancelSignal);
return PromptResponse(stopReason: StopReason.endTurn);
},
);
}
}
Sending Cancellation Requests
// From the client side
final requestId = 123; // Track request IDs
// Cancel a specific request
await connection.sendCancelRequest(
CancelRequestNotification(requestId: requestId),
);
// Or use the helper method
final wasCancelled = await connection.cancelPendingRequest(
requestId,
meta: {'reason': 'user_requested'},
);
if (wasCancelled) {
print('Request was successfully cancelled');
}
Protocol-level cancellation is independent of session cancellation. Use it for fine-grained control over individual requests.
Managing Multiple Sessions
Handle multiple concurrent sessions efficiently.Session State Management
class SessionManager {
final Map<SessionId, SessionState> _sessions = {};
final int _maxSessions = 10;
SessionState getOrCreateSession(SessionId id) {
if (_sessions.length >= _maxSessions) {
_evictOldestSession();
}
return _sessions.putIfAbsent(
id,
() => SessionState(
id: id,
createdAt: DateTime.now(),
conversationHistory: [],
),
);
}
void _evictOldestSession() {
if (_sessions.isEmpty) return;
var oldest = _sessions.entries.first;
for (final entry in _sessions.entries) {
if (entry.value.lastAccessedAt.isBefore(oldest.value.lastAccessedAt)) {
oldest = entry;
}
}
_sessions.remove(oldest.key);
print('Evicted session: ${oldest.key}');
}
void updateLastAccess(SessionId id) {
_sessions[id]?.lastAccessedAt = DateTime.now();
}
}
class SessionState {
final SessionId id;
final DateTime createdAt;
DateTime lastAccessedAt;
final List<Message> conversationHistory;
SessionMode currentMode;
SessionState({
required this.id,
required this.createdAt,
required this.conversationHistory,
this.currentMode = SessionMode.default_,
}) : lastAccessedAt = createdAt;
}
Session Forking
Create branching conversations:// Implement fork capability
class ForkableAgent implements Agent {
@override
Future<ForkSessionResponse> unstableForkSession(
ForkSessionRequest params,
) async {
final originalSession = _sessions[params.sessionId];
if (originalSession == null) {
throw RequestError.resourceNotFound(params.sessionId);
}
// Create new session with copied state
final newSessionId = _generateRandomSessionId();
final forkedSession = originalSession.fork(newSessionId);
_sessions[newSessionId] = forkedSession;
return ForkSessionResponse(
sessionId: newSessionId,
modes: SessionModeState(
availableModes: forkedSession.availableModes,
currentModeId: forkedSession.currentMode.id,
),
);
}
}
// Client usage
final forkedSession = await connection.unstableForkSession(
ForkSessionRequest(
sessionId: originalSessionId,
cwd: Directory.current.path,
),
);
print('Forked to new session: ${forkedSession.sessionId}');
Session Resume
Resume sessions without replaying history:@override
Future<ResumeSessionResponse> unstableResumeSession(
ResumeSessionRequest params,
) async {
final session = await _loadSessionFromStorage(params.sessionId);
if (session == null) {
throw RequestError.resourceNotFound(params.sessionId);
}
_activeSessions[params.sessionId] = session;
return ResumeSessionResponse(
modes: SessionModeState(
availableModes: session.availableModes,
currentModeId: session.currentMode.id,
),
);
}
Advanced Error Handling
Implement comprehensive error handling patterns.Custom Error Types
class AgentError extends RequestError {
final String category;
final Map<String, dynamic>? context;
AgentError({
required int code,
required String message,
required this.category,
this.context,
}) : super(code, message, context);
factory AgentError.modelTimeout({
required String modelId,
required Duration timeout,
}) {
return AgentError(
code: -32001,
message: 'Model request timed out',
category: 'model_error',
context: {
'modelId': modelId,
'timeout': timeout.inSeconds,
},
);
}
factory AgentError.rateLimited({
required int retryAfter,
}) {
return AgentError(
code: -32003,
message: 'Rate limit exceeded',
category: 'rate_limit',
context: {'retryAfter': retryAfter},
);
}
}
Retry Logic with Exponential Backoff
Future<T> retryWithBackoff<T>(
Future<T> Function() operation, {
int maxAttempts = 3,
Duration initialDelay = const Duration(seconds: 1),
double backoffMultiplier = 2.0,
}) async {
int attempt = 0;
Duration delay = initialDelay;
while (true) {
attempt++;
try {
return await operation();
} catch (error) {
if (attempt >= maxAttempts) {
rethrow;
}
// Check if error is retryable
if (error is RequestError) {
if (error.code == -32003) { // Rate limited
final retryAfter = error.data?['retryAfter'] as int?;
if (retryAfter != null) {
delay = Duration(seconds: retryAfter);
}
} else if (!_isRetryable(error)) {
rethrow;
}
}
print('Attempt $attempt failed, retrying in ${delay.inSeconds}s...');
await Future.delayed(delay);
delay *= backoffMultiplier;
}
}
}
bool _isRetryable(RequestError error) {
// Retry on internal errors and timeouts
return error.code == -32603 || // Internal error
error.code == -32001; // Timeout
}
// Usage
final result = await retryWithBackoff(
() => connection.prompt(promptRequest),
maxAttempts: 5,
);
Circuit Breaker Pattern
class CircuitBreaker {
final int failureThreshold;
final Duration resetTimeout;
int _failureCount = 0;
DateTime? _lastFailureTime;
CircuitState _state = CircuitState.closed;
CircuitBreaker({
this.failureThreshold = 5,
this.resetTimeout = const Duration(minutes: 1),
});
Future<T> execute<T>(Future<T> Function() operation) async {
if (_state == CircuitState.open) {
if (DateTime.now().difference(_lastFailureTime!) > resetTimeout) {
_state = CircuitState.halfOpen;
_failureCount = 0;
} else {
throw RequestError.internalError('Circuit breaker is open');
}
}
try {
final result = await operation();
_onSuccess();
return result;
} catch (error) {
_onFailure();
rethrow;
}
}
void _onSuccess() {
_failureCount = 0;
_state = CircuitState.closed;
}
void _onFailure() {
_failureCount++;
_lastFailureTime = DateTime.now();
if (_failureCount >= failureThreshold) {
_state = CircuitState.open;
print('Circuit breaker opened after $failureCount failures');
}
}
}
enum CircuitState { closed, open, halfOpen }
Permission Management
Implement sophisticated permission handling.Permission Policy System
class PermissionPolicy {
final Map<String, PermissionRule> _rules = {};
void addRule(String pattern, PermissionRule rule) {
_rules[pattern] = rule;
}
PermissionDecision evaluate(ToolCallUpdate toolCall) {
for (final entry in _rules.entries) {
if (_matches(entry.key, toolCall)) {
return entry.value.evaluate(toolCall);
}
}
return PermissionDecision.prompt; // Default: ask user
}
bool _matches(String pattern, ToolCallUpdate toolCall) {
// Pattern matching logic
if (pattern == '*') return true;
if (pattern.startsWith('kind:')) {
return toolCall.kind?.toString() == pattern.substring(5);
}
if (pattern.startsWith('path:')) {
final pathPattern = pattern.substring(5);
return toolCall.locations?.any(
(loc) => loc.path.contains(pathPattern),
) ?? false;
}
return false;
}
}
enum PermissionDecision { allow, deny, prompt }
abstract class PermissionRule {
PermissionDecision evaluate(ToolCallUpdate toolCall);
}
class AllowRule extends PermissionRule {
@override
PermissionDecision evaluate(ToolCallUpdate toolCall) {
return PermissionDecision.allow;
}
}
class DenyRule extends PermissionRule {
@override
PermissionDecision evaluate(ToolCallUpdate toolCall) {
return PermissionDecision.deny;
}
}
// Usage
final policy = PermissionPolicy()
..addRule('kind:read', AllowRule()) // Auto-allow all reads
..addRule('path:.env', DenyRule()) // Always deny .env files
..addRule('kind:delete', PromptRule()); // Always ask for deletes
// In requestPermission implementation
@override
Future<RequestPermissionResponse> requestPermission(
RequestPermissionRequest params,
) async {
final decision = policy.evaluate(params.toolCall);
switch (decision) {
case PermissionDecision.allow:
return RequestPermissionResponse(
outcome: SelectedOutcome(
optionId: params.options
.firstWhere((o) => o.kind == PermissionOptionKind.allowOnce)
.optionId,
),
);
case PermissionDecision.deny:
return RequestPermissionResponse(
outcome: SelectedOutcome(
optionId: params.options
.firstWhere((o) => o.kind == PermissionOptionKind.rejectOnce)
.optionId,
),
);
case PermissionDecision.prompt:
return await _promptUser(params);
}
}
Resource Management
Manage resources efficiently.Terminal Handle Pool
class TerminalPool {
final Map<String, Process> _activeProcesses = {};
final int maxTerminals;
TerminalPool({this.maxTerminals = 10});
Future<String> createTerminal(
String command,
List<String> args,
) async {
if (_activeProcesses.length >= maxTerminals) {
throw RequestError.internalError(
'Maximum number of terminals reached',
);
}
final process = await Process.start(command, args);
final terminalId = _generateTerminalId();
_activeProcesses[terminalId] = process;
// Auto-cleanup on exit
process.exitCode.then((_) {
Future.delayed(Duration(minutes: 5), () {
_activeProcesses.remove(terminalId);
});
});
return terminalId;
}
Future<void> cleanup() async {
for (final process in _activeProcesses.values) {
process.kill();
}
_activeProcesses.clear();
}
}
Always implement proper cleanup for resources like terminals and file handles to prevent leaks.
Next Steps
Basic Agent
Review the basic agent implementation
Basic Client
Review the basic client implementation
Error Handling
Deep dive into error handling
Testing
Learn how to test your implementation