dlt is the Python library for building ELT pipelines: install with pip install dlt and import dlt. @dlt.source wraps a function that returns resources. @dlt.resource(write_disposition="append", primary_key="id") yields rows; @dlt.resource(write_disposition="merge", primary_key="id") upserts on the primary key. Incremental loading: declare the cursor as a resource argument, e.g. def orders(updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01")), and read updated_at.last_value inside the function. dlt.pipeline(pipeline_name="my_pipeline", destination="bigquery", dataset_name="raw") creates a pipeline; pipeline.run(my_source(), write_disposition="append") loads data and pipeline.run([resource1, resource2]) loads multiple resources. Auth: dlt.secrets["sources.my_source.api_key"] reads from .dlt/secrets.toml. REST API source: from dlt.sources.rest_api import rest_api_source, configured with a client plus resources: [{ name, endpoint, write_disposition, incremental }]. The filesystem source (from dlt.sources.filesystem import filesystem) reads files from buckets such as s3://bucket with a file_glob like *.parquet. Nested data: lists of objects are flattened into child tables automatically (e.g. orders__items), and dlt.mark.with_table_name(item, "table") routes yielded items to a specific table. pipeline.last_trace.last_normalize_info shows row counts per table; the dlt pipeline <name> show CLI inspects pipeline status. pipeline.dataset().orders.df() reads loaded data back. Destinations: dlt.destinations.bigquery(), dlt.destinations.duckdb(), dlt.destinations.snowflake(), dlt.destinations.postgres(). Claude Code generates dlt source functions, REST API pipelines, incremental loaders, and destination configurations.
## CLAUDE.md for dlt
## dlt Stack
- Version: dlt >= 1.0 (data load tool, by dltHub)
- Install: pip install "dlt[bigquery]" or "dlt[duckdb]" or "dlt[snowflake]"
- Config: .dlt/config.toml for non-secret config, .dlt/secrets.toml for credentials
- pipeline = dlt.pipeline(pipeline_name, destination, dataset_name)
- pipeline.run(source, write_disposition="append"|"merge"|"replace")
- Incremental: declare the cursor as a resource argument, e.g. def orders(updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01")): ...; read updated_at.last_value inside the function
- Merge: @dlt.resource(write_disposition="merge", primary_key="id")
- Secrets: dlt.secrets["path.to.secret"] — loaded from .dlt/secrets.toml or env vars such as SOURCES__MY_SOURCE__API_KEY (example below)
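The config and secrets values above resolve through dlt's provider chain (environment variables and the .dlt/secrets.toml / .dlt/config.toml files). A minimal sketch of explicit lookups; the key paths and TOML layout shown in comments are illustrative and should match your own source names:
# Example config/secrets lookups (key paths and TOML layout are illustrative)
import dlt

# .dlt/secrets.toml                     .dlt/config.toml
# [sources.my_app_api]                  [sources.my_app_api]
# api_key = "sk-..."                    api_base_url = "https://api.example.com"

api_key = dlt.secrets["sources.my_app_api.api_key"]         # or env SOURCES__MY_APP_API__API_KEY
base_url = dlt.config["sources.my_app_api.api_base_url"]    # or env SOURCES__MY_APP_API__API_BASE_URL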
## dlt Source and Resources
# pipelines/sources/my_api.py — dlt source with REST API and incremental loading
import dlt
from dlt.sources.helpers import requests
from typing import Iterator
@dlt.source(name="my_app_api")
def my_app_source(
    api_base_url: str = dlt.config.value,
    api_key: str = dlt.secrets.value,
):
    """Source for My App REST API."""
    return (
        users_resource(api_base_url, api_key),
        orders_resource(api_base_url, api_key),
        events_resource(api_base_url, api_key),
    )
@dlt.resource(
    name="users",
    write_disposition="merge",
    primary_key="id",
)
def users_resource(
    api_base_url: str,
    api_key: str,
) -> Iterator[dict]:
    """Load all users — full sync with merge dedup."""
    session = requests.Session()
    session.headers["Authorization"] = f"Bearer {api_key}"
    cursor = None
    while True:
        params = {"limit": 500}
        if cursor:
            params["cursor"] = cursor
        resp = session.get(f"{api_base_url}/api/admin/users", params=params)
        resp.raise_for_status()
        data = resp.json()
        yield from data["data"]
        cursor = data.get("nextCursor")
        if not cursor:
            break
@dlt.resource(
    name="orders",
    write_disposition="merge",  # dedup on primary_key when an updated order is re-delivered
    primary_key="id",
    columns={
        "id": {"data_type": "text", "nullable": False},
        "user_id": {"data_type": "text", "nullable": False},
        "amount": {"data_type": "double", "nullable": False},
        "status": {"data_type": "text"},
        "created_at": {"data_type": "timestamp", "nullable": False},
    },
)
def orders_resource(
    api_base_url: str,
    api_key: str,
    updated_after: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",
    ),
) -> Iterator[dict]:
    """Load orders incrementally — only new/changed since last run."""
    session = requests.Session()
    session.headers["Authorization"] = f"Bearer {api_key}"
    cursor_date = updated_after.last_value
    page = 1
    while True:
        resp = session.get(f"{api_base_url}/api/admin/orders", params={
            "updated_after": cursor_date,
            "page": page,
            "limit": 1000,
        })
        resp.raise_for_status()
        data = resp.json()
        if not data["data"]:
            break
        yield from data["data"]
        page += 1
        if page > data.get("pageCount", 1):
            break
@dlt.resource(
    name="events",
    write_disposition="append",
    primary_key=["session_id", "event_id"],
    columns={
        "timestamp": {"data_type": "timestamp", "partition": True},  # BigQuery partition
    },
)
def events_resource(
    api_base_url: str,
    api_key: str,
    last_timestamp: dlt.sources.incremental[str] = dlt.sources.incremental(
        "timestamp",
        initial_value="2024-01-01T00:00:00Z",
    ),
) -> Iterator[dict]:
    """Stream events incrementally from last seen timestamp."""
    session = requests.Session()
    session.headers["Authorization"] = f"Bearer {api_key}"
    resp = session.get(f"{api_base_url}/api/admin/events/export", params={
        "after": last_timestamp.last_value,
        "limit": 5000,
    })
    resp.raise_for_status()
    yield from resp.json()["events"]
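For a quick smoke test of the source above, one option is to load a single resource into a local DuckDB file and read it back through the pipeline dataset. A sketch assuming the duckdb extra, pandas, and the API credentials are all in place:
# Example smoke test (sketch): load only the orders resource into DuckDB, then read it back
import dlt
from sources.my_api import my_app_source

pipeline = dlt.pipeline(
    pipeline_name="my_app_dev",
    destination="duckdb",
    dataset_name="raw_dev",
)
load_info = pipeline.run(my_app_source().with_resources("orders"))
print(load_info)
print(pipeline.dataset().orders.df().head())  # loaded rows back as a DataFrame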
## REST API Source (dlt built-in)
# pipelines/sources/rest_source.py — use dlt's built-in REST API source
import dlt
from dlt.sources.rest_api import rest_api_source, RESTAPIConfig
def create_rest_source(api_base_url: str, api_key: str):
    """Configure dlt REST API source with automatic pagination."""
    config: RESTAPIConfig = {
        "client": {
            "base_url": api_base_url,
            "auth": {
                "type": "bearer",
                "token": api_key,
            },
            "paginator": {
                "type": "cursor",
                "cursor_path": "nextCursor",
                "cursor_param": "cursor",
            },
        },
        "resource_defaults": {
            "write_disposition": "append",
            "primary_key": "id",
        },
"resources": [
{
"name": "products",
"endpoint": "products",
"write_disposition": "replace", # Full refresh
},
{
"name": "orders",
"endpoint": "orders",
"incremental": dlt.sources.incremental(
"updated_at",
initial_value="2024-01-01T00:00:00Z",
),
"params": {
"updated_after": "{incremental.start_value}",
"limit": 1000,
},
},
{
"name": "order_items",
"endpoint": "orders/{id}/items", # Child resource
"include_from_parent": ["id"],
"write_disposition": "append",
},
],
}
return rest_api_source(config)
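Running the configured REST API source works like any other dlt source. A usage sketch, reusing the APP_API_URL and APP_API_KEY environment variables that run.py below also expects:
# Example usage of create_rest_source
import os
import dlt
from sources.rest_source import create_rest_source

pipeline = dlt.pipeline(pipeline_name="rest_demo", destination="duckdb", dataset_name="raw")
load_info = pipeline.run(create_rest_source(os.environ["APP_API_URL"], os.environ["APP_API_KEY"]))
print(load_info)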
## Pipeline Runner
# pipelines/run.py — pipeline runner with multiple destination support
import dlt
import os
from sources.my_api import my_app_source
from sources.rest_source import create_rest_source
def run_pipeline(
    destination: str = "duckdb",
    dataset: str = "raw",
    full_refresh: bool = False,
) -> dict:
    """Run the pipeline and return load info."""
    # Build destination
    if destination == "bigquery":
        dest = dlt.destinations.bigquery(location="US")
    elif destination == "snowflake":
        dest = dlt.destinations.snowflake()
    elif destination == "postgres":
        dest = dlt.destinations.postgres()
    else:
        dest = dlt.destinations.duckdb(
            os.environ.get("DUCKDB_PATH", "data/warehouse.duckdb")
        )
    pipeline = dlt.pipeline(
        pipeline_name=f"my_app_{destination}",
        destination=dest,
        dataset_name=dataset,
        dev_mode=False,
    )
    # Reset state for full refresh
    if full_refresh:
        pipeline.drop()
    source = my_app_source(
        api_base_url=os.environ["APP_API_URL"],
        api_key=os.environ["APP_API_KEY"],
    )
    load_info = pipeline.run(source)
    print(pipeline.last_trace.last_normalize_info)
    print(f"Loaded: {load_info}")
    return {
        "pipeline_name": pipeline.pipeline_name,
        "load_ids": list(load_info.loads_ids),
        "schema": pipeline.default_schema.name,
    }
if __name__ == "__main__":
    import sys
    dest = sys.argv[1] if len(sys.argv) > 1 else "duckdb"
    run_pipeline(destination=dest)
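To check what a run actually loaded, the pipeline trace records per-table row counts from the normalize step. A short sketch that attaches to the local my_app_duckdb pipeline created by run.py; dlt.attach and row_counts are standard dlt APIs, but verify them against your installed version:
# Example post-run inspection (assumes my_app_duckdb has completed at least one run)
import dlt

pipeline = dlt.attach(pipeline_name="my_app_duckdb")
print(pipeline.last_trace.last_normalize_info.row_counts)  # rows normalized per table in the last run
# Interactive view: dlt pipeline my_app_duckdb show   (requires streamlit)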
Choose Airbyte instead when you need a GUI-driven, no-code ingestion platform with 300+ managed connectors and a hosted Cloud option, so non-engineers can configure data syncs without writing Python; Airbyte suits teams that want UI-based connector management, while dlt suits engineers who want pipeline-as-code with full Python control, type safety, schema inference, and GitOps deployment. Choose Meltano when you are already invested in the Singer tap/target ecosystem and its community taps: Meltano wraps Singer taps and adds project configuration management, whereas dlt offers a more modern Python SDK with built-in incremental state, schema evolution, and native DuckDB/BigQuery destinations, without the tap/target abstraction layer. The Claude Skills 360 bundle includes dlt skill sets covering source functions, incremental loading, merge strategies, and destination configuration. Start with the free tier to try ELT pipeline generation.