
IOTA Data Ingestion

The IOTA Data Ingestion service processes and stores blockchain checkpoint data in various formats and destinations.

Overview

The data ingestion service provides:
  • Archival: Long-term checkpoint storage with manifests
  • Blob Storage: Checkpoint data uploaded to object stores
  • KV Store: Key-value store for checkpoint data (BigTable, DynamoDB, etc.)
  • Historical Processing: Backfill and historical data processing
  • Progress Tracking: Persistent progress store for reliability

Architecture

The service consists of:
  1. IndexerExecutor: Main coordinator managing workers and progress
  2. WorkerPool: Parallel workers for data processing
  3. Reducers: Transform and reduce checkpoint data
  4. Progress Store: Tracks processing watermarks
  5. Remote Store: Source for checkpoint data

Task Types

The ingestion service supports multiple task types configured via YAML:

Archival Task

Stores checkpoint data with manifests for long-term archival:
task: archival
name: "archival-worker"
concurrency: 5
remote-url: "s3://bucket/path"
remote-store-options:
  - ["region", "us-west-2"]
commit-file-size: 1000
commit-duration-seconds: 300
Configuration:
  • remote-url: Object store URL for archived data
  • remote-store-options: Key-value pairs for store configuration
  • commit-file-size: Number of checkpoints per archive file
  • commit-duration-seconds: Maximum duration before committing

Blob Task

Uploads checkpoint data to blob storage:
task: blob
name: "blob-worker"
concurrency: 10
object-store-config:
  object-store: "S3"
  bucket: "checkpoint-data"
  aws-region: "us-west-2"
checkpoint-chunk-size-mb: 10
node-rest-api-url: "http://fullnode:9000/api/v1"
Configuration:
  • object-store-config: Object store type and credentials
  • checkpoint-chunk-size-mb: Size of checkpoint chunks (minimum 5 MB)
  • node-rest-api-url: Fullnode REST API for checkpoint data
Supported Object Stores:
  • Amazon S3
  • Google Cloud Storage
  • Azure Blob Storage
  • Local filesystem
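For local development, the same blob task can target the filesystem backend instead of a cloud store. The sketch below follows the pattern of the S3 example above, but the backend name "File" and the directory key are assumptions; check the service's object store configuration for the exact field names:

```yaml
# Hypothetical local-filesystem variant of the blob task.
# The "File" backend name and "directory" key are assumptions, not verified.
task: blob
name: "local-blob-worker"
concurrency: 2
object-store-config:
  object-store: "File"                 # assumed name of the local filesystem backend
  directory: "/tmp/iota-checkpoints"   # assumed key for the target directory
checkpoint-chunk-size-mb: 10
node-rest-api-url: "http://fullnode:9000/api/v1"
```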

BigTable KV Task

Stores checkpoint data in Google Cloud BigTable:
task: big-table-kv
name: "bigtable-worker"
concurrency: 20
instance-id: "iota-checkpoints"
column-family: "cf1"
timeout-secs: 30
emulator-host: "localhost:8086"  # Optional, for local testing
Configuration:
  • instance-id: BigTable instance identifier
  • column-family: Column family name
  • timeout-secs: Operation timeout
  • emulator-host: Optional emulator for local development

KV Store Task

Generic key-value store for checkpoint data:
task: kv
name: "kv-worker"
concurrency: 15
# KV-specific configuration here

Historical Task

Processes historical checkpoint data:
task: historical
name: "historical-worker"
concurrency: 5
# Historical processing configuration

Configuration File

Create a YAML configuration file to define ingestion tasks:
# Checkpoint data source
path: "/path/to/checkpoint/files"
remote-store-url: "http://fullnode:9000/api/v1"
remote-store-options:
  - ["timeout", "30"]
remote-read-batch-size: 100

# Progress tracking
progress-store-path: "/var/lib/iota/ingestion-progress"

# Metrics
metrics-host: "127.0.0.1"
metrics-port: 8081

# Tasks to run
tasks:
  - task: blob
    name: "blob-storage"
    concurrency: 10
    object-store-config:
      object-store: "S3"
      bucket: "iota-checkpoints"
      aws-region: "us-west-2"
    checkpoint-chunk-size-mb: 10
    node-rest-api-url: "http://fullnode:9000/api/v1"

  - task: archival
    name: "archival-storage"
    concurrency: 5
    remote-url: "s3://iota-archive/checkpoints"
    remote-store-options:
      - ["region", "us-west-2"]
    commit-file-size: 1000
    commit-duration-seconds: 300

Running the Service

Command-line Usage

cargo run --bin iota-data-ingestion -- <config.yaml>
Example:
cargo run --bin iota-data-ingestion -- /etc/iota/ingestion-config.yaml

Configuration Parameters

Global Settings

  • path: Local directory containing checkpoint files
  • remote-store-url: URL of remote checkpoint source (fullnode REST API)
  • remote-store-options: Connection options for remote store
  • remote-read-batch-size: Checkpoints to read per batch (default: 100)
  • progress-store-path: Directory for progress tracking
  • metrics-host: Prometheus metrics host (default: 127.0.0.1)
  • metrics-port: Prometheus metrics port (default: 8081)

Per-Task Settings

  • name: Unique task identifier
  • concurrency: Number of parallel workers
  • task: Task type (archival, blob, big-table-kv, kv, historical)

Progress Tracking

The service maintains a persistent progress store:

Purpose

  • Tracks processing watermark for each task
  • Enables resumption after restart
  • Prevents duplicate processing

Location

Specified by the progress-store-path setting in the configuration:
progress-store-path: "/var/lib/iota/ingestion-progress"

Behavior

  1. Each task maintains its own watermark
  2. Watermark updated after successful processing
  3. On restart, processing resumes from last watermark
  4. Epoch transitions handled automatically
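The steps above can be sketched as pseudocode (Rust-flavored; progress_store, source, and worker_pool are illustrative names, not the actual crate API):

```rust
// Pseudocode sketch of the per-task watermark loop, not the real implementation.
loop {
    // 1. Resume from the task's persisted watermark.
    let watermark = progress_store.load(task_name);
    // 2. Fetch the next batch of checkpoints from the source.
    let batch = source.fetch_from(watermark, batch_size);
    for checkpoint in batch {
        // 3. Process in the worker pool; abort on unrecoverable error.
        worker_pool.process(&checkpoint)?;
        // 4. Advance the watermark only after successful processing,
        //    so a restart never skips an unprocessed checkpoint.
        progress_store.save(task_name, checkpoint.sequence_number + 1);
    }
}
```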

Checkpoint Sources

The service can read checkpoints from multiple sources:

Local Files

Read from local .chk files:
path: "/data/checkpoints"

Remote Store (Fullnode REST API)

Fetch checkpoints from fullnode:
remote-store-url: "http://fullnode:9000/api/v1"
remote-store-options:
  - ["timeout", "30"]
  - ["max-retries", "5"]

Object Store

Read from S3, GCS, or Azure:
remote-store-url: "s3://bucket/checkpoints/"
remote-store-options:
  - ["region", "us-west-2"]
  - ["aws-access-key-id", "..."]
  - ["aws-secret-access-key", "..."]

Monitoring

Prometheus metrics are exposed on the configured port (default: 8081):

Key Metrics

  • ingestion_checkpoint_processed: Checkpoints processed by task
  • ingestion_checkpoint_errors: Processing errors by task
  • ingestion_watermark: Current watermark for each task
  • ingestion_worker_active: Active workers per task
  • ingestion_batch_duration: Processing time per batch

Access Metrics

curl http://localhost:8081/metrics
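The metric names listed above can drive Prometheus alerting. A hedged example rule file (the metric names come from the list above; label names and thresholds are illustrative and should be adjusted to what the service actually emits):

```yaml
# Example Prometheus alerting rules; thresholds are assumptions.
groups:
  - name: iota-ingestion
    rules:
      - alert: IngestionErrors
        # Fires when any task reports processing errors for 10 minutes.
        expr: rate(ingestion_checkpoint_errors[5m]) > 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Ingestion task is reporting checkpoint processing errors"
      - alert: IngestionWatermarkStalled
        # Fires when a task's watermark stops advancing.
        expr: delta(ingestion_watermark[15m]) == 0
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "Ingestion watermark has not advanced for 15 minutes"
```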

Blob Worker Details

Chunking Strategy

Checkpoints are grouped into chunks:
  • Minimum chunk size: 5 MB
  • Configurable size: checkpoint-chunk-size-mb
  • Multipart upload: For large chunks
  • Concurrent parts: Up to 50 parts uploaded in parallel

File Naming

Checkpoints are stored as:
live/{checkpoint_number}.chk
ingestion/{checkpoint_number}.chk

Epoch Transitions

On epoch change:
  1. Detects transition via REST API
  2. Updates watermark to new epoch’s first checkpoint
  3. Resets remote store for previous epoch range
  4. Continues processing from new epoch

Archival Worker Details

Manifest Management

The archival worker maintains a MANIFEST file:
  • Lists all archived checkpoint files
  • Tracks checkpoint ranges per file
  • Updated after each commit
  • Enables efficient checkpoint discovery

File Format

Two files are written per checkpoint range:
  1. Checkpoint file (.chk): Checkpoint contents
    • Magic bytes: CHECKPOINT_FILE_MAGIC
    • Compressed checkpoint data
  2. Summary file (.sum): Checkpoint summaries
    • Magic bytes: SUMMARY_FILE_MAGIC
    • Compressed summary data

Directory Structure

epoch_0/
  0.chk
  0.sum
  1000.chk
  1000.sum
epoch_1/
  0.chk
  0.sum
MANIFEST

Historical Worker Details

Processes historical data with a reducer:

Use Cases

  • Backfilling missing data
  • Reprocessing with new logic
  • Data migration
  • Analytics pipeline

Signal Handling

The service handles graceful shutdown:

Supported Signals

  • SIGTERM: Graceful shutdown
  • SIGINT (Ctrl+C): Graceful shutdown

Shutdown Behavior

  1. Stop accepting new checkpoints
  2. Complete in-flight processing
  3. Update progress store
  4. Exit cleanly

Environment Variables

BigTable Configuration

  • BIGTABLE_EMULATOR_HOST: BigTable emulator address (for testing)

AWS Configuration

  • AWS_REGION: AWS region
  • AWS_ACCESS_KEY_ID: AWS access key
  • AWS_SECRET_ACCESS_KEY: AWS secret key
  • AWS_SESSION_TOKEN: AWS session token (optional)

GCP Configuration

  • GOOGLE_APPLICATION_CREDENTIALS: Path to service account JSON

Error Handling

Retry Logic

Built-in retries handle transient failures such as:
  • Network timeouts
  • Rate limiting
  • Temporary unavailability
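The general shape of retry-with-backoff can be sketched as follows (illustrative only; the service's actual retry policy, attempt counts, and delays are internal and not documented here):

```rust
// Illustrative exponential backoff, not the crate's actual retry code.
use std::time::Duration;

fn retry_with_backoff<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    max_attempts: u32,
) -> Result<T, E> {
    let mut delay = Duration::from_millis(100);
    for attempt in 1..=max_attempts {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt == max_attempts => return Err(err),
            Err(_) => {
                // Transient failure (timeout, rate limit): wait, then retry
                // with a doubled delay, capped at 10 seconds.
                std::thread::sleep(delay);
                delay = (delay * 2).min(Duration::from_secs(10));
            }
        }
    }
    unreachable!("loop always returns within max_attempts")
}
```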

Fatal Errors

Service exits on:
  • Invalid configuration
  • Missing credentials
  • Unrecoverable storage errors

Recovery

On restart:
  1. Reads progress store
  2. Resumes from last watermark
  3. Reprocesses failed checkpoints (if any)

Best Practices

  1. Concurrency: Tune based on available resources
  2. Progress Store: Use reliable storage (not tmpfs)
  3. Monitoring: Set up alerts on metrics
  4. Credentials: Use secure credential management
  5. Testing: Test with BigTable emulator for development
  6. Backups: Backup progress store periodically
  7. Chunk Size: Balance between efficiency and memory usage

Example Configurations

S3 Archival

path: "/data/checkpoints"
remote-store-url: "http://fullnode:9000/api/v1"
progress-store-path: "/var/lib/iota/progress"
metrics-port: 8081

tasks:
  - task: archival
    name: "s3-archival"
    concurrency: 5
    remote-url: "s3://my-bucket/iota/archive"
    remote-store-options:
      - ["region", "us-west-2"]
    commit-file-size: 1000
    commit-duration-seconds: 300

GCS Blob Storage

tasks:
  - task: blob
    name: "gcs-blob"
    concurrency: 10
    object-store-config:
      object-store: "GCS"
      bucket: "iota-checkpoints"
    checkpoint-chunk-size-mb: 10
    node-rest-api-url: "http://fullnode:9000/api/v1"

Multi-Task Pipeline

path: "/data/checkpoints"
remote-store-url: "http://fullnode:9000/api/v1"
progress-store-path: "/var/lib/iota/progress"

tasks:
  # Store in S3
  - task: blob
    name: "s3-storage"
    concurrency: 10
    object-store-config:
      object-store: "S3"
      bucket: "checkpoints"
      aws-region: "us-west-2"
    checkpoint-chunk-size-mb: 10
    node-rest-api-url: "http://fullnode:9000/api/v1"

  # Archive for long-term storage
  - task: archival
    name: "glacier-archive"
    concurrency: 5
    remote-url: "s3://archive-bucket/iota"
    remote-store-options:
      - ["region", "us-west-2"]
      - ["storage-class", "GLACIER"]
    commit-file-size: 10000
    commit-duration-seconds: 3600

  # Index in BigTable for fast queries
  - task: big-table-kv
    name: "bigtable-index"
    concurrency: 20
    instance-id: "iota-prod"
    column-family: "checkpoints"
    timeout-secs: 30
