The ACP TypeScript SDK uses the Web Streams API to provide bidirectional communication between clients and agents. Streams handle the serialization and deserialization of JSON-RPC messages.
Stream Interface
The core Stream type powers all ACP connections:
export type Stream = {
  writable: WritableStream<AnyMessage>;
  readable: ReadableStream<AnyMessage>;
};
This interface provides:
readable: A ReadableStream for receiving messages from the other side
writable: A WritableStream for sending messages to the other side
The AnyMessage type includes all JSON-RPC 2.0 message types: requests, responses, and notifications.
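To make the three message kinds concrete, here is a minimal sketch of how they can be told apart by shape under JSON-RPC 2.0. The types and the classify helper are hypothetical stand-ins written for illustration; the SDK's own AnyMessage definition is authoritative.

```typescript
// Hypothetical local types for illustration only (not the SDK's definitions).
type RpcId = string | number | null;
type RpcRequest = { jsonrpc: "2.0"; id: RpcId; method: string; params?: unknown };
type RpcNotification = { jsonrpc: "2.0"; method: string; params?: unknown };
type RpcResponse =
  | { jsonrpc: "2.0"; id: RpcId; result: unknown }
  | { jsonrpc: "2.0"; id: RpcId; error: { code: number; message: string; data?: unknown } };
type RpcMessage = RpcRequest | RpcNotification | RpcResponse;

// Requests carry both `method` and `id`, notifications carry `method` only,
// and responses carry `id` with either `result` or `error`.
function classify(message: RpcMessage): "request" | "notification" | "response" {
  if ("method" in message) {
    return "id" in message ? "request" : "notification";
  }
  return "response";
}
```

A notification never receives a response, which is why it has no id to correlate against.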
Using ndJsonStream
The most common way to create a Stream is using the ndJsonStream function, which handles newline-delimited JSON encoding:
export function ndJsonStream(
  output: WritableStream<Uint8Array>,
  input: ReadableStream<Uint8Array>,
): Stream
Example: stdio-based Connection
Agent (stdio)
import * as acp from '@agentclientprotocol/acp';
import { Readable, Writable } from 'node:stream';
// Convert Node.js streams to Web Streams.
// The agent writes to stdout and reads from stdin.
const output = Writable.toWeb(process.stdout);
const input = Readable.toWeb(process.stdin) as ReadableStream<Uint8Array>;
// Create the stream (argument order: output first, then input)
const stream = acp.ndJsonStream(output, input);
// Use it with a connection; MyAgent is your own agent class
new acp.AgentSideConnection((conn) => new MyAgent(conn), stream);
How ndJsonStream Works
The ndJsonStream function creates a Stream that:
Encodes outgoing messages: Serializes AnyMessage objects to JSON and appends a newline
Decodes incoming messages: Splits input by newlines and parses each line as JSON
Handles errors: Logs parse errors without disrupting the stream
Manages buffering: Accumulates partial lines until complete messages arrive
Implementation Details
The writable stream encodes messages:
const textEncoder = new TextEncoder();
const writable = new WritableStream<AnyMessage>({
  async write(message) {
    // One JSON document per line
    const content = JSON.stringify(message) + "\n";
    const writer = output.getWriter();
    try {
      await writer.write(textEncoder.encode(content));
    } finally {
      writer.releaseLock();
    }
  },
});
The readable stream decodes messages:
const textDecoder = new TextDecoder();
const readable = new ReadableStream<AnyMessage>({
  async start(controller) {
    let content = "";
    const reader = input.getReader();
    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        // stream: true keeps multi-byte characters split across chunks intact
        content += textDecoder.decode(value, { stream: true });
        const lines = content.split("\n");
        // The last element is an incomplete line; keep it for the next chunk
        content = lines.pop() || "";
        for (const line of lines) {
          const trimmedLine = line.trim();
          if (trimmedLine) {
            try {
              const message = JSON.parse(trimmedLine) as AnyMessage;
              controller.enqueue(message);
            } catch (err) {
              // Log and skip malformed lines, matching the error handling described above
              console.error("Failed to parse JSON message:", trimmedLine, err);
            }
          }
        }
      }
    } finally {
      reader.releaseLock();
      controller.close();
    }
  },
});
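The buffering behavior is easiest to see with an awkward chunk boundary. The sketch below mirrors the decoding loop as a synchronous helper (decodeChunks is a hypothetical name, not part of the SDK) and feeds it a payload split in the middle of a multi-byte UTF-8 character.

```typescript
// Hypothetical helper that applies the same split-and-buffer logic to an array of chunks.
function decodeChunks(chunks: Uint8Array[]): unknown[] {
  const textDecoder = new TextDecoder();
  const messages: unknown[] = [];
  let content = "";
  for (const chunk of chunks) {
    // stream: true makes the decoder hold back incomplete byte sequences
    content += textDecoder.decode(chunk, { stream: true });
    const lines = content.split("\n");
    content = lines.pop() ?? ""; // unfinished line waits for the next chunk
    for (const line of lines) {
      const trimmed = line.trim();
      if (trimmed) messages.push(JSON.parse(trimmed));
    }
  }
  return messages;
}

const bytes = new TextEncoder().encode('{"id":1,"text":"héllo"}\n{"id":2}\n');
// "é" is encoded as two bytes (0xC3 0xA9); cut the payload between them.
const splitAt = bytes.indexOf(0xc3) + 1;
const decoded = decodeChunks([bytes.slice(0, splitAt), bytes.slice(splitAt)]);
```

Both messages are recovered even though the first chunk ends partway through a character and partway through a line.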
Creating Custom Streams
You can create custom Stream implementations for different transport mechanisms:
WebSocket Stream
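function webSocketStream(ws: WebSocket): Stream {
  const readable = new ReadableStream<AnyMessage>({
    start(controller) {
      // A failed socket fires error and then close; closing an errored
      // controller throws, so only finish the stream once.
      let finished = false;
      ws.onmessage = (event) => {
        // Assumes text frames that each carry one JSON-RPC message
        const message = JSON.parse(event.data) as AnyMessage;
        controller.enqueue(message);
      };
      ws.onclose = () => {
        if (!finished) {
          finished = true;
          controller.close();
        }
      };
      ws.onerror = (error) => {
        if (!finished) {
          finished = true;
          controller.error(error);
        }
      };
    },
  });
  const writable = new WritableStream<AnyMessage>({
    write(message) {
      ws.send(JSON.stringify(message));
    },
    close() {
      // Close the socket when the writable side is closed
      ws.close();
    },
  });
  return { readable, writable };
}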
HTTP Stream (SSE + POST)
function httpStream(url: string): Stream {
  let eventSource: EventSource | undefined;
  // Readable: Server-Sent Events
  const readable = new ReadableStream<AnyMessage>({
    start(controller) {
      eventSource = new EventSource(url);
      eventSource.onmessage = (event) => {
        const message = JSON.parse(event.data) as AnyMessage;
        controller.enqueue(message);
      };
      eventSource.onerror = () => {
        // Treat any SSE error as end of stream and stop auto-reconnecting
        eventSource?.close();
        controller.close();
      };
    },
    cancel() {
      // Release the connection if the consumer stops reading
      eventSource?.close();
    },
  });
  // Writable: POST requests
  const writable = new WritableStream<AnyMessage>({
    async write(message) {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(message),
      });
      if (!response.ok) {
        // Surface failed deliveries instead of silently dropping the message
        throw new Error(`POST failed with status ${response.status}`);
      }
    },
  });
  return { readable, writable };
}
Stream Requirements
When creating custom streams, ensure they:
Preserve message order: Messages must arrive in the order they were sent
Handle backpressure: Respect the writable stream’s ready state
Close gracefully: Clean up resources when the stream ends
Report errors: Use controller.error() for stream errors
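As one way to meet the ordering and backpressure requirements, the sketch below returns a promise from write() so the stream does not hand over the next message until the transport has accepted the current one. The names backpressureWritable, sendAll, and the send callback are hypothetical and stand in for whatever transport you are wrapping.

```typescript
// `send` is a hypothetical transport hook that resolves once the payload is accepted.
function backpressureWritable<T>(
  send: (payload: string) => Promise<void>,
): WritableStream<T> {
  return new WritableStream<T>(
    {
      // The stream waits for this promise before calling write() again,
      // so messages go out one at a time and in order.
      async write(message) {
        await send(JSON.stringify(message));
      },
    },
    // Queue at most one pending message before signaling backpressure
    new CountQueuingStrategy({ highWaterMark: 1 }),
  );
}

async function sendAll<T>(writable: WritableStream<T>, messages: T[]): Promise<void> {
  const writer = writable.getWriter();
  try {
    for (const message of messages) {
      await writer.ready; // resolves when the queue has room
      await writer.write(message);
    }
    await writer.close();
  } finally {
    writer.releaseLock();
  }
}
```

If send rejects, the rejection propagates out of write() and puts the writable stream into an errored state, so the failure is reported instead of being swallowed.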
Best Practices
Use ndJsonStream for stdio: It’s battle-tested and handles edge cases
Test custom streams: Verify message ordering and error handling
Handle encoding properly: Ensure UTF-8 encoding for text-based transports
Monitor stream closure: Listen for the connection’s closed promise
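For testing, an in-memory pair of streams can stand in for a real transport. The following is a minimal sketch built on two identity TransformStream instances; PairedStream, memoryStreamPair, and roundTrip are hypothetical names, and PairedStream is a local stand-in with the same shape as the SDK's Stream type.

```typescript
// Local stand-in with the same shape as the SDK's Stream type.
type PairedStream<T> = { writable: WritableStream<T>; readable: ReadableStream<T> };

// Hypothetical test helper: whatever one side writes, the other side reads.
function memoryStreamPair<T>(): [PairedStream<T>, PairedStream<T>] {
  const aToB = new TransformStream<T, T>();
  const bToA = new TransformStream<T, T>();
  return [
    { writable: aToB.writable, readable: bToA.readable },
    { writable: bToA.writable, readable: aToB.readable },
  ];
}

// Writes three messages on one side and collects their ids on the other.
async function roundTrip(): Promise<number[]> {
  const [client, agent] = memoryStreamPair<{ id: number }>();
  const writer = client.writable.getWriter();
  const reader = agent.readable.getReader();
  // Start all writes up front; they complete as the reader pulls
  const writes = [1, 2, 3].map((id) => writer.write({ id }));
  const received: number[] = [];
  for (let i = 0; i < 3; i++) {
    const { value } = await reader.read();
    if (value) received.push(value.id);
  }
  await Promise.all(writes);
  return received;
}
```

The same check can be pointed at a custom stream implementation to confirm that messages arrive in the order they were written.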