Skip to main content

Function Signature

function defineCopyPipe<
  TSchema extends SchemaDefinition,
  TDatasource extends DatasourceDefinition<TSchema>
>(
  name: string,
  options: CopyPipeOptions<TSchema, TDatasource>
): PipeDefinition<Record<string, never>, DatasourceSchemaToOutput<TSchema>>
Define a Tinybird copy pipe that captures the result of a query at a specific moment in time and writes it into a target datasource. Copy pipes can run on a schedule or be executed on demand. Unlike materialized views which continuously update as new events are inserted, copy pipes generate a single snapshot at a specific point in time.

Parameters

name
string
required
The copy pipe name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
CopyPipeOptions<TSchema, TDatasource>
required
Copy pipe configuration object.

Return Type

PipeDefinition<Record<string, never>, DatasourceSchemaToOutput<TSchema>>
A pipe definition configured as a copy pipe with full type inference. Note that copy pipes do not accept parameters.

Usage Examples

Scheduled Daily Snapshot

import { defineCopyPipe, defineDatasource, node, t, engine } from '@tinybirdco/sdk'

// Target datasource for daily snapshots
const dailySalesSnapshot = defineDatasource('daily_sales_snapshot', {
  schema: {
    snapshot_date: t.date(),
    country: t.string(),
    total_sales: t.uint64(),
  },
  engine: engine.mergeTree({
    sortingKey: ['snapshot_date', 'country'],
  }),
})

// Copy pipe that runs daily at midnight
export const dailySalesCopy = defineCopyPipe('daily_sales_copy', {
  description: 'Daily snapshot of sales by country',
  datasource: dailySalesSnapshot,
  copy_schedule: '0 0 * * *', // Daily at midnight UTC
  copy_mode: 'append',
  nodes: [
    node({
      name: 'snapshot',
      sql: `
        SELECT
          today() AS snapshot_date,
          country,
          sum(sales) AS total_sales
        FROM sales
        WHERE date = today() - 1
        GROUP BY country
      `,
    }),
  ],
})

On-Demand Report Generation

import { defineCopyPipe, defineDatasource, node, t, engine } from '@tinybirdco/sdk'

const reportDatasource = defineDatasource('weekly_report', {
  schema: {
    generated_at: t.dateTime(),
    metric_name: t.string(),
    metric_value: t.float64(),
  },
  engine: engine.mergeTree({
    sortingKey: ['generated_at', 'metric_name'],
  }),
})

export const manualReport = defineCopyPipe('manual_report', {
  description: 'On-demand report generation',
  datasource: reportDatasource,
  copy_schedule: '@on-demand',
  copy_mode: 'replace', // Replace entire report each time
  nodes: [
    node({
      name: 'report',
      sql: `
        SELECT
          now() AS generated_at,
          'total_events' AS metric_name,
          count() AS metric_value
        FROM events
        WHERE timestamp >= now() - interval 7 day
      `,
    }),
  ],
})

Hourly Aggregation

import { defineCopyPipe, node } from '@tinybirdco/sdk'
import { hourlyMetrics } from './datasources'

export const hourlyMetricsCopy = defineCopyPipe('hourly_metrics_copy', {
  datasource: hourlyMetrics,
  copy_schedule: '0 * * * *', // Every hour at minute 0
  copy_mode: 'append',
  nodes: [
    node({
      name: 'aggregate',
      sql: `
        SELECT
          toStartOfHour(timestamp) AS hour,
          metric_name,
          avg(value) AS avg_value,
          max(value) AS max_value,
          min(value) AS min_value
        FROM raw_metrics
        WHERE timestamp >= toStartOfHour(now()) - interval 1 hour
          AND timestamp < toStartOfHour(now())
        GROUP BY hour, metric_name
      `,
    }),
  ],
})

Multi-Node Copy Pipeline

import { defineCopyPipe, node } from '@tinybirdco/sdk'
import { customerSegments } from './datasources'

export const segmentSnapshot = defineCopyPipe('segment_snapshot', {
  datasource: customerSegments,
  copy_schedule: '0 2 * * 0', // Weekly on Sunday at 2 AM UTC
  copy_mode: 'replace',
  nodes: [
    node({
      name: 'user_activity',
      sql: `
        SELECT
          user_id,
          count() AS event_count,
          max(timestamp) AS last_seen
        FROM events
        WHERE timestamp >= now() - interval 30 day
        GROUP BY user_id
      `,
    }),
    node({
      name: 'segmented',
      sql: `
        SELECT
          user_id,
          CASE
            WHEN event_count >= 50 THEN 'power_user'
            WHEN event_count >= 10 THEN 'active'
            ELSE 'casual'
          END AS segment
        FROM user_activity
      `,
    }),
  ],
})

Cron Schedule Examples

0 0 * * *

Manual Execution

For copy pipes with copy_schedule: '@on-demand', you can trigger them manually using the Tinybird CLI or API:
tb pipe copy run daily_sales_copy

Copy Mode Comparison

ModeBehaviorUse Case
appendAdds new rows on each runHistorical snapshots, time-series data
replaceReplaces all data on each runLatest state, reports, dimensions

defineDatasource

Create target datasources

defineMaterializedView

Continuous aggregation alternative

definePipe

Create transformation pipes

node

Create transformation nodes

Build docs developers (and LLMs) love