A three-stage data engineering pipeline I built for a UK grid decarbonization research scenario. Async aiohttp pulls 1,095 days of regional carbon intensity from a live government API into a Bronze data lake. PySpark explodes deeply nested JSON, pivots fuel types into typed columns, and aggregates to daily research metrics across Silver and Gold layers. A two-stage dedup-merge load lands the result in PostgreSQL with composite-key idempotency.
XTD Research Labs is a fictional scientific institution studying the environmental footprint of UK electricity generation. Their analysts and policy researchers depend on regional carbon intensity data to build longitudinal climate models, with their current focus covering 2022 to 2024. The existing pipeline was an in-memory Python script designed for 30-minute interval pulls. It worked for one day at a time, and became the binding constraint on every research initiative that needed more than a few days of history.
The brief identified three specific engineering constraints. Scale incompatibility: the in-memory pipeline could not process 1,000+ days of concurrent regional data, and multi-year backfills timed out. Network latency: synchronous API requests against the Carbon Intensity API triggered rate-limiting when fetching multi-year datasets. No schema preservation: without a layered architecture, raw responses were discarded after transformation, so re-running analysis with new research parameters meant re-fetching from the source API every time.
I was engaged as the data engineer responsible for transitioning XTD from this single-day model to a distributed big data architecture capable of handling the full 2022 to 2024 research window. The deliverable had to do three things: ingest thousands of days of API data without hammering the upstream provider, transform deeply nested JSON at scale, and archive raw data immutably so reprocessing never required re-hitting the API.
The pipeline follows a medallion architecture: Bronze (raw API JSONs), Silver (cleaned
PySpark Parquet), Gold (daily aggregated CSV). Three Python modules under an etl/ package,
each runnable independently. There is no run_all.py orchestrator: each stage has a
meaningfully different runtime profile (Bronze is API-bound, Silver/Gold is CPU-heavy under PySpark,
Load is DB-bound), so decoupling them makes debugging, partial reruns, and external orchestration
cleaner.
Each layer uses a different idempotency model because the cost of rebuilding it is different. Bronze costs ~12 minutes and 1,000+ live API calls. Silver costs ~5 minutes of Spark. Gold costs ~3 seconds. A unified "wipe everything and rebuild" model would force every run to re-hit the API. A unified "incremental everything" model would make Gold queries return stale data. Different mechanisms, each appropriate to its layer.
The Bronze layer uses aiohttp with two independent concurrency caps that together act as client-side rate limiting.
asyncio.Semaphore(5) bounds logical task concurrency.
aiohttp.TCPConnector(limit=5) bounds physical socket count. Either alone
would mostly work; layered, they keep request pressure low enough that a 429 is very unlikely at this scale. In the actual run,
1,095 successful requests produced zero rate-limit errors.
```python
import asyncio
import logging
import os

import aiohttp

semaphore = asyncio.Semaphore(5)  # bounds in-flight task count

async def fetch_carbon_data(session, target_date):
    # BRONZE_DIR, url, and save_bronze are defined elsewhere in the module.
    file_path = os.path.join(BRONZE_DIR, f"{target_date}.json")
    if os.path.exists(file_path):
        return None  # idempotent: skip if already saved

    async with semaphore:
        for attempt in range(3):
            try:
                timeout = aiohttp.ClientTimeout(total=20)
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        data = await response.json()
                        # defensive: API sometimes returns 200 with empty payload
                        if 'data' in data and data['data']:
                            save_bronze(data, file_path)
                        else:
                            logging.warning(f"Empty payload for {target_date}; nothing written")
                        return None  # a 200 is final: stop retrying
                    elif response.status == 429:
                        await asyncio.sleep(5 * (attempt + 1))  # 5s, 10s, 15s
            except (aiohttp.ClientError, asyncio.TimeoutError):
                await asyncio.sleep(2 * (attempt + 1))
```
The empty-payload check (if 'data' in data and data['data']) is the kind of defensive code
that only exists because the author has been burned. The Carbon Intensity API occasionally returns
200 OK with an empty array. Naive code would write the empty response and call it success.
This logs a warning and refuses to write a corrupt file. In the actual run, it caught date
2023-10-22, which is why Bronze contains 1,095 days not 1,096.
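The empty-payload guard can be isolated into a small, testable helper. The sketch below is a minimal illustration, not the pipeline's actual code: the names has_usable_data and save_if_valid are hypothetical, and the exact warning text is an assumption.

```python
import json
import logging
import os
import tempfile


def has_usable_data(payload):
    """Return True only if the payload is a dict with a non-empty 'data' entry."""
    return isinstance(payload, dict) and bool(payload.get("data"))


def save_if_valid(payload, file_path):
    """Write the payload only when it carries data; report whether a file was written."""
    if not has_usable_data(payload):
        # Hypothetical message wording; the point is that nothing is written.
        logging.warning("Empty payload, skipping %s", os.path.basename(file_path))
        return False
    with open(file_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh)
    return True


# Usage with a throwaway directory standing in for the Bronze layer.
bronze_dir = tempfile.mkdtemp()
written = save_if_valid({"data": []}, os.path.join(bronze_dir, "empty.json"))
print(written)  # False
```

Returning a boolean rather than raising keeps the caller's retry loop simple: an empty 200 is treated as a final answer that leaves no file behind.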
The warehouse is a single-table data mart, not a star schema. The grain is one row per region per day, which is exactly how XTD's research analysts consume the data: time-series queries sliced by region and date range. With ~19.7K total rows in the analytics table, normalization would cost more than it would buy.
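Measurement columns are DECIMAL(10,2), not FLOAT, because binary floating point cannot represent
most decimal fractions exactly (0.1 + 0.2 != 0.3). Same principle as money in fintech,
applied to research-grade environmental data.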
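The floating-point issue is easy to reproduce with Python's standard library. This is a small sketch for illustration only; the helper to_two_places is a hypothetical name, and half-up rounding is an assumption about how one might match a two-decimal column, not a statement about the pipeline.

```python
from decimal import Decimal, ROUND_HALF_UP

float_sum = 0.1 + 0.2
decimal_sum = Decimal("0.1") + Decimal("0.2")

print(float_sum == 0.3)               # False
print(decimal_sum == Decimal("0.3"))  # True


def to_two_places(value):
    """Quantize a value to two decimal places, going through str to avoid float noise."""
    return Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(to_two_places(12.345))  # 12.35
```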
DDL lives in sql/create_table.sql, hand-authored, versioned in git. The Python loader
doesn't get to define the schema by inference. This is the same discipline as FibbieBanks's
create_tables.sql: types, constraints, and NOT NULL declarations belong to SQL, not to
whatever pandas decides on the first to_sql(if_exists="append") call. The dedup-merge in
load.py mirrors the composite PK exactly, so two layers of defense protect the same
invariant.
```sql
CREATE SCHEMA IF NOT EXISTS carbon_data;

DROP TABLE IF EXISTS carbon_data.carbon_intensity_daily;

CREATE TABLE IF NOT EXISTS carbon_data.carbon_intensity_daily (
    regionid       INT          NOT NULL,
    date_recorded  DATE         NOT NULL,
    shortname      VARCHAR(100) NOT NULL,
    dno            VARCHAR(100) NOT NULL,
    intensity_avg  DECIMAL(10,2),
    index_mode     VARCHAR(50),
    fuel_biomass   DECIMAL(10,2),
    fuel_coal      DECIMAL(10,2),
    fuel_gas       DECIMAL(10,2),
    fuel_hydro     DECIMAL(10,2),
    fuel_imports   DECIMAL(10,2),
    fuel_nuclear   DECIMAL(10,2),
    fuel_other     DECIMAL(10,2),
    fuel_solar     DECIMAL(10,2),
    fuel_wind      DECIMAL(10,2),
    PRIMARY KEY (regionid, date_recorded)
);
```
Honest gap worth flagging: there are no secondary indexes beyond the PK and no FK to a
dim_region table. With 19,728 total rows and queries that almost always filter by
regionid or date_recorded, additional indexes would be premature. The PK index serves
lookups by regionid and by (regionid, date_recorded); a filter on date_recorded alone cannot
use the index's leading column efficiently, but scanning a table this small is cheap.
A future iteration with billions of rows or new query patterns might add a
standalone date_recorded index for time-range scans, but for this scale it would be
optimization theater.
Three engineering patterns carry most of the technical weight in this pipeline. They map directly to the three layers of the medallion.
The Carbon Intensity API response is two arrays deep: each day has regions[], each region
has generationmix[]. To get one tabular row per (region, timestamp, fuel) requires
exploding both arrays, then pivoting fuel types from rows into columns. PySpark's F.explode
and .pivot() handle this at scale where pandas would struggle. 53,594 raw JSON
records expand to 8,682,228 intermediate rows after the double explode, then collapse to 945,092
silver records after the pivot.
```python
from pyspark.sql import functions as F


def transform_bronze_to_silver(raw_df):
    # Level 1 explode: regions[] array
    exploded_df = raw_df.select(
        F.col("from").alias("timestamp"),
        F.explode(F.col("regions")).alias("region")
    )

    # Level 2 explode: generationmix[] array within each region
    silver_df = exploded_df.select(
        F.col("timestamp"),
        F.col("region.regionid").alias("regionid"),
        F.col("region.shortname").alias("shortname"),
        F.col("region.dnoregion").alias("dno"),
        F.col("region.intensity.forecast").alias("intensity"),
        F.col("region.intensity.index").alias("index"),
        F.explode(F.col("region.generationmix")).alias("mix")
    )

    # Pivot: fuel types as columns, percentages as values
    silver_df_pivoted = silver_df.groupBy(
        "regionid", "shortname", "dno", "timestamp", "intensity", "index"
    ).pivot("mix.fuel").agg(F.first("mix.perc"))

    return silver_df_pivoted  # 9 fuel columns: biomass, coal, gas, ...
```
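To see what the double explode and pivot do to row counts, the same reshaping can be traced in plain Python on one toy record. This is only an illustration of the shape change: the record below is invented (two regions, two fuels), the field names follow the ones used in the Spark code, and the helper names are hypothetical.

```python
from collections import defaultdict

# Invented sample record: 1 timestamp x 2 regions x 2 fuels.
record = {
    "from": "2023-01-01T00:00Z",
    "regions": [
        {"regionid": 1, "generationmix": [
            {"fuel": "gas", "perc": 40.0},
            {"fuel": "wind", "perc": 60.0},
        ]},
        {"regionid": 2, "generationmix": [
            {"fuel": "gas", "perc": 25.0},
            {"fuel": "wind", "perc": 75.0},
        ]},
    ],
}


def double_explode(rec):
    """Emit one row per (timestamp, region, fuel)."""
    return [
        {"timestamp": rec["from"], "regionid": region["regionid"],
         "fuel": mix["fuel"], "perc": mix["perc"]}
        for region in rec["regions"]
        for mix in region["generationmix"]
    ]


def pivot_fuels(rows):
    """Collapse fuel rows into one row per (timestamp, regionid), one column per fuel."""
    pivoted = defaultdict(dict)
    for row in rows:
        key = (row["timestamp"], row["regionid"])
        # Keep the first value seen per fuel, analogous to F.first in the Spark version.
        pivoted[key].setdefault(row["fuel"], row["perc"])
    return [
        {"timestamp": ts, "regionid": rid, **fuels}
        for (ts, rid), fuels in pivoted.items()
    ]


long_rows = double_explode(record)
wide_rows = pivot_fuels(long_rows)
print(len(long_rows), len(wide_rows))  # 4 2
```

The row count multiplies by regions and fuels on the way down, then divides by the number of fuels on the way back up, which is the same pattern as the 53,594 to 8,682,228 to 945,092 progression in the real run.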
The Silver layer is incremental, not full-rebuild. A plain text file
(data/silver/_processed_files.txt) records which Bronze JSON files have already been folded
in. On each run, the transform compares the current Bronze directory against the tracker, processes only
files not yet listed, appends to the Silver Parquet directory, and marks them processed. Simple,
debuggable, no state in the database. At larger scales a checkpoint table would be more
robust; for ~1,000 files this is honest engineering.
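A text-file tracker of this kind needs very little code. The sketch below shows one way it could work; the function names are hypothetical and the real transform may differ. Marking files only after the write has succeeded is an assumption made here for safety.

```python
import os
import tempfile


def load_processed(tracker_path):
    """Return the set of file names already recorded in the tracker."""
    if not os.path.exists(tracker_path):
        return set()
    with open(tracker_path, encoding="utf-8") as fh:
        return {line.strip() for line in fh if line.strip()}


def find_new_files(bronze_dir, tracker_path):
    """List Bronze JSON files that the tracker has not seen yet."""
    processed = load_processed(tracker_path)
    return sorted(
        name for name in os.listdir(bronze_dir)
        if name.endswith(".json") and name not in processed
    )


def mark_processed(tracker_path, names):
    """Append names to the tracker; call this only after the Silver write succeeds."""
    with open(tracker_path, "a", encoding="utf-8") as fh:
        for name in names:
            fh.write(name + "\n")


# Usage with temporary directories standing in for data/bronze and data/silver.
bronze_dir = tempfile.mkdtemp()
tracker = os.path.join(tempfile.mkdtemp(), "_processed_files.txt")
for name in ("2022-01-01.json", "2022-01-02.json"):
    open(os.path.join(bronze_dir, name), "w").close()

pending = find_new_files(bronze_dir, tracker)
mark_processed(tracker, pending[:1])
print(find_new_files(bronze_dir, tracker))  # ['2022-01-02.json']
```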
The loader does deduplication twice. Stage one removes duplicates within the input files via
drop_duplicates(subset=["regionid", "date_recorded"]). Stage two pulls existing keys from
PostgreSQL and runs a merge(how="left", indicator=True) against them, keeping only rows
where the indicator says left_only. Same composite key as the PostgreSQL PRIMARY KEY.
Re-runs insert zero rows.
```python
import logging

import pandas as pd

# df, engine, and table_name are defined earlier in load.py.
unique_cols = ["regionid", "date_recorded"]

# STAGE 1: Dedup within the input files
before_file = len(df)
df = df.drop_duplicates(subset=unique_cols, keep="first")
logging.info(f"Removed {before_file - len(df)} duplicate rows FROM INPUT FILES")

# STAGE 2: Diff against what's already in the database
before_db = len(df)
existing = pd.read_sql("SELECT regionid, date_recorded FROM ...", engine)
df = df.merge(existing, on=unique_cols, how="left", indicator=True)
df = df[df["_merge"] == "left_only"].drop(columns=["_merge"])
logging.info(f"Skipped {before_db - len(df)} rows ALREADY IN DATABASE")

df.to_sql(table_name, engine, schema="carbon_data",
          if_exists="append", index=False, chunksize=20000)
```
Why pandas merge instead of PostgreSQL's ON CONFLICT DO NOTHING? The merge gives explicit
logging on both axes: "removed X rows from input files" and "skipped Y rows already in database." When
something goes wrong, the log tells the operator exactly which side the duplicate came from. The design
gives up a database-side optimization in exchange for observability that helps debug an idempotent load.
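For comparison, here is roughly what the conventional database-side approach looks like. To keep the sketch runnable without a server it uses SQLite (3.24 or newer) as a stand-in for PostgreSQL, with a cut-down three-column table; the function name load_rows and the sample rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE carbon_intensity_daily (
        regionid      INTEGER NOT NULL,
        date_recorded TEXT    NOT NULL,
        intensity_avg NUMERIC,
        PRIMARY KEY (regionid, date_recorded)
    )
    """
)


def load_rows(rows):
    """Insert rows, letting the database skip existing keys; return how many were inserted."""
    count_sql = "SELECT COUNT(*) FROM carbon_intensity_daily"
    before = conn.execute(count_sql).fetchone()[0]
    conn.executemany(
        """
        INSERT INTO carbon_intensity_daily (regionid, date_recorded, intensity_avg)
        VALUES (?, ?, ?)
        ON CONFLICT (regionid, date_recorded) DO NOTHING
        """,
        rows,
    )
    conn.commit()
    return conn.execute(count_sql).fetchone()[0] - before


# Invented rows; the third repeats the first key within the same batch.
rows = [(1, "2023-01-01", 120.5), (2, "2023-01-01", 98.25), (1, "2023-01-01", 120.5)]
first = load_rows(rows)
second = load_rows(rows)
print(first, second)  # 2 0
```

The load is idempotent either way, but the database only reports how many rows landed. It cannot say whether a skipped row was a duplicate inside the batch or a row that was already stored, which is the distinction the two-stage pandas approach logs.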
A custom logger.py module (shared across multiple projects in this portfolio) provides a
@timed decorator that wraps every pipeline stage with execution timing, cross-platform
UTF-8 detection for emoji-safe console output, a dual-handler setup with colorized console and rotating
file logs, and SQLAlchemy noise suppression. Every retry, every empty payload, every dedup result is in
pipeline.log.
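A timing decorator of this kind can be written in a few lines. The sketch below is an assumption about its general shape, not the actual logger.py: format_duration is a hypothetical helper, and the message text simply mirrors the "Step completed in" lines that appear in the log.

```python
import functools
import logging
import time


def format_duration(seconds):
    """Render a duration as '11m 43s', or just '3s' when under a minute."""
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes}m {secs}s" if minutes else f"{secs}s"


def timed(func):
    """Log how long the wrapped function took, even if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logging.info("Step completed in %s", format_duration(elapsed))
    return wrapper


@timed
def count_files(names):
    """Toy stage used only to demonstrate the decorator."""
    return len(names)


print(count_files(["a.json", "b.json"]))  # 2
print(format_duration(703))               # 11m 43s
```

Putting the log call in a finally block means a failed stage still reports its runtime, which is often the first thing worth knowing when a long step dies.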
```
2026-05-27 19:38:10 INFO 🔷 Counting Extracted JSON Files
2026-05-27 19:38:10 INFO Total JSON files: 1095
2026-05-27 19:38:10 INFO ⏱️ Step completed in 11m 43s
2026-05-27 19:43:08 INFO 📥 Loaded 53594 new records from Bronze.
2026-05-27 19:43:46 INFO 📥 Exploded generation mix for 8682228 records.
2026-05-27 19:44:54 INFO 📥 Pivoted generation mix for 945092 records.
2026-05-27 19:46:46 INFO 📦 Full Silver dataset ready — 945092 total records.
2026-05-27 19:46:48 INFO ✅ Gold layer ready — 19728 daily records.
2026-05-27 19:47:13 INFO ✅ FULL PIPELINE COMPLETED
2026-05-27 19:47:13 INFO ⏱️ Step completed in 5m 32s
2026-05-27 19:49:01 INFO ✅ Removed 0 duplicate rows FROM INPUT FILES
2026-05-27 19:49:01 INFO ✅ Skipped 0 rows ALREADY IN DATABASE
2026-05-27 19:49:07 INFO ✅ SUCCESS: 19728 NEW rows loaded
2026-05-27 19:49:29 INFO ✅ Skipped 19728 rows ALREADY IN DATABASE
2026-05-27 19:49:29 INFO ℹ️ All rows already exist — table up to date
```
The final two lines are the visible proof that the dedup-merge actually works. First load: 19,728 rows in. Second load on the same data: 19,728 rows recognized as already present, zero inserted.
Seven engineering decisions that shaped this pipeline, with the reasoning behind each.
Why async I/O for ingestion?
aiohttp behind asyncio.Semaphore(5) brings the 1,095-day backfill to under 12 minutes
and never trips rate limits because the semaphore caps in-flight concurrency. The
right tool for I/O-bound work is async I/O.
Why two concurrency bounds?
asyncio.Semaphore(5) bounds logical task concurrency: how many
fetch_carbon_data() coroutines are running at once.
aiohttp.TCPConnector(limit=5) bounds physical socket count: how many
TCP connections are open to the server. Either alone would work most of the time. Layered, they make
a 429 very unlikely at this scale. In the actual May 2026 run, 1,095 successful requests
produced zero rate-limit errors.
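The effect of the semaphore can be checked without touching the network. The sketch below replaces the HTTP call with a short asyncio.sleep and records the peak number of coroutines inside the guarded section; run_bounded and fake_fetch are hypothetical names, and the connector-level limit is not modelled here.

```python
import asyncio


async def run_bounded(n_tasks, limit):
    """Run n_tasks simulated fetches and return the peak number in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_fetch(_):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.001)  # stands in for the HTTP round trip
            in_flight -= 1

    await asyncio.gather(*(fake_fetch(i) for i in range(n_tasks)))
    return peak


peak = asyncio.run(run_bounded(n_tasks=50, limit=5))
print(peak)  # 5
```

All fifty tasks are scheduled immediately, yet no more than five are ever past the semaphore at once. The connector limit in the real pipeline applies the same ceiling one layer lower, at the socket level.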
Why not ON CONFLICT DO NOTHING?
ON CONFLICT (regionid, date_recorded) DO NOTHING would be the conventional
choice. Two-stage pandas dedup-merge wins on observability: the log explicitly says
"removed X rows from input files" and "skipped Y rows already in database." When a duplicate
appears, the operator can see which side it came from. The design gives up a database-side
optimization for observability that helps debug an idempotent load. At this data volume,
debuggability matters more than raw throughput.
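Why a composite primary key on (regionid, date_recorded)?
The grain is one row per region per day, so the pair is the natural key, and the dedup-merge in
load.py keys on the same two columns.
Why DECIMAL rather than FLOAT?
Binary floating point cannot represent most decimal fractions exactly (0.1 + 0.2 != 0.3). Fuel mix
data is consumed by analysts who compute means,
sums, and rolling averages. DECIMAL preserves exactness through aggregation. Carbon intensity
(gCO2/kWh) and fuel mix (%) both demand exactness for policy-grade research. The schema commits to
two-decimal precision; the transform layer rounds to two decimals with
F.round(F.mean(...), 2). Same contract, two layers.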
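Why no run_all.py orchestrator?
Each stage has a different runtime profile (Bronze is API-bound, Silver/Gold is CPU-heavy under
PySpark, Load is DB-bound), so keeping the stages separate makes debugging and partial reruns cleaner.
Real numbers from the actual May 2026 pipeline run, not estimates.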
For production at higher volumes, the natural evolutions would be: cross-platform Spark
paths via env vars so the same code runs on macOS, Linux, and Windows (currently hardcoded
to C:/hadoop and C:/spark/jars/...); transactional Silver
tracker so the processed-files marker only updates after a successful Spark write;
a thin run_all.py orchestrator for non-interactive runs with halt-on-failure semantics;
secondary indexes on the fact table for time-range queries that don't filter by
regionid; a dim_region lookup table with FK from the fact to normalise the small
repeated string columns; managed Spark (Databricks, EMR, Dataproc) to parallelise the
transform; and dbt models on top of the warehouse for analyst-authored transformations
with tested SQL.
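The first of these evolutions, environment-driven paths, could be as small as the sketch below. HADOOP_HOME is the conventional Hadoop environment variable; the function name resolve_hadoop_home is hypothetical, and the fallback simply reuses the currently hardcoded C:/hadoop location.

```python
import os
from pathlib import Path


def resolve_hadoop_home(default="C:/hadoop"):
    """Prefer the HADOOP_HOME environment variable, falling back to the current hardcoded path."""
    return Path(os.environ.get("HADOOP_HOME", default))


# Usage: on Linux or macOS the variable would be set by the shell or a .env file.
os.environ["HADOOP_HOME"] = "/opt/hadoop"
print(resolve_hadoop_home())
```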
I'm currently looking for Data Engineer / Analytics Engineer roles. UK-based, open to remote or hybrid.