Skip to main content
Connection definitions allow you to configure external data sources and sinks for Tinybird datasources and pipes.

defineKafkaConnection

function defineKafkaConnection(
  name: string,
  options: KafkaConnectionOptions
): KafkaConnectionDefinition
Define a Kafka connection for streaming data ingestion or publishing.

Parameters

name
string
required
The connection name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
KafkaConnectionOptions
required

Usage Example

import { defineKafkaConnection, secret } from '@tinybirdco/sdk'

export const eventsKafka = defineKafkaConnection('events_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
})

defineS3Connection

function defineS3Connection(
  name: string,
  options: S3ConnectionOptions
): S3ConnectionDefinition
Define an S3 connection for importing data from or exporting data to AWS S3.

Parameters

name
string
required
The connection name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
S3ConnectionOptions
required

Usage Examples

import { defineS3Connection } from '@tinybirdco/sdk'

export const landingS3 = defineS3Connection('landing_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-s3-access',
})

With Access Keys

import { defineS3Connection, secret } from '@tinybirdco/sdk'

export const backupS3 = defineS3Connection('backup_s3', {
  region: 'us-west-2',
  accessKey: secret('S3_ACCESS_KEY'),
  secret: secret('S3_SECRET_KEY'),
})

defineGCSConnection

function defineGCSConnection(
  name: string,
  options: GCSConnectionOptions
): GCSConnectionDefinition
Define a Google Cloud Storage connection for importing data from GCS buckets.

Parameters

name
string
required
The connection name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
GCSConnectionOptions
required

Usage Example

import { defineGCSConnection, secret } from '@tinybirdco/sdk'

export const landingGCS = defineGCSConnection('landing_gcs', {
  serviceAccountCredentialsJson: secret('GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON'),
})

Using Connections

In Datasources (Ingestion)

import { defineDatasource, t, engine } from '@tinybirdco/sdk'
import { eventsKafka, landingS3, landingGCS } from './connections'

// Kafka ingestion
export const kafkaEvents = defineDatasource('kafka_events', {
  schema: {
    timestamp: t.dateTime(),
    payload: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  kafka: {
    connection: eventsKafka,
    topic: 'events',
    groupId: 'events-consumer',
    autoOffsetReset: 'earliest',
  },
})

// S3 import
export const s3Landing = defineDatasource('s3_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  s3: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/events/*.csv',
    schedule: '@auto',
  },
})

// GCS import
export const gcsLanding = defineDatasource('gcs_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  gcs: {
    connection: landingGCS,
    bucketUri: 'gs://my-gcs-bucket/events/*.csv',
    schedule: '@auto',
  },
})

In Sink Pipes (Export)

import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { eventsKafka, landingS3 } from './connections'

// Kafka sink
export const kafkaEventsSink = defineSinkPipe('kafka_events_sink', {
  sink: {
    connection: eventsKafka,
    topic: 'events_export',
    schedule: '@on-demand',
  },
  nodes: [
    node({
      name: 'publish',
      sql: 'SELECT timestamp, payload FROM kafka_events',
    }),
  ],
})

// S3 sink
export const s3EventsSink = defineSinkPipe('s3_events_sink', {
  sink: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/exports/',
    fileTemplate: 'events_{date}',
    format: 'csv',
    schedule: '@once',
    strategy: 'create_new',
    compression: 'gzip',
  },
  nodes: [
    node({
      name: 'export',
      sql: 'SELECT timestamp, session_id FROM s3_landing',
    }),
  ],
})

Secret Helper

Use the secret() helper to securely reference environment variables:
import { secret } from '@tinybirdco/sdk'

// Reference a secret
const apiKey = secret('API_KEY')
// Produces: {{ tb_secret("API_KEY") }}

// With default value
const dbHost = secret('DB_HOST', 'localhost')
// Produces: {{ tb_secret("DB_HOST", "localhost") }}
Secrets are stored in Tinybird’s secret management system and referenced at runtime.

defineDatasource

Create datasources with connections

defineSinkPipe

Export data to external systems

secret

Secure secret references

Build docs developers (and LLMs) love