Event sourcing stores state as a sequence of events rather than current values. Instead of UPDATE orders SET status = 'shipped', you append OrderShipped{orderId, trackingNumber, timestamp}. The current state is the result of replaying all events. This pattern provides a complete audit trail, enables temporal queries (“what was the state on Tuesday?”), and makes bugs debuggable by replaying history.
Claude Code can implement event sourcing correctly: it distinguishes events (facts) from commands (intentions), designs append-only event stores, and builds projections that answer read queries efficiently.
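To make the replay idea above concrete, here is a minimal sketch in which current state is a left fold over the event list. The `AccountEvent` and `AccountState` shapes are illustrative assumptions for this example only, not the schema used later in this guide.

```typescript
// Illustrative event shapes (assumed for this sketch only).
type AccountEvent =
  | { type: 'Deposited'; amountCents: number }
  | { type: 'Withdrawn'; amountCents: number };

interface AccountState {
  balanceCents: number;
}

// Evolve state by one event; returns a new object and never mutates its input.
function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'Deposited':
      return { balanceCents: state.balanceCents + event.amountCents };
    case 'Withdrawn':
      return { balanceCents: state.balanceCents - event.amountCents };
  }
}

// Current state is the fold of every event over the initial state.
function replay(events: AccountEvent[]): AccountState {
  return events.reduce(evolve, { balanceCents: 0 });
}

const history: AccountEvent[] = [
  { type: 'Deposited', amountCents: 5000 },
  { type: 'Withdrawn', amountCents: 1200 },
  { type: 'Deposited', amountCents: 300 },
];

const current = replay(history);              // balanceCents: 4100
const afterTwo = replay(history.slice(0, 2)); // balanceCents: 3800
```

Replaying a prefix of the history is what makes temporal queries possible: the same fold over fewer events yields the state as it was earlier.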
When Event Sourcing is Worth It
Good fit:
- Financial systems (every transaction must be auditable and reversible)
- Compliance-heavy domains (HIPAA, SOX — “what did you know and when?”)
- Complex business workflows where state reconstruction is valuable
- Systems that need to support temporal queries (“what was the balance on March 15?”)
Poor fit (complexity not justified):
- Simple CRUD applications, where the extra effort far outweighs the marginal benefit
- Systems where history genuinely doesn’t matter
Event Store Design
CLAUDE.md for Event-Sourced Systems
## Event Sourcing Configuration
- Event store: PostgreSQL append-only table (simpler than EventStoreDB for < 10M events/day)
- Aggregate pattern: one stream per aggregate instance (ID-based partitioning)
- Projections: rebuilt from event stream; never update an event
- Snapshots: after N events to avoid full replay on every read (threshold: 100 events)
- Event schema: {aggregateId, aggregateType, eventType, version, occurredAt, data}
- Versioning: events are immutable — use Upcasters to transform old events to new schema
## Critical Rules
- Events are FACTS — past tense, immutable (OrderPlaced, PaymentProcessed)
- Commands are INTENTIONS — imperative, can be rejected (PlaceOrder, ProcessPayment)
- Never delete events — compensate with new events (OrderCancelled reverses OrderPlaced)
- Projections can be rebuilt — never treat them as source of truth
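The versioning rule above says old events are transformed on read rather than rewritten. A minimal sketch of an upcaster follows. It assumes a hypothetical `schemaVersion` field on the stored payload and a hypothetical v1 shape that kept a dollar `total`; neither appears in the schema in this guide.

```typescript
// Hypothetical stored shapes: v1 kept a dollar amount, v2 keeps integer cents.
interface OrderPlacedV1 {
  type: 'OrderPlaced';
  schemaVersion: 1;
  customerId: string;
  total: number;
}

interface OrderPlacedV2 {
  type: 'OrderPlaced';
  schemaVersion: 2;
  customerId: string;
  totalCents: number;
}

type StoredOrderPlaced = OrderPlacedV1 | OrderPlacedV2;

// Upcast on read: the stored row is never modified, only its in-memory copy.
function upcastOrderPlaced(stored: StoredOrderPlaced): OrderPlacedV2 {
  if (stored.schemaVersion === 2) return stored;
  return {
    type: 'OrderPlaced',
    schemaVersion: 2,
    customerId: stored.customerId,
    // Round to avoid floating-point residue such as 1998.9999999999998
    totalCents: Math.round(stored.total * 100),
  };
}

const legacy: OrderPlacedV1 = {
  type: 'OrderPlaced',
  schemaVersion: 1,
  customerId: 'c-1',
  total: 19.99,
};
const upcasted = upcastOrderPlaced(legacy);
```

Running every loaded event through a function like this keeps the aggregate's `apply` logic free of legacy branches.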
-- Event store schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type TEXT NOT NULL,
event_type TEXT NOT NULL,
version INT NOT NULL, -- Position in this aggregate's stream
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
data JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}', -- causationId, correlationId, userId
-- Optimistic concurrency: prevent duplicate versions
UNIQUE (aggregate_id, version)
);
-- Loading an aggregate's full history needs no separate index:
-- the UNIQUE (aggregate_id, version) constraint above already creates one
-- Index for projections catching up
CREATE INDEX events_type_position_idx ON events (event_type, id);
-- Snapshots for read performance
CREATE TABLE snapshots (
aggregate_id UUID PRIMARY KEY,
aggregate_type TEXT NOT NULL,
version INT NOT NULL, -- Last event version included in snapshot
state JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
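The `UNIQUE (aggregate_id, version)` constraint is what turns two concurrent writers into one success and one error. The sketch below shows the same rule with an in-memory stand-in for the events table. `InMemoryEventStore` and `ConcurrencyError` are hypothetical names used only for illustration.

```typescript
interface StoredEvent {
  aggregateId: string;
  version: number;
  eventType: string;
  data: unknown;
}

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

// In-memory stand-in for the events table (hypothetical, for illustration).
class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  // Append succeeds only if the caller saw the latest version of the stream.
  append(
    aggregateId: string,
    expectedVersion: number,
    events: Array<{ eventType: string; data: unknown }>,
  ): void {
    const stream = this.streams.get(aggregateId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `expected version ${expectedVersion}, stream is at ${stream.length}`,
      );
    }
    events.forEach((e, i) => {
      stream.push({ aggregateId, version: expectedVersion + 1 + i, ...e });
    });
    this.streams.set(aggregateId, stream);
  }

  load(aggregateId: string): StoredEvent[] {
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}

const store = new InMemoryEventStore();
store.append('order-1', 0, [{ eventType: 'OrderPlaced', data: {} }]);

// Two writers both loaded the stream at version 1; only the first append wins.
store.append('order-1', 1, [{ eventType: 'OrderConfirmed', data: {} }]);
let conflict = false;
try {
  store.append('order-1', 1, [{ eventType: 'OrderCancelled', data: {} }]);
} catch (err) {
  conflict = err instanceof Error && err.name === 'ConcurrencyError';
}
```

In PostgreSQL the check is implicit: the second writer's insert for version 2 violates the unique constraint, so no explicit version comparison is needed in application code.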
Aggregate Implementation
Implement an Order aggregate using event sourcing.
Commands: PlaceOrder, ConfirmOrder, ShipOrder, CancelOrder.
// src/domain/order/orderAggregate.ts
// Line item shape, assumed from how Order.place() uses it
export interface OrderItem {
priceCents: number;
quantity: number;
}
// Events: facts that happened
export type OrderEvent =
| { type: 'OrderPlaced'; customerId: string; items: OrderItem[]; totalCents: number }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderShipped'; trackingNumber: string; shippedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
export interface OrderState {
status: 'PENDING' | 'CONFIRMED' | 'SHIPPED' | 'CANCELLED';
customerId: string;
items: OrderItem[];
totalCents: number;
trackingNumber?: string;
}
export class Order {
private _id: string;
private _version: number = 0;
private _state: OrderState | null = null;
private _uncommittedEvents: OrderEvent[] = [];
private constructor(id: string) {
this._id = id;
}
get id() { return this._id; }
get version() { return this._version; }
get uncommittedEvents(): ReadonlyArray<OrderEvent> { return this._uncommittedEvents; }
// Factory: create from event history (replay)
static reconstitute(id: string, events: Array<{ event: OrderEvent; version: number }>): Order {
const order = new Order(id);
for (const { event, version } of events) {
order.apply(event, version);
}
return order;
}
// Factory: restore from snapshot + remaining events
static fromSnapshot(
id: string,
snapshot: { state: OrderState; version: number },
events: Array<{ event: OrderEvent; version: number }>,
): Order {
const order = new Order(id);
order._state = snapshot.state;
order._version = snapshot.version;
for (const { event, version } of events) {
order.apply(event, version);
}
return order;
}
// Commands — validate business rules, then emit events
static place(id: string, customerId: string, items: OrderItem[]): Order {
if (items.length === 0) throw new Error('Order must have at least one item');
const totalCents = items.reduce((sum, i) => sum + i.priceCents * i.quantity, 0);
if (totalCents <= 0) throw new Error('Order total must be positive');
const order = new Order(id);
order.raise({ type: 'OrderPlaced', customerId, items, totalCents });
return order;
}
confirm(): void {
if (!this._state) throw new Error('Order not initialized');
if (this._state.status !== 'PENDING') {
throw new Error(`Cannot confirm order in status ${this._state.status}`);
}
this.raise({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
ship(trackingNumber: string): void {
if (!this._state) throw new Error('Order not initialized');
if (this._state.status !== 'CONFIRMED') {
throw new Error(`Cannot ship order in status ${this._state.status}`);
}
this.raise({ type: 'OrderShipped', trackingNumber, shippedAt: new Date().toISOString() });
}
cancel(reason: string): void {
if (!this._state) throw new Error('Order not initialized');
if (this._state.status === 'SHIPPED') throw new Error('Cannot cancel shipped order');
if (this._state.status === 'CANCELLED') throw new Error('Order already cancelled');
this.raise({ type: 'OrderCancelled', reason, cancelledAt: new Date().toISOString() });
}
// Private: record event and evolve state
private raise(event: OrderEvent): void {
this._uncommittedEvents.push(event);
this.apply(event, this._version + 1);
}
// State evolution: applies one event to in-memory state; no validation or business rules here
private apply(event: OrderEvent, version: number): void {
switch (event.type) {
case 'OrderPlaced':
this._state = {
status: 'PENDING',
customerId: event.customerId,
items: event.items,
totalCents: event.totalCents,
};
break;
case 'OrderConfirmed':
this._state!.status = 'CONFIRMED';
break;
case 'OrderShipped':
this._state!.status = 'SHIPPED';
this._state!.trackingNumber = event.trackingNumber;
break;
case 'OrderCancelled':
this._state!.status = 'CANCELLED';
break;
}
this._version = version;
}
markEventsAsCommitted(): void {
this._uncommittedEvents.length = 0;
}
}
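The guards in `confirm()`, `ship()`, and `cancel()` above amount to a small state machine. As a sketch, the same rules can be written as data, which makes them easy to review and test in isolation. The `ALLOWED` table and `canApply` helper are illustrative names, not part of the class above.

```typescript
type Status = 'PENDING' | 'CONFIRMED' | 'SHIPPED' | 'CANCELLED';
type Command = 'ConfirmOrder' | 'ShipOrder' | 'CancelOrder';

// Which statuses accept which command, mirroring the aggregate's guards:
// confirm only from PENDING, ship only from CONFIRMED,
// cancel from anything except SHIPPED or CANCELLED.
const ALLOWED: Record<Command, Status[]> = {
  ConfirmOrder: ['PENDING'],
  ShipOrder: ['CONFIRMED'],
  CancelOrder: ['PENDING', 'CONFIRMED'],
};

function canApply(command: Command, status: Status): boolean {
  return ALLOWED[command].indexOf(status) !== -1;
}

const canShipPending = canApply('ShipOrder', 'PENDING');     // false
const canCancelShipped = canApply('CancelOrder', 'SHIPPED'); // false
const canConfirmPending = canApply('ConfirmOrder', 'PENDING'); // true
```

A rejected command raises no event, so the stream only ever contains transitions that this table allows.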
Event Store Repository
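// src/infrastructure/orderRepository.ts
import type { Knex } from 'knex';
// Assumes Order and OrderEvent are exported from the aggregate module
import { Order, type OrderEvent } from '../domain/order/orderAggregate';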
export class OrderRepository {
constructor(
private db: Knex,
private SNAPSHOT_THRESHOLD = 100,
) {}
async save(order: Order): Promise<void> {
if (order.uncommittedEvents.length === 0) return;
await this.db.transaction(async (trx) => {
const events = order.uncommittedEvents.map((event, i) => ({
aggregate_id: order.id,
aggregate_type: 'Order',
event_type: event.type,
version: order.version - order.uncommittedEvents.length + 1 + i,
data: JSON.stringify(event),
metadata: JSON.stringify({ savedAt: new Date().toISOString() }),
}));
// Append events — UNIQUE (aggregate_id, version) prevents concurrent write conflicts
await trx('events').insert(events);
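// Take a snapshot when this save crosses a multiple of the threshold. A plain
// `version % threshold === 0` check is skipped if a batch jumps past the boundary.
const previousVersion = order.version - order.uncommittedEvents.length;
const crossedThreshold = Math.floor(order.version / this.SNAPSHOT_THRESHOLD) > Math.floor(previousVersion / this.SNAPSHOT_THRESHOLD);
if (crossedThreshold) {
await trx('snapshots').insert({
aggregate_id: order.id,
aggregate_type: 'Order',
version: order.version,
// Store only the aggregate state, not the whole Order object (bracket access
// reads the private field; a public getter on Order would be cleaner)
state: JSON.stringify(order['_state']),
}).onConflict('aggregate_id').merge();
}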
});
order.markEventsAsCommitted();
}
async load(id: string): Promise<Order | null> {
// Check for snapshot first
const snapshot = await this.db('snapshots')
.where({ aggregate_id: id })
.first();
const startVersion = snapshot?.version ?? 0;
const eventRows = await this.db('events')
.where({ aggregate_id: id })
.where('version', '>', startVersion)
.orderBy('version')
.select();
if (!snapshot && eventRows.length === 0) return null;
// With node-postgres default type parsers, JSONB columns arrive already parsed,
// so calling JSON.parse on them would throw
const events = eventRows.map(row => ({
event: row.data as OrderEvent,
version: row.version,
}));
if (snapshot) {
return Order.fromSnapshot(id, { state: snapshot.state, version: snapshot.version }, events);
}
return Order.reconstitute(id, events);
}
}
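When two writers save the same aggregate at once, the loser's insert fails the unique constraint. PostgreSQL reports this as SQLSTATE `23505` (unique_violation), and node-postgres exposes the SQLSTATE as `code` on the error object. The sketch below assumes Knex surfaces that error with its `code` intact; `isVersionConflict` and `withOptimisticRetry` are hypothetical helpers, not part of the repository above.

```typescript
// PostgreSQL SQLSTATE for unique_violation.
const UNIQUE_VIOLATION = '23505';

// True when the error looks like a lost optimistic-concurrency race.
function isVersionConflict(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    (err as { code?: unknown }).code === UNIQUE_VIOLATION
  );
}

// Re-run the whole load -> command -> save cycle when another writer won.
// Any other error, or a conflict on the final attempt, is rethrown.
async function withOptimisticRetry<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3,
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      return await attempt();
    } catch (err) {
      if (!isVersionConflict(err) || i >= maxAttempts) throw err;
    }
  }
}

// Intended usage (assuming a repository like the one above):
//
//   await withOptimisticRetry(async () => {
//     const order = await repo.load(id);
//     if (!order) throw new Error('Order not found');
//     order.confirm();
//     await repo.save(order);
//   });
```

The retry must reload the aggregate each time, because the command has to be validated against the state the winning writer produced.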
Temporal Queries
What was the state of order #123 at 3pm on December 15?
// Temporal query: replay events up to a point in time
// (assumes `db` is a configured Knex instance in scope)
async function getOrderAtTime(orderId: string, asOf: Date): Promise<OrderState | null> {
const eventRows = await db('events')
.where({ aggregate_id: orderId })
.where('occurred_at', '<=', asOf.toISOString())
.orderBy('version')
.select();
if (eventRows.length === 0) return null;
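const events = eventRows.map(row => ({
event: row.data as OrderEvent, // JSONB arrives already parsed with node-postgres defaults
version: row.version,
}));
const order = Order.reconstitute(orderId, events);
// Return the state as of that point in time, not the aggregate itself
// (bracket access reads the private field; a public getter on Order would be cleaner)
return order['_state'];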
}
// "What orders were pending yesterday at noon?"
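async function getPendingOrdersAt(asOf: Date): Promise<string[]> {
// This is where projections shine: the read model is pre-built.
// Use each order's latest snapshot at or before asOf; filtering on status alone
// would also return orders that were pending earlier but had since moved on.
const { rows } = await db.raw(
`SELECT order_id FROM (
   SELECT DISTINCT ON (order_id) order_id, status
   FROM order_projection_snapshots
   WHERE captured_at <= ?
   ORDER BY order_id, captured_at DESC
 ) latest WHERE status = 'PENDING'`,
[asOf.toISOString()]);
return rows.map((row: { order_id: string }) => row.order_id);
}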
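The query above relies on a pre-built read model. As a sketch of how such a projection can be derived, the following folds a log of events, ordered by global position, into a status per order as of a given instant. The `LoggedEvent` shape and the sample log are assumptions for illustration; a real projection would persist its output rather than recompute it on every call.

```typescript
type Status = 'PENDING' | 'CONFIRMED' | 'SHIPPED' | 'CANCELLED';
type OrderEventType = 'OrderPlaced' | 'OrderConfirmed' | 'OrderShipped' | 'OrderCancelled';

interface LoggedEvent {
  id: number;          // global position in the events table
  aggregateId: string;
  eventType: OrderEventType;
  occurredAt: string;  // ISO-8601 timestamp
}

// The status an order has immediately after each event type.
const STATUS_AFTER: Record<OrderEventType, Status> = {
  OrderPlaced: 'PENDING',
  OrderConfirmed: 'CONFIRMED',
  OrderShipped: 'SHIPPED',
  OrderCancelled: 'CANCELLED',
};

// Fold the log into "status per order" as of a given instant.
function statusesAt(log: LoggedEvent[], asOf: Date): Map<string, Status> {
  const statuses = new Map<string, Status>();
  const ordered = [...log].sort((a, b) => a.id - b.id);
  for (const e of ordered) {
    if (new Date(e.occurredAt).getTime() > asOf.getTime()) continue;
    statuses.set(e.aggregateId, STATUS_AFTER[e.eventType]);
  }
  return statuses;
}

function pendingOrdersAt(log: LoggedEvent[], asOf: Date): string[] {
  const pending: string[] = [];
  statusesAt(log, asOf).forEach((status, orderId) => {
    if (status === 'PENDING') pending.push(orderId);
  });
  return pending;
}

// Sample log (made-up data for the example).
const log: LoggedEvent[] = [
  { id: 1, aggregateId: 'a', eventType: 'OrderPlaced', occurredAt: '2024-12-15T09:00:00Z' },
  { id: 2, aggregateId: 'b', eventType: 'OrderPlaced', occurredAt: '2024-12-15T10:00:00Z' },
  { id: 3, aggregateId: 'a', eventType: 'OrderConfirmed', occurredAt: '2024-12-15T11:00:00Z' },
  { id: 4, aggregateId: 'b', eventType: 'OrderCancelled', occurredAt: '2024-12-15T13:00:00Z' },
];

const pendingAtNoon = pendingOrdersAt(log, new Date('2024-12-15T12:00:00Z')); // ['b']
```

Because the projection is derived entirely from the log, it can be dropped and rebuilt at any time, which is why it is never treated as the source of truth.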
For combining event sourcing with CQRS read models that answer complex queries efficiently, see the event-driven architecture guide. For using Kafka as the event store backbone for cross-service event sourcing, the Kafka guide covers that integration.