Dagster is the data orchestrator built around software-defined assets — @asset decorators define what data you produce. from dagster import asset, AssetExecutionContext, Definitions, ScheduleDefinition, sensor gives you everything. @asset(deps=["upstream_asset"]) builds the DAG from dependencies. context.log.info("msg") for structured logging; context.add_output_metadata({"row_count": n}) attaches metadata shown in the UI. @multi_asset(outs={"users": AssetOut(), "orders": AssetOut()}) yields multiple outputs. Resources provide injectable dependencies: class PostgresResource(ConfigurableResource): url: str then with self.db_connection() as conn. IOManagers control storage: class S3IOManager(IOManager): def handle_output(self, context, obj) + def load_input. Schedules: ScheduleDefinition(job=refresh_job, cron_schedule="0 * * * *") triggers on a schedule. Sensors: @sensor(asset_selection=AssetSelection.assets("raw_events")) triggers on external events. Partitions: DailyPartitionsDefinition(start_date="2025-01-01"), @asset(partitions_def=daily_partitions), context.partition_key gives “2025-07-15”. dbt: load_assets_from_dbt_project(project_dir="dbt") turns dbt models into Dagster assets with full lineage. Definitions(assets=[...], resources={...}, schedules=[...], sensors=[...]) assembles the workspace. dagster dev starts the local UI. Claude Code generates Dagster asset pipelines, resource configurations, partitioned jobs, and dbt integrations.
# CLAUDE.md for Dagster
## Dagster Stack
- Version: dagster >= 1.7, dagster-webserver for UI
- Assets: @asset(deps=[], group_name="ingestion", compute_kind="python")
- Resources: class MyResource(ConfigurableResource) — injected via definitions
- IOManager: class MyIOManager(IOManager) — handle_output + load_input
- Partitions: DailyPartitionsDefinition(start_date="2025-01-01")
- Definitions: Definitions(assets=all_assets, resources={"db": DbResource(url=...)})
- Run via: dagster dev (local) or dagster-daemon for schedules/sensors
- dbt: load_assets_from_dbt_project(project_dir="./dbt", profiles_dir="./dbt")
## Core Asset Pipeline
# pipelines/assets/ingestion.py — raw data ingestion assets
from dagster import (
asset,
AssetExecutionContext,
multi_asset,
AssetOut,
MaterializeResult,
MetadataValue,
DailyPartitionsDefinition,
)
from dagster_postgres import PostgresResource
import pandas as pd
import requests
# Daily partition set starting 2025-01-01; partition keys are "YYYY-MM-DD" strings.
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
@asset(
    group_name="ingestion",
    compute_kind="api",
    description="Raw events fetched from the application API",
    required_resource_keys={"app_api"},
)
def raw_events(context: AssetExecutionContext) -> pd.DataFrame:
    """Fetch all raw events from the application's REST API.

    Requires the ``app_api`` resource for the base URL and bearer token.
    Returns the response's ``data`` payload as a DataFrame and attaches
    row-count / column / preview metadata for the Dagster UI.

    Raises ``requests.HTTPError`` (via raise_for_status) on non-2xx responses,
    failing the materialization.
    """
    # required_resource_keys is what makes context.resources.app_api available;
    # without the declaration this attribute access raises at runtime.
    api_key = context.resources.app_api.api_key
    base_url = context.resources.app_api.base_url
    resp = requests.get(
        f"{base_url}/api/admin/events",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()["data"]
    df = pd.DataFrame(data)
    # Metadata rendered on the asset's page in the Dagster UI.
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "columns": MetadataValue.json(list(df.columns)),
        "preview": MetadataValue.md(df.head(5).to_markdown()),
    })
    return df
