Airbyte is the open-source ELT platform with 300+ connectors — Airbyte Platform API: POST /v1/connections creates a connection, POST /v1/jobs triggers a sync, GET /v1/jobs/{jobId} polls status. AIRBYTE_API_KEY authenticates to Airbyte Cloud or self-hosted. Custom connectors via Airbyte CDK: from airbyte_cdk.sources import AbstractSource, class MySource(AbstractSource): def check_connection(self, logger, config) -> Tuple[bool, Optional[Any]], def streams(self, config) -> List[Stream]. Stream: class MyStream(HttpStream): url_base = "https://api.example.com", path() -> str, request_params(), parse_response(response, **kwargs) -> Iterable[Mapping]. Incremental: class MyIncrementalStream(HttpStream, IncrementalMixin): cursor_field = "updated_at", get_updated_state(current_stream_state, latest_record) -> Mapping. PyAirbyte (airbyte package): ab.get_source("source-github", config={...}), cache = ab.get_default_cache(), source.read(cache=cache), then cache["repository_activity"].to_pandas() — no server needed for scripting. ab.get_available_connectors() lists all available sources. Connection scheduling: schedule: { cron: "0 * * * *" } or schedule: { basicSchedule: { units: 1, timeUnit: "hours" } }. Full refresh vs incremental: syncMode: "incremental" with destinationSyncMode: "append_dedup" deduplicates by primary key. Claude Code generates Airbyte API clients, custom CDK connectors, PyAirbyte scripts, and connection management endpoints.
CLAUDE.md for Airbyte
## Airbyte Stack
- Platform API: https://api.airbyte.com/v1 (Cloud) or http://localhost:8006/api (OSS)
- Auth: Authorization: Bearer AIRBYTE_API_KEY
- PyAirbyte: pip install airbyte — ab.get_source("source-name", config={...}); no server required
- CDK connector: airbyte-cdk>=1.x — class SourceMyAPI(AbstractSource), HttpStream subclasses
- Incremental: cursor_field + get_updated_state — always prefer over full refresh where supported
- destinationSyncMode: "append_dedup" for deduplication by primary key in warehouse
Airbyte Platform API Client
// lib/airbyte/client.ts — Airbyte Platform API client
// Base URL is overridable via AIRBYTE_API_URL for self-hosted (OSS) deployments.
const AIRBYTE_API_BASE = process.env.AIRBYTE_API_URL ?? "https://api.airbyte.com/v1"
const AIRBYTE_API_KEY = process.env.AIRBYTE_API_KEY!

/**
 * Thin typed wrapper around fetch for the Airbyte Platform API.
 *
 * Sends JSON with a Bearer token; caller-supplied headers win over defaults.
 * Throws on non-2xx responses with the status and response body for context.
 *
 * Fix: some endpoints (notably DELETE /connections/{id}) return an empty body,
 * where the previous unconditional `res.json()` threw "Unexpected end of JSON
 * input". We now read the body as text and only parse when non-empty.
 *
 * @param path    Path relative to AIRBYTE_API_BASE, e.g. "/connections".
 * @param options Standard fetch options; method/body/headers are merged in.
 * @returns Parsed JSON response, or undefined for empty bodies.
 */
async function airbyteFetch<T>(
  path: string,
  options: RequestInit = {},
): Promise<T> {
  // Fail fast with a clear message rather than a confusing 401 downstream.
  if (!AIRBYTE_API_KEY) {
    throw new Error("AIRBYTE_API_KEY environment variable is not set")
  }
  const res = await fetch(`${AIRBYTE_API_BASE}${path}`, {
    ...options,
    headers: {
      "Content-Type": "application/json",
      "Authorization": `Bearer ${AIRBYTE_API_KEY}`,
      ...((options.headers as Record<string, string>) ?? {}),
    },
  })
  if (!res.ok) {
    throw new Error(`Airbyte API error ${res.status}: ${await res.text()}`)
  }
  // Empty body (e.g. 204 No Content) is valid for DELETE-style endpoints.
  const text = await res.text()
  return (text ? JSON.parse(text) : undefined) as T
}
// ── Connection management ──────────────────────────────────────────────────
/** How records are read from the source on each sync. */
export type SyncMode = "full_refresh" | "incremental"
/** How records land in the destination; "append_dedup" dedupes by primary key. */
export type DestinationSyncMode = "overwrite" | "append" | "append_dedup"
/** Per-stream configuration inside a connection's sync catalog. */
export type StreamConfig = {
  name: string
  syncMode: SyncMode
  destinationSyncMode: DestinationSyncMode
  cursorField?: string[] // e.g. ["updated_at"] — used by incremental streams
  primaryKey?: string[][] // nested field-path form, e.g. [["id"]]
}
/** Subset of the Airbyte connection resource that this client reads. */
export type Connection = {
  connectionId: string
  name: string
  status: "active" | "inactive" | "deprecated"
  scheduleType: "manual" | "cron" | "basic"
  sourceId: string
  destinationId: string
}
/**
 * Create an Airbyte connection between an existing source and destination.
 *
 * When `cron` is given the connection is scheduled (UTC); otherwise it is
 * manual-trigger only. Each entry in `streams` is marked selected in the
 * sync catalog.
 */
