Workflow orchestration handles the hard parts of data pipelines: dependencies between tasks, retry logic, backfilling historical data, scheduling, and monitoring. Airflow is the industry standard for complex DAGs with many dependencies; Prefect offers a more Python-native experience with better dynamic task support. Claude Code can generate correct DAG/flow definitions: it understands execution context and avoids common pitfalls such as serialization issues and implicit task dependencies.
Airflow: Complex DAG Design
CLAUDE.md for Airflow Projects
## Airflow Configuration
- Airflow 2.8+ with CeleryExecutor (Redis broker)
- DAG location: dags/
- Python packages for tasks: requirements.txt (not default Airflow env)
- Use TaskFlow API (@task decorator) for Python operations
- No heavy computation in DAG file parsing (keep DAGs lightweight)
- Use XCom for small data between tasks (< 1MB); write to S3 for larger
- Connection IDs: postgres_default, aws_default, slack_default
## Critical Rules
- DAG files execute on every scheduler heartbeat — no expensive code at module level
- All secrets via Airflow Variables or Connections — never hardcoded
- Idempotent tasks only — running a task twice should produce the same result
- Every DAG needs: catchup=False (unless backfill intended), tags, doc_md
# dags/customer_data_pipeline.py
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.trigger_rule import TriggerRule
import pandas as pd
import json
@dag(
    dag_id='customer_data_pipeline',
    description='Extract customer data, enrich with ML scores, load to warehouse',
    schedule='0 6 * * *',  # 6 AM daily
    start_date=datetime(2026, 1, 1),
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,  # Prevent concurrent runs
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'email_on_failure': True,
        'email': ['[email protected]'],
    },
    tags=['customers', 'daily', 'etl'],
    doc_md="""
## Customer Data Pipeline
Extracts new/updated customers from PostgreSQL, scores them with the ML API,
and loads results to the data warehouse.
**Frequency**: Daily at 6 AM UTC
**SLA**: Complete by 7 AM UTC
**Owner**: Data Engineering team
""",
)
def customer_data_pipeline():
    """Daily ETL: extract updated customers, score churn via the ML API, upsert to warehouse."""

    @task
    def extract_customers(execution_date=None) -> str:
        """Extract customers updated in the 24h window ending at the logical date.

        Returns:
            S3 key of the extracted parquet file.
        """
        import tempfile

        hook = PostgresHook(postgres_conn_id='postgres_default')
        # Idempotent: the window is derived from execution_date, so retries and
        # backfills always read the same slice of data.
        query = """
        SELECT id, email, created_at, updated_at, lifetime_value
        FROM customers
        WHERE updated_at BETWEEN %(start)s AND %(end)s
        ORDER BY id
        """
        start = execution_date - timedelta(days=1)
        end = execution_date
        df = hook.get_pandas_df(query, parameters={'start': start, 'end': end})

        # Hand off via S3, not XCom — customer dumps can be large.
        s3_key = f"pipeline/customers/extracted/{execution_date.strftime('%Y-%m-%d')}/data.parquet"
        s3 = S3Hook(aws_conn_id='aws_default')
        # Write to a temp file then upload.
        with tempfile.NamedTemporaryFile(suffix='.parquet') as tmp:
            df.to_parquet(tmp.name, index=False)
            s3.load_file(tmp.name, key=s3_key, bucket_name='data-pipeline', replace=True)
        print(f"Extracted {len(df)} customers → {s3_key}")
        return s3_key

    @task
    def score_customers(s3_key: str) -> str:
        """Enrich extracted customers with churn probability scores.

        Returns:
            S3 key of the scored parquet file.
        """
        # BUG FIX: tempfile was previously imported only inside
        # extract_customers, so this task raised NameError at upload time.
        import tempfile

        import boto3
        import httpx
        from airflow.models import Variable

        s3_client = boto3.client('s3')
        obj = s3_client.get_object(Bucket='data-pipeline', Key=s3_key)
        df = pd.read_parquet(obj['Body'])

        scoring_api_url = Variable.get('scoring_api_url')
        api_key = Variable.get('scoring_api_key', deserialize_json=False)

        # Call the ML scoring API in batches to bound request size.
        scores = []
        batch_size = 500
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i + batch_size]
            response = httpx.post(
                f"{scoring_api_url}/score/batch",
                json={'customers': batch[['id', 'lifetime_value']].to_dict('records')},
                headers={'X-API-Key': api_key},
                timeout=30,
            )
            response.raise_for_status()
            scores.extend(response.json()['scores'])

        df['churn_probability'] = [s['churn_probability'] for s in scores]
        df['churn_segment'] = pd.cut(
            df['churn_probability'],
            bins=[0, 0.2, 0.5, 0.8, 1.0],
            labels=['low', 'medium', 'high', 'critical']
        )
        scored_key = s3_key.replace('/extracted/', '/scored/')
        with tempfile.NamedTemporaryFile(suffix='.parquet') as tmp:
            df.to_parquet(tmp.name, index=False)
            s3_client.upload_file(tmp.name, 'data-pipeline', scored_key)
        return scored_key

    @task
    def load_to_warehouse(s3_key: str, execution_date=None) -> int:
        """Load scored customers to warehouse — idempotent upsert.

        Returns:
            Number of rows loaded.
        """
        import boto3

        s3_client = boto3.client('s3')
        obj = s3_client.get_object(Bucket='data-pipeline', Key=s3_key)
        df = pd.read_parquet(obj['Body'])
        hook = PostgresHook(postgres_conn_id='warehouse_default')
        # BUG FIX: serialize only the columns the upsert reads. The full frame
        # carries created_at/updated_at pandas Timestamps, which json.dumps
        # cannot serialize (TypeError).
        records = df[['id', 'churn_probability', 'churn_segment']].to_dict('records')
        # Upsert — safe to run multiple times.
        hook.run("""
        INSERT INTO customer_scores (customer_id, churn_probability, churn_segment, scored_at)
        SELECT
            (r->>'id')::uuid,
            (r->>'churn_probability')::numeric,
            r->>'churn_segment',
            NOW()
        FROM jsonb_array_elements(%s::jsonb) AS r(r)
        ON CONFLICT (customer_id)
        DO UPDATE SET
            churn_probability = EXCLUDED.churn_probability,
            churn_segment = EXCLUDED.churn_segment,
            scored_at = EXCLUDED.scored_at
        """, parameters=[json.dumps(records)])
        return len(df)

    @task(trigger_rule=TriggerRule.ONE_SUCCESS)
    def notify_slack(row_count: int, execution_date=None):
        """Notify Slack on completion.

        NOTE(review): with a single upstream, ONE_SUCCESS behaves like the
        default ALL_SUCCESS — confirm the trigger rule is intentional.
        """
        from airflow.providers.slack.hooks.slack import SlackHook

        slack = SlackHook(slack_conn_id='slack_default')
        slack.call(
            api_method='chat.postMessage',
            json={
                'channel': '#data-pipelines',
                'text': f"✅ Customer scoring pipeline complete ({execution_date.strftime('%Y-%m-%d')}): {row_count:,} customers scored",
            }
        )

    # Wire up the DAG: extract → score → load → notify.
    extracted = extract_customers()
    scored = score_customers(extracted)
    count = load_to_warehouse(scored)
    notify_slack(count)


