
Function Signature

function defineSinkPipe<TParams extends ParamsDefinition>(
  name: string,
  options: SinkPipeOptions<TParams>
): PipeDefinition<TParams, Record<string, never>>
Define a Tinybird sink pipe that exports query results to external systems via Kafka or S3. Sink pipes can run on a schedule or be executed on demand.

Parameters

name
string
required
The sink pipe name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
SinkPipeOptions<TParams>
required
Sink pipe configuration object.
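As a rough sanity check, the naming rule for `name` can be expressed as a regular expression. A minimal sketch (the `isValidPipeName` helper is illustrative and not part of the SDK):

```typescript
// Illustrative check for the pipe-name rule stated above:
// must start with a letter or underscore, followed only by
// letters, digits, or underscores.
const PIPE_NAME = /^[A-Za-z_][A-Za-z0-9_]*$/

function isValidPipeName(name: string): boolean {
  return PIPE_NAME.test(name)
}

// isValidPipeName('kafka_events_sink') // true
// isValidPipeName('1_bad_name')        // false
```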

Return Type

PipeDefinition<TParams, Record<string, never>>
A pipe definition configured as a sink pipe. Note that sink pipes do not have an output schema since they export to external systems.

Kafka Sink Configuration

sink
KafkaSinkConfig
required
Kafka sink settings. Specifies the Kafka connection to publish through, the target topic, and the export schedule.

S3 Sink Configuration

sink
S3SinkConfig
required
S3 sink settings. Specifies the S3 connection, the bucketUri to write to, a fileTemplate for naming output files, the output format (such as csv or parquet), the schedule, the write strategy, and an optional compression codec.

Usage Examples

Kafka Sink

import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { eventsKafka } from './connections'

export const kafkaEventsSink = defineSinkPipe('kafka_events_sink', {
  description: 'Export events to Kafka',
  sink: {
    connection: eventsKafka,
    topic: 'events_export',
    schedule: '@on-demand',
  },
  nodes: [
    node({
      name: 'publish',
      sql: `
        SELECT timestamp, event_name, user_id, properties
        FROM events
        WHERE timestamp >= now() - interval 1 hour
      `,
    }),
  ],
})

S3 Sink with CSV Export

import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { landingS3 } from './connections'

export const s3EventsSink = defineSinkPipe('s3_events_sink', {
  description: 'Export events to S3 as CSV',
  sink: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/exports/',
    fileTemplate: 'events_{date}_{time}',
    format: 'csv',
    schedule: '@once',
    strategy: 'create_new',
    compression: 'gzip',
  },
  nodes: [
    node({
      name: 'export',
      sql: `
        SELECT timestamp, event_name, user_id
        FROM events
        WHERE timestamp >= now() - interval 1 day
        ORDER BY timestamp
      `,
    }),
  ],
})

Scheduled S3 Export

import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { backupS3 } from './connections'

export const dailyBackup = defineSinkPipe('daily_backup', {
  description: 'Daily backup to S3',
  sink: {
    connection: backupS3,
    bucketUri: 's3://backups/tinybird/',
    fileTemplate: 'events_backup_{date}',
    format: 'parquet',
    schedule: '0 2 * * *', // Daily at 2 AM UTC
    strategy: 'create_new',
    compression: 'snappy',
  },
  nodes: [
    node({
      name: 'backup',
      sql: `
        SELECT *
        FROM events
        WHERE toDate(timestamp) = today() - 1
      `,
    }),
  ],
})

Multi-Node Kafka Sink

import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { alertsKafka } from './connections'

export const anomalyAlerts = defineSinkPipe('anomaly_alerts', {
  description: 'Publish anomaly alerts to Kafka',
  sink: {
    connection: alertsKafka,
    topic: 'alerts',
    schedule: '*/5 * * * *', // Every 5 minutes
  },
  nodes: [
    node({
      name: 'metrics',
      sql: `
        SELECT
          toStartOfMinute(timestamp) AS minute,
          count() AS request_count
        FROM requests
        WHERE timestamp >= now() - interval 5 minute
        GROUP BY minute
      `,
    }),
    node({
      name: 'anomalies',
      sql: `
        SELECT
          minute,
          request_count,
          'high_traffic' AS alert_type
        FROM metrics
        WHERE request_count > 10000
      `,
    }),
  ],
})

S3 Sink with Partitioning

import { defineSinkPipe, node, p } from '@tinybirdco/sdk'
import { dataLakeS3 } from './connections'

export const partitionedExport = defineSinkPipe('partitioned_export', {
  description: 'Export partitioned data to S3',
  params: {
    export_date: p.date(),
  },
  sink: {
    connection: dataLakeS3,
    bucketUri: 's3://datalake/events/year={year}/month={month}/day={day}/',
    fileTemplate: 'events_{time}',
    format: 'parquet',
    schedule: '@on-demand',
    strategy: 'create_new',
    compression: 'snappy',
  },
  nodes: [
    node({
      name: 'export',
      sql: `
        SELECT *
        FROM events
        WHERE toDate(timestamp) = {{Date(export_date)}}
      `,
    }),
  ],
})

File Template Placeholders

S3 sink file templates support the following placeholders:
Placeholder   Description                 Example
{date}        Current date (YYYY-MM-DD)   2024-01-15
{time}        Current time (HH-MM-SS)     14-30-00
{timestamp}   Unix timestamp              1705329000
{year}        Current year                2024
{month}       Current month (01-12)       01
{day}         Current day (01-31)         15
{hour}        Current hour (00-23)        14
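To make the substitution concrete, the sketch below expands a file template for a fixed UTC instant. The `expandTemplate` helper is hypothetical; Tinybird performs the actual substitution server-side when writing files:

```typescript
// Hypothetical sketch of the placeholder substitution applied to
// S3 sink file templates, for a given UTC instant.
function expandTemplate(template: string, at: Date): string {
  const pad = (n: number) => String(n).padStart(2, '0')
  const year = String(at.getUTCFullYear())
  const month = pad(at.getUTCMonth() + 1)
  const day = pad(at.getUTCDate())
  const hour = pad(at.getUTCHours())
  const values: Record<string, string> = {
    date: `${year}-${month}-${day}`,
    time: `${hour}-${pad(at.getUTCMinutes())}-${pad(at.getUTCSeconds())}`,
    timestamp: String(Math.floor(at.getTime() / 1000)),
    year,
    month,
    day,
    hour,
  }
  // Replace known {placeholder} tokens; leave unknown tokens untouched.
  return template.replace(/\{(\w+)\}/g, (match, key) => values[key] ?? match)
}

// expandTemplate('events_{date}_{time}', new Date(Date.UTC(2024, 0, 15, 14, 30, 0)))
// → 'events_2024-01-15_14-30-00'
```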

Schedule Options

  • Cron expression: '0 * * * *' (every hour), '*/15 * * * *' (every 15 minutes), etc.
  • @on-demand: Execute manually via CLI or API
  • @once: Execute once immediately after deployment
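As an illustration, the three accepted forms can be distinguished with a rough client-side check (the `classifySchedule` helper is hypothetical; the SDK and server perform the authoritative validation):

```typescript
// Illustrative classification of the schedule values listed above.
type ScheduleKind = 'on-demand' | 'once' | 'cron' | 'invalid'

function classifySchedule(schedule: string): ScheduleKind {
  if (schedule === '@on-demand') return 'on-demand'
  if (schedule === '@once') return 'once'
  // A standard cron expression has five whitespace-separated fields:
  // minute, hour, day of month, month, day of week.
  const fields = schedule.trim().split(/\s+/)
  return fields.length === 5 ? 'cron' : 'invalid'
}

// classifySchedule('0 2 * * *')  // 'cron'
// classifySchedule('@on-demand') // 'on-demand'
```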

Related

  • defineKafkaConnection: Create Kafka connections
  • defineS3Connection: Create S3 connections
  • definePipe: Create transformation pipes
  • node: Create transformation nodes
