Elixir’s OTP framework makes fault tolerance a first-class concern: processes are isolated, failures don’t cascade, and supervisors restart crashed processes automatically. Claude Code generates idiomatic OTP code — correct GenServer patterns, proper supervisor tree design, and Phoenix LiveView components that handle real-time updates cleanly.
This guide covers Elixir OTP with Claude Code: GenServers, supervision trees, Phoenix LiveView, Ecto, and testing.
CLAUDE.md for Elixir Projects
## Elixir OTP Stack
- Elixir 1.16, OTP 26, Phoenix 1.7
- Database: PostgreSQL via Ecto 3.11
- Real-time: Phoenix LiveView + PubSub
## OTP Patterns
- GenServer for stateful long-running processes (caches, rate limiters, state machines)
- Task for one-off async work — Task.Supervisor for supervised fire-and-forget
- Agent for simple state — GenServer when you need custom logic
- Supervisor tree mirrors domain boundaries
- Never store process state in application config — keep it in the owning process, and use Registry to look up named processes
## Testing
- ExUnit with async: true where possible
- Mox for external service mocks (behaviours required)
- Ecto sandbox for DB tests (async: true with sandbox)
GenServer: Order Rate Limiter
Create a GenServer that implements per-user rate limiting
for order creation. 5 orders per minute per user.
Use ETS for fast lookups.
# lib/orders/rate_limiter.ex
defmodule Orders.RateLimiter do
  @moduledoc """
  Per-user rate limiter for order creation.

  A user may record at most 5 orders within any rolling 60-second window.
  Request timestamps are kept in an ETS table owned by this process; all
  checks and updates are serialized through `GenServer.call/2`, which makes
  the check-then-increment step atomic.

  Timestamps are taken from the monotonic clock, so wall-clock adjustments
  cannot stretch or shrink a window. The values are only meaningful inside
  the running VM, which is fine because the table dies with this process.
  """

  use GenServer
  require Logger

  @max_orders_per_minute 5
  @window_seconds 60
  @stale_after_seconds 120
  @cleanup_interval :timer.minutes(2)

  # Public API

  @doc "Starts the rate limiter, registered under the module name."
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Returns :ok if allowed, {:error, :rate_limited, retry_after_seconds} if blocked"
  @spec check_and_increment(term()) :: :ok | {:error, :rate_limited, pos_integer()}
  def check_and_increment(user_id) do
    GenServer.call(__MODULE__, {:check_and_increment, user_id})
  end

  # Callbacks

  @impl true
  def init(_opts) do
    # :protected — only this process writes; other processes may read the
    # named table directly, although the public API goes through the server.
    table = :ets.new(:order_rate_limits, [:protected, :named_table, read_concurrency: true])

    schedule_cleanup()
    {:ok, %{table: table}}
  end

  @impl true
  def handle_call({:check_and_increment, user_id}, _from, state) do
    now = now_seconds()
    recent = recent_timestamps(state.table, user_id, now)

    if length(recent) >= @max_orders_per_minute do
      # The oldest request in the window is the first one to expire.
      retry_after = @window_seconds - (now - Enum.min(recent))
      {:reply, {:error, :rate_limited, retry_after}, state}
    else
      # Store only the pruned list so each entry stays bounded by the limit.
      :ets.insert(state.table, {user_id, [now | recent]})
      {:reply, :ok, state}
    end
  end

  @impl true
  def handle_info(:cleanup, state) do
    cutoff = now_seconds() - @stale_after_seconds

    # Collect the stale keys first and delete afterwards, so the table is
    # never modified while it is being traversed.
    stale_users =
      :ets.foldl(
        fn {user_id, timestamps}, acc ->
          if Enum.all?(timestamps, &(&1 < cutoff)), do: [user_id | acc], else: acc
        end,
        [],
        state.table
      )

    Enum.each(stale_users, &:ets.delete(state.table, &1))

    schedule_cleanup()
    {:noreply, state}
  end

  # Defining handle_info/2 replaces the catch-all injected by `use GenServer`.
  # Without this clause a stray message would crash the server and, because
  # the ETS table is owned by it, reset every user's counters.
  def handle_info(message, state) do
    Logger.warning("#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Timestamps (newest first) for `user_id` that still fall inside the window.
  defp recent_timestamps(table, user_id, now) do
    window_start = now - @window_seconds

    case :ets.lookup(table, user_id) do
      [] -> []
      [{^user_id, timestamps}] -> Enum.filter(timestamps, &(&1 > window_start))
    end
  end

  # Monotonic seconds: may be negative, but differences are always valid.
  defp now_seconds, do: System.monotonic_time(:second)

  defp schedule_cleanup do
    Process.send_after(self(), :cleanup, @cleanup_interval)
  end
end
Supervisor Tree
Show me the supervisor tree for an orders application.
Include the rate limiter, a cache GenServer, and the Phoenix endpoint.
# lib/orders_app/application.ex
defmodule OrdersApp.Application do
  @moduledoc """
  Application callback module: starts the top-level supervisor for the
  orders application.
  """

  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(children(), strategy: :one_for_one, name: OrdersApp.Supervisor)
  end

  # Child specs in start order; children are started top to bottom.
  defp children do
    [
      # Database
      OrdersApp.Repo,
      # Phoenix PubSub (required for LiveView and Channels)
      {Phoenix.PubSub, name: OrdersApp.PubSub},
      # Task supervisor for fire-and-forget async work
      {Task.Supervisor, name: OrdersApp.TaskSupervisor},
      # Domain processes — a DynamicSupervisor would allow starting these at runtime
      Orders.RateLimiter,
      Orders.ProductCache,
      # Telemetry (metrics, tracing)
      OrdersAppWeb.Telemetry,
      # Phoenix HTTP endpoint — kept last so everything it depends on is already up
      OrdersAppWeb.Endpoint
    ]
  end
end
# lib/orders/product_cache.ex — GenServer for caching product lookups
defmodule Orders.ProductCache do
  @moduledoc """
  In-memory product cache with a per-entry time-to-live.

  Every `put/2` (re)starts a 5-minute eviction timer for that product.
  Each timer carries a unique token; an eviction message is honoured only
  when its token matches the one currently stored for the product. This
  guards against a timer that has already fired (and whose message is
  sitting in the mailbox) evicting an entry that was refreshed afterwards —
  `Process.cancel_timer/1` cannot retract a message that was already sent.
  """

  use GenServer

  # 5 minute cache
  @ttl_seconds 300

  @doc "Starts the cache, registered under the module name."
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Returns the cached product for `product_id`, or `nil` when absent."
  def get(product_id) do
    GenServer.call(__MODULE__, {:get, product_id})
  end

  @doc "Stores `product` under `product_id` asynchronously and resets its TTL."
  def put(product_id, product) do
    GenServer.cast(__MODULE__, {:put, product_id, product})
  end

  @impl true
  def init(_) do
    # timers: product_id => {timer_ref, token}
    {:ok, %{cache: %{}, timers: %{}}}
  end

  @impl true
  def handle_call({:get, product_id}, _from, state) do
    {:reply, Map.get(state.cache, product_id), state}
  end

  @impl true
  def handle_cast({:put, product_id, product}, state) do
    # Best-effort cancel of the previous timer; a message that was already
    # delivered is neutralised by the token check in handle_info/2.
    case Map.fetch(state.timers, product_id) do
      {:ok, {previous_timer, _token}} -> Process.cancel_timer(previous_timer)
      :error -> :ok
    end

    token = make_ref()

    timer_ref =
      Process.send_after(self(), {:evict, product_id, token}, :timer.seconds(@ttl_seconds))

    {:noreply,
     %{
       state
       | cache: Map.put(state.cache, product_id, product),
         timers: Map.put(state.timers, product_id, {timer_ref, token})
     }}
  end

  @impl true
  def handle_info({:evict, product_id, token}, state) do
    case state.timers do
      %{^product_id => {_timer_ref, ^token}} ->
        {:noreply,
         %{
           state
           | cache: Map.delete(state.cache, product_id),
             timers: Map.delete(state.timers, product_id)
         }}

      _stale_or_unknown ->
        # Eviction from a superseded timer: the entry has been refreshed since.
        {:noreply, state}
    end
  end
end
Phoenix LiveView
Create a LiveView for real-time order status tracking.
When order status changes (via PubSub), update the UI without page reload.
# lib/orders_app_web/live/order_tracking_live.ex
defmodule OrdersAppWeb.OrderTrackingLive do
  @moduledoc """
  LiveView that shows one order's status and items and updates in real time.

  Subscribes to the `"order:<id>"` PubSub topic and replaces the `:order`
  assign whenever an `{:order_updated, order}` message arrives.
  Reads the current user from `socket.assigns.current_user`.
  """

  use OrdersAppWeb, :live_view
  alias OrdersApp.Orders

  @impl true
  def mount(%{"id" => order_id}, _session, socket) do
    # Subscribe only on the connected (websocket) mount, not the initial static render
    if connected?(socket) do
      Phoenix.PubSub.subscribe(OrdersApp.PubSub, "order:#{order_id}")
    end

    case Orders.get_order(order_id, socket.assigns.current_user.id) do
      {:ok, order} ->
        {:ok, assign(socket, order: order, loading: false)}

      {:error, :not_found} ->
        {:ok, push_navigate(socket, to: "/orders")}
    end
  end

  @impl true
  def handle_info({:order_updated, updated_order}, socket) do
    # Real-time update — no page refresh needed
    {:noreply, assign(socket, order: updated_order)}
  end

  @impl true
  def handle_event("cancel_order", _params, socket) do
    case Orders.cancel(socket.assigns.order.id, socket.assigns.current_user.id) do
      {:ok, updated_order} ->
        {:noreply, assign(socket, order: updated_order)}

      {:error, :not_cancellable} ->
        {:noreply, put_flash(socket, :error, "Order cannot be cancelled at this stage")}

      {:error, :not_found} ->
        # The order is no longer visible to this user; mirror mount/3.
        {:noreply, push_navigate(socket, to: "/orders")}

      {:error, _reason} ->
        # Any other failure (e.g. the update itself was rejected) is shown
        # to the user instead of crashing the LiveView with a CaseClauseError.
        {:noreply, put_flash(socket, :error, "Order could not be cancelled")}
    end
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="order-tracker">
      <h1>Order #<%= String.slice(@order.id, 0, 8) %></h1>
      <div class="status-bar">
        <.status_step label="Pending" active={@order.status in [:pending, :confirmed, :shipped, :delivered]} />
        <.status_step label="Confirmed" active={@order.status in [:confirmed, :shipped, :delivered]} />
        <.status_step label="Shipped" active={@order.status in [:shipped, :delivered]} />
        <.status_step label="Delivered" active={@order.status == :delivered} />
      </div>
      <div class="order-items">
        <%= for item <- @order.items do %>
          <div class="item">
            <span class="name"><%= item.product_name %></span>
            <span class="qty">x<%= item.quantity %></span>
          </div>
        <% end %>
      </div>
      <%= if @order.status == :pending do %>
        <button phx-click="cancel_order" data-confirm="Cancel this order?">
          Cancel Order
        </button>
      <% end %>
    </div>
    """
  end

  # Function component for one step of the status bar.
  # Expects `@label` (text to show) and `@active` (adds the "active" class when truthy).
  defp status_step(assigns) do
    ~H"""
    <div class={["step", @active && "active"]}>
      <%= @label %>
    </div>
    """
  end
end
Broadcast the update from wherever order status changes:
# Once the order's new status has been saved, notify every subscriber of its topic:
topic = "order:#{order.id}"
Phoenix.PubSub.broadcast(OrdersApp.PubSub, topic, {:order_updated, order})
Ecto Queries and Testing
# lib/orders_app/orders.ex
defmodule OrdersApp.Orders do
  @moduledoc """
  Context module for querying and updating orders.

  Every lookup is scoped to a user id, so an order that belongs to someone
  else is reported as `{:error, :not_found}`.
  """

  import Ecto.Query
  alias OrdersApp.Repo
  alias OrdersApp.Orders.Order

  # Statuses from which an order may still be cancelled.
  @cancellable_statuses [:pending, :confirmed]

  @doc """
  Lists a user's orders, newest first, with items preloaded.

  Options: `:page` (default 1) and `:per_page` (default 20).
  """
  def list_for_user(user_id, opts \\ []) do
    page = Keyword.get(opts, :page, 1)
    per_page = Keyword.get(opts, :per_page, 20)

    # NOTE(review): Repo.paginate/2 is not part of Ecto — presumably supplied
    # by a pagination library wired into the Repo; confirm.
    Order
    |> where([o], o.user_id == ^user_id)
    |> order_by([o], desc: o.inserted_at)
    |> preload(:items)
    |> Repo.paginate(page: page, page_size: per_page)
  end

  @doc """
  Fetches one of the user's orders with its items preloaded.

  Returns `{:ok, order}` or `{:error, :not_found}`.
  """
  def get_order(order_id, user_id) do
    with {:ok, order} <- get_for_user(order_id, user_id) do
      {:ok, Repo.preload(order, :items)}
    end
  end

  @doc """
  Cancels the user's order if it is still pending or confirmed.

  Returns `{:ok, order}` on success, `{:error, :not_found}` when the order
  does not exist for this user, `{:error, :not_cancellable}` when its status
  no longer allows cancellation, or the error returned by `Repo.update/1`.
  """
  def cancel(order_id, user_id) do
    # get_order/2 is used so the returned struct has its items loaded and can
    # be rendered or broadcast as-is. Each step returns a self-describing
    # error, so no `else` block is needed.
    with {:ok, order} <- get_order(order_id, user_id),
         :ok <- ensure_cancellable(order) do
      order
      |> Ecto.Changeset.change(status: :cancelled)
      |> Repo.update()
    end
  end

  defp ensure_cancellable(%{status: status}) when status in @cancellable_statuses, do: :ok
  defp ensure_cancellable(_order), do: {:error, :not_cancellable}

  # Looks up the order by id AND owner, so foreign orders look like missing ones.
  defp get_for_user(order_id, user_id) do
    case Repo.get_by(Order, id: order_id, user_id: user_id) do
      nil -> {:error, :not_found}
      order -> {:ok, order}
    end
  end
end
# test/orders_app/orders_test.exs
defmodule OrdersApp.OrdersTest do
  # async: true — DB tests run in parallel through the Ecto sandbox
  use OrdersApp.DataCase, async: true

  # ExMachina factories
  import OrdersApp.Factory
  alias OrdersApp.Orders

  describe "cancel/2" do
    # Every test acts on behalf of a freshly inserted user.
    setup do
      %{user: insert(:user)}
    end

    test "cancels a pending order belonging to the user", %{user: user} do
      order = insert(:order, user: user, status: :pending)

      assert {:ok, %{status: :cancelled}} = Orders.cancel(order.id, user.id)
    end

    test "returns error when order is already shipped", %{user: user} do
      order = insert(:order, user: user, status: :shipped)

      assert {:error, :not_cancellable} = Orders.cancel(order.id, user.id)
    end

    test "returns not_found for another user's order", %{user: user} do
      someone_else = insert(:user)
      order = insert(:order, user: someone_else, status: :pending)

      assert {:error, :not_found} = Orders.cancel(order.id, user.id)
    end
  end
end
For real-time collaboration patterns that complement LiveView’s PubSub, see the realtime collaboration guide. For the Rust/Axum approach to high-performance web services, see the Axum guide. The Claude Skills 360 bundle includes Elixir skill sets covering OTP patterns, supervision trees, and LiveView real-time features. Start with the free tier to try Elixir code generation.