
Overview

Starting with release 0.5.0, Delta Sharing supports querying the Change Data Feed (CDF) of shared tables. This lets you track row-level changes (inserts, updates, and deletes) between table versions.
CDF must be enabled by the data provider on the original Delta table. Once it is enabled and the table is shared through Delta Sharing, recipients can query the change feed just as they would on a regular Delta table.
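For reference, the provider typically enables CDF with a Delta Lake table property. A minimal provider-side sketch, assuming a hypothetical source table named customers:

# Provider-side only (not run by recipients): enable CDF on the source Delta table.
# "customers" is a hypothetical table name.
spark.sql("ALTER TABLE customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")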

Basic CDF Queries

To read change data from a Delta Sharing table, use the readChangeFeed option:
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "3")
  .load(tablePath)

changesDF.show()
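
The equivalent query in Python, assuming table_path holds the same profile string:

# Python equivalent of the query above
table_path = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "3") \
  .load(table_path)

changes_df.show()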

Version Options

Control which versions to read using startingVersion and endingVersion:

Starting Version

Specify the first version to include in the change feed:
// Read changes from version 5 onwards
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "5")
  .load(tablePath)

Ending Version

Specify the last version to include (inclusive):
// Read changes from version 3 to version 10
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "3")
  .option("endingVersion", "10")
  .load(tablePath)
The same options work in Python. For example:

# Read changes from only version 5
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "5") \
  .option("endingVersion", "5") \
  .load(table_path)

# Read changes from version 10 to version 20
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "10") \
  .option("endingVersion", "20") \
  .load(table_path)

# Read changes from version 15 to the latest version
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "15") \
  .load(table_path)

Change Data Schema

The change data feed includes additional metadata columns beyond the table’s regular columns:
Column              Type       Description
_change_type        String     Type of change: insert, update_preimage, update_postimage, or delete
_commit_version     Long       The table version where this change occurred
_commit_timestamp   Timestamp  When the change was committed
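
To see these columns alongside the shared data, select them explicitly. A small sketch, assuming changes_df was loaded as above and the shared table has a customer_id column (hypothetical):

# Inspect the CDF metadata columns next to a regular column (customer_id is hypothetical)
changes_df.select("customer_id", "_change_type", "_commit_version", "_commit_timestamp").show()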

Understanding Change Types

Each _change_type value describes how a row was affected:
  • insert: a newly added row
  • update_preimage: the row's values before an update
  • update_postimage: the row's values after an update
  • delete: a removed row

For example, to keep only inserted rows:

# Filter for only inserted rows
inserts = changes_df.filter(changes_df._change_type == "insert")
inserts.show()

Complete Example

Here’s a comprehensive example analyzing customer data changes:
import org.apache.spark.sql.functions._
import spark.implicits._

val tablePath = "/opt/profiles/prod.share#sales.crm.customers"

// Read changes from version 0 to version 5
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("endingVersion", "5")
  .load(tablePath)

// Analyze change types
println("\nChange type distribution:")
changesDF
  .groupBy("_change_type")
  .count()
  .orderBy(desc("count"))
  .show()

// Find customers who were updated
println("\nUpdated customers:")
changesDF
  .filter($"_change_type".startsWith("update"))
  .select("customer_id", "name", "_change_type", "_commit_version")
  .orderBy("customer_id", "_commit_version")
  .show()

// Track changes by version
println("\nChanges per version:")
changesDF
  .groupBy("_commit_version", "_change_type")
  .count()
  .orderBy("_commit_version")
  .show()

Common Use Cases

Incremental Data Processing

# Process only new and changed records since last run
last_processed_version = 10  # Retrieve from checkpoint/state store

changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", str(last_processed_version + 1)) \
  .load(table_path)

# Process inserts and updates
new_and_updated = changes_df.filter(
  changes_df._change_type.isin(["insert", "update_postimage"])
)

# Apply transformations and write to target
new_and_updated \
  .select("customer_id", "name", "email", "updated_at") \
  .write \
  .mode("append") \
  .parquet("/output/customer_updates")

Change Audit Trail

import org.apache.spark.sql.functions._
import spark.implicits._

// Create an audit log of all changes
val auditLog = changesDF
  .withColumn("audit_timestamp", current_timestamp())
  .withColumn("change_date", to_date($"_commit_timestamp"))
  .select(
    $"customer_id",
    $"_change_type".as("operation"),
    $"_commit_version".as("version"),
    $"_commit_timestamp".as("change_time"),
    $"audit_timestamp",
    $"change_date",
    struct(col("*")).as("record_data")
  )

// Partition by date rather than the raw timestamp to keep the number of partitions manageable
auditLog.write
  .mode("append")
  .partitionBy("change_date")
  .parquet("/audit/customer_changes")

Data Synchronization

from pyspark.sql.functions import col, when

# Sync changes to a downstream system
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .option("endingVersion", "5") \
  .load(table_path)

# Categorize changes for different sync operations
synced_changes = changes_df.withColumn(
  "sync_action",
  when(col("_change_type") == "insert", "INSERT")
  .when(col("_change_type") == "update_postimage", "UPDATE")
  .when(col("_change_type") == "delete", "DELETE")
  .otherwise("SKIP")
)

# Process each action type and send it to the downstream system
for action in ["INSERT", "UPDATE", "DELETE"]:
    action_df = synced_changes.filter(col("sync_action") == action)
    # Write to the downstream system, e.g. over JDBC (supply the connection options for your target):
    # action_df.write.format("jdbc") \
    #   .option("url", jdbc_url) \
    #   .option("dbtable", target_table) \
    #   .mode("append") \
    #   .save()

Tracking Field-Level Changes

import org.apache.spark.sql.functions._
import spark.implicits._

// Compare before and after values for updates
val updates = changesDF
  .filter($"_change_type".startsWith("update"))

val beforeUpdates = updates
  .filter($"_change_type" === "update_preimage")
  .withColumnRenamed("email", "old_email")
  .withColumnRenamed("status", "old_status")
  .select($"customer_id", $"_commit_version", $"old_email", $"old_status")

val afterUpdates = updates
  .filter($"_change_type" === "update_postimage")
  .withColumnRenamed("email", "new_email")
  .withColumnRenamed("status", "new_status")
  .select($"customer_id", $"_commit_version", $"new_email", $"new_status")

// Join to see what changed
val fieldChanges = beforeUpdates
  .join(afterUpdates, Seq("customer_id", "_commit_version"))
  .filter(
    $"old_email" =!= $"new_email" || $"old_status" =!= $"new_status"
  )

fieldChanges.show()

Filtering Change Data

You can apply filters to focus on specific changes:
# Get only deleted premium customers
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .load(table_path)

deleted_premium = changes_df.filter(
  (changes_df._change_type == "delete") & 
  (changes_df.customer_tier == "premium")
)

deleted_premium.show()

Performance Considerations

Query only the versions you need. Avoid reading the entire change history when you only need recent changes:

// Good: Query only needed versions
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "10")
  .option("endingVersion", "15")
  .load(tablePath)

Filter for specific change types before other operations to reduce data volume:

// Apply change type filters early
val insertsOnly = changesDF
  .filter($"_change_type" === "insert")
  .select("customer_id", "name", "_commit_version")

For large version ranges, process changes in batches:

# Process changes in batches
start_version, end_version, batch_size = 0, 20, 5  # example values; read these from your own config or state

for version in range(start_version, end_version + 1, batch_size):
    batch_start = version
    batch_end = min(version + batch_size - 1, end_version)

    changes = spark.read \
      .format("deltaSharing") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", str(batch_start)) \
      .option("endingVersion", str(batch_end)) \
      .load(table_path)

    # Process each batch with your own handler (process_changes is user-defined)
    process_changes(changes)

Handling Update Pairs

When processing updates, you often need to handle preimage/postimage pairs:
from pyspark.sql.functions import col

# Get only the final state after updates
changes_df = spark.read \
  .format("deltaSharing") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .load(table_path)

# Keep inserts, update_postimage (final state), and deletes
final_state = changes_df.filter(
  col("_change_type").isin(["insert", "update_postimage", "delete"])
)

final_state.show()
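
If you only need the most recent state of each row, a window over the key ordered by commit version is a common pattern. A sketch, assuming customer_id is the table's key (hypothetical):

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Keep only the latest change per customer_id (hypothetical key column)
w = Window.partitionBy("customer_id").orderBy(col("_commit_version").desc())

latest_per_key = final_state \
  .withColumn("rn", row_number().over(w)) \
  .filter(col("rn") == 1) \
  .drop("rn")

latest_per_key.show()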

Error Handling

try {
  val changesDF = spark.read
    .format("deltaSharing")
    .option("readChangeFeed", "true")
    .option("startingVersion", "3")
    .option("endingVersion", "10")
    .load(tablePath)
  
  changesDF.show()
} catch {
  case e: Exception if e.getMessage.contains("Change data feed") =>
    println("CDF is not enabled for this table. Contact the data provider.")
  case e: Exception if e.getMessage.contains("version") =>
    println("Invalid version specified. Check available versions.")
  case e: Exception =>
    println(s"Error reading change feed: ${e.getMessage}")
}
Common Issues:
  • CDF not enabled: The data provider must enable CDF on the source table
  • Invalid versions: Ensure the specified versions exist in the table history
  • Version gaps: Some versions may not have changes if no operations occurred
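
A rough Python analogue of the Scala error handling above, assuming table_path is defined as in the earlier examples:

# Catch and classify common CDF errors (the message checks mirror the Scala example above)
try:
    changes_df = spark.read \
      .format("deltaSharing") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", "3") \
      .option("endingVersion", "10") \
      .load(table_path)
    changes_df.show()
except Exception as e:
    message = str(e)
    if "Change data feed" in message:
        print("CDF is not enabled for this table. Contact the data provider.")
    elif "version" in message:
        print("Invalid version specified. Check available versions.")
    else:
        print(f"Error reading change feed: {message}")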

Next Steps

Streaming

Stream live changes from shared tables

SQL Usage

Query shared tables with SQL
