Use an `async *run` handler to stream chunks incrementally. Yield objects for structured data or plain strings for text.
## Basic Streaming

Return an async generator to stream output:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('logs', {
    description: 'Tail logs',
    async *run() {
      yield 'connecting...'
      yield 'streaming logs'
      yield 'done'
    },
  })
  .serve()
```

**Output**

```
$ my-cli logs
connecting...
streaming logs
done
```
Each yielded value is written as a line in human/TOON mode.
## Streaming Objects

Yield structured data:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('progress', {
    async *run() {
      yield { progress: 0, status: 'starting' }
      yield { progress: 50, status: 'processing' }
      yield { progress: 100, status: 'complete' }
    },
  })
  .serve()
```

**Output (TOON)**

```
$ my-cli progress
progress: 0
status: starting
progress: 50
status: processing
progress: 100
status: complete
```

With `--format jsonl`, each chunk becomes a JSON object:

```
$ my-cli progress --format jsonl
{"type":"chunk","data":{"progress":0,"status":"starting"}}
{"type":"chunk","data":{"progress":50,"status":"processing"}}
{"type":"chunk","data":{"progress":100,"status":"complete"}}
```
This format is ideal for:
- Log aggregation
- Real-time monitoring
- Piping to other tools
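As a sketch of the piping case, a downstream consumer can split the stream on newlines and `JSON.parse` each line. The `{"type":"chunk","data":…}` envelope matches the output shown above; the `parseJsonl` helper and `Chunk` type are illustrative, not part of incur:

```typescript
// Parse a JSONL stream (as emitted by --format jsonl) into typed chunks.
// The { type: "chunk", data } envelope matches the example output above.
interface Chunk {
  type: string
  data: { progress: number; status: string }
}

function parseJsonl(raw: string): Chunk[] {
  return raw
    .split('\n')
    .filter(line => line.trim() !== '')  // skip blank/trailing lines
    .map(line => JSON.parse(line) as Chunk)
}

const raw = [
  '{"type":"chunk","data":{"progress":0,"status":"starting"}}',
  '{"type":"chunk","data":{"progress":100,"status":"complete"}}',
].join('\n')

const chunks = parseJsonl(raw)
console.log(chunks.map(c => c.data.status).join(','))  // → starting,complete
```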
## Streaming with CTAs

Use `c.ok()` as the return value to attach CTAs or signal completion:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('build', {
    async *run(c) {
      yield { step: 'compile', status: 'running' }
      yield { step: 'compile', status: 'done' }
      yield { step: 'bundle', status: 'running' }
      yield { step: 'bundle', status: 'done' }
      return c.ok(undefined, {
        cta: {
          commands: [
            { command: 'deploy staging', description: 'Deploy to staging' },
          ],
        },
      })
    },
  })
  .serve()
```

**Output**

```
$ my-cli build
step: compile
status: running
step: compile
status: done
step: bundle
status: running
step: bundle
status: done

Next:
  my-cli deploy staging – Deploy to staging
```
## Error Handling

Return `c.error()` to signal failure:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('process', {
    async *run(c) {
      yield { step: 1, status: 'processing' }
      yield { step: 2, status: 'processing' }
      const failed = true
      if (failed) {
        return c.error({
          code: 'PROCESSING',
          message: 'Processing failed at step 2',
        })
      }
      yield { step: 3, status: 'done' }
    },
  })
  .serve()
```

**Output**

```
$ my-cli process
step: 1
status: processing
step: 2
status: processing
Error (PROCESSING): Processing failed at step 2
```
## Streaming with Delays

Simulate real-time streaming:

```ts
import { Cli } from 'incur'

function sleep(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

Cli.create('my-cli', { description: 'My CLI' })
  .command('countdown', {
    async *run() {
      yield 'Starting countdown...'
      for (let i = 5; i > 0; i--) {
        await sleep(1000)
        yield `${i}...`
      }
      yield 'Liftoff!'
    },
  })
  .serve()
```

**Output**

```
$ my-cli countdown
Starting countdown...
5...
4...
3...
2...
1...
Liftoff!
```
## Real-Time Logs

Stream logs from an external process. Wrap the loop in `try`/`finally` so the child process is killed if the stream is abandoned:

```ts
import { Cli } from 'incur'
import { spawn } from 'node:child_process'

Cli.create('my-cli', { description: 'My CLI' })
  .command('tail', {
    description: 'Tail application logs',
    async *run() {
      const proc = spawn('tail', ['-f', '/var/log/app.log'])
      try {
        for await (const chunk of proc.stdout) {
          yield chunk.toString().trim()
        }
      } finally {
        // Don't leak the child process if the consumer stops early
        proc.kill()
      }
    },
  })
  .serve()
```
## Streaming from APIs

Stream data from HTTP endpoints:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('fetch', {
    description: 'Fetch streaming data',
    async *run() {
      const response = await fetch('https://api.example.com/stream')
      const reader = response.body?.getReader()
      if (!reader) throw new Error('No response body')
      const decoder = new TextDecoder()
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        yield decoder.decode(value, { stream: true })
      }
    },
  })
  .serve()
```
## Buffered Streaming

Buffer chunks before yielding:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('stream', {
    async *run() {
      const buffer: string[] = []
      for (let i = 0; i < 100; i++) {
        buffer.push(`line ${i}`)
        // Yield every 10 lines
        if (buffer.length === 10) {
          yield buffer.join('\n')
          buffer.length = 0
        }
      }
      // Yield any remaining lines
      if (buffer.length > 0) {
        yield buffer.join('\n')
      }
    },
  })
  .serve()
```
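The pattern above can be factored into a reusable helper. A minimal sketch — the `batched` name and signature are illustrative, not part of incur:

```typescript
// Group values from any async iterable into arrays of up to `size` items.
async function* batched<T>(
  source: AsyncIterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  let batch: T[] = []
  for await (const item of source) {
    batch.push(item)
    if (batch.length === size) {
      yield batch
      batch = []
    }
  }
  // Flush any trailing partial batch
  if (batch.length > 0) yield batch
}

// Example: 7 items in batches of 3 → batch sizes 3, 3, 1
async function* numbers() {
  for (let i = 0; i < 7; i++) yield i
}

const sizes: number[] = []
for await (const batch of batched(numbers(), 3)) {
  sizes.push(batch.length)
}
console.log(sizes.join(','))  // → 3,3,1
```

A command handler could then `yield* batched(source, 10)` instead of managing the buffer inline.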
## MCP Streaming

When running as an MCP server (`--mcp`), streaming chunks are sent as progress notifications:

```ts
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('analyze', {
    async *run() {
      yield { stage: 'parsing', progress: 0.25 }
      yield { stage: 'analyzing', progress: 0.5 }
      yield { stage: 'reporting', progress: 0.75 }
      return { complete: true }
    },
  })
  .serve()
```

When invoked via MCP:

- Each `yield` sends a progress notification to the agent
- The final `return` value becomes the tool result
Agents receive incremental updates and can show progress in real-time.
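For reference, an MCP `notifications/progress` message has roughly this JSON-RPC shape. The field values here are illustrative, and how incur maps yielded chunks onto these fields is not specified by this page:

```json
{
  "jsonrpc": "2.0",
  "method": "notifications/progress",
  "params": {
    "progressToken": "req-1",
    "progress": 0.5,
    "total": 1,
    "message": "analyzing"
  }
}
```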
## Streaming vs Non-Streaming

**Non-streaming** (returns once):

```ts
Cli.create('my-cli', { description: 'My CLI' })
  .command('status', {
    run() {
      return { status: 'ok', uptime: 3600 }
    },
  })
  .serve()
```

**Streaming** (yields multiple times):

```ts
Cli.create('my-cli', { description: 'My CLI' })
  .command('monitor', {
    async *run() {
      // getUptime() and sleep() defined elsewhere
      while (true) {
        yield { status: 'ok', uptime: getUptime() }
        await sleep(1000)
      }
    },
  })
  .serve()
```
Use streaming when:
- Output is produced incrementally
- Long-running operations need progress updates
- Real-time data needs to be displayed
- Logs or events are being tailed
## Return Values

Async generators can return values:

```ts
import { z } from 'zod'
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('process', {
    output: z.object({ summary: z.string() }),
    async *run(c) {
      yield { step: 1 }
      yield { step: 2 }
      yield { step: 3 }
      // Return the final result
      return { summary: 'Processed 3 steps' }
    },
  })
  .serve()
```

**Output**

```
$ my-cli process
step: 1
step: 2
step: 3
summary: Processed 3 steps
```
The return value is displayed after all yielded chunks.
## Implementation

Streaming detection is automatic based on the function signature:

```ts
type RunHandler<args, env, options, output, vars, cmdEnv> =
  | ((c: Context) => output | Promise<output>)
  | ((c: Context) => AsyncGenerator<unknown, output, unknown>)
```

When `run` is an async generator, incur:
- Iterates over each yielded value
- Formats and writes each chunk to stdout
- Handles the final return value as the result
Use streaming for long-running operations to provide real-time feedback to agents and humans.