Feast is the open-source feature store that connects offline training data to online serving. The core abstractions are imported with `from feast import FeatureStore, FeatureView, Entity, Field, FileSource, PushSource`. `Entity(name="user", join_keys=["user_id"])` defines the join key, and `FileSource(path="s3://bucket/data/**/*.parquet", timestamp_field="event_timestamp")` points at offline data. A `FeatureView` binds an entity, a TTL, a schema, and a source: `FeatureView(name="user_stats", entities=[user], ttl=timedelta(days=1), schema=[Field(name="purchase_count", dtype=Int64)], source=file_source)`. A `FeatureService` groups features for one model: `FeatureService(name="user_ml", features=[user_stats[["purchase_count", "avg_spend"]]])`. `feast apply` registers entities, views, and services in the registry and creates online store tables; `store = FeatureStore(repo_path=".")` opens the repo. Materialization with `store.materialize(start_date, end_date)` batch-loads features from the offline to the online store. Serving: `store.get_online_features(features=["user_stats:purchase_count"], entity_rows=[{"user_id": "u1"}]).to_dict()`. Training: `store.get_historical_features(entity_df=entity_df, features=[...]).to_df()` performs a point-in-time correct join. `PushSource(name="user_events", batch_source=file_source)` enables streaming pushes, and `store.push("user_events", df)` writes fresh feature values. For a Redis online store, set `type: redis` under `online_store:` in `feature_store.yaml`. Claude Code generates Feast feature definitions, materialization scripts, online serving APIs, and training dataset pipelines.
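To build intuition for what `get_historical_features` guarantees, here is a minimal pure-Python sketch of a point-in-time correct join (illustrative only, not Feast's implementation): for each entity row, pick the most recent feature value at or before that row's timestamp, discarding anything older than the TTL.

```python
from datetime import datetime, timedelta

def point_in_time_join(entity_rows, feature_rows, ttl):
    """Toy point-in-time join: latest feature value at or before each
    entity timestamp, dropped if older than the TTL (illustrative only)."""
    out = []
    for row in entity_rows:
        candidates = [
            f for f in feature_rows
            if f["user_id"] == row["user_id"]
            and f["event_timestamp"] <= row["event_timestamp"]
            and row["event_timestamp"] - f["event_timestamp"] <= ttl
        ]
        best = max(candidates, key=lambda f: f["event_timestamp"], default=None)
        out.append({**row, "purchase_count": best["purchase_count"] if best else None})
    return out

features = [
    {"user_id": "u1", "event_timestamp": datetime(2024, 1, 1), "purchase_count": 3},
    {"user_id": "u1", "event_timestamp": datetime(2024, 1, 5), "purchase_count": 7},
]
rows = [{"user_id": "u1", "event_timestamp": datetime(2024, 1, 4)}]
print(point_in_time_join(rows, features, ttl=timedelta(days=30)))
# The 2024-01-01 value (3) is chosen: the 2024-01-05 row lies in the future
```

This is exactly why you hand `get_historical_features` an `entity_df` with an `event_timestamp` column: values from after each timestamp must never leak into training rows.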
# CLAUDE.md for Feast
## Feast Stack
- Version: feast >= 0.40
- Init: feast init feature_repo && cd feature_repo
- Registry: feature_store.yaml — project, registry path, online_store, offline_store
- Apply: feast apply — registers entities, views, services in registry
- Materialize: feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
- Serve: store = FeatureStore(repo_path=".") — singleton in FastAPI app
- Online: store.get_online_features(features=[...], entity_rows=[{...}])
- Training: store.get_historical_features(entity_df, features=[...]).to_df()
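`feast materialize-incremental` picks its own window: it resumes from the end of the last materialized range, or looks back one TTL on the first run. A toy sketch of that window logic, using a hypothetical `next_window` helper (not part of the Feast API):

```python
from datetime import datetime, timedelta, timezone

def next_window(last_end, ttl, now):
    """Hypothetical sketch of materialize-incremental's window choice:
    resume from the last materialized end, else look back one TTL."""
    start = last_end if last_end is not None else now - ttl
    return start, now

now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
# First run: no previous materialization, so look back by the view's TTL
print(next_window(None, timedelta(days=30), now))
# Later run: resume exactly where the previous run stopped
print(next_window(datetime(2024, 6, 1, tzinfo=timezone.utc), timedelta(days=30), now))
```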
## Feature Repository
```python
# feature_repo/features.py — Feast feature definitions
import os
from datetime import timedelta

from feast import (
    Entity,
    FeatureService,
    FeatureView,
    Field,
    FileSource,
    PushSource,
)
from feast.types import Bool, Float64, Int64, String

# ── Entities ───────────────────────────────────────────────────────────────
user_entity = Entity(
    name="user",
    join_keys=["user_id"],
    description="Application user",
)

product_entity = Entity(
    name="product",
    join_keys=["product_id"],
    description="Product in the catalog",
)

# ── Data sources (offline) ─────────────────────────────────────────────────
user_stats_source = FileSource(
    name="user_stats_source",
    path=os.environ.get("FEAST_USER_STATS_PATH", "data/user_stats.parquet"),
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

product_stats_source = FileSource(
    name="product_stats_source",
    path=os.environ.get("FEAST_PRODUCT_STATS_PATH", "data/product_stats.parquet"),
    timestamp_field="event_timestamp",
)

# Streaming source for real-time pushes
user_push_source = PushSource(
    name="user_push_source",
    batch_source=user_stats_source,
)

# ── Feature views ──────────────────────────────────────────────────────────
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user_entity],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64, description="Purchases in last 7 days"),
        Field(name="purchase_count_30d", dtype=Int64, description="Purchases in last 30 days"),
        Field(name="avg_order_value_30d", dtype=Float64, description="Avg order value in last 30 days"),
        Field(name="days_since_last_purchase", dtype=Int64, description="Days since last purchase"),
        Field(name="total_lifetime_spend", dtype=Float64, description="Total spend since signup"),
        Field(name="churn_risk_score", dtype=Float64, description="Predicted 30d churn probability"),
        Field(name="is_high_value", dtype=Bool, description="High-value customer flag"),
        Field(name="preferred_category", dtype=String, description="Most purchased category"),
    ],
    source=user_push_source,  # Supports both batch materialization and streaming push
    online=True,
    tags={"team": "ml", "model": "churn"},
)

product_stats_fv = FeatureView(
    name="product_stats",
    entities=[product_entity],
    ttl=timedelta(days=7),
    schema=[
        Field(name="view_count_24h", dtype=Int64),
        Field(name="purchase_count_24h", dtype=Int64),
        Field(name="conversion_rate_7d", dtype=Float64),
        Field(name="avg_rating", dtype=Float64),
        Field(name="inventory_level", dtype=Int64),
        Field(name="is_trending", dtype=Bool),
    ],
    source=product_stats_source,
    online=True,
)

# ── Feature services (grouped for specific models) ─────────────────────────
churn_prediction_fs = FeatureService(
    name="churn_prediction",
    features=[
        user_stats_fv[[
            "purchase_count_7d",
            "purchase_count_30d",
            "avg_order_value_30d",
            "days_since_last_purchase",
            "total_lifetime_spend",
            "is_high_value",
        ]],
    ],
    description="Features for churn prediction model",
    tags={"model_version": "v2", "owner": "ml-team"},
)

product_recommendation_fs = FeatureService(
    name="product_recommendation",
    features=[
        user_stats_fv[["purchase_count_30d", "preferred_category", "is_high_value"]],
        product_stats_fv[["view_count_24h", "conversion_rate_7d", "avg_rating", "is_trending"]],
    ],
    description="Features for product recommendation model",
)
```
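The `ttl` on a feature view bounds how stale a stored value may be at read time: a value whose event timestamp is more than one TTL before the request time is treated as missing. A minimal illustrative freshness check (not Feast code):

```python
from datetime import datetime, timedelta, timezone

def is_fresh(event_ts, now, ttl):
    """True if a feature value is still within its TTL (illustrative)."""
    return now - event_ts <= ttl

now = datetime(2024, 6, 30, tzinfo=timezone.utc)
recent = datetime(2024, 6, 15, tzinfo=timezone.utc)  # 15 days old
stale = datetime(2024, 5, 1, tzinfo=timezone.utc)    # 60 days old
print(is_fresh(recent, now, timedelta(days=30)))  # True
print(is_fresh(stale, now, timedelta(days=30)))   # False
```

This is why `user_stats` carries a 30-day TTL while fast-moving `product_stats` uses 7 days: the TTL should match how long a value stays meaningful.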
## feature_store.yaml
```yaml
# feature_repo/feature_store.yaml — Feast project configuration
project: my_ml_platform
registry: registries/registry.db  # or s3://bucket/feast/registry.db
provider: local  # or "gcp" or "aws"
online_store:
  type: redis
  connection_string: "${REDIS_URL}"  # e.g. redis://localhost:6379
  # For production: type: dynamodb, region: us-east-1
offline_store:
  type: file  # or bigquery, snowflake, spark
  # For BigQuery:
  #   type: bigquery
  #   dataset: feast_features
  #   project: my-gcp-project
entity_key_serialization_version: 2
flags:
  alpha_features: false
```
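Feast resolves `${ENV_VAR}` placeholders in `feature_store.yaml` from the process environment, which keeps secrets like the Redis URL out of the repo. A toy re-implementation of that substitution to show the intent (the regex and fallback behavior here are illustrative, not Feast's actual code):

```python
import os
import re

def substitute_env(text: str) -> str:
    """Replace ${VAR} placeholders with environment values (illustrative)."""
    return re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), text)

os.environ["REDIS_URL"] = "redis://localhost:6379"
print(substitute_env('connection_string: "${REDIS_URL}"'))
# → connection_string: "redis://localhost:6379"
```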
## Feature Materialization Script
```python
# scripts/materialize.py — run from cron or Dagster/Airflow
from datetime import datetime, timedelta, timezone

import pandas as pd

from feast import FeatureStore


def materialize_features(lookback_hours: int = 24) -> None:
    """Materialize features from the offline store to the online store."""
    store = FeatureStore(repo_path=".")
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(hours=lookback_hours)
    print(f"Materializing features from {start_date} to {end_date}...")
    store.materialize(start_date=start_date, end_date=end_date)
    print("Materialization complete.")


def push_realtime_features(user_df: pd.DataFrame) -> None:
    """Push fresh real-time feature values to the online store."""
    store = FeatureStore(repo_path=".")
    # Feast requires an event_timestamp column on pushed rows
    user_df["event_timestamp"] = pd.Timestamp.now(tz="UTC")
    store.push("user_push_source", user_df, allow_registry_cache=True)
    print(f"Pushed {len(user_df)} user feature records to the online store.")


if __name__ == "__main__":
    import sys

    hours = int(sys.argv[1]) if len(sys.argv) > 1 else 24
    materialize_features(lookback_hours=hours)
```
## FastAPI Serving Endpoint
```python
# api/serve.py — real-time feature serving with FastAPI
from functools import lru_cache
from typing import Optional

import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from feast import FeatureStore

app = FastAPI(title="Feature Store API")


@lru_cache(maxsize=1)
def get_store() -> FeatureStore:
    return FeatureStore(repo_path=".")


class OnlineFeaturesRequest(BaseModel):
    user_id: str
    product_id: Optional[str] = None
    service: str = "churn_prediction"


class TrainingDataRequest(BaseModel):
    user_ids: list[str]
    as_of_date: str  # ISO 8601
    feature_view: str = "user_stats"


@app.post("/features/online")
def get_online_features(req: OnlineFeaturesRequest) -> dict:
    store = get_store()
    features = store.get_feature_service(req.service)
    entity_rows = [{"user_id": req.user_id}]
    if req.product_id:
        entity_rows[0]["product_id"] = req.product_id
    try:
        result = store.get_online_features(
            features=features,
            entity_rows=entity_rows,
        ).to_dict()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Flatten the single-row result: each value is a one-element list
    return {k: v[0] for k, v in result.items()}


@app.post("/features/training")
def get_training_data(req: TrainingDataRequest) -> dict:
    store = get_store()
    as_of_dt = pd.Timestamp(req.as_of_date, tz="UTC")
    entity_df = pd.DataFrame({
        "user_id": req.user_ids,
        "event_timestamp": [as_of_dt] * len(req.user_ids),
    })
    features = [f"user_stats:{col}" for col in [
        "purchase_count_7d", "purchase_count_30d",
        "avg_order_value_30d", "days_since_last_purchase",
    ]]
    job = store.get_historical_features(entity_df=entity_df, features=features)
    df = job.to_df()
    return {"data": df.to_dict("records"), "count": len(df)}


@app.get("/health")
def health() -> dict:
    return {"status": "ok"}
```
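`get_online_features(...).to_dict()` returns a mapping from feature name to a list of values, one per entity row, so a single-entity request still yields one-element lists. A stdlib sketch of that response shape and the flattening step used in the `/features/online` handler (the feature values are illustrative):

```python
def flatten_single_row(result: dict) -> dict:
    """Collapse a one-entity-row online-features result (name -> [value])
    into a flat name -> value dict, as the serving endpoint does."""
    return {k: v[0] for k, v in result.items()}

# Shape returned for one entity row (values are illustrative)
result = {
    "user_id": ["u1"],
    "purchase_count_7d": [3],
    "avg_order_value_30d": [42.5],
}
print(flatten_single_row(result))
# → {'user_id': 'u1', 'purchase_count_7d': 3, 'avg_order_value_30d': 42.5}
```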
## Alternatives

Choose Tecton when you need a fully managed enterprise feature platform: streaming feature pipelines, automatic CI/CD for feature deployment, feature monitoring, and SLA-backed low-latency serving with commercial support. Tecton builds on the same concepts as Feast, but as a SaaS it handles infrastructure, backfills, and monitoring, whereas Feast is open source and requires self-managed infrastructure. Choose Hopsworks when you need a full MLOps platform: it bundles a feature store with a model registry, experiment tracking, and pipeline orchestration in a single open-source platform, whereas Feast is a focused feature store that integrates with any model serving layer. The Claude Skills 360 bundle includes Feast skill sets covering feature definitions, materialization, online serving, and training data pipelines. Start with the free tier to try feature store generation.