Skip to main content

Overview

The Stream interface and utilities enable bidirectional communication between ACP clients and agents. Streams handle the serialization and deserialization of JSON-RPC messages over various transport layers.

Stream Interface

type Stream = {
  writable: WritableStream<AnyMessage>;
  readable: ReadableStream<AnyMessage>;
};
The Stream type represents a bidirectional communication channel that:
  • Receives JSON-RPC messages through the readable stream
  • Sends JSON-RPC messages through the writable stream

Properties

writable

writable: WritableStream<AnyMessage>
A writable stream for sending JSON-RPC messages. Messages written to this stream are serialized and sent to the remote peer.

readable

readable: ReadableStream<AnyMessage>
A readable stream for receiving JSON-RPC messages. Messages arriving from the remote peer are deserialized and made available through this stream.

ndJsonStream

function ndJsonStream(
  output: WritableStream<Uint8Array>,
  input: ReadableStream<Uint8Array>
): Stream
Creates an ACP Stream from a pair of newline-delimited JSON streams. This is the typical way to handle ACP connections over stdio or other byte-oriented transports. Parameters:
  • output (WritableStream<Uint8Array>) - The writable stream to send encoded messages to
  • input (ReadableStream<Uint8Array>) - The readable stream to receive encoded messages from
Returns: A Stream object for bidirectional ACP communication

How It Works

The ndJsonStream function:
  1. Encoding (Writable Stream):
    • Takes AnyMessage objects
    • Serializes them to JSON
    • Appends a newline character
    • Encodes as UTF-8 bytes
    • Writes to the output stream
  2. Decoding (Readable Stream):
    • Reads UTF-8 bytes from the input stream
    • Buffers incomplete lines
    • Splits on newline characters
    • Parses each line as JSON
    • Emits AnyMessage objects

Error Handling

The function handles errors gracefully:
  • Parse errors: Logged to console but don’t stop the stream
  • Empty lines: Silently skipped
  • Incomplete messages: Buffered until complete

Usage Examples

Creating a Stream from stdin/stdout (Node.js)

import { ndJsonStream, AgentSideConnection } from "@agentclientprotocol/sdk";
import { ReadableStream, WritableStream } from "node:stream/web";

// Convert Node.js streams to Web Streams API
const input = ReadableStream.from(
  (async function* () {
    for await (const chunk of process.stdin) {
      yield chunk;
    }
  })()
);

const output = new WritableStream({
  write(chunk) {
    process.stdout.write(chunk);
  },
});

// Create the ACP stream
const stream = ndJsonStream(output, input);

// Use with AgentSideConnection
const connection = new AgentSideConnection(
  (conn) => createAgent(conn),
  stream
);

Creating a Stream from stdin/stdout (Deno)

import { ndJsonStream, AgentSideConnection } from "@agentclientprotocol/sdk";

// Deno provides Web Streams API directly
const stream = ndJsonStream(
  Deno.stdout.writable,
  Deno.stdin.readable
);

const connection = new AgentSideConnection(
  (conn) => createAgent(conn),
  stream
);

Creating a Stream from WebSockets

import { ndJsonStream } from "@agentclientprotocol/sdk";
import type { AnyMessage } from "@agentclientprotocol/sdk";

function createWebSocketStream(ws: WebSocket): Stream {
  // Create output stream (writable)
  const output = new WritableStream<Uint8Array>({
    write(chunk) {
      ws.send(chunk);
    },
    close() {
      ws.close();
    },
  });

  // Create input stream (readable)
  const input = new ReadableStream<Uint8Array>({
    start(controller) {
      ws.onmessage = (event) => {
        if (event.data instanceof ArrayBuffer) {
          controller.enqueue(new Uint8Array(event.data));
        } else if (typeof event.data === "string") {
          controller.enqueue(new TextEncoder().encode(event.data));
        }
      };

      ws.onclose = () => {
        controller.close();
      };

      ws.onerror = (error) => {
        controller.error(error);
      };
    },
  });

  return ndJsonStream(output, input);
}

