The ETL (Extract, Transform, Load) procedures orchestrate data movement through the Bronze, Silver, and Gold layers. These stored procedures handle error management and logging, and ensure data consistency across the warehouse.

Overview

The data warehouse pipeline uses two main stored procedures:

bronze.load_bronze()

Extracts raw data from CSV files into Bronze tables

silver.load_silver()

Transforms and cleanses Bronze data into Silver tables
The Gold layer uses views rather than procedures, automatically reflecting the latest Silver data when queried.
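For illustration, a Gold view over Silver data might look like the sketch below. The view name matches the `gold.dim_product` view queried later in this page, but the source columns (`prd_key`, `prd_nm`, `cat`) are illustrative assumptions, not the actual definition:

```sql
-- Hypothetical sketch of a Gold dimension view over Silver data.
-- Source column names are assumptions for illustration.
CREATE OR REPLACE VIEW gold.dim_product AS
SELECT
  prd_key AS product_key,
  prd_nm  AS product_name,
  cat     AS category
FROM silver.crm_prd_info;
```

Because it is a view, no refresh step is needed: each query reads the current Silver data.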

bronze.load_bronze()

Loads raw data from CSV files into Bronze layer tables using PostgreSQL’s COPY command.

Procedure Signature

CREATE OR REPLACE PROCEDURE bronze.load_bronze()
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
-- procedure body
$$;
Parameters: None. All file paths and table names are hardcoded for consistency and security.
The procedure uses SECURITY DEFINER, which means it executes with the privileges of the user who created it, not the user who calls it. This allows controlled access to COPY operations without granting file-system permissions to all users.
Returns: void. Status is communicated through RAISE NOTICE messages and exceptions.

Execution

Execute the procedure using the CALL statement:
CALL bronze.load_bronze();

Data Loading Process

The procedure loads six tables in sequence:
1. CRM Customer Info

TRUNCATE TABLE bronze.crm_cust_info;
COPY bronze.crm_cust_info
  FROM '/datasets/source_crm/cust_info.csv'
  WITH (FORMAT csv, HEADER true);
2. CRM Product Info

TRUNCATE TABLE bronze.crm_prd_info;
COPY bronze.crm_prd_info
  FROM '/datasets/source_crm/prd_info.csv'
  WITH (FORMAT csv, HEADER true);
3. CRM Sales Details

TRUNCATE TABLE bronze.crm_sales_details;
COPY bronze.crm_sales_details
  FROM '/datasets/source_crm/sales_details.csv'
  WITH (FORMAT csv, HEADER true);
4. ERP Customer Demographics

TRUNCATE TABLE bronze.erp_cust_az12;
COPY bronze.erp_cust_az12
  FROM '/datasets/source_erp/CUST_AZ12.csv'
  WITH (FORMAT csv, HEADER true);
5. ERP Location Data

TRUNCATE TABLE bronze.erp_loc_a101;
COPY bronze.erp_loc_a101
  FROM '/datasets/source_erp/LOC_A101.csv'    
  WITH (FORMAT csv, HEADER true);
6. ERP Product Categories

TRUNCATE TABLE bronze.erp_px_cat_g1v2;
COPY bronze.erp_px_cat_g1v2
  FROM '/datasets/source_erp/PX_CAT_G1V2.csv'    
  WITH (FORMAT csv, HEADER true);

Output Example

NOTICE:  [14:23:45] load bronze started
NOTICE:  Step [load bronze.crm_cust_info] started at 14:23:45
NOTICE:  Step [load bronze.crm_cust_info] completed successfully (2 s)
NOTICE:  Step [load bronze.crm_prd_info] completed successfully (1 s)
NOTICE:  Step [load bronze.crm_sales_details] completed successfully (5 s)
NOTICE:  Step [load bronze.erp_cust_az12] completed successfully (1 s)
NOTICE:  Step [load bronze.erp_loc_a101] completed successfully (0 s)
NOTICE:  Step [load bronze.erp_px_cat_g1v2] completed successfully (1 s)
NOTICE:  [14:23:55] load bronze finished
NOTICE:  load bronze finished in [10] seconds

silver.load_silver()

Transforms and cleanses data from Bronze tables into Silver tables.

Procedure Signature

CREATE OR REPLACE PROCEDURE silver.load_silver()
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
-- procedure body
$$;

Execution

CALL silver.load_silver();

Transformation Process

The procedure processes six tables with data quality transformations. For example, silver.crm_cust_info applies:
  • Deduplication using the ROW_NUMBER window function
  • Gender code expansion (M → Male, F → Female)
  • Marital status expansion (S → Single, M → Married)
  • String trimming
INSERT INTO silver.crm_cust_info (...)
SELECT
  cst_id, 
  trim(cst_key), 
  trim(cst_firstname),
  CASE upper(trim(cst_gndr)) 
    WHEN 'M' THEN 'Male' 
    WHEN 'F' THEN 'Female' 
    ELSE 'n/a'
  END as cst_gndr,
  ...
FROM (
  SELECT *, 
    ROW_NUMBER() OVER (
      PARTITION BY cst_id 
      ORDER BY cst_create_date DESC
    ) AS flag_last
  FROM bronze.crm_cust_info
) AS ranked
WHERE flag_last = 1;

Output Example

NOTICE:  [14:24:00] load silver started
NOTICE:  Step [load silver.crm_cust_info] completed successfully (3 s)
NOTICE:  Step [load silver.crm_prd_info] completed successfully (2 s)
NOTICE:  Step [load silver.crm_sales_details] completed successfully (8 s)
NOTICE:  Step [load silver.erp_cust_az12] completed successfully (1 s)
NOTICE:  Step [load silver.erp_loc_a101] completed successfully (1 s)
NOTICE:  Step [load silver.erp_px_cat_g1v2] completed successfully (0 s)
NOTICE:  load silver finished in [15] seconds

Error Handling

Both procedures implement comprehensive error handling:
BEGIN
  DECLARE
    v_start_date timestamp := clock_timestamp();
  BEGIN
    v_step := 'load bronze.crm_cust_info';
    
    -- Load operation
    TRUNCATE TABLE bronze.crm_cust_info;
    COPY bronze.crm_cust_info FROM '...';
    
    v_ok := v_ok + 1;
    RAISE NOTICE 'Step [%] completed successfully (% s)',
      v_step,
      round(extract(epoch FROM (clock_timestamp() - v_start_date)));
      
  EXCEPTION WHEN OTHERS THEN
    v_err := v_err + 1;
    v_msg := 'Error in step: ' || v_step || ' - ' || SQLERRM;
    RAISE EXCEPTION '%', v_msg;
  END;
END;
Each procedure declares error tracking variables:
  • v_step: Current step being executed
  • v_ok: Count of successful steps
  • v_err: Count of failed steps
  • v_msg: Error message text
When an error occurs:
  1. The EXCEPTION WHEN OTHERS block catches all errors
  2. Error counter is incremented
  3. Error message is constructed with step name and SQL error
  4. Exception is re-raised to halt execution
  5. Transaction is rolled back automatically
If any step fails, the entire procedure fails and all changes are rolled back. This ensures data consistency.
A common failure:
Error: could not open file for reading: No such file or directory
Solution: Verify the file path and that the file exists on the database server (COPY FROM reads files from the server's file system, not the client's).

Logging and Monitoring

Procedures provide detailed execution logging:

Timing Information

RAISE NOTICE '[%] load bronze started', 
  to_char(clock_timestamp(), 'HH24:MI:SS');

Variables Used

DECLARE
  v_step text;                                  -- Current step name
  v_start_batch timestamp := clock_timestamp(); -- Batch start time
  v_end_batch timestamp;                        -- Batch end time
  v_ok int := 0;                               -- Success counter
  v_err int := 0;                              -- Error counter
  v_msg text;                                  -- Error message
Monitor PostgreSQL logs or client output to track procedure execution and identify performance bottlenecks.
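A running load can also be observed from another session via the `pg_stat_activity` catalog view (a sketch; adjust the filter to match your call statements):

```sql
-- Watch for a running load from another session.
SELECT pid,
       state,
       now() - query_start AS runtime,
       query
FROM pg_stat_activity
WHERE query ILIKE '%load_bronze%'
   OR query ILIKE '%load_silver%';
```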

Permissions

Execute permission on both procedures is granted to all users:
GRANT EXECUTE ON PROCEDURE bronze.load_bronze() TO PUBLIC;
GRANT USAGE ON SCHEMA bronze TO PUBLIC;

GRANT EXECUTE ON PROCEDURE silver.load_silver() TO PUBLIC;
GRANT USAGE ON SCHEMA silver TO PUBLIC;
GRANT SELECT ON ALL TABLES IN SCHEMA silver TO PUBLIC;
While procedures are public, SECURITY DEFINER ensures they run with creator privileges. Review these permissions based on your security requirements.
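For a tighter setup, you might restrict execution to a dedicated role instead of PUBLIC. A sketch, where `etl_role` is a hypothetical role name:

```sql
-- Hypothetical hardening: limit execution to a dedicated ETL role.
REVOKE EXECUTE ON PROCEDURE bronze.load_bronze() FROM PUBLIC;
REVOKE EXECUTE ON PROCEDURE silver.load_silver() FROM PUBLIC;

CREATE ROLE etl_role;  -- hypothetical role name
GRANT EXECUTE ON PROCEDURE bronze.load_bronze() TO etl_role;
GRANT EXECUTE ON PROCEDURE silver.load_silver() TO etl_role;
```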

Complete Pipeline Execution

To refresh the entire data warehouse:
1. Load Bronze Layer

Extract raw data from source files:
CALL bronze.load_bronze();
Duration: ~10 seconds
2. Load Silver Layer

Transform and cleanse data:
CALL silver.load_silver();
Duration: ~15 seconds
3. Query Gold Layer

Analytics views automatically reflect updated data:
SELECT 
    p.category,
    SUM(f.sales_amount) as total_sales
FROM gold.fact_sales f
INNER JOIN gold.dim_product p 
    ON f.product_key = p.product_key
GROUP BY p.category;

Automation Script

-- Complete pipeline execution
DO $$
BEGIN
  RAISE NOTICE 'Starting data warehouse refresh...';
  
  -- Bronze layer
  CALL bronze.load_bronze();
  RAISE NOTICE 'Bronze layer loaded successfully';
  
  -- Silver layer
  CALL silver.load_silver();
  RAISE NOTICE 'Silver layer loaded successfully';
  
  RAISE NOTICE 'Data warehouse refresh complete';
END $$;

Best Practices

Sequential Execution

Always run procedures in order: Bronze → Silver → Gold queries

Monitor Logs

Review NOTICE messages to track execution time and identify slow steps

Test on Subset

For development, test with smaller CSV files first

Schedule Wisely

Run during low-usage periods to minimize user impact

Backup Before Load

Consider backing up Silver/Bronze before major refreshes

Validate Results

Query record counts after each load to verify data completeness
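A quick completeness check compares row counts between layers. A sketch for one table pair; extend the UNION to the remaining tables as needed:

```sql
-- Compare Bronze and Silver row counts for one table pair.
SELECT 'bronze.crm_cust_info' AS source, count(*) AS row_count
FROM bronze.crm_cust_info
UNION ALL
SELECT 'silver.crm_cust_info', count(*)
FROM silver.crm_cust_info;
```

Silver counts at or below the Bronze counts are expected here, since deduplication removes rows.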

Troubleshooting

Symptoms: Procedure doesn’t complete or respond.
Possible causes:
  • Large file size
  • Disk I/O bottleneck
  • Lock contention
Solutions:
  • Check PostgreSQL locks: SELECT * FROM pg_locks;
  • Monitor disk usage and I/O
  • Increase statement timeout if needed
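The lock check and timeout adjustment can be sketched as:

```sql
-- Inspect locks held or awaited by active sessions.
SELECT locktype, relation::regclass, mode, granted, pid
FROM pg_locks
WHERE relation IS NOT NULL;

-- Raise the timeout for the current session before a long load.
SET statement_timeout = '30min';
CALL bronze.load_bronze();
```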
Symptoms: Some tables loaded, others empty.
Cause: Procedure failed mid-execution.
Solution:
  • Check error messages in logs
  • Verify all source files are present
  • Re-run the procedure (TRUNCATE ensures clean reload)
Symptoms: Procedures taking longer than usual.
Investigation:
-- Check table sizes
SELECT schemaname, tablename, 
       pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename))
FROM pg_tables
WHERE schemaname IN ('bronze', 'silver')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
Solutions:
  • Vacuum and analyze tables
  • Check for missing indexes
  • Review CSV file sizes
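For example, after a large reload the Silver tables can be vacuumed and re-analyzed so the planner has fresh statistics (shown for two tables; repeat for the rest):

```sql
-- Reclaim space and refresh planner statistics after a reload.
VACUUM ANALYZE silver.crm_cust_info;
VACUUM ANALYZE silver.crm_sales_details;
```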

Bronze Layer

Raw data ingestion details

Silver Layer

Transformation logic reference

Gold Layer

Analytics model documentation
