Java API Quickstart
This guide provides a comprehensive introduction to the Apache Iceberg Java API. You’ll learn how to programmatically create and manage Iceberg tables, work with schemas and partitions, and perform read and write operations.
The Java API is the reference implementation for Iceberg. All code examples in this guide use real patterns from the Iceberg source code.
Dependencies
Add the required dependencies to your project:
<dependencies>
  <!-- Core Iceberg API -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-core</artifactId>
    <version>1.7.1</version>
  </dependency>
  <!-- For Parquet file format -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-parquet</artifactId>
    <version>1.7.1</version>
  </dependency>
  <!-- For Hive Metastore catalog -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-hive-metastore</artifactId>
    <version>1.7.1</version>
  </dependency>
  <!-- For data operations -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-data</artifactId>
    <version>1.7.1</version>
  </dependency>
</dependencies>
Working with Catalogs
Catalogs are the entry point for working with Iceberg tables. They provide methods to create, load, rename, and drop tables.
Hive Catalog
The Hive catalog uses a Hive Metastore to track Iceberg tables:
import org.apache.iceberg.hive.HiveCatalog;
import java.util.HashMap;
import java.util.Map;
HiveCatalog catalog = new HiveCatalog();

// Configure the catalog
Map<String, String> properties = new HashMap<>();
properties.put("warehouse", "/path/to/warehouse");
properties.put("uri", "thrift://localhost:9083");
catalog.initialize("hive", properties);
If you are running inside Spark, you can optionally reuse its Hadoop configuration:
catalog.setConf(spark.sparkContext().hadoopConfiguration());
Hadoop Catalog
The Hadoop catalog is a file-based catalog suitable for HDFS and S3:
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
Configuration conf = new Configuration();
String warehousePath = "hdfs://host:8020/warehouse";
HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
The Hadoop catalog is not safe for concurrent writes on local filesystems or S3. Use it with HDFS or an atomic-rename-capable filesystem.
Catalog Operations
All catalogs implement the Catalog interface:
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.Namespace;
import java.util.List;

// Create a table identifier
TableIdentifier name = TableIdentifier.of("logging", "logs");

// Create a table
Table table = catalog.createTable(name, schema, spec);

// Load an existing table
Table loaded = catalog.loadTable(name);

// List tables in a namespace
List<TableIdentifier> tables = catalog.listTables(Namespace.of("logging"));

// Rename a table
TableIdentifier newName = TableIdentifier.of("logging", "application_logs");
catalog.renameTable(name, newName);

// Drop a table
catalog.dropTable(name);
Creating Schemas
Schemas define the structure of your table. Iceberg schemas are strongly typed and support nested structures.
Basic Schema
Create a schema using the Schema class:
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.required(2, "data", Types.StringType.get()),
    Types.NestedField.optional(3, "category", Types.StringType.get()),
    Types.NestedField.required(4, "event_time", Types.TimestampType.withZone())
);
About type IDs:
Type IDs must be unique within a schema
They enable schema evolution and column mapping
Iceberg automatically reassigns IDs when creating tables
Complex Schema
Iceberg supports nested types (structs, lists, maps):
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    // List type
    Types.NestedField.optional(4, "call_stack",
        Types.ListType.ofRequired(5, Types.StringType.get())),
    // Struct type
    Types.NestedField.optional(6, "user",
        Types.StructType.of(
            Types.NestedField.required(7, "id", Types.LongType.get()),
            Types.NestedField.required(8, "name", Types.StringType.get())
        )),
    // Map type
    Types.NestedField.optional(9, "properties",
        Types.MapType.ofRequired(10, 11,
            Types.StringType.get(),
            Types.StringType.get()))
);
Supported Types
Iceberg supports the following primitive and date/time types (complex types — structs, lists, and maps — are shown in the Complex Schema section above):
// Primitive types
Types.BooleanType.get()           // Boolean
Types.IntegerType.get()           // 32-bit integer
Types.LongType.get()              // 64-bit integer
Types.FloatType.get()             // 32-bit float
Types.DoubleType.get()            // 64-bit float
Types.DecimalType.of(9, 2)        // Decimal with precision and scale
Types.StringType.get()            // UTF-8 string
Types.BinaryType.get()            // Binary data
Types.FixedType.ofLength(16)      // Fixed-length binary
Types.UUIDType.get()              // UUID
// Date/time types
Types.DateType.get()              // Date (no time)
Types.TimeType.get()              // Time (no date)
Types.TimestampType.withZone()    // Timestamp, stored in UTC
Types.TimestampType.withoutZone() // Timestamp without a time zone
Converting Schemas
Convert schemas from other formats:
import org.apache.avro.Schema.Parser;
import org.apache.iceberg.avro.AvroSchemaUtil;

// Use fully qualified names to avoid the Avro/Iceberg Schema name clash
org.apache.avro.Schema avroSchema = new Parser().parse(
    "{\"type\": \"record\", \"name\": \"User\", \"fields\": [...]}"
);
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
Creating Partition Specs
Partition specs define how Iceberg groups records into data files. Unlike Hive, Iceberg partitioning is hidden from users.
Basic Partitioning
import org.apache.iceberg.PartitionSpec;

// Unpartitioned table
PartitionSpec unpartitioned = PartitionSpec.unpartitioned();

// Partition by identity (exact value)
PartitionSpec byCategory = PartitionSpec.builderFor(schema)
    .identity("category")
    .build();

// Partition by hour of timestamp
PartitionSpec byHour = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .build();
Advanced Partitioning
Iceberg supports multiple partition transforms. Note that only one time-based transform is allowed per source column, so pick the granularity you need:
// Time-based transforms: one of year, month, day, or hour per timestamp column
PartitionSpec timeSpec = PartitionSpec.builderFor(schema)
    .day("event_time")
    .build();

// Transforms can be combined across different columns
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")      // Partition by hour
    .bucket("id", 16)        // Hash into 16 buckets
    .truncate("message", 10) // First 10 characters
    .identity("level")       // Exact value
    .build();
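To build intuition for what these transforms do to values, here is a small plain-Java sketch of the truncate transform as the Iceberg spec defines it (this is an illustration, not Iceberg's own implementation): integers truncate to the lower bound of a width-W window, and strings truncate to a prefix.

```java
public class TruncateDemo {
    // Integer truncate[W]: v minus a remainder that is always non-negative,
    // so negative values round toward negative infinity
    static int truncateInt(int v, int width) {
        return v - (((v % width) + width) % width);
    }

    // String truncate[L]: the first L characters (code points in the real spec;
    // chars suffice for this ASCII example)
    static String truncateString(String s, int length) {
        return s.length() <= length ? s : s.substring(0, length);
    }

    public static void main(String[] args) {
        System.out.println(truncateInt(17, 10));  // 10
        System.out.println(truncateInt(-3, 10));  // -10
        System.out.println(truncateString("error: connection refused", 10)); // "error: con"
    }
}
```

Because truncated values keep the source ordering, Iceberg can still prune partitions for range predicates on the original column.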
Real-World Example
Partition a logs table by hour and log level:
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get())
);

PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
Users don’t need to know about partitioning when querying. This query automatically prunes to the matching partitions:
SELECT * FROM logs
WHERE event_time BETWEEN '2024-01-01 10:00:00' AND '2024-01-01 12:00:00'
  AND level = 'ERROR';
Creating Tables
Combine schemas and partition specs to create tables:
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

// Define the table identifier
TableIdentifier name = TableIdentifier.of("logging", "logs");

// Create the table
Table table = catalog.createTable(name, schema, spec);
System.out.println("Created table at: " + table.location());
With Properties
Specify table properties at creation time:
import java.util.HashMap;
import java.util.Map;

Map<String, String> properties = new HashMap<>();
properties.put("write.format.default", "parquet");
properties.put("write.parquet.compression-codec", "zstd");
properties.put("commit.retry.num-retries", "3");

Table table = catalog.createTable(name, schema, spec, properties);
Inspecting Tables
The Table interface provides access to table metadata:
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import java.util.Map;

Table table = catalog.loadTable(TableIdentifier.of("db", "table"));

// Get schema
Schema schema = table.schema();
System.out.println("Schema: " + schema);

// Get partition spec
PartitionSpec spec = table.spec();
System.out.println("Partition spec: " + spec);

// Get properties
Map<String, String> properties = table.properties();
System.out.println("Properties: " + properties);

// Get current snapshot (null for an empty table)
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
    System.out.println("Snapshot ID: " + snapshot.snapshotId());
    System.out.println("Timestamp: " + snapshot.timestampMillis());
}

// Get all snapshots
for (Snapshot s : table.snapshots()) {
    System.out.println("Snapshot: " + s.snapshotId());
}

// Get table location
String location = table.location();
System.out.println("Location: " + location);
Scanning Tables
Scan tables to read data.
File-Level Scanning
Get the list of files to read:
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.FileScanTask;

// Create a scan
TableScan scan = table.newScan();

// Apply filters
scan = scan.filter(Expressions.equal("level", "ERROR"));
scan = scan.filter(Expressions.greaterThan("event_time", 1704067200000L));

// Project columns
scan = scan.select("level", "message", "event_time");

// Get the projected schema
Schema projection = scan.schema();

// Plan and iterate files
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    for (FileScanTask task : tasks) {
        System.out.println("File: " + task.file().path());
        System.out.println("Records: " + task.file().recordCount());
        System.out.println("Size: " + task.file().fileSizeInBytes());
    }
}
Time Travel
Read from historical snapshots:
// Read from a specific snapshot
TableScan snapshotScan = table.newScan()
    .useSnapshot(snapshotId);

// Read as of a timestamp (milliseconds since epoch)
long oneHourAgo = System.currentTimeMillis() - 3600000L;
TableScan timeScan = table.newScan()
    .asOfTime(oneHourAgo);
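asOfTime() takes plain epoch milliseconds, so java.time is a convenient way to produce that value from a readable UTC timestamp. A small self-contained sketch (the timestamp here is just an example):

```java
import java.time.Instant;

public class AsOfTimeDemo {
    public static void main(String[] args) {
        // Convert a human-readable UTC timestamp into the epoch millis
        // that TableScan.asOfTime() expects
        long asOf = Instant.parse("2024-01-01T10:00:00Z").toEpochMilli();
        System.out.println(asOf); // 1704103200000
    }
}
```

The resulting long can be passed directly: table.newScan().asOfTime(asOf).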
Row-Level Scanning
Read actual row data using IcebergGenerics:
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.expressions.Expressions;

// Read records
try (CloseableIterable<Record> records = IcebergGenerics.read(table)
        .where(Expressions.lessThan("id", 100))
        .select("id", "name", "email")
        .build()) {
    for (Record record : records) {
        // getField returns Object, so cast to the expected type
        Long id = (Long) record.getField("id");
        String name = (String) record.getField("name");
        String email = (String) record.getField("email");
        System.out.println(id + ": " + name + " <" + email + ">");
    }
}
Filter Expressions
Iceberg supports rich filter expressions:
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Expression;

// Comparison operators
Expression filter = Expressions.equal("status", "active");
filter = Expressions.notEqual("status", "deleted");
filter = Expressions.lessThan("age", 30);
filter = Expressions.lessThanOrEqual("age", 30);
filter = Expressions.greaterThan("age", 18);
filter = Expressions.greaterThanOrEqual("age", 18);

// Null checks
filter = Expressions.isNull("deleted_at");
filter = Expressions.notNull("created_at");

// String operations
filter = Expressions.startsWith("name", "John");
filter = Expressions.notStartsWith("name", "Test");

// Set membership
filter = Expressions.in("status", "active", "pending", "approved");
filter = Expressions.notIn("status", "deleted", "archived");

// Logical operators
filter = Expressions.and(
    Expressions.greaterThan("age", 18),
    Expressions.equal("status", "active")
);
filter = Expressions.or(
    Expressions.equal("type", "admin"),
    Expressions.equal("type", "moderator")
);
filter = Expressions.not(Expressions.equal("status", "deleted"));
Update Operations
All update operations use a builder pattern with a commit() call.
Appending Data
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;

// Create data file metadata
DataFile dataFile = DataFiles.builder(spec)
    .withPath("/path/to/data-file.parquet")
    .withFileSizeInBytes(1048576) // 1 MB
    .withRecordCount(1000)
    .withPartitionPath("event_time_hour=2024-01-01-10/level=ERROR")
    .build();

// Append to table
table.newAppend()
    .appendFile(dataFile)
    .commit();
Schema Evolution
Modify the schema without rewriting data:
// Add columns
table.updateSchema()
    .addColumn("user_id", Types.LongType.get())
    .addColumn("tags", Types.ListType.ofOptional(100, Types.StringType.get()))
    .commit();

// Rename columns
table.updateSchema()
    .renameColumn("user_id", "account_id")
    .commit();

// Update column types (must be compatible, e.g. int to long)
table.updateSchema()
    .updateColumn("count", Types.LongType.get())
    .commit();

// Delete columns
table.updateSchema()
    .deleteColumn("deprecated_field")
    .commit();

// Make a required column optional
table.updateSchema()
    .makeColumnOptional("optional_field")
    .commit();

// Update column documentation
table.updateSchema()
    .updateColumnDoc("email", "User's primary email address")
    .commit();
Partition Evolution
Change how new data is partitioned:
import org.apache.iceberg.expressions.Expressions;

// Add a new partition field
table.updateSpec()
    .addField(Expressions.bucket("user_id", 16))
    .commit();

// Remove a partition field
table.updateSpec()
    .removeField("event_time_day")
    .commit();

// Rename a partition field
table.updateSpec()
    .renameField("event_time_day", "date")
    .commit();
Updating Properties
table.updateProperties()
    .set("write.format.default", "parquet")
    .set("write.parquet.compression-codec", "zstd")
    .remove("old.property")
    .commit();
Deleting Data
import org.apache.iceberg.expressions.Expressions;

// Delete files matching a filter
table.newDelete()
    .deleteFromRowFilter(Expressions.lessThan("event_time", cutoffTime))
    .commit();

// Delete specific files
table.newDelete()
    .deleteFile(dataFile)
    .commit();
Expiring Snapshots
Remove old snapshots to save space:
long olderThan = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L; // 7 days

table.expireSnapshots()
    .expireOlderThan(olderThan)
    .retainLast(10) // Keep at least 10 snapshots
    .commit();
Transactions
Group multiple operations into a single atomic commit:
import org.apache.iceberg.Transaction;

// Create a transaction
Transaction txn = table.newTransaction();

// Add operations to the transaction
txn.newDelete()
    .deleteFromRowFilter(Expressions.equal("status", "deleted"))
    .commit();

txn.newAppend()
    .appendFile(newDataFile)
    .commit();

txn.updateProperties()
    .set("last.update.time", String.valueOf(System.currentTimeMillis()))
    .commit();

// Commit all operations atomically
txn.commitTransaction();
Branching and Tagging
Create branches and tags for experimentation and auditing.
Creating Branches
String branch = "test-branch";

// Create a branch with retention settings
table.manageSnapshots()
    .createBranch(branch, table.currentSnapshot().snapshotId())
    .setMinSnapshotsToKeep(branch, 2)
    .setMaxSnapshotAgeMs(branch, 3600000L) // 1 hour
    .setMaxRefAgeMs(branch, 604800000L) // 7 days
    .commit();
Creating Tags
String tag = "v1.0.0";

table.manageSnapshots()
    .createTag(tag, snapshotId)
    .setMaxRefAgeMs(tag, 86400000L) // 1 day
    .commit();
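Raw millisecond literals like the retention settings above are easy to get wrong; java.util.concurrent.TimeUnit produces the same values more readably. A small sketch:

```java
import java.util.concurrent.TimeUnit;

public class RetentionMillis {
    public static void main(String[] args) {
        // Readable equivalents of common retention millisecond literals
        long oneHour = TimeUnit.HOURS.toMillis(1);  // 3600000
        long oneDay = TimeUnit.DAYS.toMillis(1);    // 86400000
        long sevenDays = TimeUnit.DAYS.toMillis(7); // 604800000
        System.out.println(oneHour + " " + oneDay + " " + sevenDays);
    }
}
```

These values can be passed directly to setMaxSnapshotAgeMs and setMaxRefAgeMs.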
Writing to Branches
// Append to a branch
table.newAppend()
    .appendFile(dataFile)
    .toBranch("test-branch")
    .commit();

// Row delta on a branch
table.newRowDelta()
    .addRows(dataFile)
    .addDeletes(deleteFile)
    .toBranch("test-branch")
    .commit();
Reading from Branches
// Read from the head of a branch
TableScan branchScan = table.newScan().useRef("test-branch");

// Read from a tag
TableScan tagScan = table.newScan().useRef("v1.0.0");
Managing Branches and Tags
// Fast-forward a branch
table.manageSnapshots()
    .fastForwardBranch("test-branch", "main")
    .commit();

// Replace a branch's current snapshot
table.manageSnapshots()
    .replaceBranch("test-branch", newSnapshotId)
    .commit();

// Remove a branch
table.manageSnapshots()
    .removeBranch("test-branch")
    .commit();

// Remove a tag
table.manageSnapshots()
    .removeTag("v1.0.0")
    .commit();
Best Practices
Use Try-With-Resources
Scans and record iterables hold open file handles; always close them:
try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
    for (Record record : records) {
        // Process record
    }
}
Use Appropriate Partition Granularity
Too fine: Many small files, slow metadata operations
Too coarse: Poor query pruning, read unnecessary data
Target: 500MB-1GB per partition
// Good: Daily partitions for moderate data volume
PartitionSpec.builderFor(schema).day("event_time").build();

// Good: Hourly + bucketing for high volume
PartitionSpec.builderFor(schema)
    .hour("event_time")
    .bucket("user_id", 16)
    .build();
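As a rough sizing aid, here is a self-contained sketch that turns the 500MB-1GB target into a bucket count (the data volumes are made-up assumptions; measure your own ingest rate):

```java
public class BucketSizing {
    // Round the needed bucket count up to the next power of two,
    // a common convention for hash bucket counts
    static int bucketsFor(long bytesPerPartition, long targetBytes) {
        int needed = (int) Math.ceil((double) bytesPerPartition / targetBytes);
        int buckets = 1;
        while (buckets < needed) {
            buckets *= 2;
        }
        return buckets;
    }

    public static void main(String[] args) {
        long hourlyBytes = 8L * 1024 * 1024 * 1024; // assume ~8 GB arrives per hour
        long target = 512L * 1024 * 1024;           // aim for ~512 MB per bucket
        System.out.println(bucketsFor(hourlyBytes, target)); // 16
    }
}
```

With ~8 GB per hourly partition, 16 buckets keeps each file group near the 512 MB target.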
Handle Concurrent Writes Gracefully
Iceberg uses optimistic concurrency. commit() retries automatically (up to commit.retry.num-retries) and throws CommitFailedException only when a conflicting change cannot be reconciled:
import org.apache.iceberg.exceptions.CommitFailedException;

try {
    table.newAppend()
        .appendFile(dataFile)
        .commit();
} catch (CommitFailedException e) {
    // Another writer committed a conflicting change and retries were exhausted;
    // refresh the table and rebuild the update before trying again
    throw e;
}
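If you need application-level retry around a whole write (for example, to rebuild data file metadata from refreshed table state before retrying), a generic backoff loop looks like the following. This is an illustrative sketch: the Commit interface and RuntimeException stand in for your write logic and for CommitFailedException.

```java
public class RetryDemo {
    interface Commit {
        void run();
    }

    // Retry a commit-like action with simple exponential backoff
    static void commitWithRetry(Commit action, int maxAttempts) throws InterruptedException {
        long backoffMs = 10;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return;
            } catch (RuntimeException e) { // stand-in for CommitFailedException
                if (attempt >= maxAttempts) {
                    throw e; // give up after maxAttempts failures
                }
                Thread.sleep(backoffMs);
                backoffMs *= 2;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulate an action that fails twice before succeeding
        commitWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new RuntimeException("concurrent commit, retrying");
            }
        }, 5);
        System.out.println("succeeded after " + calls[0] + " attempts"); // 3 attempts
    }
}
```

In practice you would rebuild the pending update from table.refresh()ed state inside the retry body, since the old snapshot the first attempt was based on is stale.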
Maintain Tables Regularly
Regularly run maintenance operations:
Expire snapshots : Remove old history
Remove orphan files : Clean up unreferenced files
Compact files : Merge small files
Rewrite manifests : Optimize metadata
// Expire old snapshots
table.expireSnapshots()
    .expireOlderThan(weekAgo)
    .retainLast(10)
    .commit();
Next Steps
API Documentation Complete Java API reference
Schema Evolution Learn safe schema evolution
Performance Tuning Optimize table performance
Configuration Table and catalog configuration