Open Policy Agent (OPA) decouples authorization logic from application code. Instead of checks like `if (user.role === 'admin' && resource.ownerId === user.id)` scattered across services, OPA evaluates Rego policies from a central policy repository. Rego is a declarative query language: you describe what’s allowed, and OPA evaluates those rules against JSON input and data. Kubernetes clusters use OPA Gatekeeper for admission control, and API gateways query an OPA sidecar on every request. Claude Code writes Rego policies, unit tests, Kubernetes ConstraintTemplates, and the CI pipelines that test policy changes before deployment.
CLAUDE.md for OPA Projects
## Policy Stack
- OPA 0.68+ with Rego v1 syntax
- Policy repos: policies/ directory, one package per domain
- Testing: `*_test.rego` files alongside every policy (e.g. `authorization_test.rego`); run with `opa test ./policies -v`
- Kubernetes: Gatekeeper v3 CRDs (ConstraintTemplate + Constraint)
- API gateway: OPA sidecar with REST /v1/data endpoint
- Bundles: policies loaded via OPA bundle from S3/GCS for distributed deployment
- Decision logging: always log allow/deny for audit trail
Core Rego Patterns
# policies/orders/authorization.rego
#
# Authorization policy for order resources.
#
# Expected input (as read by the rules below):
#   input.user.id, input.user.roles, input.user.region
#   input.resource.owner_id, input.resource.region
#   input.action
package orders.authorization

# `rego.v1` enables `if`, `in`, `contains` and `every` together. The
# `deny_reasons contains ...` rules below need `contains`, which the separate
# `future.keywords.if` / `future.keywords.in` imports did not provide.
import rego.v1

# Default deny — explicit allow required
default allow := false

# Allow if any of the rules below matches
allow if {
	is_order_owner
}

allow if {
	is_admin
}

# Support agents may read or cancel orders (never create/update), and only
# for orders in their assigned region. The region check lives in the same rule
# as the action check: a separate, broader support rule would grant access on
# its own and make the region restriction ineffective.
allow if {
	is_support_agent
	input.action in {"read", "cancel"}
	input.user.region == input.resource.region
}

# Helper rules
is_order_owner if {
	input.user.id == input.resource.owner_id
}

is_admin if {
	"admin" in input.user.roles
}

is_support_agent if {
	"support" in input.user.roles
}

# Deny reasons: explain WHY access was denied (for debugging)
deny_reasons contains "not_owner" if {
	not is_order_owner
	not is_admin
	not is_support_agent
}

deny_reasons contains "support_write_not_allowed" if {
	is_support_agent
	input.action in {"create", "update"}
}

# Reported when a support agent targets an order outside their region
deny_reasons contains "support_region_mismatch" if {
	is_support_agent
	input.user.region != input.resource.region
}
RBAC Policies
# policies/rbac/roles.rego
#
# Role-based access control: a request is allowed when one of the user's roles
# grants the "<resource_type>:<action>" permission.
#
# Expected input (as read by the rules below):
#   input.user.roles, input.resource_type, input.action
package rbac

import rego.v1

# Permissions granted by each role
role_permissions := {
	"viewer": {"orders:read", "products:read"},
	"editor": {"orders:read", "orders:write", "products:read", "products:write"},
	"admin": {"orders:read", "orders:write", "orders:delete", "products:read", "products:write", "users:read", "users:write"},
}

# Effective permissions: union of all role permissions for the user.
# This must be a multi-value (set) rule declared with `contains`. Written as
# `effective_permissions[permission] if {...}` it becomes an object mapping
# each permission to `true`, and `x in <object>` tests the object's values,
# so the membership check in `allow` would never match.
effective_permissions contains permission if {
	some role in input.user.roles
	some permission in role_permissions[role]
}

# Allow if the user has the required permission
default allow := false

allow if {
	required_permission := concat(":", [input.resource_type, input.action])
	required_permission in effective_permissions
}
Rego Unit Tests
# policies/orders/authorization_test.rego
#
# Unit tests for the orders authorization policy; run with `opa test`.
package orders.authorization_test

# `rego.v1` provides `if` and the `in` operator used in the deny-reason test.
import rego.v1

# The tests live in their own package, so the policy under test has to be
# imported and its rules referenced through the `authorization` alias.
import data.orders.authorization

# Test: order owner can read their own order
test_owner_can_read if {
	authorization.allow with input as {
		"user": {"id": "user-123", "roles": ["viewer"]},
		"resource": {"owner_id": "user-123", "order_id": "ORD-001", "region": "us-east"},
		"action": "read",
	}
}

# Test: non-owner cannot read another user's order
test_non_owner_cannot_read if {
	not authorization.allow with input as {
		"user": {"id": "user-456", "roles": ["viewer"]},
		"resource": {"owner_id": "user-123", "order_id": "ORD-001", "region": "us-east"},
		"action": "read",
	}
}

