Data Pipelines let you transform events as they arrive and route them to external destinations. Write custom transformation logic in Hog, filter events, and send data to webhooks, databases, and third-party tools.

Overview

Data Pipelines provide:
  • Real-time transformations - Modify events as they're ingested
  • Event filtering - Route specific events to destinations
  • Hog functions - Write custom logic in PostHog's scripting language
  • Destinations - Send data to webhooks, Slack, databases, and more
  • Error handling - Automatic retries and degradation states

Transformations

Transformations modify events before they're stored. Use them to:
  • Enrich events with external data
  • Normalize property formats
  • Filter out PII
  • Add computed properties

Creating a Transformation

1. Write Hog code

Transformations use Hog, PostHog's event processing language:
// Normalize email addresses
if (event.properties.email) {
  event.properties.email := lower(trim(event.properties.email))
}

// Add a computed property (1.2 is an example conversion rate)
if (event.properties.revenue) {
  event.properties.revenue_usd := event.properties.revenue * 1.2
}

// Filter out test users
if (event.properties.email like '%@test.com') {
  return null  // Drop the event
}

return event
2. Set filters

Only process specific events:
{
  "events": [
    {"id": "$pageview", "type": "events", "name": "$pageview"},
    {"id": "purchase", "type": "events", "name": "purchase"}
  ],
  "properties": [
    {
      "key": "$current_url",
      "type": "event",
      "value": "/checkout",
      "operator": "icontains"
    }
  ]
}
3. Deploy

Transformations apply immediately to new events. Existing events aren't modified.

Transformation Examples

// Remove sensitive data
if (event.properties.credit_card) {
  event.properties.credit_card := 'REDACTED'
}

if (event.properties.ssn) {
  delete event.properties.ssn
}

return event

Destinations

Destinations send event data to external systems. PostHog supports:
  • Webhooks - POST events to any HTTP endpoint
  • Message queues - Kafka, RabbitMQ, AWS SQS
  • Data warehouses - Snowflake, BigQuery, Redshift
  • Tools - Slack, Discord, customer.io, Amplitude

Webhook Destination

Send events to a webhook:
let payload := {
  event: event.event,
  distinct_id: event.distinct_id,
  properties: event.properties,
  timestamp: event.timestamp
}

fetch(inputs.webhook_url, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {inputs.api_token}'
  },
  body: payload
})
Destinations run asynchronously. They don't block event ingestion, and failures are automatically retried with exponential backoff.

Slack Notifications

Send alerts to Slack:
if (event.properties.revenue > 1000) {
  let message := f'🎉 Large purchase: ${event.properties.revenue} from {event.distinct_id}'
  
  fetch(inputs.slack_webhook, {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: {
      text: message,
      channel: '#sales',
      username: 'PostHog Bot'
    }
  })
}

Database Export

Write to PostgreSQL:
// Export high-value events to a database
// Caution: interpolating values into SQL risks injection; prefer
// parameterized queries in the database proxy if it supports them
if (event.event == 'purchase' and event.properties.revenue > 500) {
  let query := f'''
    INSERT INTO high_value_purchases 
    (event_id, user_id, revenue, timestamp)
    VALUES ('{event.uuid}', '{event.distinct_id}', {event.properties.revenue}, '{event.timestamp}')
  '''
  
  // Execute via webhook to database proxy
  fetch(inputs.db_proxy_url, {
    method: 'POST',
    body: {query: query}
  })
}

Hog Language Reference

Available Context

Hog functions have access to:
// Event data
event.event          // Event name
event.distinct_id    // User identifier  
event.uuid          // Event UUID
event.timestamp     // Event timestamp
event.properties    // Event properties object
event.person        // Person properties (if available)

// Inputs (defined in function config)
inputs.api_key
inputs.webhook_url
inputs.any_custom_input

Built-in Functions

lower('Hello')           // 'hello'
upper('hello')          // 'HELLO'
trim('  text  ')        // 'text'
split('a,b,c', ',')     // ['a', 'b', 'c']
replace('hello', 'l', 'r')  // 'herro'
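These helpers compose. As a small sketch using only the functions listed above, a raw string can be normalized into a snake_case key in one chain:

```
// Normalize a raw string into a snake_case key
let raw := '  Hello World  '
let normalized := replace(trim(lower(raw)), ' ', '_')
// normalized is 'hello_world'
```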

Control Flow

// Conditionals
if (condition) {
  // ...
} else if (other_condition) {
  // ...
} else {
  // ...
}

// Loops
for (let item in array) {
  // ...
}

let i := 0
while (i < 10) {
  i := i + 1
}
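These loop constructs pair naturally with event properties. A hedged sketch of looping over a redaction list (the key list and bracket-style property access are illustrative assumptions, not part of the examples above):

```
// Redact a configurable list of sensitive keys
let sensitive_keys := ['ssn', 'credit_card', 'password']
for (let key in sensitive_keys) {
  if (event.properties[key]) {
    event.properties[key] := 'REDACTED'
  }
}
return event
```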

Function Inputs

Define configurable inputs for your functions:
[
  {
    "key": "api_key",
    "label": "API Key",
    "type": "string",
    "required": true,
    "secret": true
  },
  {
    "key": "webhook_url",
    "label": "Webhook URL",
    "type": "string",
    "required": true
  },
  {
    "key": "threshold",
    "label": "Revenue Threshold",
    "type": "number",
    "default": 100
  }
]
Access inputs in your Hog code:
if (event.properties.revenue > inputs.threshold) {
  fetch(inputs.webhook_url, {
    headers: {'Authorization': f'Bearer {inputs.api_key}'},
    // ...
  })
}

Error Handling

Automatic Retries

Failed destinations are automatically retried with exponential backoff:
  • First retry: 1 minute
  • Second retry: 5 minutes
  • Third retry: 15 minutes
  • Subsequent retries: 30 minutes

Degraded State

Functions enter degraded state after repeated failures:
GET /api/projects/{project_id}/warehouse/data_health_issues/

{
  "results": [
    {
      "id": "hog-fn-123",
      "name": "Slack Notifier",
      "type": "destination",
      "status": "degraded",
      "error": "HTTP 429: Rate limit exceeded"
    }
  ]
}

Error Logging

Log errors for debugging:
try {
  let response := fetch(inputs.webhook_url, {body: event})
  if (response.status >= 400) {
    print(f'Webhook failed: {response.status} - {response.body}')
  }
} catch (error) {
  print(f'Error: {error}')
}

Monitoring

Function Metrics

Track function performance:
  • Execution count - Total invocations
  • Success rate - Percentage of successful runs
  • Average latency - Time spent processing
  • Error rate - Failed invocations

Health Status

Functions have health states:
  • Active - Running normally
  • Degraded - Experiencing errors but still running
  • Disabled - Automatically disabled after sustained failures
  • Forcefully disabled - Manually disabled by admin

Best Practices

Keep Transformations Fast

Transformations run synchronously during ingestion. Avoid slow operations like external API calls. If needed, use destinations for async processing.
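As a sketch, a fast transformation keeps its body to pure, in-memory property work (`plan` and `plan_tier` are hypothetical property names):

```
// Fast: pure, in-memory property work only
if (event.properties.plan) {
  event.properties.plan_tier := lower(event.properties.plan)
}
return event

// Avoid in transformations: blocking calls such as fetch();
// move that logic into a destination instead
```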

Use Filters Wisely

Apply specific event filters to avoid running functions unnecessarily. This reduces costs and improves performance.
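For example, a destination that only cares about paid purchases can filter at the function level instead of branching inside Hog. This fragment follows the filter shape shown earlier; the `gt` operator is an assumption based on PostHog's standard property filter operators:

```
{
  "events": [{"id": "purchase", "type": "events", "name": "purchase"}],
  "properties": [
    {"key": "revenue", "type": "event", "value": "0", "operator": "gt"}
  ]
}
```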

Handle Failures Gracefully

Use try/catch blocks and validate data before processing. Failed transformations can drop events, while failed destinations only lose the external delivery.
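A minimal sketch combining validation with try/catch (the 1.2 conversion rate is illustrative, matching the earlier transformation example):

```
// Pass the event through unchanged when required data is missing
if (event.properties.revenue == null) {
  return event
}

try {
  event.properties.revenue_usd := event.properties.revenue * 1.2
} catch (error) {
  print(f'Enrichment failed: {error}')
}

return event
```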

Test Before Deploying

Test Hog functions with sample events before enabling. Use print() statements for debugging during development.

Limitations

Transformation limits:
  • Max execution time: 30 seconds
  • Max memory: 128 MB
  • No file system access
  • Limited external API calls (use destinations for heavy I/O)
Destination limits:
  • Max retries: 10 attempts over 24 hours
  • Functions disabled after 100 consecutive failures
  • Rate limits depend on external service
