Comprehensive guide to stored procedures that orchestrate the data warehouse pipeline
The ETL (Extract, Transform, Load) procedures orchestrate data movement through the Bronze, Silver, and Gold layers. These stored procedures handle error management and logging, and they keep data consistent across the warehouse.
    CREATE OR REPLACE PROCEDURE bronze.load_bronze()
    LANGUAGE plpgsql
    SECURITY DEFINER
    AS $$
    -- procedure body
    $$;
Parameters
None. The procedure takes no parameters. All file paths and table names are hardcoded for consistency and security.
Security
The procedure uses SECURITY DEFINER, which means:
The procedure executes with the privileges of the user who owns it (by default, the user who created it), not the user who calls it. This allows controlled access to COPY operations without granting server-side file access (superuser rights or membership in pg_read_server_files) to all users.
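PostgreSQL's documentation recommends pinning search_path on SECURITY DEFINER routines, so that unqualified names in the body cannot resolve to objects a caller placed in another schema. The following is a minimal sketch, assuming the procedure already exists and that bronze is the only schema its body needs (an assumption, since the body is not shown here):

```sql
-- Assumption: bronze.load_bronze() has already been created.
-- Pin the search path so unqualified names resolve only to the bronze
-- schema; pg_temp is listed last so temporary objects cannot shadow them.
ALTER PROCEDURE bronze.load_bronze()
    SET search_path = bronze, pg_temp;

-- Inspect the owner and the security mode recorded in the catalog.
SELECT p.proname,
       pg_get_userbyid(p.proowner) AS owner,
       p.prosecdef                 AS is_security_definer
FROM pg_proc AS p
JOIN pg_namespace AS n ON n.oid = p.pronamespace
WHERE n.nspname = 'bronze'
  AND p.proname = 'load_bronze';
```

The catalog query is a quick way to confirm which role's privileges the procedure will actually run with.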
Return Value
None. The procedure is invoked with CALL and returns no value. Status is communicated through RAISE NOTICE messages and exceptions.
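Because status arrives only as notices and exceptions, a caller typically runs the procedure with CALL and decides how to react to a failure. A short sketch follows; the wrapper block is hypothetical and only sees exceptions that the procedure lets propagate rather than handling internally:

```sql
-- Run the load; NOTICE messages are shown by the client as they arrive.
CALL bronze.load_bronze();

-- Hypothetical wrapper: trap a propagated failure and report it as a
-- warning instead of aborting the calling script.
DO $$
BEGIN
    CALL bronze.load_bronze();
EXCEPTION
    WHEN OTHERS THEN
        RAISE WARNING 'load_bronze failed: % (SQLSTATE %)', SQLERRM, SQLSTATE;
END;
$$;
```

Whether notices are displayed depends on the client's client_min_messages setting, which defaults to showing NOTICE level and above.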
The silver.load_silver() procedure processes six tables, applying data quality transformations to each:
Customer Info
Product Info
Sales Details
ERP Tables
Customer Info transformations:
Deduplication using ROW_NUMBER window function
Gender code expansion (M → Male, F → Female)
Marital status expansion (S → Single, M → Married)
String trimming
    INSERT INTO silver.crm_cust_info (...)
    SELECT
        cst_id,
        trim(cst_key),
        trim(cst_firstname),
        CASE upper(trim(cst_gndr))
            WHEN 'M' THEN 'Male'
            WHEN 'F' THEN 'Female'
            ELSE 'n/a'
        END AS cst_gndr,
        ...
    FROM (
        -- Rank each customer's rows, newest first
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY cst_id
                ORDER BY cst_create_date DESC
            ) AS flag_last
        FROM bronze.crm_cust_info
    ) AS ranked  -- subquery alias is required before PostgreSQL 16
    WHERE flag_last = 1;
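The deduplication and code expansion can be tried in isolation. The sketch below runs against inline sample rows rather than bronze.crm_cust_info; the sample_cust name and its values are made up for illustration:

```sql
-- Hypothetical sample rows: customer 1 appears twice, customer 3 has no gender.
WITH sample_cust (cst_id, cst_gndr, cst_create_date) AS (
    VALUES
        (1, 'm ', DATE '2025-01-01'),
        (1, 'M',  DATE '2025-03-01'),
        (2, ' f', DATE '2025-02-01'),
        (3, NULL, DATE '2025-02-15')
)
SELECT
    cst_id,
    CASE upper(trim(cst_gndr))
        WHEN 'M' THEN 'Male'
        WHEN 'F' THEN 'Female'
        ELSE 'n/a'            -- also reached when cst_gndr is NULL
    END AS cst_gndr,
    cst_create_date
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY cst_id
            ORDER BY cst_create_date DESC
        ) AS flag_last
    FROM sample_cust
) AS ranked
WHERE flag_last = 1           -- keep only the newest row per customer
ORDER BY cst_id;

-- Result:
--  1 | Male   | 2025-03-01
--  2 | Female | 2025-02-01
--  3 | n/a    | 2025-02-15
```

Ordering by cst_create_date DESC is what makes flag_last = 1 select the most recent record for each cst_id.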
Product Info transformations:
Category ID extraction from product key
Product line code expansion
NULL cost handling (default to 0)
End date calculation using LEAD window function
    INSERT INTO silver.crm_prd_info (...)
    SELECT
        prd_id,
        -- First five characters of the key form the category ID
        replace(substring(trim(prd_key), 1, 5), '-', '_') AS cat_id,
        -- Remainder of the key, from position 7 onward
        substring(prd_key, 7, length(prd_key)) AS prd_key,
        prd_name,
        coalesce(prd_cost, 0) AS prd_cost,
        CASE upper(trim(prd_line))
            WHEN 'M' THEN 'Mountain'
            WHEN 'R' THEN 'Road'
            ...
        END AS prd_line,
        -- End date is the day before the next version's start date
        lead(prd_start_dt) OVER (
            PARTITION BY prd_key
            ORDER BY prd_start_dt
        ) - 1 AS prd_end_dt
    FROM bronze.crm_prd_info;
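To see how the key split and the LEAD-based end date behave, the same expressions can be run on inline rows. This is a sketch only: sample_prd and its key and dates are invented, and prd_start_dt is assumed to be of type DATE so that subtracting 1 yields the previous day.

```sql
-- Hypothetical sample: three successive versions of one product key.
WITH sample_prd (prd_key, prd_start_dt) AS (
    VALUES
        ('CO-RF-FR-R92B-58', DATE '2011-07-01'),
        ('CO-RF-FR-R92B-58', DATE '2012-07-01'),
        ('CO-RF-FR-R92B-58', DATE '2013-07-01')
)
SELECT
    replace(substring(trim(prd_key), 1, 5), '-', '_') AS cat_id,
    substring(prd_key, 7, length(prd_key))            AS prd_key,
    prd_start_dt,
    -- PARTITION BY refers to the input column prd_key, not the alias above
    lead(prd_start_dt) OVER (
        PARTITION BY prd_key
        ORDER BY prd_start_dt
    ) - 1 AS prd_end_dt
FROM sample_prd
ORDER BY prd_start_dt;

-- Result:
--  CO_RF | FR-R92B-58 | 2011-07-01 | 2012-06-30
--  CO_RF | FR-R92B-58 | 2012-07-01 | 2013-06-30
--  CO_RF | FR-R92B-58 | 2013-07-01 | (null)
```

The latest version of a product has no following row, so its prd_end_dt stays NULL, which marks it as the current record.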
Sales Details transformations:
Integer to DATE conversion (YYYYMMDD → DATE)
Date validation (NULL for invalid dates)
Sales amount validation against quantity × price
Price correction (handle negatives and NULLs)
    SELECT
        sls_ord_num,
        -- Convert YYYYMMDD integers to DATE; anything else becomes NULL
        CASE
            WHEN sls_ord_dt IS NULL
              OR sls_ord_dt = 0
              OR length(sls_ord_dt::text) != 8 THEN NULL
            ELSE to_date(trim(sls_ord_dt::text), 'YYYYMMDD')
        END AS sls_ord_dt,
        -- Recompute sales when it disagrees with quantity × |price|
        CASE
            WHEN sls_sales != sls_quantity * abs(sls_price)
                THEN sls_quantity * abs(sls_price)
            ELSE sls_sales
        END AS sls_sales,
        ...
    FROM bronze.crm_sales_details;
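The two CASE expressions shown above can be checked against a few inline rows. The sketch uses invented values in a hypothetical sample_sales set and applies only the expressions visible in this section, not the elided ones:

```sql
-- Hypothetical sample rows covering valid, zero, short, and NULL inputs.
WITH sample_sales (sls_ord_num, sls_ord_dt, sls_sales, sls_quantity, sls_price) AS (
    VALUES
        ('SO1', 20101229,  100, 2,  50),   -- consistent row
        ('SO2', 0,         -30, 3, -10),   -- zero date, negative amounts
        ('SO3', 2010122,    99, 1,  25),   -- 7-digit date, wrong sales
        ('SO4', NULL,     NULL, 2,   5)    -- missing date and sales
)
SELECT
    sls_ord_num,
    CASE
        WHEN sls_ord_dt IS NULL
          OR sls_ord_dt = 0
          OR length(sls_ord_dt::text) != 8 THEN NULL
        ELSE to_date(sls_ord_dt::text, 'YYYYMMDD')
    END AS sls_ord_dt,
    CASE
        WHEN sls_sales != sls_quantity * abs(sls_price)
            THEN sls_quantity * abs(sls_price)
        ELSE sls_sales
    END AS sls_sales
FROM sample_sales
ORDER BY sls_ord_num;

-- Result:
--  SO1 | 2010-12-29 | 100
--  SO2 | (null)     | 30
--  SO3 | (null)     | 25
--  SO4 | (null)     | (null)
```

Row SO4 shows a property of the comparison as written: when sls_sales is NULL, the `!=` test evaluates to NULL, so the ELSE branch keeps the NULL. If the intent is to recompute missing sales as well, an explicit `sls_sales IS NULL` condition would be needed. Note also that the length check only screens out values of the wrong width; it does not by itself prove that an 8-digit value is a real calendar date.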
Execute permission on both procedures is granted to all users (PUBLIC):
    GRANT EXECUTE ON PROCEDURE bronze.load_bronze() TO PUBLIC;
    GRANT USAGE ON SCHEMA bronze TO PUBLIC;

    GRANT EXECUTE ON PROCEDURE silver.load_silver() TO PUBLIC;
    GRANT USAGE ON SCHEMA silver TO PUBLIC;
    GRANT SELECT ON ALL TABLES IN SCHEMA silver TO PUBLIC;
Because the procedures are executable by PUBLIC and SECURITY DEFINER makes them run with their owner's privileges, any database user can trigger a load. Review these permissions based on your security requirements.
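If PUBLIC access is broader than you want, one option is to limit execution to a dedicated role. This is a sketch under stated assumptions: etl_runner is a hypothetical role name, and both procedures are assumed to exist already.

```sql
-- Hypothetical role name; substitute whichever role runs your ETL jobs.
CREATE ROLE etl_runner NOLOGIN;

-- Remove the blanket grants on the procedures.
REVOKE EXECUTE ON PROCEDURE bronze.load_bronze() FROM PUBLIC;
REVOKE EXECUTE ON PROCEDURE silver.load_silver() FROM PUBLIC;

-- Grant only what the ETL role needs to call them.
GRANT USAGE ON SCHEMA bronze, silver TO etl_runner;
GRANT EXECUTE ON PROCEDURE bronze.load_bronze() TO etl_runner;
GRANT EXECUTE ON PROCEDURE silver.load_silver() TO etl_runner;
```

The explicit REVOKE matters because PostgreSQL grants EXECUTE on newly created functions and procedures to PUBLIC by default.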
    SELECT
        p.category,
        SUM(f.sales_amount) AS total_sales
    FROM gold.fact_sales f
    INNER JOIN gold.dim_product p
        ON f.product_key = p.product_key
    GROUP BY p.category;