export async function createConnection(params: {
  name: string
  sourceId: string
  destinationId: string
  streams: StreamConfig[]
  cron?: string // e.g. "0 * * * *"
  namespaceFormat?: string
}): Promise<Connection> {
  // Build the catalog entries up front so the request body stays readable.
  const catalogStreams = params.streams.map((stream) => ({
    stream: { name: stream.name },
    config: {
      syncMode: stream.syncMode,
      destinationSyncMode: stream.destinationSyncMode,
      cursorField: stream.cursorField ?? [],
      primaryKey: stream.primaryKey ?? [],
      selected: true,
    },
  }))
  const hasCron = Boolean(params.cron)
  const requestBody = {
    name: params.name,
    sourceId: params.sourceId,
    destinationId: params.destinationId,
    // Default mirrors the source namespace in the destination.
    namespaceFormat: params.namespaceFormat ?? "${SOURCE_NAMESPACE}",
    scheduleType: hasCron ? "cron" : "manual",
    scheduleData: hasCron
      ? { cron: { cronExpression: params.cron, cronTimeZone: "UTC" } }
      : undefined, // dropped by JSON.stringify for manual connections
    syncCatalog: { streams: catalogStreams },
  }
  return airbyteFetch<Connection>("/connections", {
    method: "POST",
    body: JSON.stringify(requestBody),
  })
}
/** List every connection visible to the current API key. */
export async function listConnections(): Promise<{ data: Connection[] }> {
  const response = await airbyteFetch<{ data: Connection[] }>("/connections")
  return response
}
/** Permanently delete a connection; resolves once the API confirms removal. */
export async function deleteConnection(connectionId: string): Promise<void> {
  const path = `/connections/${connectionId}`
  await airbyteFetch(path, { method: "DELETE" })
}
// ── Job management ─────────────────────────────────────────────────────────
/** Job lifecycle states; succeeded/failed/cancelled are treated as terminal below. */
export type JobStatus = "pending" | "running" | "incomplete" | "failed" | "succeeded" | "cancelled"
/** Subset of the Airbyte job resource that this client reads. */
export type Job = {
  jobId: number
  status: JobStatus
  jobType: string // e.g. "sync" — NOTE(review): full value set not visible here; confirm against API
  startTime?: string // presumably ISO-8601 — TODO confirm response format
  lastUpdated?: string // presumably ISO-8601 — TODO confirm response format
  duration?: string
  rowsSynced?: number
  bytesSynced?: number
}
/** Kick off a sync job for the given connection; returns the queued job. */
export async function triggerSync(connectionId: string): Promise<Job> {
  const payload = { connectionId, jobType: "sync" }
  return airbyteFetch<Job>("/jobs", {
    method: "POST",
    body: JSON.stringify(payload),
  })
}
/** Fetch the current state of a single Airbyte job by numeric id. */
export async function getJob(jobId: number): Promise<Job> {
  const path = `/jobs/${jobId}`
  return airbyteFetch<Job>(path)
}
/** Trigger a sync and poll until completion */
export async function syncAndWait(
connectionId: string,
timeoutMs = 30 * 60 * 1000, // 30 min
): Promise<Job> {
const job = await triggerSync(connectionId)
const deadline = Date.now() + timeoutMs
let current = job
while (Date.now() < deadline) {
if (["succeeded", "failed", "cancelled"].includes(current.status)) return current
await new Promise((r) => setTimeout(r, 5000))
current = await getJob(job.jobId)
}
throw new Error(`Airbyte sync ${job.jobId} timed out after ${timeoutMs}ms`)
}
Custom Airbyte CDK Connector
# connectors/source-my-api/source_my_api/source.py — custom Airbyte connector
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.incremental import IncrementalMixin
class MyApiStream(HttpStream, ABC):
    """Shared base for My API streams: cursor pagination plus JSON envelope parsing."""

    url_base = "https://api.myservice.com/v1/"
    primary_key = "id"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the next-page cursor while the API reports one, else None."""
        next_cursor = response.json().get("nextCursor")
        return {"cursor": next_cursor} if next_cursor else None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Page size plus the pagination cursor when one is present."""
        params: dict = {"limit": 200}
        if next_page_token:
            params["cursor"] = next_page_token["cursor"]
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Records live under the top-level "data" key of the response envelope."""
        yield from response.json().get("data", [])
class Users(MyApiStream):
    """Full refresh stream for users."""

    # No cursor_field is defined, so every sync re-reads the whole collection.
    def path(self, **kwargs) -> str:
        # Appended to MyApiStream.url_base → GET https://api.myservice.com/v1/users
        return "users"
class Events(MyApiStream, IncrementalMixin):
    """Incremental stream for events using updated_at cursor.

    The high-water mark lives in ``self._cursor_value``; the platform persists
    it between syncs via the ``state`` property pair below, and requests are
    filtered server-side with the ``updated_after`` query parameter.
    """
    cursor_field = "updated_at"

    def __init__(self, start_date: str, **kwargs):
        super().__init__(**kwargs)
        # start_date doubles as the cursor floor on a first sync (no saved state).
        self._start_date = start_date
        self._cursor_value = start_date

    @property
    def state(self) -> MutableMapping[str, Any]:
        # IncrementalMixin hook: this mapping is what Airbyte checkpoints.
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        # Restore the cursor from persisted state; fall back to start_date.
        self._cursor_value = value.get(self.cursor_field, self._start_date)

    def path(self, **kwargs) -> str:
        return "events"

    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        # Extend base pagination params with the incremental filter.
        params = super().request_params(stream_state, **kwargs)
        cursor = stream_state.get(self.cursor_field, self._start_date)
        params["updated_after"] = cursor
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        for record in response.json().get("data", []):
            # Update cursor to max seen value. Uses string comparison — assumes
            # updated_at sorts lexicographically (e.g. ISO-8601) — TODO confirm.
            if record.get(self.cursor_field, "") > self._cursor_value:
                self._cursor_value = record[self.cursor_field]
            yield record
class SourceMyApi(AbstractSource):
    """Airbyte source connector for My API."""

    def check_connection(
        self,
        logger,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Optional[Any]]:
        """Probe the health endpoint with the configured API key.

        Returns (True, None) on success, otherwise (False, reason).
        """
        headers = {"Authorization": f"Bearer {config['api_key']}"}
        try:
            resp = requests.get(
                "https://api.myservice.com/v1/health",
                headers=headers,
                timeout=10,
            )
        except Exception as e:
            # Network/timeout errors become a readable failure reason.
            return False, str(e)
        if not resp.ok:
            return False, f"API returned {resp.status_code}: {resp.text}"
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Instantiate every stream with a shared token authenticator."""
        auth = TokenAuthenticator(token=config["api_key"])
        start_date = config.get("start_date", "2024-01-01")
        return [
            Users(authenticator=auth),
            Events(authenticator=auth, start_date=start_date),
        ]
