Use async *run to stream chunks incrementally. Yield objects for structured data or plain strings for text.

Basic Streaming

Return an async generator to stream output:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('logs', {
    description: 'Tail logs',
    async *run() {
      yield 'connecting...'
      yield 'streaming logs'
      yield 'done'
    },
  })
  .serve()
Output
$ my-cli logs
connecting...
streaming logs
done
Each yielded value is written as a line in human/TOON mode.

Streaming Objects

Yield structured data:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('progress', {
    async *run() {
      yield { progress: 0, status: 'starting' }
      yield { progress: 50, status: 'processing' }
      yield { progress: 100, status: 'complete' }
    },
  })
  .serve()
Output (TOON)
$ my-cli progress
progress: 0
status: starting
progress: 50
status: processing
progress: 100
status: complete

JSONL Format

With --format jsonl, each chunk becomes a JSON object:
$ my-cli progress --format jsonl
{"type":"chunk","data":{"progress":0,"status":"starting"}}
{"type":"chunk","data":{"progress":50,"status":"processing"}}
{"type":"chunk","data":{"progress":100,"status":"complete"}}
This format is ideal for:
  • Log aggregation
  • Real-time monitoring
  • Piping to other tools
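On the consuming side, each line can be parsed independently. The sketch below is a hypothetical consumer, not part of incur: `parseChunks` and `ChunkLine` are illustrative names, and the `{ type, data }` line shape is inferred only from the sample output above. Lines whose `type` is not `chunk` are skipped, since this page does not document other line types.

```typescript
// Shape of one streamed line, inferred from the sample output above (assumption)
type ChunkLine = { type: string; data: unknown }

// Parse JSONL text and return the `data` payload of every "chunk" line.
function parseChunks(jsonl: string): unknown[] {
  const payloads: unknown[] = []
  for (const line of jsonl.split('\n')) {
    if (line.trim() === '') continue // skip blank lines
    const parsed = JSON.parse(line) as ChunkLine
    if (parsed.type === 'chunk') payloads.push(parsed.data)
  }
  return payloads
}

const sample = [
  '{"type":"chunk","data":{"progress":0,"status":"starting"}}',
  '{"type":"chunk","data":{"progress":50,"status":"processing"}}',
].join('\n')

console.log(parseChunks(sample))
```

Because every line is a complete JSON document, a consumer never has to wait for the command to finish before acting on a chunk.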

Streaming with CTAs

Use c.ok() as the return value to attach CTAs or signal completion:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('build', {
    async *run(c) {
      yield { step: 'compile', status: 'running' }
      yield { step: 'compile', status: 'done' }
      yield { step: 'bundle', status: 'running' }
      yield { step: 'bundle', status: 'done' }
      
      return c.ok(undefined, {
        cta: {
          commands: [
            { command: 'deploy staging', description: 'Deploy to staging' },
          ],
        },
      })
    },
  })
  .serve()
Output
$ my-cli build
step: compile
status: running
step: compile
status: done
step: bundle
status: running
step: bundle
status: done
Next:
  my-cli deploy staging Deploy to staging

Error Handling

Return c.error() to signal failure:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('process', {
    async *run(c) {
      yield { step: 1, status: 'processing' }
      yield { step: 2, status: 'processing' }
      
      const failed = true
      if (failed) {
        return c.error({
          code: 'PROCESSING',
          message: 'Processing failed at step 2',
        })
      }
      
      yield { step: 3, status: 'done' }
    },
  })
  .serve()
Output
$ my-cli process
step: 1
status: processing
step: 2
status: processing
Error (PROCESSING): Processing failed at step 2

Streaming with Delays

Simulate real-time streaming:
import { Cli } from 'incur'

function sleep(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

Cli.create('my-cli', { description: 'My CLI' })
  .command('countdown', {
    async *run() {
      yield 'Starting countdown...'
      for (let i = 5; i > 0; i--) {
        await sleep(1000)
        yield `${i}...`
      }
      yield 'Liftoff!'
    },
  })
  .serve()
Output
$ my-cli countdown
Starting countdown...
5...
4...
3...
2...
1...
Liftoff!

Real-Time Logs

Stream logs from an external process:
import { Cli } from 'incur'
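import { spawn } from 'node:child_process'
import { createInterface } from 'node:readline'

Cli.create('my-cli', { description: 'My CLI' })
  .command('tail', {
    description: 'Tail application logs',
    async *run() {
      const proc = spawn('tail', ['-f', '/var/log/app.log'])
      // Split stdout into complete lines; raw chunks can end mid-line
      const lines = createInterface({ input: proc.stdout })

      try {
        for await (const line of lines) {
          yield line
        }
      } finally {
        // Stop the child process once iteration ends
        proc.kill()
      }
    },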
  })
  .serve()

Streaming from APIs

Stream data from HTTP endpoints:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('fetch', {
    description: 'Fetch streaming data',
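    async *run() {
      const response = await fetch('https://api.example.com/stream')
      if (!response.ok) throw new Error(`Request failed with status ${response.status}`)

      const reader = response.body?.getReader()
      if (!reader) throw new Error('No response body')

      const decoder = new TextDecoder()
      while (true) {
        const { done, value } = await reader.read()
        if (done) break

        // `stream: true` keeps multi-byte characters split across chunks intact
        yield decoder.decode(value, { stream: true })
      }

      // Flush any bytes still buffered in the decoder
      const rest = decoder.decode()
      if (rest) yield rest
    },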
  })
  .serve()
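The command above yields text exactly as it arrives from the network, so a chunk can end in the middle of a line. If downstream output should be line-oriented, the chunks can be regrouped first. The following is a minimal sketch, not an incur API: `splitLines` and `fakeSource` are hypothetical names, and it only splits on `\n`.

```typescript
// Regroup an async stream of text fragments into complete lines.
async function* splitLines(chunks: AsyncIterable<string>): AsyncGenerator<string> {
  let pending = ''
  for await (const chunk of chunks) {
    pending += chunk
    const parts = pending.split('\n')
    // The last element is an incomplete line (or '' if the chunk ended on '\n')
    pending = parts.pop() ?? ''
    for (const line of parts) yield line
  }
  // Flush what is left when the stream ends without a trailing newline
  if (pending !== '') yield pending
}

// Stand-in for a network stream whose chunks break mid-line
async function* fakeSource(): AsyncGenerator<string> {
  yield 'al'
  yield 'pha\nbe'
  yield 'ta\n'
}

async function demoSplitLines(): Promise<string[]> {
  const lines: string[] = []
  for await (const line of splitLines(fakeSource())) lines.push(line)
  return lines
}

demoSplitLines().then(lines => console.log(lines))
```

Inside a command, the same helper could wrap the decoded chunks so that each `yield` carries one full line.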

Buffered Streaming

Collect several items and yield them together as one chunk:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('stream', {
    async *run() {
      const buffer: string[] = []
      
      for (let i = 0; i < 100; i++) {
        buffer.push(`line ${i}`)
        
        // Yield every 10 lines
        if (buffer.length === 10) {
          yield buffer.join('\n')
          buffer.length = 0
        }
      }
      
      // Yield remaining
      if (buffer.length > 0) {
        yield buffer.join('\n')
      }
    },
  })
  .serve()
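The batching logic can also be factored out of the command. The sketch below shows one way to do that with a generic helper; `batch` is a hypothetical name, not something incur provides, and it accepts any sync or async iterable.

```typescript
// Group items from any iterable into arrays of at most `size` elements.
async function* batch<T>(
  source: AsyncIterable<T> | Iterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  if (size < 1) throw new RangeError('size must be at least 1')
  let group: T[] = []
  for await (const item of source) {
    group.push(item)
    if (group.length === size) {
      yield group
      group = [] // start a fresh array so yielded groups are never mutated
    }
  }
  // Emit the final, possibly shorter, group
  if (group.length > 0) yield group
}

async function demoBatch(): Promise<number[][]> {
  const groups: number[][] = []
  for await (const group of batch([1, 2, 3, 4, 5], 2)) groups.push(group)
  return groups
}

demoBatch().then(groups => console.log(groups))
```

A command could then loop over `batch(lines, 10)` and yield `group.join('\n')` for each group, keeping the buffering policy in one place.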

MCP Streaming

When running as an MCP server (--mcp), streaming chunks are sent as progress notifications:
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('analyze', {
    async *run() {
      yield { stage: 'parsing', progress: 0.25 }
      yield { stage: 'analyzing', progress: 0.5 }
      yield { stage: 'reporting', progress: 0.75 }
      return { complete: true }
    },
  })
  .serve()
When invoked via MCP:
  1. Each yield sends a progress notification to the agent
  2. The final return value becomes the tool result
Agents receive incremental updates and can show progress in real-time.

Streaming vs Non-Streaming

Non-streaming (returns once)
import { Cli } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('status', {
    run() {
      return { status: 'ok', uptime: 3600 }
    },
  })
  .serve()
Streaming (yields multiple times)
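import { Cli } from 'incur'

// Resolve after `ms` milliseconds
function sleep(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

Cli.create('my-cli', { description: 'My CLI' })
  .command('monitor', {
    async *run() {
      while (true) {
        // process.uptime() reports seconds since this process started
        yield { status: 'ok', uptime: Math.floor(process.uptime()) }
        await sleep(1000)
      }
    },
  })
  .serve()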
Use streaming when:
  • Output is produced incrementally
  • Long-running operations need progress updates
  • Real-time data needs to be displayed
  • Logs or events are being tailed

Return Values

An async generator can also return a final value, which becomes the command's result:
import { Cli, z } from 'incur'

Cli.create('my-cli', { description: 'My CLI' })
  .command('process', {
    output: z.object({ summary: z.string() }),
    async *run(c) {
      yield { step: 1 }
      yield { step: 2 }
      yield { step: 3 }
      
      // Return final result
      return { summary: 'Processed 3 steps' }
    },
  })
  .serve()
Output
$ my-cli process
step: 1
step: 2
step: 3
summary: Processed 3 steps
The return value is displayed after all yielded chunks.

Implementation

Streaming detection is automatic based on the function signature:
type RunHandler<args, env, options, output, vars, cmdEnv> =
  | ((c: Context) => output | Promise<output>)
  | ((c: Context) => AsyncGenerator<unknown, output, unknown>)
When run is an async generator, incur:
  1. Iterates over each yielded value
  2. Formats and writes each chunk to stdout
  3. Handles the final return value as the result
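The three steps above can be illustrated with plain TypeScript. This is a sketch of the general technique for driving an async generator by hand, not incur's actual implementation; `drain` and `steps` are hypothetical names. Calling `next()` directly matters here because a `for await` loop discards the generator's return value.

```typescript
// Drive an async generator manually: forward each yield, capture the return value.
async function drain<Chunk, Result>(
  gen: AsyncGenerator<Chunk, Result, unknown>,
  onChunk: (chunk: Chunk) => void,
): Promise<Result> {
  while (true) {
    const step = await gen.next()
    if (step.done) return step.value // the value passed to `return`
    onChunk(step.value) // a value passed to `yield`
  }
}

async function* steps() {
  yield { step: 1 }
  yield { step: 2 }
  return { summary: 'Processed 2 steps' }
}

drain(steps(), chunk => console.log('chunk', chunk)).then(result =>
  console.log('result', result),
)
```

The `done` flag on each iteration result is what separates streamed chunks from the final result.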
Use streaming for long-running operations to provide real-time feedback to agents and humans.
