Skip to main content
The Query Builder API allows you to construct and execute queries programmatically without writing SnQL strings.

Building Queries

Snuba provides a programmatic query builder for constructing queries:
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import binary_condition, ConditionFunctions
from snuba.datasets.entities.entity_key import EntityKey
from datetime import datetime

# Filter: project_id = 1
project_filter = binary_condition(
    ConditionFunctions.EQ,
    Column("project_id", None, "project_id"),
    Literal(None, 1),
)

# Filter: timestamp >= 2024-01-01
time_filter = binary_condition(
    ConditionFunctions.GTE,
    Column("timestamp", None, "timestamp"),
    Literal(None, datetime(2024, 1, 1)),
)

# Select three columns from the events entity, applying both filters
# (combined with AND) and capping the result at 100 rows.
query = Query(
    from_clause=Entity(EntityKey.EVENTS, None),
    selected_columns=[
        Column("event_id", None, "event_id"),
        Column("project_id", None, "project_id"),
        Column("timestamp", None, "timestamp"),
    ],
    condition=binary_condition(
        ConditionFunctions.AND,
        project_filter,
        time_filter,
    ),
    limit=100,
)
Source: snuba/query/logical.py

Query Class

The Query class represents a logical query:
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.datasets.entities.entity_key import EntityKey

# A Query with every constructor argument spelled out at its default.
# Only from_clause and selected_columns are required; the remaining
# keyword arguments may be omitted.
query = Query(
    from_clause=Entity(EntityKey.EVENTS, None),  # entity/data source to read from
    selected_columns=[...],  # placeholder: list of Expression objects to select
    condition=None,          # WHERE clause expression
    groupby=[],              # GROUP BY expressions
    having=None,             # HAVING clause expression
    order_by=[],             # sequence of OrderBy clauses
    limit=None,              # LIMIT value
    offset=0,                # OFFSET value
    limitby=None,            # LIMIT BY clause
    totals=False,            # include a totals row
    granularity=None,        # time bucket size for time series queries
)

Query Parameters

from_clause
Entity
required
The entity or data source to query
selected_columns
Sequence[Expression]
required
List of columns/expressions to select
condition
Optional[Expression]
WHERE clause condition
groupby
Optional[Sequence[Expression]]
GROUP BY expressions
having
Optional[Expression]
HAVING clause condition
order_by
Optional[Sequence[OrderBy]]
ORDER BY clauses
limit
Optional[int]
LIMIT value
offset
int
OFFSET value (default: 0)
totals
bool
Include totals row (default: False)

Expressions

Expressions represent query components:

Column

from snuba.query.expressions import Column

# Column(alias, table_name, column_name) -- the alias comes first because
# it is inherited from the Expression base dataclass.

# Simple column: alias mirrors the column name
event_id = Column("event_id", None, "event_id")

# Aliased column: expose the "count" column under the alias "event_count"
# (alias goes in the FIRST slot; the second slot is the table name)
count_col = Column("event_count", None, "count")

# Column qualified with the table alias "e"
aliased = Column("project_id", "e", "project_id")

Literal

from snuba.query.expressions import Literal
from datetime import datetime

# Literal(alias, value). The first argument is the expression alias and is
# usually None for literal values.

# Integer literal
project_id = Literal(None, 1)

# String literal
level = Literal(None, "error")

# DateTime literal
start_time = Literal(None, datetime(2024, 1, 1))

# Null literal
null_val = Literal(None, None)

Function Call

from snuba.query.expressions import FunctionCall, Column, Literal

# FunctionCall(alias, function_name, parameters) -- the alias comes FIRST
# (inherited from the Expression base dataclass), followed by the
# ClickHouse function name and a tuple of argument expressions.

# count() aliased as "count"
count = FunctionCall(
    "count",
    "count",
    tuple()
)

# uniq(user_id) aliased as "count_distinct_users"
count_distinct = FunctionCall(
    "count_distinct_users",
    "uniq",
    (Column("user_id", None, "user_id"),)
)

# toStartOfHour(timestamp) aliased as "hour"
trunc_timestamp = FunctionCall(
    "hour",
    "toStartOfHour",
    (Column("timestamp", None, "timestamp"),)
)