@asset(
    group_name="ingestion",
    compute_kind="api",
    partitions_def=daily_partitions,
    description="Daily event partition from API",
    required_resource_keys={"app_api"},
)
def raw_events_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
    """Fetch events for a specific date partition.

    The partition key (a "YYYY-MM-DD" string from ``daily_partitions``) is
    passed to the API as the ``date`` filter and stamped onto every row as
    ``partition_date``.
    """
    # required_resource_keys (added above) is required for
    # context.resources.app_api to exist; without it this raises at runtime.
    date = context.partition_key  # e.g. "2025-07-15"
    api_key = context.resources.app_api.api_key
    base_url = context.resources.app_api.base_url
    resp = requests.get(
        f"{base_url}/api/admin/events",
        params={"date": date, "limit": 10000},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=60,
    )
    resp.raise_for_status()
    df = pd.DataFrame(resp.json()["data"])
    df["partition_date"] = date
    context.log.info(f"Fetched {len(df)} events for {date}")
    return df
@multi_asset(
    outs={
        "clean_users": AssetOut(group_name="transform", description="Deduplicated user records"),
        "clean_orders": AssetOut(group_name="transform", description="Validated order records"),
    },
    compute_kind="pandas",
)
def split_events(
    context: AssetExecutionContext,
    raw_events: pd.DataFrame,
):
    """Split and clean raw events into typed user/order tables.

    The ``raw_events`` parameter itself declares the upstream dependency and
    loads its value, so the former ``deps=["raw_events"]`` was a duplicate
    declaration (``deps`` is for dependencies whose data is NOT loaded) and
    has been removed.

    Returns a (users_df, orders_df) tuple matching the declared outs order.
    """
    # Users: one row per userId, with userId normalized to "id".
    # (The original also "renamed" email -> email, a no-op, now dropped.)
    users_df = (
        raw_events[raw_events["type"] == "user_created"]
        .drop_duplicates(subset=["userId"])
        .rename(columns={"userId": "id"})
    )
    # Orders: keep only rows carrying all three required fields.
    orders_df = (
        raw_events[raw_events["type"] == "order_placed"]
        .dropna(subset=["orderId", "userId", "amount"])
    )
    context.log.info(f"Users: {len(users_df)}, Orders: {len(orders_df)}")
    return users_df, orders_df
## Resources
# pipelines/resources.py — injectable Dagster resources
from dagster import ConfigurableResource, resource, InitResourceContext
from contextlib import contextmanager
from typing import Generator
import psycopg2
import psycopg2.extras
import requests
class AppApiResource(ConfigurableResource):
    """Thin REST client for the application API.

    Both helpers attach the bearer token, enforce a 30-second timeout,
    raise for non-2xx responses, and return the decoded JSON body.
    """

    base_url: str
    api_key: str

    def _request(self, method: str, path: str, **kwargs) -> dict:
        # Shared plumbing: auth header, timeout, status check, JSON decode.
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30,
            **kwargs,
        )
        response.raise_for_status()
        return response.json()

    def get_json(self, path: str, **params) -> dict:
        """GET ``path`` with query ``params``; return the JSON body."""
        return self._request("GET", path, params=params)

    def post_json(self, path: str, body: dict) -> dict:
        """POST ``body`` as JSON to ``path``; return the JSON body."""
        return self._request("POST", path, json=body)
class WarehouseResource(ConfigurableResource):
    """PostgreSQL data warehouse connection.

    Connections commit on success, roll back on any exception, and are
    always closed. Rows come back as dicts via RealDictCursor.
    """

    dsn: str  # postgresql://user:pass@host:5432/db

    @contextmanager
    def get_connection(self) -> Generator[psycopg2.extensions.connection, None, None]:
        """Yield a connection; commit on success, roll back on error, always close."""
        conn = psycopg2.connect(self.dsn, cursor_factory=psycopg2.extras.RealDictCursor)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def execute(self, sql: str, params=None) -> list[dict]:
        """Run ``sql``; return rows as dicts for result-bearing statements, else []."""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                # cur.description is set only when the statement returns rows.
                if cur.description:
                    return [dict(row) for row in cur.fetchall()]
                return []

    def copy_from_df(self, df, table: str) -> int:
        """Bulk insert a DataFrame with ``COPY ... FROM STDIN WITH (FORMAT CSV)``.

        Uses copy_expert with CSV format instead of cursor.copy_from: the
        latter's text format does not understand CSV quoting, so values
        containing commas, quotes, or newlines produced by df.to_csv would
        corrupt the load. Columns are listed explicitly so the DataFrame's
        column order need not match the table definition.

        Returns the number of rows inserted.
        """
        import io

        buf = io.StringIO()
        df.to_csv(buf, index=False, header=False)
        buf.seek(0)
        # NOTE(review): table comes from trusted callers; it is interpolated
        # into the COPY statement (COPY targets cannot be parameterized).
        cols = ", ".join(f'"{c}"' for c in df.columns)
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.copy_expert(
                    f"COPY {table} ({cols}) FROM STDIN WITH (FORMAT CSV)",
                    buf,
                )
                return cur.rowcount