pipeline = customer_data_pipeline()
Dynamic Task Mapping
I have 50 data sources, each needing the same ETL process.
Don't create 50 separate tasks — generate them dynamically.
@dag(schedule='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def multi_source_etl():
    """Fan out one ETL task per active data source via dynamic task mapping."""

    @task
    def get_active_sources() -> list[dict]:
        """Fetch the list of active data sources from the config table.

        BUG FIX: get_records returns tuples, but this task was annotated
        list[dict] (and the consumer unpacked tuples). Return real dicts so
        the XCom payload is self-describing and matches the annotation.
        """
        hook = PostgresHook()
        rows = hook.get_records(
            "SELECT source_id, source_name, connection_string FROM data_sources WHERE active = true"
        )
        return [
            {'source_id': sid, 'source_name': name, 'connection_string': conn}
            for sid, name, conn in rows
        ]

    @task
    def process_source(source: dict) -> dict:
        """Process one data source — this task runs once per mapped source."""
        # run_etl is presumably defined elsewhere in the project — TODO confirm.
        record_count = run_etl(source['source_id'], source['connection_string'])
        return {'source': source['source_name'], 'records': record_count}

    @task
    def summarize_results(results: list[dict]):
        """Aggregate per-source record counts into one summary line."""
        total = sum(r['records'] for r in results)
        print(f"Processed {len(results)} sources, {total:,} total records")

    # Dynamic task mapping — creates one process_source task per source at runtime.
    sources = get_active_sources()
    results = process_source.expand(source=sources)
    summarize_results(results)


etl_dag = multi_source_etl()
Airflow creates one process_source task per row in get_active_sources(). Add a new row to the config table — the next run picks it up automatically.
Prefect: Python-Native Flows
Same pipeline in Prefect — I prefer the Python-native approach
without heavyweight scheduler configuration or Airflow's deployment complexity.
# flows/customer_pipeline.py
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from datetime import timedelta
import httpx
@task(
    retries=3,
    retry_delay_seconds=60,
    # Identical inputs within the expiration window reuse the cached result.
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_customers(start_date: str, end_date: str) -> list[dict]:
    """Pull customers whose updated_at falls between start_date and end_date."""
    import psycopg2

    logger = get_run_logger()
    dsn = Secret.load("postgres-url").get()
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT id, email, lifetime_value FROM customers WHERE updated_at BETWEEN %s AND %s",
            (start_date, end_date),
        )
        fetched = cur.fetchall()
    logger.info(f"Extracted {len(fetched)} customers")
    # Shape rows into dicts for downstream tasks.
    return [{'id': cid, 'email': email, 'ltv': ltv} for cid, email, ltv in fetched]
@task(retries=2)
def score_customers(customers: list[dict]) -> list[dict]:
    """Submit customers to the internal scoring API and return their scores."""
    logger = get_run_logger()
    # Guard clause: nothing to send — skip the API round-trip entirely.
    if not customers:
        logger.warning("No customers to score")
        return []

    api_key = Secret.load("scoring-api-key").get()
    with httpx.Client() as client:
        resp = client.post(
            "https://api.internal/score/batch",
            json={'customers': customers},
            headers={'X-API-Key': api_key},
            timeout=60,
        )
        resp.raise_for_status()
        payload = resp.json()
    scored = payload['scores']
    logger.info(f"Scored {len(scored)} customers")
    return scored
@task
def load_scores(scores: list[dict]) -> int:
    """Upsert churn scores into the warehouse.

    Idempotent: re-running overwrites each customer's score via ON CONFLICT.

    Returns:
        Number of score rows written.
    """
    if not scores:
        return 0
    db_url = Secret.load("warehouse-url").get()
    import psycopg2
    from psycopg2.extras import execute_values

    with psycopg2.connect(db_url) as conn:
        with conn.cursor() as cur:
            # BUG FIX: the INSERT targets three columns but each row tuple
            # supplies only two values, so the default execute_values template
            # produced a column/expression count mismatch. An explicit template
            # fills scored_at with NOW() to match the column list.
            execute_values(cur, """
                INSERT INTO customer_scores (customer_id, churn_probability, scored_at)
                VALUES %s
                ON CONFLICT (customer_id) DO UPDATE
                SET churn_probability = EXCLUDED.churn_probability, scored_at = NOW()
            """, [(s['id'], s['churn_probability']) for s in scores],
                template="(%s, %s, NOW())")
        conn.commit()
    return len(scores)
@flow(
    name="customer-pipeline",
    description="Daily customer scoring pipeline",
    log_prints=True,
)
def customer_pipeline(date: str | None = None):
    """Run extract → score → load for the 24h window ending on `date`.

    Args:
        date: ISO date (YYYY-MM-DD); defaults to today's date in UTC.
    """
    from datetime import datetime, timedelta, timezone

    if date is None:
        # datetime.utcnow() is deprecated (Python 3.12) and returns a naive
        # datetime; use an aware "now" in UTC instead.
        date = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    start_date = (datetime.strptime(date, '%Y-%m-%d') - timedelta(days=1)).isoformat()
    end_date = datetime.strptime(date, '%Y-%m-%d').isoformat()
    # Tasks run sequentially by default; use submit() for parallel execution.
    customers = extract_customers(start_date, end_date)
    scores = score_customers(customers)
    count = load_scores(scores)
    print(f"Pipeline complete: {count} records loaded")
# Deploy with schedule
if __name__ == "__main__":
    # serve() registers the flow with the Prefect API and keeps this process
    # alive to execute scheduled runs.
    customer_pipeline.serve(
        name="customer-pipeline-daily",
        cron="0 6 * * *",  # 6 AM daily
    )
For the data modeling layer that workflows typically feed, see the dbt analytics guide. For event-driven alternatives to scheduled orchestration — where pipelines trigger on data arrival rather than time, the Kafka guide and event-driven architecture guide cover those patterns. The Claude Skills 360 bundle includes data pipeline skill sets for DAG templates, dynamic task patterns, and testing orchestration code. Start with the free tier to try pipeline generation.