Sink pipes export query results to external systems. The SDK supports Kafka and S3 sinks for publishing data to message queues or object storage.

What are Sink Pipes?

Sink pipes execute a query and publish the results to an external system:
  • Kafka sinks - Publish records to Kafka topics
  • S3 sinks - Write files to S3 buckets
Use sink pipes when:
  • You need to send data to downstream systems
  • You’re building event-driven architectures
  • You want to archive data to object storage
  • You need to integrate with external services

Kafka Sink Pipes

Publish query results to Kafka topics:

Basic Kafka Sink

import { defineSinkPipe, defineKafkaConnection, node, secret } from "@tinybirdco/sdk";

// Define Kafka connection
export const eventsKafka = defineKafkaConnection("events_kafka", {
  bootstrapServers: "kafka.example.com:9092",
  securityProtocol: "SASL_SSL",
  saslMechanism: "PLAIN",
  key: secret("KAFKA_KEY"),
  secret: secret("KAFKA_SECRET"),
});

// Create Kafka sink pipe
export const kafkaEventsSink = defineSinkPipe("kafka_events_sink", {
  description: "Export events to Kafka",
  sink: {
    connection: eventsKafka,
    topic: "events_export",
    schedule: "@on-demand",
  },
  nodes: [
    node({
      name: "publish",
      sql: `
        SELECT timestamp, event_type, user_id, payload
        FROM events
        WHERE timestamp >= now() - interval 1 hour
      `,
    }),
  ],
});

Scheduled Kafka Export

Run on a schedule to continuously export data:
export const hourlyEventsToKafka = defineSinkPipe("hourly_events_to_kafka", {
  description: "Export hourly events to Kafka",
  sink: {
    connection: eventsKafka,
    topic: "hourly_events",
    schedule: "0 * * * *", // Every hour
  },
  nodes: [
    node({
      name: "hourly_export",
      sql: `
        SELECT
          toStartOfHour(timestamp) AS hour,
          event_type,
          count() AS event_count,
          uniqExact(user_id) AS unique_users
        FROM events
        WHERE timestamp >= toStartOfHour(now()) - interval 1 hour
          AND timestamp < toStartOfHour(now())
        GROUP BY hour, event_type
      `,
    }),
  ],
});

S3 Sink Pipes

Export query results to S3 buckets as files:

Basic S3 Sink

import { defineSinkPipe, defineS3Connection, node } from "@tinybirdco/sdk";

// Define S3 connection
export const landingS3 = defineS3Connection("landing_s3", {
  region: "us-east-1",
  arn: "arn:aws:iam::123456789012:role/tinybird-s3-access",
});

// Create S3 sink pipe
export const s3EventsSink = defineSinkPipe("s3_events_sink", {
  description: "Export events to S3",
  sink: {
    connection: landingS3,
    bucketUri: "s3://my-bucket/exports/",
    fileTemplate: "events_{date}",
    format: "csv",
    schedule: "@once",
    strategy: "create_new",
  },
  nodes: [
    node({
      name: "export",
      sql: `
        SELECT timestamp, session_id, event_type
        FROM events
        WHERE toDate(timestamp) = today() - 1
      `,
    }),
  ],
});

S3 Sink Options

Export Strategies

Control how files are written:
// Create new files on each run
strategy: "create_new"

// Replace destination data
strategy: "replace"
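The SDK docs don't spell out how destination object names differ between the two strategies, but the semantics can be sketched with a hypothetical helper (not part of the SDK): "create_new" yields a distinct object per run, while "replace" reuses the same destination so each run overwrites the last.

```typescript
// Hypothetical illustration of strategy semantics -- not SDK code.
type Strategy = "create_new" | "replace";

function objectKey(
  strategy: Strategy,
  fileTemplate: string, // e.g. "events_{date}", as in the examples above
  runDate: string,
  runId: number,
): string {
  const name = fileTemplate.replace("{date}", runDate);
  // "create_new": every run gets a unique suffix, so old files survive.
  // "replace": the key is stable, so a re-run overwrites the prior object.
  return strategy === "create_new" ? `${name}_run${runId}.csv` : `${name}.csv`;
}
```

The `runId` suffix here is an assumption for illustration; the actual naming scheme is determined by Tinybird.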

File Formats

Supported export formats:
// CSV format
format: "csv"

// NDJSON (newline-delimited JSON)
format: "ndjson"
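To make the difference concrete, here is a minimal sketch (not SDK code) of how the same rows serialize in each format. CSV shares one header line across all rows; NDJSON repeats keys but puts one self-describing JSON object per line, which is easier to stream and append.

```typescript
// Sketch only: the actual serialization is done server-side by Tinybird.
type Row = Record<string, string | number>;

function toCsv(rows: Row[]): string {
  const headers = Object.keys(rows[0]);
  const lines = rows.map((r) => headers.map((h) => String(r[h])).join(","));
  return [headers.join(","), ...lines].join("\n");
}

function toNdjson(rows: Row[]): string {
  // One JSON object per line, no enclosing array.
  return rows.map((r) => JSON.stringify(r)).join("\n");
}

const rows: Row[] = [
  { event_type: "click", event_count: 42 },
  { event_type: "view", event_count: 120 },
];
```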

Compression

Compress exported files:
// No compression (default)
compression: "none"

// Gzip compression
compression: "gzip"

// Snappy compression
compression: "snappy"
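Compression is applied server-side by Tinybird; downstream consumers only need to decompress. A round-trip sketch with Node's built-in zlib shows what a gzip-compressed export looks like to a consumer:

```typescript
import { gzipSync, gunzipSync } from "node:zlib";

const csv = "event_type,event_count\nclick,42\n";
// Roughly what lands in S3 when compression: "gzip" is set.
const compressed = gzipSync(Buffer.from(csv, "utf8"));
// What a downstream consumer recovers after decompressing.
const restored = gunzipSync(compressed).toString("utf8");
```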

Complete S3 Example

Daily export with compression:
import { defineSinkPipe, defineS3Connection, node } from "@tinybirdco/sdk";

export const dataWarehouse = defineS3Connection("data_warehouse", {
  region: "us-west-2",
  arn: "arn:aws:iam::123456789012:role/tinybird-warehouse-access",
});

export const dailyExport = defineSinkPipe("daily_export", {
  description: "Daily export of aggregated events to data warehouse",
  sink: {
    connection: dataWarehouse,
    bucketUri: "s3://data-warehouse/tinybird/daily/",
    fileTemplate: "events_{date}.csv",
    format: "csv",
    schedule: "0 1 * * *", // Daily at 1 AM
    strategy: "create_new",
    compression: "gzip",
  },
  nodes: [
    node({
      name: "daily_aggregation",
      sql: `
        SELECT
          toDate(timestamp) AS date,
          event_type,
          count() AS event_count,
          uniqExact(user_id) AS unique_users,
          avg(duration) AS avg_duration
        FROM events
        WHERE toDate(timestamp) = yesterday()
        GROUP BY date, event_type
        ORDER BY event_count DESC
      `,
    }),
  ],
});

Sink Pipe Schedules

Define when sink pipes run:

Special Schedules

// Manual execution only
schedule: "@on-demand"

// Run once immediately
schedule: "@once"

Cron Expressions

// Every hour
schedule: "0 * * * *"

// Daily at midnight
schedule: "0 0 * * *"

// Every 15 minutes
schedule: "*/15 * * * *"

// Weekdays at 6 AM
schedule: "0 6 * * 1-5"
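A `schedule` value is therefore either one of the two special strings or a standard five-field cron expression. A hypothetical validator (not part of the SDK) capturing that rule:

```typescript
// Accepts "@on-demand", "@once", or a five-field cron expression.
// This is a loose syntactic check for illustration, not full cron validation.
const SPECIAL_SCHEDULES = ["@on-demand", "@once"];
const CRON_FIELD = /^(\*|[0-9*/,-]+)$/;

function isValidSchedule(schedule: string): boolean {
  if (SPECIAL_SCHEDULES.includes(schedule)) return true;
  const fields = schedule.trim().split(/\s+/);
  return fields.length === 5 && fields.every((f) => CRON_FIELD.test(f));
}
```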

Setting Up Connections

Kafka Connection

import { defineKafkaConnection, secret } from "@tinybirdco/sdk";

export const kafka = defineKafkaConnection("kafka_prod", {
  bootstrapServers: "kafka-1.example.com:9092,kafka-2.example.com:9092",
  securityProtocol: "SASL_SSL",
  saslMechanism: "PLAIN",
  key: secret("KAFKA_KEY"),
  secret: secret("KAFKA_SECRET"),
});
Authentication methods:
  • SASL_SSL with PLAIN mechanism (most common)
  • SASL_SSL with SCRAM-SHA-256 or SCRAM-SHA-512
  • Custom authentication via Kafka connection settings

S3 Connection

import { defineS3Connection } from "@tinybirdco/sdk";

export const s3 = defineS3Connection("s3_prod", {
  region: "us-east-1",
  arn: "arn:aws:iam::123456789012:role/tinybird-s3-access",
});
Requirements:
  • IAM role with appropriate S3 permissions
  • Trust relationship allowing Tinybird to assume the role
  • PutObject permission for exports
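The two IAM policies can be sketched as object literals. The principal account ID and bucket ARN below are placeholders; the exact principal (and any external ID) to trust comes from your Tinybird workspace settings.

```typescript
// Trust policy: lets Tinybird assume the role (placeholder principal).
const trustPolicy = {
  Version: "2012-10-17",
  Statement: [
    {
      Effect: "Allow",
      Principal: { AWS: "arn:aws:iam::111111111111:root" }, // placeholder
      Action: "sts:AssumeRole",
    },
  ],
};

// Permissions policy: minimal S3 access for sink exports (placeholder bucket).
const permissionsPolicy = {
  Version: "2012-10-17",
  Statement: [
    {
      Effect: "Allow",
      Action: ["s3:PutObject"],
      Resource: "arn:aws:s3:::my-bucket/exports/*", // placeholder
    },
  ],
};
```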

Complete Examples

Real-time Event Streaming

Stream events to Kafka for downstream processing:
import { defineSinkPipe, defineKafkaConnection, node, secret } from "@tinybirdco/sdk";

export const streamingKafka = defineKafkaConnection("streaming_kafka", {
  bootstrapServers: "kafka.streaming.example.com:9092",
  securityProtocol: "SASL_SSL",
  saslMechanism: "PLAIN",
  key: secret("STREAMING_KAFKA_KEY"),
  secret: secret("STREAMING_KAFKA_SECRET"),
});

export const realtimeEventStream = defineSinkPipe("realtime_event_stream", {
  description: "Stream high-value events to Kafka in real-time",
  sink: {
    connection: streamingKafka,
    topic: "high_value_events",
    schedule: "*/5 * * * *", // Every 5 minutes
  },
  nodes: [
    node({
      name: "high_value_events",
      sql: `
        SELECT
          timestamp,
          user_id,
          event_type,
          amount,
          metadata
        FROM events
        WHERE timestamp >= now() - interval 5 minute
          AND amount > 1000
        ORDER BY timestamp
      `,
    }),
  ],
});

Data Lake Export

Export partitioned data to S3 for analytics:
import { defineSinkPipe, defineS3Connection, node } from "@tinybirdco/sdk";

export const dataLake = defineS3Connection("data_lake", {
  region: "us-west-2",
  arn: "arn:aws:iam::123456789012:role/data-lake-writer",
});

export const dailyPartitionExport = defineSinkPipe("daily_partition_export", {
  description: "Export daily partitions to data lake",
  sink: {
    connection: dataLake,
    bucketUri: "s3://data-lake/events/year={year}/month={month}/day={day}/",
    fileTemplate: "events_{date}_{time}",
    format: "ndjson",
    schedule: "0 2 * * *", // 2 AM daily
    strategy: "create_new",
    compression: "gzip",
  },
  nodes: [
    node({
      name: "partition_export",
      sql: `
        SELECT *
        FROM events
        WHERE toDate(timestamp) = yesterday()
        ORDER BY timestamp
      `,
    }),
  ],
});
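The `{year}`, `{month}`, and `{day}` placeholders in the `bucketUri` above produce Hive-style partition prefixes. A hypothetical sketch (not SDK code) of how they might expand for a given run date:

```typescript
// Illustration only: actual placeholder expansion happens in Tinybird.
function renderPartitionPath(template: string, runDate: Date): string {
  const pad = (n: number) => String(n).padStart(2, "0");
  return template
    .replace("{year}", String(runDate.getUTCFullYear()))
    .replace("{month}", pad(runDate.getUTCMonth() + 1))
    .replace("{day}", pad(runDate.getUTCDate()));
}
```

Zero-padded months and days keep the prefixes lexicographically sortable, which is what most query engines expect from partitioned layouts.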

Best Practices

1. Use appropriate schedules - Choose schedules based on your data freshness requirements. Use @on-demand for testing.
2. Include timestamps in exports - Always include timestamp columns to track when data was generated.
3. Use compression for large exports - Enable gzip compression for S3 exports to reduce storage costs and transfer time.
4. Monitor sink pipe execution - Check the Tinybird dashboard regularly to ensure sink pipes are running successfully.
5. Handle failures gracefully - Design downstream systems to handle duplicate or missing data in case of sink pipe failures.

Next Steps

  • Copy Pipes - Learn about internal data snapshots
  • Type-Safe Client - Query and ingest data with the Tinybird client