Conditions

Build WHERE clause conditions:
from datetime import datetime

from snuba.query.conditions import (
    binary_condition,
    ConditionFunctions,
    combine_and_conditions,
    combine_or_conditions,
)
from snuba.query.expressions import Column, Literal

# binary_condition(function, lhs, rhs) builds a single comparison expression.

# Simple equality: project_id = 1
eq_condition = binary_condition(
    ConditionFunctions.EQ,
    Column("project_id", None, "project_id"),
    Literal(None, 1)
)

# Greater than or equal: timestamp >= 2024-01-01
gte_condition = binary_condition(
    ConditionFunctions.GTE,
    Column("timestamp", None, "timestamp"),
    Literal(None, datetime(2024, 1, 1))
)

# IN clause: project_id IN (1, 2, 3)
# NOTE(review): some Snuba versions require the RHS of IN to be a
# FunctionCall("tuple", ...) of Literals rather than a list Literal -- verify
# against the version you are running.
in_condition = binary_condition(
    ConditionFunctions.IN,
    Column("project_id", None, "project_id"),
    Literal(None, [1, 2, 3])
)

# Combine conditions with AND
and_condition = combine_and_conditions([
    eq_condition,
    gte_condition,
    in_condition
])

# Combine conditions with OR
or_condition = combine_or_conditions([
    eq_condition,
    gte_condition
])
Source: snuba/query/conditions.py

Available Condition Functions

ConditionFunctions.EQ
enum
Equality: =
ConditionFunctions.NEQ
enum
Not equal: !=
ConditionFunctions.GT
enum
Greater than: >
ConditionFunctions.GTE
enum
Greater than or equal: >=
ConditionFunctions.LT
enum
Less than: <
ConditionFunctions.LTE
enum
Less than or equal: <=
ConditionFunctions.IN
enum
In list: IN
ConditionFunctions.NOT_IN
enum
Not in list: NOT IN
ConditionFunctions.LIKE
enum
Pattern match: LIKE

Executing Queries

Execute queries using the run_query function:
from snuba.datasets.factory import get_dataset
from snuba.query.logical import Query
from snuba.request import Request
from snuba.query.query_settings import HTTPQuerySettings
from snuba.attribution.attribution_info import AttributionInfo
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
import uuid

# Build your query (see "Building Queries" above for a full example)
query = Query(...)

# Create attribution info identifying the caller for rate limiting / metrics.
# NOTE(review): some Snuba versions require additional AttributionInfo fields
# (e.g. team, feature, an AppID object) -- confirm against your version.
attribution_info = AttributionInfo(
    tenant_ids={"organization_id": 1, "referrer": "my_service"},
    referrer="my_service",
    app_id="my_app",
    parent_api="api"
)

# Create the request envelope wrapping the query and its settings.
# NOTE(review): some Snuba versions declare Request.id as str, in which case
# use str(uuid.uuid4()) -- verify.
request = Request(
    id=uuid.uuid4(),
    original_body={},
    query=query,
    query_settings=HTTPQuerySettings(),
    attribution_info=attribution_info
)

# Get dataset
dataset = get_dataset("events")

# Create a timer to record query phases
timer = Timer("query")

# Execute the query; robust=False fails fast instead of retrying
result = run_query(
    dataset=dataset,
    request=request,
    timer=timer,
    robust=False
)

# Access result rows
for row in result.result["data"]:
    print(row)

# Access metadata (generated SQL and execution stats)
print(f"SQL: {result.extra['sql']}")
print(f"Stats: {result.extra['stats']}")
Source: snuba/web/query.py:69

Query Settings

Configure query execution:
from snuba.query.query_settings import HTTPQuerySettings
from snuba.state.quota import ResourceQuota

settings = HTTPQuerySettings(
    turbo=False,       # skip sampling shortcuts
    consistent=False,  # allow replica reads
    debug=True,        # include debug info in the response
    dry_run=False      # actually execute the query
)

# Cap resources and override ClickHouse settings for this query.
settings.set_resource_quota(ResourceQuota(max_threads=4))
settings.push_clickhouse_setting("max_execution_time", 30)
Source: snuba/query/query_settings.py

HTTPQuerySettings Methods

