Query processors transform and validate queries as they flow through the execution pipeline.

Overview

Snuba’s query pipeline consists of multiple stages, each with its own processors:
  • Entity processing: runs after query parsing, before storage selection
  • Storage processing: runs after storage selection, before query execution
  • Execution: runs the final query against ClickHouse

Processor Interface

All processors implement a common interface:
from abc import ABC, abstractmethod
from snuba.query import ProcessableQuery
from snuba.query.query_settings import QuerySettings

class QueryProcessor(ABC):
    @abstractmethod
    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings
    ) -> None:
        """
        Process and transform the query in-place.
        
        Args:
            query: The query to process
            query_settings: Query execution settings
        """
        raise NotImplementedError
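
For example, a minimal concrete processor might cap result size. This is a hypothetical sketch, not a real Snuba processor; it assumes the query exposes get_limit()/set_limit() accessors:
class DefaultLimitProcessor(QueryProcessor):
    """Hypothetical example: apply a default LIMIT when none is set."""

    def __init__(self, default_limit: int = 1000) -> None:
        self.default_limit = default_limit

    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings
    ) -> None:
        # Processors mutate the query in place and return None
        if query.get_limit() is None:  # assumed accessor
            query.set_limit(self.default_limit)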

Entity Processing Stage

Entity processors run after query parsing, before storage selection.
from snuba.pipeline.stages.query_processing import EntityProcessingStage
from snuba.pipeline.query_pipeline import QueryPipelineResult
from snuba.request import Request
from snuba.utils.metrics.timer import Timer

# Execute entity processing stage
stage = EntityProcessingStage()
result = stage.execute(
    QueryPipelineResult(
        data=request,
        query_settings=request.query_settings,
        timer=timer,
        error=None
    )
)
Source: snuba/pipeline/stages/query_processing.py:14
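
The stage returns a new QueryPipelineResult. A typical follow-up, assuming the error and data attributes shown in the constructor above:
# Propagate failures before handing the result to the next stage
if result.error is not None:
    raise result.error

# On success, result.data carries the processed query
processed_query = result.data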

Common Entity Processors

TimeSeriesProcessor validates and processes time range queries:
  • Ensures required time columns are present
  • Validates time range is reasonable
  • Adds time-based optimizations
from snuba.datasets.processors.timeseries_processor import TimeSeriesProcessor

processor = TimeSeriesProcessor()
processor.process_query(query, query_settings)
ProjectRateLimiter enforces project-level rate limits:
from snuba.datasets.processors.project_rate_limiter import ProjectRateLimiter

processor = ProjectRateLimiter()
processor.process_query(query, query_settings)
PrewhereProcessor optimizes queries using ClickHouse PREWHERE:
  • Moves selective conditions to PREWHERE
  • Improves query performance
from snuba.query.processors.prewhere import PrewhereProcessor

processor = PrewhereProcessor()
processor.process_query(query, query_settings)

Storage Processing Stage

Storage processors run after storage selection, before query execution.
from snuba.pipeline.stages.query_processing import StorageProcessingStage

# Execute storage processing stage; it consumes the QueryPipelineResult
# produced by the entity processing stage
stage = StorageProcessingStage()
result = stage.execute(entity_result)
Source: snuba/pipeline/stages/query_processing.py:16

Common Storage Processors

MappingOptimizer optimizes column mapping and projection:
from snuba.datasets.processors.mapping_optimizer import MappingOptimizer

# Constructor arguments depend on the storage's column configuration
processor = MappingOptimizer(column_mapping)
processor.process_query(query, query_settings)
TableRateLimiter enforces table-level rate limits:
from snuba.datasets.processors.table_rate_limiter import TableRateLimiter

processor = TableRateLimiter()
processor.process_query(query, query_settings)
ConsistencyEnforcer enforces consistency settings:
from snuba.datasets.processors.consistency import ConsistencyEnforcer

processor = ConsistencyEnforcer()
processor.process_query(query, query_settings)

Creating Custom Processors

You can create custom processors for specific use cases:
from snuba.query.processors import QueryProcessor
from snuba.query import ProcessableQuery
from snuba.query.query_settings import QuerySettings
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import (
    binary_condition,
    combine_and_conditions,
    ConditionFunctions,
)

class CustomProjectFilter(QueryProcessor):
    """
    Automatically adds project_id filter to all queries.
    """
    
    def __init__(self, allowed_projects: list[int]):
        self.allowed_projects = allowed_projects
    
    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings
    ) -> None:
        # Add a project filter condition; the right-hand side of IN must be
        # a tuple of literals, not a single list-valued Literal
        project_condition = binary_condition(
            ConditionFunctions.IN,
            Column("project_id", None, "project_id"),
            FunctionCall(
                None,
                "tuple",
                tuple(Literal(None, p) for p in self.allowed_projects),
            ),
        )

        # Combine with any existing condition
        existing = query.get_condition()
        if existing is not None:
            new_condition = combine_and_conditions([existing, project_condition])
        else:
            new_condition = project_condition

        query.set_ast_condition(new_condition)

# Usage
processor = CustomProjectFilter(allowed_projects=[1, 2, 3])
processor.process_query(query, query_settings)
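
Because process_query mutates the query in place and returns None, processors compose by simple sequencing: the pipeline applies each registered processor in turn, and later processors see the conditions added by earlier ones.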

Processor Examples

Add Default Conditions

from snuba.query.processors import QueryProcessor
from snuba.query.conditions import (
    binary_condition,
    BooleanFunctions,
    ConditionFunctions,
)
from snuba.query.expressions import Column, Literal

class DefaultConditionsProcessor(QueryProcessor):
    def process_query(self, query, query_settings):
        # Add default deleted = 0 condition
        deleted_condition = binary_condition(
            ConditionFunctions.EQ,
            Column("deleted", None, "deleted"),
            Literal(None, 0)
        )
        
        existing = query.get_condition()
        if existing is not None:
            new_condition = binary_condition(
                BooleanFunctions.AND,
                existing,
                deleted_condition
            )
        else:
            new_condition = deleted_condition
        
        query.set_ast_condition(new_condition)
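
Usage follows the same pattern as the other processors:
# Usage
processor = DefaultConditionsProcessor()
processor.process_query(query, query_settings)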

Transform Column Names

from snuba.query.processors import QueryProcessor
from snuba.query.expressions import Column

class ColumnRenameProcessor(QueryProcessor):
    def __init__(self, mapping: dict[str, str]):
        self.mapping = mapping
    
    def process_query(self, query, query_settings):
        # Rename columns while preserving each column's alias and table
        def rename_visitor(expr):
            if isinstance(expr, Column) and expr.column_name in self.mapping:
                return Column(
                    expr.alias,
                    expr.table_name,
                    self.mapping[expr.column_name]
                )
            return expr

        # transform_expressions walks every expression in the query
        # (selected columns, conditions, groupby, ...) recursively,
        # so nested columns are renamed as well
        query.transform_expressions(rename_visitor)

# Usage
processor = ColumnRenameProcessor({
    "old_column": "new_column",
    "deprecated_field": "current_field"
})

Add Query Metadata

import logging

from snuba.query.processors import QueryProcessor

logger = logging.getLogger(__name__)

class MetadataProcessor(QueryProcessor):
    def process_query(self, query, query_settings):
        # Add custom metadata to query
        query.set_experiment("custom_processor", True)
        query.set_experiment("processor_version", "1.0")
        
        # Log processing
        logger.info(f"Processing query {query}")

Subscription Processors

Subscription-specific processors handle recurring queries:
from typing import Any, Mapping, Optional

from snuba.datasets.entity_subscriptions.processors import SubscriptionProcessor
from snuba.query import ProcessableQuery

class CustomSubscriptionProcessor(SubscriptionProcessor):
    def process(
        self,
        query: ProcessableQuery,
        metadata: Mapping[str, Any],
        offset: Optional[int]
    ) -> None:
        """
        Process subscription query.
        
        Args:
            query: The subscription query
            metadata: Subscription metadata
            offset: Kafka offset for deduplication
        """
        # Add subscription-specific logic
        pass
    
    def to_dict(self, metadata: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Serialize processor state to dict.
        """
        return {"custom_data": metadata.get("custom_field")}
Source: Entity implements get_subscription_processors()

Query Validation

Validators check query constraints:
from typing import Any, Mapping

from snuba.datasets.entity_subscriptions.validators import (
    EntitySubscriptionValidator,
    InvalidSubscriptionError
)
from snuba.query import ProcessableQuery

class CustomValidator(EntitySubscriptionValidator):
    def validate(
        self,
        query: ProcessableQuery,
        metadata: Mapping[str, Any]
    ) -> None:
        # Validate query constraints
        selected = query.get_selected_columns()
        if len(selected) > 10:
            raise InvalidSubscriptionError(
                "Too many selected columns (max 10)"
            )
        
        # Validate metadata
        if "required_field" not in metadata:
            raise InvalidSubscriptionError(
                "Missing required_field in metadata"
            )
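
Validators raise on violation and return None otherwise. A hypothetical usage sketch:
validator = CustomValidator()
try:
    validator.validate(query, {"required_field": "value"})
except InvalidSubscriptionError as e:
    print(f"Rejected subscription: {e}")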

Processor Registration

Processors are registered with entities and storages:
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.pluggable_entity import PluggableEntity

entity = PluggableEntity(
    entity_key=EntityKey.CUSTOM,
    storages=[storage],
    query_processors=[
        CustomProjectFilter([1, 2, 3]),
        DefaultConditionsProcessor(),
    ],
    subscription_processors=[
        CustomSubscriptionProcessor()
    ],
    subscription_validators=[
        CustomValidator()
    ]
)
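
Processors run in the order they are registered, so the project filter added by CustomProjectFilter is already in place when DefaultConditionsProcessor executes.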

Pipeline Execution

The full pipeline execution:
from snuba.pipeline.query_pipeline import QueryPipelineResult
from snuba.pipeline.stages.query_processing import (
    EntityProcessingStage,
    StorageProcessingStage
)
from snuba.pipeline.stages.query_execution import ExecutionStage

# Stage 1: Entity processing
entity_result = EntityProcessingStage().execute(
    QueryPipelineResult(
        data=request,
        query_settings=request.query_settings,
        timer=timer,
        error=None
    )
)

# Stage 2: Storage processing
storage_result = StorageProcessingStage().execute(entity_result)

# Stage 3: Execution
final_result = ExecutionStage(
    request.attribution_info,
    query_metadata=query_metadata,
    robust=False
).execute(storage_result)
Source: snuba/web/query.py:36

Debugging Processors

Enable debug mode to see processor effects:
from snuba.query.query_settings import HTTPQuerySettings
from snuba.web.query import run_query

settings = HTTPQuerySettings(debug=True)

# Execute the query with debug enabled; the request must be built with
# these settings for the debug fields below to be populated
result = run_query(dataset, request, timer)

# Check generated SQL and stats
print(f"SQL: {result.extra['sql']}")
print(f"Stats: {result.extra['stats']}")
print(f"Experiments: {result.extra['experiments']}")
