Skip to main content
Pipes are the heart of Tinybird’s data transformation layer. They execute SQL queries, transform data, and expose results through APIs. The TypeScript SDK provides type-safe pipe definitions with multiple pipe types for different use cases.

Pipe Types

The SDK supports five types of pipes:

Endpoints

API-exposed pipes that return query results over HTTP

Internal Pipes

Reusable SQL transformations (not exposed as APIs)

Materialized Views

Continuously updated aggregations

Copy Pipes

Scheduled or on-demand data snapshots

Sink Pipes

Export data to external systems (Kafka, S3)

Endpoints

Endpoints are pipes exposed as HTTP API endpoints. Use defineEndpoint() to create them:
import { defineEndpoint, node, t, p } from '@tinybirdco/sdk';

export const topEvents = defineEndpoint('top_events', {
  description: 'Get the most frequent events',
  params: {
    start_date: p.dateTime(),
    end_date: p.dateTime(),
    limit: p.int32().optional(10),
  },
  nodes: [
    node({
      name: 'aggregated',
      sql: `
        SELECT event_name, count() AS event_count
        FROM events
        WHERE timestamp >= {{DateTime(start_date)}}
          AND timestamp <= {{DateTime(end_date)}}
        GROUP BY event_name
        ORDER BY event_count DESC
        LIMIT {{Int32(limit, 10)}}
      `,
    }),
  ],
  output: {
    event_name: t.string(),
    event_count: t.uint64(),
  },
});
Location: ~/workspace/source/src/schema/pipe.ts:766-823

Query Parameters

Define parameters using the p.* validators:
import { p } from '@tinybirdco/sdk';

const params = {
  // Required parameters
  user_id: p.string(),
  start_date: p.dateTime(),
  
  // Optional with defaults
  limit: p.int32().optional(10),
  offset: p.int32().optional(0),
  
  // With descriptions
  status: p.string().optional('active').describe('Filter by status'),
  
  // Array parameters
  tags: p.array(p.string()),
};

SQL Templates

Use Tinybird’s SQL templating syntax to inject parameters:
-- Type casting with defaults
WHERE timestamp >= {{DateTime(start_date)}}
  AND limit = {{Int32(limit, 10)}}

-- Array parameters
WHERE id IN {{Array(ids, 'String')}}

-- Conditional logic
{% if defined(user_id) %}
  AND user_id = {{String(user_id)}}
{% end %}

Caching

Enable response caching for endpoints:
export const cachedEndpoint = defineEndpoint('cached', {
  params: { date: p.date() },
  nodes: [node({ name: 'data', sql: 'SELECT * FROM events' })],
  output: { id: t.string() },
  cache: {
    enabled: true,
    ttl: 300, // 5 minutes
  },
});

Internal Pipes

Internal pipes are reusable SQL transformations that are not exposed as API endpoints:
import { definePipe, node, p } from '@tinybirdco/sdk';

export const filteredEvents = definePipe('filtered_events', {
  description: 'Filter events by date range',
  params: {
    start_date: p.dateTime(),
    end_date: p.dateTime(),
  },
  nodes: [
    node({
      name: 'filtered',
      sql: `
        SELECT * FROM events
        WHERE timestamp >= {{DateTime(start_date)}}
          AND timestamp <= {{DateTime(end_date)}}
      `,
    }),
  ],
});
When to use: Breaking down complex queries into reusable components, intermediate transformations. Location: ~/workspace/source/src/schema/pipe.ts:555-615

Nodes

Nodes are individual SQL query steps within a pipe. Each node can reference previous nodes:
import { node } from '@tinybirdco/sdk';

const nodes = [
  node({
    name: 'filtered',
    description: 'Filter by date range',
    sql: `
      SELECT *
      FROM events
      WHERE timestamp >= {{DateTime(start_date)}}
    `,
  }),
  node({
    name: 'aggregated',
    sql: `
      SELECT
        event_type,
        count() as event_count,
        uniqExact(user_id) as unique_users
      FROM filtered  -- Reference previous node
      GROUP BY event_type
    `,
  }),
  node({
    name: 'top_events',
    sql: `
      SELECT *
      FROM aggregated
      ORDER BY event_count DESC
      LIMIT {{Int32(limit, 10)}}
    `,
  }),
];
Location: ~/workspace/source/src/schema/pipe.ts:56-92

Materialized Views

Materialized views continuously update aggregations as new data arrives:
import { defineDatasource, defineMaterializedView, node, t, engine } from '@tinybirdco/sdk';

// Target datasource for aggregated data
export const dailyStats = defineDatasource('daily_stats', {
  description: 'Daily aggregated statistics',
  schema: {
    date: t.date(),
    pathname: t.string(),
    views: t.simpleAggregateFunction('sum', t.uint64()),
    unique_sessions: t.aggregateFunction('uniq', t.string()),
  },
  engine: engine.aggregatingMergeTree({
    sortingKey: ['date', 'pathname'],
  }),
});