// Usage
const ws = new WebSocket("ws://localhost:8080");
const stream = createWebSocketStream(ws);
const connection = new ClientSideConnection(
  (agent) => createClient(agent),
  stream
);

Creating a Custom Stream Implementation

For custom transport layers, you can create Stream objects directly without using ndJsonStream:
import type { Stream, AnyMessage } from "@agentclientprotocol/sdk";

function createCustomStream(): Stream {
  // Message queue for incoming messages
  const messageQueue: AnyMessage[] = [];
  let readController: ReadableStreamDefaultController<AnyMessage> | null = null;

  const readable = new ReadableStream<AnyMessage>({
    start(controller) {
      readController = controller;
      // Enqueue any buffered messages
      while (messageQueue.length > 0) {
        controller.enqueue(messageQueue.shift()!);
      }
    },
  });

  const writable = new WritableStream<AnyMessage>({
    async write(message) {
      // Send message over your custom transport
      await customTransport.send(message);
    },
  });

  // Handle incoming messages from your transport
  customTransport.onMessage((message: AnyMessage) => {
    if (readController) {
      readController.enqueue(message);
    } else {
      messageQueue.push(message);
    }
  });

  return { readable, writable };
}

Testing with In-Memory Streams

import type { Stream, AnyMessage } from "@agentclientprotocol/sdk";

function createTestStreamPair(): [Stream, Stream] {
  // Create two transformer streams
  const { readable: readable1, writable: writable1 } = 
    new TransformStream<AnyMessage>();
  const { readable: readable2, writable: writable2 } = 
    new TransformStream<AnyMessage>();

  // Cross-connect them
  const stream1: Stream = {
    readable: readable1,
    writable: writable2,
  };

  const stream2: Stream = {
    readable: readable2,
    writable: writable1,
  };

  return [stream1, stream2];
}

// Usage in tests
const [clientStream, agentStream] = createTestStreamPair();

const client = new ClientSideConnection(
  (agent) => createClient(agent),
  clientStream
);

const agent = new AgentSideConnection(
  (conn) => createAgent(conn),
  agentStream
);

// Now client and agent can communicate
await client.initialize({ protocolVersion: "0.1.0" });

Stream Lifecycle

Connection Establishment

  1. Create a Stream with your transport layer
  2. Pass the Stream to AgentSideConnection or ClientSideConnection
  3. The connection automatically starts reading from the stream

Message Flow

// Outgoing messages
await connection.sendRequest("method", params);
// → Written to stream.writable
// → Serialized by transport
// → Sent to remote peer

// Incoming messages
// ← Received from remote peer
// ← Deserialized by transport
// ← Read from stream.readable
// → Processed by connection

Connection Closure

The connection closes when:
  • The readable stream ends (remote peer disconnected)
  • An unrecoverable error occurs
  • You explicitly close the underlying transport
// Wait for connection to close
await connection.closed;
console.log("Connection closed");

// Or listen for closure
connection.signal.addEventListener("abort", () => {
  console.log("Connection closed");
});

Best Practices

1. Use ndJsonStream for stdio

For most use cases with stdin/stdout, ndJsonStream is the recommended approach:
const stream = ndJsonStream(stdout, stdin);

2. Handle Transport Errors

Ensure your transport layer handles errors gracefully:
const output = new WritableStream<Uint8Array>({
  write(chunk) {
    try {
      socket.write(chunk);
    } catch (error) {
      console.error("Write error:", error);
      throw error; // Propagate to stream
    }
  },
});

3. Clean Up Resources

Close streams and connections when done:
// Wait for connection to complete
await connection.closed;

// Clean up any resources
await cleanup();

4. Buffer Management

Be mindful of buffering when implementing custom streams:
// Avoid unbounded buffers
const messageQueue: AnyMessage[] = [];
const MAX_QUEUE_SIZE = 100;

if (messageQueue.length >= MAX_QUEUE_SIZE) {
  throw new Error("Message queue full");
}

See Also

Build docs developers (and LLMs) love