Iceberg is designed for massive scale: production deployments manage single tables containing tens of petabytes of data. Even at that scale, tables can be planned and queried efficiently from a single node, with no distributed SQL engine required for metadata operations.

Scale Without Compromise

Iceberg tables scale to production workloads with:
  • Tens of petabytes in a single table
  • Billions of files tracked efficiently
  • Millions of partitions without performance degradation
  • Single-node planning even at massive scale

Real-World Scale

Iceberg is used in production for:
  • Netflix: Petabyte-scale data lakehouse with thousands of tables
  • Apple: Exabyte-scale analytics platform
  • Adobe: Multi-petabyte event data processing
  • LinkedIn: Large-scale data warehouse migration from Hive
These deployments demonstrate Iceberg’s ability to handle:
  • High-volume streaming ingestion (millions of events/second)
  • Large batch processing (petabytes per job)
  • Interactive queries (sub-second to minutes)
  • Concurrent readers and writers (hundreds of simultaneous operations)

Scan Planning Performance

Scan planning is the process of determining which data files to read for a query. This is where Iceberg excels.

The Traditional Problem

Hive tables require O(n) remote calls for planning:
Plan Hive query:
1. List all partitions from metastore      [1 RPC]
2. For each partition (10,000):
   - List files in partition directory    [10,000 RPCs]
3. Filter files in application

Total: 10,001 remote calls for 10K partitions
Time: Minutes for large tables
This creates bottlenecks:
  • Metastore becomes overwhelmed
  • Planning time grows with table size
  • Can’t scale to millions of partitions

Iceberg’s O(1) Planning

Iceberg uses O(1) remote calls regardless of table size:
Plan Iceberg query:
1. Read table metadata file              [1 RPC]
2. Read manifest list                    [1 RPC]
3. Filter manifests using partition stats (in memory)
4. Read relevant manifest files          [~10 RPCs for filtered manifests]
5. Filter files using column stats       (in memory)

Total: ~12 remote calls for any table size
Time: Seconds even for petabyte tables
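The contrast between the two planning models can be sketched as a back-of-the-envelope calculation. The RPC counts below are the illustrative figures from this section, not measurements:

```python
# Back-of-the-envelope planning cost, using the illustrative RPC
# counts above (not measured values).

def hive_planning_rpcs(num_partitions: int) -> int:
    # 1 call to list partitions, then 1 directory listing per partition
    return 1 + num_partitions

def iceberg_planning_rpcs(relevant_manifests: int) -> int:
    # 1 metadata file + 1 manifest list + only the manifests that
    # survive partition-summary filtering
    return 2 + relevant_manifests

print(hive_planning_rpcs(10_000))   # grows linearly with partitions
print(iceberg_planning_rpcs(10))    # independent of table size
```

The Hive cost grows with the table; the Iceberg cost grows only with the number of manifests that match the query's filter.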

Two-Level Metadata Filtering

Iceberg uses hierarchical metadata for aggressive pruning:

Level 1: Manifest List Filtering

Manifest list contains min/max values for each partition field:
{
  "manifest-path": "manifest-1.avro",
  "partition-summaries": [
    {
      "contains-null": false,
      "lower-bound": 19783,  // 2024-03-01 in days since epoch
      "upper-bound": 19813   // 2024-03-31 in days since epoch
    }
  ],
  "added-files-count": 1500,
  "existing-files-count": 0
}
Query example:
SELECT * FROM events
WHERE event_date = DATE '2024-02-15';
Iceberg:
  1. Converts the predicate: event_date = DATE '2024-02-15' → days(event_time) = 19768
  2. Checks manifest bounds: 19768 < 19783 (lower bound)
  3. Skips entire manifest without reading it
Result: 1,500 files pruned without reading the manifest file.
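The day-number conversion and bounds check can be reproduced in a few lines. This is a minimal sketch of the logic, not Iceberg's implementation; the bounds are derived directly from the March 2024 dates:

```python
from datetime import date

EPOCH = date(1970, 1, 1)

def to_epoch_days(d: date) -> int:
    # Iceberg's days() transform stores dates as days since 1970-01-01
    return (d - EPOCH).days

def manifest_may_match(day: int, lower: int, upper: int) -> bool:
    # A manifest can be skipped when the predicate value lies outside
    # its partition-summary bounds
    return lower <= day <= upper

lower = to_epoch_days(date(2024, 3, 1))     # manifest lower bound
upper = to_epoch_days(date(2024, 3, 31))    # manifest upper bound
query_day = to_epoch_days(date(2024, 2, 15))

# Feb 15 is below the manifest's lower bound, so the manifest (and all
# 1,500 files it tracks) is pruned without ever being read.
print(manifest_may_match(query_day, lower, upper))  # False
```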

Level 2: Manifest File Filtering

Manifest files contain per-file statistics:
{
  "data-file": {
    "file-path": "s3://bucket/db/table/data/file-123.parquet",
    "partition": {"event_date_day": 19783},
    "record-count": 1000000,
    "column-sizes": {1: 15000000, 2: 8000000},
    "value-counts": {1: 1000000, 2: 950000},
    "null-value-counts": {1: 0, 2: 50000},
    "lower-bounds": {
      1: 100,        // min user_id
      2: "2024-03-01 00:00:00"  // min timestamp
    },
    "upper-bounds": {
      1: 999999,     // max user_id  
      2: "2024-03-01 23:59:59"  // max timestamp
    }
  }
}
Query example:
SELECT * FROM events  
WHERE event_date = DATE '2024-03-01'
  AND user_id = 500000;
Iceberg:
  1. Partition filter: Reads only manifests for 2024-03-01
  2. Column bounds: Checks 500000 >= lower_bound AND 500000 <= upper_bound
  3. Scans only files where user_id range includes 500000
Result: Out of 1,000 files for 2024-03-01, only ~10 files scanned.
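The per-file bounds check is the same idea one level down. A minimal sketch, with file names and bounds invented for illustration:

```python
# Per-file pruning on column bounds. The entries mirror the manifest
# fields shown above; paths and ranges are illustrative.

files = [
    {"path": "file-001.parquet", "user_id_min": 100,     "user_id_max": 250_000},
    {"path": "file-002.parquet", "user_id_min": 250_001, "user_id_max": 600_000},
    {"path": "file-003.parquet", "user_id_min": 600_001, "user_id_max": 999_999},
]

def files_to_scan(files, user_id):
    # Scan a file only if the predicate value can appear in it
    return [f["path"] for f in files
            if f["user_id_min"] <= user_id <= f["user_id_max"]]

print(files_to_scan(files, 500_000))  # ['file-002.parquet']
```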

Data Filtering with Column Statistics

Manifest statistics enable column-level pruning:
-- Query
SELECT * FROM orders WHERE order_id = 12345;

-- Pruning
IF 12345 NOT BETWEEN lower_bound(order_id) AND upper_bound(order_id):
  SKIP FILE

Performance Impact

Consider a real-world query on a 100TB table.
Without column statistics (full scan):
  • Read: 100TB
  • Files scanned: 100,000
  • Time: Hours
With partition pruning only:
  • Read: 10TB (90% reduction)
  • Files scanned: 10,000
  • Time: 30-60 minutes
With partition + column statistics:
  • Read: 100GB (99.9% reduction)
  • Files scanned: 100
  • Time: Seconds to minutes
In this example, Iceberg's column statistics cut the data read by a further 100x compared to partition-only filtering.

Distributed Planning

Iceberg pushes file pruning to query engines, removing metastore bottlenecks:

Traditional Centralized Planning

Hive Query Planning:
1. SQL submitted to Hive metastore
2. Metastore lists partitions           [Bottleneck]
3. Metastore lists files per partition  [Bottleneck]  
4. Metastore filters files              [Bottleneck]
5. Return file list to query engine
6. Query engine distributes work

Metastore is a single point of contention

Iceberg Distributed Planning

Iceberg Query Planning:
1. SQL submitted to query engine
2. Engine reads metadata file           [Catalog: 1 RPC]
3. Engine reads manifest list           [Storage: 1 RPC]
4. Engine DISTRIBUTES manifests to workers
5. Each worker:
   - Reads assigned manifest files      [Parallel]
   - Filters files using predicates     [Parallel]
   - Plans local scan tasks             [Parallel]
6. Workers scan assigned files

Catalog touched once; planning scales horizontally
Benefits:

No Metastore Bottleneck

Catalog only provides metadata file location. No file listing or filtering.

Horizontal Scaling

Planning parallelizes across workers. More workers = faster planning.

Metadata Locality

Workers read manifests from object storage in parallel, not from a central server.

Better Statistics

Column statistics available for cost-based optimization and better execution plans.

Optimizations for Common Patterns

Time-Series Queries

Time-based filtering is extremely efficient:
-- Query last 7 days
SELECT count(*) FROM events
WHERE event_time >= current_timestamp - INTERVAL '7' DAY;
Optimizations:
  1. Partition by time (days/hours) for coarse pruning
  2. Manifest list filtering eliminates old manifests
  3. Column bounds on event_time eliminate old files
  4. Sorted data (optional) enables early termination
Result: Only read files from the last 7 days, skip everything else.
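The steps above reduce to a window check against each file's day partition. A minimal sketch with a fixed "today" and invented file names:

```python
# A file survives the last-7-days prune only if its day partition
# (days since 1970-01-01) falls inside the query window.
from datetime import date

EPOCH = date(1970, 1, 1)
today = (date(2024, 3, 15) - EPOCH).days   # fixed "today" for the example

files_by_day = {
    today - 30: ["old.parquet"],     # last month: pruned
    today - 3:  ["recent.parquet"],  # inside the window: scanned
}

cutoff = today - 7
recent = [f for day, files in files_by_day.items() if day >= cutoff
          for f in files]
print(recent)  # ['recent.parquet']
```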

High-Cardinality Filters

Filters on high-cardinality columns (user_id, device_id) benefit from column statistics:
SELECT * FROM events WHERE user_id = 123456;
Optimizations:
  1. Bucket partition on user_id reduces search space
  2. Lower/upper bounds filter files without the user_id
  3. Bloom filters (future enhancement) for exact membership testing
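The bucket transform maps a high-cardinality key into a fixed number of partitions. A toy stand-in is shown below; real Iceberg buckets with a 32-bit Murmur3 hash, and `crc32` here only illustrates the idea:

```python
import zlib

def bucket(n: int, value: int) -> int:
    # Illustrative stand-in for Iceberg's bucket transform (which uses
    # Murmur3, not crc32): hash the value, then take it modulo n.
    h = zlib.crc32(value.to_bytes(8, "little"))
    return h % n

# All rows for a given user_id land in exactly one of 64 buckets, so a
# point lookup only needs to consider ~1/64th of the table's files.
print(bucket(64, 123456))
```

The hash must be deterministic and engine-independent so every writer and reader agrees on which bucket a value belongs to.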

Join Optimization

Iceberg provides metadata for cost-based optimization:
SELECT * 
FROM large_table l
JOIN small_table s ON l.id = s.id
WHERE l.date = CURRENT_DATE;
Optimizers use:
  • Record counts from manifest metadata
  • Partition statistics for partition pruning
  • Column bounds for bloom filter joins
  • File size statistics for task scheduling

Partitioning Best Practices

Partitioning strategy significantly impacts performance:

Right-Sizing Partitions

Too small (< 100MB):
  • Too many files = metadata overhead
  • Poor read performance (many small I/Os)
  • Slow query planning (many manifests)
Too large (> 1GB):
  • Can’t skip data within file
  • Poor parallelism (fewer tasks)
  • High memory usage per task
Optimal (500MB - 1GB):
  • Good balance of pruning and parallelism
  • Efficient I/O patterns
  • Manageable metadata size
Partition on columns frequently used in WHERE clauses:
-- Good: Time-based partitioning (queries filter by date)
PARTITIONED BY (days(event_time))

-- Good: Geographic partitioning (queries filter by region)  
PARTITIONED BY (region)

-- Bad: Random UUIDs (queries don't filter by UUID)
PARTITIONED BY (event_id)
Don’t create millions of partitions:
-- Bad: Millions of user_id partitions
PARTITIONED BY (user_id)

-- Good: Fixed number of buckets
PARTITIONED BY (bucket(64, user_id))
Start coarse, refine as data grows:
-- Year 1: Small data, monthly partitions
PARTITIONED BY (months(event_time))

-- Year 2: Growing data, evolve to daily
ALTER TABLE events ADD PARTITION FIELD days(event_time);
ALTER TABLE events DROP PARTITION FIELD months(event_time);

-- Year 3: High volume, evolve to hourly  
ALTER TABLE events ADD PARTITION FIELD hours(event_time);
ALTER TABLE events DROP PARTITION FIELD days(event_time);

Multi-Column Partitioning

Combine dimensions for better pruning:
-- Partition by region and day
CREATE TABLE events (
  event_id bigint,
  event_time timestamp,
  region string,
  data string
) USING iceberg
PARTITIONED BY (region, days(event_time));

-- Both filters benefit from partitioning
SELECT * FROM events
WHERE region = 'us'
  AND event_time >= '2024-03-01';
-- Prunes to: region=us AND days(event_time) >= 19783
Order matters:
  1. Put highest-selectivity column first (region in this case)
  2. Put time-based column last (for time-series queries)

File Layout Optimization

Sorting Within Files

Declare sort order for better min/max bounds:
-- Sort by commonly filtered column
ALTER TABLE events
WRITE ORDERED BY user_id, event_time;
Benefits:
  • Tighter min/max bounds (better pruning)
  • Clustered data (better compression)
  • Efficient range scans
Example impact:
Unsorted file:
  user_id range: [1, 1000000]  
  Can't prune for user_id = 5000

Sorted file:
  user_id range: [1, 1000]
  Can prune for user_id = 5000 (outside range)
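The effect is easy to demonstrate: split the same rows into two files, first in arrival order and then sorted, and compare the min/max bounds a manifest would record. The row values are invented for illustration:

```python
# Why sorting tightens bounds: the same eight user_ids, two files each way.

rows = [999_983, 4, 512_113, 87, 730_002, 1_001, 42, 650_000]

def file_bounds(file_chunks):
    # The (min, max) a manifest would record for each file
    return [(min(c), max(c)) for c in file_chunks]

unsorted_files = [rows[:4], rows[4:]]
print(file_bounds(unsorted_files))
# Both files span nearly the full range, so a lookup for
# user_id = 5000 must scan both.

sorted_rows = sorted(rows)
sorted_files = [sorted_rows[:4], sorted_rows[4:]]
print(file_bounds(sorted_files))  # [(4, 1001), (512113, 999983)]
# 5000 falls outside both sorted files' bounds, so both are pruned.
```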

Compaction

Regularly compact small files:
-- Compact small files in a partition
CALL catalog_name.system.rewrite_data_files(
  table => 'db.events',
  strategy => 'binpack',
  where => 'event_date = current_date - 1',
  options => map(
    'target-file-size-bytes', '536870912'  -- 512MB
  )
);
Benefits:
  • Fewer files = less metadata overhead
  • Larger files = more efficient I/O
  • Better compression ratios
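Binpack compaction essentially greedily groups small files into rewrite groups of roughly the target size. A toy version of that grouping (sizes in MB, not Iceberg's actual planner):

```python
# Greedy bin packing toward a 512 MB target, as a sketch of what the
# binpack strategy does with small files.

TARGET_MB = 512

def binpack(file_sizes_mb, target=TARGET_MB):
    groups, current, current_size = [], [], 0
    for size in file_sizes_mb:
        if current and current_size + size > target:
            groups.append(current)        # close the full group
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)            # flush the remainder
    return groups

# Ten 64 MB files become one 512 MB rewrite group plus a 128 MB remainder.
print(binpack([64] * 10))
```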

Z-Ordering (Clustering)

Co-locate related data using space-filling curves:
-- Cluster by multiple columns
CALL catalog_name.system.rewrite_data_files(
  table => 'db.events',
  strategy => 'sort',
  sort_order => 'zorder(user_id, product_id)',
  where => 'event_date >= current_date - 7'
);
Use for:
  • Multi-dimensional queries
  • Columns with similar cardinality
  • When partition evolution isn’t practical
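Z-ordering in miniature: interleave the bits of two column values into one sort key, so rows close in (user_id, product_id) space end up close together in file order. This is a toy version of the transform, not Iceberg's implementation:

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    # Interleave bits: x occupies even positions, y odd positions
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z

pairs = [(3, 1), (0, 0), (2, 2), (1, 3)]
# Sorting by z_value keeps both dimensions clustered at once, which
# keeps min/max bounds tight for filters on either column.
print(sorted(pairs, key=lambda p: z_value(*p)))
# [(0, 0), (3, 1), (1, 3), (2, 2)]
```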

Metadata Performance

Manifest File Management

Keep manifest files optimized:
-- Rewrite manifests to reduce count
CALL catalog_name.system.rewrite_manifests('db.events');
Indicators you need manifest rewriting:
  • More than 100 manifests per snapshot
  • Many manifests with only “existing” files
  • Slow query planning despite good partitioning

Snapshot Expiration

Regularly expire old snapshots:
-- Expire snapshots older than 7 days
CALL catalog_name.system.expire_snapshots(
  table => 'db.events',
  older_than => TIMESTAMP '2024-02-25 00:00:00',
  retain_last => 5  -- Keep at least 5 snapshots
);
Benefits:
  • Reduces metadata file count
  • Enables cleanup of orphaned data files
  • Faster metadata reads

Read Performance Tuning

Split Size Configuration

Control task parallelism:
# Spark configuration
spark.sql.files.maxPartitionBytes = 134217728  # 128MB splits
spark.sql.files.openCostInBytes = 4194304      # 4MB open cost
Trade-offs:
  • Smaller splits = more parallelism, more overhead
  • Larger splits = less parallelism, less overhead
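The parallelism effect can be approximated with a simple split count. This ignores `openCostInBytes` and the bin-packing Spark actually does, so treat it as a rough upper-bound sketch:

```python
import math

def num_splits(file_sizes, max_split):
    # One scan task per max_split-sized slice of each file
    return sum(math.ceil(s / max_split) for s in file_sizes)

files = [512 * 2**20, 512 * 2**20, 100 * 2**20]  # two 512 MB files, one 100 MB

print(num_splits(files, 128 * 2**20))  # 128 MB splits -> 9 tasks
print(num_splits(files, 512 * 2**20))  # 512 MB splits -> 3 tasks
```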

Vectorized Reads

Enable vectorization for Parquet:
# Spark configuration  
spark.sql.parquet.enableVectorizedReader = true
spark.sql.iceberg.vectorization.enabled = true
Benefits:
  • Process data in batches (1000s of rows at a time)
  • Better CPU cache utilization
  • 5-10x faster for analytical queries

Column Projection

Read only necessary columns:
-- Good: Reads 2 columns
SELECT user_id, event_time FROM events;

-- Bad: Reads all columns
SELECT * FROM events;
Column pruning saves:
  • I/O bandwidth (read less data)
  • Memory (store less data)
  • CPU (deserialize less data)

Write Performance Tuning

Batch Size

Control file size:
# Target file size: 512MB
write.target-file-size-bytes = 536870912

Commit Frequency

Balance freshness vs. overhead:
// Streaming: Commit every 60 seconds
streamingQuery
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start();

// Batch: Commit once per job
batchDF.writeTo("db.events").append();
Trade-offs:
  • Frequent commits = fresher data, more snapshots, more manifests
  • Infrequent commits = less overhead, larger files, stale data

Fanout Writers

Handle high-cardinality partitions:
# Allow writing to many partitions simultaneously  
write.distribution-mode = range  # Or 'hash'
spark.sql.shuffle.partitions = 200

Monitoring and Metrics

Key metrics to monitor:

File Count

Target: 500MB - 1GB per file
Monitor: Files per partition, total files
Action: Compact if too many small files

Manifest Count

Target: < 100 manifests per snapshot
Monitor: Manifests per snapshot
Action: Rewrite manifests if too many

Snapshot Count

Target: < 100 active snapshots
Monitor: Total snapshots, oldest snapshot age
Action: Expire old snapshots regularly

Planning Time

Target: < 10 seconds for large tables
Monitor: Query planning duration
Action: Check manifest count, partition strategy

Performance Comparison

Iceberg vs. Hive

| Operation | Hive | Iceberg | Improvement |
| --- | --- | --- | --- |
| Query planning (10K partitions) | 5-10 minutes | 5-10 seconds | 60-120x |
| Schema evolution | Hours (rewrite data) | Seconds (metadata) | 1000x+ |
| Partition evolution | Impossible | Seconds (metadata) | New capability |
| Concurrent writes | Locks/failures | Optimistic concurrency | 10x+ throughput |
| Time travel | Not supported | Instant (metadata) | New capability |
| Column-level pruning | No | Yes | 10x fewer files read |

Real-World Example

100TB table, 1 million files, query filtering on timestamp and user_id: Hive:
  • Planning: List 10,000 partitions × 100 files each = 5 minutes
  • Scanning: Read all files in matching partitions = 1TB read
  • Total time: 30 minutes
Iceberg:
  • Planning: Read manifest list + filter manifests = 10 seconds
  • Scanning: Column stats prune to 1,000 matching files = 10GB read
  • Total time: 60 seconds
Result: 30x faster, 100x less data read

Learn More

Partitioning

Learn how to partition for optimal performance

Reliability

Understand how reliability features enable performance

Table Format

Explore the metadata structure that enables fast planning
