The IncrementalAppendScan interface provides an API for reading only the data appended between two snapshots in an Iceberg table.

Overview

IncrementalAppendScan is designed for incremental processing workflows where you need to read only the new data added since the last processed snapshot. This is useful for streaming pipelines, change data capture (CDC), and incremental ETL jobs.

Interface

public interface IncrementalAppendScan 
    extends IncrementalScan<IncrementalAppendScan, FileScanTask, CombinedScanTask>

Core Methods

IncrementalAppendScan inherits all methods from IncrementalScan:
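Because the interface extends IncrementalScan (and, transitively, Scan), the usual scan-configuration methods such as filter(), select(), and planFiles() are available alongside the snapshot-range methods and all chain together. A minimal sketch (the snapshot IDs and column names here are illustrative):

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

// Range methods from IncrementalScan combined with inherited Scan configuration
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(100L)          // range: from IncrementalScan
    .toSnapshot(200L)
    .filter(Expressions.notNull("id"))    // filter/select: inherited from Scan
    .select("id");

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    for (FileScanTask task : tasks) {
        // each task covers a file appended within the snapshot range
    }
}
```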

fromSnapshotInclusive()

Instructs this scan to look for changes starting from a particular snapshot (inclusive).
IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId)
Parameters:
  • fromSnapshotId - The start snapshot ID (inclusive)
Returns: This scan for method chaining
Throws: IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
Example:
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotInclusive(100L)
    .toSnapshot(200L);

fromSnapshotInclusive() with Reference

Instructs this scan to look for changes starting from a particular snapshot reference (inclusive).
default IncrementalAppendScan fromSnapshotInclusive(String ref)
Parameters:
  • ref - The start ref name that points to a particular snapshot ID (inclusive)
Returns: This scan for method chaining
Throws: IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
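For example, a tag or branch name can mark the starting point instead of a raw snapshot ID; the tag name below is illustrative:

```java
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;

// Assumes a tag named "processed-v1" points at the first snapshot to include
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotInclusive("processed-v1")
    .toSnapshot(200L);
```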

fromSnapshotExclusive()

Instructs this scan to look for changes starting from a particular snapshot (exclusive).
IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId)
Parameters:
  • fromSnapshotId - The start snapshot ID (exclusive)
Returns: This scan for method chaining
Throws: IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
Example:
// Read appends after snapshot 100, up to snapshot 200
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(100L)
    .toSnapshot(200L);

fromSnapshotExclusive() with Reference

Instructs this scan to look for changes starting from a particular snapshot reference (exclusive).
default IncrementalAppendScan fromSnapshotExclusive(String ref)
Parameters:
  • ref - The start ref name that points to a particular snapshot ID (exclusive)
Returns: This scan for method chaining
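For example, a tag can mark the last snapshot already processed, so the scan picks up strictly after it; the tag name below is illustrative:

```java
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;

// Assumes a tag named "last-checkpoint" points at the last processed snapshot
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive("last-checkpoint")
    .toSnapshot(200L);
```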

toSnapshot()

Instructs this scan to look for changes up to a particular snapshot (inclusive).
IncrementalAppendScan toSnapshot(long toSnapshotId)
Parameters:
  • toSnapshotId - The end snapshot ID (inclusive)
Returns: This scan for method chaining
Example:
// Read all appends up to snapshot 500
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .toSnapshot(500L);

toSnapshot() with Reference

Instructs this scan to look for changes up to a particular snapshot reference (inclusive).
default IncrementalAppendScan toSnapshot(String ref)
Parameters:
  • ref - The end snapshot ref (inclusive)
Returns: This scan for method chaining
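For example, the end of the range can be named by a tag rather than a snapshot ID; the tag name below is illustrative:

```java
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;

// Assumes a tag named "nightly" points at the end snapshot
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(100L)
    .toSnapshot("nightly");
```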

useBranch()

Specifies the branch to use for the incremental scan.
default IncrementalAppendScan useBranch(String branch)
Parameters:
  • branch - The branch name
Returns: This scan for method chaining
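For example, a scan can be pinned to a branch so that snapshot resolution follows that branch's history rather than main; the branch name below is illustrative:

```java
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;

// Read appends made on the "audit" branch since snapshot 100
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .useBranch("audit")
    .fromSnapshotExclusive(100L);
```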

Examples

Basic Incremental Scan

import org.apache.iceberg.Table;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.FileScanTask;

// Read new data since last processed snapshot
long lastProcessedSnapshot = 12345L;
long currentSnapshot = table.currentSnapshot().snapshotId();

IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(lastProcessedSnapshot)
    .toSnapshot(currentSnapshot);

// Process only new files
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    for (FileScanTask task : tasks) {
        // Process appended data
        System.out.println("New file: " + task.file().path());
    }
}

Streaming Incremental Processing

import java.io.IOException;

import org.apache.iceberg.Snapshot;

class IncrementalProcessor {
    private long lastProcessedSnapshot;
    private final Table table;
    
    public IncrementalProcessor(Table table) {
        this.table = table;
        this.lastProcessedSnapshot = table.currentSnapshot().snapshotId();
    }
    
    public void processNewData() throws IOException {
        Snapshot currentSnapshot = table.currentSnapshot();
        
        if (currentSnapshot.snapshotId() == lastProcessedSnapshot) {
            // No new data
            return;
        }
        
        IncrementalAppendScan scan = table.newIncrementalAppendScan()
            .fromSnapshotExclusive(lastProcessedSnapshot)
            .toSnapshot(currentSnapshot.snapshotId());
        
        try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
            for (FileScanTask task : tasks) {
                processTask(task);
            }
        }
        
        // Update checkpoint
        lastProcessedSnapshot = currentSnapshot.snapshotId();
    }
    
    private void processTask(FileScanTask task) {
        // Process the task
    }
}

Processing Appends in Time Range

import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Find snapshots in time range
Instant yesterday = Instant.now().minus(1, ChronoUnit.DAYS);
Instant now = Instant.now();

long startSnapshotId = findSnapshotAtTime(table, yesterday.toEpochMilli());
long endSnapshotId = findSnapshotAtTime(table, now.toEpochMilli());

// Read appends in that range
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(startSnapshotId)
    .toSnapshot(endSnapshotId);

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    tasks.forEach(task -> {
        // Process appends from last 24 hours
    });
}

// Note: Table.snapshot(long) looks up a snapshot by ID, not by timestamp.
// SnapshotUtil resolves the snapshot that was current as of a point in time.
static long findSnapshotAtTime(Table table, long timestampMillis) {
    return org.apache.iceberg.util.SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
}

Branch-Based Incremental Scan

// Read appends on a specific branch
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .useBranch("feature-branch")
    .fromSnapshotExclusive("last-processed-ref")
    .toSnapshot("latest-ref");

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    for (FileScanTask task : tasks) {
        // Process new data on feature branch
    }
}

Incremental Scan with Filters

import org.apache.iceberg.expressions.Expressions;

// Read incremental data with filters
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(lastSnapshot)
    .toSnapshot(currentSnapshot)
    .filter(Expressions.greaterThan("timestamp", startTime))
    .select("id", "event_type", "payload");

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    tasks.forEach(task -> {
        // Process filtered incremental data
    });
}

Reading All Historical Appends

// Read all appends from beginning to current
IncrementalAppendScan scan = table.newIncrementalAppendScan()
    .toSnapshot(table.currentSnapshot().snapshotId());

// If fromSnapshot is not set, defaults to oldest ancestor
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    for (FileScanTask task : tasks) {
        // Process all data ever appended
    }
}

Inclusive vs Exclusive Boundaries

// Scenario: Snapshots at IDs 100, 200, 300, 400

// Exclusive start, inclusive end: reads data in 200, 300, 400
IncrementalAppendScan scan1 = table.newIncrementalAppendScan()
    .fromSnapshotExclusive(100L)
    .toSnapshot(400L);

// Inclusive start, inclusive end: reads data in 100, 200, 300, 400
IncrementalAppendScan scan2 = table.newIncrementalAppendScan()
    .fromSnapshotInclusive(100L)
    .toSnapshot(400L);

// Typically use exclusive start for checkpointing:
// - Process up to snapshot 200
// - Checkpoint snapshot 200
// - Next iteration: fromSnapshotExclusive(200)

Checkpointing Pattern

import java.io.*;

class CheckpointedIncrementalProcessor {
    private static final String CHECKPOINT_FILE = "checkpoint.txt";
    private final Table table;
    
    public void process() throws IOException {
        // Read checkpoint
        Long checkpoint = readCheckpoint();
        long fromSnapshot = checkpoint != null ? checkpoint : 0L;
        
        // Get current snapshot
        long toSnapshot = table.currentSnapshot().snapshotId();
        
        if (fromSnapshot == toSnapshot) {
            return; // No new data
        }
        
        // Create incremental scan
        IncrementalAppendScan scan = table.newIncrementalAppendScan();
        
        if (fromSnapshot > 0) {
            scan = scan.fromSnapshotExclusive(fromSnapshot);
        }
        scan = scan.toSnapshot(toSnapshot);
        
        // Process data
        try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
            for (FileScanTask task : tasks) {
                processTask(task);
            }
        }
        
        // Save checkpoint
        saveCheckpoint(toSnapshot);
    }
    
    private Long readCheckpoint() throws IOException {
        File file = new File(CHECKPOINT_FILE);
        if (!file.exists()) return null;
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            return Long.parseLong(reader.readLine());
        }
    }
    
    private void saveCheckpoint(long snapshotId) throws IOException {
        try (PrintWriter writer = new PrintWriter(new FileWriter(CHECKPOINT_FILE))) {
            writer.println(snapshotId);
        }
    }
    
    private void processTask(FileScanTask task) {
        // Process task
    }
}

Use Cases

IncrementalAppendScan is ideal for:
  1. Streaming ETL Pipelines - Process only new data since last run
  2. Change Data Capture - Track appended records over time
  3. Incremental Analytics - Update aggregations with new data only
  4. Event Processing - Process new events in chronological order
  5. Data Replication - Sync only new data to downstream systems
