Skip to main content
Copy pipes capture query results at specific points in time and write them to a target datasource. Unlike materialized views that update continuously, copy pipes run on a schedule.

What are Copy Pipes?

Copy pipes execute a query and copy the results to a datasource either on a schedule or on-demand. Use copy pipes when:
  • You need periodic snapshots of data
  • You want to control when aggregations run
  • You’re creating reports or dashboards with point-in-time data
  • You want to archive historical data
Use materialized views instead when:
  • You need real-time aggregations
  • You want data to update as events arrive

Defining Copy Pipes

Use defineCopyPipe() to create a copy pipe:
import { defineCopyPipe, defineDatasource, node, t, engine } from "@tinybirdco/sdk";

// Target datasource for snapshots
export const dailySalesSnapshot = defineDatasource("daily_sales_snapshot", {
  schema: {
    snapshot_date: t.date(),
    country: t.string(),
    total_sales: t.uint64(),
  },
  engine: engine.mergeTree({
    sortingKey: ["snapshot_date", "country"],
  }),
});

// Copy pipe that runs daily at midnight
export const dailySalesCopy = defineCopyPipe("daily_sales_copy", {
  description: "Daily snapshot of sales by country",
  datasource: dailySalesSnapshot,
  copy_schedule: "0 0 * * *", // Daily at midnight UTC
  copy_mode: "append",
  nodes: [
    node({
      name: "snapshot",
      sql: `
        SELECT
          today() AS snapshot_date,
          country,
          sum(sales) AS total_sales
        FROM sales
        WHERE date = today() - 1
        GROUP BY country
      `,
    }),
  ],
});

Copy Modes

Control how data is written to the target datasource:

Append Mode (Default)

Appends new rows to the datasource on each run:
export const dailySnapshot = defineCopyPipe("daily_snapshot", {
  datasource: snapshotDatasource,
  copy_mode: "append",
  copy_schedule: "0 0 * * *",
  nodes: [
    node({
      name: "snapshot",
      sql: `
        SELECT today() AS snapshot_date, pathname, count() AS views
        FROM page_views
        WHERE toDate(timestamp) = today() - 1
        GROUP BY pathname
      `,
    }),
  ],
});
Use append mode when:
  • You’re creating historical snapshots
  • Each run produces new data (e.g., daily aggregations)
  • You want to keep all historical runs

Replace Mode

Replaces all data in the datasource on each run:
export const latestReport = defineCopyPipe("latest_report", {
  datasource: reportDatasource,
  copy_mode: "replace",
  copy_schedule: "0 * * * *", // Hourly
  nodes: [
    node({
      name: "current_stats",
      sql: `
        SELECT * FROM events
        WHERE timestamp >= now() - interval 24 hour
      `,
    }),
  ],
});
Use replace mode when:
  • You only need the latest data
  • You’re creating reports that show current state
  • You want to limit datasource size

Copy Schedules

Define when the copy pipe runs using cron expressions or special values:

Cron Expressions

Standard cron format: minute hour day month dayofweek
// Daily at midnight UTC
copy_schedule: "0 0 * * *"

// Every hour
copy_schedule: "0 * * * *"

// Every 15 minutes
copy_schedule: "*/15 * * * *"

// Weekdays at 9 AM
copy_schedule: "0 9 * * 1-5"

// First day of month at midnight
copy_schedule: "0 0 1 * *"

On-Demand Execution

Manual execution only (no automatic schedule):
export const manualReport = defineCopyPipe("manual_report", {
  datasource: reportDatasource,
  copy_schedule: "@on-demand",
  copy_mode: "replace",
  nodes: [
    node({
      name: "report",
      sql: `SELECT * FROM events WHERE timestamp >= now() - interval 7 day`,
    }),
  ],
});
Execute manually via the Tinybird UI or API.

Complete Examples

Daily Report

Create daily aggregations with historical tracking:
import { defineCopyPipe, defineDatasource, node, t, engine } from "@tinybirdco/sdk";