PyAirbyte Script
# scripts/sync_to_warehouse.py — embedded ELT with PyAirbyte (no server required)
import airbyte as ab
import os
def sync_github_to_duckdb():
    """Sync GitHub data directly to a local DuckDB cache using PyAirbyte."""
    github_config = {
        "repositories": ["anthropics/claude-code"],
        "credentials": {
            "personal_access_token": os.environ["GITHUB_TOKEN"],
        },
    }
    source = ab.get_source(
        "source-github",
        install_if_missing=True,
        config=github_config,
    )
    # Validate credentials/config before pulling any data.
    source.check()
    # Only pull the streams we actually use downstream.
    source.select_streams(["commits", "issues", "pull_requests", "stargazers"])
    # Local DuckDB is the default cache.
    # For MotherDuck: cache = ab.new_motherduck_cache(database="my_db", schema="github")
    cache = ab.get_default_cache()
    result = source.read(cache=cache)
    # Each synced stream is exposed as a dataset convertible to a DataFrame.
    for stream_name, dataset in result.streams.items():
        df = dataset.to_pandas()
        print(f"{stream_name}: {len(df)} rows")
    # The cache can also be queried directly, e.g. via Arrow.
    conn = cache.get_arrow_dataset("commits").to_pandas()
    return conn
def sync_postgres_to_bigquery():
    """Configure a Postgres → BigQuery connection via Airbyte Platform API."""
    import httpx

    base_url = "https://api.airbyte.com/v1"
    api_key = os.environ["AIRBYTE_API_KEY"]
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    # Trigger a sync on an already-configured connection.
    payload = {
        "connectionId": os.environ["AIRBYTE_CONNECTION_ID"],
        "jobType": "sync",
    }
    resp = httpx.post(f"{base_url}/jobs", json=payload, headers=request_headers)
    resp.raise_for_status()
    job = resp.json()
    print(f"Started sync job {job['jobId']} — status: {job['status']}")
    return job
if __name__ == "__main__":
    # Default entry point: run the serverless PyAirbyte sync locally.
    sync_github_to_duckdb()
Consider Fivetran as an alternative when you need a fully managed, zero-maintenance ELT connector service with SLA-backed data freshness, automatic schema migrations, and connectors to hundreds of SaaS sources without managing infrastructure — Fivetran is the premium managed option, while Airbyte is open-source and self-hostable with lower per-connector cost and a large community-built connector catalog. Consider dbt Core when the data transformation layer (not ingestion) is the primary concern — dbt is purpose-built for SQL-based transformations in the warehouse (the T in ELT) and pairs naturally with Airbyte for the EL step: Airbyte moves data in, dbt transforms it into analytics-ready models. The Claude Skills 360 bundle includes Airbyte skill sets covering the Platform API, custom CDK connectors, and PyAirbyte scripting. Start with the free tier to try data-integration generation.