Data Pipelines let you transform events as they arrive and route them to external destinations. Write custom transformation logic in Hog, filter events, and send data to webhooks, databases, and third-party tools.
Overview
Data Pipelines provide:
Real-time transformations - Modify events as they're ingested
Event filtering - Route specific events to destinations
Hog functions - Write custom logic in PostHog's scripting language
Destinations - Send data to webhooks, Slack, databases, and more
Error handling - Automatic retries and degradation states
Transformations modify events before they're stored. Use them to:
Enrich events with external data
Normalize property formats
Filter out PII
Add computed properties
Write Hog code
Transformations use Hog, PostHog's event processing language:

// Normalize email addresses
if (event.properties.email) {
event.properties.email := lower(trim(event.properties.email))
}
// Add computed property
if (event.properties.revenue) {
event.properties.revenue_usd := event.properties.revenue * 1.2
}
// Filter out test users
if (event.properties.email like '%@test.com') {
return null // Drop the event
}
return event
Set filters
Only process specific events:

{
    "events": [
        {"id": "$pageview", "type": "events", "name": "$pageview"},
        {"id": "purchase", "type": "events", "name": "purchase"}
    ],
    "properties": [
        {
            "key": "$current_url",
            "type": "event",
            "value": "/checkout",
            "operator": "icontains"
        }
    ]
}
Deploy
Transformations apply immediately to new events. Existing events arenโt modified.
Common transformation patterns include PII filtering, property enrichment, data normalization, and external API enrichment. For example, PII filtering:
// Remove sensitive data
if (event.properties.credit_card) {
event.properties.credit_card := 'REDACTED'
}
if (event.properties.ssn) {
delete event.properties.ssn
}
return event
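The property-enrichment and data-normalization patterns mentioned above follow the same shape. A minimal sketch — the property names (plan, is_paid, country) are illustrative, not a fixed schema:

```hog
// Property enrichment: derive a flag from an existing property
if (event.properties.plan) {
    event.properties.is_paid := event.properties.plan != 'free'
}

// Data normalization: coerce country codes to a consistent format
if (event.properties.country) {
    event.properties.country := upper(trim(event.properties.country))
}

return event
```

For external API enrichment, prefer a destination rather than a transformation — as noted under Best Practices, transformations run synchronously during ingestion and should avoid slow network calls.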
Destinations
Destinations send event data to external systems. PostHog supports:
Webhooks - POST events to any HTTP endpoint
Message queues - Kafka, RabbitMQ, AWS SQS
Data warehouses - Snowflake, BigQuery, Redshift
Tools - Slack, Discord, customer.io, Amplitude
Webhook Destination
Send events to a webhook:
let payload := {
event: event.event,
distinct_id: event.distinct_id,
properties: event.properties,
timestamp: event.timestamp
}
fetch(inputs.webhook_url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': f'Bearer {inputs.api_token}'
},
body: payload
})
Destinations run asynchronously. They donโt block event ingestion, and failures
are automatically retried with exponential backoff.
Slack Notifications
Send alerts to Slack:
if (event.properties.revenue > 1000) {
let message := f'Large purchase: ${event.properties.revenue} from {event.distinct_id}'
fetch(inputs.slack_webhook, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: {
text: message,
channel: '#sales',
username: 'PostHog Bot'
}
})
}
Database Export
Write to PostgreSQL:
// Export high-value events to database
if (event.event == 'purchase' and event.properties.revenue > 500) {
let query := f'''
INSERT INTO high_value_purchases
(event_id, user_id, revenue, timestamp)
VALUES ('{event.uuid}', '{event.distinct_id}', {event.properties.revenue}, '{event.timestamp}')
'''
// Execute via webhook to database proxy
fetch(inputs.db_proxy_url, {
method: 'POST',
body: {query: query}
})
}
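Note that interpolating event values directly into SQL, as above, is vulnerable to injection if a property value contains a quote. If your database proxy supports parameterized queries, a safer sketch — the params field and its handling are assumptions about your proxy, not part of PostHog's API:

```hog
// Export high-value events via a parameterized query
if (event.event == 'purchase' and event.properties.revenue > 500) {
    fetch(inputs.db_proxy_url, {
        method: 'POST',
        body: {
            query: 'INSERT INTO high_value_purchases (event_id, user_id, revenue, timestamp) VALUES ($1, $2, $3, $4)',
            params: [event.uuid, event.distinct_id, event.properties.revenue, event.timestamp]
        }
    })
}
```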
Hog Language Reference
Available Context
Hog functions have access to:
// Event data
event.event // Event name
event.distinct_id // User identifier
event.uuid // Event UUID
event.timestamp // Event timestamp
event.properties // Event properties object
event.person // Person properties (if available)
// Inputs (defined in function config)
inputs.api_key
inputs.webhook_url
inputs.any_custom_input
Built-in Functions
Hog includes string, JSON, URL, and HTTP helpers. String functions:
lower('Hello') // 'hello'
upper('hello') // 'HELLO'
trim(' text ') // 'text'
split('a,b,c', ',') // ['a', 'b', 'c']
replace('hello', 'l', 'r') // 'herro'
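The JSON helpers work similarly. A brief sketch using jsonParse and jsonStringify — verify exact names and signatures against the current Hog reference:

```hog
// Parse a JSON string into a Hog object
let obj := jsonParse('{"plan": "pro"}')
print(obj.plan)

// Serialize an object back to a string, e.g. for a fetch body
let body := jsonStringify({'event': event.event})
```

For HTTP, use fetch() as shown in the destination examples above.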
Control Flow
// Conditionals
if (condition) {
// ...
} else if (other_condition) {
// ...
} else {
// ...
}
// Loops
for (let item in array) {
// ...
}
let i := 0
while (i < 10) {
i := i + 1
}
Function Inputs
Define configurable inputs for your functions:
[
    {
        "key": "api_key",
        "label": "API Key",
        "type": "string",
        "required": true,
        "secret": true
    },
    {
        "key": "webhook_url",
        "label": "Webhook URL",
        "type": "string",
        "required": true
    },
    {
        "key": "threshold",
        "label": "Revenue Threshold",
        "type": "number",
        "default": 100
    }
]
Access inputs in your Hog code:
if (event.properties.revenue > inputs.threshold) {
fetch(inputs.webhook_url, {
headers: {'Authorization': f'Bearer {inputs.api_key}'},
// ...
})
}
Error Handling
Automatic Retries
Failed destinations are automatically retried with exponential backoff:
First retry: 1 minute
Second retry: 5 minutes
Third retry: 15 minutes
Subsequent retries: 30 minutes
Degraded State
Functions enter degraded state after repeated failures:
GET /api/projects/{project_id}/warehouse/data_health_issues/

{
    "results": [
        {
            "id": "hog-fn-123",
            "name": "Slack Notifier",
            "type": "destination",
            "status": "degraded",
            "error": "HTTP 429: Rate limit exceeded"
        }
    ]
}
Error Logging
Log errors for debugging:
try {
let response := fetch(inputs.webhook_url, {body: event})
if (response.status >= 400) {
print(f'Webhook failed: {response.status} - {response.body}')
}
} catch (error) {
print(f'Error: {error}')
}
Monitoring
Function Metrics
Track function performance:
Execution count - Total invocations
Success rate - Percentage of successful runs
Average latency - Time spent processing
Error rate - Failed invocations
Health Status
Functions have health states:
Active - Running normally
Degraded - Experiencing errors but still running
Disabled - Automatically disabled after sustained failures
Forcefully disabled - Manually disabled by admin
Best Practices
Keep Transformations Fast
Transformations run synchronously during ingestion. Avoid slow operations like external API calls. If needed, use destinations for async processing.
Use Filters Wisely
Apply specific event filters to avoid running functions unnecessarily. This reduces costs and improves performance.
Handle Failures Gracefully
Use try/catch blocks and validate data before processing. Failed transformations can drop events, while failed destinations only lose the external delivery.
Test Before Deploying
Test Hog functions with sample events before enabling. Use print() statements for debugging during development.
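Putting these practices together, a destination skeleton that filters early, validates, and logs failures — a sketch under the assumption that threshold and webhook_url are defined as inputs, not a canonical template:

```hog
// Filter early: skip events without the property or below the threshold
if (event.properties.revenue == null or event.properties.revenue <= inputs.threshold) {
    return
}

try {
    let response := fetch(inputs.webhook_url, {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: jsonStringify({event: event.event, revenue: event.properties.revenue})
    })
    if (response.status >= 400) {
        print(f'Delivery failed: {response.status}')
    }
} catch (error) {
    print(f'Error: {error}')
}
```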
Limitations
Transformation limits:
Max execution time: 30 seconds
Max memory: 128 MB
No file system access
Limited external API calls (use destinations for heavy I/O)
Destination limits:
Max retries: 10 attempts over 24 hours
Functions disabled after 100 consecutive failures
Rate limits depend on external service