Great Expectations validates data quality with executable expectations — import great_expectations as gx. context = gx.get_context() loads or creates a DataContext. Fluent API: datasource = context.sources.add_pandas("my_ds"), asset = datasource.add_dataframe_asset("orders"), batch = asset.add_batch_definition_whole_dataframe("batch").get_batch(dataframe=df). Expectations: expect_column_to_exist, expect_column_values_to_not_be_null(column="email"), expect_column_values_to_be_unique(column="order_id"), expect_column_values_to_be_between(column="amount", min_value=0), expect_column_values_to_match_regex(column="email", regex=r"^[^@]+@[^@]+\.[^@]+$"), expect_column_pair_values_to_be_equal. expect_table_row_count_to_be_between(min_value=100). expect_column_proportion_of_unique_values_to_be_between. Suites: suite = context.suites.add(ExpectationSuite(name="orders_suite")), add expectations, then batch.validate(suite) returns ExpectationSuiteValidationResult. Checkpoints: checkpoint = context.checkpoints.add(Checkpoint(name="daily_check", validations=[...])), result = checkpoint.run() — inspect result.success and exit non-zero for pipeline gating. Custom Expectation: subclass ColumnMapExpectation, define map_condition. Data Docs: context.build_data_docs() generates an HTML report at great_expectations/uncommitted/data_docs/. SQLAlchemy datasource: context.sources.add_sql("pg_ds", connection_string=DSN). Claude Code generates GE expectation suites, checkpoint configs, custom expectations, and CI/CD data quality gates.
CLAUDE.md for Great Expectations
## Great Expectations Stack
- Version: great-expectations >= 1.0 (GX Core — new Fluent API)
- Context: gx.get_context() — reads gx.yml from cwd or creates ephemeral context
- Datasource: context.sources.add_pandas() or add_sql() or add_spark_filesystem()
- Suites: context.suites.add(ExpectationSuite(name="suite_name"))
- Validate: batch.validate(suite) or checkpoint.run()
- Data Docs: context.build_data_docs() — HTML reports in uncommitted/data_docs/
- CI gate: sys.exit(0 if result.success else 1) — fail pipeline on validation failure
Expectation Suite Builder
# pipelines/data_quality/suite_builder.py — build reusable expectation suites
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.expectations import (
ExpectColumnToExist,
ExpectColumnValuesToNotBeNull,
ExpectColumnValuesToBeUnique,
ExpectColumnValuesToBeBetween,
ExpectColumnValuesToMatchRegex,
ExpectColumnValuesToBeInSet,
ExpectTableRowCountToBeBetween,
ExpectColumnProportionOfUniqueValuesToBeBetween,
)
def build_orders_suite() -> ExpectationSuite:
    """Build the reusable expectation suite for the orders dataset."""
    orders_suite = ExpectationSuite(name="orders_suite")

    required_columns = ["order_id", "user_id", "amount", "currency", "status", "created_at"]
    non_null_columns = ["order_id", "user_id", "amount", "status"]

    # Schema: every required column must be present.
    for column in required_columns:
        orders_suite.add_expectation(ExpectColumnToExist(column=column))

    # Nullability: key columns may not contain nulls.
    for column in non_null_columns:
        orders_suite.add_expectation(ExpectColumnValuesToNotBeNull(column=column))

    checks = [
        # Uniqueness: order_id is the primary key.
        ExpectColumnValuesToBeUnique(column="order_id"),
        # Value range: amounts are non-negative and bounded.
        ExpectColumnValuesToBeBetween(
            column="amount",
            min_value=0,
            max_value=1_000_000,
        ),
        # Categorical values: closed sets for currency and status.
        ExpectColumnValuesToBeInSet(
            column="currency",
            value_set=["USD", "EUR", "GBP", "JPY", "CAD", "AUD"],
        ),
        ExpectColumnValuesToBeInSet(
            column="status",
            value_set=["pending", "processing", "completed", "refunded", "cancelled"],
        ),
        # Pattern: order ids look like "ord_" followed by 16 alphanumerics.
        ExpectColumnValuesToMatchRegex(
            column="order_id",
            regex=r"^ord_[a-zA-Z0-9]{16}$",
            mostly=1.0,
        ),
        # Volume: the table must be non-empty and below a hard cap.
        ExpectTableRowCountToBeBetween(
            min_value=1,
            max_value=10_000_000,
        ),
        # Cardinality: at least 0.1% of rows have distinct users.
        ExpectColumnProportionOfUniqueValuesToBeBetween(
            column="user_id",
            min_value=0.001,
        ),
    ]
    for expectation in checks:
        orders_suite.add_expectation(expectation)
    return orders_suite