# Test: admin can read any order, including one they do not own
test_admin_can_read_any if {
	authorization.allow with input as {
		"user": {"id": "user-999", "roles": ["admin"]},
		"resource": {"owner_id": "user-123", "order_id": "ORD-001", "region": "eu-west"},
		"action": "read",
	}
}

# Test: support cannot create orders
test_support_cannot_create if {
	not authorization.allow with input as {
		"user": {"id": "sup-001", "roles": ["support"], "region": "us-east"},
		"resource": {"owner_id": "user-123", "region": "us-east"},
		"action": "create",
	}
}

# Test: deny reasons are populated for a user with no matching role
test_deny_reasons_for_non_owner if {
	"not_owner" in authorization.deny_reasons with input as {
		"user": {"id": "user-456", "roles": []},
		"resource": {"owner_id": "user-123"},
		"action": "read",
	}
}
Kubernetes Admission Control (Gatekeeper)
# gatekeeper/required-labels/template.yaml
# ConstraintTemplate defining the RequireLabels constraint kind: objects are
# rejected when any label listed in the constraint's `labels` parameter is
# absent from metadata.labels.
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: requirelabels  # must be the lowercased CRD kind below
spec:
  crd:
    spec:
      names:
        kind: RequireLabels
      validation:
        openAPIV3Schema:
          # templates.gatekeeper.sh/v1 requires a structural schema, so the
          # parameters root must declare its type explicitly.
          type: object
          properties:
            labels:
              type: array
              items:
                type: string
  targets:
    - target: admission.k8s.gatekeeper.sh
      # NOTE(review): the embedded policy uses pre-v1 Rego syntax (no `if` /
      # `contains`); confirm the Rego version your Gatekeeper release expects
      # for the `rego` field before modernising it.
      rego: |
        package requirelabels

        violation[{"msg": msg}] {
          # Label keys present on the object under review
          provided := {label | input.review.object.metadata.labels[label]}
          # Label keys demanded by the constraint parameters
          required := {label | label := input.parameters.labels[_]}
          missing := required - provided
          count(missing) > 0
          msg := sprintf("Missing required labels: %v", [missing])
        }
# gatekeeper/required-labels/constraint.yaml — apply the template
# Requires the team, app and version labels on Deployments in the
# production namespace.
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: RequireLabels
metadata:
  name: require-team-labels
spec:
  match:
    kinds:
      - apiGroups:
          - "apps"
        kinds:
          - "Deployment"
    namespaces:
      - "production"
  parameters:
    labels:
      - "team"
      - "app"
      - "version"
OPA Sidecar Integration (FastAPI)
# middleware/opa_auth.py — call OPA for every request
import httpx
from fastapi import Request, HTTPException
# Base URL of the OPA Data API document for the orders authorization package.
OPA_URL = "http://localhost:8181/v1/data/orders/authorization"


async def check_authorization(
    user: dict,
    resource: dict,
    action: str,
) -> bool:
    """Ask OPA whether ``user`` may perform ``action`` on ``resource``.

    Posts ``{"input": {"user", "resource", "action"}}`` to the ``allow`` rule
    under ``OPA_URL`` and interprets the ``result`` field of the JSON response.

    Args:
        user: User attributes forwarded to the policy as ``input.user``.
        resource: Resource attributes forwarded as ``input.resource``.
        action: Action name forwarded as ``input.action``.

    Returns:
        ``True`` only when the response's ``result`` is exactly JSON ``true``.
        A missing ``result`` key or any non-boolean value is treated as a
        denial, so the declared ``bool`` return type always holds and a
        truthy non-boolean payload can never be mistaken for an allow.

    Raises:
        httpx.HTTPStatusError: if OPA answers with a 4xx/5xx status
            (via ``raise_for_status``).
        Other httpx errors raised by the request (e.g. on timeout) propagate
        unchanged; callers should treat any exception as "not authorized".
    """
    payload = {
        "input": {
            "user": user,
            "resource": resource,
            "action": action,
        }
    }
    # NOTE(review): a new AsyncClient is created per call; consider sharing a
    # single client across requests if connection setup shows up in latency.
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{OPA_URL}/allow",
            json=payload,
            timeout=0.1,  # 100ms budget — OPA is fast
        )
        response.raise_for_status()
        # Strict identity check: only a literal boolean true grants access.
        return response.json().get("result", False) is True
For the Kubernetes pod security and RBAC patterns that Gatekeeper enforces at admission time, the Kubernetes guide covers pod security standards and service account scoping. For the supply chain security policies that OPA can enforce at CI/CD time, the supply chain security guide covers SBOM and policy gates in pipelines. The Claude Skills 360 bundle includes OPA/Rego skill sets covering RBAC policies, Kubernetes admission control, and policy unit testing. Start with the free tier to try Rego policy generation.