Skip to main content

Overview

The stream module provides utilities for creating bidirectional ACP streams using newline-delimited JSON (NDJSON) encoding. This is the standard way to handle ACP connections over stdio and other byte-oriented transports.

AcpStream

The core stream interface for ACP connections.
AcpStream
class
Bidirectional stream interface for ACP connections

Constructor

AcpStream({
  required Stream<Map<String, dynamic>> readable,
  required StreamSink<Map<String, dynamic>> writable,
})
readable
Stream<Map<String, dynamic>>
required
The readable stream to receive decoded messages from
writable
StreamSink<Map<String, dynamic>>
required
The writable sink to send encoded messages to

ndJsonStream

Creates an ACP stream from newline-delimited JSON byte streams.
AcpStream ndJsonStream(
  Stream<List<int>> input,
  StreamSink<List<int>> output, {
  void Function(String line, Object error)? onParseError,
})
input
Stream<List<int>>
required
The readable byte stream to receive encoded messages from (typically stdin)
output
StreamSink<List<int>>
required
The writable byte sink to send encoded messages to (typically stdout)
onParseError
void Function(String line, Object error)?
Optional callback invoked when a non-empty line cannot be parsed as JSON. Use this to log parse errors without disrupting the stream.

Returns

An AcpStream for bidirectional ACP communication with automatic NDJSON encoding/decoding.

Behavior

Readable Stream

The readable stream:
  1. Decodes bytes to UTF-8 strings
  2. Splits on newlines
  3. Trims whitespace
  4. Skips empty lines
  5. Parses JSON objects
  6. Calls onParseError for invalid JSON (if provided)
  7. Continues processing after errors
The stream skips malformed lines and continues processing subsequent messages. This ensures one bad message doesn’t break the entire connection.

Writable Stream

The writable stream:
  1. Encodes messages to JSON
  2. Appends newline character
  3. Encodes to UTF-8 bytes
  4. Writes to output sink

Usage Examples

Basic Usage (stdio)

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() {
  // Create stream from stdin/stdout
  final stream = ndJsonStream(stdin, stdout);
  
  // Use with AgentSideConnection
  final connection = AgentSideConnection(
    (conn) => MyAgent(conn),
    stream,
  );
  
  // Connection now handles all communication
}

With Error Logging

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() {
  // Log parse errors to stderr
  final stream = ndJsonStream(
    stdin,
    stdout,
    onParseError: (line, error) {
      stderr.writeln('Failed to parse message: $line');
      stderr.writeln('Error: $error');
    },
  );
  
  final connection = AgentSideConnection(
    (conn) => MyAgent(conn),
    stream,
  );
}

Custom Stream Source

import 'dart:async';
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() async {
  // Connect to a remote ACP server via socket
  final socket = await Socket.connect('localhost', 8080);
  
  final stream = ndJsonStream(
    socket,
    socket,
    onParseError: (line, error) {
      print('Parse error: $error');
    },
  );
  
  final connection = ClientSideConnection(
    (conn) => MyClient(conn),
    stream,
  );
}

Manual Message Reading

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() async {
  final stream = ndJsonStream(stdin, stdout);
  
  // Listen to incoming messages
  stream.readable.listen(
    (message) {
      print('Received: ${message['method']}');
      
      // Send a response
      stream.writable.add({
        'jsonrpc': '2.0',
        'id': message['id'],
        'result': {'status': 'ok'},
      });
    },
    onError: (error) {
      print('Stream error: $error');
    },
    onDone: () {
      print('Stream closed');
    },
  );
}

Error Handling

Parse Errors

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() {
  final stream = ndJsonStream(
    stdin,
    stdout,
    onParseError: (line, error) {
      // Log to stderr (doesn't interfere with stdout protocol)
      stderr.writeln('Parse error on line: $line');
      
      if (error is FormatException) {
        stderr.writeln('Invalid JSON format');
      } else {
        stderr.writeln('Error: $error');
      }
      
      // Stream continues processing next messages
    },
  );
  
  final connection = AgentSideConnection(
    (conn) => MyAgent(conn),
    stream,
  );
}
Always use stderr for logging when using stdio streams. Writing to stdout will corrupt the protocol stream.

Encoding Errors

import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() {
  final stream = ndJsonStream(stdin, stdout);
  
  try {
    // This will fail if the message can't be encoded to JSON
    stream.writable.add({
      'invalid': Object(), // Objects can't be serialized
    });
  } catch (e) {
    stderr.writeln('Failed to encode message: $e');
  }
}

Stream Lifecycle

import 'dart:async';
import 'dart:io';
import 'package:acp_dart/acp_dart.dart';

void main() async {
  final stream = ndJsonStream(stdin, stdout);
  
  // Stream is active and processing messages
  
  // When done, close the writable side
  await stream.writable.close();
  
  // The readable side will complete when input closes
}

Thread Safety

The ndJsonStream implementation uses Dart streams, which are single-subscription by default. Messages are processed sequentially in the order they arrive.

Performance Considerations

Buffering: The NDJSON stream automatically handles buffering and line splitting, so partial UTF-8 sequences and incomplete lines are handled correctly.Backpressure: Dart streams provide natural backpressure handling. If the consumer is slow, the stream will pause reading until it’s ready.Memory: Each message is parsed independently, so memory usage is proportional to individual message size, not total stream size.

AgentSideConnection

Using streams with agents

ClientSideConnection

Using streams with clients

Connection

Base connection class

Build docs developers (and LLMs) love