Iceberg provides a pluggable metrics reporting system to collect and export operational metrics. As of version 1.1.0, the MetricsReporter and MetricsReport APIs expose metrics for scan planning and for commits.
Metrics Architecture
The metrics system consists of two main components:
- MetricsReport: Data structure containing metrics for specific operations
- MetricsReporter: Plugin interface for reporting metrics to external systems
Types of Reports
ScanReport
A ScanReport captures metrics collected during scan planning against a table.
Included Metrics
- General information: Table name, snapshot ID, filter expression, schema ID, and projected field IDs and names
- Planning duration: Total time spent planning the scan
- File statistics: Number of data/delete files and manifests scanned or skipped
- Size statistics: Total size of data and delete files
- Delete file breakdown: Equality vs. positional delete files
Example ScanReport
ScanReport{
tableName=scan-planning-with-eq-and-pos-delete-files,
snapshotId=2,
filter=ref(name="data") == "(hash-27fa7cc0)",
schemaId=0,
projectedFieldIds=[1, 2],
projectedFieldNames=[id, data],
scanMetrics=ScanMetricsResult{
totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.026569404S, count=1},
resultDataFiles=CounterResult{unit=COUNT, value=1},
resultDeleteFiles=CounterResult{unit=COUNT, value=2},
totalDataManifests=CounterResult{unit=COUNT, value=1},
totalDeleteManifests=CounterResult{unit=COUNT, value=1},
scannedDataManifests=CounterResult{unit=COUNT, value=1},
skippedDataManifests=CounterResult{unit=COUNT, value=0},
totalFileSizeInBytes=CounterResult{unit=BYTES, value=10},
totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=20},
skippedDataFiles=CounterResult{unit=COUNT, value=0},
skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
scannedDeleteManifests=CounterResult{unit=COUNT, value=1},
skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
indexedDeleteFiles=CounterResult{unit=COUNT, value=2},
equalityDeleteFiles=CounterResult{unit=COUNT, value=1},
positionalDeleteFiles=CounterResult{unit=COUNT, value=1}
},
metadata={
iceberg-version=Apache Iceberg 1.4.0-SNAPSHOT
}
}
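The raw counters are easier to monitor as derived figures. Below is a minimal sketch using a hypothetical helper class, ScanStats, that takes plain values. Inside a reporter you would pass scanMetrics().totalPlanningDuration().totalDuration() and the value() of the manifest counters, after checking them for null. The skip ratio assumes that the total manifest count includes both scanned and skipped manifests.

```java
import java.time.Duration;

// Hypothetical helpers that derive figures from ScanReport values.
// They take plain Java types so they can be tested without Iceberg on the classpath.
public final class ScanStats {
  private ScanStats() {}

  // Planning time in fractional milliseconds (TimerResult.totalDuration() is a Duration).
  public static double planningMillis(Duration totalPlanningDuration) {
    return totalPlanningDuration.toNanos() / 1_000_000.0;
  }

  // Fraction of manifests pruned during planning; ASSUMES `total` counts scanned + skipped.
  // Returns 0 when there were no manifests at all.
  public static double skipRatio(long skipped, long total) {
    return total == 0 ? 0.0 : (double) skipped / total;
  }
}
```

Applied to the example report, planningMillis gives roughly 26.57 ms, and skipRatio(0, 1) is 0.0, meaning the filter did not prune the single data manifest.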
CommitReport
A CommitReport captures metrics collected after committing changes to a table (producing a snapshot).
Included Metrics
- General information: Table name, snapshot ID, sequence number, operation type
- Commit duration: Total time to commit
- Retry statistics: Number of attempts required for commit to succeed
- File changes: Added/removed data and delete files
- Row changes: Added/removed records
- Delete breakdown: Equality vs. positional deletes added/removed
Example CommitReport
CommitReport{
tableName=scan-planning-with-eq-and-pos-delete-files,
snapshotId=1,
sequenceNumber=1,
operation=append,
commitMetrics=CommitMetricsResult{
totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.098429626S, count=1},
attempts=CounterResult{unit=COUNT, value=1},
addedDataFiles=CounterResult{unit=COUNT, value=1},
removedDataFiles=null,
totalDataFiles=CounterResult{unit=COUNT, value=1},
addedDeleteFiles=null,
addedEqualityDeleteFiles=null,
addedPositionalDeleteFiles=null,
removedDeleteFiles=null,
removedEqualityDeleteFiles=null,
removedPositionalDeleteFiles=null,
totalDeleteFiles=CounterResult{unit=COUNT, value=0},
addedRecords=CounterResult{unit=COUNT, value=1},
removedRecords=null,
totalRecords=CounterResult{unit=COUNT, value=1},
addedFilesSizeInBytes=CounterResult{unit=BYTES, value=10},
removedFilesSizeInBytes=null,
totalFilesSizeInBytes=CounterResult{unit=BYTES, value=10},
addedPositionalDeletes=null,
removedPositionalDeletes=null,
totalPositionalDeletes=CounterResult{unit=COUNT, value=0},
addedEqualityDeletes=null,
removedEqualityDeletes=null,
totalEqualityDeletes=CounterResult{unit=COUNT, value=0}
},
metadata={
iceberg-version=Apache Iceberg 1.4.0-SNAPSHOT
}
}
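Several counters in the example print as null (removedRecords, removedDataFiles, and others), so arithmetic over them has to be null-safe. The following is a sketch with a hypothetical helper, CommitDeltas. It assumes that a null counter means nothing was recorded for that commit and can be treated as zero. In a reporter, each input would be the value() of a CounterResult, or null when the counter itself is null.

```java
// Hypothetical null-safe arithmetic over CommitReport counter values.
// ASSUMPTION: a null counter means "nothing recorded for this commit", treated as zero.
public final class CommitDeltas {
  private CommitDeltas() {}

  public static long orZero(Long counterValue) {
    return counterValue == null ? 0L : counterValue;
  }

  // Net change in row count produced by a single commit.
  public static long netRecords(Long addedRecords, Long removedRecords) {
    return orZero(addedRecords) - orZero(removedRecords);
  }
}
```

For the example commit above, netRecords(1L, null) returns 1.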
Available Metrics Reporters
LoggingMetricsReporter
The default reporter. It writes every report it receives to the application log.
Configuration
None. It is used automatically when no other reporter is configured.
Output Example
INFO org.apache.iceberg.metrics.LoggingMetricsReporter - Received metrics report:
ScanReport{tableName=events, snapshotId=12345, ...}
The LoggingMetricsReporter is useful for debugging and development. However, it logs one entry per scan and per commit, so it can generate significant log volume in production.
RESTMetricsReporter
Automatically enabled when using the RESTCatalog. Sends metrics to a REST server endpoint.
Endpoint
Metrics are posted to:
/v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics
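For readers implementing or debugging a server, the path can be assembled as in the following sketch. RestMetricsPath is a hypothetical helper, and two details are assumptions to verify against the REST catalog specification. First, multi-level namespaces are joined with the 0x1F unit separator before URL encoding. Second, the {prefix} segment is dropped when the server does not configure a prefix.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical builder for the per-table metrics path.
// ASSUMPTIONS: namespace levels are joined with 0x1F, then URL-encoded; no prefix -> segment omitted.
public final class RestMetricsPath {
  private static final String UNIT_SEPARATOR = String.valueOf((char) 0x1F);

  private RestMetricsPath() {}

  public static String of(String prefix, List<String> namespaceLevels, String table) {
    // URLEncoder applies form encoding (for example, a space becomes '+')
    String namespace =
        URLEncoder.encode(String.join(UNIT_SEPARATOR, namespaceLevels), StandardCharsets.UTF_8);
    String encodedTable = URLEncoder.encode(table, StandardCharsets.UTF_8);
    String base = (prefix == null || prefix.isEmpty()) ? "/v1" : "/v1/" + prefix;
    return base + "/namespaces/" + namespace + "/tables/" + encodedTable + "/metrics";
  }
}
```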
Configuration
Control REST metrics reporting with the catalog property:
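Map<String, String> catalogProps = new HashMap<>();
catalogProps.put("uri", "http://localhost:8181"); // REST server URI (required); placeholder value
catalogProps.put("rest-metrics-reporting-enabled", "true"); // default: true; "false" disables reporting
Catalog catalog = new RESTCatalog();
catalog.initialize("rest-catalog", catalogProps);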
| Property | Default | Description |
|---|---|---|
| rest-metrics-reporting-enabled | true | Enable/disable REST metrics reporting |
Implementing a Custom Metrics Reporter
Create custom reporters to send metrics to monitoring systems like Prometheus, CloudWatch, or Datadog.
Basic Implementation
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
// Keeps every report in memory; useful in tests and for debugging.
public class InMemoryMetricsReporter implements MetricsReporter {
  // Thread-safe list, since a shared reporter may receive reports from several threads
  private final List<MetricsReport> metricsReports = new CopyOnWriteArrayList<>();
  @Override
  public void report(MetricsReport report) {
    metricsReports.add(report);
  }
  public List<MetricsReport> reports() {
    return metricsReports;
  }
}
Prometheus Reporter Example
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import org.apache.iceberg.metrics.*;
public class PrometheusMetricsReporter implements MetricsReporter {
  // Label only by low-cardinality values such as the table name. A snapshot ID label
  // would create a new time series for every snapshot.
  private static final Counter scanCounter = Counter.build()
      .name("iceberg_scans_total")
      .help("Total number of table scans")
      .labelNames("table")
      .register();
  private static final Histogram scanDuration = Histogram.build()
      .name("iceberg_scan_duration_seconds")
      .help("Scan planning duration in seconds")
      .labelNames("table")
      .register();
  private static final Counter commitCounter = Counter.build()
      .name("iceberg_commits_total")
      .help("Total number of commits")
      .labelNames("table", "operation")
      .register();
  private static final Counter filesAdded = Counter.build()
      .name("iceberg_files_added_total")
      .help("Total number of data files added")
      .labelNames("table")
      .register();
  @Override
  public void report(MetricsReport report) {
    if (report instanceof ScanReport) {
      handleScanReport((ScanReport) report);
    } else if (report instanceof CommitReport) {
      handleCommitReport((CommitReport) report);
    }
  }
  private void handleScanReport(ScanReport report) {
    scanCounter.labels(report.tableName()).inc();
    TimerResult planning = report.scanMetrics().totalPlanningDuration();
    if (planning != null) {
      double durationSeconds = planning.totalDuration().toNanos() / 1_000_000_000.0;
      scanDuration.labels(report.tableName()).observe(durationSeconds);
    }
  }
  private void handleCommitReport(CommitReport report) {
    commitCounter.labels(report.tableName(), report.operation()).inc();
    CounterResult added = report.commitMetrics().addedDataFiles();
    if (added != null) {
      filesAdded.labels(report.tableName()).inc(added.value());
    }
  }
}
CloudWatch Reporter Example
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.*;
import org.apache.iceberg.metrics.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class CloudWatchMetricsReporter implements MetricsReporter {
  private final AmazonCloudWatch cloudWatch;
  private final String namespace;
  public CloudWatchMetricsReporter(AmazonCloudWatch cloudWatch, String namespace) {
    this.cloudWatch = cloudWatch;
    this.namespace = namespace;
  }
  @Override
  public void report(MetricsReport report) {
    if (report instanceof ScanReport) {
      reportScanMetrics((ScanReport) report);
    } else if (report instanceof CommitReport) {
      reportCommitMetrics((CommitReport) report);
    }
  }
  private void reportScanMetrics(ScanReport report) {
    ScanMetricsResult metrics = report.scanMetrics();
    Dimension table = new Dimension().withName("TableName").withValue(report.tableName());
    Date now = new Date();
    List<MetricDatum> data = new ArrayList<>();
    // Individual results may be null, so only publish the ones that are present
    if (metrics.totalPlanningDuration() != null) {
      data.add(new MetricDatum()
          .withMetricName("ScanPlanningDuration")
          .withValue(metrics.totalPlanningDuration().totalDuration().toNanos() / 1e9)
          .withUnit(StandardUnit.Seconds)
          .withTimestamp(now)
          .withDimensions(table));
    }
    if (metrics.resultDataFiles() != null) {
      data.add(new MetricDatum()
          .withMetricName("ResultDataFiles")
          .withValue((double) metrics.resultDataFiles().value())
          .withUnit(StandardUnit.Count)
          .withTimestamp(now)
          .withDimensions(table));
    }
    if (!data.isEmpty()) {
      // Synchronous network call on the caller's thread; see Best Practices below
      cloudWatch.putMetricData(
          new PutMetricDataRequest().withNamespace(namespace).withMetricData(data));
    }
  }
  private void reportCommitMetrics(CommitReport report) {
    // Similar implementation for commit metrics
  }
}
Registering Custom Metrics Reporters
Via Catalog Configuration
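Register a reporter for every table loaded through a catalog. The catalog instantiates the class reflectively, so the class needs a public no-argument constructor. As written, the CloudWatchMetricsReporter above takes constructor arguments and cannot be loaded this way:
Map<String, String> catalogProps = new HashMap<>();
catalogProps.put("warehouse", "hdfs://namenode:8020/warehouse"); // placeholder; required by HadoopCatalog
catalogProps.put(
    "metrics-reporter-impl",
    "com.example.PrometheusMetricsReporter"
);
HadoopCatalog catalog = new HadoopCatalog();
catalog.setConf(new Configuration()); // org.apache.hadoop.conf.Configuration; set before initialize()
catalog.initialize("hadoop-catalog", catalogProps);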
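A catalog configured with metrics-reporter-impl knows only the class name. It therefore has to instantiate the reporter reflectively, which is why the class needs a public no-argument constructor. The following simplified stand-in illustrates the mechanism. ReflectiveLoader is hypothetical and is not Iceberg's actual loading code.

```java
// Hypothetical, simplified stand-in for loading a class named in a catalog property.
// It only illustrates why a public no-argument constructor is required.
public final class ReflectiveLoader {
  private ReflectiveLoader() {}

  public static <T> T load(String className, Class<T> expectedType)
      throws ReflectiveOperationException {
    Class<?> cls = Class.forName(className);
    // Only the class name is known, so the no-arg constructor is the only one that can be called
    Object instance = cls.getDeclaredConstructor().newInstance();
    return expectedType.cast(instance);
  }
}
```

A class without a no-argument constructor fails at getDeclaredConstructor() with a NoSuchMethodException, which is the same failure a reporter such as CloudWatchMetricsReporter would hit.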
Via Scan API
Attach reporters to specific scan operations:
// equal() is statically imported from org.apache.iceberg.expressions.Expressions
MetricsReporter prometheusReporter = new PrometheusMetricsReporter();
MetricsReporter cloudWatchReporter = new CloudWatchMetricsReporter(...);
TableScan scan = table.newScan()
    .metricsReporter(prometheusReporter)
    .metricsReporter(cloudWatchReporter)
    .filter(equal("category", "orders"));
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
  for (FileScanTask task : tasks) {
    // Process files
  }
}
// The ScanReport is sent to both reporters once planning finishes and the iterable is closed
Reporters registered via the scan API are used in addition to catalog-level reporters, not instead of them.
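Because scan-level reporters are used alongside catalog-level ones, each report fans out to several destinations. The following sketch shows that fan-out pattern in plain Java, with failure isolation so that one broken destination does not affect the others. FanOutReporter is a hypothetical name, and this is not Iceberg's internal code for combining reporters.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical fan-out: delivers each report to every delegate and isolates failures.
// With Iceberg, T would be MetricsReport and each delegate a MetricsReporter::report reference.
public final class FanOutReporter<T> implements Consumer<T> {
  private final List<Consumer<T>> delegates;
  private final AtomicLong failures = new AtomicLong();

  public FanOutReporter(List<Consumer<T>> delegates) {
    this.delegates = List.copyOf(delegates);
  }

  @Override
  public void accept(T report) {
    for (Consumer<T> delegate : delegates) {
      try {
        delegate.accept(report);
      } catch (RuntimeException e) {
        failures.incrementAndGet(); // count the failure and continue with the next delegate
      }
    }
  }

  public long failures() {
    return failures.get();
  }
}
```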
Metrics Use Cases
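Performance Monitoring
Track scan planning time to identify slow scan plans:
import org.apache.iceberg.metrics.*;
public class PerformanceMonitor implements MetricsReporter {
  private static final long SLOW_PLANNING_MS = 5_000; // alert threshold: 5 seconds
  @Override
  public void report(MetricsReport report) {
    if (report instanceof ScanReport) {
      ScanReport scan = (ScanReport) report;
      TimerResult planning = scan.scanMetrics().totalPlanningDuration();
      if (planning == null) {
        return;
      }
      long durationMs = planning.totalDuration().toMillis();
      if (durationMs > SLOW_PLANNING_MS) {
        alertSlowScan(scan.tableName(), durationMs);
      }
    }
  }
  private void alertSlowScan(String table, long durationMs) {
    // Hypothetical hook: forward to your alerting system
  }
}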
Cost Tracking
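Track the size of the data files selected by each scan plan as an input to cost allocation. Engines may read less than this total, for example when only some columns are projected.
import org.apache.iceberg.metrics.*;
public class CostTracker implements MetricsReporter {
  @Override
  public void report(MetricsReport report) {
    if (report instanceof ScanReport) {
      ScanReport scan = (ScanReport) report;
      CounterResult bytes = scan.scanMetrics().totalFileSizeInBytes();
      if (bytes != null) {
        recordCost(scan.tableName(), calculateCost(bytes.value()));
      }
    }
  }
  private double calculateCost(long bytesScanned) {
    return 0.0; // hypothetical pricing model
  }
  private void recordCost(String table, double cost) {
    // hypothetical cost store
  }
}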
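The CostTracker leaves the pricing in calculateCost unspecified. One simple model charges a flat rate per tebibyte. The sketch below assumes such a rate, passed in as pricePerTib, and is not tied to any real price list. ScanCost is a hypothetical helper.

```java
// Hypothetical pricing helper: converts planned bytes into a charge at an ASSUMED flat rate per TiB.
public final class ScanCost {
  private static final double BYTES_PER_TIB = 1024.0 * 1024 * 1024 * 1024; // 2^40

  private ScanCost() {}

  public static double calculateCost(long bytesScanned, double pricePerTib) {
    if (bytesScanned < 0 || pricePerTib < 0) {
      throw new IllegalArgumentException("bytes and price must be non-negative");
    }
    return bytesScanned / BYTES_PER_TIB * pricePerTib;
  }
}
```

For example, 1 TiB (2^40 bytes) at a rate of 5.0 yields 5.0, and half a TiB at a rate of 4.0 yields 2.0.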
Audit Logging
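Record each snapshot-producing commit (appends, overwrites, deletes, and so on) made through tables that use this reporter:
import org.apache.iceberg.metrics.*;
public class AuditLogger implements MetricsReporter {
  @Override
  public void report(MetricsReport report) {
    if (report instanceof CommitReport) {
      CommitReport commit = (CommitReport) report;
      logAudit(
          commit.tableName(),
          commit.operation(),
          commit.snapshotId(),
          valueOrNull(commit.commitMetrics().addedRecords()),
          valueOrNull(commit.commitMetrics().removedRecords()));
    }
  }
  // Counters may be null (see removedRecords in the example CommitReport)
  private static Long valueOrNull(CounterResult counter) {
    return counter == null ? null : counter.value();
  }
  private void logAudit(String table, String operation, long snapshotId, Long added, Long removed) {
    // hypothetical audit sink
  }
}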
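The logAudit method is left open above. Following the structured-logging advice in the best practices, one option is to write each entry as a single JSON line. The following minimal sketch uses only the standard library and hypothetical field names. A JSON library would be the more robust choice in production.

```java
// Hypothetical formatter: one audit entry as a single-line JSON object.
// Null counters are written as JSON null.
public final class AuditLine {
  private AuditLine() {}

  public static String toJson(
      String table, String operation, long snapshotId, Long addedRecords, Long removedRecords) {
    return "{\"table\":" + quote(table)
        + ",\"operation\":" + quote(operation)
        + ",\"snapshotId\":" + snapshotId
        + ",\"addedRecords\":" + addedRecords
        + ",\"removedRecords\":" + removedRecords
        + "}";
  }

  // Minimal JSON string escaping: quotes, backslashes and control characters.
  private static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      if (c == '"' || c == '\\') {
        sb.append('\\').append(c);
      } else if (c < 0x20) {
        sb.append(String.format("\\u%04x", (int) c));
      } else {
        sb.append(c);
      }
    }
    return sb.append('"').toString();
  }
}
```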
Best Practices
- Implement efficient reporters: Avoid blocking operations in the report() method
- Use asynchronous publishing: Buffer and batch metrics before sending to external systems
- Handle failures gracefully: Don’t let reporter failures affect table operations
- Filter relevant metrics: Only export metrics you need to reduce overhead
- Add context: Include table name, operation type, and timestamps in all metrics
- Monitor reporter health: Track metrics about the metrics system itself, such as dropped reports and failed publish attempts
- Use structured logging: Export metrics in structured formats (JSON) for easier parsing
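Several of these practices combine naturally. The following sketch uses a hypothetical AsyncBatchingPublisher that does four things: it keeps the calling thread non-blocking by only enqueuing, it publishes batches from a background thread, it contains sink failures, and it counts dropped items and failed batches as health signals. The class is generic over the item type. A MetricsReporter would call publish(report) from report() and supply a sink that sends a batch to the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical non-blocking, batching publisher with failure containment and health counters.
public final class AsyncBatchingPublisher<T> implements AutoCloseable {
  private final BlockingQueue<T> queue;
  private final int maxBatch;
  private final Consumer<List<T>> sink;
  private final AtomicLong dropped = new AtomicLong();
  private final AtomicLong failedBatches = new AtomicLong();
  private final Thread worker;
  private volatile boolean running = true;

  public AsyncBatchingPublisher(int capacity, int maxBatch, Consumer<List<T>> sink) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    this.maxBatch = maxBatch;
    this.sink = sink;
    this.worker = new Thread(this::drainLoop, "metrics-publisher");
    this.worker.setDaemon(true);
    this.worker.start();
  }

  // Call from report(): never blocks; drops the item if the buffer is full or the publisher is closed.
  public void publish(T item) {
    if (!running || !queue.offer(item)) {
      dropped.incrementAndGet();
    }
  }

  private void drainLoop() {
    while (running || !queue.isEmpty()) {
      T first;
      try {
        first = queue.poll(100, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      if (first == null) {
        continue;
      }
      List<T> batch = new ArrayList<>();
      batch.add(first);
      queue.drainTo(batch, maxBatch - 1); // take up to maxBatch items in total
      try {
        sink.accept(batch);
      } catch (RuntimeException e) {
        failedBatches.incrementAndGet(); // never let publishing errors escape
      }
    }
  }

  public long dropped() {
    return dropped.get();
  }

  public long failedBatches() {
    return failedBatches.get();
  }

  // Stops accepting new items (best effort) and waits until queued items have been published.
  @Override
  public void close() {
    running = false;
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The bounded queue is a deliberate trade-off. Under sustained backpressure it drops reports rather than slowing down scan planning or commits, and the dropped() counter makes that loss visible.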