
A Coding Guide to Build and Validate End-to-End Partitioned Data Pipelines in Dagster with Machine Learning Integration

In this tutorial, we implement an advanced data pipeline using Dagster. We set up a custom CSV-based IOManager to persist assets, define partitioned daily data generation, and process synthetic sales data through cleaning, feature engineering, and model training. Along the way, we add a data-quality asset check to validate nulls, ranges, and categorical values, and we ensure that metadata and outputs are stored in a structured way. The focus throughout is on hands-on implementation, showing how to integrate raw data ingestion, transformations, quality checks, and machine learning into a single reproducible workflow.

import sys, subprocess, json, os
# tabulate is required by DataFrame.to_markdown(), which we use below for asset metadata
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "dagster", "pandas", "scikit-learn", "tabulate"])


import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
   asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
   DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression


BASE = Path("/content/dagstore"); BASE.mkdir(parents=True, exist_ok=True)
START = "2025-08-01" 

We begin by installing the required libraries (Dagster, Pandas, and scikit-learn) so that the full toolset is available in Colab. We then import the essential modules, set up NumPy and Pandas for data handling, and define a base directory along with a start date to organize our pipeline outputs.
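If you are not running in Colab, the /content path will not exist; a small optional tweak like the one below (our addition, not part of the original setup) redirects the artifacts to a local folder while leaving the rest of the pipeline unchanged.

# Optional tweak (illustrative): fall back to a local folder when /content is unavailable.
if not Path("/content").exists():
    BASE = Path.cwd() / "dagstore"
    BASE.mkdir(parents=True, exist_ok=True)
print("Artifacts will be written to:", BASE)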

class CSVIOManager(IOManager):
   # Persist DataFrame outputs as CSV and anything else (e.g., metric dicts) as JSON,
   # keyed by the asset name, inside the BASE directory.
   def __init__(self, base: Path): self.base = base
   def _path(self, key, ext): return self.base / f"{'_'.join(key.path)}.{ext}"
   def handle_output(self, context, obj):
       if isinstance(obj, pd.DataFrame):
           p = self._path(context.asset_key, "csv"); obj.to_csv(p, index=False)
           context.log.info(f"Saved {context.asset_key} -> {p}")
       else:
           p = self._path(context.asset_key, "json"); p.write_text(json.dumps(obj, indent=2))
           context.log.info(f"Saved {context.asset_key} -> {p}")
   def load_input(self, context):
       # Downstream assets always consume DataFrames, so inputs are reloaded from CSV.
       k = context.upstream_output.asset_key; p = self._path(k, "csv")
       df = pd.read_csv(p); context.log.info(f"Loaded {k} <- {p} ({len(df)} rows)"); return df


@io_manager
def csv_io_manager(_): return CSVIOManager(BASE)


daily = DailyPartitionsDefinition(start_date=START)

We define a custom CSVIOManager to save asset outputs as CSV or JSON files and reload them when needed. We then register it with Dagster as csv_io_manager and set up a daily partitioning scheme so that our pipeline can process data for each date independently.
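Before wiring everything together, a quick optional sketch (not part of the original pipeline) helps confirm how these pieces behave: it lists the first few daily partition keys generated from START and shows the file path the IO manager derives for an asset key. Note that _path is a private helper and is called here purely for illustration.

from dagster import AssetKey

# First few daily partition keys derived from the start date (empty if the start date is in the future).
print(daily.get_partition_keys()[:3])

# Where the IO manager would persist the raw_sales asset (expected: /content/dagstore/raw_sales.csv).
print(CSVIOManager(BASE)._path(AssetKey(["raw_sales"]), "csv"))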

@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
   rng = np.random.default_rng(42)
   n = 200; day = context.partition_key
   x = rng.normal(100, 20, n); promo = rng.integers(0, 2, n); noise = rng.normal(0, 10, n)
   sales = 2.5 * x + 30 * promo + noise + 50
   x[rng.choice(n, size=max(1, n // 50), replace=False)] = np.nan
   df = pd.DataFrame({"date": day, "units": x, "promo": promo, "sales": sales})
   meta = {"rows": n, "null_units": int(df["units"].isna().sum()), "head": df.head().to_markdown()}
   return Output(df, metadata=meta)


@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
   df = raw_sales.dropna(subset=["units"]).copy()
   lo, hi = df["units"].quantile([0.01, 0.99]); df["units"] = df["units"].clip(lo, hi)
   meta = {"rows": len(df), "units_min": float(df.units.min()), "units_max": float(df.units.max())}
   return Output(df, metadata=meta)


@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
   df = clean_sales.copy()
   df["units_sq"] = df["units"] ** 2; df["units_promo"] = df["units"] * df["promo"]
   for c in ["units", "units_sq", "units_promo"]:
       mu, sigma = df[c].mean(), df[c].std(ddof=0) or 1.0
       df[f"z_{c}"] = (df[c] - mu) / sigma
   return Output(df, metadata={"rows": len(df), "cols": list(df.columns)})

We create three core assets for the pipeline. First, raw_sales generates synthetic daily sales data with noise and occasional missing values, simulating real-world imperfections. Next, clean_sales removes nulls and clips outliers to stabilize the dataset, while logging metadata about ranges and row counts. Finally, features performs feature engineering by adding interaction and standardized variables, preparing the data for downstream modeling.
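To make the standardization step easy to verify by hand, here is a tiny standalone sketch (separate from the assets above) that applies the same z-score formula used in features to a three-row toy frame.

# Toy illustration of the z-score transform used in the features asset:
# z = (x - mean) / population std, with a 1.0 fallback when the std is zero.
toy = pd.DataFrame({"units": [10.0, 20.0, 30.0]})
mu, sigma = toy["units"].mean(), toy["units"].std(ddof=0) or 1.0
toy["z_units"] = (toy["units"] - mu) / sigma
print(toy)  # mean 20, population std ≈ 8.165, so z ≈ [-1.22, 0.00, 1.22]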

@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units finite after clipping.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
   nulls = int(clean_sales.isna().sum().sum())
   promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))
   units_ok = bool(np.isfinite(clean_sales["units"]).all())
   passed = bool((nulls == 0) and promo_ok and units_ok)
   return AssetCheckResult(
       passed=passed,
       metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
   )


@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
   X = features[["z_units", "z_units_sq", "z_units_promo", "promo"]].values
   y = features["sales"].values
   model = LinearRegression().fit(X, y)
   return {"r2_train": float(model.score(X, y)),
           **{n: float(c) for n, c in zip(["z_units","z_units_sq","z_units_promo","promo"], model.coef_)}}

We strengthen the pipeline with validation and modeling. The clean_sales_quality asset check enforces data integrity by verifying that there are no nulls, the promo field contains only 0/1 values, and the cleaned units are valid finite values after clipping. After that, tiny_model_metrics trains a simple linear regression on the engineered features and outputs key metrics, such as the training R² and the learned coefficients, giving us a lightweight but complete modeling step within the Dagster workflow.
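As a quick sanity check on the modeling step, the standalone sketch below (illustrative only, independent of the Dagster assets) fits a regression on freshly generated data using the raw, unstandardized columns and confirms that the coefficients approximate the generating formula sales = 2.5*units + 30*promo + 50 used in raw_sales.

# Standalone sanity check: recover the known coefficients from the synthetic formula.
rng = np.random.default_rng(0)
units = rng.normal(100, 20, 500); promo = rng.integers(0, 2, 500)
sales = 2.5 * units + 30 * promo + rng.normal(0, 10, 500) + 50
m = LinearRegression().fit(np.column_stack([units, promo]), sales)
print(m.coef_, m.intercept_)  # expect roughly [2.5, 30.0] and an intercept near 50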

defs = Definitions(
   assets=[raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
   resources={"io_manager": csv_io_manager}
)


if __name__ == "__main__":
   run_day = os.environ.get("RUN_DATE") or START
   print("Materializing everything for:", run_day)
   result = materialize(
       [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
       partition_key=run_day,
       resources={"io_manager": csv_io_manager},
   )
   print("Run success:", result.success)


   for fname in ["raw_sales.csv","clean_sales.csv","features.csv","tiny_model_metrics.json"]:
       f = BASE / fname
       if f.exists():
           print(fname, "->", f.stat().st_size, "bytes")
           if fname.endswith(".json"):
               print("Metrics:", json.loads(f.read_text()))

We register our assets and the IO manager in Definitions, then materialize the entire DAG for a selected partition key in one run. We persist CSV/JSON artifacts to /content/dagstore and print a quick success flag, plus saved file sizes and model metrics for immediate verification.
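To materialize a second day, we can either set the RUN_DATE environment variable before running the script or call materialize directly with another partition key, as in the sketch below (assuming the assets above are defined and the chosen date falls on or after START). Because the CSV IO manager keys files only by asset name, a second run overwrites the first run's artifacts; partition-aware file paths would be a natural extension.

# Sketch: materialize another partition directly (assumes the 2025-08-02 partition exists).
result2 = materialize(
    [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality],
    partition_key="2025-08-02",
    resources={"io_manager": csv_io_manager},
)
print("2025-08-02 run success:", result2.success)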

In conclusion, we materialize all assets and checks in a single Dagster run, confirm data quality, and train a regression model whose metrics are stored for inspection. We keep the pipeline modular, with each asset producing and persisting its outputs in CSV or JSON, and ensure compatibility by explicitly converting metadata values to supported types. This tutorial demonstrates how we can combine partitioning, asset definitions, and checks to build a technically robust and reproducible workflow, giving us a practical framework to extend toward more complex real-world pipelines.


Check out the FULL CODES here.


Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.
