The Query Builder API allows you to construct and execute queries programmatically without writing SnQL strings.
Building Queries
Snuba provides a programmatic query builder for constructing queries:
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import binary_condition, ConditionFunctions
from snuba.datasets.entities.entity_key import EntityKey
from datetime import datetime
# Create a query
query = Query(
from_clause=Entity(EntityKey.EVENTS, None),
selected_columns=[
Column("event_id", None, "event_id"),
Column("project_id", None, "project_id"),
Column("timestamp", None, "timestamp"),
],
condition=binary_condition(
ConditionFunctions.AND,
binary_condition(
ConditionFunctions.EQ,
Column("project_id", None, "project_id"),
Literal(None, 1)
),
binary_condition(
ConditionFunctions.GTE,
Column("timestamp", None, "timestamp"),
Literal(None, datetime(2024, 1, 1))
)
),
limit=100
)
Source: snuba/query/logical.py
Query Class
The Query class represents a logical query:
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.datasets.entities.entity_key import EntityKey
query = Query(
from_clause=Entity(EntityKey.EVENTS, None),
selected_columns=[...],
condition=None,
groupby=[],
having=None,
order_by=[],
limit=None,
offset=0,
limitby=None,
totals=False,
granularity=None,
)
Query Parameters
The entity or data source to query
selected_columns
Sequence[Expression]
required
List of columns/expressions to select
groupby
Optional[Sequence[Expression]]
GROUP BY expressions
order_by
Optional[Sequence[OrderBy]]
ORDER BY clauses
OFFSET value (default: 0)
Include totals row (default: False)
Expressions
Expressions represent query components:
Column
from snuba.query.expressions import Column
# Simple column
event_id = Column("event_id", None, "event_id")
# Aliased column
count_col = Column("count", "event_count", "count")
# Column with table alias
aliased = Column("project_id", "e", "project_id")
Literal
from snuba.query.expressions import Literal
from datetime import datetime
# Integer literal
project_id = Literal(None, 1)
# String literal
level = Literal(None, "error")
# DateTime literal
start_time = Literal(None, datetime(2024, 1, 1))
# Null literal
null_val = Literal(None, None)
Function Call
from snuba.query.expressions import FunctionCall, Column, Literal
# Count function
count = FunctionCall(
"count",
"count",
tuple()
)
# Count distinct
count_distinct = FunctionCall(
"uniq",
"count_distinct_users",
(Column("user_id", None, "user_id"),)
)
# Date truncation
trunc_timestamp = FunctionCall(
"toStartOfHour",
"hour",
(Column("timestamp", None, "timestamp"),)
)
Conditions
Build WHERE clause conditions:
from snuba.query.conditions import (
binary_condition,
ConditionFunctions,
combine_and_conditions,
combine_or_conditions,
)
from snuba.query.expressions import Column, Literal
# Simple equality
eq_condition = binary_condition(
ConditionFunctions.EQ,
Column("project_id", None, "project_id"),
Literal(None, 1)
)
# Greater than or equal
gte_condition = binary_condition(
ConditionFunctions.GTE,
Column("timestamp", None, "timestamp"),
Literal(None, datetime(2024, 1, 1))
)
# IN clause
in_condition = binary_condition(
ConditionFunctions.IN,
Column("project_id", None, "project_id"),
Literal(None, [1, 2, 3])
)
# Combine conditions with AND
and_condition = combine_and_conditions([
eq_condition,
gte_condition,
in_condition
])
# Combine conditions with OR
or_condition = combine_or_conditions([
eq_condition,
gte_condition
])
Source: snuba/query/conditions.py
Available Condition Functions
Greater than or equal: >=
ConditionFunctions.NOT_IN
Not in list: NOT IN
Executing Queries
Execute queries using the run_query function:
from snuba.datasets.factory import get_dataset
from snuba.request import Request
from snuba.query.query_settings import HTTPQuerySettings
from snuba.attribution.attribution_info import AttributionInfo
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
import uuid
# Build your query
query = Query(...)
# Create attribution info
attribution_info = AttributionInfo(
tenant_ids={"organization_id": 1, "referrer": "my_service"},
referrer="my_service",
app_id="my_app",
parent_api="api"
)
# Create request
request = Request(
id=uuid.uuid4(),
original_body={},
query=query,
query_settings=HTTPQuerySettings(),
attribution_info=attribution_info
)
# Get dataset
dataset = get_dataset("events")
# Create timer
timer = Timer("query")
# Execute query
result = run_query(
dataset=dataset,
request=request,
timer=timer,
robust=False
)
# Access results
for row in result.result["data"]:
print(row)
# Access metadata
print(f"SQL: {result.extra['sql']}")
print(f"Stats: {result.extra['stats']}")
Source: snuba/web/query.py:69
Query Settings
Configure query execution:
from snuba.query.query_settings import HTTPQuerySettings
settings = HTTPQuerySettings(
turbo=False,
consistent=False,
debug=True,
dry_run=False
)
# Set custom clickhouse settings
settings.set_resource_quota(ResourceQuota(max_threads=4))
settings.push_clickhouse_setting("max_execution_time", 30)
Source: snuba/query/query_settings.py
HTTPQuerySettings Methods
Check if turbo mode is enabledReturns: bool
Check if consistent reads are enabledReturns: bool
Check if debug mode is enabledReturns: bool
Check if dry run mode is enabledReturns: bool
Set resource quota for queryParameters:
quota (ResourceQuota): Resource quota limits
push_clickhouse_setting()
Add a ClickHouse setting overrideParameters:
key (str): Setting name
value (Any): Setting value
Complete Example
Here’s a complete example building and executing a query:
from snuba.datasets.factory import get_dataset
from snuba.datasets.entities.entity_key import EntityKey
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import binary_condition, combine_and_conditions, ConditionFunctions
from snuba.request import Request
from snuba.query.query_settings import HTTPQuerySettings
from snuba.attribution.attribution_info import AttributionInfo
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from datetime import datetime, timedelta
import uuid
# Build query: Count errors by project in last hour
query = Query(
from_clause=Entity(EntityKey.EVENTS, None),
selected_columns=[
Column("project_id", None, "project_id"),
FunctionCall(
"count",
"error_count",
tuple()
)
],
condition=combine_and_conditions([
binary_condition(
ConditionFunctions.EQ,
Column("level", None, "level"),
Literal(None, "error")
),
binary_condition(
ConditionFunctions.GTE,
Column("timestamp", None, "timestamp"),
Literal(None, datetime.now() - timedelta(hours=1))
),
binary_condition(
ConditionFunctions.LT,
Column("timestamp", None, "timestamp"),
Literal(None, datetime.now())
)
]),
groupby=[
Column("project_id", None, "project_id")
],
order_by=[
OrderBy(
expression=FunctionCall("count", "error_count", tuple()),
direction=OrderByDirection.DESC
)
],
limit=10
)
# Create request
attribution_info = AttributionInfo(
tenant_ids={"organization_id": 1, "referrer": "error_monitor"},
referrer="error_monitor",
app_id="monitoring",
parent_api="api"
)
request = Request(
id=uuid.uuid4(),
original_body={},
query=query,
query_settings=HTTPQuerySettings(debug=True),
attribution_info=attribution_info
)
# Execute
dataset = get_dataset("events")
timer = Timer("error_count_query")
try:
result = run_query(dataset, request, timer)
print("Top 10 projects by error count:")
for row in result.result["data"]:
print(f" Project {row['project_id']}: {row['error_count']} errors")
print(f"\nQuery took {timer.for_json()['duration_ms']}ms")
print(f"SQL: {result.extra['sql']}")
except Exception as e:
print(f"Query failed: {e}")
SnQL Parsing
You can also parse SnQL strings into Query objects:
from snuba.request.validation import parse_snql_query
snql = """
MATCH (events)
SELECT event_id, project_id, timestamp
WHERE project_id = 1
AND timestamp >= toDateTime('2024-01-01T00:00:00')
AND timestamp < toDateTime('2024-01-02T00:00:00')
LIMIT 100
"""
query = parse_snql_query(snql, get_dataset("events"))
print(query)
Source: snuba/request/validation.py:24
Datasets
Work with datasets and entities
Processors
Query processing pipeline