Connections allow you to integrate external data sources with Tinybird. The SDK supports three types of connections: Kafka for real-time streaming, S3 for AWS data lakes, and GCS for Google Cloud Storage.
Connection Types
Kafka: stream data from Kafka topics
S3: import data from AWS S3 buckets
GCS: import data from Google Cloud Storage
Kafka Connections
Define Kafka connections to stream data from Kafka topics into Tinybird datasources.
Basic Kafka Connection
import { defineKafkaConnection, secret } from '@tinybirdco/sdk';

export const eventsKafka = defineKafkaConnection('events_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
});
Location: ~/workspace/source/src/schema/connection.ts:143-156
Connection Options
export interface KafkaConnectionOptions {
  /** Kafka bootstrap servers (host:port) */
  bootstrapServers: string;
  /** Security protocol (default: 'SASL_SSL') */
  securityProtocol?: 'SASL_SSL' | 'PLAINTEXT' | 'SASL_PLAINTEXT';
  /** SASL mechanism for authentication */
  saslMechanism?: 'PLAIN' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER';
  /** Kafka key/username */
  key?: string;
  /** Kafka secret/password */
  secret?: string;
  /** Schema Registry URL */
  schemaRegistryUrl?: string;
  /** SSL CA certificate PEM - for private CA certs */
  sslCaPem?: string;
}
Location: ~/workspace/source/src/schema/connection.ts:11-38
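The optional fields above cover clusters that use SCRAM authentication or a Schema Registry. A hedged sketch of those fields in use (the interface is restated locally so the snippet stands alone, and the host names are placeholders; in a real project you would pass these options to defineKafkaConnection):

```typescript
// Sketch: KafkaConnectionOptions restated locally so this snippet is
// self-contained. Host names below are placeholders, not real endpoints.
interface KafkaConnectionOptions {
  bootstrapServers: string;
  securityProtocol?: 'SASL_SSL' | 'PLAINTEXT' | 'SASL_PLAINTEXT';
  saslMechanism?: 'PLAIN' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER';
  key?: string;
  secret?: string;
  schemaRegistryUrl?: string;
  sslCaPem?: string;
}

// A SCRAM-authenticated cluster with a Schema Registry.
const scramKafka: KafkaConnectionOptions = {
  bootstrapServers: 'kafka.internal.example.com:9093',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'SCRAM-SHA-256',
  schemaRegistryUrl: 'https://schema-registry.internal.example.com',
};

console.log(scramKafka.saslMechanism); // SCRAM-SHA-256
```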
Using in Datasources
Use Kafka connections in datasource definitions:
import { defineDatasource, t, engine } from '@tinybirdco/sdk';
import { eventsKafka } from './connections';

export const kafkaEvents = defineDatasource('kafka_events', {
  schema: {
    timestamp: t.dateTime(),
    event_type: t.string(),
    payload: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  kafka: {
    connection: eventsKafka,
    topic: 'events',
    groupId: 'my-consumer-group',
    autoOffsetReset: 'earliest', // or 'latest'
    storeRawValue: false, // optional
  },
});
Kafka Configuration Options
export interface KafkaConfig {
  /** Kafka connection to use */
  connection: KafkaConnectionDefinition;
  /** Kafka topic to consume from */
  topic: string;
  /** Consumer group ID (optional) */
  groupId?: string;
  /** Where to start reading: 'earliest' or 'latest' (default: 'latest') */
  autoOffsetReset?: 'earliest' | 'latest';
  /** Whether to store the raw Kafka value payload */
  storeRawValue?: boolean;
}
Location: ~/workspace/source/src/schema/datasource.ts:62-75
Multiple Kafka Connections
You can define multiple Kafka connections for different clusters:
export const productionKafka = defineKafkaConnection('prod_kafka', {
  bootstrapServers: 'kafka-prod.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('PROD_KAFKA_KEY'),
  secret: secret('PROD_KAFKA_SECRET'),
});

export const stagingKafka = defineKafkaConnection('staging_kafka', {
  bootstrapServers: 'kafka-staging.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('STAGING_KAFKA_KEY'),
  secret: secret('STAGING_KAFKA_SECRET'),
});
S3 Connections
Define S3 connections to import data from AWS S3 buckets into Tinybird.
Basic S3 Connection
import { defineS3Connection, secret } from '@tinybirdco/sdk';

// Using IAM role (recommended)
export const landingS3 = defineS3Connection('landing_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-s3-access',
});

// Using access keys
export const dataLakeS3 = defineS3Connection('data_lake_s3', {
  region: 'us-west-2',
  accessKey: secret('S3_ACCESS_KEY'),
  secret: secret('S3_SECRET_KEY'),
});
Location: ~/workspace/source/src/schema/connection.ts:170-193
Connection Options
export interface S3ConnectionOptions {
  /** S3 bucket region (for example: us-east-1) */
  region: string;
  /** IAM role ARN used by Tinybird to access the bucket */
  arn?: string;
  /** S3 access key for key/secret auth */
  accessKey?: string;
  /** S3 secret key for key/secret auth */
  secret?: string;
}
Note: You must provide either arn or both accessKey and secret.
Location: ~/workspace/source/src/schema/connection.ts:56-67
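The either/or rule in the note above can be expressed as a small predicate. This is an illustrative sketch of the rule only, not the SDK's internal validation, and hasValidS3Auth is a hypothetical name:

```typescript
// Illustrative check of the auth rule above (not the SDK's own code):
// an S3 connection needs either an IAM role ARN, or both an access key
// and a secret key.
interface S3ConnectionOptions {
  region: string;
  arn?: string;
  accessKey?: string;
  secret?: string;
}

function hasValidS3Auth(opts: S3ConnectionOptions): boolean {
  return Boolean(opts.arn) || Boolean(opts.accessKey && opts.secret);
}

console.log(hasValidS3Auth({ region: 'us-east-1', arn: 'arn:aws:iam::123456789012:role/x' })); // true
console.log(hasValidS3Auth({ region: 'us-east-1', accessKey: 'AK' })); // false: secret is missing
```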
Using in Datasources
import { defineDatasource, t, engine } from '@tinybirdco/sdk';
import { landingS3 } from './connections';

export const s3Landing = defineDatasource('s3_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
    event_data: t.json(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  s3: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/events/*.csv',
    schedule: '@auto', // or '@once' or a cron expression
    fromTimestamp: 'toDateTime(timestamp)', // optional: incremental imports
  },
});
S3 Configuration Options
export interface S3Config {
  /** S3 connection to use */
  connection: S3ConnectionDefinition;
  /** S3 bucket URI, for example: s3://my-bucket/path/*.csv */
  bucketUri: string;
  /** Import schedule, for example: @auto or @once */
  schedule?: string;
  /** Incremental import lower bound timestamp expression */
  fromTimestamp?: string;
}
Location: ~/workspace/source/src/schema/datasource.ts:77-90
Using in Sink Pipes
Export data to S3 using sink pipes:
import { defineSinkPipe, node } from '@tinybirdco/sdk';
import { landingS3 } from './connections';

export const s3EventsSink = defineSinkPipe('s3_events_sink', {
  sink: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/exports/',
    fileTemplate: 'events_{date}',
    format: 'csv',
    schedule: '@once',
    strategy: 'create_new', // or 'replace'
    compression: 'gzip', // or 'snappy', 'none'
  },
  nodes: [
    node({
      name: 'export',
      sql: 'SELECT timestamp, session_id FROM events',
    }),
  ],
});
Location: ~/workspace/source/src/schema/pipe.ts:184-202
GCS Connections
Define GCS connections to import data from Google Cloud Storage buckets.
Basic GCS Connection
import { defineGCSConnection, secret } from '@tinybirdco/sdk';

export const landingGCS = defineGCSConnection('landing_gcs', {
  serviceAccountCredentialsJson: secret('GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON'),
});
Location: ~/workspace/source/src/schema/connection.ts:195-219
Connection Options
export interface GCSConnectionOptions {
  /** Service account credentials JSON */
  serviceAccountCredentialsJson: string;
}
Location: ~/workspace/source/src/schema/connection.ts:85-90
Using in Datasources
import { defineDatasource, t, engine } from '@tinybirdco/sdk';
import { landingGCS } from './connections';

export const gcsLanding = defineDatasource('gcs_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  gcs: {
    connection: landingGCS,
    bucketUri: 'gs://my-gcs-bucket/events/*.csv',
    schedule: '@auto',
    fromTimestamp: 'toDateTime(timestamp)', // optional
  },
});
GCS Configuration Options
export interface GCSConfig {
  /** GCS connection to use */
  connection: GCSConnectionDefinition;
  /** GCS bucket URI, for example: gs://my-bucket/path/*.csv */
  bucketUri: string;
  /** Import schedule, for example: @auto or @once */
  schedule?: string;
  /** Incremental import lower bound timestamp expression */
  fromTimestamp?: string;
}
Location: ~/workspace/source/src/schema/datasource.ts:92-103
Secrets
The SDK provides a secret() helper to reference Tinybird secrets in connection configurations:
import { secret } from '@tinybirdco/sdk';

// Reference a secret
const apiKey = secret('API_KEY');
// Generates: {{ tb_secret("API_KEY") }}

// With a default value
const apiKeyWithDefault = secret('API_KEY', 'default-value');
// Generates: {{ tb_secret("API_KEY", "default-value") }}
Location: ~/workspace/source/src/schema/secret.ts:1-17
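The template strings above suggest roughly the following rendering logic. This is a hypothetical sketch for illustration only; renderSecret is an illustrative name, not an SDK export, and the SDK's actual implementation may differ:

```typescript
// Hypothetical sketch of how the tb_secret template syntax shown above
// could be rendered. Not the SDK's actual code.
function renderSecret(name: string, defaultValue?: string): string {
  return defaultValue === undefined
    ? `{{ tb_secret("${name}") }}`
    : `{{ tb_secret("${name}", "${defaultValue}") }}`;
}

console.log(renderSecret('API_KEY'));
// {{ tb_secret("API_KEY") }}
console.log(renderSecret('API_KEY', 'default-value'));
// {{ tb_secret("API_KEY", "default-value") }}
```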
Using Secrets in Connections
import { defineKafkaConnection, secret } from '@tinybirdco/sdk';

export const kafka = defineKafkaConnection('my_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
});
Managing Secrets
Secrets are managed in the Tinybird dashboard or via the API; the SDK only references them by name:
1. Create secrets in the Tinybird dashboard
2. Reference them in your TypeScript code using secret()
3. The SDK generates the correct template syntax
4. Tinybird resolves the secrets at deployment time
Never hardcode sensitive credentials in your code. Always use the secret() helper to reference Tinybird secrets.
Connections in Generated Files
When you deploy connections, the SDK generates .connection files:
Kafka Connection File
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS kafka.example.com:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET") }}
S3 Connection File
TYPE s3
REGION us-east-1
ARN arn:aws:iam::123456789012:role/tinybird-s3-access
GCS Connection File
TYPE gcs
SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON") }}
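The generated files follow a simple key/value layout. Conceptually, serialization looks something like the sketch below; toConnectionFile is an illustrative name, not the SDK's actual generator:

```typescript
// Illustrative sketch of the .connection key/value format shown above.
// Not the SDK's actual file generator.
function toConnectionFile(type: string, entries: Record<string, string>): string {
  const lines = [`TYPE ${type}`];
  for (const [key, value] of Object.entries(entries)) {
    lines.push(`${key} ${value}`);
  }
  return lines.join('\n');
}

const file = toConnectionFile('s3', {
  REGION: 'us-east-1',
  ARN: 'arn:aws:iam::123456789012:role/tinybird-s3-access',
});
console.log(file);
// TYPE s3
// REGION us-east-1
// ARN arn:aws:iam::123456789012:role/tinybird-s3-access
```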
Import Schedules
For S3 and GCS datasources, configure import schedules:
@auto: automatic continuous import
@once: import once on deployment
Cron expression: custom schedule (e.g., 0 * * * * for hourly)
export const scheduledImport = defineDatasource('scheduled_import', {
  schema: { /* ... */ },
  s3: {
    connection: landingS3,
    bucketUri: 's3://bucket/data/*.csv',
    schedule: '0 */6 * * *', // Every 6 hours
  },
});
Incremental Imports
Use fromTimestamp for incremental imports to avoid reprocessing old data:
export const incrementalImport = defineDatasource('incremental', {
  schema: {
    timestamp: t.dateTime(),
    data: t.string(),
  },
  s3: {
    connection: landingS3,
    bucketUri: 's3://bucket/data/*.csv',
    schedule: '@auto',
    fromTimestamp: 'toDateTime(timestamp)', // Only import newer data
  },
});
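Conceptually, fromTimestamp acts as a watermark: each run only ingests rows whose timestamp expression evaluates to something newer than what was last imported. A simplified model of that behavior (this is not Tinybird's ingestion code; Row and rowsToImport are illustrative names):

```typescript
// Simplified model of incremental imports: keep only rows whose
// timestamp is newer than the last-imported watermark.
interface Row {
  timestamp: string; // 'YYYY-MM-DD hh:mm:ss' sorts lexicographically
  data: string;
}

function rowsToImport(rows: Row[], watermark: string): Row[] {
  return rows.filter((row) => row.timestamp > watermark);
}

const rows: Row[] = [
  { timestamp: '2024-01-01 00:00:00', data: 'already imported' },
  { timestamp: '2024-06-01 00:00:00', data: 'new' },
];
console.log(rowsToImport(rows, '2024-03-01 00:00:00').length); // 1
```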
Type Guards
The SDK provides type guards for connection types:
import {
  isConnectionDefinition,
  isKafkaConnectionDefinition,
  isS3ConnectionDefinition,
  isGCSConnectionDefinition,
} from '@tinybirdco/sdk';

if (isKafkaConnectionDefinition(connection)) {
  // connection is typed as KafkaConnectionDefinition
  console.log(connection.options.bootstrapServers);
}
Location: ~/workspace/source/src/schema/connection.ts:224-252
Best Practices
Use IAM roles for S3 when possible
IAM roles are more secure than access keys and easier to manage:

// Recommended: IAM role
export const analyticsS3 = defineS3Connection('analytics_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-access',
});

// Avoid unless necessary: access keys
export const legacyS3 = defineS3Connection('legacy_s3', {
  region: 'us-east-1',
  accessKey: secret('ACCESS_KEY'),
  secret: secret('SECRET_KEY'),
});
Organize connections in a dedicated file
Keep all connections in a single file for easy management:

// src/tinybird/connections.ts
export const eventsKafka = defineKafkaConnection('events_kafka', { /* ... */ });
export const landingS3 = defineS3Connection('landing_s3', { /* ... */ });
export const landingGCS = defineGCSConnection('landing_gcs', { /* ... */ });
Use descriptive connection names
Choose clear, descriptive names that indicate the connection's purpose:

// Good
export const productionEventsKafka = defineKafkaConnection('prod_events_kafka', { /* ... */ });
export const analyticsDataLakeS3 = defineS3Connection('analytics_data_lake_s3', { /* ... */ });

// Avoid
export const kafka1 = defineKafkaConnection('kafka1', { /* ... */ });
export const s3 = defineS3Connection('s3', { /* ... */ });
Related
Datasources: learn how to use connections in datasources
Pipes: learn about sink pipes that export to S3/Kafka
Secrets: reference for the secret() helper function
Configuration: learn about managing secrets in Tinybird