def build_users_suite() -> ExpectationSuite:
    """Build the expectation suite for the users dataset."""
    users_suite = ExpectationSuite(name="users_suite")

    # Schema: required columns must exist.
    for column in ("id", "email", "plan", "created_at"):
        users_suite.add_expectation(ExpectColumnToExist(column=column))

    expectations = [
        # email is mandatory; id and email are both unique identifiers.
        ExpectColumnValuesToNotBeNull(column="email"),
        ExpectColumnValuesToBeUnique(column="id"),
        ExpectColumnValuesToBeUnique(column="email"),
        # Loose email shape check; 1% of rows may be edge cases.
        ExpectColumnValuesToMatchRegex(
            column="email",
            regex=r"^[^@\s]+@[^@\s]+\.[^@\s]+$",
            mostly=0.99,
        ),
        # plan is a closed categorical set.
        ExpectColumnValuesToBeInSet(
            column="plan",
            value_set=["free", "pro", "enterprise"],
        ),
    ]
    for expectation in expectations:
        users_suite.add_expectation(expectation)
    return users_suite
Validation Runner
# pipelines/data_quality/runner.py — validate DataFrames and SQL sources
import sys
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.checkpoint import Checkpoint
import pandas as pd
from typing import Optional
def validate_dataframe(
    df: pd.DataFrame,
    suite: ExpectationSuite,
    asset_name: str = "data",
    fail_fast: bool = True,
) -> bool:
    """Validate a pandas DataFrame against an expectation suite.

    Args:
        df: The DataFrame to validate.
        suite: Expectation suite to evaluate against the data.
        asset_name: Name used to register the ephemeral datasource/asset.
        fail_fast: When True, raise on any failed expectation; when False,
            report failures and return the success flag instead.

    Returns:
        True if every expectation passed, False otherwise (False is only
        reachable with fail_fast=False).

    Raises:
        ValueError: If validation fails and fail_fast is True.
    """
    context = gx.get_context()
    datasource = context.sources.add_pandas(name=f"pd_{asset_name}")
    asset = datasource.add_dataframe_asset(name=asset_name)
    batch_def = asset.add_batch_definition_whole_dataframe("batch")
    # Best-effort registration: re-adding an already-known suite raises,
    # which is safe to ignore for this ephemeral validation flow.
    try:
        context.suites.add(suite)
    except Exception:
        pass  # Suite may already exist
    batch = batch_def.get_batch(dataframe=df)
    result = batch.validate(suite)
    if not result.success:
        failed = [r for r in result.results if not r.success]
        print(f"\n[GX] FAILED: {len(failed)} expectations on '{asset_name}':")
        for r in failed:
            print(f" ✗ {r.expectation_config.expectation_type}")
            # Explicit None check: a 0.0 unexpected_percent is falsy but
            # still a real value worth reporting when present.
            unexpected_percent = r.result.get("unexpected_percent")
            if unexpected_percent is not None:
                print(f" unexpected_percent={unexpected_percent:.2f}%")
        if fail_fast:
            raise ValueError(f"Data quality validation failed for '{asset_name}'")
    else:
        print(f"[GX] PASSED: All {len(result.results)} expectations on '{asset_name}'")
    return result.success
def validate_sql_table(
    connection_string: str,
    table_name: str,
    suite: ExpectationSuite,
    schema: Optional[str] = None,
    fail_fast: bool = True,
) -> bool:
    """Run an expectation suite directly against a SQL table.

    Returns True when all expectations pass; raises ValueError on failure
    when fail_fast is set, otherwise returns the success flag.
    """
    ctx = gx.get_context()
    sql_source = ctx.sources.add_sql(
        name=f"sql_{table_name}",
        connection_string=connection_string,
    )
    table_asset = sql_source.add_table_asset(
        name=table_name,
        table_name=table_name,
        schema_name=schema,
    )
    whole_table = table_asset.add_batch_definition_whole_table("batch")
    # Registering an already-known suite raises; treat that as a no-op.
    try:
        ctx.suites.add(suite)
    except Exception:
        pass
    outcome = whole_table.get_batch().validate(suite)
    if not outcome.success and fail_fast:
        failure_count = len([item for item in outcome.results if not item.success])
        raise ValueError(
            f"SQL table '{table_name}' failed {failure_count} expectations. "
            f"Build docs: context.build_data_docs()"
        )
    return outcome.success
