Function Signature
function defineSinkPipe<TParams extends ParamsDefinition>(
name: string,
options: SinkPipeOptions<TParams>
): PipeDefinition<TParams, Record<string, never>>
Define a Tinybird sink pipe that exports query results to external systems via Kafka or S3. Sink pipes can run on a schedule or be executed on demand.
Parameters
name
string
required
The sink pipe name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
SinkPipeOptions<TParams>
required
Sink pipe configuration object.
sink
KafkaSinkConfig | S3SinkConfig
required
Sink export configuration. Must be either Kafka or S3.
nodes
readonly NodeDefinition[]
required
Array of SQL transformation nodes. At least one node is required.
description
string
Human-readable description of the sink pipe
params
TParams
Parameter definitions for query inputs using p.* validators
tokens
readonly PipeTokenConfig[]
Access tokens for this sink pipe
Return Type
PipeDefinition<TParams, Record<string, never>>
A pipe definition configured as a sink pipe. Note that sink pipes do not have an output schema since they export to external systems.
Kafka Sink Configuration
connection
KafkaConnectionDefinition
required
Kafka connection used to publish records
topic
string
required
Kafka topic to publish exported records to
schedule
string
Sink schedule. Can be a cron expression (e.g., '0 * * * *'), '@on-demand', or '@once'
S3 Sink Configuration
connection
S3ConnectionDefinition
required
S3 connection used to write exported files
bucketUri
string
required
Destination bucket URI (e.g., 's3://bucket/prefix/')
fileTemplate
string
Output filename template. Supports Tinybird placeholders like {date}, {time}, etc.
format
string
Output format (e.g., 'csv', 'ndjson', 'parquet')
schedule
string
Sink schedule. Can be a cron expression (e.g., '0 * * * *'), '@on-demand', or '@once'
strategy
'create_new' | 'replace'
default:"'create_new'"
Export strategy:
- 'create_new': Write new files on each run
- 'replace': Replace destination data on each run
compression
'none' | 'gzip' | 'snappy'
default:"'none'"
Compression codec for output files
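To illustrate the strategy and compression options above, here is a hedged sketch of an S3 sink that overwrites its destination on each run (the connection name snapshotS3 and the table/column names are hypothetical):

```typescript
import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { snapshotS3 } from './connections' // hypothetical S3 connection

// With strategy: 'replace', the destination is overwritten on every run,
// so the bucket always holds a single current snapshot rather than an
// append-only history of files.
export const latestSnapshot = defineSinkPipe('latest_snapshot', {
  description: 'Overwrite the latest user snapshot every hour',
  sink: {
    connection: snapshotS3,
    bucketUri: 's3://my-bucket/snapshots/',
    fileTemplate: 'users_latest',
    format: 'ndjson',
    schedule: '0 * * * *', // Hourly
    strategy: 'replace',
    compression: 'gzip',
  },
  nodes: [
    node({
      name: 'snapshot',
      sql: 'SELECT user_id, last_seen FROM users',
    }),
  ],
})
```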
Usage Examples
Kafka Sink
import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { eventsKafka } from './connections'
export const kafkaEventsSink = defineSinkPipe('kafka_events_sink', {
description: 'Export events to Kafka',
sink: {
connection: eventsKafka,
topic: 'events_export',
schedule: '@on-demand',
},
nodes: [
node({
name: 'publish',
sql: `
SELECT timestamp, event_name, user_id, properties
FROM events
WHERE timestamp >= now() - interval 1 hour
`,
}),
],
})
S3 Sink with CSV Export
import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { landingS3 } from './connections'
export const s3EventsSink = defineSinkPipe('s3_events_sink', {
description: 'Export events to S3 as CSV',
sink: {
connection: landingS3,
bucketUri: 's3://my-bucket/exports/',
fileTemplate: 'events_{date}_{time}',
format: 'csv',
schedule: '@once',
strategy: 'create_new',
compression: 'gzip',
},
nodes: [
node({
name: 'export',
sql: `
SELECT timestamp, event_name, user_id
FROM events
WHERE timestamp >= now() - interval 1 day
ORDER BY timestamp
`,
}),
],
})
Scheduled S3 Export
import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { backupS3 } from './connections'
export const dailyBackup = defineSinkPipe('daily_backup', {
description: 'Daily backup to S3',
sink: {
connection: backupS3,
bucketUri: 's3://backups/tinybird/',
fileTemplate: 'events_backup_{date}',
format: 'parquet',
schedule: '0 2 * * *', // Daily at 2 AM UTC
strategy: 'create_new',
compression: 'snappy',
},
nodes: [
node({
name: 'backup',
sql: `
SELECT *
FROM events
WHERE toDate(timestamp) = today() - 1
`,
}),
],
})
Multi-Node Kafka Sink
import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { alertsKafka } from './connections'
export const anomalyAlerts = defineSinkPipe('anomaly_alerts', {
description: 'Publish anomaly alerts to Kafka',
sink: {
connection: alertsKafka,
topic: 'alerts',
schedule: '*/5 * * * *', // Every 5 minutes
},
nodes: [
node({
name: 'metrics',
sql: `
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS request_count
FROM requests
WHERE timestamp >= now() - interval 5 minute
GROUP BY minute
`,
}),
node({
name: 'anomalies',
sql: `
SELECT
minute,
request_count,
'high_traffic' AS alert_type
FROM metrics
WHERE request_count > 10000
`,
}),
],
})
S3 Sink with Partitioning
import { defineSinkPipe, node, p } from '@tinybirdco/sdk'
import { dataLakeS3 } from './connections'
export const partitionedExport = defineSinkPipe('partitioned_export', {
description: 'Export partitioned data to S3',
params: {
export_date: p.date(),
},
sink: {
connection: dataLakeS3,
bucketUri: 's3://datalake/events/year={year}/month={month}/day={day}/',
fileTemplate: 'events_{time}',
format: 'parquet',
schedule: '@on-demand',
strategy: 'create_new',
compression: 'snappy',
},
nodes: [
node({
name: 'export',
sql: `
SELECT *
FROM events
WHERE toDate(timestamp) = {{Date(export_date)}}
`,
}),
],
})
File Template Placeholders
S3 sink file templates support the following placeholders:
| Placeholder | Description | Example |
|---|---|---|
| {date} | Current date (YYYY-MM-DD) | 2024-01-15 |
| {time} | Current time (HH-MM-SS) | 14-30-00 |
| {timestamp} | Unix timestamp | 1705329000 |
| {year} | Current year | 2024 |
| {month} | Current month (01-12) | 01 |
| {day} | Current day (01-31) | 15 |
| {hour} | Current hour (00-23) | 14 |
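The expansion can be sketched in plain TypeScript. This is an illustration of the substitution, not Tinybird's implementation, which runs server-side at export time:

```typescript
// Expand {date}, {time}, {timestamp}, {year}, {month}, {day}, {hour}
// placeholders in a file template for a given run time (UTC).
function expandTemplate(template: string, at: Date): string {
  const pad = (n: number) => String(n).padStart(2, '0')
  const values: Record<string, string> = {
    date: `${at.getUTCFullYear()}-${pad(at.getUTCMonth() + 1)}-${pad(at.getUTCDate())}`,
    time: `${pad(at.getUTCHours())}-${pad(at.getUTCMinutes())}-${pad(at.getUTCSeconds())}`,
    timestamp: String(Math.floor(at.getTime() / 1000)),
    year: String(at.getUTCFullYear()),
    month: pad(at.getUTCMonth() + 1),
    day: pad(at.getUTCDate()),
    hour: pad(at.getUTCHours()),
  }
  // Unknown placeholders are left untouched.
  return template.replace(/\{(\w+)\}/g, (match, key) => values[key] ?? match)
}

// 2024-01-15T14:30:00Z, matching the example values in the table above
const runAt = new Date(Date.UTC(2024, 0, 15, 14, 30, 0))
console.log(expandTemplate('events_{date}_{time}', runAt))
// → events_2024-01-15_14-30-00
```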
Schedule Options
- Cron expression: '0 * * * *' (every hour), '*/15 * * * *' (every 15 minutes), etc.
- '@on-demand': Execute manually via CLI or API
- '@once': Execute once immediately after deployment
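The three schedule forms can be sketched with a rough classifier. This is an illustrative shape check only; the actual validation happens when the project is deployed:

```typescript
// Classify a schedule value as one of the three accepted forms.
// Cron expressions have five whitespace-separated fields:
// minute hour day-of-month month day-of-week.
function classifySchedule(schedule: string): 'on-demand' | 'once' | 'cron' | 'invalid' {
  if (schedule === '@on-demand') return 'on-demand'
  if (schedule === '@once') return 'once'
  return schedule.trim().split(/\s+/).length === 5 ? 'cron' : 'invalid'
}

console.log(classifySchedule('0 2 * * *'))   // → cron
console.log(classifySchedule('@on-demand')) // → on-demand
```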
defineKafkaConnection
Create Kafka connections
defineS3Connection
Create S3 connections
definePipe
Create transformation pipes
node
Create transformation nodes