~10 min read · Updated May 2026
Case Study · Featured · Async + Medallion

XTD Research Labs: Async ingestion and PySpark medallion pipeline

A three-stage data engineering pipeline I built for a UK grid decarbonization research scenario. Async aiohttp pulls 1,095 days of regional carbon intensity from a live government API into a Bronze data lake. PySpark explodes deeply nested JSON, pivots fuel types into typed columns, and aggregates to daily research metrics across Silver and Gold layers. A two-stage dedup-merge load lands the result in PostgreSQL with composite-key idempotency.

Featured case study · Live UK Carbon Intensity API · aiohttp async ingestion · PySpark 4.1.1 medallion · PostgreSQL 14+ · Bronze · Silver · Gold
1,095 days ingested
8.7M intermediate rows
945K silver records
19.7K gold daily rows
32 HTTP 500s retried and recovered
0 rate-limit hits
01 The Problem

A research lab can't backfill three years of grid data.

XTD Research Labs is a fictional scientific institution studying the environmental footprint of UK electricity generation. Their analysts and policy researchers depend on regional carbon intensity data to build longitudinal climate models, with their current focus covering 2022 to 2024. The existing pipeline was an in-memory Python script designed for 30-minute interval pulls. It handled one day at a time and became the binding constraint on every research initiative that needed more than a few days of history.

The brief identified three specific engineering constraints. Scale incompatibility: the in-memory pipeline could not process 1,000+ days of concurrent regional data, and multi-year backfills timed out. Network latency: synchronous API requests against the Carbon Intensity API triggered rate-limiting when fetching multi-year datasets. No schema preservation: without a layered architecture, raw responses were discarded after transformation, so re-running analysis with new research parameters meant re-fetching from the source API every time.

I was engaged as the data engineer responsible for transitioning XTD from this single-day model to a distributed big data architecture capable of handling the full 2022 to 2024 research window. The deliverable had to do three things: ingest thousands of days of API data without hammering the upstream provider, transform deeply nested JSON at scale, and archive raw data immutably so reprocessing never required re-hitting the API.

Design constraint
The Carbon Intensity API is a live, public, government-maintained service. Every Bronze rebuild hits production. Rate-limit discipline, retry semantics, and Bronze immutability aren't optimisations; they're prerequisites for being a polite consumer of a free public API.
02 Architecture

Three layers, three idempotency models.

The pipeline follows a medallion architecture: Bronze (raw API JSON), Silver (cleaned PySpark Parquet), Gold (daily aggregated CSV). The code lives in three Python modules under an etl/ package (extract.py, transform.py, load.py), each runnable independently. There is no run_all.py orchestrator. Each stage has a meaningfully different runtime profile (Bronze is API-bound, Silver/Gold is CPU-heavy under PySpark, Load is DB-bound), so decoupling them makes debugging, partial reruns, and external orchestration cleaner.

🥉
Bronze
async aiohttp · semaphore(5) · retry w/ backoff · file-existence idempotency
🥈
Silver
PySpark · explode regions+mix · pivot 9 fuels · tracker-file idempotency · Parquet append
🥇
Gold
daily aggregation · region × date groupBy · wipe-and-rebuild · CSV overwrite
📦
Load
two-stage dedup-merge · pandas merge w/ indicator · composite-PK enforced

Each layer uses a different idempotency model because the cost of rebuilding it is different. Bronze costs ~12 minutes and 1,000+ live API calls. Silver costs ~5 minutes of Spark. Gold costs ~3 seconds. A unified "wipe everything and rebuild" model would force every run to re-hit the API. A unified "incremental everything" model would make Gold queries return stale data. Different mechanisms, each appropriate to its layer.

Async ingestion with layered rate-limit defense

The Bronze layer uses aiohttp with two independent rate-limit boundaries. asyncio.Semaphore(5) bounds logical task concurrency, and aiohttp.TCPConnector(limit=5) bounds physical socket count. Either alone would mostly work; layered, they make a 429 essentially impossible at this scale. In the actual run, the 1,096 daily requests produced zero rate-limit errors.
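The semaphore half of that defence can be checked without touching the live API. The following is a stdlib-only stand-in, not the extractor itself. fake_fetch and run are hypothetical names, and asyncio.sleep(0.01) simulates network latency. It pushes 50 fake requests through asyncio.Semaphore(5) and records the peak number in flight. The socket-level bound, aiohttp.TCPConnector(limit=5) passed to the ClientSession, needs aiohttp and isn't reproduced here.

```python
import asyncio

async def fake_fetch(sem, state, day):
    """Stand-in for one API call that tracks how many calls are in flight."""
    async with sem:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # simulated network latency
        state["in_flight"] -= 1
        return day

async def run(days, limit=5):
    sem = asyncio.Semaphore(limit)  # same bound as the Bronze extractor
    state = {"in_flight": 0, "peak": 0}
    results = await asyncio.gather(*(fake_fetch(sem, state, d) for d in range(days)))
    return results, state["peak"]

results, peak = asyncio.run(run(days=50))
print(len(results), peak)  # 50 5
```

However many tasks are queued, no more than five ever hold the semaphore at once. This is the property that keeps request bursts below the provider's limit.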

etl/extract.py · concurrency + retry
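import asyncio
import json
import logging
import os
import aiohttp

BRONZE_DIR = "data/bronze"  # assumed path to the raw JSON lake
API_URL = "https://api.carbonintensity.org.uk/regional/intensity/{date}T00:00Z/fw24h"  # assumed endpoint shape
semaphore = asyncio.Semaphore(5)  # bounds in-flight task count

async def fetch_carbon_data(session, target_date):
    file_path = os.path.join(BRONZE_DIR, f"{target_date}.json")
    if os.path.exists(file_path):
        return None  # idempotent: skip if already saved
    url = API_URL.format(date=target_date)

    async with semaphore:
        for attempt in range(3):
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=20)) as response:
                    if response.status == 200:
                        data = await response.json()
                        # defensive: API sometimes returns 200 with empty payload
                        if 'data' in data and data['data']:
                            with open(file_path, "w") as f:  # write the raw response to Bronze
                                json.dump(data, f)
                            return file_path  # success: stop retrying
                        logging.warning(f"Empty payload for {target_date}; not saved")
                        return None
                    elif response.status == 429:
                        await asyncio.sleep(5 * (attempt + 1))  # 5s, 10s, 15s
                    else:  # e.g. a transient 500: back off, then retry
                        await asyncio.sleep(2 * (attempt + 1))
            except (aiohttp.ClientError, asyncio.TimeoutError):
                await asyncio.sleep(2 * (attempt + 1))
        logging.error(f"Giving up on {target_date} after 3 attempts")
        return None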

The empty-payload check (if 'data' in data and data['data']) is the kind of defensive code that only exists because the author has been burned. The Carbon Intensity API occasionally returns 200 OK with an empty array. Naive code would write the empty response and call it success. This check logs a warning and refuses to write a corrupt file. In the actual run, it caught 2023-10-22, which is why Bronze contains 1,095 days, not 1,096.
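The retry policy can also be exercised offline. This is a sketch under stated assumptions, not the code in etl/extract.py:

- get stands in for session.get and returns a hypothetical (status, payload) pair.
- ConnectionError stands in for aiohttp.ClientError.
- sleep is injected, so the backoff schedule is recorded instead of waited out.

The delays follow the linear schedule used above: 5s, 10s, 15s for 429s and 2s, 4s, 6s for network errors. Treating a 500 like a network error is this sketch's own assumption.

```python
import asyncio

async def fetch_with_retry(get, sleep, max_attempts=3):
    """Status-aware retry with linear backoff; returns the payload or None."""
    for attempt in range(max_attempts):
        try:
            status, payload = await get()
        except (ConnectionError, asyncio.TimeoutError):
            await sleep(2 * (attempt + 1))  # network error: 2s, 4s, 6s
            continue
        if status == 200:
            # A 200 with an empty "data" list is not success: save nothing
            return payload if payload.get("data") else None
        if status == 429:
            await sleep(5 * (attempt + 1))  # rate limited: 5s, 10s, 15s
        else:
            await sleep(2 * (attempt + 1))  # assumed: treat 5xx like a network error
    return None  # gave up after max_attempts

def scripted(responses):
    """Fake `get` that replays canned (status, payload) pairs in order."""
    queue = iter(responses)
    async def get():
        return next(queue)
    return get

def recorder():
    """Fake `sleep` that records requested delays instead of waiting."""
    waits = []
    async def sleep(seconds):
        waits.append(seconds)
    return sleep, waits

sleep, waits = recorder()
payload = asyncio.run(fetch_with_retry(
    scripted([(500, {}), (500, {}), (200, {"data": ["day"]})]), sleep))
print(payload, waits)  # {'data': ['day']} [2, 4]
```

Two transient 500s followed by a good response recover on the third attempt. An empty 200 returns None immediately and is not retried, matching the "refuse to write" behaviour described above.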

03 Schema Design

A flat fact table with composite primary key.

The warehouse is a single-table data mart, not a star schema. The grain is one row per region per day, which is exactly how XTD's research analysts consume the data: time-series queries sliced by region and date range. With ~19.7K total rows in the analytics table, normalization would cost more than it would buy.

carbon_data.carbon_intensity_daily
regionid + date_recorded (composite PK) · shortname (NOT NULL) · dno (NOT NULL) · intensity_avg DECIMAL(10,2) · index_mode · fuel_biomass · fuel_coal · fuel_gas · fuel_hydro · fuel_imports · fuel_nuclear · fuel_other · fuel_solar · fuel_wind (all DECIMAL(10,2))

Three key decisions

Decision · Primary key
Composite PK on (regionid, date_recorded)
The natural key for the data is one row per region per day. The composite PK enforces this at the database level and matches the keys the Python loader dedupes against. Same key, two enforcement points. PostgreSQL won't accept a duplicate even if the application logic ever drifts.
Decision · Precision
DECIMAL(10,2) on every numeric column
Carbon intensity is reported in gCO2/kWh and fuel mix in percentages. Both demand exactness when analysts compute means, sums, or rolling averages. DECIMAL preserves precision through aggregation. FLOAT loses it (0.1 + 0.2 != 0.3). Same principle as money in fintech, applied to research-grade environmental data.
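The float problem is easy to reproduce in Python. Python's float is the same IEEE 754 double as PostgreSQL's unqualified FLOAT (double precision). The following contrasts it with decimal.Decimal and quantizes a mean to two places, roughly what a DECIMAL(10,2) column stores. The three percentages are made-up values, and ROUND_HALF_UP is an illustrative choice of tie-breaking.

```python
from decimal import Decimal, ROUND_HALF_UP

# Binary floats cannot represent 0.1 or 0.2 exactly
print(0.1 + 0.2)  # 0.30000000000000004
print(0.1 + 0.2 == 0.3)  # False

# Decimals built from strings keep base-10 exactness
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True

# Aggregate exactly, then fix the scale at two places like DECIMAL(10,2)
perc = [Decimal("12.35"), Decimal("40.10"), Decimal("7.05")]
mean = (sum(perc) / len(perc)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
print(mean)  # 19.83
```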

Schema owned by SQL, not pandas

DDL lives in sql/create_table.sql, hand-authored, versioned in git. The Python loader doesn't get to define the schema by inference. This is the same discipline as FibbieBanks's create_tables.sql: types, constraints, and NOT NULL declarations belong to SQL, not to whatever pandas decides on the first to_sql(if_exists="append") call. The dedup-merge in load.py mirrors the composite PK exactly, so two layers of defense protect the same invariant.

sql/create_table.sql
CREATE SCHEMA IF NOT EXISTS carbon_data;

-- Reset script: re-running it drops the table and every row already loaded
DROP TABLE IF EXISTS carbon_data.carbon_intensity_daily;

CREATE TABLE IF NOT EXISTS carbon_data.carbon_intensity_daily (
    regionid         INT          NOT NULL,
    date_recorded    DATE         NOT NULL,
    shortname        VARCHAR(100) NOT NULL,
    dno              VARCHAR(100) NOT NULL,
    intensity_avg    DECIMAL(10,2),
    index_mode       VARCHAR(50),
    fuel_biomass     DECIMAL(10,2),
    fuel_coal        DECIMAL(10,2),
    fuel_gas         DECIMAL(10,2),
    fuel_hydro       DECIMAL(10,2),
    fuel_imports     DECIMAL(10,2),
    fuel_nuclear     DECIMAL(10,2),
    fuel_other       DECIMAL(10,2),
    fuel_solar       DECIMAL(10,2),
    fuel_wind        DECIMAL(10,2),
    PRIMARY KEY (regionid, date_recorded)
);

Honest gap worth flagging: there are no secondary indexes beyond the PK and no FK to a dim_region table. Most queries filter by regionid, alone or with a date range. The PK's B-tree index on (regionid, date_recorded) serves those directly because regionid is its leading column. A filter on date_recorded alone generally can't use that index efficiently. At 19,728 rows, though, a sequential scan is effectively free, so additional indexes would be premature. A future iteration with billions of rows or date-first query patterns might add a standalone date_recorded index for time-range scans, but at this scale it would be optimization theater.

04 Engineering

Flatten nested JSON. Append-only Silver. Dedup-merge Load.

Three engineering patterns carry most of the technical weight in this pipeline. They map directly to the three layers of the medallion.

1. Double-explode-and-pivot for deeply nested JSON

The Carbon Intensity API response is two arrays deep: each half-hourly interval has regions[], and each region has generationmix[]. Getting one tabular row per (region, timestamp, fuel) requires exploding both arrays. A pivot then turns fuel types from rows into columns. PySpark's F.explode and .pivot() handle this at scale where pandas would struggle. 53,594 raw JSON records expand to 8,682,228 intermediate rows after the double explode, then collapse to 945,092 silver records after the pivot.

etl/transform.py · the explode+pivot core
from pyspark.sql import functions as F

def transform_bronze_to_silver(raw_df):
    # Expects one row per half-hour interval, with top-level "from" and "regions" columns
    # Level 1 explode: regions[] array
    exploded_df = raw_df.select(
        F.col("from").alias("timestamp"),
        F.explode(F.col("regions")).alias("region")
    )

    # Level 2 explode: generationmix[] array within each region
    silver_df = exploded_df.select(
        F.col("timestamp"),
        F.col("region.regionid").alias("regionid"),
        F.col("region.shortname").alias("shortname"),
        F.col("region.dnoregion").alias("dno"),
        F.col("region.intensity.forecast").alias("intensity"),
        F.col("region.intensity.index").alias("index"),
        F.explode(F.col("region.generationmix")).alias("mix")
    )

    # Pivot: fuel types as columns, percentages as values
    silver_df_pivoted = silver_df.groupBy(
        "regionid", "shortname", "dno",
        "timestamp", "intensity", "index"
    ).pivot("mix.fuel").agg(F.first("mix.perc"))

    return silver_df_pivoted  # 9 fuel columns: biomass, coal, gas, ...
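To make the row-count arithmetic concrete without a Spark session, here is a pure-Python sketch of the same double-explode-and-pivot. It runs on one hypothetical half-hour interval with two regions; the region names and percentages are illustrative, not real API output.

- The two comprehension loops play the role of the two F.explode calls.
- setdefault keeps the first value per fuel, like F.first.

One difference: Spark's pivot emits a null for a fuel missing from a group, while this dict simply omits the key.

```python
from collections import defaultdict

# One hypothetical interval, shaped like the API's nested JSON
interval = {
    "from": "2023-01-01T00:00Z",
    "regions": [
        {"regionid": 1, "shortname": "North Scotland",
         "generationmix": [{"fuel": "wind", "perc": 80.0}, {"fuel": "gas", "perc": 20.0}]},
        {"regionid": 2, "shortname": "South Scotland",
         "generationmix": [{"fuel": "wind", "perc": 60.0}, {"fuel": "nuclear", "perc": 40.0}]},
    ],
}

# Double explode: one long row per (timestamp, region, fuel)
long_rows = [
    (interval["from"], r["regionid"], r["shortname"], m["fuel"], m["perc"])
    for r in interval["regions"]
    for m in r["generationmix"]
]

# Pivot: group by (timestamp, region) and turn each fuel into a column
wide = defaultdict(dict)
for ts, rid, name, fuel, perc in long_rows:
    wide[(ts, rid, name)].setdefault(fuel, perc)  # keep the first value, like F.first

print(len(long_rows), len(wide))  # 4 2
print(wide[("2023-01-01T00:00Z", 1, "North Scotland")])  # {'wind': 80.0, 'gas': 20.0}
```

The explode multiplies rows by the number of fuels per region, and the pivot divides them back out. This is why the intermediate count in the real run is roughly nine times the Silver count.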

2. Incremental Silver via a processed-files tracker

The Silver layer is incremental, not full-rebuild. A plain text file (data/silver/_processed_files.txt) records which Bronze JSON files have already been folded in. On each run, the transform compares the current Bronze directory against the tracker, processes only files not yet listed, appends to the Silver Parquet directory, and marks them processed. Simple, debuggable, no state in the database. At larger scales a checkpoint table would be more robust; for ~1,000 files this is honest engineering.

Honest gap worth flagging
The tracker is updated before the Spark append confirms success. If Spark crashes mid-write, the file gets marked as processed but its data isn't in Silver. A transactional version (write-then-mark) would close this gap. Listed in the README's "Future Iterations" section as a real tightening opportunity.
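Closing that gap mostly means reordering two steps. This is a minimal sketch of write-then-mark, not the project's transform code, and run_incremental and load_tracker are illustrative names.

- A hypothetical process_batch callable stands in for the Spark read-transform-append and is expected to raise on failure.
- The tracker file is appended only after process_batch returns.
- For brevity, the demo keeps the tracker next to the Bronze files rather than under data/silver.

```python
import os
import tempfile

def load_tracker(path):
    """Return the set of Bronze file names already folded into Silver."""
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return {line.strip() for line in f if line.strip()}

def run_incremental(bronze_dir, tracker_path, process_batch):
    """Process unseen Bronze files; mark them only after the write succeeds."""
    done = load_tracker(tracker_path)
    pending = sorted(name for name in os.listdir(bronze_dir)
                     if name.endswith(".json") and name not in done)
    if not pending:
        return []
    # Hypothetical stand-in for the Spark append; raises on failure
    process_batch([os.path.join(bronze_dir, name) for name in pending])
    # Only reached if the write did not raise, so marking is now safe
    with open(tracker_path, "a") as f:
        f.writelines(name + "\n" for name in pending)
    return pending

def simulated_crash(paths):
    raise RuntimeError("simulated Spark crash mid-write")

with tempfile.TemporaryDirectory() as bronze:
    for day in ("2023-01-01", "2023-01-02"):
        open(os.path.join(bronze, f"{day}.json"), "w").close()
    tracker = os.path.join(bronze, "_processed_files.txt")

    try:
        run_incremental(bronze, tracker, simulated_crash)
    except RuntimeError:
        pass
    print(load_tracker(tracker))  # set()
    print(run_incremental(bronze, tracker, lambda p: None))  # ['2023-01-01.json', '2023-01-02.json']
    print(run_incremental(bronze, tracker, lambda p: None))  # []
```

The failure window moves rather than disappears. A crash after the append but before the tracker write means those files are reprocessed and appended twice on the next run. That trades silently missing days for duplicate rows, which are easier to detect and clean up.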

3. Two-stage dedup-merge on load

The loader does deduplication twice. Stage one removes duplicates within the input files via drop_duplicates(subset=["regionid", "date_recorded"]). Stage two pulls existing keys from PostgreSQL and runs a merge(how="left", indicator=True) against them, keeping only rows where the indicator says left_only. Same composite key as the PostgreSQL PRIMARY KEY. Re-runs insert zero rows.

etl/load.py · two-stage dedup-merge
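import logging

import pandas as pd

# df (consolidated Gold CSVs), engine and table_name are set up earlier in load.py
unique_cols = ["regionid", "date_recorded"]

# STAGE 1: Dedup within the input files
before_file = len(df)
df = df.drop_duplicates(subset=unique_cols, keep="first")
logging.info(f"Removed {before_file - len(df)} duplicate rows FROM INPUT FILES")

# STAGE 2: Diff against what's already in the database
existing = pd.read_sql("SELECT regionid, date_recorded FROM ...", engine)
# Align key dtypes so CSV date strings and DB dates compare equal in the merge
df["date_recorded"] = pd.to_datetime(df["date_recorded"])
existing["date_recorded"] = pd.to_datetime(existing["date_recorded"])
before_db = len(df)
df = df.merge(existing, on=unique_cols, how="left", indicator=True)
df = df[df["_merge"] == "left_only"].drop(columns=["_merge"])
logging.info(f"Skipped {before_db - len(df)} rows ALREADY IN DATABASE")

df.to_sql(table_name, engine, schema="carbon_data",
          if_exists="append", index=False, chunksize=20000)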

Why pandas merge instead of PostgreSQL's ON CONFLICT DO NOTHING? The merge gives explicit logging on both axes: "removed X rows from input files" and "skipped Y rows already in database." When something goes wrong, the log tells the operator exactly which side the duplicate came from. Trades one PostgreSQL optimization for observability that helps debug an idempotent load.
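The two-stage logic itself is small enough to restate without pandas. In this stdlib sketch, two_stage_dedup is a hypothetical name and the rows are made up.

- A set of seen keys does the job of drop_duplicates(keep="first").
- A membership test against existing_keys does the job of the left_only merge filter.

It returns the same two counts the log reports, so a first load and a re-run can be compared directly.

```python
def two_stage_dedup(rows, existing_keys, key=("regionid", "date_recorded")):
    """Return (rows_to_insert, removed_from_input, skipped_in_db)."""
    seen, unique = set(), []
    for row in rows:  # Stage 1: keep the first row per composite key
        k = tuple(row[c] for c in key)
        if k not in seen:
            seen.add(k)
            unique.append(row)
    # Stage 2: drop keys the database already holds (the left_only filter)
    to_insert = [r for r in unique if tuple(r[c] for c in key) not in existing_keys]
    return to_insert, len(rows) - len(unique), len(unique) - len(to_insert)

rows = [
    {"regionid": 1, "date_recorded": "2023-01-01", "intensity_avg": 58.33},
    {"regionid": 1, "date_recorded": "2023-01-01", "intensity_avg": 58.33},  # dupe
    {"regionid": 2, "date_recorded": "2023-01-01", "intensity_avg": 120.0},
]

database = set()  # keys already in the table; empty before the first load
first, removed, skipped = two_stage_dedup(rows, database)
print(len(first), removed, skipped)  # 2 1 0
database |= {(r["regionid"], r["date_recorded"]) for r in first}

again, removed, skipped = two_stage_dedup(rows, database)
print(len(again), removed, skipped)  # 0 1 2
```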

Observability via custom logger

A custom logger.py module (shared across multiple projects in this portfolio) provides a @timed decorator that wraps every pipeline stage with execution timing, cross-platform UTF-8 detection for emoji-safe console output, a dual-handler setup with colorized console and rotating file logs, and SQLAlchemy noise suppression. Every retry, every empty payload, every dedup result is in pipeline.log.
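The logger module itself isn't shown here. The following is a hypothetical minimal @timed that covers only the timing part:

- functools.wraps keeps the wrapped function's name.
- time.perf_counter measures wall time.
- format_duration renders seconds in the "11m 43s" style seen in pipeline.log.

The under-a-minute format ("4s") is an assumption. UTF-8 detection, the rotating file handler and SQLAlchemy noise suppression are left out.

```python
import functools
import logging
import time

def format_duration(seconds):
    """Render seconds like the pipeline log, e.g. 703 -> '11m 43s'."""
    minutes, secs = divmod(int(round(seconds)), 60)
    return f"{minutes}m {secs}s" if minutes else f"{secs}s"

def timed(func):
    """Log how long the wrapped pipeline step took, even if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            logging.getLogger(func.__module__).info(
                "Step completed in %s", format_duration(elapsed))
    return wrapper

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

@timed
def count_json_files(paths):
    return sum(p.endswith(".json") for p in paths)

print(count_json_files(["a.json", "b.json", "notes.txt"]))  # 2
print(format_duration(703))  # 11m 43s
```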

pipeline.log · actual May 2026 run
2026-05-27 19:38:10 INFO 🔷 Counting Extracted JSON Files
2026-05-27 19:38:10 INFO Total JSON files: 1095
2026-05-27 19:38:10 INFO ⏱️ Step completed in 11m 43s

2026-05-27 19:43:08 INFO 📥 Loaded 53594 new records from Bronze.
2026-05-27 19:43:46 INFO 📥 Exploded generation mix for 8682228 records.
2026-05-27 19:44:54 INFO 📥 Pivoted generation mix for 945092 records.
2026-05-27 19:46:46 INFO 📦 Full Silver dataset ready — 945092 total records.
2026-05-27 19:46:48 INFO ✅ Gold layer ready — 19728 daily records.
2026-05-27 19:47:13 INFO ✅ FULL PIPELINE COMPLETED
2026-05-27 19:47:13 INFO ⏱️ Step completed in 5m 32s

2026-05-27 19:49:01 INFO ✅ Removed 0 duplicate rows FROM INPUT FILES
2026-05-27 19:49:01 INFO ✅ Skipped 0 rows ALREADY IN DATABASE
2026-05-27 19:49:07 INFO ✅ SUCCESS: 19728 NEW rows loaded
2026-05-27 19:49:29 INFO ✅ Skipped 19728 rows ALREADY IN DATABASE
2026-05-27 19:49:29 INFO ℹ️ All rows already exist — table up to date

The final two lines are the visible proof that the dedup-merge actually works. First load: 19,728 rows in. Second load on the same data: 19,728 rows recognized as already present, zero inserted.

05 Decisions

The choices behind the design.

Seven engineering decisions that shaped this pipeline, with the reasoning behind each.

01
Why async aiohttp instead of synchronous requests?
Synchronous requests would have taken several times longer than the actual 11m 43s. Each daily API call takes 0.2 to 2 seconds depending on the server's load, and any retry adds its backoff delay on top. Run one after another, 1,096 calls at the slow end add up to over 36 minutes of waiting before a single retry fires. Async with asyncio.Semaphore(5) keeps up to five requests in flight, which brought the run in under 12 minutes. It never trips rate limits because the semaphore caps in-flight concurrency. The right tool for I/O-bound work is async I/O.
02
Why two layers of rate-limit defense (semaphore and TCP connector)?
asyncio.Semaphore(5) bounds logical task concurrency: how many fetch_carbon_data() coroutines are running at once. aiohttp.TCPConnector(limit=5) bounds physical socket count: how many TCP connections are open to the server. Either alone would work most of the time. Layered, they make a 429 essentially impossible at this scale. In the actual May 2026 run, 1,095 successful requests produced zero rate-limit errors.
03
Why three different idempotency models across the three layers?
The cost of rebuilding each layer is different. Bronze costs 12 minutes and 1,000+ live API calls. Silver costs 5 minutes of Spark. Gold costs 3 seconds. So Bronze uses file-existence checks (cheap to check, expensive to rebuild). Silver uses a tracker file of processed Bronze files (allows incremental Silver growth). Gold uses wipe-and-rebuild (cheap, guarantees freshness). A unified "wipe everything" model would force every run to re-hit the API. A unified "incremental everything" model would make Gold queries return stale data. Different mechanisms, each appropriate to its layer.
04
Why pandas dedup-merge instead of ON CONFLICT DO NOTHING?
PostgreSQL's ON CONFLICT (regionid, date_recorded) DO NOTHING would be the conventional choice. Two-stage pandas dedup-merge wins on observability: the log explicitly says "removed X rows from input files" and "skipped Y rows already in database." When a duplicate appears, the operator can see which side it came from. Trades one PostgreSQL optimization for observability that helps debug an idempotent load. Real production engineering is about debuggability, not raw throughput.
05
Why composite PK on (regionid, date_recorded)?
Natural keys for the data. There is exactly one row per (region, day) in the analytics warehouse, which is the grain analysts query at. The PK enforces this at the database level and matches the same key the Python loader dedupes against. Two enforcement points for the same invariant. PostgreSQL won't accept a duplicate even if the application logic ever drifts. Defense in depth on the integrity constraint that matters most.
06
Why DECIMAL(10,2) on fuel mix percentages instead of FLOAT?
Same reasoning as money in fintech, applied to research-grade environmental data. Floats lose precision (0.1 + 0.2 != 0.3). Fuel mix data is consumed by analysts who compute means, sums, and rolling averages. DECIMAL preserves exactness through aggregation. Carbon intensity (gCO2/kWh) and fuel mix (%) both demand exactness for policy-grade research. The schema commits to two-decimal precision; the transform layer rounds to two decimals with F.round(F.mean(...), 2). Same contract, two layers.
07
Why no run_all.py orchestrator?
Each layer has a meaningfully different runtime profile: Bronze is API-bound, Transform is CPU-bound and JVM-heavy, Load is DB-bound. Running them as separate commands means a failed extract doesn't roll back a working transform, transforms can be re-run against existing Bronze for debugging without re-hitting the API, and external orchestrators (cron, Airflow, Prefect) can wire stages with their own retry policies. For production deployment, an external scheduler is the right place for orchestration. For development, three explicit commands is honest about what's happening at each stage.
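Decision 06 and the wipe-and-rebuild Gold model fit together in one small function. This stdlib sketch recomputes Gold from the full Silver input on every call, so there is no state to go stale. It rests on these assumptions:

- Silver rows carry regionid, an ISO timestamp, intensity and index.
- index_mode means the most frequent index label in a day. The column name suggests this, but it is an assumption.
- It uses Decimal rather than Spark's F.round(F.mean(...), 2) on doubles.
- The sample rows are made up.

```python
from collections import Counter, defaultdict
from decimal import Decimal, ROUND_HALF_UP

def to_gold(silver_rows):
    """Aggregate half-hourly Silver rows to one row per (regionid, date)."""
    groups = defaultdict(list)
    for row in silver_rows:
        date = row["timestamp"][:10]  # "2023-01-01T00:30Z" -> "2023-01-01"
        groups[(row["regionid"], date)].append(row)

    gold = []
    for (regionid, date), rows in sorted(groups.items()):
        mean = sum(Decimal(str(r["intensity"])) for r in rows) / len(rows)
        gold.append({
            "regionid": regionid,
            "date_recorded": date,
            # two-decimal scale, matching the DECIMAL(10,2) column
            "intensity_avg": mean.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
            # assumed meaning of index_mode: most frequent index label that day
            "index_mode": Counter(r["index"] for r in rows).most_common(1)[0][0],
        })
    return gold

silver = [
    {"regionid": 1, "timestamp": "2023-01-01T00:00Z", "intensity": 40, "index": "low"},
    {"regionid": 1, "timestamp": "2023-01-01T00:30Z", "intensity": 45, "index": "low"},
    {"regionid": 1, "timestamp": "2023-01-01T01:00Z", "intensity": 90, "index": "moderate"},
    {"regionid": 2, "timestamp": "2023-01-01T00:00Z", "intensity": 120, "index": "moderate"},
]
gold = to_gold(silver)  # rebuilt from scratch on every call
for row in gold:
    print(row["regionid"], row["date_recorded"], row["intensity_avg"], row["index_mode"])
# 1 2023-01-01 58.33 low
# 2 2023-01-01 120.00 moderate
```

The Gold step is a pure function of Silver, so rebuilding it costs seconds and never needs a tracker.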
06 Impact

What this pipeline produces.

Real numbers from the actual May 2026 pipeline run, not estimates.

1,095
days ingested
Three full years of UK regional carbon intensity data from a live government API. One day (2023-10-22) returned an empty payload and was correctly skipped.
945K
silver records
53,594 raw JSON records exploded to 8.7M intermediate rows, then collapsed back to 945,092 silver records after fuel-type pivoting.
32 / 32
500s recovered
Transient HTTP 500 errors from the upstream API. All 32 were retried successfully with increasing backoff delays. Zero permanent failures, zero rate-limit hits.
0 dupes
on re-run
Second load on identical data: 19,728 rows recognised as already present in PostgreSQL, zero inserted. Dedup-merge working as designed.

End-to-end runtime breakdown

Extract · async API ingestion
11m 43s cold · 4 seconds warm (file-existence idempotency)
Transform · Spark startup
~1m 50s (JVM cold-start cost is real and honest)
Transform · Silver + Gold
5m 32s for full rebuild · seconds if no new Bronze files
Load · dedup-merge to PostgreSQL
8 seconds first load · 2 seconds idempotent re-run (zero inserts)
End-to-end (cold)
~19 minutes on a single laptop
The real story
The 32 retry recoveries aren't a hypothetical. They're 32 days of research data that would have been lost in a naive implementation that didn't retry on transient 500s. The empty-payload defense caught 1 more. Real engineering pays for itself the first time something goes wrong in production.
07 Tech Stack

The tools, and what each one does here.

aiohttp Async HTTP client · semaphore-bounded concurrency
asyncio Concurrency primitives · semaphore, gather
PySpark 4.1.1 Distributed transform engine · explode + pivot
PostgreSQL 14+ Research warehouse · composite PK
pandas CSV consolidation + dedup-merge logic
SQLAlchemy DB engine · pool_pre_ping + pool_recycle
psycopg2-binary PostgreSQL driver
PostgreSQL JDBC Spark JAR for cross-engine writes
python-dotenv Env-var loader · fail-fast at import
Kryo Serializer Spark serialisation tuning
Custom logger @timed decorator + UTF-8 detection
UK Carbon Intensity API Live public data source · National Grid ESO

If I were scaling this up

For production at higher volumes, the natural evolutions would be: cross-platform Spark paths via env vars so the same code runs on macOS, Linux, and Windows (currently hardcoded to C:/hadoop and C:/spark/jars/...); transactional Silver tracker so the processed-files marker only updates after a successful Spark write; a thin run_all.py orchestrator for non-interactive runs with halt-on-failure semantics; secondary indexes on the fact table for time-range queries that don't filter by regionid; a dim_region lookup table with FK from the fact to normalise the small repeated string columns; managed Spark (Databricks, EMR, Dataproc) to parallelise the transform; and dbt models on top of the warehouse for analyst-authored transformations with tested SQL.

Let's talk

Need a data engineer who ships disciplined warehouses?

I'm currently looking for Data Engineer / Analytics Engineer roles. UK-based, open to remote or hybrid.