Overview
Dell ECS (Elastic Cloud Storage) is an on-premises object storage platform that provides S3-compatible APIs. Iceberg supports Dell ECS through the dedicated ECS catalog in the iceberg-dell module.
Prerequisites
Dell ECS installation with S3 API enabled
ECS access credentials (username and secret key)
Network access to ECS endpoint
Configuration Parameters
| Parameter | Required | Description |
|---|---|---|
| ecs.s3.endpoint | Yes | ECS S3 service endpoint (e.g., http://10.1.2.3:9020) |
| ecs.s3.access-key-id | Yes | ECS username |
| ecs.s3.secret-access-key | Yes | ECS S3 secret key |
| warehouse | Yes | Location for data and metadata |
The warehouse property supports these formats:
| Format | Description | Example |
|---|---|---|
| ecs://bucket-name | Use entire bucket | ecs://analytics |
| ecs://bucket-name/ | Use entire bucket (trailing slash ignored) | ecs://analytics/ |
| ecs://bucket-name/prefix | Use specific namespace prefix | ecs://analytics/warehouse |
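In Spark, every catalog property above is passed as a conf key of the form `spark.sql.catalog.<catalog-name>.<property>`. A small helper can generate those keys; the function itself is illustrative and not part of Iceberg, only the key-naming convention is:

```python
# Illustrative helper: expand ECS catalog properties into Spark conf keys.
# Property names come from the tables above; the helper is not part of
# Iceberg -- it just demonstrates the spark.sql.catalog.<name>.<prop> pattern.
def spark_conf_for_catalog(name, props):
    base = f"spark.sql.catalog.{name}"
    conf = {base: "org.apache.iceberg.spark.SparkCatalog"}
    for key, value in props.items():
        conf[f"{base}.{key}"] = value
    return conf

conf = spark_conf_for_catalog("ecs_catalog", {
    "catalog-impl": "org.apache.iceberg.dell.ecs.EcsCatalog",
    "warehouse": "ecs://my-bucket/warehouse",
    "ecs.s3.endpoint": "http://10.1.2.3:9020",
})
# conf["spark.sql.catalog.ecs_catalog.warehouse"] == "ecs://my-bucket/warehouse"
```

The same dictionary can be fed to `SparkSession.builder.config(...)` calls or serialized into `spark-defaults.conf`.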
Spark Integration
Starting Spark SQL Shell
ICEBERG_VERSION=1.4.2
SPARK_VERSION=3.5_2.12
ECS_CLIENT_VERSION=3.3.2

DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-${SPARK_VERSION}:${ICEBERG_VERSION},\
org.apache.iceberg:iceberg-dell:${ICEBERG_VERSION},\
com.emc.ecs:object-client-bundle:${ECS_CLIENT_VERSION}"

spark-sql --packages ${DEPENDENCIES} \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.ecs_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.ecs_catalog.warehouse=ecs://my-bucket/warehouse \
    --conf spark.sql.catalog.ecs_catalog.catalog-impl=org.apache.iceberg.dell.ecs.EcsCatalog \
    --conf spark.sql.catalog.ecs_catalog.ecs.s3.endpoint=http://10.1.2.3:9020 \
    --conf spark.sql.catalog.ecs_catalog.ecs.s3.access-key-id=<ecs-username> \
    --conf spark.sql.catalog.ecs_catalog.ecs.s3.secret-access-key=<ecs-secret-key>
Using the Catalog
-- Show available namespaces
SHOW NAMESPACES IN ecs_catalog;

-- Show tables in a namespace
SHOW TABLES IN ecs_catalog.my_database;

-- Create a namespace
CREATE NAMESPACE ecs_catalog.analytics;

-- Create a table
CREATE TABLE ecs_catalog.analytics.events (
    event_id bigint,
    event_type string,
    user_id string,
    timestamp timestamp,
    properties map<string, string>
)
USING iceberg
PARTITIONED BY (days(timestamp));

-- Insert data
INSERT INTO ecs_catalog.analytics.events VALUES
    (1, 'page_view', 'user123', current_timestamp(), map('page', '/home')),
    (2, 'click', 'user456', current_timestamp(), map('button', 'signup'));

-- Query data
SELECT * FROM ecs_catalog.analytics.events
WHERE event_type = 'click';
PySpark Example
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg with Dell ECS") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.ecs_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.ecs_catalog.catalog-impl", "org.apache.iceberg.dell.ecs.EcsCatalog") \
    .config("spark.sql.catalog.ecs_catalog.warehouse", "ecs://my-bucket/warehouse") \
    .config("spark.sql.catalog.ecs_catalog.ecs.s3.endpoint", "http://10.1.2.3:9020") \
    .config("spark.sql.catalog.ecs_catalog.ecs.s3.access-key-id", "ecs-user") \
    .config("spark.sql.catalog.ecs_catalog.ecs.s3.secret-access-key", "ecs-secret") \
    .getOrCreate()

# Create a DataFrame
data = [
    (1, "product_view", 1000),
    (2, "add_to_cart", 1001),
    (3, "purchase", 1002)
]
df = spark.createDataFrame(data, ["event_id", "event_type", "user_id"])

# Write to Iceberg table
df.writeTo("ecs_catalog.analytics.user_events").create()

# Read from Iceberg table
result = spark.table("ecs_catalog.analytics.user_events")
result.show()
Flink Integration
Setting Up Flink Environment
# Set Hadoop classpath
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Download dependencies
MAVEN_URL=https://repo1.maven.org/maven2
ICEBERG_VERSION=1.4.2
FLINK_VERSION=1.17
ECS_CLIENT_VERSION=3.3.2

wget ${MAVEN_URL}/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar
wget ${MAVEN_URL}/org/apache/iceberg/iceberg-dell/${ICEBERG_VERSION}/iceberg-dell-${ICEBERG_VERSION}.jar
wget ${MAVEN_URL}/com/emc/ecs/object-client-bundle/${ECS_CLIENT_VERSION}/object-client-bundle-${ECS_CLIENT_VERSION}.jar

# Start SQL client
/path/to/flink/bin/sql-client.sh embedded \
    -j iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar \
    -j iceberg-dell-${ICEBERG_VERSION}.jar \
    -j object-client-bundle-${ECS_CLIENT_VERSION}.jar \
    shell
Creating ECS Catalog in Flink
CREATE CATALOG ecs_catalog WITH (
    'type' = 'iceberg',
    'warehouse' = 'ecs://my-bucket/warehouse',
    'catalog-impl' = 'org.apache.iceberg.dell.ecs.EcsCatalog',
    'ecs.s3.endpoint' = 'http://10.1.2.3:9020',
    'ecs.s3.access-key-id' = '<ecs-username>',
    'ecs.s3.secret-access-key' = '<ecs-secret-key>'
);
USE CATALOG ecs_catalog;
SHOW DATABASES;
SHOW TABLES;
Flink Streaming Example
-- Create a table
CREATE TABLE ecs_catalog.default_db.sensor_data (
    sensor_id STRING,
    temperature DOUBLE,
    humidity DOUBLE,
    timestamp_col TIMESTAMP(3),
    PRIMARY KEY (sensor_id, timestamp_col) NOT ENFORCED
) WITH (
    'format-version' = '2',
    'write.upsert.enabled' = 'true'
);
-- Stream data into the table
INSERT INTO ecs_catalog.default_db.sensor_data
SELECT
    sensor_id,
    temperature,
    humidity,
    event_time
FROM sensor_stream;

-- Query the table
SELECT * FROM ecs_catalog.default_db.sensor_data
WHERE temperature > 25.0;
Java API
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

import java.util.HashMap;
import java.util.Map;

// Configure ECS catalog
Map<String, String> properties = new HashMap<>();
properties.put("warehouse", "ecs://my-bucket/warehouse");
properties.put("ecs.s3.endpoint", "http://10.1.2.3:9020");
properties.put("ecs.s3.access-key-id", "ecs-user");
properties.put("ecs.s3.secret-access-key", "ecs-secret");

// Create catalog instance
Catalog catalog = CatalogUtil.loadCatalog(
    "org.apache.iceberg.dell.ecs.EcsCatalog",
    "ecs_catalog",
    properties,
    new org.apache.hadoop.conf.Configuration()
);

// Define schema
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.required(2, "data", Types.StringType.get()),
    Types.NestedField.required(3, "timestamp", Types.TimestampType.withoutZone())
);

// Create table
TableIdentifier tableId = TableIdentifier.of("my_database", "my_table");
Table table = catalog.createTable(
    tableId,
    schema,
    PartitionSpec.builderFor(schema).day("timestamp").build()
);

System.out.println("Table created: " + table.location());
Important Limitations
Be aware of these limitations when using ECS catalog:
1. RENAME Operations
No transaction protection: RENAME statements are supported but without additional safeguards
Requires manual coordination: ensure all commits have finished before renaming a table
No data movement: RENAME only updates metadata; data files remain in their original location
Potential path mismatch: renamed tables may have data outside the configured warehouse path
-- Ensure no active writes before renaming
ALTER TABLE ecs_catalog.db.old_name RENAME TO ecs_catalog.db.new_name;
2. CAS (Compare-and-Swap) Operations
Checksum-based: ECS uses object checksums for atomic commits
Small collision probability: very low but non-zero chance of checksum conflicts
Retry logic recommended: implement retries for critical operations
3. Concurrent Access
Limited coordination: less robust than cloud-native catalogs (Glue, DynamoDB)
Best for single-writer: optimal when one writer per table
Careful with multi-writer: test thoroughly if multiple writers are required
Best Practices
Organize data with namespace prefixes in the warehouse path:
ecs://analytics/prod/
ecs://analytics/dev/
ecs://analytics/staging/
Implement retry logic for commit conflicts:

int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
    try {
        table.newAppend()
            .appendFile(dataFile)
            .commit();
        break;
    } catch (CommitFailedException e) {
        if (i == maxRetries - 1) throw e;
        try {
            Thread.sleep(1000L * (i + 1)); // linear backoff: 1s, 2s, 3s
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}
Coordinate RENAME operations:
Use external locking mechanism
Schedule during maintenance windows
Document rename procedures
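Because the catalog itself provides no transaction protection for RENAME, coordination has to come from outside. A minimal cross-process sketch using a POSIX lock file follows; the lock path is hypothetical, and any external mutex (ZooKeeper, a database lock, etc.) would serve the same role:

```python
import fcntl
import os

# Hypothetical lock path shared by every writer of this table; it is not
# part of Iceberg -- it only illustrates external coordination.
LOCK_PATH = "/tmp/iceberg-rename-analytics-events.lock"

def with_table_lock(lock_path, action):
    """Run `action` while holding an exclusive flock on `lock_path`."""
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until no other process holds it
        return action()
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)

# Usage (spark is an existing SparkSession):
# with_table_lock(LOCK_PATH, lambda: spark.sql(
#     "ALTER TABLE ecs_catalog.db.old_name RENAME TO ecs_catalog.db.new_name"))
```

All writers must agree to take the same lock for this to help; a process that skips the lock is not blocked.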
Security Configuration
SSL/TLS
spark-sql \
--conf spark.sql.catalog.ecs_catalog.ecs.s3.endpoint=https://ecs.example.com:9021 \
--conf spark.sql.catalog.ecs_catalog.ecs.s3.protocol=https
Custom Trust Store
spark-sql \
    --conf spark.hadoop.fs.s3a.ssl.channel.mode=openssl \
    --conf spark.hadoop.fs.s3a.connection.ssl.enabled=true \
    --driver-java-options "-Djavax.net.ssl.trustStore=/path/to/truststore.jks -Djavax.net.ssl.trustStorePassword=changeit"
Troubleshooting
Connection Issues
Unable to connect to ECS endpoint
Check:
ECS endpoint is reachable: curl http://10.1.2.3:9020
Firewall rules allow access
Correct port number (default: 9020 HTTP, 9021 HTTPS)
ECS service is running
Authentication Errors
Access Denied or Invalid credentials
Verify:
Username (access-key-id) is correct
Secret key is correct and not expired
User has S3 permissions in ECS
Bucket exists and user has access
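The checks above can be automated with any S3-compatible client pointed at the ECS endpoint (for example a boto3 client built with `endpoint_url` and the ECS credentials). The helper below is an illustrative sketch, not part of Iceberg:

```python
def check_ecs_access(s3_client, bucket):
    """Smoke-test credentials and bucket access against an ECS S3 endpoint.

    s3_client is any boto3-style S3 client, e.g.:
      boto3.client("s3", endpoint_url="http://10.1.2.3:9020",
                   aws_access_key_id="<ecs-username>",
                   aws_secret_access_key="<ecs-secret-key>")
    """
    try:
        s3_client.head_bucket(Bucket=bucket)                  # bucket exists + permission
        s3_client.list_objects_v2(Bucket=bucket, MaxKeys=1)   # read permission
        return True, f"bucket '{bucket}' is accessible"
    except Exception as exc:
        return False, f"access check failed: {exc}"
```

A False result with an authentication error points at the credentials; a connection error points back at the network checks above.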
Performance Issues
Slow read/write operations
Investigate:
ECS cluster health and load
Network latency between Spark/Flink and ECS
ECS configuration (retention, replication)
Partition strategy (too many small files)
Commit Failures
CommitFailedException: Commit failed, please retry
Solutions:
Implement retry logic with exponential backoff
Reduce concurrent writers
Check for checksum collisions in ECS logs
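The exponential-backoff advice can be sketched as a small helper; `commit_fn` is a placeholder for any commit call (a Spark `INSERT`, a Java-API append invoked over a gateway, etc.), and the injectable `sleep` exists only to keep the sketch testable:

```python
import time

def commit_with_retry(commit_fn, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call commit_fn, retrying with exponential backoff on failure.

    commit_fn is a stand-in for any Iceberg commit operation; this helper
    is illustrative and not part of the Iceberg API.
    """
    for attempt in range(max_retries):
        try:
            return commit_fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # exhausted retries; surface the last failure
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
```

Under real checksum-collision contention, adding random jitter to the delay further reduces the chance that competing writers retry in lockstep.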
Version Compatibility
| Iceberg Version | ECS Client Version | Spark Version | Flink Version |
|---|---|---|---|
| 1.4.x | 3.3.2 | 3.5.x | 1.17.x |
| 1.3.x | 3.3.1 | 3.4.x | 1.16.x |
| 1.0.x - 1.2.x | 3.3.0 | 3.2.x - 3.3.x | 1.14.x - 1.15.x |
Migration Path
Migrating from other storage to ECS:
-- Export from an S3-based catalog
CREATE TABLE ecs_catalog.db.migrated_table
USING iceberg
AS SELECT * FROM s3_catalog.db.source_table;

-- Or snapshot the source table; the new table references the source's
-- data files instead of copying them
CALL ecs_catalog.system.snapshot(
    source_table => 's3_catalog.db.source_table',
    table => 'db.migrated_table'
);
Next Steps
AWS S3 Storage: configure cloud storage with S3
Custom FileIO: implement custom storage backends