
Google BigQuery

Google BigQuery is a serverless, highly scalable data warehouse that enables super-fast SQL queries using the processing power of Google’s infrastructure.

Install dlt with BigQuery

To use BigQuery as a destination, install dlt with the BigQuery extra:
pip install "dlt[bigquery]"

Quick Start

Here’s a simple example to get you started:
import dlt

# Define your data source
@dlt.resource
def my_data():
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="bigquery",
    dataset_name="my_dataset"
)

# Run the pipeline
info = pipeline.run(my_data())
print(info)

Configuration

The BigQuery destination can be configured using the following parameters:

Basic Configuration

location (string, default: "US")
The geographic location where the dataset is created, e.g. "US" or "EU".

project_id (string)
The Google Cloud project ID where data will be loaded. If not specified, the project from the credentials is used.

has_case_sensitive_identifiers (bool, default: true)
Whether the dataset uses case-sensitive identifiers.

Advanced Configuration

http_timeout (float, default: 15.0)
Connection timeout for HTTP requests to the BigQuery API, in seconds.

file_upload_timeout (float, default: 1800.0)
Timeout for file uploads when loading local files, in seconds.

retry_deadline (float, default: 60.0)
How long to retry operations in case of errors, in seconds.

batch_size (int, default: 500)
Number of rows per streaming insert batch.

autodetect_schema (bool, default: false)
Allow BigQuery to autodetect schemas and create data tables.
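For illustration, these options can be set together in .dlt/config.toml (the values below are arbitrary examples, not recommendations):

```toml
[destination.bigquery]
location = "EU"
http_timeout = 30.0          # slower networks may need a longer connect timeout
file_upload_timeout = 3600.0
retry_deadline = 120.0
batch_size = 1000            # rows per streaming insert batch
autodetect_schema = false
```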

Authentication

Create a .dlt/secrets.toml file with your service account credentials:
[destination.bigquery]
location = "US"

[destination.bigquery.credentials]
project_id = "your-project-id"
private_key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
client_email = "[email protected]"
To obtain these credentials:

1. Create a Google Cloud Project: go to the Google Cloud Console and create a new project.

2. Create a Service Account: navigate to IAM & Admin > Service Accounts and create a new service account.

3. Grant Permissions: add the following roles to the service account:
  • BigQuery Data Editor
  • BigQuery Job User
  • BigQuery Read Session User

4. Download JSON Key: create a JSON key for the service account and download it.

5. Configure dlt: copy the project_id, private_key, and client_email from the JSON file to your secrets.toml.

OAuth 2.0 Authentication

You can also use OAuth 2.0 for authentication:
[destination.bigquery]
location = "US"

[destination.bigquery.credentials]
project_id = "your-project-id"
client_id = "your-client-id"
client_secret = "your-client-secret"
refresh_token = "your-refresh-token"

Default Credentials

On Google Cloud environments (Cloud Functions, Composer, Colab), dlt can use default credentials:
[destination.bigquery]
location = "US"

Data Loading

dlt loads data to BigQuery using load jobs that send files from the local filesystem or GCS buckets.

Supported File Formats

BigQuery loads jsonl (the default) and parquet files:
pipeline = dlt.pipeline(
    destination="bigquery",
    dataset_name="my_dataset"
)
# JSONL is the default loader file format
BigQuery cannot load JSON columns from Parquet files. If you have JSON columns, either:
  • Use JSONL format
  • Enable autodetect_schema to let BigQuery create RECORD types

Streaming Inserts

For better performance with small batches, you can use streaming inserts:
import dlt

@dlt.resource(write_disposition="append")
def streamed_data():
    yield {"field1": 1, "field2": 2}

# Enable streaming inserts
streamed_data.apply_hints(additional_table_hints={"x-insert-api": "streaming"})

pipeline = dlt.pipeline(destination="bigquery", dataset_name="my_dataset")
pipeline.run(streamed_data())
Streaming inserts work only with write_disposition="append", and streamed data can remain locked for editing for up to 90 minutes.

Schema Autodetection

You can let BigQuery infer schemas and create nested RECORD types:
import dlt
from dlt.destinations.adapters import bigquery_adapter
import pyarrow.json as paj

@dlt.resource(name="nested_data")
def load_nested():
    with open("data.json", 'rb') as f:
        yield paj.read_json(f)

pipeline = dlt.pipeline("my_pipeline", destination="bigquery")
pipeline.run(
    bigquery_adapter(load_nested(), autodetect_schema=True)
)
Or enable globally:
[destination.bigquery]
autodetect_schema = true

Column Hints

BigQuery supports special column hints for optimization:

Partitioning

Partition tables by date, timestamp, or integer columns:
import dlt
from dlt.destinations.adapters import bigquery_adapter

@dlt.resource
def partitioned_data():
    yield {"timestamp": "2024-01-01T00:00:00Z", "value": 100}

pipeline = dlt.pipeline(destination="bigquery", dataset_name="my_dataset")
pipeline.run(
    bigquery_adapter(
        partitioned_data(),
        partition="timestamp"
    )
)

Clustering

Cluster tables by one or more columns:
import dlt
from dlt.destinations.adapters import bigquery_adapter

@dlt.resource
def clustered_data():
    yield {"user_id": 1, "country": "US", "value": 100}

pipeline = dlt.pipeline(destination="bigquery", dataset_name="my_dataset")
pipeline.run(
    bigquery_adapter(
        clustered_data(),
        cluster=["country", "user_id"]
    )
)

Staging Support

For large datasets, you can stage files in Google Cloud Storage before loading:
import dlt

pipeline = dlt.pipeline(
    destination="bigquery",
    staging="filesystem",  # Use GCS for staging
    dataset_name="my_dataset"
)
Configure the staging bucket in .dlt/secrets.toml:
[destination.filesystem]
bucket_url = "gs://my-bucket/staging"

[destination.filesystem.credentials]
project_id = "your-project-id"
private_key = "..."
client_email = "..."

Write Dispositions

BigQuery supports all of dlt's write dispositions (append, replace, and merge):
@dlt.resource(write_disposition="append")
def append_data():
    yield {"id": 1, "value": "new"}
With replace strategy set to staging-optimized, tables are cloned from staging:
import dlt

pipeline = dlt.pipeline(
    destination="bigquery",
    dataset_name="my_dataset"
)

@dlt.resource(
    write_disposition="replace",
    table_name="my_table"
)
def my_data():
    yield {"id": 1, "value": "data"}

info = pipeline.run(my_data(), loader_file_format="parquet")

Case Sensitivity

BigQuery uses case-sensitive identifiers by default. Configure case sensitivity:
[destination.bigquery]
has_case_sensitive_identifiers = false  # For case-insensitive datasets
Or let dlt set it automatically:
[destination.bigquery]
should_set_case_sensitivity_on_new_dataset = true

Data Types

BigQuery data type mapping:
dlt Type      BigQuery Type
text          STRING
double        FLOAT64
bool          BOOL
timestamp     TIMESTAMP
date          DATE
time          TIME
bigint        INT64
binary        BYTES
decimal       NUMERIC / BIGNUMERIC
json          JSON

Additional Resources

BigQuery Adapter: learn about the BigQuery adapter for advanced features.