// Target datasource
export const dailyMetrics = defineDatasource("daily_metrics", {
  description: "Daily aggregated metrics",
  schema: {
    report_date: t.date(),
    event_type: t.string(),
    event_count: t.uint64(),
    unique_users: t.uint64(),
  },
  engine: engine.mergeTree({
    sortingKey: ["report_date", "event_type"],
  }),
});

// Copy pipe running daily at 1 AM
export const dailyMetricsCopy = defineCopyPipe("daily_metrics_copy", {
  description: "Generate daily metrics report",
  datasource: dailyMetrics,
  copy_schedule: "0 1 * * *", // 1 AM UTC daily
  copy_mode: "append",
  nodes: [
    node({
      name: "daily_aggregation",
      sql: `
        SELECT
          yesterday() AS report_date,
          event_type,
          count() AS event_count,
          uniqExact(user_id) AS unique_users
        FROM events
        WHERE toDate(timestamp) = yesterday()
        GROUP BY event_type
      `,
    }),
  ],
});

Weekly Summary

Create weekly snapshots every Monday:
import { defineCopyPipe, defineDatasource, node, t, engine } from "@tinybirdco/sdk";

export const weeklySignups = defineDatasource("weekly_signups", {
  schema: {
    week_start: t.date(),
    country: t.string(),
    signup_count: t.uint64(),
  },
  engine: engine.mergeTree({
    sortingKey: ["week_start", "country"],
  }),
});

export const weeklySignupsCopy = defineCopyPipe("weekly_signups_copy", {
  description: "Weekly signup counts by country",
  datasource: weeklySignups,
  copy_schedule: "0 2 * * 1", // Mondays at 2 AM
  copy_mode: "append",
  nodes: [
    node({
      name: "weekly_stats",
      sql: `
        SELECT
          toMonday(today()) - interval 7 day AS week_start,
          country,
          count() AS signup_count
        FROM user_signups
        WHERE timestamp >= toMonday(today()) - interval 7 day
          AND timestamp < toMonday(today())
        GROUP BY country
      `,
    }),
  ],
});

Current State Report

Replace mode for always-current data:
import { defineCopyPipe, defineDatasource, node, t, engine } from "@tinybirdco/sdk";

export const activeUsers = defineDatasource("active_users", {
  schema: {
    user_id: t.string(),
    last_seen: t.dateTime(),
    event_count: t.uint64(),
  },
  engine: engine.mergeTree({
    sortingKey: ["user_id"],
  }),
});

export const activeUsersCopy = defineCopyPipe("active_users_copy", {
  description: "Users active in the last 24 hours",
  datasource: activeUsers,
  copy_schedule: "0 * * * *", // Hourly
  copy_mode: "replace", // Always show current state
  nodes: [
    node({
      name: "active_24h",
      sql: `
        SELECT
          user_id,
          max(timestamp) AS last_seen,
          count() AS event_count
        FROM events
        WHERE timestamp >= now() - interval 24 hour
        GROUP BY user_id
      `,
    }),
  ],
});

Best Practices

1

Choose the right mode

Use append for historical snapshots, replace for current state reports.
2

Include timestamp in results

Always include a timestamp column (like snapshot_date) when using append mode to track when each snapshot was taken.
3

Schedule during off-peak hours

Run heavy aggregations during low-traffic periods to minimize impact on your analytics.
4

Use date functions consistently

Use functions like yesterday(), today(), toMonday() for consistent time windows.
5

Test with @on-demand first

Test your copy pipe logic with @on-demand before setting up a schedule.

Execution

Copy pipes can be triggered:
  1. Automatically - Based on the copy_schedule cron expression
  2. Manually - Via the Tinybird UI or API
  3. On-demand - Using @on-demand schedule for manual-only execution
Monitor copy pipe execution in the Tinybird dashboard to track run history and troubleshoot failures.

Next Steps

Materialized Views

Learn about real-time aggregations

Sink Pipes

Export data to external systems

Build docs developers (and LLMs) love