
Catalogs

Spark uses pluggable table catalogs, configured through properties under the `spark.sql.catalog.<name>` prefix.

Catalog Types

Iceberg provides two catalog implementations:
| Implementation | Description | Use Case |
|---|---|---|
| `SparkCatalog` | Dedicated Iceberg catalog | Hive Metastore or Hadoop warehouse |
| `SparkSessionCatalog` | Adds Iceberg support to built-in catalog | Mixed Iceberg and non-Iceberg tables |

Hive Metastore Catalog

Configure a Hive-based catalog:
```
spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://metastore-host:port
# Omit uri to use hive.metastore.uris from hive-site.xml
```
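Once the catalog is configured, tables in it are addressed by catalog name. A minimal sketch (the `db.events` table name is hypothetical):

```sql
-- Create, write, and read a table through the hive_prod catalog
CREATE TABLE hive_prod.db.events (id bigint, ts timestamp) USING iceberg;
INSERT INTO hive_prod.db.events VALUES (1, current_timestamp());
SELECT * FROM hive_prod.db.events;
```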

REST Catalog

Configure a REST catalog:
```
spark.sql.catalog.rest_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type=rest
spark.sql.catalog.rest_prod.uri=http://localhost:8080
```

Hadoop Catalog

Configure a directory-based catalog:
```
spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type=hadoop
spark.sql.catalog.hadoop_prod.warehouse=hdfs://nn:8020/warehouse/path
```
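A Hadoop catalog stores table metadata directly under the warehouse path, so namespaces and tables map to directories. As a sketch (the `db.logs` name is hypothetical), a table created like this:

```sql
CREATE TABLE hadoop_prod.db.logs (id bigint, msg string) USING iceberg;
```

would keep its metadata under `hdfs://nn:8020/warehouse/path/db/logs/metadata/`.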

Catalog Configuration

Common configuration properties:
| Property | Values | Description |
|---|---|---|
| `spark.sql.catalog.<name>.type` | `hive`, `hadoop`, `rest`, `glue`, `jdbc`, `nessie` | Catalog implementation type |
| `spark.sql.catalog.<name>.catalog-impl` | Class name | Custom catalog implementation |
| `spark.sql.catalog.<name>.io-impl` | Class name | Custom FileIO implementation |
| `spark.sql.catalog.<name>.warehouse` | Path | Warehouse directory base path |
| `spark.sql.catalog.<name>.uri` | URI | Metastore URI (Hive) or REST URL |
| `spark.sql.catalog.<name>.default-namespace` | Namespace | Default current namespace |
| `spark.sql.catalog.<name>.cache-enabled` | `true`/`false` | Enable catalog cache (default: `true`) |
| `spark.sql.catalog.<name>.cache.expiration-interval-ms` | Milliseconds | Cache expiration time (default: 30000) |

Table Defaults and Overrides

Set default or enforced table properties:
```
# Default property (can be overridden at the table level)
spark.sql.catalog.my_catalog.table-default.write.format.default=orc

# Override property (enforced; cannot be overridden)
spark.sql.catalog.my_catalog.table-override.write.metadata.compression-codec=gzip
```
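With the configuration above, a `CREATE TABLE` can still replace the default but not the override. A sketch (the `db.t` table name is hypothetical):

```sql
-- 'write.format.default' = 'parquet' wins over the catalog's table-default of orc
CREATE TABLE my_catalog.db.t (id bigint)
USING iceberg
TBLPROPERTIES ('write.format.default' = 'parquet');

-- A conflicting value for 'write.metadata.compression-codec' would be
-- ignored, because table-override properties are enforced by the catalog
```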

View Defaults and Overrides

Similar configuration for views:
```
spark.sql.catalog.my_catalog.view-default.key=value
spark.sql.catalog.my_catalog.view-override.key=value
```

Using Catalogs

Reference tables with catalog names:
```sql
SELECT * FROM hive_prod.db.table;
```

Replacing the Session Catalog

Add Iceberg support to Spark’s built-in catalog:
```
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
```
This allows using the same Hive Metastore for both Iceberg and non-Iceberg tables. Non-Iceberg tables are handled by the built-in catalog.
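With this setup, both kinds of tables can be created against the same metastore; a sketch (table names are hypothetical):

```sql
-- Handled by the Iceberg catalog path
CREATE TABLE spark_catalog.default.ice_events (id bigint) USING iceberg;

-- Handled by Spark's built-in catalog
CREATE TABLE spark_catalog.default.raw_events (id bigint) USING parquet;
```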

Catalog-Specific Hadoop Configuration

Set per-catalog Hadoop properties:
```
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint=http://aws-local:9000
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key=mykey
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key=mysecret
```
Catalog-specific properties take precedence over global spark.hadoop.* properties.

Loading Custom Catalogs

Use a custom catalog implementation:
```
spark.sql.catalog.custom_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl=com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config=my-value
```

SQL Extensions

Enable Iceberg SQL extensions for advanced features:
```
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```
Extensions enable:
  • CALL stored procedures
  • ALTER TABLE ... ADD/DROP PARTITION FIELD
  • ALTER TABLE ... WRITE ORDERED BY
  • ALTER TABLE ... SET IDENTIFIER FIELDS
  • Branching and tagging DDL
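With extensions enabled, statements like the following become available (catalog, table, and snapshot ID here are placeholders):

```sql
-- Stored procedure: roll the table back to an earlier snapshot
CALL hive_prod.system.rollback_to_snapshot('db.table', 10963874102873);

-- Partition evolution without rewriting existing data
ALTER TABLE hive_prod.db.table ADD PARTITION FIELD days(ts);

-- Declare a sort order for subsequent writes
ALTER TABLE hive_prod.db.table WRITE ORDERED BY id;

-- Branching DDL
ALTER TABLE hive_prod.db.table CREATE BRANCH audit;
```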

Runtime Configuration

Configuration Precedence

Settings are applied in the following order (highest to lowest priority):
  1. DataSource read/write options: `.option(...)` set in code
  2. Spark session configuration: `spark.conf.set(...)` or `spark-defaults.conf`
  3. Table properties: `ALTER TABLE ... SET TBLPROPERTIES`
  4. Default value
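For example, a compression codec can be set at three of these levels, and the highest-priority setting wins for a given write. A sketch with hypothetical names:

```sql
-- Level 3: table property
ALTER TABLE my_catalog.db.t
SET TBLPROPERTIES ('write.parquet.compression-codec' = 'gzip');

-- Level 2: session configuration, overrides the table property
SET spark.sql.iceberg.compression-codec = zstd;

-- Level 1: a per-write DataFrame option such as
-- .option("compression-codec", "snappy") would override both
```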

Spark SQL Options

Global Iceberg behaviors via Spark configuration:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("IcebergExample")
  .config("spark.sql.catalog.my_catalog",
          "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.extensions",
          "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.iceberg.vectorization.enabled", "false")
  .getOrCreate()
```

Common SQL Options

| Option | Default | Description |
|---|---|---|
| `spark.sql.iceberg.vectorization.enabled` | Table default | Enable vectorized reads |
| `spark.sql.iceberg.parquet.reader-type` | `ICEBERG` | Parquet reader (`ICEBERG`, `COMET`) |
| `spark.sql.iceberg.check-nullability` | `true` | Validate write schema nullability |
| `spark.sql.iceberg.check-ordering` | `true` | Validate write schema column order |
| `spark.sql.iceberg.aggregate-push-down.enabled` | `true` | Push down aggregates (MAX, MIN, COUNT) |
| `spark.sql.iceberg.distribution-mode` | See Writes | Write distribution strategy |
| `spark.wap.id` | `null` | Write-Audit-Publish snapshot ID |
| `spark.wap.branch` | `null` | WAP branch name |
| `spark.sql.iceberg.compression-codec` | Table default | Write compression codec |
| `spark.sql.iceberg.compression-level` | Table default | Compression level |
| `spark.sql.iceberg.data-planning-mode` | `AUTO` | Data file scan planning (`AUTO`, `LOCAL`, `DISTRIBUTED`) |
| `spark.sql.iceberg.delete-planning-mode` | `AUTO` | Delete file scan planning |
| `spark.sql.iceberg.locality.enabled` | `false` | Report locality for task placement |
| `spark.sql.iceberg.executor-cache.enabled` | `true` | Enable executor-side cache |
| `spark.sql.iceberg.merge-schema` | `false` | Enable schema evolution on write |
| `spark.sql.iceberg.report-column-stats` | `true` | Report Puffin statistics to Spark CBO |
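These can also be changed per session from SQL, for example:

```sql
SET spark.sql.iceberg.vectorization.enabled = false;
SET spark.sql.iceberg.merge-schema = true;
```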

Read Options

Options for DataFrame reads:
```scala
spark.read
    .option("snapshot-id", 10963874102873L)
    .table("catalog.db.table")
```
| Option | Default | Description |
|---|---|---|
| `snapshot-id` | Latest | Snapshot ID to read |
| `as-of-timestamp` | Latest | Timestamp in milliseconds |
| `branch` | - | Branch name to read |
| `tag` | - | Tag name to read |
| `split-size` | Table property | Override split target size |
| `lookback` | Table property | Override planning lookback |
| `file-open-cost` | Table property | Override file open cost |
| `vectorization-enabled` | Table property | Enable vectorized reads |
| `batch-size` | Table property | Vectorization batch size |
| `stream-from-timestamp` | - | Streaming start timestamp |
| `streaming-max-files-per-micro-batch` | INT_MAX | Max files per streaming batch |
| `streaming-max-rows-per-micro-batch` | INT_MAX | Soft max rows per batch |
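In Spark 3.3 and later, snapshot, timestamp, branch, and tag reads can also be expressed in SQL (names and IDs here are placeholders):

```sql
-- Read a specific snapshot
SELECT * FROM hive_prod.db.table VERSION AS OF 10963874102873;

-- Read the table as of a point in time
SELECT * FROM hive_prod.db.table TIMESTAMP AS OF '2024-01-01 00:00:00';

-- Read a named branch or tag
SELECT * FROM hive_prod.db.table VERSION AS OF 'audit-branch';
```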

Write Options

Options for DataFrame writes:
```scala
df.writeTo("catalog.db.table")
    .option("write-format", "avro")
    .option("target-file-size-bytes", "268435456")
    .option("compression-codec", "zstd")
    .option("snapshot-property.key", "value")
    .append()
```
| Option | Default | Description |
|---|---|---|
| `write-format` | Table default | File format (`parquet`, `avro`, `orc`) |
| `target-file-size-bytes` | Table property | Target file size |
| `compression-codec` | Table default | Compression codec |
| `compression-level` | Table default | Compression level |
| `compression-strategy` | Table default | ORC compression strategy |
| `distribution-mode` | See Writes | Distribution mode |
| `fanout-enabled` | `false` | Enable fanout writer |
| `check-nullability` | `true` | Validate field nullability |
| `check-ordering` | `true` | Validate column order |
| `isolation-level` | `null` | Isolation level (`serializable`, `snapshot`) |
| `validate-from-snapshot-id` | `null` | Base snapshot for conflict detection |
| `snapshot-property.<key>` | - | Custom snapshot metadata |
| `delete-granularity` | `file` | Delete granularity |

Commit Metadata

Add custom metadata to snapshots:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.spark.CommitMetadata;

Map<String, String> properties = new HashMap<>();
properties.put("property_key", "property_value");

// Run the action with the given properties attached to the resulting snapshot
CommitMetadata.withCommitProperties(properties,
    () -> {
        spark.sql("DELETE FROM table WHERE id = 1");
        return 0;
    },
    RuntimeException.class);
```
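The custom properties are recorded in the snapshot summary, which can be inspected through the `snapshots` metadata table (the table name is a placeholder):

```sql
SELECT committed_at, snapshot_id, summary
FROM my_catalog.db.table.snapshots
ORDER BY committed_at DESC;
```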

Next Steps

  • Getting Started: set up your first Iceberg table
  • Write Data: configure write performance and distribution
  • Procedures: use stored procedures for maintenance
  • Structured Streaming: configure streaming reads and writes
