Overview

The FileIO interface is Iceberg’s abstraction for reading and writing data and metadata files. It provides a flexible way to integrate with any storage system while maintaining Iceberg’s guarantees around metadata operations and table commits.

Why FileIO?

Iceberg’s design separates concerns:
  • Metadata Operations: Planning and committing changes (uses FileIO)
  • Data Operations: Reading and writing table data (uses processing engine + FileIO)
  • Physical Layout: Absolute paths allow flexibility in file organization

Key Benefits

  1. No File Renaming: Iceberg never renames files, simplifying storage requirements
  2. Absolute Paths: Metadata tracks full file paths, enabling flexible layouts
  3. Minimal Requirements: Storage only needs to support read, write, delete, and seek operations
  4. Custom Storage: Support any storage backend with a FileIO implementation

FileIO Interface

The core interface is simple:
public interface FileIO extends Serializable, Closeable {
  /**
   * Get an InputFile to read bytes from a file
   */
  InputFile newInputFile(String path);
  
  /**
   * Get an OutputFile to write bytes to a file
   */
  OutputFile newOutputFile(String path);
  
  /**
   * Delete a file
   */
  void deleteFile(String path);
  
  /**
   * Initialize with catalog properties
   */
  default void initialize(Map<String, String> properties) {}
}

Built-in Implementations

Iceberg provides FileIO implementations for common storage systems:
Implementation    Storage Type               Module
S3FileIO          Amazon S3                  iceberg-aws
GCSFileIO         Google Cloud Storage       iceberg-gcp
ADLSFileIO        Azure Data Lake Storage    iceberg-azure
OSSFileIO         Alibaba Cloud OSS          iceberg-aliyun
HadoopFileIO      Any Hadoop FileSystem      iceberg-core
ResolvingFileIO   Multiple storage types     iceberg-core

Implementing Custom FileIO

Basic Implementation

package com.example.storage;

import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import java.io.IOException;
import java.util.Map;

public class CustomFileIO implements FileIO {
  
  private CustomStorageClient client;
  
  // Must have no-arg constructor for dynamic loading
  public CustomFileIO() {
  }
  
  @Override
  public void initialize(Map<String, String> properties) {
    String endpoint = properties.get("custom.storage.endpoint");
    String accessKey = properties.get("custom.storage.access-key");
    String secretKey = properties.get("custom.storage.secret-key");
    
    this.client = new CustomStorageClient(endpoint, accessKey, secretKey);
  }
  
  @Override
  public InputFile newInputFile(String path) {
    return new CustomInputFile(client, path);
  }
  
  @Override
  public OutputFile newOutputFile(String path) {
    return new CustomOutputFile(client, path);
  }
  
  @Override
  public void deleteFile(String path) {
    try {
      client.delete(path);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to delete: %s", path);
    }
  }
  
  @Override
  public void close() {
    if (client != null) {
      client.close();
    }
  }
}

Implementing InputFile

public class CustomInputFile implements InputFile {
  
  private final CustomStorageClient client;
  private final String path;
  private Long length;
  
  public CustomInputFile(CustomStorageClient client, String path) {
    this.client = client;
    this.path = path;
  }
  
  @Override
  public long getLength() {
    if (length == null) {
      length = client.getObjectMetadata(path).getContentLength();
    }
    return length;
  }
  
  @Override
  public SeekableInputStream newStream() {
    return new CustomSeekableInputStream(client, path);
  }
  
  @Override
  public String location() {
    return path;
  }
  
  @Override
  public boolean exists() {
    return client.exists(path);
  }
}

Implementing OutputFile

public class CustomOutputFile implements OutputFile {
  
  private final CustomStorageClient client;
  private final String path;
  
  public CustomOutputFile(CustomStorageClient client, String path) {
    this.client = client;
    this.path = path;
  }
  
  @Override
  public PositionOutputStream create() {
    // Per the OutputFile contract, create() should fail if the file already exists
    return new CustomPositionOutputStream(client, path);
  }
  
  @Override
  public PositionOutputStream createOrOverwrite() {
    // Delete if exists, then create
    if (client.exists(path)) {
      client.delete(path);
    }
    return create();
  }
  
  @Override
  public String location() {
    return path;
  }
  
  @Override
  public InputFile toInputFile() {
    return new CustomInputFile(client, path);
  }
}

Implementing Seekable Input

public class CustomSeekableInputStream extends SeekableInputStream {
  
  private final CustomStorageClient client;
  private final String path;
  private InputStream stream;
  private long pos = 0;
  
  public CustomSeekableInputStream(CustomStorageClient client, String path) {
    this.client = client;
    this.path = path;
    this.stream = client.getObject(path);
  }
  
  @Override
  public long getPos() throws IOException {
    return pos;
  }
  
  @Override
  public void seek(long newPos) throws IOException {
    if (newPos == pos) {
      return;
    }
    
    // Close existing stream and open new one at position
    stream.close();
    stream = client.getObject(path, newPos);
    pos = newPos;
  }
  
  @Override
  public int read() throws IOException {
    int b = stream.read();
    if (b >= 0) {
      pos++;
    }
    return b;
  }
  
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int bytesRead = stream.read(b, off, len);
    if (bytesRead > 0) {
      pos += bytesRead;
    }
    return bytesRead;
  }
  
  @Override
  public void close() throws IOException {
    stream.close();
  }
}
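The seek contract above can be exercised without any storage backend. The following is a pure-JDK sketch over an in-memory buffer (no Iceberg or client classes involved); it mirrors the same pattern: track `pos` explicitly and reopen the stream at the new offset on seek, the way a ranged GET reopens an object-store read.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class InMemorySeekableStream extends InputStream {
  private final byte[] data;
  private InputStream delegate;
  private long pos = 0;

  InMemorySeekableStream(byte[] data) {
    this.data = data;
    this.delegate = new ByteArrayInputStream(data);
  }

  long getPos() {
    return pos;
  }

  void seek(long newPos) throws IOException {
    if (newPos == pos) {
      return; // already at the requested position
    }
    // Reopen at the new offset, as a ranged GET would against object storage
    delegate.close();
    delegate = new ByteArrayInputStream(data, (int) newPos, data.length - (int) newPos);
    pos = newPos;
  }

  @Override
  public int read() throws IOException {
    int b = delegate.read();
    if (b >= 0) {
      pos++; // only advance on a successful read, not on EOF (-1)
    }
    return b;
  }
}
```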

Configuration

Loading via Catalog Property

spark-sql \
  --conf spark.sql.catalog.my_catalog.io-impl=com.example.storage.CustomFileIO \
  --conf spark.sql.catalog.my_catalog.custom.storage.endpoint=https://storage.example.com \
  --conf spark.sql.catalog.my_catalog.custom.storage.access-key=ACCESS_KEY \
  --conf spark.sql.catalog.my_catalog.custom.storage.secret-key=SECRET_KEY

Loading via Java API

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import java.util.HashMap;
import java.util.Map;

Map<String, String> properties = new HashMap<>();
properties.put("io-impl", "com.example.storage.CustomFileIO");
properties.put("custom.storage.endpoint", "https://storage.example.com");
properties.put("custom.storage.access-key", "ACCESS_KEY");
properties.put("custom.storage.secret-key", "SECRET_KEY");
properties.put("warehouse", "custom://my-bucket/warehouse");

Catalog catalog = CatalogUtil.buildIcebergCatalog(
  "my_catalog",
  properties,
  hadoopConf
);

Advanced Features

Hadoop Configuration Access

If your FileIO needs Hadoop configuration:
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public class CustomFileIO implements FileIO, Configurable {
  
  private Configuration conf;
  
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
  
  @Override
  public Configuration getConf() {
    return conf;
  }
  
  @Override
  public void initialize(Map<String, String> properties) {
    // Can access Hadoop configuration here
    String value = conf.get("hadoop.property.name");
  }
}

Bulk Delete Operations

Optimize deletes with bulk operations:
public class CustomFileIO implements FileIO, SupportsBulkOperations {
  
  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
    List<String> batch = new ArrayList<>();
    
    for (String path : pathsToDelete) {
      batch.add(path);
      
      if (batch.size() >= 1000) {
        client.bulkDelete(batch);
        batch.clear();
      }
    }
    
    if (!batch.isEmpty()) {
      client.bulkDelete(batch);
    }
  }
}
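The batching loop above can be factored into a small generic helper. Here is a pure-JDK sketch; the `Consumer` stands in for the storage client's batch-delete call, which is an assumption about your backend's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeleteBatcher {
  // Groups paths into fixed-size batches and hands each batch to the backend call.
  static void deleteInBatches(Iterable<String> paths, int batchSize, Consumer<List<String>> bulkDelete) {
    List<String> batch = new ArrayList<>(batchSize);
    for (String path : paths) {
      batch.add(path);
      if (batch.size() >= batchSize) {
        bulkDelete.accept(new ArrayList<>(batch)); // copy so the consumer may keep it
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      bulkDelete.accept(batch); // flush the final partial batch
    }
  }
}
```

A batch size of 1000 matches common object-store delete limits (for example, S3's DeleteObjects accepts up to 1000 keys per request).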
}

Prefix Operations

Implement efficient prefix listing:
public class CustomFileIO implements FileIO, SupportsPrefixOperations {
  
  @Override
  public Iterable<FileInfo> listPrefix(String prefix) {
    return client.listObjects(prefix).stream()
      .map(obj -> new FileInfo(obj.getKey(), obj.getSize(), obj.getLastModified()))
      .collect(Collectors.toList());
  }
  
  @Override
  public void deletePrefix(String prefix) {
    List<String> toDelete = client.listObjects(prefix).stream()
      .map(StorageObject::getKey)
      .collect(Collectors.toList());
    
    deleteFiles(toDelete);
  }
}
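Object stores implement prefix listing server-side. As a sanity check of the semantics, this pure-JDK sketch filters an in-memory key set the way an object-store LIST call with a prefix parameter behaves:

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

class PrefixListing {
  // Returns the keys under the given prefix, sorted, mimicking an
  // object-store LIST request with a prefix parameter.
  static List<String> listPrefix(Collection<String> keys, String prefix) {
    return keys.stream()
        .filter(key -> key.startsWith(prefix))
        .sorted()
        .collect(Collectors.toList());
  }
}
```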

Testing Your FileIO

Unit Tests

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.*;

public class CustomFileIOTest {
  
  @Test
  public void testWriteAndRead() throws IOException {
    FileIO io = new CustomFileIO();
    io.initialize(testProperties());
    
    String path = "test://bucket/file.txt";
    String content = "Hello, Iceberg!";
    
    // Write
    OutputFile out = io.newOutputFile(path);
    try (PositionOutputStream stream = out.create()) {
      stream.write(content.getBytes(StandardCharsets.UTF_8));
    }
    
    // Read
    InputFile in = io.newInputFile(path);
    assertEquals(content.length(), in.getLength());
    
    try (SeekableInputStream stream = in.newStream()) {
      byte[] bytes = new byte[content.length()];
      int bytesRead = stream.readNBytes(bytes, 0, bytes.length);
      assertEquals(bytes.length, bytesRead);
      assertEquals(content, new String(bytes, StandardCharsets.UTF_8));
    }
    
    // Delete
    io.deleteFile(path);
    assertFalse(io.newInputFile(path).exists());
  }
  
  @Test
  public void testSeek() throws IOException {
    FileIO io = new CustomFileIO();
    io.initialize(testProperties());
    
    String path = "test://bucket/seekable.txt";
    byte[] content = "0123456789".getBytes(StandardCharsets.UTF_8);
    
    // Write
    try (PositionOutputStream stream = io.newOutputFile(path).create()) {
      stream.write(content);
    }
    
    // Read with seek
    try (SeekableInputStream stream = io.newInputFile(path).newStream()) {
      stream.seek(5);
      assertEquals('5', stream.read());
      
      stream.seek(0);
      assertEquals('0', stream.read());
      
      stream.seek(9);
      assertEquals('9', stream.read());
    }
  }
}

Performance Considerations

Cache file metadata (size, existence) to reduce storage API calls:
private final Map<String, FileMetadata> metadataCache = new ConcurrentHashMap<>();

@Override
public long getLength() {
  return metadataCache.computeIfAbsent(path, 
    p -> client.getObjectMetadata(p)
  ).getContentLength();
}
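The same computeIfAbsent pattern can be shown self-contained, with a counter standing in for the storage call so the caching effect is observable. The "content length" here is faked from the path and is purely illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class LengthCache {
  private final ConcurrentHashMap<String, Long> cache = new ConcurrentHashMap<>();
  final AtomicInteger backendCalls = new AtomicInteger(); // exposed only to observe cache hits

  long getLength(String path) {
    return cache.computeIfAbsent(path, p -> {
      backendCalls.incrementAndGet(); // stands in for client.getObjectMetadata(p)
      return (long) p.length();       // fake content length for the sketch
    });
  }
}
```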
Reuse HTTP connections for better performance:
private static final CustomStorageClient sharedClient = 
  new CustomStorageClient(pooledConnectionManager);
Batch deletes and list operations when possible to reduce API calls.
Use buffered streams for better throughput (BufferedSeekableInputStream below is a hypothetical wrapper, not an Iceberg class; note that any buffered data must be discarded when seek() is called):
@Override
public SeekableInputStream newStream() {
  return new BufferedSeekableInputStream(
    new CustomSeekableInputStream(client, path),
    8192 // 8 KB buffer; must be invalidated on seek()
  );
}

Best Practices

  1. Thread Safety: Ensure FileIO instances are thread-safe or document thread safety requirements
  2. Resource Cleanup: Always close streams and clients in close() method
  3. Error Handling: Wrap storage exceptions in Iceberg exceptions (RuntimeIOException)
  4. Retry Logic: Implement retries for transient failures
  5. Metrics: Add instrumentation for monitoring (optional)
  6. Documentation: Document custom properties and configuration requirements

Common Use Cases

Cloud Storage Integration

Implement FileIO for cloud storage not natively supported:
  • Oracle Cloud Infrastructure (OCI) Object Storage
  • Cloudflare R2
  • Wasabi
  • MinIO
  • Ceph RADOS Gateway

On-Premises Storage

Integrate with enterprise storage systems:
  • NetApp StorageGRID
  • Pure Storage FlashBlade
  • IBM Cloud Object Storage
  • Scality RING

Custom Protocols

Support custom URI schemes:
// Support the custom:// protocol
if (path.startsWith("custom://")) {
  return new CustomInputFile(client, path);
}
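Rather than matching string prefixes, scheme dispatch can use java.net.URI. A minimal sketch (the fallback to "file" for scheme-less paths is an assumption, not Iceberg behavior):

```java
import java.net.URI;

class SchemeRouter {
  // Extracts the URI scheme to pick a stream or FileIO implementation;
  // paths without a scheme are treated as local files here (an assumption).
  static String schemeOf(String path) {
    String scheme = URI.create(path).getScheme();
    return scheme != null ? scheme : "file";
  }
}
```

This is the general idea behind ResolvingFileIO, which routes paths to different FileIO implementations by scheme.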

Debugging

Enable debug logging:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomFileIO implements FileIO {
  private static final Logger LOG = LoggerFactory.getLogger(CustomFileIO.class);
  
  @Override
  public InputFile newInputFile(String path) {
    LOG.debug("Creating InputFile for path: {}", path);
    return new CustomInputFile(client, path);
  }
}
Configure logging level:
--conf spark.driver.extraJavaOptions="-Dlog4j.logger.com.example.storage=DEBUG"

Next Steps

AWS S3 FileIO

See a production FileIO implementation for S3

Custom Catalog

Build a custom catalog that uses your FileIO