// Materialized view that populates the datasource
export const dailyStatsMv = defineMaterializedView('daily_stats_mv', {
  description: 'Materialize daily page view aggregations',
  datasource: dailyStats,
  nodes: [
    node({
      name: 'aggregate',
      sql: `
        SELECT
          toDate(timestamp) AS date,
          pathname,
          count() AS views,
          uniqState(session_id) AS unique_sessions
        FROM page_views
        GROUP BY date, pathname
      `,
    }),
  ],
});
Location: ~/workspace/source/src/schema/pipe.ts:690-763

Deployment Methods

By default, materialized views are recreated on deployment. Use alter to preserve data:
export const myMv = defineMaterializedView('my_mv', {
  datasource: targetDatasource,
  nodes: [/* ... */],
  deploymentMethod: 'alter', // Use ALTER TABLE instead of recreating
});
Location: ~/workspace/source/src/schema/pipe.ts:128-134

Copy Pipes

Copy pipes capture query results at a specific point in time and write them to a target datasource:
import { defineCopyPipe, defineDatasource, node, t, engine } from '@tinybirdco/sdk';

// Target datasource for snapshots
const dailySalesSnapshot = defineDatasource('daily_sales_snapshot', {
  schema: {
    snapshot_date: t.date(),
    country: t.string(),
    total_sales: t.uint64(),
  },
  engine: engine.mergeTree({
    sortingKey: ['snapshot_date', 'country'],
  }),
});

// Scheduled copy pipe
export const dailySnapshot = defineCopyPipe('daily_snapshot', {
  description: 'Daily snapshot of sales statistics',
  datasource: dailySalesSnapshot,
  copy_schedule: '0 0 * * *', // Daily at midnight UTC
  copy_mode: 'append',
  nodes: [
    node({
      name: 'snapshot',
      sql: `
        SELECT
          today() AS snapshot_date,
          country,
          sum(sales) AS total_sales
        FROM sales
        WHERE date = today() - 1
        GROUP BY country
      `,
    }),
  ],
});

// On-demand copy pipe
export const manualReport = defineCopyPipe('manual_report', {
  description: 'On-demand report generation',
  datasource: reportDatasource,
  copy_schedule: '@on-demand',
  copy_mode: 'replace', // Replace all data on each run
  nodes: [
    node({
      name: 'report',
      sql: `SELECT * FROM events WHERE timestamp >= now() - interval 7 day`,
    }),
  ],
});
Location: ~/workspace/source/src/schema/pipe.ts:825-904

Copy Modes

  • append (default): Appends results to the target datasource
  • replace: Replaces all data in the target datasource

Copy Schedules

  • Cron expression: '0 * * * *' (hourly), '0 0 * * *' (daily)
  • @on-demand: Manual execution only
  • @once: Run once on deployment

Sink Pipes

Sink pipes export query results to external systems:

Kafka Sink

import { defineSinkPipe, node } from '@tinybirdco/sdk';
import { eventsKafka } from './connections';

export const kafkaEventsSink = defineSinkPipe('kafka_events_sink', {
  sink: {
    connection: eventsKafka,
    topic: 'events_export',
    schedule: '@on-demand',
  },
  nodes: [
    node({
      name: 'publish',
      sql: `
        SELECT timestamp, payload
        FROM kafka_events
      `,
    }),
  ],
});
Location: ~/workspace/source/src/schema/pipe.ts:617-655

S3 Sink

import { defineSinkPipe, node } from '@tinybirdco/sdk';
import { landingS3 } from './connections';

export const s3EventsSink = defineSinkPipe('s3_events_sink', {
  sink: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/exports/',
    fileTemplate: 'events_{date}',
    format: 'csv',
    schedule: '@once',
    strategy: 'create_new', // or 'replace'
    compression: 'gzip',    // or 'snappy', 'none'
  },
  nodes: [
    node({
      name: 'export',
      sql: `
        SELECT timestamp, session_id
        FROM s3_landing
      `,
    }),
  ],
});
Location: ~/workspace/source/src/schema/pipe.ts:617-655

Output Schema

Define the output schema for type-safe query results:
export const myEndpoint = defineEndpoint('my_endpoint', {
  params: { limit: p.int32() },
  nodes: [node({ name: 'data', sql: 'SELECT * FROM events' })],
  output: {
    timestamp: t.dateTime(),
    event_name: t.string(),
    count: t.uint64(),
    // Types must match your SQL output
  },
});
Note: The output schema is validated against the target datasource schema for materialized views and copy pipes. Location: ~/workspace/source/src/schema/pipe.ts:459-507

Access Control

Control pipe access with tokens:
import { defineToken, defineEndpoint } from '@tinybirdco/sdk';

const appToken = defineToken('app_read');

export const topEvents = defineEndpoint('top_events', {
  nodes: [node({ name: 'data', sql: 'SELECT * FROM events LIMIT 10' })],
  output: { timestamp: t.dateTime(), event_name: t.string() },
  tokens: [{ token: appToken, scope: 'READ' }],
});

Datasources

Learn about table definitions and schemas

Type Inference

Extract types from pipe definitions

Parameter Validators

Complete reference of p.* validators

Connections

External connections for sink pipes

Build docs developers (and LLMs) love