Snuba supports real-time query subscriptions that continuously evaluate queries and deliver results when conditions are met. Subscription configuration defines validation rules and processing logic for these real-time queries.

Overview

Subscription configuration controls:
  • Aggregation limits: Maximum number of aggregations per query
  • Disallowed operations: Operations not permitted in subscriptions
  • Validation rules: Custom validators for subscription queries
  • Processing logic: Custom processors for subscription execution
  • Time column requirements: Time-based filtering rules

Configuration Levels

Subscription configuration exists at two levels:

Entity-Level

Defined in entity YAML files. Applies to all subscriptions on that entity.

Storage-Level

Defined in writable storage YAML files. Controls subscription infrastructure: Kafka topics, scheduler mode, and execution timing.

Entity Subscription Configuration

Schema

subscription_validators
array
Array of validator configurations for subscription queries.
subscription_processors
array
Array of processor configurations for subscription execution.

Basic Example

Entity Subscription Config
version: v1
kind: entity
name: events

# ... other entity configuration ...

subscription_validators:
  - validator: AggregationValidator
    args:
      max_allowed_aggregations: 10
      disallowed_aggregations:
        - having
        - orderby
      required_time_column: timestamp
      allows_group_by_without_condition: true

Subscription Validators

Validators enforce rules on subscription queries:

AggregationValidator

Controls aggregation complexity:
max_allowed_aggregations
integer
Maximum number of aggregation functions allowed per query.
disallowed_aggregations
array
Array of clause types not allowed in subscriptions (e.g., having, orderby).
required_time_column
string
Name of the time column that must be filtered in subscription queries.
allows_group_by_without_condition
boolean
Whether GROUP BY is allowed without WHERE conditions.
Aggregation Validator
subscription_validators:
  - validator: AggregationValidator
    args:
      max_allowed_aggregations: 10
      disallowed_aggregations:
        - having
        - orderby
      required_time_column: timestamp
      allows_group_by_without_condition: true
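
To make the four arguments concrete, here is a minimal sketch of the checks they describe. It is not Snuba's implementation: `QuerySummary`, `AggregationRules`, and `check` are hypothetical names, and the query is reduced to a plain data structure rather than a parsed SnQL query.

```python
from dataclasses import dataclass


@dataclass
class QuerySummary:
    """Hypothetical, simplified description of a subscription query."""
    aggregations: list[str]        # e.g. ["count()"]
    clauses: set[str]              # e.g. {"where", "groupby", "having"}
    condition_columns: set[str]    # columns referenced in WHERE


@dataclass
class AggregationRules:
    """Mirrors the args of the AggregationValidator entry in the YAML."""
    max_allowed_aggregations: int
    disallowed_aggregations: list[str]
    required_time_column: str
    allows_group_by_without_condition: bool = False


def check(query: QuerySummary, rules: AggregationRules) -> list[str]:
    """Return the list of rule violations; an empty list means the query passes."""
    errors = []
    if len(query.aggregations) > rules.max_allowed_aggregations:
        errors.append("too many aggregations")
    for clause in rules.disallowed_aggregations:
        if clause in query.clauses:
            errors.append(f"clause not allowed: {clause}")
    if rules.required_time_column not in query.condition_columns:
        errors.append(f"missing filter on {rules.required_time_column}")
    if (
        "groupby" in query.clauses
        and not query.condition_columns
        and not rules.allows_group_by_without_condition
    ):
        errors.append("GROUP BY requires a WHERE condition")
    return errors


rules = AggregationRules(10, ["having", "orderby"], "timestamp", True)
query = QuerySummary(["count()"], {"where", "groupby"}, {"project_id", "timestamp"})
print(check(query, rules))
```

Returning every violation at once, rather than raising on the first, is a choice made here for readability; a real validator may stop at the first failure.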

Custom Validators

You can register custom validators:
Custom Validator
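from snuba.datasets.entity_subscriptions.validators import EntitySubscriptionValidator
# Import path assumed from current Snuba; adjust if your version differs.
from snuba.query.exceptions import InvalidQueryException

@EntitySubscriptionValidator.register("CustomSubscriptionValidator")
class CustomSubscriptionValidator(EntitySubscriptionValidator):
    def validate(self, query, entity):
        # Custom validation logic
        if not self._is_valid(query):
            raise InvalidQueryException("Query violates custom rules")

    def _is_valid(self, query):
        # Placeholder: replace with the checks your subscriptions need.
        return True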
Using Custom Validator
subscription_validators:
  - validator: CustomSubscriptionValidator
    args:
      custom_param: value
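
The link between the `validator` name in YAML and a Python class can be pictured as a name-to-class registry whose entries are instantiated with `args` as keyword arguments. The sketch below is self-contained and does not use Snuba's own registration machinery; `_REGISTRY`, `register`, and `build_validators` are hypothetical names chosen for illustration.

```python
from typing import Any, Callable

# Hypothetical registry: maps the `validator` name used in YAML to a class.
_REGISTRY: dict[str, type] = {}


def register(name: str) -> Callable[[type], type]:
    """Class decorator that records a class under the given name."""
    def decorator(cls: type) -> type:
        _REGISTRY[name] = cls
        return cls
    return decorator


@register("CustomSubscriptionValidator")
class CustomSubscriptionValidator:
    def __init__(self, custom_param: str) -> None:
        # Receives the `args` mapping from the configuration entry.
        self.custom_param = custom_param


def build_validators(config: list[dict[str, Any]]) -> list[Any]:
    """Instantiate each configured validator, passing `args` as keyword arguments."""
    built = []
    for entry in config:
        name = entry["validator"]
        if name not in _REGISTRY:
            raise KeyError(f"unknown validator: {name}")
        built.append(_REGISTRY[name](**entry.get("args", {})))
    return built


# The parsed form of the YAML entry above.
config = [{"validator": "CustomSubscriptionValidator", "args": {"custom_param": "value"}}]
validators = build_validators(config)
print(validators[0].custom_param)
```

Under this model, a typo in the YAML name surfaces as a lookup error at load time rather than when a query is validated.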

Subscription Processors

Processors transform subscription queries before execution:
Subscription Processors
subscription_processors:
  - processor: AddColumnProcessor
    args:
      column_name: organization_id
      
  - processor: MetricsTimeProcessor
    args:
      time_column: timestamp

Common Processors

AddColumnProcessor
Automatically adds required columns to subscription queries.
- processor: AddColumnProcessor
  args:
    column_name: project_id

MetricsTimeProcessor
Processes time-based aggregations for metrics subscriptions.
- processor: MetricsTimeProcessor
  args:
    time_column: timestamp
    granularity: 60
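
As a rough mental model, processors can be thought of as steps applied in order to a query before it runs. The sketch below illustrates that idea only: `QueryDraft`, `AddColumn`, and `run_processors` are hypothetical, and the behavior shown (appending a column when it is absent) is an assumption about what "adds required columns" means, not Snuba's actual logic.

```python
from dataclasses import dataclass, field


@dataclass
class QueryDraft:
    """Hypothetical stand-in for a subscription query."""
    selected_columns: list[str] = field(default_factory=list)


class AddColumn:
    """Illustrative only: appends a column if the query does not select it yet."""

    def __init__(self, column_name: str) -> None:
        self.column_name = column_name

    def process(self, query: QueryDraft) -> None:
        if self.column_name not in query.selected_columns:
            query.selected_columns.append(self.column_name)


def run_processors(query: QueryDraft, processors: list) -> QueryDraft:
    """Apply each processor in the order it was configured."""
    for processor in processors:
        processor.process(query)
    return query


draft = run_processors(QueryDraft(["count"]), [AddColumn("project_id")])
print(draft.selected_columns)
```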

Storage Subscription Configuration

Writable storages configure subscription infrastructure:
Storage Subscription Config
version: v1
kind: writable_storage
name: errors

# ... other storage configuration ...

stream_loader:
  processor: ErrorsProcessor
  default_topic: events
  
  # Subscription topics
  subscription_scheduled_topic: scheduled-subscriptions-events
  subscription_result_topic: events-subscription-results
  
  # Subscription scheduling
  subscription_scheduler_mode: partition
  subscription_synchronization_timestamp: received_p99
  subscription_delay_seconds: 30

Subscription Stream Configuration

subscription_scheduled_topic
string
Kafka topic for scheduled subscription executions.
subscription_result_topic
string
Kafka topic where subscription results are published.
subscription_scheduler_mode
string
Scheduler mode: partition (per-partition scheduling) or global (global scheduling).
subscription_synchronization_timestamp
string
Timestamp field for synchronization:
  • orig_message_ts - Use original message timestamp
  • received_p99 - Use 99th percentile of received timestamp
subscription_delay_seconds
integer
Additional delay in seconds before executing subscriptions (0-120).
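
The constraints listed above are easy to check before deploying a storage definition. The following is a small sketch of such a check over an already-parsed `stream_loader` mapping. `validate_subscription_settings` is a hypothetical helper, not part of Snuba, and it assumes all five fields are set.

```python
ALLOWED_MODES = {"partition", "global"}
ALLOWED_SYNC_TIMESTAMPS = {"orig_message_ts", "received_p99"}


def validate_subscription_settings(stream_loader: dict) -> list[str]:
    """Return a list of problems found in the subscription fields."""
    problems = []

    for key in ("subscription_scheduled_topic", "subscription_result_topic"):
        value = stream_loader.get(key)
        if not isinstance(value, str) or not value:
            problems.append(f"{key} must be a non-empty string")

    mode = stream_loader.get("subscription_scheduler_mode")
    if mode not in ALLOWED_MODES:
        problems.append(f"subscription_scheduler_mode must be one of {sorted(ALLOWED_MODES)}")

    sync = stream_loader.get("subscription_synchronization_timestamp")
    if sync not in ALLOWED_SYNC_TIMESTAMPS:
        problems.append(
            f"subscription_synchronization_timestamp must be one of {sorted(ALLOWED_SYNC_TIMESTAMPS)}"
        )

    delay = stream_loader.get("subscription_delay_seconds")
    # bool is a subclass of int in Python, so exclude it explicitly.
    if not isinstance(delay, int) or isinstance(delay, bool) or not 0 <= delay <= 120:
        problems.append("subscription_delay_seconds must be an integer between 0 and 120")

    return problems


settings = {
    "subscription_scheduled_topic": "scheduled-subscriptions-events",
    "subscription_result_topic": "events-subscription-results",
    "subscription_scheduler_mode": "partition",
    "subscription_synchronization_timestamp": "received_p99",
    "subscription_delay_seconds": 30,
}
print(validate_subscription_settings(settings))
```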

Complete Examples

Events Entity Subscriptions

events.yaml
version: v1
kind: entity
name: events

schema:
  - name: project_id
    type: UInt
    args:
      size: 64
  - name: timestamp
    type: DateTime
  - name: event_id
    type: UUID
  - name: message
    type: String

required_time_column: timestamp

# ... storage and query configuration ...

subscription_validators:
  - validator: AggregationValidator
    args:
      max_allowed_aggregations: 10
      disallowed_aggregations:
        - having
        - orderby
      required_time_column: timestamp
      allows_group_by_without_condition: true

Errors Storage Subscriptions

errors.yaml
version: v1
kind: writable_storage
name: errors

storage:
  key: errors
  set_key: events

readiness_state: complete

schema:
  columns:
    - name: project_id
      type: UInt
      args:
        size: 64
    - name: timestamp
      type: DateTime
        
  local_table_name: errors_local
  dist_table_name: errors_dist

stream_loader:
  processor: ErrorsProcessor
  default_topic: events
  commit_log_topic: snuba-commit-log
  
  # Subscription configuration
  subscription_scheduler_mode: partition
  subscription_scheduled_topic: scheduled-subscriptions-events
  subscription_result_topic: events-subscription-results
  subscription_synchronization_timestamp: received_p99
  subscription_delay_seconds: 30

Metrics Entity Subscriptions

metrics_counters.yaml
version: v1
kind: entity
name: generic_metrics_counters

schema:
  - name: org_id
    type: UInt
    args:
      size: 64
  - name: project_id
    type: UInt
    args:
      size: 64
  - name: timestamp
    type: DateTime
  - name: metric_id
    type: UInt
    args:
      size: 64
  - name: value
    type: Float
    args:
      size: 64

required_time_column: timestamp

# ... storage configuration ...

subscription_validators:
  - validator: AggregationValidator
    args:
      max_allowed_aggregations: 5
      disallowed_aggregations:
        - having
        - orderby
        - groupby  # No GROUP BY for metrics
      required_time_column: timestamp
      allows_group_by_without_condition: false

subscription_processors:
  - processor: MetricsTimeProcessor
    args:
      time_column: timestamp
      granularity: 60

Subscription Scheduler Modes

Partition Mode

Schedules subscriptions per Kafka partition:
Partition Mode
subscription_scheduler_mode: partition
Advantages:
  • Better parallelism
  • Partition-level ordering guarantees
  • Scales with Kafka partitions
Use when:
  • High subscription volume
  • Partition-level ordering is sufficient
  • Horizontal scaling is needed

Global Mode

Schedules subscriptions globally across all partitions:
Global Mode
subscription_scheduler_mode: global
Advantages:
  • Global ordering guarantees
  • Simpler coordination
  • Better for low-volume subscriptions
Use when:
  • Strict global ordering required
  • Lower subscription volume
  • Simpler deployment preferred
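
One way to picture the difference between the two modes is to ask how far each partition may schedule, given the latest timestamp seen on every partition. The sketch below assumes that global mode holds all partitions to the slowest one, which is one way to obtain a global ordering; this is an illustrative assumption rather than a description of Snuba's scheduler, and `schedulable_until` is a hypothetical function.

```python
def schedulable_until(partition_timestamps: dict[int, float], mode: str) -> dict[int, float]:
    """Return, per partition, the time up to which subscriptions may be scheduled."""
    if not partition_timestamps:
        return {}
    if mode == "partition":
        # Each partition advances on its own clock.
        return dict(partition_timestamps)
    if mode == "global":
        # Assumption: every partition waits for the slowest one.
        slowest = min(partition_timestamps.values())
        return {partition: slowest for partition in partition_timestamps}
    raise ValueError(f"unknown scheduler mode: {mode}")


latest = {0: 100.0, 1: 90.0}
print(schedulable_until(latest, "partition"))
print(schedulable_until(latest, "global"))
```

Under this assumption, a single lagging partition delays every subscription in global mode, which is consistent with the advice to prefer it for lower volumes.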

Subscription Delays

Control when subscriptions execute relative to data arrival:
Subscription Timing
subscription_synchronization_timestamp: received_p99
subscription_delay_seconds: 30

Synchronization Timestamps

orig_message_ts
Use the original message timestamp from the event.
  • Advantages: Reflects actual event time
  • Disadvantages: Doesn’t account for ingestion delays
  • Use with: Higher subscription_delay_seconds (e.g., 60) to account for ingestion

received_p99
Use the 99th percentile of message received time.
  • Advantages: Accounts for ingestion delays automatically
  • Disadvantages: May lag slightly behind real time
  • Use with: Lower subscription_delay_seconds (e.g., 30)

Delay Seconds

Additional buffer time before executing subscriptions:
  • Low delay (10-30s): Fast results, may miss late-arriving data
  • Medium delay (30-60s): Balanced, catches most data
  • High delay (60-120s): Catches late data, slower results
The subscription_delay_seconds value must be between 0 and 120. Choose it based on how late your data typically arrives and how quickly you need results.
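
The interaction between the synchronization timestamp and the delay can be sketched as follows. This is a simplified model, not Snuba's scheduler: it assumes a subscription scheduled for time T becomes due once the synchronization timestamp reaches T plus the delay, and it computes a 99th percentile with the nearest-rank method. `percentile_nearest_rank` and `is_due` are hypothetical helpers.

```python
import math


def percentile_nearest_rank(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest value with at least pct% of values at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def is_due(scheduled_at: float, sync_timestamp: float, delay_seconds: int) -> bool:
    """Assumed rule: run once the sync timestamp has passed the scheduled time plus the delay."""
    if not 0 <= delay_seconds <= 120:
        raise ValueError("delay_seconds must be between 0 and 120")
    return sync_timestamp >= scheduled_at + delay_seconds


# Received timestamps (in seconds) for a batch of messages.
received = [float(t) for t in range(1001, 1101)]
sync_ts = percentile_nearest_rank(received, 99)
print(sync_ts, is_due(scheduled_at=1000.0, sync_timestamp=sync_ts, delay_seconds=30))
```

A larger delay makes `is_due` return true later, which trades result freshness for a better chance that late-arriving data is included.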

Query Examples

Subscription queries use standard SnQL syntax and must pass the entity's configured validators. Note that SnQL expresses grouping with a BY clause after SELECT:
MATCH (events)
SELECT count() AS count BY tags[environment]
WHERE project_id = 1
  AND timestamp >= toDateTime('2023-01-01 00:00:00')
  AND timestamp < toDateTime('2023-01-01 01:00:00')

Creating Subscriptions

Create subscriptions via the Snuba API:
Create Subscription
import requests

# Subscription payload: the query plus its evaluation schedule.
subscription = {
    "project_id": 1,
    "type": "query",
    "dataset": "events",
    "query": """
        MATCH (events)
        SELECT count() AS count BY tags[environment]
        WHERE project_id = 1
          AND timestamp >= now() - 3600
          AND timestamp < now()
    """,
    "time_window": 60,  # Seconds of data each evaluation covers
    "resolution": 60,   # Seconds between evaluations
}

response = requests.post(
    "http://localhost:1218/subscriptions",
    json=subscription,
    timeout=10,
)
# Fail loudly on validation or server errors instead of reading a missing key.
response.raise_for_status()

subscription_id = response.json()["subscription_id"]

Best Practices

Limit Complexity

Set reasonable max_allowed_aggregations to prevent expensive subscription queries.

Require Time Filters

Always require time column filters to prevent full table scans.

Use Partition Mode

Prefer partition mode for scalability with high subscription volumes.

Tune Delays

Balance between result freshness and data completeness with appropriate delays.

Monitoring Subscriptions

Monitor subscription health:
  • Scheduled topic lag: Check Kafka lag on scheduled topic
  • Result topic throughput: Monitor result topic message rate
  • Query execution time: Track subscription query performance
  • Validation failures: Monitor validation error rates

Troubleshooting

Symptom: Subscriptions rejected with validation errors
Solutions:
  • Check max_allowed_aggregations limit
  • Verify required_time_column is filtered
  • Remove disallowed clauses (HAVING, ORDER BY)
  • Ensure GROUP BY has WHERE conditions if required

Symptom: Subscription results arrive later than expected
Solutions:
  • Reduce subscription_delay_seconds
  • Check Kafka consumer lag
  • Verify subscription_synchronization_timestamp setting
  • Monitor query execution times

Symptom: Subscriptions miss some events
Solutions:
  • Increase subscription_delay_seconds
  • Use received_p99 synchronization timestamp
  • Check Kafka retention settings
  • Verify partition assignment

