A Channel is a low-level streaming primitive on which both Stream and Sink are built. It represents a nexus of I/O operations that supports both reading and writing. Most users should use Stream and Sink instead; Channel is useful for advanced use cases and for creating new streaming operators.
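
To make the "reading and writing" idea concrete, here is a minimal sketch of a channel that reads numbers from upstream and writes each one doubled. The name `double` is hypothetical, and running the pipeline with `Channel.runCollect` and `Effect.runSync` is an assumption that holds only because nothing here is asynchronous.

```typescript
import { Channel, Chunk, Effect } from "effect"

// Reads one input at a time, writes it doubled, then continues reading.
const double: Channel.Channel<number, number> = Channel.readWith({
  onInput: (n: number) => Channel.flatMap(Channel.write(n * 2), () => double),
  // The upstream in this sketch cannot fail; a real operator would propagate the error.
  onFailure: () => Channel.void,
  // Stop when upstream is done.
  onDone: () => Channel.void
})

// Feed three values into `double` and collect everything it writes.
const pipeline = Channel.writeAll(1, 2, 3).pipe(Channel.pipeTo(double))

const [outputs] = Effect.runSync(Channel.runCollect(pipeline))

// Logs the doubled values 2, 4, 6
console.log(Chunk.toReadonlyArray(outputs))
```

The same read-transform-write loop is roughly what a Stream operator such as a map does internally, which is why Channel suits custom operators.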

Type Parameters

A Channel has seven type parameters:
  • OutElem: Type of output elements
  • InElem: Type of input elements (contravariant)
  • OutErr: Type of output errors
  • InErr: Type of input errors (contravariant)
  • OutDone: Type of output completion value
  • InDone: Type of input completion value (contravariant)
  • Env: Context requirements
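
As an illustration of the parameter order listed above, the sketch below annotates a channel with all seven type parameters. The channel itself (`numbers`) and its done value are made up for the example; `unknown` on the input side means the channel ignores whatever upstream provides.

```typescript
import { Channel, Chunk, Effect } from "effect"

const numbers: Channel.Channel<
  number, // OutElem: emits numbers
  unknown, // InElem: ignores input elements
  never, // OutErr: cannot fail
  unknown, // InErr: ignores upstream errors
  string, // OutDone: finishes with a string
  unknown, // InDone: ignores the upstream done value
  never // Env: requires no services
> = Channel.writeAll(1, 2, 3).pipe(
  // Replace the default void done value with a string
  Channel.map(() => "finished")
)

// runCollect yields a tuple of [all output elements, done value]
const [elements, done] = Effect.runSync(Channel.runCollect(numbers))

console.log(Chunk.toReadonlyArray(elements), done)
```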

Basic Channels

Creating Channels

import { Channel, Effect } from "effect"

// Succeed with a value
const succeed = Channel.succeed(42)

// Fail with an error
const failed = Channel.fail("error")

// From an effect
const fromEffect = Channel.fromEffect(
  Effect.succeed("hello")
)

// Write output values
const writer = Channel.write(1)

// Read input value
const reader = Channel.readWith({
  onInput: (input) => Channel.succeed(input),
  onFailure: (error) => Channel.fail(error),
  onDone: (done) => Channel.succeed(done)
})

Composing Channels

import { Channel } from "effect"

// Pipe channels together
const composed = Channel.pipeTo(
  Channel.write(1),
  Channel.readWith({
    onInput: (n) => Channel.succeed(n * 2),
    onFailure: (e) => Channel.fail(e),
    onDone: () => Channel.succeed(0)
  })
)

// Sequential composition with flatMap
const sequenced = Channel.succeed(1).pipe(
  Channel.flatMap(n =>
    Channel.succeed(n * 2)
  )
)

Channel Operations

Mapping

import { Channel, Effect } from "effect"

const channel = Channel.succeed(42)

// Map output done value
const mapped = channel.pipe(
  Channel.map(n => n * 2)
)

// Map output elements (needs a channel that actually writes elements)
const mappedOut = Channel.write(42).pipe(
  Channel.mapOut(n => n.toString())
)

// Map with effects
const mappedEffect = channel.pipe(
  Channel.mapEffect(n =>
    Effect.succeed(n * 2)
  )
)

Transforming Inputs

import { Channel } from "effect"

// identity<Elem, Err, Done>: passes elements, errors, and the done value through unchanged
const inputChannel = Channel.identity<number, string, number>()

// Map input values
const mappedIn = inputChannel.pipe(
  Channel.mapInputIn((s: string) => s.length)
)

// Map input done value
const mappedDone = inputChannel.pipe(
  Channel.mapInput((s: string) => s.length)
)

Error Handling

import { Channel } from "effect"

const risky = Channel.fail("error")

// Catch and recover
const recovered = risky.pipe(
  Channel.catchAll(error =>
    Channel.succeed("fallback")
  )
)

// Catch all causes
const causeHandled = risky.pipe(
  Channel.catchAllCause(cause =>
    Channel.succeed("recovered")
  )
)

// Map errors
const mappedError = risky.pipe(
  Channel.mapError(e => `Error: ${e}`)
)

Folding

import { Channel } from "effect"

const channel = Channel.succeed(42)

// Fold over result
const folded = channel.pipe(
  Channel.foldChannel({
    onFailure: (error) => Channel.succeed(-1),
    onSuccess: (value) => Channel.succeed(value * 2)
  })
)

// Fold over cause
const foldedCause = channel.pipe(
  Channel.foldCauseChannel({
    onFailure: (cause) => Channel.succeed(-1),
    onSuccess: (value) => Channel.succeed(value * 2)
  })
)

Merging and Concatenation

Concatenation

import { Channel } from "effect"

// Concatenate channels sequentially
const ch1 = Channel.write(1)
const ch2 = Channel.write(2)

const concatenated = Channel.concatMap(
  ch1,
  () => ch2
)

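// Concatenate with custom merging of done values
// (done values are mapped to numbers here so they can be added)
const concatWith = Channel.concatMapWith(
  ch1.pipe(Channel.map(() => 10)), // upstream finishes with 10
  () => ch2.pipe(Channel.map(() => 1)), // each child finishes with 1
  (a, b) => a + b, // combine done values of successive children
  (a, final) => a + final // combine with the upstream done value
)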

Merging

import { Channel } from "effect"

// Merge multiple channels: start from a channel that emits two inner channels
const channels = Channel.writeAll(
  Channel.write(1),
  Channel.write(2)
)

const merged = channels.pipe(
  Channel.mergeAll({
    concurrency: 2,
    bufferSize: 16
  })
)

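// Merge with a custom function for combining done values.
// mergeAllWith takes the options first, then (channels, combine).
const mergedWith = Channel.mergeAllWith({
  concurrency: 2,
  bufferSize: 16
})(
  channels,
  (_a, _b) => undefined // done values are void in this example
)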

Resource Management

Acquire-Release

import { Channel, Console, Effect } from "effect"

const managed = Channel.acquireUseRelease(
  Effect.gen(function*() {
    yield* Console.log("Acquiring")
    return { data: [1, 2, 3] }
  }),
  (resource) =>
    Channel.writeAll(...resource.data),
  (resource, exit) =>
    Console.log("Releasing")
)

Finalizers

import { Channel, Console, Exit } from "effect"

const channel = Channel.write(1)

// Add finalizer
const withFinalizer = channel.pipe(
  Channel.ensuring(
    Console.log("Cleaning up")
  )
)

// Exit-aware finalizer
const withExitHandler = channel.pipe(
  Channel.ensuringWith((exit) =>
    Exit.isSuccess(exit)
      ? Console.log("Success")
      : Console.log("Failure")
  )
)

Reading and Writing

Reading

import { Channel } from "effect"

// Read single input
const reader = Channel.readWith({
  onInput: (value) => Channel.succeed(value),
  onFailure: (error) => Channel.fail(error),
  onDone: (done) => Channel.succeed(done)
})

// Read or fail if no input
const readOrFail = Channel.readOrFail("No input")

Writing

import { Channel, Chunk } from "effect"

// Write single value
const writer = Channel.write(42)

// Write multiple values
const multiWriter = Channel.writeAll(1, 2, 3)

// Write chunk
const chunkWriter = Channel.writeChunk(
  Chunk.make(1, 2, 3)
)

Buffering

import { Channel, Chunk, Effect, Ref } from "effect"

// Buffer input elements
const buffered = Effect.gen(function*() {
  const ref = yield* Ref.make(Chunk.empty<number>())
  
  return Channel.buffer({
    empty: Chunk.empty<number>(),
    isEmpty: Chunk.isEmpty,
    ref
  })
})

// Buffer chunks
const chunkBuffered = Effect.gen(function*() {
  const ref = yield* Ref.make(Chunk.empty<number>())
  return Channel.bufferChunk(ref)
})

Interruption

import { Channel, Deferred, Effect } from "effect"

const channel = Channel.writeAll(1, 2, 3, 4, 5)

// Interrupt when effect completes
const interruptible = channel.pipe(
  Channel.interruptWhen(
    Effect.sleep("1 second")
  )
)

// Interrupt when deferred completes
const withDeferred = Effect.gen(function*() {
  const deferred = yield* Deferred.make<void>()
  
  return channel.pipe(
    Channel.interruptWhenDeferred(deferred)
  )
})

Channel to Stream/Sink

import { Channel, Chunk, Sink, Stream } from "effect"

// Channel that writes values (like a stream).
// Stream.fromChannel expects the channel to emit chunks.
const streamLike = Channel.write(Chunk.make(1, 2, 3, 4, 5))

// Convert to stream
const stream = Stream.fromChannel(streamLike)

// Channel that consumes chunks (like a sink) and finishes with their sum
const sumFrom = (
  acc: number
): Channel.Channel<never, Chunk.Chunk<number>, never, never, number, unknown> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<number>) =>
      sumFrom(Chunk.reduce(chunk, acc, (a, b) => a + b)),
    onFailure: (e: never) => Channel.fail(e),
    onDone: () => Channel.succeed(acc)
  })

// Convert to sink
const sink = Sink.fromChannel(sumFrom(0))

Advanced Patterns

Custom Merging Strategy

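import { Channel, ChildExecutorDecision, Option, UpstreamPullStrategy } from "effect"

// Upstream channel; its done value is mapped to a number so the combiners below can add
const channel = Channel.writeAll(1, 2, 3).pipe(Channel.map(() => 0))

const custom = Channel.concatMapWithCustom(
  channel,
  (elem) => Channel.write(elem).pipe(Channel.map(() => 1)), // each child finishes with 1
  (a, b) => a + b, // combine child done values
  (a, final) => a + final, // combine with the upstream done value
  (upstreamPullRequest) =>
    UpstreamPullStrategy.PullAfterNext(Option.none()),
  (elem) => ChildExecutorDecision.Continue()
)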

Embedding Input

import { Channel, Effect, SingleProducerAsyncInput } from "effect"

const withInput = Effect.gen(function*() {
  const input = yield* SingleProducerAsyncInput.make<
    string,
    number,
    void
  >()
  
  return Channel.fromInput(input).pipe(
    Channel.embedInput(input)
  )
})

Collecting Output

import { Channel } from "effect"

const channel = Channel.writeAll(1, 2, 3)

// Collect all outputs with done value
const collected = channel.pipe(
  Channel.doneCollect
)

// Emit collected output as single value
const emitted = channel.pipe(
  Channel.emitCollect
)

Integration with Effect

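import { Channel, Context, Effect } from "effect"

// Hypothetical service, defined here only so the snippet is self-contained
class ConfigService extends Context.Tag("ConfigService")<
  ConfigService,
  { readonly apiUrl: string }
>() {}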

// Access context
const withContext = Channel.context<ConfigService>()

// Use context value
const withService = Channel.contextWith(
  (ctx: Context.Context<ConfigService>) =>
    Context.get(ctx, ConfigService)
)

// Channel from effect
const effectChannel = Channel.contextWithEffect(
  (ctx: Context.Context<ConfigService>) =>
    Effect.succeed(Context.get(ctx, ConfigService))
)

Performance Considerations

  1. Use channels directly only for advanced use cases - Prefer Stream and Sink for most scenarios
  2. Chunk-based operations - Work with chunks rather than individual elements for better performance
  3. Resource management - Always use proper acquire-release patterns
  4. Buffering - Use buffering to improve throughput when dealing with bursty data
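
The chunk-based recommendation can be sketched as follows. This is an illustrative comparison rather than a benchmark: it only shows that writing one Chunk produces a single output for downstream to handle, where `writeAll` produces one output per value. The variable names are made up for the example.

```typescript
import { Channel, Chunk, Effect, Stream } from "effect"

// Five writes: downstream handles five separate elements.
const perElement = Channel.writeAll(1, 2, 3, 4, 5)

// One write: downstream handles a single Chunk holding all five values.
const perChunk = Channel.write(Chunk.make(1, 2, 3, 4, 5))

const [elementOutputs] = Effect.runSync(Channel.runCollect(perElement))
const [chunkOutputs] = Effect.runSync(Channel.runCollect(perChunk))

console.log(Chunk.size(elementOutputs)) // 5
console.log(Chunk.size(chunkOutputs)) // 1

// A chunk-emitting channel is also the shape Stream.fromChannel expects.
const values = Effect.runSync(
  Stream.runCollect(Stream.fromChannel(perChunk))
)
console.log(Chunk.toReadonlyArray(values))
```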

See Also

  • Stream - High-level streaming API built on channels
  • Sink - Stream consumers built on channels
  • Effect - Core effect type
