Claude Code for Knative: Serverless Workloads on Kubernetes

Published: January 22, 2027
Read time: 9 min
By: Claude Skills 360

Knative extends Kubernetes with serverless primitives. Knative Serving scales workloads to zero when idle and back up on demand: revisions that are already running absorb new traffic immediately, while a request that arrives at zero replicas waits for a pod to start, which typically takes seconds rather than milliseconds. Traffic splitting enables canary deployments with a single YAML patch. Knative Eventing delivers events from sources such as Kafka, GitHub, and Pub/Sub to services through brokers and triggers, decoupling producers from consumers. CloudEvents provides the standard event envelope. Services declare their scale targets in annotations: autoscaling.knative.dev/target: "100" asks the autoscaler for roughly one replica per 100 concurrent requests. Claude Code generates Knative Service manifests, eventing topology YAML, CloudEvent handlers, Kafka event source configurations, and the kn CLI commands for production serverless Kubernetes workloads.

CLAUDE.md for Knative Projects

## Knative Stack
- Version: Knative Serving >= 1.15, Knative Eventing >= 1.15
- Networking: Istio or Kourier as ingress (Kourier lighter for pure Knative)
- Scale to zero: enabled by default — set min-scale annotation to prevent cold starts
- Traffic: use traffic splits for canary/blue-green before promoting
- Events: Broker (default or Kafka-backed) + Trigger for event fan-out
- CloudEvents: use cloudevents-sdk-python or cloudevents/sdk-javascript
- kn CLI: use for Service create/update/traffic — faster than kubectl apply

Knative Service

# k8s/knative/orders-service.yaml — Knative Serving configuration
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: orders-service
  namespace: production
  labels:
    app: orders-service
spec:
  template:
    metadata:
      annotations:
        # Auto-scaling: 1 replica per 100 concurrent requests (default)
        autoscaling.knative.dev/target: "100"
        autoscaling.knative.dev/metric: "concurrency"
        # Keep 2 minimum replicas to avoid cold starts in production
        autoscaling.knative.dev/min-scale: "2"
        autoscaling.knative.dev/max-scale: "50"
        # Averaging window for scaling decisions; hold capacity for 60s before scaling down
        autoscaling.knative.dev/window: "60s"
        autoscaling.knative.dev/scale-down-delay: "60s"
    spec:
      containerConcurrency: 100
      timeoutSeconds: 300
      containers:
        - image: registry.example.com/orders-service:v1.2.0
          ports:
            - containerPort: 8080
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: orders-secrets
                  key: database-url
            - name: PORT
              value: "8080"
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 1000m
              memory: 512Mi
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 3
  traffic:
    - latestRevision: true
      percent: 100
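
The scaling annotations in this manifest reduce to simple arithmetic. The sketch below is a simplification, assuming steady-state load and ignoring the averaging window, panic mode, and the autoscaler's target utilization percentage (which makes real replica counts somewhat higher). `desired_replicas` is a hypothetical helper for illustration, not part of Knative.

```python
import math


def desired_replicas(
    concurrent_requests: int,
    target: int = 100,      # autoscaling.knative.dev/target
    min_scale: int = 2,     # autoscaling.knative.dev/min-scale
    max_scale: int = 50,    # autoscaling.knative.dev/max-scale
) -> int:
    """Approximate steady-state replica count for a concurrency target."""
    if target <= 0:
        raise ValueError("target must be positive")
    wanted = math.ceil(concurrent_requests / target)
    # Clamp to the min-scale / max-scale bounds from the annotations.
    return max(min_scale, min(max_scale, wanted))


if __name__ == "__main__":
    for load in (0, 150, 1234, 10_000):
        print(f"{load} concurrent requests -> {desired_replicas(load)} replicas")
```

With these values an idle service stays at the min-scale floor of 2, and anything above 5,000 concurrent requests is capped at 50 replicas, so excess requests queue rather than trigger more pods.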

Canary Deployments

#!/bin/bash
# scripts/canary-deploy.sh: progressive traffic rollout
set -euo pipefail

SERVICE=orders-service
NAMESPACE=production
NEW_IMAGE=registry.example.com/orders-service:v1.3.0

# Record the revision currently serving traffic so the splits below
# can reference the stable revision by name.
PREV=$(kubectl get ksvc "$SERVICE" --namespace "$NAMESPACE" \
  -o jsonpath='{.status.latestReadyRevisionName}')

# Step 1: Deploy new revision with 0% traffic
kn service update "$SERVICE" \
  --namespace "$NAMESPACE" \
  --image "$NEW_IMAGE" \
  --traffic @latest=0 \
  --traffic "$PREV=100"

# Step 2: Route 10% to new version
kn service update "$SERVICE" \
  --namespace "$NAMESPACE" \
  --traffic @latest=10 \
  --traffic "$PREV=90"

echo "Canary at 10%: monitor error rates"
echo "To promote: kn service update $SERVICE --namespace $NAMESPACE --traffic @latest=100"
echo "To rollback: kn service update $SERVICE --namespace $NAMESPACE --traffic $PREV=100"

# Equivalent YAML for GitOps canary
# k8s/knative/orders-canary.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: orders-service
  namespace: production
spec:
  template:
    metadata:
      name: orders-service-v1-3-0   # Named revision
    spec:
      containers:
        - image: registry.example.com/orders-service:v1.3.0
  traffic:
    - revisionName: orders-service-v1-3-0
      percent: 10
      tag: canary
    - revisionName: orders-service-v1-2-0
      percent: 90
      tag: stable
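
For a GitOps pipeline, the traffic block above can be generated rather than edited by hand. The following is a minimal sketch, assuming a rollout in fixed percentage steps; `traffic_split`, `rollout_plan`, and the step values are hypothetical choices for illustration. The one hard constraint it encodes is that the percentages in a Knative traffic block must sum to 100.

```python
def traffic_split(stable_revision: str, canary_revision: str, canary_percent: int) -> list[dict]:
    """Build the `traffic` list for a Knative Service with one canary revision."""
    if not 0 <= canary_percent <= 100:
        raise ValueError("canary_percent must be between 0 and 100")
    return [
        {"revisionName": canary_revision, "percent": canary_percent, "tag": "canary"},
        {"revisionName": stable_revision, "percent": 100 - canary_percent, "tag": "stable"},
    ]


def rollout_plan(stable_revision: str, canary_revision: str,
                 steps: tuple[int, ...] = (10, 25, 50, 100)) -> list[list[dict]]:
    """One traffic block per rollout step (step values are an assumption)."""
    return [traffic_split(stable_revision, canary_revision, pct) for pct in steps]


if __name__ == "__main__":
    for block in rollout_plan("orders-service-v1-2-0", "orders-service-v1-3-0"):
        print(block)
```

Each entry in the plan can be written into the manifest and committed as its own change, so a rollback is a revert of the most recent commit.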

Knative Eventing

# k8s/knative/eventing.yaml — event-driven order processing

# Broker: event bus that routes events to matching triggers
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: orders-broker
  namespace: production
  annotations:
    eventing.knative.dev/broker.class: MTChannelBasedBroker

---
# Trigger: routes matching events to the notification service
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-created-notification
  namespace: production
spec:
  broker: orders-broker
  filter:
    attributes:
      type: com.myapp.order.created
      source: orders-service
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: notification-service

---
# Trigger: inventory update on order creation
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-created-inventory
  namespace: production
spec:
  broker: orders-broker
  filter:
    attributes:
      type: com.myapp.order.created
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: inventory-service

---
# KafkaSource: consume from Kafka → emit to broker
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: orders-kafka-source
  namespace: production
spec:
  consumerGroup: knative-orders-consumer
  bootstrapServers:
    - kafka-0.kafka.kafka.svc.cluster.local:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: orders-broker
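
To reason about which subscribers receive a given event, it helps to model the trigger filters above. The sketch below is a simplified model, assuming exact-match semantics on every listed attribute; `trigger_matches` is a hypothetical helper, not Knative code. The Kafka example assumes that KafkaSource stamps its events with the type dev.knative.kafka.event, in which case those events match neither trigger unless the type is rewritten or a separate trigger is added.

```python
def trigger_matches(filter_attributes: dict[str, str], event_attributes: dict[str, str]) -> bool:
    """True when every filter attribute equals the event's attribute exactly."""
    return all(event_attributes.get(key) == value for key, value in filter_attributes.items())


# Filters copied from the two Trigger manifests above.
NOTIFICATION_FILTER = {"type": "com.myapp.order.created", "source": "orders-service"}
INVENTORY_FILTER = {"type": "com.myapp.order.created"}

if __name__ == "__main__":
    app_event = {"type": "com.myapp.order.created", "source": "orders-service"}
    other_source = {"type": "com.myapp.order.created", "source": "backoffice"}
    # Assumed attributes for an event emitted by KafkaSource.
    kafka_event = {"type": "dev.knative.kafka.event", "source": "kafka"}

    for name, event in [("app", app_event), ("other", other_source), ("kafka", kafka_event)]:
        print(name,
              "notification:", trigger_matches(NOTIFICATION_FILTER, event),
              "inventory:", trigger_matches(INVENTORY_FILTER, event))
```

Because the inventory trigger filters on type only, it receives order-created events from any source, while the notification trigger receives only those emitted by orders-service.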

CloudEvents Handler

# services/notification_service.py — CloudEvents consumer
from fastapi import FastAPI, Request, HTTPException
from cloudevents.http import from_http, CloudEvent
import logging

app = FastAPI()
logger = logging.getLogger(__name__)


@app.post("/")
async def handle_cloud_event(request: Request):
    """Handle incoming CloudEvents from Knative trigger."""

    try:
        event = from_http(request.headers, await request.body())
    except Exception as e:
        raise HTTPException(400, f"Invalid CloudEvent: {e}")

    logger.info(
        "Received CloudEvent",
        extra={
            "ce_type": event["type"],
            "ce_source": event["source"],
            "ce_id": event["id"],
        }
    )

    await route_event(event)

    return {"processed": True}


async def route_event(event: CloudEvent) -> None:
    event_type = event["type"]
    data = event.get_data()

    if event_type == "com.myapp.order.created":
        await handle_order_created(data)
    elif event_type == "com.myapp.order.shipped":
        await handle_order_shipped(data)
    elif event_type == "com.myapp.order.cancelled":
        await handle_order_cancelled(data)
    else:
        logger.warning(f"Unknown event type: {event_type}")


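# send_email, get_customer_email, handle_order_shipped and
# handle_order_cancelled are application helpers defined elsewhere.
async def handle_order_created(data: dict) -> None:
    """Send order confirmation email."""
    customer_id = data.get("customer_id")
    order_id = data.get("order_id")
    if not customer_id or not order_id:
        # Malformed payload: log and skip instead of failing on order_id[-6:]
        logger.warning("order.created event missing customer_id or order_id")
        return
    amount_cents = data.get("total_cents", 0)

    logger.info(f"Sending confirmation for order {order_id}")
    await send_email(
        to=await get_customer_email(customer_id),
        subject=f"Order Confirmed: #{str(order_id)[-6:]}",
        body=f"Your order of ${amount_cents/100:.2f} has been confirmed.",
    )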


# Health check for Knative readiness probe
@app.get("/health")
def health():
    return {"status": "ok"}
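
To make the wire format concrete: a trigger typically delivers events in CloudEvents binary mode, where attributes travel as ce- prefixed HTTP headers and the payload is the request body. The sketch below is a simplified, standard-library stand-in for what `from_http` does in that mode, not the SDK's implementation; `parse_binary_cloudevent` is a hypothetical name.

```python
import json

REQUIRED_ATTRIBUTES = ("specversion", "id", "source", "type")


def parse_binary_cloudevent(headers: dict[str, str], body: bytes) -> tuple[dict, object]:
    """Split a binary-mode CloudEvent HTTP request into (attributes, data)."""
    # HTTP header names are case-insensitive, so normalise before matching.
    lowered = {name.lower(): value for name, value in headers.items()}
    attributes = {name[3:]: value for name, value in lowered.items() if name.startswith("ce-")}

    missing = [name for name in REQUIRED_ATTRIBUTES if name not in attributes]
    if missing:
        raise ValueError(f"missing CloudEvent attributes: {missing}")

    # In binary mode the datacontenttype attribute is carried by Content-Type.
    content_type = lowered.get("content-type", "")
    if content_type:
        attributes["datacontenttype"] = content_type

    is_json = content_type.startswith("application/json")
    data = json.loads(body) if (is_json and body) else body
    return attributes, data


if __name__ == "__main__":
    attrs, data = parse_binary_cloudevent(
        {
            "Ce-Specversion": "1.0",
            "Ce-Id": "abc-123",
            "Ce-Source": "orders-service",
            "Ce-Type": "com.myapp.order.created",
            "Content-Type": "application/json",
        },
        b'{"order_id": "ord-000123", "customer_id": "cust-42"}',
    )
    print(attrs["type"], data["order_id"])
```

A request that lacks the required headers raises ValueError here, which corresponds to the 400 response in the handler above.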

Emit CloudEvents from Service

# services/orders_service.py — emit CloudEvents to Knative broker
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured
import httpx
import os
import uuid
from datetime import datetime, timezone


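BROKER_URL = os.environ.get(
    "K_SINK",   # Knative injects K_SINK when SinkBinding is configured
    # Fallback: MTChannelBasedBroker ingress, addressed as /<namespace>/<broker-name>
    "http://broker-ingress.knative-eventing.svc.cluster.local/production/orders-broker",
)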


async def emit_order_created(order: dict) -> None:
    """Emit CloudEvent to Knative broker."""

    event = CloudEvent({
        "specversion": "1.0",
        "type": "com.myapp.order.created",
        "source": "orders-service",
        "id": str(uuid.uuid4()),
        "time": datetime.now(tz=timezone.utc).isoformat(),
        "datacontenttype": "application/json",
    }, order)

    headers, body = to_structured(event)

    async with httpx.AsyncClient() as client:
        response = await client.post(
            BROKER_URL,
            content=body,
            headers=dict(headers),
            timeout=5.0,
        )
        response.raise_for_status()
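
For reference, structured mode puts the whole event, attributes and data together, into a single JSON body sent with the content type application/cloudevents+json. The sketch below builds that envelope with the standard library only, as an approximation of what `to_structured` returns; `structured_envelope` is a hypothetical helper and not the SDK's code.

```python
import json
import uuid
from datetime import datetime, timezone


def structured_envelope(event_type: str, source: str, data: dict) -> tuple[dict, bytes]:
    """Build (headers, body) for a structured-mode CloudEvent over HTTP."""
    envelope = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "type": event_type,
        "source": source,
        "time": datetime.now(tz=timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }
    headers = {"content-type": "application/cloudevents+json"}
    return headers, json.dumps(envelope).encode("utf-8")


if __name__ == "__main__":
    headers, body = structured_envelope(
        "com.myapp.order.created", "orders-service", {"order_id": "ord-000123"}
    )
    print(headers)
    print(body.decode("utf-8"))
```

The type and source values set here are the attributes that the trigger filters match on, so they need to agree exactly with the strings in the Trigger manifests.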

For the KEDA-based Kubernetes autoscaling that scales on Kafka consumer lag and custom metrics rather than Knative’s HTTP concurrency model, see the Kubernetes operators guide for custom controllers. For the AWS Lambda serverless approach that avoids managing Kubernetes entirely, the serverless guide covers Lambda functions, API Gateway, and event sources. The Claude Skills 360 bundle includes Knative skill sets covering Serving configuration, Eventing topology, and CloudEvents patterns. Start with the free tier to try Knative manifest generation.