## IOManager
# pipelines/io_managers.py — custom storage IOManager
import io

import boto3
import pandas as pd
from dagster import Field, InputContext, IOManager, OutputContext, io_manager
class S3ParquetIOManager(IOManager):
    """IOManager that persists DataFrames as snappy-compressed Parquet in S3.

    Objects are stored at ``{prefix}/{asset key path}[/{partition}].parquet``.
    """

    def __init__(self, bucket: str, prefix: str = "dagster"):
        self.bucket = bucket
        self.prefix = prefix
        self.s3 = boto3.client("s3")

    def _get_key(self, context: OutputContext | InputContext) -> str:
        """Build the S3 object key from the asset key (plus partition, if any)."""
        segments = [self.prefix, *context.asset_key.path]
        if context.has_partition_key:
            segments.append(context.partition_key)
        return "/".join(segments) + ".parquet"

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        """Serialize ``obj`` to Parquet and upload it to S3."""
        key = self._get_key(context)
        payload = io.BytesIO()
        obj.to_parquet(payload, index=False, compression="snappy")
        payload.seek(0)
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=payload.getvalue())
        context.log.info(f"Wrote {len(obj)} rows to s3://{self.bucket}/{key}")

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Download and deserialize the Parquet object backing this input."""
        key = self._get_key(context)
        payload = io.BytesIO()
        self.s3.download_fileobj(self.bucket, key, payload)
        payload.seek(0)
        return pd.read_parquet(payload)
@io_manager(
    config_schema={
        "bucket": str,
        "prefix": Field(str, default_value="dagster", is_required=False),
    }
)
def s3_parquet_io_manager(init_context) -> S3ParquetIOManager:
    """Factory for :class:`S3ParquetIOManager`.

    A bare ``@io_manager`` declares no config schema, so configuring it via
    ``.configured({"bucket": ..., "prefix": ...})`` (as definitions.py does)
    fails config validation before resource_config is ever readable. The
    schema above makes "bucket" required and "prefix" optional.
    """
    return S3ParquetIOManager(
        bucket=init_context.resource_config["bucket"],
        # Field supplies the default; .get is kept as a defensive fallback.
        prefix=init_context.resource_config.get("prefix", "dagster"),
    )
## Schedules and Sensors
# pipelines/schedules.py — schedules and sensors
from dagster import (
ScheduleDefinition,
DefaultScheduleStatus,
sensor,
SensorEvaluationContext,
RunRequest,
SkipReason,
AssetSelection,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────

# Materializes every asset in the "ingestion" and "transform" groups.
daily_ingest_job = define_asset_job(
    name="daily_ingest",
    selection=AssetSelection.groups("ingestion", "transform"),
    tags={"team": "data-eng"},  # shown on runs and usable for run filtering
)

# Recomputes only the two metrics assets, selected by asset key.
hourly_metrics_job = define_asset_job(
    name="hourly_metrics",
    selection=AssetSelection.keys("user_metrics", "revenue_metrics"),
)
# ── Schedules ─────────────────────────────────────────────────────────────

# Nightly full-pipeline run. RUNNING makes the schedule active by default
# without manual enabling in the UI (requires dagster-daemon to be running).
daily_schedule = ScheduleDefinition(
    name="daily_pipeline",
    job=daily_ingest_job,
    cron_schedule="0 2 * * *",  # 2am UTC
    default_status=DefaultScheduleStatus.RUNNING,
)

# Metrics refresh at 15 minutes past every hour.
hourly_schedule = ScheduleDefinition(
    name="hourly_metrics",
    job=hourly_metrics_job,
    cron_schedule="15 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)
# ── Sensor ────────────────────────────────────────────────────────────────
@sensor(
    job=daily_ingest_job,
    minimum_interval_seconds=60,
)
def new_data_sensor(context: SensorEvaluationContext):
    """Trigger ingestion when new export files appear in S3.

    The cursor stores the max LastModified timestamp seen so far; only
    objects newer than it trigger a run. The cursor must be advanced with
    ``context.update_cursor`` -- returning a RunRequest alone does NOT
    persist it, so without the explicit call the sensor would keep
    re-submitting the same files on every tick.
    """
    import boto3

    s3 = boto3.client("s3")
    bucket = "my-exports"
    last_ts = float(context.cursor or "0")

    # Collect new keys and track the newest timestamp in a single listing
    # pass -- the listing already carries LastModified, so the former per-key
    # head_object calls (one extra API round trip per file) were unnecessary.
    new_keys: list[str] = []
    max_ts = last_ts
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix="exports/"):
        for obj in page.get("Contents", []):
            ts = obj["LastModified"].timestamp()
            if ts > last_ts:
                new_keys.append(obj["Key"])
                max_ts = max(max_ts, ts)

    if not new_keys:
        return SkipReason("No new export files found")

    new_cursor = str(max_ts)
    context.update_cursor(new_cursor)  # persist progress so files aren't reprocessed
    return RunRequest(
        run_key=new_cursor,
        # NOTE(review): raw_events as defined in this repo declares no config
        # schema -- confirm the target job accepts this run_config or drop it.
        run_config={"ops": {"raw_events": {"config": {"keys": new_keys}}}},
        tags={"triggered_by": "s3_sensor", "file_count": str(len(new_keys))},
    )
## Definitions Assembly
# pipelines/definitions.py — Dagster Definitions (workspace entry point)
import os
from dagster import Definitions, load_assets_from_modules
from dagster_dbt import load_assets_from_dbt_project
from . import assets
from .resources import AppApiResource, WarehouseResource
from .io_managers import s3_parquet_io_manager
from .schedules import daily_schedule, hourly_schedule, new_data_sensor
# Discover every @asset defined in the assets package.
all_assets = load_assets_from_modules([assets])

# Load dbt models as Dagster assets (lineage auto-wired)
dbt_assets = load_assets_from_dbt_project(
    project_dir=os.environ.get("DBT_PROJECT_DIR", "./dbt"),
    profiles_dir=os.environ.get("DBT_PROFILES_DIR", "./dbt"),
    key_prefix="dbt",  # namespaces dbt asset keys under "dbt/..."
)
# Resource instances are configured once here and injected by key into any
# asset, schedule, or sensor that requests them.
_resources = {
    "app_api": AppApiResource(
        base_url=os.environ["APP_BASE_URL"],
        api_key=os.environ["APP_API_KEY"],
    ),
    "warehouse": WarehouseResource(dsn=os.environ["WAREHOUSE_DSN"]),
    "io_manager": s3_parquet_io_manager.configured(
        {
            "bucket": os.environ.get("S3_BUCKET", "my-datalake"),
            "prefix": "dagster",
        }
    ),
}

# Single workspace entry point: assets, resources, schedules, and sensors.
defs = Definitions(
    assets=[*all_assets, *dbt_assets],
    resources=_resources,
    schedules=[daily_schedule, hourly_schedule],
    sensors=[new_data_sensor],
)
For the Prefect alternative when needing a Python-native orchestrator with a simpler imperative flow model (@flow and @task), built-in parallelism with .map(), automatic retries and caching, and first-class support for serverless execution without deploying a persistent daemon — Prefect optimizes for developer ergonomics and dynamic workflows while Dagster is built around the asset graph model with a richer UI for data lineage and data quality checks. For the Apache Airflow alternative when operating in an environment that standardizes on Airflow (Astronomer, MWAA, Cloud Composer) with a large ecosystem of existing DAGs and operators — Airflow is the incumbent task-based orchestrator while Dagster is the modern asset-based successor with better software engineering practices, type safety, and first-class partitioning. The Claude Skills 360 bundle includes Dagster skill sets covering asset pipelines, resources, partitions, and dbt integrations. Start with the free tier to try data pipeline generation.