Function Signature
function defineCopyPipe <
TSchema extends SchemaDefinition ,
TDatasource extends DatasourceDefinition < TSchema >
>(
name : string ,
options : CopyPipeOptions < TSchema , TDatasource >
) : PipeDefinition < Record < string , never >, DatasourceSchemaToOutput < TSchema >>
Define a Tinybird copy pipe that captures the result of a query at a specific moment in time and writes it into a target datasource. Copy pipes can run on a schedule or be executed on demand.
Unlike materialized views which continuously update as new events are inserted, copy pipes generate a single snapshot at a specific point in time.
Parameters
The copy pipe name. Must start with a letter or underscore and contain only alphanumeric characters and underscores.
options
CopyPipeOptions<TSchema, TDatasource>
required
Copy pipe configuration object. datasource
DatasourceDefinition
required
Target datasource where copied data is written. The output schema is automatically derived from this datasource.
nodes
readonly NodeDefinition[]
required
Array of SQL transformation nodes. At least one node is required.
copy_mode
'append' | 'replace'
default: "'append'"
How data is ingested:
'append': Appends the result to the target datasource (default)
'replace': Completely replaces the destination datasource content on each run
copy_schedule
string
default: "'@on-demand'"
When the copy job runs:
A cron expression (e.g., '0 0 * * *' for daily at midnight UTC)
'@on-demand' for manual execution only (default)
Human-readable description of the copy pipe
tokens
readonly PipeTokenConfig[]
Access tokens for this copy pipe
Return Type
PipeDefinition < Record < string , never > , DatasourceSchemaToOutput < TSchema >>
A pipe definition configured as a copy pipe with full type inference. Note that copy pipes do not accept parameters.
Usage Examples
Scheduled Daily Snapshot
import { defineCopyPipe , defineDatasource , node , t , engine } from '@tinybirdco/sdk'
// Target datasource for daily snapshots
const dailySalesSnapshot = defineDatasource ( 'daily_sales_snapshot' , {
schema: {
snapshot_date: t . date (),
country: t . string (),
total_sales: t . uint64 (),
},
engine: engine . mergeTree ({
sortingKey: [ 'snapshot_date' , 'country' ],
}),
})
// Copy pipe that runs daily at midnight
export const dailySalesCopy = defineCopyPipe ( 'daily_sales_copy' , {
description: 'Daily snapshot of sales by country' ,
datasource: dailySalesSnapshot ,
copy_schedule: '0 0 * * *' , // Daily at midnight UTC
copy_mode: 'append' ,
nodes: [
node ({
name: 'snapshot' ,
sql: `
SELECT
today() AS snapshot_date,
country,
sum(sales) AS total_sales
FROM sales
WHERE date = today() - 1
GROUP BY country
` ,
}),
],
})
On-Demand Report Generation
import { defineCopyPipe , defineDatasource , node , t , engine } from '@tinybirdco/sdk'
const reportDatasource = defineDatasource ( 'weekly_report' , {
schema: {
generated_at: t . dateTime (),
metric_name: t . string (),
metric_value: t . float64 (),
},
engine: engine . mergeTree ({
sortingKey: [ 'generated_at' , 'metric_name' ],
}),
})
export const manualReport = defineCopyPipe ( 'manual_report' , {
description: 'On-demand report generation' ,
datasource: reportDatasource ,
copy_schedule: '@on-demand' ,
copy_mode: 'replace' , // Replace entire report each time
nodes: [
node ({
name: 'report' ,
sql: `
SELECT
now() AS generated_at,
'total_events' AS metric_name,
count() AS metric_value
FROM events
WHERE timestamp >= now() - interval 7 day
` ,
}),
],
})
Hourly Aggregation
import { defineCopyPipe , node } from '@tinybirdco/sdk'
import { hourlyMetrics } from './datasources'
export const hourlyMetricsCopy = defineCopyPipe ( 'hourly_metrics_copy' , {
datasource: hourlyMetrics ,
copy_schedule: '0 * * * *' , // Every hour at minute 0
copy_mode: 'append' ,
nodes: [
node ({
name: 'aggregate' ,
sql: `
SELECT
toStartOfHour(timestamp) AS hour,
metric_name,
avg(value) AS avg_value,
max(value) AS max_value,
min(value) AS min_value
FROM raw_metrics
WHERE timestamp >= toStartOfHour(now()) - interval 1 hour
AND timestamp < toStartOfHour(now())
GROUP BY hour, metric_name
` ,
}),
],
})
Multi-Node Copy Pipeline
import { defineCopyPipe , node } from '@tinybirdco/sdk'
import { customerSegments } from './datasources'
export const segmentSnapshot = defineCopyPipe ( 'segment_snapshot' , {
datasource: customerSegments ,
copy_schedule: '0 2 * * 0' , // Weekly on Sunday at 2 AM UTC
copy_mode: 'replace' ,
nodes: [
node ({
name: 'user_activity' ,
sql: `
SELECT
user_id,
count() AS event_count,
max(timestamp) AS last_seen
FROM events
WHERE timestamp >= now() - interval 30 day
GROUP BY user_id
` ,
}),
node ({
name: 'segmented' ,
sql: `
SELECT
user_id,
CASE
WHEN event_count >= 50 THEN 'power_user'
WHEN event_count >= 10 THEN 'active'
ELSE 'casual'
END AS segment
FROM user_activity
` ,
}),
],
})
Cron Schedule Examples
Daily at midnight
Hourly
Every 15 minutes
Weekly on Mondays at 3 AM
Monthly on 1st at 4 AM
Every 6 hours
Manual Execution
For copy pipes with copy_schedule: '@on-demand', you can trigger them manually using the Tinybird CLI or API:
tb pipe copy run daily_sales_copy
Copy Mode Comparison
Mode Behavior Use Case appendAdds new rows on each run Historical snapshots, time-series data replaceReplaces all data on each run Latest state, reports, dimensions
defineDatasource Create target datasources
defineMaterializedView Continuous aggregation alternative
definePipe Create transformation pipes
node Create transformation nodes