Function Signature

function definePipe<
  TParams extends ParamsDefinition,
  TOutput extends OutputDefinition
>(
  name: string,
  options: PipeOptions<TParams, TOutput>
): PipeDefinition<TParams, TOutput>
Defines a Tinybird pipe: a reusable SQL transformation. A pipe can be internal (not exposed as an API endpoint) or configured as an endpoint, a materialized view, or a copy pipe.

Parameters

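name (string, required)
The pipe name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.

options (PipeOptions<TParams, TOutput>, required)
Pipe configuration object. The usage examples on this page set description, params, nodes, output, endpoint, and materialized.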
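
The naming rule for name can be checked before calling definePipe. The following is a minimal sketch, assuming "alphanumeric" means ASCII letters and digits; isValidPipeName and PIPE_NAME_PATTERN are hypothetical helpers and not part of @tinybirdco/sdk.

```typescript
// Hypothetical helper, not part of @tinybirdco/sdk.
// Mirrors the documented rule: first character is a letter or underscore,
// the rest are letters, digits, or underscores (ASCII assumed).
const PIPE_NAME_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/

function isValidPipeName(name: string): boolean {
  return PIPE_NAME_PATTERN.test(name)
}

console.log(isValidPipeName('top_events'))  // true
console.log(isValidPipeName('_internal'))   // true
console.log(isValidPipeName('2024_events')) // false: starts with a digit
console.log(isValidPipeName('top-events'))  // false: hyphen is not allowed
```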

Return Type

PipeDefinition<TParams, TOutput>
A pipe definition that can be used in a project with full type inference.

ParamsDefinition

type ParamsDefinition = Record<string, AnyParamValidator>
Parameters are defined using validators from p.*.

OutputDefinition

type OutputDefinition = Record<string, AnyTypeValidator>
Output schema is defined using type validators from t.*.

NodeDefinition

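Create nodes using the node() function. Each node has a name and a SQL query, plus an optional description. A later node can select from an earlier node by its name, as the multi-node example does with FROM filtered.
node({
  name: string,
  sql: string,
  description?: string,
})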

Usage Examples

Internal Pipe (No API Endpoint)

import { definePipe, node, p } from '@tinybirdco/sdk'

export const filteredEvents = definePipe('filtered_events', {
  description: 'Filter events by date range',
  params: {
    start_date: p.dateTime(),
    end_date: p.dateTime(),
  },
  nodes: [
    node({
      name: 'filtered',
      sql: `
        SELECT * FROM events
        WHERE timestamp >= {{DateTime(start_date)}}
          AND timestamp <= {{DateTime(end_date)}}
      `,
    }),
  ],
})

Multi-Node Pipeline

import { definePipe, node, p, t } from '@tinybirdco/sdk'

export const topEvents = definePipe('top_events', {
  description: 'Get top events by count',
  params: {
    start_date: p.dateTime(),
    end_date: p.dateTime(),
    limit: p.int32().optional(10),
  },
  nodes: [
    node({
      name: 'filtered',
      sql: `
        SELECT *
        FROM events
        WHERE timestamp BETWEEN {{DateTime(start_date)}} AND {{DateTime(end_date)}}
      `,
    }),
    node({
      name: 'aggregated',
      sql: `
        SELECT
          event_type,
          count() as event_count,
          uniqExact(user_id) as unique_users
        FROM filtered
        GROUP BY event_type
        ORDER BY event_count DESC
        LIMIT {{Int32(limit, 10)}}
      `,
    }),
  ],
  output: {
    event_type: t.string(),
    event_count: t.uint64(),
    unique_users: t.uint64(),
  },
  endpoint: true,
})
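
Because endpoint: true exposes the pipe over HTTP, a caller passes the pipe parameters as query-string values. The sketch below only builds such a URL. It assumes the /v0/pipes/<name>.json path of Tinybird's REST API and the api.tinybird.co host (the host varies by workspace region), and it leaves authentication out entirely. buildPipeUrl and TopEventsQuery are hypothetical names, not SDK exports.

```typescript
// Hypothetical helper, not part of @tinybirdco/sdk.
// Assumption: endpoints are served at <host>/v0/pipes/<pipe_name>.json.
interface TopEventsQuery {
  start_date: string
  end_date: string
  limit?: number
}

function buildPipeUrl(host: string, pipeName: string, query: TopEventsQuery): string {
  const pairs: string[] = [
    `start_date=${encodeURIComponent(query.start_date)}`,
    `end_date=${encodeURIComponent(query.end_date)}`,
  ]
  // Leave limit out when it is not set so the pipe uses its default of 10.
  if (query.limit !== undefined) {
    pairs.push(`limit=${encodeURIComponent(String(query.limit))}`)
  }
  return `${host}/v0/pipes/${pipeName}.json?${pairs.join('&')}`
}

const url = buildPipeUrl('https://api.tinybird.co', 'top_events', {
  start_date: '2024-01-01 00:00:00',
  end_date: '2024-01-31 23:59:59',
})
console.log(url)
```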

With Caching

import { definePipe, node, t } from '@tinybirdco/sdk'

export const cachedStats = definePipe('cached_stats', {
  nodes: [
    node({
      name: 'stats',
      sql: 'SELECT count() as total FROM events',
    }),
  ],
  output: { total: t.uint64() },
  endpoint: {
    enabled: true,
    cache: {
      enabled: true,
      ttl: 300, // 5 minutes
    },
  },
})
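
The ttl value is expressed in seconds. To illustrate what a time-to-live means in general, here is a tiny in-memory cache written from scratch. It is a sketch of the concept only and says nothing about how Tinybird implements endpoint caching; TtlCache is a hypothetical class.

```typescript
// Illustrative only: a generic TTL cache, not Tinybird's implementation.
class TtlCache<T> {
  private ttlSeconds: number
  private entry: { value: T; expiresAt: number } | undefined

  constructor(ttlSeconds: number) {
    this.ttlSeconds = ttlSeconds
  }

  // `now` is in milliseconds and injectable so the timing is easy to follow.
  get(compute: () => T, now: number = Date.now()): T {
    if (this.entry !== undefined && now < this.entry.expiresAt) {
      return this.entry.value // still fresh: reuse the cached result
    }
    const value = compute()
    this.entry = { value, expiresAt: now + this.ttlSeconds * 1000 }
    return value
  }
}

let queryRuns = 0
const runQuery = (): { total: number } => {
  queryRuns += 1
  return { total: 42 }
}

const cache = new TtlCache<{ total: number }>(300)
cache.get(runQuery, 0)      // miss: runs the query
cache.get(runQuery, 299000) // hit: 299 s is within the 300 s TTL
cache.get(runQuery, 300000) // miss: the TTL has elapsed
console.log(queryRuns)      // 2
```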

Materialized View Pipe

import { definePipe, defineDatasource, node, t, engine } from '@tinybirdco/sdk'

const dailyStats = defineDatasource('daily_stats', {
  schema: {
    date: t.date(),
    pathname: t.string(),
    views: t.simpleAggregateFunction('sum', t.uint64()),
  },
  engine: engine.aggregatingMergeTree({
    sortingKey: ['date', 'pathname'],
  }),
})

export const dailyStatsMv = definePipe('daily_stats_mv', {
  nodes: [
    node({
      name: 'aggregate',
      sql: `
        SELECT
          toDate(timestamp) AS date,
          pathname,
          count() AS views
        FROM page_views
        GROUP BY date, pathname
      `,
    }),
  ],
  output: {
    date: t.date(),
    pathname: t.string(),
    views: t.uint64(),
  },
  materialized: {
    datasource: dailyStats,
  },
})
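
The views column is declared as a 'sum' simple aggregate because a ClickHouse materialized view processes each inserted block on its own, so the target table can hold several partial counts for the same (date, pathname) pair until they are merged. The sketch below mimics that behaviour in plain TypeScript. aggregateBatch and mergePartials are hypothetical stand-ins for the SQL node and the engine's merge step, and timestamps are assumed to be ISO 8601 strings in UTC.

```typescript
// Illustrative only: plain-TypeScript stand-ins, not SDK or engine code.
interface PageView { timestamp: string; pathname: string }
interface DailyStat { date: string; pathname: string; views: number }

// Rough equivalent of the `aggregate` node applied to one inserted batch.
// Assumes ISO 8601 UTC timestamps, so the first 10 characters are the date.
function aggregateBatch(rows: PageView[]): DailyStat[] {
  const byKey = new Map<string, DailyStat>()
  for (const row of rows) {
    const date = row.timestamp.slice(0, 10)
    const key = `${date}|${row.pathname}`
    const existing = byKey.get(key)
    if (existing) {
      existing.views += 1
    } else {
      byKey.set(key, { date, pathname: row.pathname, views: 1 })
    }
  }
  return Array.from(byKey.values())
}

// Rough equivalent of merging rows that share the sorting key
// (date, pathname): the 'sum' aggregate adds the partial counts.
function mergePartials(partials: DailyStat[]): DailyStat[] {
  const byKey = new Map<string, DailyStat>()
  for (const part of partials) {
    const key = `${part.date}|${part.pathname}`
    const existing = byKey.get(key)
    if (existing) {
      existing.views += part.views
    } else {
      byKey.set(key, { ...part })
    }
  }
  return Array.from(byKey.values())
}

const batch1 = aggregateBatch([
  { timestamp: '2024-01-01T09:00:00Z', pathname: '/home' },
  { timestamp: '2024-01-01T10:00:00Z', pathname: '/home' },
])
const batch2 = aggregateBatch([
  { timestamp: '2024-01-01T11:00:00Z', pathname: '/home' },
  { timestamp: '2024-01-01T12:00:00Z', pathname: '/pricing' },
])
const merged = mergePartials([...batch1, ...batch2])
console.log(merged) // '/home' has 3 views, '/pricing' has 1
```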

Type Inference

Use InferParams and InferOutputRow to extract TypeScript types:
import { type InferParams, type InferOutputRow } from '@tinybirdco/sdk'
import { topEvents } from './pipes'

type TopEventsParams = InferParams<typeof topEvents>
// { start_date: string, end_date: string, limit?: number }

type TopEventsOutput = InferOutputRow<typeof topEvents>
// { event_type: string, event_count: bigint, unique_users: bigint }
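
To show how a type can be derived from a record of validators, here is a self-contained sketch using mapped and conditional types. Validator, dateTime, int32Optional, and Infer are hypothetical stand-ins written for this example. The SDK's actual validator types and the internals of InferParams may differ.

```typescript
// Hypothetical stand-ins for p.* validators; the real SDK types may differ.
interface Validator<T, Optional extends boolean = false> {
  readonly optional: Optional
  parse(raw: string): T
}

const dateTime = (): Validator<string> => ({
  optional: false,
  parse: (raw) => raw,
})

const int32Optional = (): Validator<number, true> => ({
  optional: true,
  parse: (raw) => parseInt(raw, 10),
})

// Keys whose validator is not marked optional.
type RequiredKeys<D> = {
  [K in keyof D]: D[K] extends Validator<unknown, true> ? never : K
}[keyof D]
type OptionalKeys<D> = Exclude<keyof D, RequiredKeys<D>>

// The value type a validator produces.
type ValueOf<V> = V extends Validator<infer T, boolean> ? T : never

// Required keys stay required; optional validators become optional properties.
type Infer<D> =
  { [K in RequiredKeys<D>]: ValueOf<D[K]> } &
  { [K in OptionalKeys<D>]?: ValueOf<D[K]> }

const paramsDefinition = {
  start_date: dateTime(),
  end_date: dateTime(),
  limit: int32Optional(),
}

type Params = Infer<typeof paramsDefinition>
// { start_date: string, end_date: string } & { limit?: number }

// Compiles without limit because its validator is optional.
const example: Params = {
  start_date: '2024-01-01 00:00:00',
  end_date: '2024-01-31 23:59:59',
}
console.log(example)
```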

Related

defineEndpoint: simplified endpoint creation
defineMaterializedView: simplified materialized view creation
defineCopyPipe: simplified copy pipe creation
node: create transformation nodes
