Connection definitions allow you to configure external data sources and sinks for Tinybird datasources and pipes.
## defineKafkaConnection

```typescript
function defineKafkaConnection(
  name: string,
  options: KafkaConnectionOptions
): KafkaConnectionDefinition
```

Define a Kafka connection for streaming data ingestion or publishing.

### Parameters

- `name` (string, required): The connection name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
- `options` (`KafkaConnectionOptions`, required):
  - `bootstrapServers`: Kafka bootstrap servers (`host:port`).
  - `securityProtocol` (`'SASL_SSL' | 'PLAINTEXT' | 'SASL_PLAINTEXT'`, default `'SASL_SSL'`): Security protocol for the Kafka connection.
  - `saslMechanism` (`'PLAIN' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'`): SASL mechanism for authentication.
  - `key`: Kafka key/username. Can use the `secret()` helper for secure storage.
  - `secret`: Kafka secret/password. Can use the `secret()` helper for secure storage.
  - Schema Registry URL (optionally with embedded auth credentials).
  - SSL CA certificate PEM for private CA certificates.
### Usage Example

```typescript
import { defineKafkaConnection, secret } from '@tinybirdco/sdk'

export const eventsKafka = defineKafkaConnection('events_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
})
```
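For a SCRAM-authenticated cluster with a Schema Registry, a connection might look like the sketch below. Note that `schemaRegistryUrl` and `sslCaPem` are assumed option names for illustration only; the parameter list above describes a Schema Registry URL and an SSL CA certificate PEM but does not name the options.

```typescript
import { defineKafkaConnection, secret } from '@tinybirdco/sdk'

// Hypothetical sketch: `schemaRegistryUrl` and `sslCaPem` are assumed
// option names for the Schema Registry URL and private CA certificate
// described in the parameter list above. Verify against the SDK types
// (KafkaConnectionOptions) before using.
export const ordersKafka = defineKafkaConnection('orders_kafka', {
  bootstrapServers: 'kafka.internal.example.com:9093',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'SCRAM-SHA-512',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
  schemaRegistryUrl: secret('SCHEMA_REGISTRY_URL'),
  sslCaPem: secret('KAFKA_CA_PEM'),
})
```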
## defineS3Connection

```typescript
function defineS3Connection(
  name: string,
  options: S3ConnectionOptions
): S3ConnectionDefinition
```

Define an S3 connection for importing data from or exporting data to AWS S3.

### Parameters

- `name` (string, required): The connection name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
- `options` (`S3ConnectionOptions`, required):
  - `region`: S3 bucket region (e.g. `'us-east-1'`).
  - `arn`: IAM role ARN used by Tinybird to access the bucket. Required if not using access key/secret authentication.
  - `accessKey`: S3 access key for key/secret authentication. Must be used together with `secret`.
  - `secret`: S3 secret key for key/secret authentication. Must be used together with `accessKey`.
### Usage Examples

#### With IAM Role (Recommended)

```typescript
import { defineS3Connection } from '@tinybirdco/sdk'

export const landingS3 = defineS3Connection('landing_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-s3-access',
})
```

#### With Access Keys

```typescript
import { defineS3Connection, secret } from '@tinybirdco/sdk'

export const backupS3 = defineS3Connection('backup_s3', {
  region: 'us-west-2',
  accessKey: secret('S3_ACCESS_KEY'),
  secret: secret('S3_SECRET_KEY'),
})
```
## defineGCSConnection

```typescript
function defineGCSConnection(
  name: string,
  options: GCSConnectionOptions
): GCSConnectionDefinition
```

Define a Google Cloud Storage connection for importing data from GCS buckets.

### Parameters

- `name` (string, required): The connection name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
- `options` (`GCSConnectionOptions`, required):
  - `serviceAccountCredentialsJson`: Service account credentials JSON. Use the `secret()` helper to store it securely.
### Usage Example

```typescript
import { defineGCSConnection, secret } from '@tinybirdco/sdk'

export const landingGCS = defineGCSConnection('landing_gcs', {
  serviceAccountCredentialsJson: secret('GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON'),
})
```
## Using Connections

### In Datasources (Ingestion)

```typescript
import { defineDatasource, t, engine } from '@tinybirdco/sdk'
import { eventsKafka, landingS3, landingGCS } from './connections'

// Kafka ingestion
export const kafkaEvents = defineDatasource('kafka_events', {
  schema: {
    timestamp: t.dateTime(),
    payload: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  kafka: {
    connection: eventsKafka,
    topic: 'events',
    groupId: 'events-consumer',
    autoOffsetReset: 'earliest',
  },
})

// S3 import
export const s3Landing = defineDatasource('s3_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  s3: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/events/*.csv',
    schedule: '@auto',
  },
})

// GCS import
export const gcsLanding = defineDatasource('gcs_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  gcs: {
    connection: landingGCS,
    bucketUri: 'gs://my-gcs-bucket/events/*.csv',
    schedule: '@auto',
  },
})
```
### In Sink Pipes (Export)

```typescript
import { defineSinkPipe, node } from '@tinybirdco/sdk'
import { eventsKafka, landingS3 } from './connections'

// Kafka sink
export const kafkaEventsSink = defineSinkPipe('kafka_events_sink', {
  sink: {
    connection: eventsKafka,
    topic: 'events_export',
    schedule: '@on-demand',
  },
  nodes: [
    node({
      name: 'publish',
      sql: 'SELECT timestamp, payload FROM kafka_events',
    }),
  ],
})

// S3 sink
export const s3EventsSink = defineSinkPipe('s3_events_sink', {
  sink: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/exports/',
    fileTemplate: 'events_{date}',
    format: 'csv',
    schedule: '@once',
    strategy: 'create_new',
    compression: 'gzip',
  },
  nodes: [
    node({
      name: 'export',
      sql: 'SELECT timestamp, session_id FROM s3_landing',
    }),
  ],
})
```
## Secret Helper

Use the `secret()` helper to securely reference environment variables:

```typescript
import { secret } from '@tinybirdco/sdk'

// Reference a secret
const apiKey = secret('API_KEY')
// Produces: {{ tb_secret("API_KEY") }}

// With a default value
const dbHost = secret('DB_HOST', 'localhost')
// Produces: {{ tb_secret("DB_HOST", "localhost") }}
```

Secrets are stored in Tinybird's secret management system and referenced at runtime.
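The placeholder format documented above can be reproduced with a small standalone function. This is a simplified sketch illustrating the documented output format, not the SDK's actual implementation (`secretPlaceholder` is a name chosen here for illustration):

```typescript
// Simplified sketch: reproduces the placeholder strings documented for
// the secret() helper. Not the SDK's internal implementation.
function secretPlaceholder(name: string, defaultValue?: string): string {
  return defaultValue === undefined
    ? `{{ tb_secret("${name}") }}`
    : `{{ tb_secret("${name}", "${defaultValue}") }}`
}

console.log(secretPlaceholder('API_KEY'))
// {{ tb_secret("API_KEY") }}
console.log(secretPlaceholder('DB_HOST', 'localhost'))
// {{ tb_secret("DB_HOST", "localhost") }}
```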
## Related

- `defineDatasource`: Create datasources with connections
- `defineSinkPipe`: Export data to external systems
- `secret`: Secure secret references