Knative extends Kubernetes with serverless primitives: Knative Serving scales workloads to zero when idle and back up on demand, with cold starts typically measured in seconds. Traffic splitting enables canary deployments with a single YAML patch. Knative Eventing delivers events from sources such as Kafka, GitHub, and Pub/Sub to services through brokers and triggers, decoupling producers from consumers, with CloudEvents as the standard event envelope. Services declare their scale targets via annotations: `autoscaling.knative.dev/target: "100"` means one replica per 100 concurrent requests. Claude Code generates Knative Service manifests, eventing topology YAML, CloudEvent handlers, Kafka event-source configurations, and the kn CLI commands for production serverless Kubernetes workloads.
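The `target` annotation drives a simple steady-state calculation: desired replicas are the observed concurrency divided by the per-replica target, rounded up and clamped between min-scale and max-scale. A stdlib-only sketch of that intuition (the real KPA autoscaler adds stable/panic windows and request averaging, so treat this as the idea, not the implementation):

```python
import math

def desired_replicas(observed_concurrency: float, target: int = 100,
                     min_scale: int = 0, max_scale: int = 50) -> int:
    """Steady-state KPA intuition: ceil(concurrency / target), clamped."""
    if observed_concurrency <= 0:
        return min_scale  # scale to zero when idle (if min_scale is 0)
    raw = math.ceil(observed_concurrency / target)
    return max(min_scale, min(raw, max_scale))

print(desired_replicas(0))       # -> 0 (idle, scaled to zero)
print(desired_replicas(250))     # -> 3 (250 concurrent requests, 100 per replica)
print(desired_replicas(10_000))  # -> 50 (clamped at max-scale)
```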
## CLAUDE.md for Knative Projects
```markdown
## Knative Stack
- Version: Knative Serving >= 1.15, Knative Eventing >= 1.15
- Networking: Istio or Kourier as ingress (Kourier is lighter for pure Knative)
- Scale to zero: enabled by default — set the min-scale annotation to prevent cold starts
- Traffic: use traffic splits for canary/blue-green before promoting
- Events: Broker (default or Kafka-backed) + Trigger for event fan-out
- CloudEvents: use cloudevents-sdk-python or cloudevents/sdk-javascript
- kn CLI: use for Service create/update/traffic — more ergonomic than kubectl apply
```
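The kn workflow from the list above might look like the following sketch. The service name, namespace, and image are the example values used throughout this guide, and the commands need a cluster with Knative Serving plus the kn CLI; the `command -v` guard lets the script no-op elsewhere:

```shell
SERVICE=orders-service
NAMESPACE=production
IMAGE=registry.example.com/orders-service:v1.2.0

if command -v kn >/dev/null 2>&1; then
  # Create the Service (roughly equivalent to applying a YAML manifest)
  kn service create "$SERVICE" \
    --namespace "$NAMESPACE" \
    --image "$IMAGE" \
    --port 8080 \
    --annotation autoscaling.knative.dev/min-scale=2 || true

  # Inspect revisions and the routed URL
  kn service describe "$SERVICE" --namespace "$NAMESPACE" || true
  kn revision list --service "$SERVICE" --namespace "$NAMESPACE" || true
fi
```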
## Knative Service
```yaml
# k8s/knative/orders-service.yaml — Knative Serving configuration
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: orders-service
  namespace: production
  labels:
    app: orders-service
spec:
  template:
    metadata:
      annotations:
        # Autoscaling: 1 replica per 100 concurrent requests (default)
        autoscaling.knative.dev/target: "100"
        autoscaling.knative.dev/metric: "concurrency"
        # Keep 2 minimum replicas to avoid cold starts in production
        autoscaling.knative.dev/min-scale: "2"
        autoscaling.knative.dev/max-scale: "50"
        # Hold replicas for 60s before scaling down, smoothing bursty traffic
        autoscaling.knative.dev/scale-down-delay: "60s"
    spec:
      containerConcurrency: 100
      timeoutSeconds: 300
      containers:
        - image: registry.example.com/orders-service:v1.2.0
          ports:
            - containerPort: 8080
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: orders-secrets
                  key: database-url
            - name: PORT
              value: "8080"
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 1000m
              memory: 512Mi
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 3
  traffic:
    - latestRevision: true
      percent: 100
```
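After applying the manifest, readiness and the routed URL can be checked with kubectl (`ksvc` is the short name Knative registers for its Services). A sketch, guarded so it no-ops on machines without kubectl or a cluster:

```shell
SERVICE=orders-service
NAMESPACE=production

if command -v kubectl >/dev/null 2>&1; then
  kubectl apply -f k8s/knative/orders-service.yaml || true
  # Wait for the Service to report Ready, then print its routed URL
  kubectl wait ksvc/"$SERVICE" -n "$NAMESPACE" \
    --for=condition=Ready --timeout=120s || true
  kubectl get ksvc "$SERVICE" -n "$NAMESPACE" \
    -o jsonpath='{.status.url}' || true
fi
```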
## Canary Deployments
```bash
#!/bin/bash
# scripts/canary-deploy.sh — progressive traffic rollout
set -euo pipefail

SERVICE=orders-service
NAMESPACE=production
NEW_IMAGE=registry.example.com/orders-service:v1.3.0

# Step 0: Tag the currently serving revision as "stable" so it stays
# addressable after a new revision becomes @latest (kn has no @prev target)
STABLE_REV=$(kubectl get ksvc "$SERVICE" -n "$NAMESPACE" \
  -o jsonpath='{.status.latestReadyRevisionName}')
kn service update "$SERVICE" --namespace "$NAMESPACE" --tag "$STABLE_REV=stable"

# Step 1: Deploy the new revision with 0% traffic
kn service update "$SERVICE" \
  --namespace "$NAMESPACE" \
  --image "$NEW_IMAGE" \
  --traffic stable=100 \
  --traffic @latest=0

# Step 2: Route 10% to the new revision
kn service update "$SERVICE" \
  --namespace "$NAMESPACE" \
  --traffic @latest=10 \
  --traffic stable=90

echo "Canary at 10% — monitor error rates"
echo "To promote: kn service update $SERVICE --traffic @latest=100"
echo "To rollback: kn service update $SERVICE --traffic stable=100"
```
```yaml
# k8s/knative/orders-canary.yaml — equivalent YAML for GitOps canary
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: orders-service
  namespace: production
spec:
  template:
    metadata:
      name: orders-service-v1-3-0  # Named revision
    spec:
      containers:
        - image: registry.example.com/orders-service:v1.3.0
  traffic:
    - revisionName: orders-service-v1-3-0
      percent: 10
      tag: canary
    - revisionName: orders-service-v1-2-0
      percent: 90
      tag: stable
```
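Each tag in the traffic block gets its own hostname (the tag is prefixed to the route's host), so the canary revision can be smoke-tested directly before it takes real traffic. A sketch; the `example.com` domain is hypothetical, so read the actual tag URLs from `kn route describe`:

```shell
SERVICE=orders-service
NAMESPACE=production

if command -v kn >/dev/null 2>&1; then
  # Lists each traffic target with its tag-specific URL
  kn route describe "$SERVICE" --namespace "$NAMESPACE" || true
fi

# Tag URL shape under a hypothetical custom domain:
CANARY_URL="https://canary-${SERVICE}.${NAMESPACE}.example.com"
if command -v curl >/dev/null 2>&1; then
  curl -fsS --max-time 5 "$CANARY_URL/health" \
    || echo "canary unreachable (expected outside the cluster's domain)"
fi
```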
## Knative Eventing
```yaml
# k8s/knative/eventing.yaml — event-driven order processing
# Broker: event bus that routes events to matching triggers
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: orders-broker
  namespace: production
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker
---
# Trigger: routes matching events to the notification service
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-created-notification
  namespace: production
spec:
  broker: orders-broker
  filter:
    attributes:
      type: com.myapp.order.created
      source: orders-service
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: notification-service
---
# Trigger: inventory update on order creation
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-created-inventory
  namespace: production
spec:
  broker: orders-broker
  filter:
    attributes:
      type: com.myapp.order.created
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: inventory-service
---
# KafkaSource: consume from Kafka → emit to broker
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: orders-kafka-source
  namespace: production
spec:
  consumerGroup: knative-orders-consumer
  bootstrapServers:
    - kafka-0.kafka.kafka.svc.cluster.local:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: orders-broker
```
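A synthetic event can be injected to exercise both triggers by POSTing binary-mode CloudEvent headers to the broker ingress, whose address follows the MT channel broker's `/<namespace>/<broker>` convention. A sketch meant to run from a pod inside the cluster; the payload fields are this guide's example order shape:

```shell
BROKER_URL=http://broker-ingress.knative-eventing.svc.cluster.local/production/orders-broker

if command -v curl >/dev/null 2>&1; then
  curl -fsS --max-time 5 -X POST "$BROKER_URL" \
    -H "Ce-Specversion: 1.0" \
    -H "Ce-Type: com.myapp.order.created" \
    -H "Ce-Source: manual-test" \
    -H "Ce-Id: test-event-001" \
    -H "Content-Type: application/json" \
    -d '{"order_id": "ord_123", "customer_id": "cus_456", "total_cents": 4999}' \
    || echo "broker unreachable (expected outside the cluster)"
fi
```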
## CloudEvents Handler
```python
# services/notification_service.py — CloudEvents consumer
import logging

from cloudevents.http import CloudEvent, from_http
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
logger = logging.getLogger(__name__)


@app.post("/")
async def handle_cloud_event(request: Request):
    """Handle incoming CloudEvents delivered by a Knative Trigger."""
    try:
        event = from_http(request.headers, await request.body())
    except Exception as e:
        raise HTTPException(400, f"Invalid CloudEvent: {e}")
    logger.info(
        "Received CloudEvent",
        extra={
            "ce_type": event["type"],
            "ce_source": event["source"],
            "ce_id": event["id"],
        },
    )
    await route_event(event)
    return {"processed": True}


async def route_event(event: CloudEvent) -> None:
    event_type = event["type"]
    data = event.get_data()
    if event_type == "com.myapp.order.created":
        await handle_order_created(data)
    elif event_type == "com.myapp.order.shipped":
        await handle_order_shipped(data)
    elif event_type == "com.myapp.order.cancelled":
        await handle_order_cancelled(data)
    else:
        logger.warning(f"Unknown event type: {event_type}")


async def handle_order_created(data: dict) -> None:
    """Send order confirmation email."""
    customer_id = data.get("customer_id")
    order_id = data.get("order_id", "")
    amount_cents = data.get("total_cents", 0)
    logger.info(f"Sending confirmation for order {order_id}")
    # send_email / get_customer_email (and the shipped/cancelled handlers)
    # are defined elsewhere in the service
    await send_email(
        to=await get_customer_email(customer_id),
        subject=f"Order Confirmed: #{order_id[-6:]}",
        body=f"Your order of ${amount_cents / 100:.2f} has been confirmed.",
    )


# Health check for the Knative readiness probe
@app.get("/health")
def health():
    return {"status": "ok"}
```
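`from_http()` in the handler accepts both CloudEvents HTTP encodings transparently. A stdlib-only sketch of the two wire formats, using this guide's hypothetical order payload:

```python
import json

# Binary mode: attributes travel as ce-* HTTP headers; the body is just the data.
binary_headers = {
    "ce-specversion": "1.0",
    "ce-type": "com.myapp.order.created",
    "ce-source": "orders-service",
    "ce-id": "evt-001",
    "content-type": "application/json",
}
binary_body = json.dumps({"order_id": "ord_123"})

# Structured mode: attributes and data travel together in one JSON body.
structured_headers = {"content-type": "application/cloudevents+json"}
structured_body = json.dumps({
    "specversion": "1.0",
    "type": "com.myapp.order.created",
    "source": "orders-service",
    "id": "evt-001",
    "datacontenttype": "application/json",
    "data": {"order_id": "ord_123"},
})

# Either (headers, body) pair can be fed to cloudevents.http.from_http().
print(json.loads(structured_body)["type"])  # com.myapp.order.created
```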
## Emit CloudEvents from Service
```python
# services/orders_service.py — emit CloudEvents to the Knative broker
import os
import uuid
from datetime import datetime, timezone

import httpx
from cloudevents.http import CloudEvent, to_structured

BROKER_URL = os.environ.get(
    "K_SINK",  # Knative injects K_SINK when a SinkBinding is configured
    # Fallback: the standard MT channel broker ingress address
    "http://broker-ingress.knative-eventing.svc.cluster.local/production/orders-broker",
)


async def emit_order_created(order: dict) -> None:
    """Emit a CloudEvent to the Knative broker."""
    event = CloudEvent(
        {
            "specversion": "1.0",
            "type": "com.myapp.order.created",
            "source": "orders-service",
            "id": str(uuid.uuid4()),
            "time": datetime.now(tz=timezone.utc).isoformat(),
            "datacontenttype": "application/json",
        },
        order,
    )
    headers, body = to_structured(event)
    async with httpx.AsyncClient() as client:
        response = await client.post(
            BROKER_URL,
            content=body,
            headers=dict(headers),
            timeout=5.0,
        )
        response.raise_for_status()
```
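Broker delivery can fail transiently (ingress restarts, network blips), so a small retry wrapper around the emitter is a common pattern; since broker delivery is at-least-once, downstream handlers should be idempotent anyway. A stdlib-only sketch: `emit_with_retry` and `fake_emit` are illustrative names, not part of any SDK:

```python
import asyncio
import random

async def emit_with_retry(emit, order: dict, attempts: int = 3) -> None:
    """Retry transient failures with jittered exponential backoff.

    `emit` is any coroutine function shaped like emit_order_created above.
    """
    for attempt in range(1, attempts + 1):
        try:
            await emit(order)
            return
        except Exception:
            if attempt == attempts:
                raise
            await asyncio.sleep((2 ** attempt) * 0.05 + random.random() * 0.05)

# Demo with a stand-in emitter that fails once, then succeeds:
calls = []

async def fake_emit(order: dict) -> None:
    calls.append(order)
    if len(calls) < 2:
        raise RuntimeError("transient broker error")

asyncio.run(emit_with_retry(fake_emit, {"order_id": "ord_123"}))
print(len(calls))  # 2
```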
For KEDA-based Kubernetes autoscaling that scales on Kafka consumer lag and custom metrics rather than Knative's HTTP concurrency model, see the Kubernetes operators guide for custom controllers. For an AWS Lambda approach that avoids managing Kubernetes entirely, the serverless guide covers Lambda functions, API Gateway, and event sources. The Claude Skills 360 bundle includes Knative skill sets covering Serving configuration, Eventing topology, and CloudEvents patterns. Start with the free tier to try Knative manifest generation.