A containerized on-premise data platform I built for a multinational retail scenario. PostgreSQL, Spark, and Apache Airflow run as a five-service Docker Compose stack. A PySpark medallion pipeline (Bronze raw Parquet to Silver joined frame to Gold three-table dimensional model) flows through partitioned writes, broadcast joins, and explicit caching. Airflow orchestrates the pipeline daily with parameterized stock-vs-scaled execution from the same DAG.
Nova Retail Group is a fictional multinational retail and logistics organisation specialising in e-commerce, product distribution, and customer fulfillment. Their analysts and operations teams depend on a centralised PostgreSQL warehouse to answer questions about sales performance, regional behaviour, and product profitability. Their existing pipeline was a collection of manually executed Python scripts against fragmented database environments, with no historical preservation and no orchestration.
The brief identified four specific engineering constraints. Fragmented infrastructure: the organisation lacked a standardised environment for reproducible deployment. Manual ETL execution: workflows depended on operators remembering to run scripts in the right order. Limited historical data preservation: raw data was transformed in flight and discarded, so re-running analysis with new parameters meant re-ingesting from source. Runtime bottlenecks at growing volumes: pandas-based in-memory pipelines struggled past 100K-row datasets.
I was engaged as the on-premise data engineer responsible for transitioning Nova Retail from this fragmented, manual environment to a containerised, scheduled, distributed data platform. The deliverable had to do five things: deploy PostgreSQL reproducibly via Docker, build distributed PySpark ETL pipelines, implement Bronze/Silver/Gold medallion architecture, automate execution through orchestration, and support both production scheduling and ad-hoc scaled testing from the same DAG.
Dataset note: Nova Retail uses the public Olist Brazilian e-commerce dataset. This same dataset appears in PayFlow earlier in this portfolio. PayFlow demonstrates classical normalised warehousing; Nova Retail demonstrates production-grade infrastructure (Docker, Airflow, medallion). Different engineering angles on the same dataset.
The entire platform runs as a five-service Docker Compose stack. PostgreSQL 15 as the warehouse, a custom Spark container as the execution environment, and Apache Airflow's three components (init, webserver, scheduler) as the orchestration layer. Persistent named volumes keep data across restarts. A health check on PostgreSQL gates dependent services from starting before the database is genuinely ready. One docker-compose up brings the whole stack online.
The Spark container is "sidecar Spark": it runs sleep infinity and stays alive as a long-lived execution environment. Airflow doesn't run Spark itself. When the DAG triggers, the BashOperator calls docker exec spark /opt/spark/bin/spark-submit ... to inject jobs into the running Spark container. This keeps orchestration decoupled from execution, lets Airflow restart independently of Spark, and matches the topology used when deploying against a managed cluster (Databricks, EMR, Dataproc) where docker exec becomes a cluster-submit API call.
The Airflow DAG accepts two parameters via Airflow's native Param API: scale_data (boolean, defaults to false) and scale_multiplier (integer 1-5, defaults to 2). Scheduled runs use the defaults, producing analytical truth from real Olist data. Manual triggers via the Airflow UI surface a form letting the operator request a scaled run for architecture validation. Both flow through the same DAG, the same Spark container, the same pipeline code. Only the environment variables passed through docker exec change.
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="retail_etl_pipeline",
    schedule_interval="0 6 * * *",  # daily at 06:00 UTC
    params={
        "scale_data": Param(False, type="boolean"),
        "scale_multiplier": Param(2, type="integer", minimum=1, maximum=5),
    },
) as dag:
    # Inject the job into the long-lived Spark container instead of running Spark in Airflow
    run_spark_etl = BashOperator(
        task_id="run_full_etl_pipeline",
        bash_command=(
            "docker exec "
            "-e SCALE_DATA={{ 'true' if params.scale_data else 'false' }} "
            "-e SCALE_MULTIPLIER={{ params.scale_multiplier }} "
            "spark /opt/spark/bin/spark-submit "
            "--master local[*] "
            "/opt/retail_project/etl/run_pipeline.py"
        ),
    )
The Jinja templating ({{ params.X }}) interpolates parameters into the docker exec -e flags at trigger time. The pipeline reads them as environment variables and adjusts behaviour accordingly.
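On the pipeline side, the hand-off might look like the minimal sketch below. Only the variable names SCALE_DATA and SCALE_MULTIPLIER and their defaults come from the DAG; the helper name read_scale_config and the range re-check are assumptions made for illustration, not the project's actual code.

```python
import os


def read_scale_config(env=None):
    """Parse the scaling flags passed via `docker exec -e` (hypothetical helper)."""
    env = os.environ if env is None else env
    # The DAG renders the boolean as the literal strings 'true' / 'false'.
    scale_data = env.get("SCALE_DATA", "false").strip().lower() == "true"
    # The fallback of 2 mirrors the DAG's Param default.
    multiplier = int(env.get("SCALE_MULTIPLIER", "2"))
    # Assumption: re-check the 1-5 range here, because a value set by hand
    # with `docker exec -e` never passes through Airflow's Param validation.
    if not 1 <= multiplier <= 5:
        raise ValueError(f"SCALE_MULTIPLIER must be between 1 and 5, got {multiplier}")
    return scale_data, multiplier


if __name__ == "__main__":
    print(read_scale_config({"SCALE_DATA": "true", "SCALE_MULTIPLIER": "3"}))
```

Keeping the parsing in one small function means the rest of the pipeline can branch on a plain boolean and integer rather than on raw strings.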
Bronze writes raw Parquet with mode="overwrite". Silver writes joined Parquet with mode="overwrite". Gold loads to PostgreSQL with mode="overwrite". The honest tradeoff: simple, deterministic, idempotent. Running the pipeline twice in a row produces the same warehouse state. Re-running after a failure picks up cleanly. The cost is that every run rebuilds every layer; for the data volumes Nova Retail operates at, that cost is negligible compared to the simplicity it buys.
The warehouse isn't a single-table mart. It's three purpose-built analytical tables, each at its own grain, each with its own primary key, each with its own indexes. Different queries hit different tables. fact_sales for transaction-level questions. sales_summary for state-by-category analysis. sales_by_month_state for time-series.
fact_sales answers "what happened in this specific order?" sales_summary answers "how does category X perform across states?" sales_by_month_state answers "how did state Y trend over time?" Building these as separate tables with their own primary keys and indexes lets each query hit the right grain instead of scanning a generic fact table.
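Monetary columns (payment_value, freight_value) are stored as DECIMAL(10,2) rather than floating point, because binary floats cannot represent decimal fractions exactly (0.1 + 0.2 != 0.3). Same money-stays-money discipline as FibbieBanks. NUMERIC(12,2) on total_revenue accommodates larger aggregate values.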
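The floating-point drift behind that choice can be reproduced in plain Python. This is an illustrative sketch only: the to_money helper is hypothetical and not part of the pipeline, and Python's Decimal stands in for the database's DECIMAL/NUMERIC type.

```python
from decimal import Decimal, ROUND_HALF_UP

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
float_sum = 0.1 + 0.2
float_matches = float_sum == 0.3

# Decimal arithmetic on string literals stays exact, like a DECIMAL column.
decimal_sum = Decimal("0.10") + Decimal("0.20")
decimal_matches = decimal_sum == Decimal("0.30")


def to_money(value):
    """Round to two decimal places, the scale of DECIMAL(10,2) (hypothetical helper)."""
    return Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


if __name__ == "__main__":
    print(float_matches, decimal_matches, to_money(float_sum))
```

Summing thousands of payment values compounds the same drift, which is why the cast happens before any aggregation rather than at display time.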
sql/create_tables.sql declares the three tables with composite primary keys, NOT NULL constraints, DECIMAL precision, and five read-optimised indexes on fact_sales (date, year, year-month, state, payment type). Spark's JDBC writer can auto-create tables, but it auto-infers types and silently discards PRIMARY KEYs and indexes. By owning the schema in version-controlled SQL, the warehouse gets full control. The DDL auto-runs on first PostgreSQL container start via the /docker-entrypoint-initdb.d mount, so the schema exists before any data lands.
CREATE SCHEMA IF NOT EXISTS retail_gold;

CREATE TABLE IF NOT EXISTS retail_gold.fact_sales (
    order_id                 VARCHAR(255) NOT NULL,
    customer_id              VARCHAR(255),
    product_id               VARCHAR(255),
    seller_id                VARCHAR(255),
    customer_state           VARCHAR(100),
    payment_value            DECIMAL(10,2),
    payment_type             VARCHAR(100),
    product_category_name    VARCHAR(255),
    freight_value            DECIMAL(10,2),
    order_purchase_timestamp TIMESTAMP,
    order_purchase_date      DATE,
    order_year               INTEGER,
    order_month              INTEGER,
    order_day                INTEGER,
    order_year_month         VARCHAR(7),
    order_weekday            INTEGER,
    PRIMARY KEY (order_id, product_id)
);

-- Read-optimised indexes for the 8 production queries
CREATE INDEX IF NOT EXISTS idx_fact_sales_date    ON retail_gold.fact_sales (order_purchase_date);
CREATE INDEX IF NOT EXISTS idx_fact_sales_year    ON retail_gold.fact_sales (order_year);
CREATE INDEX IF NOT EXISTS idx_fact_sales_ym      ON retail_gold.fact_sales (order_year_month);
CREATE INDEX IF NOT EXISTS idx_fact_sales_state   ON retail_gold.fact_sales (customer_state);
CREATE INDEX IF NOT EXISTS idx_fact_sales_payment ON retail_gold.fact_sales (payment_type);
Three engineering patterns carry most of the scalability weight in this pipeline. Plus one pattern that catches silent failures most pipelines never notice.
The Bronze layer writes orders and order_items with .partitionBy("order_year", "order_month"), deriving partition columns from order_purchase_timestamp and shipping_limit_date respectively. This isn't a transformation of the data; the source columns are preserved. The partition columns are storage-layout metadata that enable partition pruning on read. Small dimension tables (customers, sellers, products) write unpartitioned. At current 555K-row scale this is invisible. At any volume beyond that, it's the single biggest architectural lever.
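from pyspark.sql.functions import col, month, year

# Tables that grow with the business, mapped to the timestamp that drives partitioning
PARTITIONED_DATASETS = {
    "orders": "order_purchase_timestamp",
    "order_items": "shipping_limit_date",
}


def bronze_layer(datasets):
    for name, df in datasets.items():
        output_path = layer_path("bronze", name)  # layer_path is defined elsewhere in the project
        if name in PARTITIONED_DATASETS:
            ts_col = PARTITIONED_DATASETS[name]
            # Partition columns are storage-layout metadata; source columns stay untouched
            df_partitioned = (
                df.withColumn("order_year", year(col(ts_col)))
                .withColumn("order_month", month(col(ts_col)))
            )
            df_partitioned.write \
                .mode("overwrite") \
                .partitionBy("order_year", "order_month") \
                .parquet(output_path)
        else:
            # Small dimension tables are written unpartitioned
            df.write.mode("overwrite").parquet(output_path)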
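To make the pruning benefit concrete, the sketch below mimics the key=value directory layout that partitionBy produces and shows how a year/month filter narrows the set of directories to read. It is a plain-Python illustration, not Spark code; the helper names and the bronze/orders base path are assumptions.

```python
from datetime import datetime


def partition_dir(base, ts):
    """Directory for one row's partition, in the key=value layout partitionBy writes."""
    return f"{base}/order_year={ts.year}/order_month={ts.month}"


def prune(partition_dirs, year, month):
    """Keep only the directories a year/month filter would need to read."""
    wanted = f"order_year={year}/order_month={month}"
    return [d for d in partition_dirs if d.endswith(wanted)]


# Four orders spread over three distinct year/month partitions.
timestamps = [
    datetime(2018, 1, 5),
    datetime(2018, 1, 20),
    datetime(2018, 2, 3),
    datetime(2017, 11, 9),
]
all_dirs = sorted({partition_dir("bronze/orders", ts) for ts in timestamps})

# A query for January 2018 touches one directory and skips the rest.
january_2018 = prune(all_dirs, 2018, 1)

if __name__ == "__main__":
    print(all_dirs)
    print(january_2018)
```

Spark performs the equivalent selection itself when a read filters on order_year and order_month, so the skipped directories are never opened.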
The Silver layer joins seven tables. Two of them are small enough to broadcast to every Spark executor instead of shuffling. sellers at 3K rows and products at 33K rows are stable across all tested scales. Explicit broadcast() hints make this deterministic instead of relying on Spark's auto-broadcast threshold. The customers table was tested as a broadcast target and removed when scaled testing showed it could exceed safe broadcast size; the discipline is to broadcast where the small side stays small, shuffle where it might grow.
from pyspark.sql.functions import broadcast, col, year

silver_df = (
    orders
    .join(customers, "customer_id", "left")
    .join(payments, "order_id", "left")
    .join(items, "order_id", "left")
    .join(broadcast(products), "product_id", "left")  # 33K rows, safe
    .join(broadcast(sellers), "seller_id", "left")    # 3K rows, safe
    # Numeric precision, date parts, then filter
    .withColumn("payment_value", col("payment_value").cast("DECIMAL(10,2)"))
    .withColumn("order_year", year(col("order_purchase_date")))
    .filter(col("payment_value").isNotNull())
)

# Cache before reuse: count + write + downstream Gold all hit this frame
silver_df = silver_df.cache()
The Silver layer's joined output gets cached before any action. The Gold layer caches fact_sales after the column selection because it's used four times downstream (count + write + two summary aggregations). Without caching, Spark may recompute these chains on each subsequent action. With caching, the join work happens once and the result is held in memory (with disk spillover via MEMORY_AND_DISK semantics) for everything downstream. This is the optimisation that made Gold at 2x scale finish in 59 seconds instead of several minutes.
After each PostgreSQL write, the loader reads back the target table via JDBC and counts rows. If the count matches the source DataFrame, validation passes. If not, a warning is logged. This catches silent failures: writes that "succeed" at the protocol level but lose rows due to constraint violations, transaction rollbacks, or driver bugs. Three Gold tables, three validation passes per pipeline run.
import logging


def load_df_to_postgres(df, schema_name, table_name, mode="overwrite"):
    # jdbc_url comes from module-level configuration defined elsewhere
    row_count = df.count()

    # Assumption: truncate=true so that overwrite empties the pre-created table
    # instead of dropping and recreating it, which would lose keys and indexes
    df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", f"{schema_name}.{table_name}") \
        .option("batchsize", 10000) \
        .option("truncate", "true") \
        .mode(mode) \
        .save()

    # Validation: read back, count, compare
    check_df = df.sparkSession.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", f"{schema_name}.{table_name}") \
        .load()
    loaded = check_df.count()

    if loaded == row_count:
        logging.info("✅ Validation OK — row count matches")
    else:
        logging.warning(f"⚠️ Mismatch! Source: {row_count} | DB: {loaded}")
The same logger.py from XTD and FibbieBanks powers Nova's observability: @timed decorator for stage durations, cross-platform UTF-8 detection for emoji-safe console output, dual-handler setup (colourised console + rotating UTF-8 file), section() banners between stages. One pipeline.log file tells the whole story of any run.
2026-06-08 13:56:21 INFO 🔷 Creating Spark Session
2026-06-08 13:57:02 INFO ============== Spark Session created successfully ==============
2026-06-08 13:57:02 INFO 🔷 STEP 1: EXTRACT
2026-06-08 13:57:30 INFO orders: 99441 rows · customers: 99441 · order_items: 112650
                         order_payments: 103886 · reviews: 104162 · products: 32951 · sellers: 3095
2026-06-08 13:57:30 INFO 🔷 STEP 2: BRONZE
2026-06-08 13:58:03 INFO Successfully written to Bronze: orders (partitioned by year/month)
2026-06-08 13:58:03 INFO Successfully written to Bronze: order_items (partitioned by year/month)
2026-06-08 13:58:03 INFO Successfully written to Bronze: customers · sellers · products · order_payments · reviews
2026-06-08 13:58:03 INFO 🔷 STEP 3: SILVER
2026-06-08 13:58:37 INFO Valid records after cleaning: 118,431
2026-06-08 13:58:37 INFO 🔷 STEP 4: GOLD
2026-06-08 13:59:14 INFO Fact Sales created: 118,431 records
2026-06-08 13:59:14 INFO Sales Summary created: 1,394 groups
2026-06-08 13:59:14 INFO Sales by Month + State created: 565 groups
2026-06-08 13:59:14 INFO 🔷 STEP 5: LOAD TO POSTGRESQL
2026-06-08 13:59:43 INFO ✅ Load successful → retail_gold.fact_sales — 118,431 rows
2026-06-08 13:59:43 INFO ✅ Validation OK — row count matches
2026-06-08 13:59:53 INFO ✅ Load successful → retail_gold.sales_summary — 1,394 rows
2026-06-08 13:59:53 INFO ✅ Validation OK — row count matches
2026-06-08 14:00:05 INFO ✅ Load successful → retail_gold.sales_by_month_state — 565 rows
2026-06-08 14:00:05 INFO ✅ Validation OK — row count matches
2026-06-08 14:00:06 INFO ✅✅✅ PIPELINE COMPLETED ✅✅✅
2026-06-08 14:00:07 INFO ⏱️ Step completed in 3m 11s
Ten engineering decisions that shaped this pipeline, with the reasoning behind each.
Why mode="overwrite"? Every layer is rebuilt on each run, which keeps the pipeline simple, deterministic, and idempotent: running it twice in a row produces the same warehouse state.
orders and order_items are the tables that grow as the business grows; partitioning them by year/month means a query for "last month's sales" reads only one partition instead of all 24+. Dimension tables (customers, sellers, products) are bounded by business reality (you have only so many sellers) and would just produce directory clutter if partitioned. Partition where pruning helps. Leave alone where it doesn't.
Why not inferSchema=True? With inference, the review_score column would be read as a string; with an explicit StructType schema it reads as an integer. The cost is a one-time schema declaration. The benefit is permanent.
sellers at 3K rows is always safe. products at 33K is safe. customers at 99K was on the edge; scaled testing showed broadcast pressure at higher volumes, so customers was reverted to a regular shuffle join. Broadcast where the small side stays small. Shuffle where it might grow.
fact_sales answers questions about individual transactions. sales_summary answers "how does category X perform across states." sales_by_month_state answers "how did state Y trend over time." Building these as separate tables with their own primary keys and indexes lets each query hit the right table. The cost is three writes. The benefit is queries that run in milliseconds against the right table instead of seconds against a generic fact.
Airflow's Param API means the same orchestration code serves two operating modes, controlled at trigger time. The pattern matches what real Airflow deployments do for backfills, replays, and operational what-ifs.
SparkSubmitOperator requires Spark installed inside the Airflow worker container. That couples Airflow's lifecycle to Spark's: upgrading one means rebuilding both. The sidecar pattern keeps them independent: the Spark container can be rebuilt with a new Spark version while Airflow stays exactly as it was. Same shape used at scale when Airflow triggers Spark on a remote cluster. Only the submit mechanism changes; the topology is identical.
Why sql/create_tables.sql, not auto-created? The DDL auto-runs on first PostgreSQL container start via the /docker-entrypoint-initdb.d mount, so the schema exists before any data lands.
The apache/spark:4.1.1 image doesn't include the PostgreSQL JDBC driver. Without it, every pipeline run would have to download the driver at runtime (slow, network-dependent, version-drift-prone) or mount it from the host (fragile). Baking the driver into a custom image makes it part of the image contract: this image always has the right JDBC driver at the right version. The Dockerfile is four lines. The reliability gain is permanent.
Real numbers from two captured pipeline logs (June 2026): the stock production run that anchors analytical truth, and the 2x scaled validation run that proves the architecture handles substantially larger volumes than current production scale.
One docker-compose up brings the entire data platform online.
The 2x scaled run replicates source data via union, producing ~1.1M source rows that join-multiply to 7,533,104 Silver records. The pipeline processed all 7.5M rows end-to-end in 13m 14s with row-count validation passing on every Postgres load. Honest framing: this validates throughput, not analytical correctness. Row replication produces duplicate keys, which compound through joins; the 7.5M figure represents pipeline row-handling capacity, not distinct logical sales.
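The compounding effect of duplicate keys can be seen on a toy example. The sketch below is plain Python with made-up keys, not the pipeline's data: it counts the rows a single left join would emit before and after both sides are doubled via union.

```python
def left_join_count(left_keys, right_keys):
    """Row count of a left join on a single key column."""
    total = 0
    for key in left_keys:
        matches = right_keys.count(key)
        # A left join keeps an unmatched left row as one row with NULLs on the right.
        total += matches if matches else 1
    return total


orders = ["o1", "o2", "o3"]
items = ["o1", "o1", "o2"]  # o1 has two items, o3 has none

base_rows = left_join_count(orders, items)

# Replicating via union doubles both sides, so every matching key pairs up
# four times as often, while unmatched rows merely double.
scaled_rows = left_join_count(orders * 2, items * 2)

if __name__ == "__main__":
    print(base_rows, scaled_rows)
```

Matched rows grow with the square of the multiplier per join and unmatched rows grow linearly, so a replicated run's output count is not a simple multiple of its input count. That is consistent with treating the scaled figure as a throughput measure only.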
What it does prove, concretely: PySpark's distributed transform engine handled 7.5M rows through the medallion without intervention. The caching strategy paid off (Gold built three tables from a 7.5M-row cached fact_sales in 59 seconds total). Bronze partitioning scaled linearly (2x volume produced ~2x Bronze write time, not exponential). The JDBC load completed (7.5M rows over a single connection in ~5 minutes write + ~3.5 minutes validation). Three Gold tables loaded and validated successfully.
Scaling out to a cluster requires only spark.master and memory config changes; the medallion separation, partitioning, broadcast strategy, and validation pattern all carry forward unchanged.
For production at higher volumes, the natural evolutions would be: parallel JDBC writes, by repartitioning fact_sales before the load and capping concurrent connections with Spark's numPartitions option, to address the fact_sales load becoming the dominant cost at 2x scale and above (partitionColumn and its bounds apply to JDBC reads, so they would speed up the read-back validation rather than the write); cross-platform Spark paths in spark_session.py (currently Windows-tuned hardcoded paths for the bare-metal developer workflow); incremental Silver via dynamic partition overwrite so growing data doesn't rebuild the entire Silver layer on every run; Spark cluster deployment (Databricks, EMR, Dataproc), which the code is already shaped for, since only spark.master and infrastructure config would change; dbt models on top of Gold for analyst-authored transformations with tested SQL; migration tooling (Alembic, Flyway) for evolving DDL safely instead of DROP + CREATE; and separated PostgreSQL instances for Airflow metadata vs warehouse, currently sharing one container for development simplicity.
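As a rough sketch of the first of those evolutions, the helper below assembles writer options for a parallel load. The helper name, the connection count of 8, and the JDBC URL are all placeholders for illustration. As far as Spark's documented JDBC options go, numPartitions caps concurrent connections on write, while partitionColumn applies only to reads. Credentials are omitted.

```python
def jdbc_write_options(url, table, max_connections=8, batch_size=10000):
    """Build JDBC writer options for a parallel load (hypothetical helper)."""
    if max_connections < 1:
        raise ValueError("max_connections must be at least 1")
    return {
        "url": url,
        "dbtable": table,
        # Upper bound on concurrent JDBC connections; Spark coalesces down to it.
        "numPartitions": str(max_connections),
        "batchsize": str(batch_size),
        # Empty the pre-created table on overwrite instead of dropping it.
        "truncate": "true",
    }


opts = jdbc_write_options(
    "jdbc:postgresql://postgres:5432/retail",  # placeholder URL, not from the project
    "retail_gold.fact_sales",
)

# Usage inside the pipeline, assuming an existing DataFrame `fact_sales_df`:
# (fact_sales_df.repartition(int(opts["numPartitions"]))
#     .write.format("jdbc").options(**opts).mode("overwrite").save())

if __name__ == "__main__":
    print(opts)
```

The right connection count depends on what the PostgreSQL instance can absorb, so it would need measuring against the warehouse rather than being taken from this sketch.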
I'm currently looking for Data Engineer / Analytics Engineer roles. UK-based, open to remote or hybrid.