get_turbo()
method
Check if turbo mode is enabled. Returns: bool
get_consistent()
method
Check if consistent reads are enabled. Returns: bool
get_debug()
method
Check if debug mode is enabled. Returns: bool
get_dry_run()
method
Check if dry run mode is enabled. Returns: bool
set_resource_quota()
method
Set resource quota for the query. Parameters:
  • quota (ResourceQuota): Resource quota limits
push_clickhouse_setting()
method
Add a ClickHouse setting override. Parameters:
  • key (str): Setting name
  • value (Any): Setting value

Complete Example

Here’s a complete example building and executing a query:
from snuba.datasets.factory import get_dataset
from snuba.datasets.entities.entity_key import EntityKey
from snuba.query import OrderBy, OrderByDirection
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import binary_condition, combine_and_conditions, ConditionFunctions
from snuba.request import Request
from snuba.query.query_settings import HTTPQuerySettings
from snuba.attribution.attribution_info import AttributionInfo
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from datetime import datetime, timedelta
import uuid

# Build query: count errors by project in the last hour.
# FunctionCall takes (alias, function_name, parameters) -- alias first.
query = Query(
    from_clause=Entity(EntityKey.EVENTS, None),
    selected_columns=[
        Column("project_id", None, "project_id"),
        FunctionCall(
            "error_count",
            "count",
            tuple()
        )
    ],
    condition=combine_and_conditions([
        # level = 'error'
        binary_condition(
            ConditionFunctions.EQ,
            Column("level", None, "level"),
            Literal(None, "error")
        ),
        # timestamp >= now - 1h
        binary_condition(
            ConditionFunctions.GTE,
            Column("timestamp", None, "timestamp"),
            Literal(None, datetime.now() - timedelta(hours=1))
        ),
        # timestamp < now
        binary_condition(
            ConditionFunctions.LT,
            Column("timestamp", None, "timestamp"),
            Literal(None, datetime.now())
        )
    ]),
    groupby=[
        Column("project_id", None, "project_id")
    ],
    order_by=[
        # Highest error counts first
        OrderBy(
            expression=FunctionCall("error_count", "count", tuple()),
            direction=OrderByDirection.DESC
        )
    ],
    limit=10
)

# Create attribution info identifying the caller.
# NOTE(review): some Snuba versions require additional AttributionInfo fields
# (e.g. team, feature, an AppID object) -- confirm against your version.
attribution_info = AttributionInfo(
    tenant_ids={"organization_id": 1, "referrer": "error_monitor"},
    referrer="error_monitor",
    app_id="monitoring",
    parent_api="api"
)

request = Request(
    id=uuid.uuid4(),
    original_body={},
    query=query,
    query_settings=HTTPQuerySettings(debug=True),
    attribution_info=attribution_info
)

# Execute
dataset = get_dataset("events")
timer = Timer("error_count_query")

try:
    result = run_query(dataset, request, timer)

    print("Top 10 projects by error count:")
    for row in result.result["data"]:
        print(f"  Project {row['project_id']}: {row['error_count']} errors")

    print(f"\nQuery took {timer.for_json()['duration_ms']}ms")
    print(f"SQL: {result.extra['sql']}")

except Exception as e:
    print(f"Query failed: {e}")

SnQL Parsing

You can also parse SnQL strings into Query objects:
# NOTE(review): in recent Snuba versions parse_snql_query lives in
# snuba.query.snql.parser and may return a (query, snql_anonymized) tuple
# rather than a bare Query -- verify the import path and return shape
# against the version you are running.
from snuba.request.validation import parse_snql_query

# SnQL string: last-day events for project 1, capped at 100 rows
snql = """
MATCH (events)
SELECT event_id, project_id, timestamp
WHERE project_id = 1
  AND timestamp >= toDateTime('2024-01-01T00:00:00')
  AND timestamp < toDateTime('2024-01-02T00:00:00')
LIMIT 100
"""

# Parse the SnQL string into a logical Query against the events dataset
query = parse_snql_query(snql, get_dataset("events"))
print(query)
Source: snuba/request/validation.py:24

Datasets

Work with datasets and entities

Processors

Query processing pipeline

Build docs developers (and LLMs) love