DataHub is an open-source data catalog for metadata management and data discovery. Python SDK: from datahub.emitter.rest_emitter import DatahubRestEmitter, from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn. emitter = DatahubRestEmitter(gms_server="http://localhost:8080", token=TOKEN). Dataset: DatasetProperties(description="...", customProperties={"team": "data-eng"}). Schema: SchemaMetadata(schemaName="orders", platform="urn:li:dataPlatform:postgres", fields=[SchemaField(fieldPath="id", type=SchemaFieldDataType(type=NumberType()), nativeDataType="int8")]). Emit: emitter.emit(MetadataChangeProposalWrapper(entityUrn=urn, aspect=dataset_props)). Lineage: UpstreamLineage(upstreams=[Upstream(dataset=upstream_urn, type=DatasetLineageType.TRANSFORMED)]). Job lineage: DataJobInputOutput(inputDatasets=[input_urn], outputDatasets=[output_urn]). Tags: GlobalTagsClass(tags=[TagAssociationClass(tag=make_tag_urn("pii"))]). Ownership: Ownership(owners=[Owner(owner=make_user_urn("john"), type=OwnershipType.DATAOWNER)]). Glossary terms: GlossaryTermsClass(terms=[GlossaryTermAssociationClass(urn=make_glossary_term_urn("Revenue"))]). REST API: GET /entities/{url-encoded urn} returns an entity; POST /entities?action=search with body {"input": "orders", "entity": "dataset", "start": 0, "count": 10} runs a search. GraphQL: POST /api/graphql with { search(input: { type: DATASET, query: "orders" }) { searchResults { entity { urn } } } }. Recipes: YAML with source: type: postgres, sink: type: datahub-rest, and a pipeline_name. Claude Code generates DataHub metadata emitters, lineage pipelines, ingestion recipes, and GraphQL query clients.
# CLAUDE.md for DataHub
## DataHub Stack
- Python SDK: acryl-datahub >= 0.12 — DatahubRestEmitter + MetadataChangeProposalWrapper
- GMS URL: http://datahub-gms:8080 (or Acryl Cloud HTTPS endpoint)
- Emit: emitter.emit(MetadataChangeProposalWrapper(entityUrn=urn, aspect=aspect))
- URNs: make_dataset_urn(platform, name, env) / make_user_urn / make_tag_urn
- Lineage: UpstreamLineage(upstreams=[Upstream(dataset=upstream_urn, type=TRANSFORMED)])
- Ingestion: datahub ingest -c recipe.yaml — auto-discovers schemas, lineage, stats
- GraphQL: POST /api/graphql — search, getDataset, listOwners, listTags
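The legacy Rest.li endpoints mentioned in the overview can also be called without the SDK. A minimal sketch with requests, assuming the standard /entities layout (verify the payload keys against your DataHub version):

# Raw GMS Rest.li calls — a sketch, not part of the SDK
import os
import urllib.parse
import requests

GMS = os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080")
HEADERS = {"Authorization": f"Bearer {os.environ.get('DATAHUB_TOKEN', '')}"}

# Fetch one entity: the URN is URL-encoded into the path
urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)"
entity = requests.get(f"{GMS}/entities/{urllib.parse.quote(urn, safe='')}", headers=HEADERS).json()

# Search: Rest.li actions are POST requests with a JSON body
results = requests.post(
    f"{GMS}/entities?action=search",
    headers=HEADERS,
    json={"input": "orders", "entity": "dataset", "start": 0, "count": 10},
).json()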
## Metadata Emitter
# lib/datahub/emitter.py — emit metadata to DataHub
import os
from typing import Optional
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.mce_builder import (
make_dataset_urn,
make_user_urn,
make_tag_urn,
make_glossary_term_urn,
    make_data_job_urn,
)
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
SchemaMetadataClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
StringTypeClass,
NumberTypeClass,
BooleanTypeClass,
DateTypeClass,
NullTypeClass,
OwnershipClass,
OwnerClass,
OwnershipTypeClass,
GlobalTagsClass,
TagAssociationClass,
GlossaryTermsClass,
GlossaryTermAssociationClass,
UpstreamLineageClass,
UpstreamClass,
DatasetLineageTypeClass,
DataJobInputOutputClass,
    StatusClass,
    AuditStampClass,
    OtherSchemaClass,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
def get_emitter() -> DatahubRestEmitter:
return DatahubRestEmitter(
gms_server=os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080"),
token=os.environ.get("DATAHUB_TOKEN"),
)
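# Note: DatahubRestEmitter.test_connection() can be called once at startup to
# fail fast on a bad URL or token (assumption: available in acryl-datahub >= 0.12).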
def emit_dataset_metadata(
    platform: str,  # "postgres", "bigquery", "s3", "kafka"
    name: str,  # "mydb.public.orders" or "s3://bucket/data/orders"
    description: str,
    fields: list[dict],  # [{"name": "id", "type": "bigint", "description": "..."}]
    owners: Optional[list[str]] = None,  # ["john.doe"]
    tags: Optional[list[str]] = None,  # ["pii", "finance"]
    terms: Optional[list[str]] = None,  # ["Revenue", "OrderValue"]
    custom_props: Optional[dict] = None,  # None defaults avoid the shared-mutable-default pitfall
    env: str = "PROD",
) -> str:
"""Emit complete dataset metadata to DataHub."""
emitter = get_emitter()
urn = make_dataset_urn(platform, name, env)
# Map Python types to DataHub schema types
TYPE_MAP = {
"bigint": NumberTypeClass(), "int": NumberTypeClass(), "integer": NumberTypeClass(),
"float": NumberTypeClass(), "double": NumberTypeClass(), "decimal": NumberTypeClass(),
"varchar": StringTypeClass(), "text": StringTypeClass(), "string": StringTypeClass(),
"boolean": BooleanTypeClass(), "bool": BooleanTypeClass(),
"date": DateTypeClass(), "timestamp": DateTypeClass(), "datetime": DateTypeClass(),
}
schema_fields = [
SchemaFieldClass(
fieldPath=f["name"],
            type=SchemaFieldDataTypeClass(type=TYPE_MAP.get(f.get("type", "").lower(), NullTypeClass())),
            nativeDataType=f.get("type", "unknown"),
description=f.get("description", ""),
nullable=f.get("nullable", True),
)
for f in fields
]
aspects = [
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DatasetPropertiesClass(
description=description,
customProperties={
"platform": platform,
"env": env,
                    **(custom_props or {}),
},
),
),
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=SchemaMetadataClass(
schemaName=name,
platform=f"urn:li:dataPlatform:{platform}",
version=0,
hash="",
                platformSchema=OtherSchemaClass(rawSchema=""),
fields=schema_fields,
),
),
MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
),
]
if owners:
aspects.append(MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=OwnershipClass(owners=[
OwnerClass(
owner=make_user_urn(o),
type=OwnershipTypeClass.DATAOWNER,
) for o in owners
]),
))
if tags:
aspects.append(MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=GlobalTagsClass(tags=[
TagAssociationClass(tag=make_tag_urn(t)) for t in tags
]),
))
if terms:
aspects.append(MetadataChangeProposalWrapper(
entityUrn=urn,
            aspect=GlossaryTermsClass(
                terms=[GlossaryTermAssociationClass(urn=make_glossary_term_urn(t)) for t in terms],
                auditStamp=AuditStampClass(time=0, actor="urn:li:corpuser:ingestion"),
            ),
))
    for mcp in aspects:
        emitter.emit(mcp)
    print(f"Emitted metadata for {urn}")
return urn
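# Example call (hypothetical values):
#   emit_dataset_metadata(
#       platform="postgres",
#       name="mydb.public.orders",
#       description="Customer orders fact table",
#       fields=[
#           {"name": "id", "type": "bigint", "description": "Primary key"},
#           {"name": "total", "type": "decimal", "description": "Order total"},
#       ],
#       owners=["john.doe"],
#       tags=["pii", "finance"],
#   )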
def emit_lineage(
output_dataset_urn: str,
input_dataset_urns: list[str],
lineage_type: str = "TRANSFORMED",
) -> None:
"""Emit dataset lineage relationships."""
emitter = get_emitter()
lineage_type_obj = {
"TRANSFORMED": DatasetLineageTypeClass.TRANSFORMED,
"COPY": DatasetLineageTypeClass.COPY,
"VIEW": DatasetLineageTypeClass.VIEW,
}.get(lineage_type, DatasetLineageTypeClass.TRANSFORMED)
    mcp = MetadataChangeProposalWrapper(
        entityUrn=output_dataset_urn,
        aspect=UpstreamLineageClass(
            upstreams=[
                UpstreamClass(dataset=urn, type=lineage_type_obj)
                for urn in input_dataset_urns
            ]
        ),
    )
    emitter.emit(mcp)
print(f"Emitted lineage: {input_dataset_urns} → {output_dataset_urn}")
## Ingestion Recipe
# recipes/postgres_ingestion.yaml — DataHub ingestion recipe for PostgreSQL
source:
type: postgres
config:
host_port: "${POSTGRES_HOST}:5432"
database: "${POSTGRES_DB}"
username: "${POSTGRES_USER}"
password: "${POSTGRES_PASSWORD}"
include_tables: true
include_views: true
profiling:
enabled: true
profile_table_level_only: true
stateful_ingestion:
enabled: true
remove_stale_metadata: true
transformers:
  - type: "simple_add_dataset_tags"
    config:
      tag_urns:
        - "urn:li:tag:production"
        - "urn:li:tag:postgres"
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:data-platform-team"
sink:
type: datahub-rest
config:
server: "${DATAHUB_GMS_URL}"
token: "${DATAHUB_TOKEN}"
pipeline_name: postgres_production
run_id: "postgres-${CURRENT_DATE}"
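Run the recipe with datahub ingest -c recipes/postgres_ingestion.yaml. Recent CLI versions also support --dry-run to validate the recipe and source connection without writing any metadata.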
## GraphQL Client (TypeScript)
// lib/datahub/graphql.ts — DataHub GraphQL API client
const DATAHUB_URL = process.env.DATAHUB_FRONTEND_URL ?? "http://localhost:9002"
const DATAHUB_TOKEN = process.env.DATAHUB_TOKEN
async function datahubGraphQL<T>(query: string, variables: Record<string, unknown> = {}): Promise<T> {
const res = await fetch(`${DATAHUB_URL}/api/graphql`, {
method: "POST",
headers: {
"Content-Type": "application/json",
...(DATAHUB_TOKEN ? { "Authorization": `Bearer ${DATAHUB_TOKEN}` } : {}),
},
body: JSON.stringify({ query, variables }),
})
  if (!res.ok) throw new Error(`DataHub GraphQL HTTP ${res.status}`)
  const body = await res.json()
if (body.errors) throw new Error(`DataHub GraphQL error: ${JSON.stringify(body.errors)}`)
return body.data
}
export async function searchDatasets(query: string, limit = 20): Promise<Array<{
urn: string
name: string
platform: string
description: string
}>> {
const data = await datahubGraphQL<{ search: { searchResults: Array<{ entity: {
urn: string
properties: { name: string; description: string } | null
platform: { name: string }
}}> } }>(`
query SearchDatasets($input: SearchInput!) {
search(input: $input) {
searchResults {
entity {
urn
... on Dataset {
properties { name description }
platform { name }
}
}
}
}
}
`, {
input: { type: "DATASET", query, start: 0, count: limit },
})
return (data.search.searchResults ?? []).map((r) => ({
urn: r.entity.urn,
name: r.entity.properties?.name ?? "",
platform: r.entity.platform?.name ?? "",
description: r.entity.properties?.description ?? "",
}))
}
export async function getDatasetLineage(urn: string): Promise<{
upstreams: string[]
downstreams: string[]
}> {
const data = await datahubGraphQL<{ dataset: {
upstream: { relationships: Array<{ entity: { urn: string } }> }
downstream: { relationships: Array<{ entity: { urn: string } }> }
} }>(`
query GetLineage($urn: String!) {
dataset(urn: $urn) {
        upstream: lineage(input: { direction: UPSTREAM, start: 0, count: 50 }) { relationships { entity { urn } } }
        downstream: lineage(input: { direction: DOWNSTREAM, start: 0, count: 50 }) { relationships { entity { urn } } }
}
}
`, { urn })
return {
upstreams: (data.dataset?.upstream?.relationships ?? []).map((r) => r.entity.urn),
downstreams: (data.dataset?.downstream?.relationships ?? []).map((r) => r.entity.urn),
}
}
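Both the frontend (port 9002) and GMS (port 8080) serve /api/graphql, so either base URL works for this client; the frontend proxies the request through to GMS with the same bearer-token auth.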
Consider the Apache Atlas alternative when operating in an on-premises Hadoop (HDP/CDP) ecosystem where Atlas is pre-installed and deeply integrated with HDFS, Hive, HBase, and Kafka through Atlas hooks; Atlas ships with Cloudera/Hortonworks distributions, while DataHub is a newer open-source project with a better REST/GraphQL API, a richer UI, and a broader connector ecosystem. Consider the OpenMetadata alternative when you need a feature-equivalent open-source catalog with a similar metadata model, built-in data quality integrations (Great Expectations, dbt tests), and a more modern React UI; OpenMetadata is a strong competitor with a cleaner API design, while DataHub has a larger community and deeper lineage support proven by LinkedIn's internal usage at scale. The Claude Skills 360 bundle includes DataHub skill sets covering Python SDK metadata emission, lineage, ingestion recipes, and GraphQL queries. Start with the free tier to try data catalog generation.