Custom Expectation
# pipelines/data_quality/custom_expectations.py — custom expectation implementations
from great_expectations.expectations import ColumnMapExpectation
from great_expectations.execution_engine import PandasExecutionEngine
import pandas as pd
import re
class ExpectColumnValuesToBeValidUuid(ColumnMapExpectation):
    """Expect all values in a column to be valid UUID v4 strings."""

    # Metric name linking this expectation to its map metric provider below.
    map_metric = "column_values.match_uuid"
    # Only "mostly" tunes success: the fraction of rows that must match.
    success_keys = ("mostly",)
    default_kwarg_values = {"mostly": 1.0}

    # UUID v4 shape: 8-4-4-4-12 hex groups, version nibble fixed to 4 and
    # variant nibble restricted to [89ab]; case-insensitive.
    UUID_REGEX = re.compile(
        r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
        re.IGNORECASE,
    )

    @classmethod
    def _get_map_metric_provider_class(cls):
        # Imported lazily so the module loads without pulling GX metric
        # machinery at import time.
        from great_expectations.expectations.metrics import ColumnMapMetricProvider

        class UuidColumnMapMetric(ColumnMapMetricProvider):
            # Must match map_metric above so GX can wire the two together.
            condition_metric_name = "column_values.match_uuid"
            condition_value_keys = ()

            # NOTE(review): this decorator-based registration looks GX
            # version specific — confirm it matches the installed release.
            @PandasExecutionEngine.metric_condition(
                "column_values.match_uuid",
                condition_provider=True,
                domain_type="column",
                value_keys=(),
            )
            def _pandas_condition(cls, execution_engine, metric_domain_kwargs, metric_value_kwargs, **kwargs):
                # NOTE(review): the first parameter named `cls` shadows the
                # enclosing classmethod's `cls`; UUID_REGEX is defined on
                # ExpectColumnValuesToBeValidUuid, not on this metric class —
                # verify the attribute resolves at runtime.
                # Resolve the column-domain DataFrame for this batch.
                df, _, _ = execution_engine.get_compute_domain(
                    metric_domain_kwargs, domain_type="column"
                )
                column = metric_domain_kwargs["column"]
                # Boolean Series: True where the value matches the UUID v4
                # pattern; nulls count as non-matching.
                return df[column].apply(
                    lambda v: bool(cls.UUID_REGEX.match(str(v))) if pd.notna(v) else False
                )
        return UuidColumnMapMetric
class ExpectColumnValuesToBePositiveAmount(ColumnMapExpectation):
    """Expect all values to be positive numbers representing monetary amounts."""

    # Metric name this expectation relies on.
    # NOTE(review): no metric provider for "column_values.positive_amount"
    # is defined in this module — confirm it is registered elsewhere.
    map_metric = "column_values.positive_amount"
    # min_value/max_value bound the accepted amount; "mostly" sets the
    # required passing fraction of rows.
    success_keys = ("min_value", "max_value", "mostly")
    # Defaults: amounts between 0.01 and 9,999,999.99; all rows must pass.
    default_kwarg_values = {"min_value": 0.01, "max_value": 9_999_999.99, "mostly": 1.0}
CI/CD Data Quality Gate
# scripts/validate_pipeline.py — use as CI/CD data quality gate
#!/usr/bin/env python3
"""
Run data quality validation as a CI gate.
Usage: python scripts/validate_pipeline.py
Exit 0 = all checks passed, Exit 1 = failures detected.
"""
import sys
import os
import pandas as pd
from sqlalchemy import create_engine
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from pipelines.data_quality.suite_builder import build_orders_suite, build_users_suite
from pipelines.data_quality.runner import validate_sql_table
DATABASE_URL = os.environ["DATABASE_URL"]

# (table name, pre-built expectation suite) pairs to validate, in order.
tables = [
    ("orders", build_orders_suite()),
    ("users", build_users_suite()),
]

failures = 0
for table_name, suite in tables:
    print(f"\nValidating {table_name}...")
    try:
        table_passed = validate_sql_table(
            connection_string=DATABASE_URL,
            table_name=table_name,
            suite=suite,
            fail_fast=False,  # Don't raise — collect all failures
        )
    except Exception as exc:
        # A connection/setup error counts as a failed table, not a crash.
        print(f"[ERROR] {table_name}: {exc}")
        failures += 1
    else:
        if not table_passed:
            failures += 1

if failures:
    print(f"\n[GX] {failures} table(s) failed data quality checks — blocking pipeline.")
    sys.exit(1)
print(f"\n[GX] All {len(tables)} tables passed data quality checks.")
sys.exit(0)
For the Soda Core alternative when needing a lightweight, YAML-driven data quality tool that connects directly to warehouses and integrates with Airflow/dbt/Dagster through native operators — Soda uses a simpler SodaCL check syntax while Great Expectations provides a Python-native API with more programmatic control, richer HTML reports, and a larger expectation library. For the dbt Tests alternative when already deep in the dbt ecosystem and wanting data quality checks co-located in the dbt project — dbt generic tests (not_null, unique, accepted_values) and singular tests (custom SQL assertions) are simpler to add alongside dbt models while Great Expectations is better for complex statistical checks, profiling, and expectations that need to run outside of dbt. The Claude Skills 360 bundle includes Great Expectations skill sets covering expectation suites, checkpoints, and CI/CD data gates. Start with the free tier to try data quality validation generation.