The Apache Beam TypeScript SDK brings modern JavaScript/TypeScript development to data processing pipelines, with a focus on schema-first data processing and cross-language capabilities.

Installation

1. Install Node.js

Ensure you have Node.js 16 or later installed:
node --version

2. Create a new project

mkdir my-beam-pipeline
cd my-beam-pipeline
npm init -y

3. Install Apache Beam

npm install apache-beam

4. Install TypeScript (recommended)

npm install --save-dev typescript @types/node
npx tsc --init

5. Install Python and Java (for cross-language transforms)

The TypeScript SDK leverages cross-language transforms extensively. Install:
  • Python 3.8+ for Python transforms
  • Java 8+ for Java transforms

Quick Start

Here’s a simple word count example:
import * as beam from "apache-beam";
import { createRunner } from "apache-beam/runners";
import { countPerElement } from "apache-beam/transforms/group_and_combine";

function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
  return lines
    .map((s: string) => s.toLowerCase())
    .flatMap(function* (line: string) {
      yield* line.split(/[^a-z]+/);
    })
    .apply(countPerElement());
}

async function main() {
  await createRunner().run((root) => {
    const lines = root.apply(
      beam.create([
        "To be or not to be that is the question",
        "Whether tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
      ])
    );

    const counts = lines.apply(wordCount);
    counts.map(console.log);
  });
}

main()
  .catch((e) => console.error(e))
  .finally(() => process.exit());
Build and run (assuming your tsconfig.json sets "outDir": "dist"):
npx tsc
node dist/wordcount.js
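The per-element logic in wordCount can be sanity-checked in plain TypeScript before wiring it into a pipeline. This standalone sketch (no Beam dependency) applies the same lowercase, split, and count steps to an in-memory array:

```typescript
// Standalone version of the word-count logic: lowercase, split on
// non-letter runs, and tally occurrences in a Map.
function countWords(lines: string[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const line of lines) {
    for (const word of line.toLowerCase().split(/[^a-z]+/)) {
      if (word.length === 0) continue; // splitting can yield empty strings
      counts.set(word, (counts.get(word) ?? 0) + 1);
    }
  }
  return counts;
}

const counts = countWords([
  "To be or not to be that is the question",
  "Whether tis nobler in the mind to suffer",
]);
console.log(counts.get("to")); // 3
console.log(counts.get("be")); // 2
```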

Core Concepts

Root and Pipeline

Pipelines are built using a Root PValue:
import * as beam from "apache-beam";
import { createRunner } from "apache-beam/runners";

async function main() {
  const runner = createRunner();
  
  await runner.run((root) => {
    // Build your pipeline starting from root
    const data = root.apply(beam.create([1, 2, 3, 4, 5]));
    
    // Apply transforms
    data.map((x) => x * 2).map(console.log);
  });
}

PCollection

PCollections represent distributed datasets:
// Create from in-memory data
const numbers = root.apply(beam.create([1, 2, 3, 4, 5]));

// PCollections support method chaining
const result = numbers
  .map((x) => x * 2)
  .filter((x) => x > 5);

Transforms

Apply transforms using .apply() or convenience methods:
import * as beam from "apache-beam";

// Using convenience methods
const doubled = numbers.map((x) => x * 2);
const words = lines.flatMap((line) => line.split(" "));
const filtered = data.filter((x) => x > 10);

// Using apply with PTransform
const counted = words.apply(countPerElement());

// Using apply with a function
const custom = data.apply((pcoll) => {
  return pcoll.map((x) => x * 2).filter((x) => x > 5);
});

TypeScript-Specific Features

Schema-First Approach

The TypeScript SDK emphasizes working with structured data:
import * as beam from "apache-beam";

interface User {
  name: string;
  age: number;
  email: string;
}

const users = root.apply(
  beam.create<User>([
    { name: "Alice", age: 30, email: "alice@example.com" },
    { name: "Bob", age: 25, email: "bob@example.com" },
  ])
);

// Work with structured data naturally
const emails = users.map((user) => user.email);
const adults = users.filter((user) => user.age >= 18);

Map and FlatMap

Use familiar array-like operations:
// Map: 1-to-1 transformation
const upper = words.map((word) => word.toUpperCase());

// FlatMap: 1-to-many using generators
const chars = words.flatMap(function* (word) {
  for (const char of word) {
    yield char;
  }
});

// FlatMap: Return arrays
const split = lines.flatMap((line) => line.split(" "));
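The two flatMap styles produce the same result. The generator form can be mirrored with plain arrays, outside Beam, to show the 1-to-many expansion:

```typescript
// A generator that expands one word into its characters.
function* explode(word: string) {
  for (const ch of word) {
    yield ch;
  }
}

// Array.prototype.flatMap gives the same 1-to-many shape on plain arrays.
const chars = ["ab", "cd"].flatMap((w) => [...explode(w)]);
console.log(chars); // ["a", "b", "c", "d"]
```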

Async/Await Support

Work with asynchronous operations:
import * as beam from "apache-beam";

// Async map operations
const enriched = await data.asyncMap(async (element) => {
  const result = await fetchFromAPI(element);
  return result;
});

// Async pipeline execution
await runner.run(async (root) => {
  const data = root.apply(beam.create([1, 2, 3]));
  data.map(console.log);
});

Context Parameters

Access element metadata and side inputs:
import { withTimestamp, withWindow } from "apache-beam";

// Access timestamp
const withTime = data.map(
  (element, context) => {
    return {
      element,
      timestamp: context.timestamp,
    };
  },
  { timestamp: withTimestamp() }
);

// Access window
const withWin = data.map(
  (element, context) => {
    return {
      element,
      window: context.window,
    };
  },
  { window: withWindow() }
);

Working with Multiple Outputs

Split PCollections based on element properties:
import { Split } from "apache-beam/transforms";

interface Result {
  valid?: string;
  invalid?: string;
}

// Process and tag elements
const tagged = data.map((element): Result => {
  if (isValid(element)) {
    return { valid: element };
  } else {
    return { invalid: element };
  }
});

// Split into separate PCollections
const { valid, invalid } = tagged.apply(new Split());
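Conceptually, Split routes each tagged record to the output named by the property it carries. A plain-TypeScript sketch of that partitioning (not the Beam implementation, which works per element across the distributed collection):

```typescript
interface Result {
  valid?: string;
  invalid?: string;
}

// Partition tagged records by which property they carry.
function splitTagged(tagged: Result[]): { valid: string[]; invalid: string[] } {
  const valid: string[] = [];
  const invalid: string[] = [];
  for (const r of tagged) {
    if (r.valid !== undefined) {
      valid.push(r.valid);
    } else if (r.invalid !== undefined) {
      invalid.push(r.invalid);
    }
  }
  return { valid, invalid };
}

const { valid, invalid } = splitTagged([
  { valid: "ok" },
  { invalid: "bad" },
  { valid: "fine" },
]);
console.log(valid);   // ["ok", "fine"]
console.log(invalid); // ["bad"]
```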

Grouping and Combining

Group By Key

import { groupBy } from "apache-beam/transforms";

interface Event {
  userId: string;
  action: string;
  timestamp: number;
}

const events = root.apply(beam.create<Event>([...]));

// Group by a field
const byUser = events.apply(
  groupBy((event) => event.userId)
);

Aggregations

import {
  countPerElement,
  sum,
  mean,
  combine,
} from "apache-beam/transforms/group_and_combine";

// Count occurrences
const wordCounts = words.apply(countPerElement());

// Sum values by key
interface Sale {
  product: string;
  amount: number;
}

const totals = sales.apply(
  groupBy((sale) => sale.product)
).apply(
  combine((sales) => sales.reduce((sum, s) => sum + s.amount, 0))
);
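The grouping-plus-combining above computes a per-key sum. The same computation on an in-memory array, a single-machine sketch of what the pipeline does across workers:

```typescript
interface Sale {
  product: string;
  amount: number;
}

// Per-key sum with a Map: the single-machine analogue of
// groupBy(...) followed by combine(...).
function totalsByProduct(sales: Sale[]): Map<string, number> {
  const totals = new Map<string, number>();
  for (const s of sales) {
    totals.set(s.product, (totals.get(s.product) ?? 0) + s.amount);
  }
  return totals;
}

const totals = totalsByProduct([
  { product: "widget", amount: 10 },
  { product: "gadget", amount: 4 },
  { product: "widget", amount: 5 },
]);
console.log(totals.get("widget")); // 15
```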

Cross-Language Transforms

Use transforms from Python and Java SDKs:
import * as external from "apache-beam/transforms/external";

// Use Python's ReadFromBigQuery
const bigQueryData = root.apply(
  external.pythonTransform(
    "apache_beam.io.ReadFromBigQuery",
    {
      query: "SELECT * FROM dataset.table LIMIT 100",
      use_standard_sql: true,
    }
  )
);

// Use Java transforms
const kafkaData = root.apply(
  external.javaTransform(
    "org.apache.beam.sdk.io.kafka.KafkaIO.Read",
    {
      bootstrapServers: "localhost:9092",
      topics: ["my-topic"],
    }
  )
);

I/O Operations

Reading Files

import { readFromText } from "apache-beam/io";

// Read text files
const lines = root.apply(
  readFromText("gs://bucket/path/*.txt")
);

// Read with custom parsing
const parsed = lines.map((line) => JSON.parse(line));

Writing Files

import { writeToText } from "apache-beam/io";

// Write to text files
data.apply(
  writeToText("output/results.txt")
);

// Format before writing
results
  .map((item) => JSON.stringify(item))
  .apply(writeToText("output/data.jsonl"));

BigQuery (via Python)

import * as external from "apache-beam/transforms/external";

// Read from BigQuery
const rows = root.apply(
  external.pythonTransform(
    "apache_beam.io.ReadFromBigQuery",
    {
      table: "project:dataset.table",
    }
  )
);

// Write to BigQuery
data.apply(
  external.pythonTransform(
    "apache_beam.io.WriteToBigQuery",
    {
      table: "project:dataset.output_table",
      schema: {
        fields: [
          { name: "field1", type: "STRING" },
          { name: "field2", type: "INTEGER" },
        ],
      },
    }
  )
);

Running Pipelines

Direct Runner (Local)

node dist/main.js --runner=direct

Flink Runner

Flink infrastructure is downloaded automatically on first use:
node dist/main.js --runner=flink

Dataflow Runner

node dist/main.js \
  --runner=dataflow \
  --project=YOUR_PROJECT_ID \
  --region=us-central1 \
  --tempLocation=gs://YOUR_BUCKET/temp

Configuring Runners

import { createRunner } from "apache-beam/runners";

const runner = createRunner({
  runner: "flink",
  flinkMaster: "localhost:8081",
});

await runner.run((root) => {
  // Build pipeline
});

Windowing

Process streaming data with time windows:
import { FixedWindows, SlidingWindows } from "apache-beam/transforms/window";
import { assignWindows } from "apache-beam";

// Fixed windows
const windowed = data.apply(
  assignWindows(FixedWindows({ durationSecs: 60 }))
);

// Sliding windows
const sliding = data.apply(
  assignWindows(
    SlidingWindows({
      periodSecs: 30,
      durationSecs: 60,
    })
  )
);
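How the two window types bucket an element can be checked with plain arithmetic: a fixed window of size d assigns a timestamp t to the single window starting at floor(t / d) * d, while a sliding window of size d and period p assigns it to every window whose start is a multiple of p and lies within d of t. A small sketch of that assignment logic:

```typescript
// Fixed windows: each timestamp lands in exactly one bucket.
function fixedWindowStart(tSecs: number, durationSecs: number): number {
  return Math.floor(tSecs / durationSecs) * durationSecs;
}

// Sliding windows: each timestamp lands in every window of length
// durationSecs whose start (a multiple of periodSecs) covers it.
function slidingWindowStarts(
  tSecs: number,
  durationSecs: number,
  periodSecs: number
): number[] {
  const starts: number[] = [];
  const last = Math.floor(tSecs / periodSecs) * periodSecs;
  for (let start = last; start > tSecs - durationSecs; start -= periodSecs) {
    starts.push(start);
  }
  return starts;
}

console.log(fixedWindowStart(125, 60));        // 120
console.log(slidingWindowStarts(125, 60, 30)); // [120, 90]
```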

Best Practices

Define interfaces for your data:
interface LogEntry {
  timestamp: number;
  level: string;
  message: string;
  userId?: string;
}

const logs = root.apply(beam.create<LogEntry>([...]));

// TypeScript catches errors at compile time
const errors = logs.filter((log) => log.level === "ERROR");
Use mature I/O connectors from Python and Java:
import * as external from "apache-beam/transforms/external";

// Use Python's extensive I/O library
const kafkaData = root.apply(
  external.pythonTransform(
    "apache_beam.io.ReadFromKafka",
    {
      consumer_config: {
        "bootstrap.servers": "localhost:9092",
      },
      topics: ["events"],
    }
  )
);
Generators provide clean syntax for emitting multiple elements:
const exploded = data.flatMap(function* (element) {
  for (const item of element.items) {
    yield item;
  }
});
Use async/await for I/O operations:
const enriched = await data.asyncMap(async (element) => {
  try {
    const details = await fetchDetails(element.id);
    return { ...element, ...details };
  } catch (error) {
    console.error(`Failed to enrich ${element.id}:`, error);
    return element;
  }
});
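The enrichment function itself can be unit-tested outside the pipeline. In this sketch, fetchDetails is a hypothetical stand-in for your real lookup:

```typescript
interface Item {
  id: number;
  name?: string;
}

// Hypothetical lookup; substitute a real API call here.
async function fetchDetails(id: number): Promise<{ name: string }> {
  return { name: `item-${id}` };
}

// Merge fetched details into the element, falling back to the
// bare element if the lookup fails.
async function enrich(element: Item): Promise<Item> {
  try {
    const details = await fetchDetails(element.id);
    return { ...element, ...details };
  } catch (error) {
    console.error(`Failed to enrich ${element.id}:`, error);
    return element;
  }
}

enrich({ id: 7 }).then((item) => console.log(item.name)); // "item-7"
```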

Composite Transforms

Create reusable transform functions:
import * as beam from "apache-beam";
import { countPerElement } from "apache-beam/transforms/group_and_combine";

function extractAndCountWords(
  lines: beam.PCollection<string>
): beam.PCollection<{ word: string; count: number }> {
  return lines
    .flatMap((line) => line.toLowerCase().split(/\W+/))
    .filter((word) => word.length > 0)
    .apply(countPerElement())
    .map(({ element, count }) => ({
      word: element,
      count: count,
    }));
}

// Use the composite transform
const wordCounts = lines.apply(extractAndCountWords);

Testing

Test your transforms:
import * as beam from "apache-beam";
import { createRunner } from "apache-beam/runners";
import { expect } from "chai";

describe("Word Count Transform", () => {
  it("counts words correctly", async () => {
    const runner = createRunner({ runner: "direct" });
    const results: any[] = [];

    await runner.run((root) => {
      const input = root.apply(
        beam.create(["hello world", "hello beam"])
      );

      const counts = input.apply(wordCount);
      counts.map((x) => results.push(x));
    });

    expect(results).to.deep.include({ element: "hello", count: 2 });
    expect(results).to.deep.include({ element: "world", count: 1 });
    expect(results).to.deep.include({ element: "beam", count: 1 });
  });
});

Starter Project

Clone the official starter project:
git clone https://github.com/apache/beam-starter-typescript.git
cd beam-starter-typescript
npm install
npm run build
node dist/index.js

Resources

  • TypeScript SDK Docs: official TypeScript SDK documentation
  • Starter Project: template project to get started quickly
  • Code Examples: sample pipelines and patterns
  • API Reference: detailed API documentation
