Connections allow you to integrate external data sources with Tinybird. The SDK supports three types of connections: Kafka for real-time streaming, S3 for AWS data lakes, and GCS for Google Cloud Storage.

Connection Types

- Kafka: stream data from Kafka topics
- S3: import data from AWS S3 buckets
- GCS: import data from Google Cloud Storage

Kafka Connections

Define Kafka connections to stream data from Kafka topics into Tinybird datasources.

Basic Kafka Connection

import { defineKafkaConnection, secret } from '@tinybirdco/sdk';

export const eventsKafka = defineKafkaConnection('events_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
});
Location: ~/workspace/source/src/schema/connection.ts:143-156

Connection Options

export interface KafkaConnectionOptions {
  /** Kafka bootstrap servers (host:port) */
  bootstrapServers: string;
  
  /** Security protocol (default: 'SASL_SSL') */
  securityProtocol?: 'SASL_SSL' | 'PLAINTEXT' | 'SASL_PLAINTEXT';
  
  /** SASL mechanism for authentication */
  saslMechanism?: 'PLAIN' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER';
  
  /** Kafka key/username */
  key?: string;
  
  /** Kafka secret/password */
  secret?: string;
  
  /** Schema Registry URL */
  schemaRegistryUrl?: string;
  
  /** SSL CA certificate PEM - for private CA certs */
  sslCaPem?: string;
}
Location: ~/workspace/source/src/schema/connection.ts:11-38

Using in Datasources

Use Kafka connections in datasource definitions:
import { defineDatasource, t, engine } from '@tinybirdco/sdk';
import { eventsKafka } from './connections';

export const kafkaEvents = defineDatasource('kafka_events', {
  schema: {
    timestamp: t.dateTime(),
    event_type: t.string(),
    payload: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  kafka: {
    connection: eventsKafka,
    topic: 'events',
    groupId: 'my-consumer-group',
    autoOffsetReset: 'earliest', // or 'latest'
    storeRawValue: false,        // optional
  },
});

Kafka Configuration Options

export interface KafkaConfig {
  /** Kafka connection to use */
  connection: KafkaConnectionDefinition;
  
  /** Kafka topic to consume from */
  topic: string;
  
  /** Consumer group ID (optional) */
  groupId?: string;
  
  /** Where to start reading: 'earliest' or 'latest' (default: 'latest') */
  autoOffsetReset?: 'earliest' | 'latest';
  
  /** Whether to store the raw Kafka value payload */
  storeRawValue?: boolean;
}
Location: ~/workspace/source/src/schema/datasource.ts:62-75

Multiple Kafka Connections

You can define multiple Kafka connections for different clusters:
export const productionKafka = defineKafkaConnection('prod_kafka', {
  bootstrapServers: 'kafka-prod.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('PROD_KAFKA_KEY'),
  secret: secret('PROD_KAFKA_SECRET'),
});

export const stagingKafka = defineKafkaConnection('staging_kafka', {
  bootstrapServers: 'kafka-staging.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('STAGING_KAFKA_KEY'),
  secret: secret('STAGING_KAFKA_SECRET'),
});

S3 Connections

Define S3 connections to import data from AWS S3 buckets into Tinybird.

Basic S3 Connection

import { defineS3Connection, secret } from '@tinybirdco/sdk';

// Using IAM role (recommended)
export const landingS3 = defineS3Connection('landing_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-s3-access',
});

// Using access keys
export const dataLakeS3 = defineS3Connection('data_lake_s3', {
  region: 'us-west-2',
  accessKey: secret('S3_ACCESS_KEY'),
  secret: secret('S3_SECRET_KEY'),
});
Location: ~/workspace/source/src/schema/connection.ts:170-193

Connection Options

export interface S3ConnectionOptions {
  /** S3 bucket region (for example: us-east-1) */
  region: string;
  
  /** IAM role ARN used by Tinybird to access the bucket */
  arn?: string;
  
  /** S3 access key for key/secret auth */
  accessKey?: string;
  
  /** S3 secret key for key/secret auth */
  secret?: string;
}
Note: You must provide either arn or both accessKey and secret.
Location: ~/workspace/source/src/schema/connection.ts:56-67
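The either/or rule above is not enforced by the type alone, so a small runtime check can catch misconfigured connections before deployment. The following is a hypothetical helper, not part of the SDK; the `hasValidAuth` name and the standalone options shape are assumptions for illustration:

```typescript
// Hypothetical helper (not part of the SDK): checks the "arn, or accessKey + secret" rule
interface S3ConnectionOptions {
  region: string;
  arn?: string;
  accessKey?: string;
  secret?: string;
}

function hasValidAuth(opts: S3ConnectionOptions): boolean {
  // Either an IAM role ARN, or both halves of a key pair
  return Boolean(opts.arn) || Boolean(opts.accessKey && opts.secret);
}
```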

Using in Datasources

import { defineDatasource, t, engine } from '@tinybirdco/sdk';
import { landingS3 } from './connections';

export const s3Landing = defineDatasource('s3_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
    event_data: t.json(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  s3: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/events/*.csv',
    schedule: '@auto',  // or '@once' or cron expression
    fromTimestamp: 'toDateTime(timestamp)', // optional: incremental imports
  },
});

S3 Configuration Options

export interface S3Config {
  /** S3 connection to use */
  connection: S3ConnectionDefinition;
  
  /** S3 bucket URI, for example: s3://my-bucket/path/*.csv */
  bucketUri: string;
  
  /** Import schedule, for example: @auto or @once */
  schedule?: string;
  
  /** Incremental import lower bound timestamp expression */
  fromTimestamp?: string;
}
Location: ~/workspace/source/src/schema/datasource.ts:77-90

Using in Sink Pipes

Export data to S3 using sink pipes:
import { defineSinkPipe, node } from '@tinybirdco/sdk';
import { landingS3 } from './connections';

export const s3EventsSink = defineSinkPipe('s3_events_sink', {
  sink: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/exports/',
    fileTemplate: 'events_{date}',
    format: 'csv',
    schedule: '@once',
    strategy: 'create_new',  // or 'replace'
    compression: 'gzip',     // or 'snappy', 'none'
  },
  nodes: [
    node({
      name: 'export',
      sql: 'SELECT timestamp, session_id FROM events',
    }),
  ],
});
Location: ~/workspace/source/src/schema/pipe.ts:184-202

GCS Connections

Define GCS connections to import data from Google Cloud Storage buckets.

Basic GCS Connection

import { defineGCSConnection, secret } from '@tinybirdco/sdk';

export const landingGCS = defineGCSConnection('landing_gcs', {
  serviceAccountCredentialsJson: secret('GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON'),
});
Location: ~/workspace/source/src/schema/connection.ts:195-219

Connection Options

export interface GCSConnectionOptions {
  /** Service account credentials JSON */
  serviceAccountCredentialsJson: string;
}
Location: ~/workspace/source/src/schema/connection.ts:85-90

Using in Datasources

import { defineDatasource, t, engine } from '@tinybirdco/sdk';
import { landingGCS } from './connections';

export const gcsLanding = defineDatasource('gcs_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  gcs: {
    connection: landingGCS,
    bucketUri: 'gs://my-gcs-bucket/events/*.csv',
    schedule: '@auto',
    fromTimestamp: 'toDateTime(timestamp)', // optional
  },
});

GCS Configuration Options

export interface GCSConfig {
  /** GCS connection to use */
  connection: GCSConnectionDefinition;
  
  /** GCS bucket URI, for example: gs://my-bucket/path/*.csv */
  bucketUri: string;
  
  /** Import schedule, for example: @auto or @once */
  schedule?: string;
  
  /** Incremental import lower bound timestamp expression */
  fromTimestamp?: string;
}
Location: ~/workspace/source/src/schema/datasource.ts:92-103

Secrets

The SDK provides a secret() helper to reference Tinybird secrets in connection configurations:
import { secret } from '@tinybirdco/sdk';

// Reference a secret
const apiKey = secret('API_KEY');
// Generates: {{ tb_secret("API_KEY") }}

// With default value
const apiKeyWithDefault = secret('API_KEY', 'default-value');
// Generates: {{ tb_secret("API_KEY", "default-value") }}
Location: ~/workspace/source/src/schema/secret.ts:1-17
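Based on the generated output shown above, here is a minimal sketch of a helper that renders the same tb_secret template syntax; the SDK's actual implementation may differ:

```typescript
// Minimal sketch: render the tb_secret template string, with an optional default value
function secret(name: string, defaultValue?: string): string {
  return defaultValue === undefined
    ? `{{ tb_secret("${name}") }}`
    : `{{ tb_secret("${name}", "${defaultValue}") }}`;
}
```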

Using Secrets in Connections

import { defineKafkaConnection, secret } from '@tinybirdco/sdk';

export const kafka = defineKafkaConnection('my_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
});

Managing Secrets

Secrets are managed in the Tinybird dashboard or via the API; the SDK only references them by name:
  1. Create secrets in Tinybird dashboard
  2. Reference them in your TypeScript code using secret()
  3. The SDK generates the correct template syntax
  4. Tinybird resolves secrets at deployment time
Never hardcode sensitive credentials in your code. Always use the secret() helper to reference Tinybird secrets.

Connection in Generated Files

When you deploy connections, the SDK generates .connection files:

Kafka Connection File

TYPE kafka
KAFKA_BOOTSTRAP_SERVERS kafka.example.com:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET") }}

S3 Connection File

TYPE s3
REGION us-east-1
ARN arn:aws:iam::123456789012:role/tinybird-s3-access

GCS Connection File

TYPE gcs
SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON") }}

Import Schedules

For S3 and GCS datasources, configure import schedules:
Schedule          Description
@auto             Automatic continuous import
@once             Import once on deployment
Cron expression   Custom schedule (e.g., 0 * * * * for hourly)
export const scheduledImport = defineDatasource('scheduled_import', {
  schema: { /* ... */ },
  s3: {
    connection: landingS3,
    bucketUri: 's3://bucket/data/*.csv',
    schedule: '0 */6 * * *', // Every 6 hours
  },
});

Incremental Imports

Use fromTimestamp for incremental imports to avoid reprocessing old data:
export const incrementalImport = defineDatasource('incremental', {
  schema: {
    timestamp: t.dateTime(),
    data: t.string(),
  },
  s3: {
    connection: landingS3,
    bucketUri: 's3://bucket/data/*.csv',
    schedule: '@auto',
    fromTimestamp: 'toDateTime(timestamp)', // Only import newer data
  },
});

Type Guards

The SDK provides type guards for connection types:
import {
  isConnectionDefinition,
  isKafkaConnectionDefinition,
  isS3ConnectionDefinition,
  isGCSConnectionDefinition,
} from '@tinybirdco/sdk';

if (isKafkaConnectionDefinition(connection)) {
  // connection is typed as KafkaConnectionDefinition
  console.log(connection.options.bootstrapServers);
}
Location: ~/workspace/source/src/schema/connection.ts:224-252
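A self-contained sketch of how such guards can be modeled with a discriminated union; the `type` field and the option shapes here are assumptions for illustration, not the SDK's actual internals:

```typescript
// Sketch: connection definitions as a discriminated union, with one guard per kind
type ConnectionDefinition =
  | { type: 'kafka'; options: { bootstrapServers: string } }
  | { type: 's3'; options: { region: string } }
  | { type: 'gcs'; options: { serviceAccountCredentialsJson: string } };

function isKafkaConnection(
  c: ConnectionDefinition
): c is Extract<ConnectionDefinition, { type: 'kafka' }> {
  return c.type === 'kafka';
}

const conn: ConnectionDefinition = {
  type: 'kafka',
  options: { bootstrapServers: 'kafka.example.com:9092' },
};

if (isKafkaConnection(conn)) {
  // Narrowed: TypeScript now knows options has bootstrapServers
  console.log(conn.options.bootstrapServers);
}
```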

Best Practices

IAM roles are more secure than access keys and easier to manage:
// Recommended: IAM role
export const s3WithRole = defineS3Connection('my_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-access',
});

// Avoid (unless necessary): access keys
export const s3WithKeys = defineS3Connection('my_s3_keys', {
  region: 'us-east-1',
  accessKey: secret('ACCESS_KEY'),
  secret: secret('SECRET_KEY'),
});
Keep all connections in a single file for easy management:
// src/tinybird/connections.ts
export const eventsKafka = defineKafkaConnection('events_kafka', { /* ... */ });
export const landingS3 = defineS3Connection('landing_s3', { /* ... */ });
export const landingGCS = defineGCSConnection('landing_gcs', { /* ... */ });
Choose clear, descriptive names that indicate the connection’s purpose:
// Good
export const productionEventsKafka = defineKafkaConnection('prod_events_kafka', { /* ... */ });
export const analyticsDataLakeS3 = defineS3Connection('analytics_data_lake_s3', { /* ... */ });

// Avoid
export const kafka1 = defineKafkaConnection('kafka1', { /* ... */ });
export const s3 = defineS3Connection('s3', { /* ... */ });

Related Pages

- Datasources: learn how to use connections in datasources
- Pipes: learn about sink pipes that export to S3/Kafka
- Secrets: reference for the secret() helper function
- Configuration: learn about managing secrets in Tinybird
