A 1M-row PySpark pipeline for synthetic banking transactions, with a validated star schema warehouse, deterministic SHA-256 surrogate keys, and PostgreSQL as the analytics destination. End-to-end in ~25 minutes on a single local Spark instance. Re-run safely in ~9 minutes with zero duplicate rows.
A 1-million-row banking dataset arrives as a single flat CSV with 23 columns. Customer demographics, transaction
details, employee metadata, financial identifiers, audit timestamps, all flattened together. For an analyst,
this is unusable: you can't SUM(amount) across customers without scanning the whole file, you can't
ask "show me weekend transactions in Q3" without parsing dates row-by-row, and you can't trust the numbers
because ~10% of rows have NULLs scattered across every column.
The job was to turn this raw CSV into a dimensional warehouse: a Kimball-style star schema where dimensions are conformed, facts are tightly typed, and analytical queries run in seconds against a Postgres instance any BI tool can connect to. Beyond the modelling, the pipeline had to be safely re-runnable. If someone re-runs it tomorrow on the same source data, the warehouse must not double in size, must not drift, and must not silently corrupt itself.
The pipeline is composed of five Python modules under an etl/ package. Each stage is independently
runnable for debugging (python -m etl.clean, python -m etl.transform, etc.), and each
can be imported by the next via a clean main() entry point that returns its outputs plus the shared
Spark session.
The explore module is its own step rather than a notebook afterthought. Knowing your data is a
first-class engineering concern: profiling row counts, null distributions, and distinct values catches problems
before they propagate downstream. Most pipelines bury this in commented-out notebook cells. Here it has a
dedicated module with its own section() banners in the log output.
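explore.py presumably runs its checks as Spark aggregations. The sketch below is a plain-Python analogue that uses only the standard library. It shows what a minimal profile of row count, null distribution, and distinct values looks like. The function name `profile` and the two-column sample are hypothetical.

```python
import csv
import io


def profile(rows: list[dict], columns: list[str]) -> dict:
    """Row count plus per-column null and distinct counts.

    Empty strings count as nulls, because csv.DictReader yields "" for empty fields.
    """
    report = {"row_count": len(rows), "columns": {}}
    for col in columns:
        values = [row.get(col) for row in rows]
        present = [v for v in values if v not in (None, "")]
        nulls = len(values) - len(present)
        report["columns"][col] = {
            "nulls": nulls,
            "null_pct": round(100 * nulls / len(rows), 1) if rows else 0.0,
            "distinct": len(set(present)),
        }
    return report


# Tiny inline sample standing in for the real 23-column CSV (hypothetical data)
sample = io.StringIO("customer_id,amount\nc1,10\nc2,\nc1,5\n")
reader = csv.DictReader(sample)
rows = list(reader)
print(profile(rows, reader.fieldnames))
```

In PySpark, the per-column null counts can be computed in a single pass with `df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])`.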
Each module imports the previous one's main() function and chains forward. Running
python -m etl.load walks the entire pipeline: extract → clean → transform → load, because each
stage's main() calls into the previous one. This means every stage is independently testable and
every intermediate output is available without re-reading the source CSV.
```python
from etl.clean import main as clean_main


def main():
    # Reuse the cleaned DataFrame and the shared Spark session from the previous stage
    df_cleaned, spark = clean_main()

    # Build the date dimension first: dim_transaction depends on it
    dim_date_df = dim_date(df_cleaned)

    # Build the surrogate-keyed dimensions
    dim_cust = dim_customer(df_cleaned)
    dim_trans = dim_transaction(df_cleaned, dim_date_df)
    dim_emp = dim_employee(df_cleaned)

    # The fact table joins back to the dimensions on natural keys
    fact = fact_table(df_cleaned, dim_cust, dim_trans, dim_emp)

    return {
        "dim_date": dim_date_df,
        "dim_customer": dim_cust,
        "dim_transaction": dim_trans,
        "dim_employee": dim_emp,
        "fact_transactions": fact,
    }, spark
```
The warehouse is a textbook star schema. One central fact (fact_transactions) connects to four
conformed dimensions via foreign keys. Every analytical query joins through the fact; every dimension is
independently sliceable.
date_key is an INT in YYYYMMDD format (the Kimball convention). Small, naturally
sortable, human-readable. Date dimensions are tiny (one row per date), so a 4-byte INT beats a 64-char hash
on every dimension that matters: storage, joins, debuggability.
The other three dimensions (dim_customer, dim_transaction, dim_employee) use a VARCHAR(64) surrogate key
derived from sha2(concat_ws('|', ...natural_keys), 256). Same row in → same key out, every run.
This is what makes the LEFT JOIN merge in load.py actually idempotent.
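The function below is a plain-Python mirror of the key expression, useful for spot-checking keys outside Spark. The name `surrogate_key` is hypothetical. For string inputs it reproduces Spark's `sha2(concat_ws('|', ...), 256)`: `concat_ws` silently skips NULLs, and `sha2` returns a lowercase 64-character hex string. Non-string columns (dates, decimals) may render differently in Spark and Python, so cast them to strings consistently before comparing.

```python
import hashlib
from typing import Optional


def surrogate_key(*natural_keys: Optional[str]) -> str:
    """Mirror of sha2(concat_ws('|', *cols), 256) for string columns.

    Like Spark's concat_ws, None (NULL) values are dropped before joining.
    """
    joined = "|".join(k for k in natural_keys if k is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


key_run_1 = surrogate_key("C001", "Alice", "Smith")
key_run_2 = surrogate_key("C001", "Alice", "Smith")
print(key_run_1 == key_run_2, len(key_run_1))  # True 64

# Caveat: because NULLs are skipped, these two different rows collide.
print(surrogate_key("a", None, "b") == surrogate_key("a", "b", None))  # True
```

The collision in the last line only matters if nullable columns feed the key. Given that about 10% of source rows contain NULLs, it is worth checking. Wrapping each column in `coalesce(col, lit("<NULL>"))` before `concat_ws` is one way to keep positions distinct.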
Eight of the source columns (credit_card_number, iban, currency_code,
random_number, category, group, is_active,
description) don't get their own dim tables. They're either one-to-one with transactions (no
analytical benefit to splitting) or pure metadata. Following Kimball's "degenerate dimension" principle, they
live as attributes directly on the fact row. A pure star, not a snowflake. Easier to query, smaller storage,
fewer joins.
```sql
CREATE TABLE analytics.fact_transactions (
    transaction_id     VARCHAR(64) PRIMARY KEY REFERENCES analytics.dim_transaction(transaction_id),
    customer_id        VARCHAR(64) NOT NULL REFERENCES analytics.dim_customer(customer_id),
    employee_id        VARCHAR(64) NOT NULL REFERENCES analytics.dim_employee(employee_id),
    date_key           INT NOT NULL REFERENCES analytics.dim_date(date_key),

    -- Degenerate dimensions
    credit_card_number NUMERIC(50),
    iban               VARCHAR(80),
    currency_code      VARCHAR(10),
    random_number      NUMERIC(18,4),
    category           VARCHAR(100),
    "group"            VARCHAR(100),  -- quoted: reserved word
    is_active          VARCHAR(10),
    last_updated       TIMESTAMP,
    description        TEXT
);
```
A warehouse is only as trustworthy as the discipline that built it. Three engineering patterns run through every stage of FibbieBanks.
Every transformation in transform.py calls validate_schema() before and after it runs.
If a column is missing, the pipeline halts with a ValueError listing exactly which columns weren't
found. This means a schema drift in the source CSV gets caught at the start of transform, not three stages later
when something tries to load NULLs into a NOT NULL column.
```python
import logging


def validate_schema(df, expected_columns: list[str]) -> None:
    """Halt the pipeline if any expected column is missing from df."""
    missing_cols = [c for c in expected_columns if c not in df.columns]
    if missing_cols:
        logging.error(f"Missing columns: {missing_cols}")
        raise ValueError(f"Missing required columns: {missing_cols}")
    logging.info("✅ Schema validation passed.")
```
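validate_schema() only needs a `.columns` attribute, so the before-and-after pattern can be exercised without a Spark session. One way to package it is a decorator. The sketch below makes several simplifications:
- `validated` and `dim_customer_stub` are hypothetical names.
- The column check is inlined so the block is self-contained.
- `SimpleNamespace` stands in for a Spark DataFrame.

```python
import functools
import logging
from types import SimpleNamespace


def validated(inputs: list[str], outputs: list[str]):
    """Hypothetical decorator: check required columns before and after a transform."""

    def check(df, expected: list[str], where: str) -> None:
        # Same rule as validate_schema(): every expected column must be present
        missing = [c for c in expected if c not in df.columns]
        if missing:
            logging.error("Missing columns (%s): %s", where, missing)
            raise ValueError(f"Missing required columns ({where}): {missing}")

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(df, *args, **kwargs):
            check(df, inputs, f"{fn.__name__} input")
            result = fn(df, *args, **kwargs)
            check(result, outputs, f"{fn.__name__} output")
            return result
        return wrapper

    return decorator


@validated(inputs=["customer_id", "first_name"], outputs=["customer_id"])
def dim_customer_stub(df):
    # Stand-in transform: a real one would select and rename Spark columns
    return SimpleNamespace(columns=["customer_id"])


raw = SimpleNamespace(columns=["customer_id", "first_name", "last_name"])
print(dim_customer_stub(raw).columns)  # ['customer_id']
```

Putting the contract in the decorator keeps each transform body focused on the transformation. The list of required columns sits right next to the function signature.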
The source CSV's amount column arrives as a string (with "$1,250.50" formatting in places). In
clean.py it's stripped of non-numeric characters via regex and cast to Spark's
DecimalType(18,2). This type flows through every subsequent stage unchanged and lands in PostgreSQL
as NUMERIC(18,2). The cast logs how many rows became NULL during the conversion, so any value lost to a
failed parse shows up in the log instead of disappearing silently.
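The exact regex in clean.py isn't shown, so the plain-Python sketch below assumes one that keeps digits, the decimal point, and a minus sign. The sketch:
- rounds to two decimal places, half-up;
- treats values too large for DECIMAL(18,2) as NULL, as Spark's cast does with ANSI mode disabled;
- counts rows that were non-NULL before the cast but NULL after it.

In PySpark, the equivalent expression would be along the lines of `F.regexp_replace("amount", r"[^0-9.\-]", "").cast(DecimalType(18, 2))`.

```python
import re
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from typing import Optional

# Assumed pattern: drop everything except digits, '.' and '-'
NON_NUMERIC = re.compile(r"[^0-9.\-]")
CENTS = Decimal("0.01")
MAX_EXCLUSIVE = Decimal(10) ** 16  # DECIMAL(18,2) leaves 16 integer digits


def clean_amount(raw: Optional[str]) -> Optional[Decimal]:
    """'$1,250.50' -> Decimal('1250.50'); unparseable or overflowing input -> None."""
    if raw is None:
        return None
    stripped = NON_NUMERIC.sub("", raw)
    try:
        value = Decimal(stripped).quantize(CENTS, rounding=ROUND_HALF_UP)
    except InvalidOperation:
        return None  # e.g. "", "-", "1.2.3", or too many digits to quantize
    return value if abs(value) < MAX_EXCLUSIVE else None


def clean_amounts(values: list) -> tuple:
    """Clean a column and count values that the cast turned into NULL."""
    cleaned = [clean_amount(v) for v in values]
    newly_null = sum(1 for raw, c in zip(values, cleaned) if raw is not None and c is None)
    return cleaned, newly_null


cleaned, newly_null = clean_amounts(["$1,250.50", "42", "n/a", None, "-3.456"])
print(cleaned, newly_null)
```

Counting only values that were present before the cast is what separates "the source was already NULL" from "cleaning destroyed a value". The second number is the one worth logging.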
Why not FLOAT or STRING? Floats carry binary rounding error (0.1 + 0.2 != 0.3). Strings prevent aggregation. DECIMAL
preserves exactness AND enables SUM(amount) directly in SQL without casts. Money is
too important to leave to type inference.
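The rounding problem is easy to see in plain Python. Python's `float` is an IEEE 754 double, the same representation as Spark's DoubleType and Postgres's `double precision`. The snippet sums ten 10-cent amounts both ways.

```python
from decimal import Decimal

# Ten 10-cent transactions, summed as binary floats and as exact decimals
float_total = sum([0.1] * 10)
decimal_total = sum([Decimal("0.10")] * 10)

print(float_total)        # 0.9999999999999999
print(decimal_total)      # 1.00
print(0.1 + 0.2 == 0.3)   # False
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True
```

Note that the decimals are built from strings. `Decimal(0.1)` would faithfully copy the float's binary error, which is the same reason the amount column is cast from its cleaned string form rather than via a float.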
A custom logger.py module provides a @timed decorator that wraps every pipeline stage
with execution timing, and a section() utility that prints visual banners between stages. Every
cleaning operation logs before/after row counts. Every load operation logs how many rows were appended (or that
none were, on idempotent re-runs). The whole story of any run is in a single pipeline.log file.
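logger.py isn't reproduced in this write-up, so the following is only a sketch of what a `@timed` decorator and a `section()` banner helper can look like with the standard `logging` module. The helper `format_duration`, the banner layout, and the `build_dim_date` example stage are assumptions. They were chosen to produce messages shaped like the ones in the log excerpt.

```python
import functools
import logging
import time


def format_duration(seconds: float) -> str:
    """126.4 -> '2m 6s'; 5.2 -> '5s'."""
    minutes, secs = divmod(int(round(seconds)), 60)
    return f"{minutes}m {secs}s" if minutes else f"{secs}s"


def timed(fn):
    """Log how long the wrapped pipeline stage took (only when it succeeds)."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        logging.info("⏱️ Step completed in %s", format_duration(time.perf_counter() - start))
        return result
    return wrapper


def section(title: str, width: int = 60) -> None:
    """Print a visual banner between pipeline stages."""
    logging.info("=" * width)
    logging.info(title)
    logging.info("=" * width)


# To send everything to a single pipeline.log in the excerpt's format:
# logging.basicConfig(filename="pipeline.log", level=logging.INFO,
#                     format="%(asctime)s %(levelname)s %(message)s",
#                     datefmt="%Y-%m-%d %H:%M:%S")


@timed
def build_dim_date():
    # Hypothetical stage used only to demonstrate the decorator
    logging.info("🔷 Creating dim_date table")
    return "dim_date"
```

Logging the duration only after a successful return avoids a misleading "Step completed" line when a stage raises. The exception propagates and the failure is what ends up in the log.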
```text
2026-05-17 15:47:30 INFO 🔷 Creating dim_date table
2026-05-17 15:47:31 INFO ✅ dim_date table created.
2026-05-17 15:47:31 INFO 🔷 Creating dim_transaction table
2026-05-17 15:47:31 INFO ✅ Schema validation passed.
2026-05-17 15:47:36 INFO ✅ dim_transaction table created — amount is NUMERIC.
2026-05-17 15:47:36 INFO ⏱️ Step completed in 2m 6s
2026-05-17 15:47:42 INFO 🔷 Loading dimension & fact tables — INCREMENTAL MODE
2026-05-17 15:47:54 INFO ℹ️ No new rows to insert — analytics.dim_date is already up to date
2026-05-17 15:48:44 INFO ℹ️ No new rows to insert — analytics.dim_customer is already up to date
2026-05-17 15:49:50 INFO ℹ️ No new rows to insert — analytics.dim_transaction is already up to date
2026-05-17 15:50:26 INFO ℹ️ No new rows to insert — analytics.dim_employee is already up to date
2026-05-17 15:54:26 INFO ℹ️ No new rows to insert — analytics.fact_transactions is already up to date
2026-05-17 15:54:27 INFO 🎉 ALL TABLES LOADED SUCCESSFULLY — ONLY NEW DATA APPENDED
```
That sequence of "No new rows to insert" lines (one per table) is the visible proof that idempotency is working. The deterministic hash keys mean the same source data produces the same surrogate keys, and the LEFT JOIN merge correctly identifies that nothing is actually new.
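The merge itself is an anti-join: keep only the incoming rows whose key has no match in the target. The sketch below expresses that logic in SQL against an in-memory SQLite database (Python's built-in `sqlite3`), not the Spark-plus-Postgres setup that load.py actually uses. The table names and the `incremental_insert` helper are hypothetical. In Spark, the same filter can be written as `staged.join(existing, on="customer_id", how="left_anti")`.

```python
import sqlite3


def incremental_insert(conn: sqlite3.Connection, target: str, staging: str, key: str) -> int:
    """Append staging rows whose key is absent from target; return rows inserted.

    Table and column names are interpolated directly, so they must be trusted identifiers.
    """
    before = conn.total_changes
    conn.execute(
        f"""
        INSERT INTO {target}
        SELECT s.*
        FROM {staging} AS s
        LEFT JOIN {target} AS t ON s.{key} = t.{key}
        WHERE t.{key} IS NULL
        """
    )
    conn.commit()
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_customer (customer_id TEXT PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE stg_customer (customer_id TEXT PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO stg_customer VALUES (?, ?)", [("k1", "Alice"), ("k2", "Bob")])

first_run = incremental_insert(conn, "dim_customer", "stg_customer", "customer_id")
second_run = incremental_insert(conn, "dim_customer", "stg_customer", "customer_id")
print(first_run, second_run)  # 2 0
```

The second call inserting zero rows is the SQLite equivalent of the "No new rows to insert" lines. It only works because the keys are identical across runs.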
Six engineering decisions that shaped FibbieBanks, with the reasoning behind each.
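Why SHA-256 keys instead of monotonically_increasing_id()?
monotonically_increasing_id() builds each ID from the row's partition ID and its position within that partition, so a
re-run with different partitioning can hand the same row a different ID. That breaks the LEFT JOIN merge in
load.py entirely: every row looks "new" because its surrogate key doesn't match what's in the
target table. Switching to sha2(concat_ws('|', ...natural_keys), 256) makes the surrogate key a
deterministic function of the row's content. Same row in, same key out, every time. This is what makes the
warehouse idempotent.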
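The toy illustration below, in plain Python, shows the difference. The positional IDs are a simplified stand-in for `monotonically_increasing_id()`: the real function encodes the partition ID and the in-partition position rather than using a simple counter. The content IDs mirror the `sha2(concat_ws('|', ...), 256)` expression. Feeding the same rows in a different order, as a change in partitioning might, reshuffles the positional IDs but leaves the content IDs untouched.

```python
import hashlib

rows = [("C001", "Alice"), ("C002", "Bob"), ("C003", "Cara")]


def positional_ids(batch):
    """Simplified stand-in for monotonically_increasing_id(): ID = position in the batch."""
    return {row: i for i, row in enumerate(batch)}


def content_ids(batch):
    """Mirror of sha2(concat_ws('|', ...), 256): ID depends only on the row's values."""
    return {row: hashlib.sha256("|".join(row).encode("utf-8")).hexdigest() for row in batch}


run_1 = rows
run_2 = list(reversed(rows))  # same data, different physical order

print(positional_ids(run_1) == positional_ids(run_2))  # False: Alice was 0, now 2
print(content_ids(run_1) == content_ids(run_2))        # True: same rows, same keys
```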
Why DECIMAL instead of float for money?
Floating-point arithmetic is approximate: 0.1 + 0.2 doesn't equal 0.3 in IEEE
754. For money, that's unacceptable. DECIMAL preserves exactness and still enables
SUM, AVG, and other aggregations directly in SQL without casts. The same type flows
from Spark's DecimalType(18,2) through to Postgres's NUMERIC(18,2). Money stays
money through the entire pipeline.
Why VARCHAR(64) instead of TEXT for surrogate keys?
A SHA-256 digest rendered as hex is always exactly 64 characters. VARCHAR(64) documents this contract to
anyone reading the schema and constrains the column to its real shape. TEXT would technically
work but tells future readers nothing about the column's purpose. The tradeoff is real: 64-char surrogate keys
are larger than BIGINT and slightly slower to join. But warehouse correctness on re-runs is worth far more
than the micro-perf delta.
Why define the schema in SQL instead of letting Spark create the tables?
With the DDL owned by create_tables.sql, the warehouse gets full control over types, constraints, and read-performance
indexes. The ETL becomes a pure data-movement layer with no DDL concerns mixed in. The pipeline hard-fails if
the schema isn't there, with a clear error pointing the operator at the SQL file.
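The sketch below shows such a pre-flight check. It assumes the set of existing tables has already been fetched, for example with the `information_schema` query in the comment. `assert_tables_exist` is a hypothetical helper. The table names come from the analytics schema shown above.

```python
import logging

REQUIRED_TABLES = (
    "analytics.dim_date",
    "analytics.dim_customer",
    "analytics.dim_transaction",
    "analytics.dim_employee",
    "analytics.fact_transactions",
)

# In Postgres, `existing` could be fetched with:
#   SELECT table_schema || '.' || table_name
#   FROM information_schema.tables
#   WHERE table_schema = 'analytics';


def assert_tables_exist(existing: set, required=REQUIRED_TABLES) -> None:
    """Exit with status 1 and point at the DDL file if any target table is missing."""
    missing = [t for t in required if t not in existing]
    if missing:
        logging.error("Missing target tables: %s. Run create_tables.sql first.", missing)
        raise SystemExit(1)
```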
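Why a separate explore.py stage?
Profiling is a first-class engineering step rather than a notebook afterthought, so it stays runnable on demand: re-run python -m etl.explore whenever the source dataset changes.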
Why fail loudly?
The pipeline raises SystemExit(1) on critical errors: missing env vars, missing target tables,
JDBC write failures. Silent recovery is the enemy of observability. A pipeline that "kind of
worked" is worse than one that loudly didn't, because nobody investigates a green log. Loud failure means real
correctness, not optimistic correctness.
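The same fail-fast shape applies to configuration. Below is a minimal sketch of checking required environment variables up front and exiting with status 1. Without the check, a later JDBC call would fail with a less obvious error. The variable names in `REQUIRED_ENV` and the helper `require_env` are hypothetical.

```python
import logging
import os

# Hypothetical connection settings; the real pipeline's variable names aren't shown
REQUIRED_ENV = ("PG_HOST", "PG_PORT", "PG_DB", "PG_USER", "PG_PASSWORD")


def require_env(names=REQUIRED_ENV) -> dict:
    """Return the requested settings, or exit with status 1 listing every missing one."""
    values = {name: os.environ.get(name) for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        logging.error("Missing required environment variables: %s", missing)
        raise SystemExit(1)
    return values


# Typical use at the top of a stage's main():
#   settings = require_env()
```

Reporting every missing variable at once, rather than stopping at the first, saves an operator a fix-and-rerun cycle per variable.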
Measurable outcomes from end-to-end runs on the 1M-row source dataset.
For production at higher volumes, the natural evolutions would be: managed Spark
(Databricks/EMR/Dataproc) to parallelise the transform across executors; parallelised JDBC
writes by repartitioning before the write and tuning Spark's numPartitions option (partitionColumn applies only to JDBC reads); dbt on
top of the warehouse for analyst-authored transformations and tested SQL models; Airflow or Prefect
orchestration for scheduled runs with retry and alerting; SCD Type 2 on dim_customer
with valid_from / valid_to columns to track attribute changes over time; and
migration tooling (Alembic, Flyway) for evolving the DDL safely instead of
DROP + CREATE.
I'm currently looking for Data Engineer / Analytics Engineer roles. UK-based, open to remote or hybrid.