Datasources are tables in Tinybird where you store and query data. In the TypeScript SDK, you define datasources using the defineDatasource() function with schema definitions, table engines, and ingestion configurations.

Basic Datasource

Here’s a simple datasource definition:
import { defineDatasource, t, engine } from '@tinybirdco/sdk';

export const events = defineDatasource('events', {
  description: 'Event tracking data',
  schema: {
    timestamp: t.dateTime(),
    event_name: t.string().lowCardinality(),
    user_id: t.string().nullable(),
    properties: t.string(), // JSON as string
  },
  engine: engine.mergeTree({
    sortingKey: ['event_name', 'timestamp'],
    partitionKey: 'toYYYYMM(timestamp)',
    ttl: 'timestamp + INTERVAL 90 DAY',
  }),
});
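The partitionKey and ttl values are ClickHouse SQL expressions evaluated against each row. The sketch below is a plain TypeScript model of what they compute in this example, not SDK code, and it assumes timestamps are interpreted in UTC. toYYYYMM(timestamp) places each row in one partition per calendar month, and the TTL makes a row eligible for deletion 90 days after its timestamp.

```typescript
// Model of toYYYYMM(timestamp): one partition per calendar month (UTC assumed).
function toYYYYMM(ts: Date): number {
  return ts.getUTCFullYear() * 100 + (ts.getUTCMonth() + 1);
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Model of 'timestamp + INTERVAL 90 DAY': once `now` reaches this point the row
// is eligible for removal (ClickHouse applies TTL deletes during merges).
function isExpired(ts: Date, now: Date, ttlDays = 90): boolean {
  return now.getTime() >= ts.getTime() + ttlDays * DAY_MS;
}

const eventTime = new Date(Date.UTC(2024, 0, 15, 12, 0, 0)); // 2024-01-15 12:00:00 UTC
console.log(toYYYYMM(eventTime)); // 202401
console.log(isExpired(eventTime, new Date(Date.UTC(2024, 3, 14)))); // false (April 14, 00:00)
console.log(isExpired(eventTime, new Date(Date.UTC(2024, 3, 15)))); // true (April 15, 00:00)
```

Because TTL removal happens during merges, an expired row can remain visible for a while after its expiry time.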

Schema Definition

Schemas are defined using the t.* type validators, which map to ClickHouse column types:

String Types

const schema = {
  name: t.string(),        // Variable-length string (arbitrary bytes, usually UTF-8 text)
  code: t.fixedString(3),  // Fixed-length string of exactly N bytes (e.g., country codes)
  id: t.uuid(),            // UUID
};

Numeric Types

const schema = {
  // Integers
  count: t.int32(),                   // Signed 32-bit integer
  big_count: t.int64(),               // Signed 64-bit integer
  huge_count: t.uint64(),             // Unsigned 64-bit integer
  
  // Floating point and fixed-point decimal
  amount: t.float64(),                // Double-precision (64-bit) float
  price: t.decimal(10, 2),            // Exact decimal: 10 digits in total, 2 after the decimal point
};
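A Decimal(P, S) column stores exact values as integers scaled by 10^S, with at most P digits in total. That means decimal(10, 2) covers -99,999,999.99 to 99,999,999.99 in steps of 0.01. Below is a small model of that encoding. The helper names are illustrative and not part of the SDK. This sketch rejects inputs with more than S fractional digits; it does not model how ClickHouse handles them.

```typescript
// Largest value representable by Decimal(precision, scale), as a string.
function decimalMax(precision: number, scale: number): string {
  const integerDigits = precision > scale ? "9".repeat(precision - scale) : "0";
  return scale > 0 ? `${integerDigits}.${"9".repeat(scale)}` : integerDigits;
}

// Parse a decimal string into its scaled-integer form, e.g. "12.34" at scale 2 -> 1234.
// JavaScript numbers are exact only up to 15 digits, which is enough for decimal(10, 2).
function toScaled(value: string, scale: number): number {
  const negative = value.startsWith("-");
  const [whole, fraction = ""] = (negative ? value.slice(1) : value).split(".");
  if (fraction.length > scale) {
    throw new Error(`${value} has more than ${scale} fractional digits`);
  }
  const scaled = Number(whole + fraction.padEnd(scale, "0"));
  return negative ? -scaled : scaled;
}

// A value fits when its scaled integer has at most `precision` digits.
function fitsDecimal(value: string, precision: number, scale: number): boolean {
  return String(Math.abs(toScaled(value, scale))).length <= precision;
}

console.log(decimalMax(10, 2)); // 99999999.99
console.log(toScaled("19.99", 2)); // 1999
console.log(fitsDecimal("123456789.00", 10, 2)); // false: 11 digits
```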

Date and Time Types

const schema = {
  date: t.date(),                     // Calendar date, YYYY-MM-DD
  timestamp: t.dateTime(),            // Date and time to the second, YYYY-MM-DD HH:MM:SS
  precise_time: t.dateTime64(3),      // Sub-second precision; 3 = milliseconds
  utc_time: t.dateTime('UTC'),        // DateTime with an explicit time zone
};
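DateTime stores whole seconds since the Unix epoch. DateTime64(P) stores ticks of 10^-P seconds, so dateTime64(3) keeps milliseconds. Here is a quick model of the stored integer. The helper is illustrative, not SDK code, and it stops at precision 3 because JavaScript Dates only carry milliseconds.

```typescript
// Model of the integer stored for DateTime64(precision): ticks of 10^-precision
// seconds since 1970-01-01 00:00:00 UTC. Precision 0 matches plain DateTime.
function toTicks(ts: Date, precision: number): number {
  if (precision < 0 || precision > 3) {
    throw new Error("this sketch only supports precision 0 to 3");
  }
  // getTime() is in milliseconds, so divide away the digits the precision drops.
  return Math.floor(ts.getTime() / Math.pow(10, 3 - precision));
}

const sample = new Date(Date.UTC(2024, 0, 1, 0, 0, 0, 123)); // 2024-01-01 00:00:00.123 UTC
console.log(toTicks(sample, 0)); // 1704067200
console.log(toTicks(sample, 3)); // 1704067200123
```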

Complex Types

const schema = {
  tags: t.array(t.string()),          // Array of strings
  metadata: t.map(t.string(), t.string()), // Key-value map
  json_data: t.json(),                // Semi-structured JSON
  coordinates: t.tuple(t.float64(), t.float64()), // Tuple (lat, lng)
};

Type Modifiers

All type validators support modifiers:
const schema = {
  // Nullable columns
  optional_field: t.string().nullable(),
  
  // LowCardinality optimization for strings with few unique values
  category: t.string().lowCardinality(),
  
  // Default values
  status: t.string().default('pending'),
  
  // Compression codec
  large_text: t.string().codec('ZSTD(1)'),
  
  // Custom JSON path for extraction
  user_id: t.string().jsonPath('$.user.id'),
};
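The jsonPath modifier tells ingestion where to find a column's value in each incoming JSON document. The hypothetical model below resolves only simple dotted paths like $.user.id. It is an illustration, not SDK code, and Tinybird's JSONPath support covers more than this. A missing path yields undefined, which is the case that nullable() and default() are meant to cover.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Resolve a simple "$.a.b.c" path against a parsed JSON document.
// Only dotted object keys are handled; array indexes and filters are out of scope.
function resolvePath(doc: Json, path: string): Json | undefined {
  if (!path.startsWith("$")) throw new Error(`path must start with $: ${path}`);
  const keys = path.slice(1).split(".").filter((k) => k.length > 0);
  let current: Json | undefined = doc;
  for (const key of keys) {
    // Stop as soon as the path walks into something that is not a plain object.
    if (current === null || typeof current !== "object" || Array.isArray(current)) {
      return undefined;
    }
    current = current[key];
  }
  return current;
}

const doc: Json = JSON.parse('{"user":{"id":"u_42"},"status":"active"}');
console.log(resolvePath(doc, "$.user.id")); // u_42
console.log(resolvePath(doc, "$.user.email")); // undefined
```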

Table Engines

The SDK supports the ClickHouse MergeTree family of table engines through the engine helper:

MergeTree (Default)

The general-purpose engine of the MergeTree family, suited to high-load analytics:
import { engine } from '@tinybirdco/sdk';

const config = engine.mergeTree({
  sortingKey: ['user_id', 'timestamp'],
  partitionKey: 'toYYYYMM(timestamp)',
  ttl: 'timestamp + INTERVAL 90 DAY',
  settings: {
    index_granularity: 8192, // Rows per granule; 8192 is the ClickHouse default
  },
});
When to use: General-purpose analytics, logs, events, time-series data.
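MergeTree keeps each data part sorted by sortingKey. It also builds a sparse primary index with one mark per granule of index_granularity rows, which lets a filter on the key's leading column skip whole granules. The toy model below uses a granularity of 2 so the effect is visible. It is not SDK code.

```typescript
type EventRow = { user_id: string; timestamp: number };

// Order rows by the sorting key ['user_id', 'timestamp'].
function sortByKey(rows: EventRow[]): EventRow[] {
  return rows.slice().sort((a, b) =>
    a.user_id < b.user_id ? -1 : a.user_id > b.user_id ? 1 : a.timestamp - b.timestamp,
  );
}

// Sparse primary index: keep the first key of every granule of `granularity` rows.
function buildMarks(sorted: EventRow[], granularity: number): EventRow[] {
  const marks: EventRow[] = [];
  for (let i = 0; i < sorted.length; i += granularity) marks.push(sorted[i]);
  return marks;
}

// Granules that may hold rows for `userId`: granule g spans from marks[g] up to marks[g + 1].
function candidateGranules(marks: EventRow[], userId: string): number[] {
  const result: number[] = [];
  for (let g = 0; g < marks.length; g++) {
    const startsAfter = marks[g].user_id > userId;
    const endsBefore = g + 1 < marks.length && marks[g + 1].user_id < userId;
    if (!startsAfter && !endsBefore) result.push(g);
  }
  return result;
}

const sorted = sortByKey([
  { user_id: "c", timestamp: 2 }, { user_id: "a", timestamp: 1 }, { user_id: "b", timestamp: 5 },
  { user_id: "a", timestamp: 3 }, { user_id: "c", timestamp: 1 }, { user_id: "d", timestamp: 4 },
]);
const marks = buildMarks(sorted, 2); // granules: [a,a] [b,c] [c,d]
console.log(candidateGranules(marks, "c")); // granules 1 and 2; granule 0 is skipped
```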

ReplacingMergeTree

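Removes duplicate rows (rows with the same sorting key) during background merges. Deduplication only happens when parts merge, so queries can still see duplicates until then:
const config = engine.replacingMergeTree({
  sortingKey: ['id'],
  ver: 'updated_at', // Keep the row with the highest version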
});
When to use: Maintaining latest state, upserts, slowly changing dimensions.
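Here is a minimal model of what a merge does with ver set to updated_at. It is plain TypeScript, not SDK code. For each sorting key, only the highest-version row survives. In ClickHouse SQL, the FINAL modifier applies the same logic at query time for rows that have not been merged yet.

```typescript
type VersionedRow = { id: string; updated_at: number; value: string };

// Model of a ReplacingMergeTree merge with ver = 'updated_at': for each sorting key
// (here `id`) only the row with the highest version survives. On a version tie the
// row seen later wins, standing in for ClickHouse keeping the last inserted row.
function replacingMerge(rows: VersionedRow[]): VersionedRow[] {
  const latest = new Map<string, VersionedRow>();
  for (const row of rows) {
    const current = latest.get(row.id);
    if (current === undefined || row.updated_at >= current.updated_at) {
      latest.set(row.id, row);
    }
  }
  return Array.from(latest.values());
}

const merged = replacingMerge([
  { id: "p1", updated_at: 1, value: "draft" },
  { id: "p1", updated_at: 3, value: "published" },
  { id: "p1", updated_at: 2, value: "review" }, // arrives late but has a lower version
  { id: "p2", updated_at: 1, value: "draft" },
]);
console.log(merged.map((r) => `${r.id}:${r.value}`)); // p1:published, p2:draft
```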

SummingMergeTree

Collapses rows that share a sorting key into a single row, summing the listed numeric columns, during background merges:
const config = engine.summingMergeTree({
  sortingKey: ['date', 'metric_name'],
  columns: ['value'], // Columns to sum
});
When to use: Counters, metrics aggregation, pre-aggregated data.
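The following sketch models a SummingMergeTree merge for the config above. It is illustrative TypeScript, not SDK code. Merges are asynchronous, so a table can hold partially summed rows at any moment. Queries should therefore still aggregate with sum() and GROUP BY on the sorting key.

```typescript
type MetricRow = { date: string; metric_name: string; value: number };

// Model of a SummingMergeTree merge: rows sharing the sorting key
// ['date', 'metric_name'] collapse into one row whose `value` is the sum.
function summingMerge(rows: MetricRow[]): MetricRow[] {
  const totals = new Map<string, MetricRow>();
  for (const row of rows) {
    const key = JSON.stringify([row.date, row.metric_name]);
    const existing = totals.get(key);
    if (existing === undefined) {
      totals.set(key, { ...row }); // copy so the input rows are not mutated
    } else {
      existing.value += row.value;
    }
  }
  return Array.from(totals.values());
}

const summed = summingMerge([
  { date: "2024-01-01", metric_name: "clicks", value: 3 },
  { date: "2024-01-01", metric_name: "clicks", value: 4 },
  { date: "2024-01-01", metric_name: "views", value: 10 },
]);
console.log(summed); // clicks: 7, views: 10
```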

AggregatingMergeTree

For incremental aggregation with AggregateFunction columns:
const config = engine.aggregatingMergeTree({
  sortingKey: ['date', 'country'],
});
When to use: Materialized views with complex aggregates, incremental aggregation pipelines.

CollapsingMergeTree

Collapses pairs of state rows (sign = 1) and cancel rows (sign = -1) that share a sorting key:
const config = engine.collapsingMergeTree({
  sortingKey: ['id', 'timestamp'],
  sign: 'sign_column', // Int8 column: 1 for state rows, -1 for cancel rows
});
When to use: Changelog-style updates, mutable data with deletes.
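Cancellation happens only during background merges. Because of that, ClickHouse's documented way to read a collapsing table is to weight values by the sign column. The sketch below mirrors that query pattern in plain TypeScript; the column name `views` is hypothetical. It returns the current state even when no merge has run.

```typescript
type ChangeRow = { id: string; views: number; sign: 1 | -1 };

// Query-time collapse, mirroring the ClickHouse pattern
//   SELECT id, sum(views * sign) FROM t GROUP BY id HAVING sum(sign) > 0
function collapse(rows: ChangeRow[]): Map<string, number> {
  const views = new Map<string, number>();
  const signs = new Map<string, number>();
  for (const r of rows) {
    views.set(r.id, (views.get(r.id) ?? 0) + r.views * r.sign);
    signs.set(r.id, (signs.get(r.id) ?? 0) + r.sign);
  }
  const result = new Map<string, number>();
  views.forEach((total, id) => {
    // Keys whose state rows were all cancelled (net sign 0) are dropped.
    if ((signs.get(id) ?? 0) > 0) result.set(id, total);
  });
  return result;
}

const changes: ChangeRow[] = [
  { id: "a", views: 5, sign: 1 },  // initial state
  { id: "a", views: 5, sign: -1 }, // cancel the old state
  { id: "a", views: 6, sign: 1 },  // new state
  { id: "b", views: 3, sign: 1 },
  { id: "b", views: 3, sign: -1 }, // "b" is deleted
];
console.log(collapse(changes)); // a -> 6, b removed
```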

VersionedCollapsingMergeTree

Works like CollapsingMergeTree, but matches state and cancel rows by version, so they collapse correctly even when they arrive out of order:
const config = engine.versionedCollapsingMergeTree({
  sortingKey: ['id'],
  sign: 'sign',
  version: 'version',
});
When to use: Changelog-style updates with potential out-of-order arrival.
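This is a simplified model of the pairing rule, not SDK code and not ClickHouse's full merge algorithm. A state row and a cancel row cancel out when they share both the sorting key and the version, so it does not matter which one arrives first.

```typescript
type VRow = { id: string; version: number; sign: 1 | -1; value: string };

// Rows cancel when they share (id, version); arrival order is irrelevant.
// Returns the state rows that have no matching cancel row.
function versionedCollapse(rows: VRow[]): VRow[] {
  const net = new Map<string, { sign: number; state?: VRow }>();
  for (const row of rows) {
    const key = JSON.stringify([row.id, row.version]);
    const entry = net.get(key) ?? { sign: 0 };
    entry.sign += row.sign;
    if (row.sign === 1) entry.state = row;
    net.set(key, entry);
  }
  const survivors: VRow[] = [];
  net.forEach((entry) => {
    if (entry.sign > 0 && entry.state !== undefined) survivors.push(entry.state);
  });
  return survivors;
}

const vrows: VRow[] = [
  { id: "a", version: 1, sign: -1, value: "old" }, // cancel arrives before its state row
  { id: "a", version: 2, sign: 1, value: "new" },
  { id: "a", version: 1, sign: 1, value: "old" },
];
console.log(versionedCollapse(vrows).map((r) => r.value)); // only "new" survives
```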

External Data Ingestion

Kafka

Consume data from Kafka topics:
import { defineKafkaConnection, defineDatasource, t, engine, secret } from '@tinybirdco/sdk';

// Define the connection
export const eventsKafka = defineKafkaConnection('events_kafka', {
  bootstrapServers: 'kafka.example.com:9092',
  securityProtocol: 'SASL_SSL',
  saslMechanism: 'PLAIN',
  key: secret('KAFKA_KEY'),
  secret: secret('KAFKA_SECRET'),
});

// Use in datasource
export const kafkaEvents = defineDatasource('kafka_events', {
  schema: {
    timestamp: t.dateTime(),
    payload: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  kafka: {
    connection: eventsKafka,
    topic: 'events',
    groupId: 'events-consumer',
    autoOffsetReset: 'earliest', // or 'latest'
  },
});
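The sketch below assumes autoOffsetReset corresponds to Kafka's standard auto.offset.reset consumer setting, which the name suggests. Under that assumption, the setting only decides where a consumer group starts when it has no valid committed offset for a partition. This is a model of Kafka's rule with hypothetical parameter names, not SDK code.

```typescript
type OffsetReset = "earliest" | "latest";

// Kafka's auto.offset.reset rule: a valid committed offset always wins; the reset
// policy applies only when there is none or it is out of the log's current range.
function startingOffset(
  committed: number | undefined,
  reset: OffsetReset,
  logStart: number, // oldest offset still retained in the partition
  logEnd: number, // offset the next produced message will get
): number {
  if (committed !== undefined && committed >= logStart && committed <= logEnd) {
    return committed;
  }
  return reset === "earliest" ? logStart : logEnd;
}

console.log(startingOffset(undefined, "earliest", 100, 500)); // 100: replay retained history
console.log(startingOffset(undefined, "latest", 100, 500)); // 500: only new messages
console.log(startingOffset(250, "latest", 100, 500)); // 250: resume from the committed offset
```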

S3

Import data from S3 buckets:
import { defineS3Connection, defineDatasource, t, engine } from '@tinybirdco/sdk';

// Define the connection
export const landingS3 = defineS3Connection('landing_s3', {
  region: 'us-east-1',
  arn: 'arn:aws:iam::123456789012:role/tinybird-s3-access',
});

// Use in datasource
export const s3Landing = defineDatasource('s3_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  s3: {
    connection: landingS3,
    bucketUri: 's3://my-bucket/events/*.csv',
    schedule: '@auto', // or '@once' or cron expression
  },
});
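The bucketUri uses a wildcard to select which objects to import. The hypothetical model below assumes `*` matches any run of characters within a single path segment and does not cross `/`. That matching rule is an assumption for illustration, not documented SDK behavior.

```typescript
// Hypothetical wildcard model: '*' matches any characters except '/'.
function globToRegExp(pattern: string): RegExp {
  const escaped = pattern
    .split("*")
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&")) // escape regex metacharacters
    .join("[^/]*");
  return new RegExp(`^${escaped}$`);
}

const csvFiles = globToRegExp("s3://my-bucket/events/*.csv");
console.log(csvFiles.test("s3://my-bucket/events/2024-01-01.csv")); // true
console.log(csvFiles.test("s3://my-bucket/events/archive/old.csv")); // false under this assumption
console.log(csvFiles.test("s3://my-bucket/events/data.json")); // false
```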

GCS

Import data from Google Cloud Storage:
import { defineGCSConnection, defineDatasource, t, engine, secret } from '@tinybirdco/sdk';

// Define the connection
export const landingGCS = defineGCSConnection('landing_gcs', {
  serviceAccountCredentialsJson: secret('GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON'),
});

// Use in datasource
export const gcsLanding = defineDatasource('gcs_landing', {
  schema: {
    timestamp: t.dateTime(),
    session_id: t.string(),
  },
  engine: engine.mergeTree({ sortingKey: ['timestamp'] }),
  gcs: {
    connection: landingGCS,
    bucketUri: 'gs://my-gcs-bucket/events/*.csv',
    schedule: '@auto',
  },
});

Aggregate Functions

Datasources that receive materialized view output into an AggregatingMergeTree use aggregate function column types:
import { defineDatasource, t, engine } from '@tinybirdco/sdk';

export const dailyStats = defineDatasource('daily_stats', {
  schema: {
    date: t.date(),
    pathname: t.string(),
    // SimpleAggregateFunction for simple aggregates (sum, min, max, any)
    views: t.simpleAggregateFunction('sum', t.uint64()),
    // AggregateFunction for complex aggregates (uniq, quantile, etc.)
    unique_sessions: t.aggregateFunction('uniq', t.string()),
  },
  engine: engine.aggregatingMergeTree({
    sortingKey: ['date', 'pathname'],
  }),
});
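The two column types combine partial rows differently, and this model shows why. It is illustrative TypeScript, not SDK code. A sum column can be merged by adding values. A uniq column must keep an intermediate state, because adding two partial distinct counts double-counts shared values. Here the state is modeled as an exact Set, whereas ClickHouse's uniq uses an approximate structure. In SQL you would read the state with uniqMerge(unique_sessions).

```typescript
// SimpleAggregateFunction('sum', UInt64): the stored value is already a result,
// so two partial rows combine by addition.
function mergeSum(a: number, b: number): number {
  return a + b;
}

// AggregateFunction('uniq', String): the stored value is an intermediate state.
type UniqState = Set<string>;

function uniqState(values: string[]): UniqState {
  return new Set(values);
}

function mergeUniq(a: UniqState, b: UniqState): UniqState {
  const merged = new Set(a);
  b.forEach((v) => merged.add(v));
  return merged;
}

// Finalizing the state (uniqMerge in SQL) turns it into a number.
function finalizeUniq(state: UniqState): number {
  return state.size;
}

const morning = uniqState(["s1", "s2"]);
const evening = uniqState(["s2", "s3"]);
console.log(mergeSum(3, 4)); // 7
console.log(finalizeUniq(mergeUniq(morning, evening))); // 3, not 2 + 2
```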

Column Definitions

For advanced column configurations, use the column() helper:
import { defineDatasource, t, column } from '@tinybirdco/sdk';

export const events = defineDatasource('events', {
  schema: {
    // Simple column
    id: t.string(),
    
    // Column with custom JSON extraction path
    user_id: column(t.string(), { jsonPath: '$.user.id' }),
  },
});

Secondary Indexes

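Add data-skipping (secondary) indexes so queries can skip blocks of granules that cannot match a filter:
import { defineDatasource, t, engine } from '@tinybirdco/sdk';

export const events = defineDatasource('events', {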
  schema: {
    timestamp: t.dateTime(),
    user_id: t.string(),
    event_type: t.string(),
  },
  engine: engine.mergeTree({
    sortingKey: ['timestamp'],
  }),
  indexes: [
    {
      name: 'user_idx',
      expr: 'user_id',
      type: 'set(100)',
      granularity: 4,
    },
  ],
});
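With type 'set(100)' and granularity 4, each index block covers 4 granules and records the distinct user_id values in them, up to 100. The toy model below is not SDK code. It treats a block with more than 100 distinct values as unusable for skipping, which is this sketch's simplification. A query such as WHERE user_id = 'u4' then only reads blocks whose set could contain the value.

```typescript
// Toy model of a set(max_rows) skip index. `granules` holds the user_id values
// of each granule; each index block covers `granularity` consecutive granules.
type IndexBlock = { values: Set<string> | null }; // null = too many values, cannot skip

function buildSetIndex(granules: string[][], granularity: number, maxRows: number): IndexBlock[] {
  const blocks: IndexBlock[] = [];
  for (let i = 0; i < granules.length; i += granularity) {
    const values = new Set<string>();
    for (const granule of granules.slice(i, i + granularity)) {
      for (const v of granule) values.add(v);
    }
    blocks.push({ values: maxRows > 0 && values.size > maxRows ? null : values });
  }
  return blocks;
}

// Index blocks that must be read for `WHERE user_id = target`.
function blocksToRead(blocks: IndexBlock[], target: string): number[] {
  const result: number[] = [];
  blocks.forEach((block, i) => {
    if (block.values === null || block.values.has(target)) result.push(i);
  });
  return result;
}

const granules = [["u1", "u2"], ["u3"], ["u4"], ["u5"]];
const index = buildSetIndex(granules, 2, 100);
console.log(blocksToRead(index, "u4")); // only block 1
console.log(blocksToRead(index, "u9")); // no blocks: the query skips everything
```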

JSON Path Generation

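By default, the SDK generates a JSON path expression for every column. Disable this for datasources that are targets of materialized views, since their rows are written by a pipe rather than ingested as JSON:
import { defineDatasource, t } from '@tinybirdco/sdk';

export const targetDatasource = defineDatasource('target', {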
  schema: {
    date: t.date(),
    total: t.uint64(),
  },
  jsonPaths: false, // Disable JSON path generation
});

Schema Evolution

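For schema changes that are incompatible with the existing data, provide a forward query that transforms existing rows into the new schema. In this example it keeps the existing columns and fills new_column with 0:
import { defineDatasource, t } from '@tinybirdco/sdk';

export const events = defineDatasource('events', {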
  schema: {
    timestamp: t.dateTime(),
    event_name: t.string(),
    new_column: t.int32(), // New column added
  },
  forwardQuery: 'timestamp, event_name, 0 as new_column',
});
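Conceptually, the forward query runs over the rows stored under the old schema, and its output has to line up with the new schema column by column. Below is a plain TypeScript model of this particular query. It is a hypothetical illustration, not how Tinybird executes it.

```typescript
// Rows as stored under the old schema.
type OldEvent = { timestamp: string; event_name: string };
// Rows as required by the new schema.
type NewEvent = { timestamp: string; event_name: string; new_column: number };

// Mirrors 'timestamp, event_name, 0 as new_column': existing columns are copied
// and the new column is filled with the constant 0.
function applyForwardQuery(rows: OldEvent[]): NewEvent[] {
  return rows.map((row) => ({
    timestamp: row.timestamp,
    event_name: row.event_name,
    new_column: 0,
  }));
}

const migrated = applyForwardQuery([{ timestamp: "2024-01-15 12:00:00", event_name: "page_view" }]);
console.log(migrated[0]); // existing values kept, new_column = 0
```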

Related pages

Pipes: learn how to transform and query datasource data.
Type Inference: extract TypeScript types from datasources.
Connections: external data connections (Kafka, S3, GCS).
Type Validators: complete reference of t.* validators.
