The ACP TypeScript SDK uses the Web Streams API to provide bidirectional communication between clients and agents. Streams handle the serialization and deserialization of JSON-RPC messages.

Stream Interface

The core Stream type powers all ACP connections:
export type Stream = {
  writable: WritableStream<AnyMessage>;
  readable: ReadableStream<AnyMessage>;
};
This interface provides:
  • readable: A ReadableStream for receiving messages from the other side
  • writable: A WritableStream for sending messages to the other side
The AnyMessage type includes all JSON-RPC 2.0 message types: requests, responses, and notifications.
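
Because a Stream is only a pair of Web Streams, two ends can be wired back to back in memory, which may be handy in tests. The following is a minimal sketch, not an SDK feature: createStreamPair and roundTrip are hypothetical helpers, and the AnyMessage type shown is a simplified stand-in for the SDK's own definition.

```typescript
// Hypothetical stand-in for the SDK's AnyMessage type, for illustration only.
type AnyMessage = { jsonrpc: "2.0"; [key: string]: unknown };

type Stream = {
  writable: WritableStream<AnyMessage>;
  readable: ReadableStream<AnyMessage>;
};

// Hypothetical helper (not an SDK export): returns two Stream ends where
// whatever one side writes, the other side reads.
function createStreamPair(): [Stream, Stream] {
  // A TransformStream with no transformer passes chunks through unchanged.
  const aToB = new TransformStream<AnyMessage, AnyMessage>();
  const bToA = new TransformStream<AnyMessage, AnyMessage>();
  return [
    { writable: aToB.writable, readable: bToA.readable },
    { writable: bToA.writable, readable: aToB.readable },
  ];
}

async function roundTrip(): Promise<AnyMessage | undefined> {
  const [clientSide, agentSide] = createStreamPair();
  const writer = clientSide.writable.getWriter();
  const reader = agentSide.readable.getReader();

  // Request the read first so the write is not held back by backpressure.
  const pending = reader.read();
  await writer.write({ jsonrpc: "2.0", id: 1, method: "initialize" });
  const { value } = await pending;
  return value;
}

const delivered = roundTrip();
```

Each end satisfies the Stream shape above, so in principle one end could be handed to an agent-side connection and the other to a client-side connection within the same process.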

Using ndJsonStream

The most common way to create a Stream is using the ndJsonStream function, which handles newline-delimited JSON encoding:
export function ndJsonStream(
  output: WritableStream<Uint8Array>,
  input: ReadableStream<Uint8Array>,
): Stream

Example: stdio-based Connection

import * as acp from '@agentclientprotocol/acp';
import { Readable, Writable } from 'node:stream';

// Convert Node.js streams to Web Streams.
// An agent writes its messages to stdout and reads them from stdin.
const output = Writable.toWeb(process.stdout);
const input = Readable.toWeb(process.stdin) as ReadableStream<Uint8Array>;

// Create the stream (argument order: output first, then input)
const stream = acp.ndJsonStream(output, input);

// Use it with a connection
new acp.AgentSideConnection((conn) => new MyAgent(conn), stream);

How ndJsonStream Works

The ndJsonStream function creates a Stream that:
  1. Encodes outgoing messages: Serializes AnyMessage objects to JSON and appends a newline
  2. Decodes incoming messages: Splits input by newlines and parses each line as JSON
  3. Handles errors: Logs lines that fail to parse as JSON and skips them, so one malformed line does not end the stream
  4. Manages buffering: Accumulates partial lines until complete messages arrive
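
The decoding steps above can be modeled synchronously to see how partial lines and malformed lines behave. This is a sketch under stated assumptions rather than the SDK's code: createNdJsonDecoder is a hypothetical helper, and it reports bad lines through a callback instead of logging them.

```typescript
// Hypothetical helper (not part of the SDK): a synchronous model of
// splitting, buffering, and error handling for newline-delimited JSON.
function createNdJsonDecoder(
  onError: (line: string, err: unknown) => void = () => {},
): (chunk: Uint8Array) => unknown[] {
  const textDecoder = new TextDecoder();
  let buffer = "";

  return function push(chunk: Uint8Array): unknown[] {
    buffer += textDecoder.decode(chunk, { stream: true });
    const lines = buffer.split("\n");
    // The last element is an incomplete line (or ""), so keep it for later.
    buffer = lines.pop() ?? "";

    const messages: unknown[] = [];
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      try {
        messages.push(JSON.parse(trimmed));
      } catch (err) {
        // A malformed line is reported and skipped; decoding continues.
        onError(trimmed, err);
      }
    }
    return messages;
  };
}

const encoder = new TextEncoder();
const badLines: string[] = [];
const push = createNdJsonDecoder((line) => badLines.push(line));

// The first chunk ends mid-message, so nothing is emitted yet.
const first = push(encoder.encode('{"jsonrpc":"2.0","id":1,"method":"ini'));
// The second chunk completes that message, then carries a bad line and a valid one.
const second = push(
  encoder.encode('tialize"}\nnot json\n{"jsonrpc":"2.0","id":1,"result":{}}\n'),
);
```

Here first is empty, second holds the two valid messages, and badLines records the one line that failed to parse.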

Implementation Details

The writable stream encodes messages:
const textEncoder = new TextEncoder();

const writable = new WritableStream<AnyMessage>({
  async write(message) {
    const content = JSON.stringify(message) + "\n";
    const writer = output.getWriter();
    try {
      await writer.write(textEncoder.encode(content));
    } finally {
      writer.releaseLock();
    }
  },
});
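The readable stream decodes messages:
const textDecoder = new TextDecoder();

const readable = new ReadableStream<AnyMessage>({
  async start(controller) {
    let content = "";
    const reader = input.getReader();
    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) break;

        // { stream: true } keeps multi-byte characters split across chunks intact
        content += textDecoder.decode(value, { stream: true });
        const lines = content.split("\n");
        // The last element is an incomplete line; keep it for the next chunk
        content = lines.pop() || "";

        for (const line of lines) {
          const trimmedLine = line.trim();
          if (trimmedLine) {
            try {
              const message = JSON.parse(trimmedLine) as AnyMessage;
              controller.enqueue(message);
            } catch (err) {
              // Log the bad line and keep reading
              console.error("Failed to parse JSON message:", trimmedLine, err);
            }
          }
        }
      }
    } finally {
      reader.releaseLock();
      controller.close();
    }
  },
});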

Creating Custom Streams

You can create custom Stream implementations for different transport mechanisms:

WebSocket Stream

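function webSocketStream(ws: WebSocket): Stream {
  // Once the readable side has ended, later socket events must not touch the
  // controller again: calling close() on an errored or cancelled stream throws.
  let finished = false;

  const readable = new ReadableStream<AnyMessage>({
    start(controller) {
      ws.onmessage = (event) => {
        // Assumes text frames that each carry one JSON-RPC message
        const message = JSON.parse(event.data) as AnyMessage;
        controller.enqueue(message);
      };

      ws.onclose = () => {
        if (finished) return;
        finished = true;
        controller.close();
      };

      ws.onerror = (error) => {
        if (finished) return;
        finished = true;
        controller.error(error);
      };
    },
    cancel() {
      // The consumer stopped reading: release the socket
      finished = true;
      ws.close();
    },
  });

  const writable = new WritableStream<AnyMessage>({
    write(message) {
      ws.send(JSON.stringify(message));
    },
    close() {
      // Closing the writable side closes the socket
      ws.close();
    },
  });

  return { readable, writable };
}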

HTTP Stream (SSE + POST)

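function httpStream(url: string): Stream {
  let eventSource: EventSource | undefined;

  // Readable: Server-Sent Events
  const readable = new ReadableStream<AnyMessage>({
    start(controller) {
      const source = new EventSource(url);
      eventSource = source;

      source.onmessage = (event) => {
        const message = JSON.parse(event.data) as AnyMessage;
        controller.enqueue(message);
      };

      source.onerror = () => {
        // EventSource would otherwise try to reconnect; this example treats
        // the first error as the end of the stream.
        source.close();
        controller.close();
      };
    },
    cancel() {
      // The consumer stopped reading: stop listening for events
      eventSource?.close();
    },
  });

  // Writable: POST requests
  const writable = new WritableStream<AnyMessage>({
    async write(message) {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(message),
      });
      // fetch rejects only on network failure, so surface HTTP errors explicitly
      if (!response.ok) {
        throw new Error(`POST failed with status ${response.status}`);
      }
    },
  });

  return { readable, writable };
}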

Stream Requirements

When creating custom streams, ensure they:
  • Preserve message order: Messages must arrive in the order they were sent
  • Handle backpressure: Wait for the writer’s ready promise before sending more, and resolve write() only once the transport has accepted the message
  • Close gracefully: Clean up resources when the stream ends
  • Report errors: Use controller.error() for stream errors
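
One way to exercise the ordering and backpressure requirements is to write through a writer while awaiting its ready promise. The sketch below assumes nothing about the SDK: sendInOrder is a hypothetical helper, and the collecting sink stands in for a real transport.

```typescript
// Hypothetical helper (not part of the SDK): writes messages one at a time,
// waiting for the sink to signal readiness before each write.
async function sendInOrder<T>(
  writable: WritableStream<T>,
  messages: T[],
): Promise<void> {
  const writer = writable.getWriter();
  try {
    for (const message of messages) {
      // ready resolves once the stream's internal queue has room again.
      await writer.ready;
      // Awaiting write() also surfaces transport errors at the point of failure.
      await writer.write(message);
    }
  } finally {
    writer.releaseLock();
  }
}

// A stand-in transport that records what it receives.
const received: number[] = [];
const sink = new WritableStream<number>({
  write(chunk) {
    received.push(chunk);
  },
});

const finishedSending = sendInOrder(sink, [1, 2, 3]);
```

Swapping the collecting sink for a custom stream's writable gives a quick check that messages arrive in the order they were sent.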

Best Practices

  1. Use ndJsonStream for stdio: It’s battle-tested and handles edge cases
  2. Test custom streams: Verify message ordering and error handling
  3. Handle encoding properly: Ensure UTF-8 encoding for text-based transports
  4. Monitor stream closure: Listen for the connection’s closed promise
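
To illustrate the encoding point, the sketch below shows what can go wrong when a byte-oriented transport splits a multi-byte UTF-8 character across two chunks. It uses only the standard TextEncoder and TextDecoder; the variable names are illustrative.

```typescript
// "é" (U+00E9) takes two bytes in UTF-8, so "héllo" encodes to six bytes.
const bytes = new TextEncoder().encode("h\u00e9llo");
const head = bytes.slice(0, 2); // "h" plus the first byte of "é"
const tail = bytes.slice(2); // the second byte of "é" plus "llo"

// A single decoder with { stream: true } holds the dangling byte until the
// rest of the character arrives.
const streamingDecoder = new TextDecoder();
const joined =
  streamingDecoder.decode(head, { stream: true }) +
  streamingDecoder.decode(tail, { stream: true });

// Decoding each chunk on its own replaces the split character with U+FFFD.
const naive = new TextDecoder().decode(head) + new TextDecoder().decode(tail);
```

Here joined equals the original string, while naive contains replacement characters, which would corrupt any JSON string that happened to straddle a chunk boundary.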
