
Claude Code for Workflow Orchestration: Airflow and Prefect Pipelines

Published: September 13, 2026
Read time: 9 min
By: Claude Skills 360

Workflow orchestration handles the hard parts of data pipelines: dependencies between tasks, retry logic, backfilling historical data, scheduling, and monitoring. Airflow is the industry standard for complex DAGs with many dependencies; Prefect offers a more Python-native experience with better dynamic task support. Claude Code generates correct DAG and flow definitions: it understands execution context and avoids common pitfalls such as XCom serialization limits and implicit task dependencies.

Airflow: Complex DAG Design

CLAUDE.md for Airflow Projects

## Airflow Configuration
- Airflow 2.8+ with CeleryExecutor (Redis broker)
- DAG location: dags/
- Python packages for tasks: requirements.txt (not default Airflow env)
- Use TaskFlow API (@task decorator) for Python operations
- No heavy computation in DAG file parsing (keep DAGs lightweight)
- Use XCom for small data between tasks (< 1MB); write to S3 for larger
- Connection IDs: postgres_default, aws_default, slack_default

## Critical Rules
- DAG files execute on every scheduler heartbeat — no expensive code at module level
- All secrets via Airflow Variables or Connections — never hardcoded
- Idempotent tasks only — running a task twice should produce the same result
- Every DAG needs: catchup=False (unless backfill intended), tags, doc_md
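The idempotency rule deserves a concrete illustration. The toy upsert below uses SQLite and invented column names; the `ON CONFLICT` pattern it demonstrates is the same one the warehouse-load tasks in this guide rely on, so replaying a run converges to the same final state:

```python
import sqlite3

# In-memory stand-in for the warehouse; table and column names are illustrative
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer_scores (customer_id TEXT PRIMARY KEY, churn_probability REAL)"
)

def load_scores(rows):
    # Upsert keyed on the primary key: re-running is an overwrite, not a duplicate insert
    conn.executemany(
        """
        INSERT INTO customer_scores (customer_id, churn_probability)
        VALUES (?, ?)
        ON CONFLICT (customer_id)
        DO UPDATE SET churn_probability = excluded.churn_probability
        """,
        rows,
    )

rows = [("c1", 0.12), ("c2", 0.87)]
load_scores(rows)
load_scores(rows)  # simulate a retry: same final state
print(conn.execute("SELECT COUNT(*) FROM customer_scores").fetchone()[0])  # → 2
```

Running a task twice after a mid-pipeline failure then produces the same table contents as running it once, which is what makes Airflow retries and backfills safe.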
# dags/customer_data_pipeline.py
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.trigger_rule import TriggerRule
import pandas as pd
import json

@dag(
    dag_id='customer_data_pipeline',
    description='Extract customer data, enrich with ML scores, load to warehouse',
    schedule='0 6 * * *',  # 6 AM daily
    start_date=datetime(2026, 1, 1),
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,  # Prevent concurrent runs
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'email_on_failure': True,
        'email': ['[email protected]'],
    },
    tags=['customers', 'daily', 'etl'],
    doc_md="""
    ## Customer Data Pipeline
    
    Extracts new/updated customers from PostgreSQL, scores them with the ML API,
    and loads results to the data warehouse.
    
    **Frequency**: Daily at 6 AM UTC  
    **SLA**: Complete by 7 AM UTC  
    **Owner**: Data Engineering team
    """,
)
def customer_data_pipeline():

    @task
    def extract_customers(execution_date=None) -> str:
        """Extract customers updated in the last 24 hours."""
        hook = PostgresHook(postgres_conn_id='postgres_default')
        
        # Idempotent: use execution_date to ensure consistent backfills
        query = """
            SELECT id, email, created_at, updated_at, lifetime_value
            FROM customers
            WHERE updated_at BETWEEN %(start)s AND %(end)s
            ORDER BY id
        """
        
        start = execution_date - timedelta(days=1)
        end = execution_date
        
        df = hook.get_pandas_df(query, parameters={'start': start, 'end': end})
        
        # Write to S3 (XComs are for small data — customer dumps can be large)
        s3_key = f"pipeline/customers/extracted/{execution_date.strftime('%Y-%m-%d')}/data.parquet"
        s3 = S3Hook(aws_conn_id='aws_default')
        
        # Write to temp file then upload
        import tempfile
        with tempfile.NamedTemporaryFile(suffix='.parquet') as tmp:
            df.to_parquet(tmp.name, index=False)
            s3.load_file(tmp.name, key=s3_key, bucket_name='data-pipeline', replace=True)
        
        print(f"Extracted {len(df)} customers → {s3_key}")
        return s3_key

    @task
    def score_customers(s3_key: str) -> str:
        """Enrich customers with churn probability scores."""
        import io
        import boto3
        import httpx
        
        s3_client = boto3.client('s3')
        response = s3_client.get_object(Bucket='data-pipeline', Key=s3_key)
        # Parquet needs a seekable file, so buffer the streaming body first
        df = pd.read_parquet(io.BytesIO(response['Body'].read()))
        
        # Call ML scoring API in batches
        from airflow.models import Variable
        scoring_api_url = Variable.get('scoring_api_url')
        api_key = Variable.get('scoring_api_key', deserialize_json=False)
        
        scores = []
        batch_size = 500
        
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i+batch_size]
            response = httpx.post(
                f"{scoring_api_url}/score/batch",
                json={'customers': batch[['id', 'lifetime_value']].to_dict('records')},
                headers={'X-API-Key': api_key},
                timeout=30,
            )
            response.raise_for_status()
            scores.extend(response.json()['scores'])
        
        df['churn_probability'] = [s['churn_probability'] for s in scores]
        df['churn_segment'] = pd.cut(
            df['churn_probability'],
            bins=[0, 0.2, 0.5, 0.8, 1.0],
            labels=['low', 'medium', 'high', 'critical']
        )
        
        scored_key = s3_key.replace('/extracted/', '/scored/')
        import tempfile
        with tempfile.NamedTemporaryFile(suffix='.parquet') as tmp:
            df.to_parquet(tmp.name, index=False)
            s3_client.upload_file(tmp.name, 'data-pipeline', scored_key)
        
        return scored_key

    @task
    def load_to_warehouse(s3_key: str, execution_date=None) -> int:
        """Load scored customers to warehouse — idempotent upsert."""
        import io
        import boto3
        
        s3_client = boto3.client('s3')
        response = s3_client.get_object(Bucket='data-pipeline', Key=s3_key)
        df = pd.read_parquet(io.BytesIO(response['Body'].read()))
        
        hook = PostgresHook(postgres_conn_id='warehouse_default')
        
        # Upsert — safe to run multiple times; keep only JSON-serializable columns
        records = df[['id', 'churn_probability', 'churn_segment']].astype({'churn_segment': str}).to_dict('records')
        hook.run("""
            INSERT INTO customer_scores (customer_id, churn_probability, churn_segment, scored_at)
            SELECT
                (r->>'id')::uuid,
                (r->>'churn_probability')::numeric,
                r->>'churn_segment',
                NOW()
            FROM jsonb_array_elements(%s::jsonb) AS r(r)
            ON CONFLICT (customer_id)
            DO UPDATE SET
                churn_probability = EXCLUDED.churn_probability,
                churn_segment = EXCLUDED.churn_segment,
                scored_at = EXCLUDED.scored_at
        """, parameters=[json.dumps(records)])
        
        return len(df)

    @task(trigger_rule=TriggerRule.ALL_SUCCESS)  # announce success only when the load succeeded
    def notify_slack(row_count: int, execution_date=None):
        """Notify Slack on completion."""
        from airflow.providers.slack.hooks.slack import SlackHook
        
        slack = SlackHook(slack_conn_id='slack_default')
        slack.call(
            api_method='chat.postMessage',
            json={
                'channel': '#data-pipelines',
                'text': f"✅ Customer scoring pipeline complete ({execution_date.strftime('%Y-%m-%d')}): {row_count:,} customers scored",
            }
        )

    # Wire up the DAG
    extracted = extract_customers()
    scored = score_customers(extracted)
    count = load_to_warehouse(scored)
    notify_slack(count)

pipeline = customer_data_pipeline()
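One detail of the scoring task worth isolating: it walks the DataFrame in fixed-size batches with `range(0, len(df), batch_size)`. The slicing arithmetic on its own, with illustrative sizes:

```python
def batch_bounds(total: int, batch_size: int) -> list[tuple[int, int]]:
    """Half-open [start, end) slice bounds covering `total` rows in `batch_size` chunks."""
    return [(i, min(i + batch_size, total)) for i in range(0, total, batch_size)]

# 1,200 customers at batch_size=500 → three API calls, the last one partial
print(batch_bounds(1200, 500))  # → [(0, 500), (500, 1000), (1000, 1200)]
```

The final partial batch falls out of `range()`'s step semantics for free; no special-casing is needed when the total is not a multiple of the batch size.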

Dynamic Task Mapping

I have 50 data sources, each needing the same ETL process.
Don't create 50 separate tasks — generate them dynamically.
@dag(schedule='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def multi_source_etl():
    
    @task
    def get_active_sources() -> list[tuple]:
        """Fetch list of active data sources from config table."""
        hook = PostgresHook(postgres_conn_id='postgres_default')
        return hook.get_records(
            "SELECT source_id, source_name, connection_string FROM data_sources WHERE active = true"
        )
    
    @task
    def process_source(source: tuple) -> dict:
        """Process one data source — this task runs once per source."""
        source_id, source_name, conn_str = source
        
        # ETL logic using source-specific config (run_etl defined elsewhere in the project)
        record_count = run_etl(source_id, conn_str)
        return {'source': source_name, 'records': record_count}
    
    @task
    def summarize_results(results: list[dict]):
        total = sum(r['records'] for r in results)
        print(f"Processed {len(results)} sources, {total:,} total records")
    
    # Dynamic task mapping — creates one task per source at runtime
    sources = get_active_sources()
    results = process_source.expand(source=sources)
    summarize_results(results)

etl_dag = multi_source_etl()

Airflow creates one process_source task per row in get_active_sources(). Add a new row to the config table — the next run picks it up automatically.
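Conceptually, `.expand()` is a map over the upstream task's return value, and the downstream task collecting the mapped results is the reduce step. A plain-Python analogue (invented source rows, stubbed record count) makes that fan-out/fan-in shape explicit:

```python
def process_source(source: tuple) -> dict:
    """Stand-in for the mapped task: called once per source row."""
    source_id, source_name, conn_str = source
    record_count = 100  # placeholder for the real run_etl() result
    return {'source': source_name, 'records': record_count}

# Rows shaped like the SELECT in get_active_sources()
sources = [
    (1, 'orders', 'postgres://orders-db'),
    (2, 'events', 'postgres://events-db'),
]

results = [process_source(s) for s in sources]  # ≈ process_source.expand(source=sources)
total = sum(r['records'] for r in results)      # ≈ summarize_results(results)
print(f"Processed {len(results)} sources, {total} total records")
```

The difference at runtime is that Airflow schedules each mapped call as its own task instance, with independent retries and logs.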

Prefect: Python-Native Flows

Same pipeline in Prefect — I prefer the Python-native approach
without Airflow's executor setup and deployment complexity.
# flows/customer_pipeline.py
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret
from datetime import timedelta
import httpx

@task(
    retries=3,
    retry_delay_seconds=60,
    # Cache results: same inputs → skip re-execution for 1 hour
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_customers(start_date: str, end_date: str) -> list[dict]:
    logger = get_run_logger()
    
    db_url = Secret.load("postgres-url").get()
    import psycopg2
    
    with psycopg2.connect(db_url) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, email, lifetime_value FROM customers WHERE updated_at BETWEEN %s AND %s",
                (start_date, end_date)
            )
            rows = cur.fetchall()
    
    logger.info(f"Extracted {len(rows)} customers")
    return [{'id': r[0], 'email': r[1], 'ltv': r[2]} for r in rows]

@task(retries=2)
def score_customers(customers: list[dict]) -> list[dict]:
    logger = get_run_logger()
    
    if not customers:
        logger.warning("No customers to score")
        return []
    
    api_key = Secret.load("scoring-api-key").get()
    
    with httpx.Client() as client:
        response = client.post(
            "https://api.internal/score/batch",
            json={'customers': customers},
            headers={'X-API-Key': api_key},
            timeout=60,
        )
        response.raise_for_status()
        scores = response.json()['scores']
    
    logger.info(f"Scored {len(scores)} customers")
    return scores

@task
def load_scores(scores: list[dict]) -> int:
    if not scores:
        return 0
    
    db_url = Secret.load("warehouse-url").get()
    import psycopg2
    from psycopg2.extras import execute_values
    
    with psycopg2.connect(db_url) as conn:
        with conn.cursor() as cur:
            execute_values(cur, """
                INSERT INTO customer_scores (customer_id, churn_probability, scored_at)
                VALUES %s
                ON CONFLICT (customer_id) DO UPDATE
                SET churn_probability = EXCLUDED.churn_probability, scored_at = NOW()
            """, [(s['id'], s['churn_probability']) for s in scores])
        conn.commit()
    
    return len(scores)

@flow(
    name="customer-pipeline",
    description="Daily customer scoring pipeline",
    log_prints=True,
)
def customer_pipeline(date: str | None = None):
    from datetime import datetime, timedelta, timezone
    
    if date is None:
        date = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    
    start_date = (datetime.strptime(date, '%Y-%m-%d') - timedelta(days=1)).isoformat()
    end_date = datetime.strptime(date, '%Y-%m-%d').isoformat()
    
    # Tasks run sequentially by default; use submit() for parallel execution
    customers = extract_customers(start_date, end_date)
    scores = score_customers(customers)
    count = load_scores(scores)
    
    print(f"Pipeline complete: {count} records loaded")

# Deploy with schedule
if __name__ == "__main__":
    customer_pipeline.serve(
        name="customer-pipeline-daily",
        cron="0 6 * * *",  # 6 AM daily
    )
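On the caching above: `task_input_hash` derives the cache key from the task's inputs, so a repeated call with the same date window is skipped. A simplified analogue (not Prefect's actual implementation) shows the idea:

```python
import hashlib
import json

def input_cache_key(**inputs) -> str:
    # Stable serialization: identical inputs always map to the same key
    payload = json.dumps(inputs, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = input_cache_key(start_date="2026-01-01", end_date="2026-01-02")
k2 = input_cache_key(end_date="2026-01-02", start_date="2026-01-01")
print(k1 == k2)  # → True: cache hit, the extract is skipped
k3 = input_cache_key(start_date="2026-01-02", end_date="2026-01-03")
print(k1 == k3)  # → False: new window, the task re-runs
```

In the real flow, re-invoking `extract_customers` with the same `start_date`/`end_date` inside the one-hour expiration returns the cached result instead of hitting Postgres again.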

For the data modeling layer that workflows typically feed, see the dbt analytics guide. For event-driven alternatives to scheduled orchestration, where pipelines trigger on data arrival rather than on a clock, the Kafka guide and event-driven architecture guide cover those patterns. The Claude Skills 360 bundle includes data pipeline skill sets for DAG templates, dynamic task patterns, and testing orchestration code. Start with the free tier to try pipeline generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
