AI SYNTHESIZED • 150 SHEETS
v1.0.0

🎯 Event Sourcing — Complete Cheatsheet 🎯

Event Sourcing es un patrón de persistencia que almacena el estado de una aplicación como una secuencia inmutable de eventos, en lugar de guardar solo el estado actual. Cada cambio en el sistema se captura como un evento de dominio que representa un hecho ocurrido, permitiendo reconstruir cualquier estado histórico, auditar completamente la evolución del sistema y reaccionar a cambios de forma desacoplada. Popularizado por Greg Young, Event Sourcing combina naturalmente con CQRS y Domain-Driven Design para construir sistemas resilientes y trazables. Este cheatsheet cubre desde la definición de eventos y streams hasta proyecciones, snapshots, versionado de esquemas, upcasting, concurrencia optimista, integración con CQRS, testing, herramientas de producción y patrones avanzados de evolución de esquemas. Ideal para arquitectos y desarrolladores backend que trabajan con dominios complejos donde la auditabilidad, el debug forense y la flexibilidad de evolución son críticos.


1. 🌟 Core Concepts

  • Event (Evento): Registro inmutable de un hecho significativo ocurrido en el dominio. Se nombra en pasado (OrderPlaced, PaymentRefunded) y contiene los datos necesarios para describir lo sucedido, no cómo procesarlo.
    • Por qué importa: Los eventos son la fuente de verdad. El estado actual se deriva de ellos, nunca al revés.
  • Event Stream (Stream de Eventos): Secuencia ordenada de eventos asociados a una misma entidad o agregado (ej. todos los eventos de order-123). Es la “historia completa” de esa entidad.
  • Event Store: Base de datos optimizada para almacenar streams de eventos de forma append-only, con garantías de orden y atomicidad por stream.
  • Aggregate (Agregado): Unidad de consistencia que reconstituye su estado aplicando eventos en orden. Solo puede mutar a través de métodos que generan nuevos eventos.
  • Rehydration (Rehidratación): Proceso de reconstruir el estado de un agregado aplicando todos sus eventos históricos desde el stream.
  • Projection (Proyección): Transformación que consume eventos para construir read models, vistas materializadas o incluso otros streams derivados.
  • Snapshot: Captura periódica del estado de un agregado para evitar reprocesar eventos antiguos. Optimización de rehidratación.
  • Append-Only: Los eventos nunca se modifican ni eliminan. Si hay un error, se publica un evento correctivo (compensación), no se edita el original.
  • Causal Ordering: Los eventos dentro de un stream se ordenan por ocurrencia. Streams distintos pueden procesarse en paralelo.
  • CQRS Orthogonal but Natural: Event Sourcing no requiere CQRS, pero combinados forman una arquitectura poderosa donde el write side publica eventos que alimentan read models optimizados.

2. 🛠 Setup & Tooling

2.1. Event Stores Populares

Event StoreCaracterísticasCaso de uso
EventStoreDBNativo para ES, projections built-in, $all streamSistemas ES-first
KafkaStreaming distribuido, retención configurable, replay masivoEvent-driven + ES
PostgreSQL + Event Sourcing libFamiliar, transaccional, maduroProyectos que ya usan PG
Marten (Postgres)Document DB + ES sobre PG.NET ecosystem
Axon ServerJava-first, CQRS/ES nativoJava/Spring
Redis StreamsLigero, in-memory, rápidoPrototipos, baja escala
DynamoDB StreamsServerless, AWS integradoArquitecturas serverless

2.2. Instalación (Node.js/TypeScript)

# EventStoreDB client
npm install @eventstore/db-client

# Alternativa: Eventuous (wrapper sobre ESDB)
npm install @eventuous/core @eventuous/eventstore

# Para PostgreSQL-based
npm install @event-driven-io/emmett pg

# Schema Registry para versionado (opcional pero recomendado)
npm install @kafkajs/confluent-schema-registry

2.3. EventStoreDB con Docker

# Levantar EventStoreDB en modo insecure (dev)
docker run --name eventstore -it -p 2113:2113 \
  eventstore/eventstore:latest \
  --insecure \
  --run-projections=All \
  --enable-atom-pub-over-http

# UI disponible en http://localhost:2113
# gRPC endpoint: esdb://localhost:2113?tls=false

2.4. Estructura del Proyecto

src/
├── domain/
│   └── order/
│       ├── Order.ts                  # Aggregate
│       ├── events/                   # Definición de eventos
│       │   ├── OrderCreated.ts
│       │   ├── OrderConfirmed.ts
│       │   └── OrderCancelled.ts
│       └── commands/                 # Opcional si usas CQRS
├── infrastructure/
│   ├── event-store/                  # Cliente EventStoreDB
│   ├── projections/                  # Read models derivados
│   └── subscriptions/                # Catch-up & persistent subs
├── application/
│   ├── services/                     # Use cases
│   └── dtos/                         # Read models públicos
└── tests/
    ├── unit/                         # Tests de agregados
    └── integration/                  # Tests con ES real

3. 📝 Event Definition & Basic Operations

3.1. Definición de Eventos

// Eventos deben ser inmutables y auto-contenidos
// Base común para todos los eventos del sistema
export interface DomainEvent {
  eventType: string;           // Nombre del tipo (para deserialización)
  occurredAt: Date;            // Cuándo sucedió
  eventId: string;             // UUID único del evento
}

// Eventos específicos del dominio Order
export class OrderCreated implements DomainEvent {
  eventType = 'OrderCreated' as const;
  occurredAt = new Date();
  eventId = generateUuid();

  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly items: ReadonlyArray<OrderItemData>,
    public readonly total: number,
  ) {}
}

export class OrderConfirmed implements DomainEvent {
  eventType = 'OrderConfirmed' as const;
  occurredAt = new Date();
  eventId = generateUuid();

  constructor(
    public readonly orderId: string,
    public readonly paymentId: string,
    public readonly confirmedAt: Date = new Date(),
  ) {}
}

export class OrderCancelled implements DomainEvent {
  eventType = 'OrderCancelled' as const;
  occurredAt = new Date();
  eventId = generateUuid();

  constructor(
    public readonly orderId: string,
    public readonly reason: string,
    public readonly cancelledBy: string,
  ) {}
}

// Eventos son datos, no comportamiento.
// Usar ReadonlyArray y readonly para reforzar inmutabilidad.

3.2. Escribir Eventos al Event Store

import { EventStoreDBClient, jsonEvent, AppendExpectedRevision } from '@eventstore/db-client';

const client = EventStoreDBClient.connectionString('esdb://localhost:2113?tls=false');

// Construir eventos para el Event Store
function toEventData(event: DomainEvent) {
  return jsonEvent({
    type: event.eventType,
    data: {
      // Excluir metadatos internos del event payload
      orderId: (event as any).orderId,
      ...event,
    },
    metadata: {
      eventId: event.eventId,
      occurredAt: event.occurredAt.toISOString(),
      // Añadir contexto de correlación para tracing
      correlationId: AsyncLocalStorage.getCorrelationId(),
      causationId: AsyncLocalStorage.getCausationId(),
      userId: getCurrentUserId(),
    },
  });
}

// Append a un stream (append-only)
async function appendEvents(
  streamName: string,
  events: DomainEvent[],
  expectedRevision: bigint = AppendExpectedRevision.ANY
): Promise<void> {
  await client.appendToStream(
    streamName,
    events.map(toEventData),
    { expectedRevision }  // Control de concurrencia optimista
  );
}

// Stream naming convention: Categoría-ID
// order-123, customer-456, account-789
const streamName = `order-${orderId}`;
await appendEvents(streamName, [new OrderCreated(orderId, customerId, items, total)]);

3.3. Leer un Stream Completo

import { readStream, direction } from '@eventstore/db-client';

async function readAllEvents(streamName: string): Promise<DomainEvent[]> {
  const events: DomainEvent[] = [];

  // Lee desde el inicio hasta el final
  for await (const { event } of client.readStream(streamName, {
    direction: direction.FORWARDS,
    fromRevision: BigInt(0),
  })) {
    if (!event) continue;

    // Deserializar según el eventType
    const domainEvent = deserializeEvent(event);
    events.push(domainEvent);
  }

  return events;
}

function deserializeEvent(record: any): DomainEvent {
  const data = record.data;
  const metadata = record.metadata || {};

  // Factory por tipo — permite extender sin modificar
  switch (record.type) {
    case 'OrderCreated':
      return new OrderCreated(data.orderId, data.customerId, data.items, data.total);
    case 'OrderConfirmed':
      return new OrderConfirmed(data.orderId, data.paymentId, new Date(data.confirmedAt));
    case 'OrderCancelled':
      return new OrderCancelled(data.orderId, data.reason, data.cancelledBy);
    default:
      throw new Error(`Unknown event type: ${record.type}`);
  }
}

4. 🔁 Streams, Categories & $all

4.1. Convenciones de Nomenclatura

// Patrones estándar de streams
const streams = {
  // Por agregado (stream único por entidad)
  order: (id: string) =&gt; `order-${id}`,
  customer: (id: string) =&gt; `customer-${id}`,

  // Por categoría (todos los eventos de un tipo)
  // EventStoreDB crea automáticamente: $ce-order, $ce-customer
  // Útil para proyecciones globales

  // Global ($all): TODOS los eventos del sistema en orden
  // $all es un stream especial del EventStoreDB
};

// Por qué importa la convención:
// - Permite proyecciones por categoría sin subscribirse a todo
// - Facilita testing (leer solo el stream del agregado bajo test)
// - Habilita snapshots por categoría

4.2. Lectura desde $all (Global Stream)

// Útil para proyecciones globales, auditoría o replay completo
async function readFromAll(fromPosition?: {
  commitPosition: bigint;
  preparePosition: bigint;
}): Promise<void> {
  const options = fromPosition
    ? { fromPosition, direction: direction.FORWARDS }
    : { from: 'start', direction: direction.FORWARDS };

  for await (const { event } of client.readAll(options)) {
    if (!event) continue;
    // Evitar eventos del sistema (proyecciones internas)
    if (event.streamId.startsWith('$')) continue;

    const domainEvent = deserializeEvent(event);
    await processGlobalEvent(domainEvent, event);
  }
}

4.3. Persistent Subscriptions (Consumer Groups)

// Una suscripción persistente recuerda la posición
// Útil para múltiples consumidores del mismo stream

// Crear una vez
await client.createPersistentSubscriptionToStream(
  'order-123',
  'invoice-generator',
  {
    resolveLinkTos: true,
    settings: {
      maxRetryCount: 10,
      minCheckpointCount: 10,
      readBatchSize: 20,
    },
  }
);

// Consumir (múltiples workers compiten por eventos)
const subscription = client.subscribeToPersistentSubscriptionToStream(
  'order-123',
  'invoice-generator'
);

for await (const { event, retryCount } of subscription) {
  try {
    await generateInvoice(deserializeEvent(event));
    // ACK explícito — solo se avanza el offset tras éxito
    subscription.ack(event);
  } catch (error) {
    // NACK con estrategia de retry
    subscription.nack('retry', error.message, event);
  }
}

5. 🏛️ Aggregates & Rehydration

5.1. Aggregate con Event Sourcing

// Un aggregate es una máquina de estados que evoluciona aplicando eventos
export class Order {
  // Estado interno (privado)
  private _id!: string;
  private _customerId!: string;
  private _items: OrderItem[] = [];
  private _status: OrderStatus = 'DRAFT';
  private _total = 0;
  private _version = -1;  // -1 significa "nuevo"

  // Eventos pendientes de persistir
  private uncommittedEvents: DomainEvent[] = [];

  // === FACTORY: Crea un nuevo aggregate generando eventos ===
  static create(
    orderId: string,
    customerId: string,
    items: OrderItemData[],
  ): Order {
    if (items.length === 0) {
      throw new Error('Order must have at least one item');
    }

    const order = new Order();
    const total = items.reduce((sum, i) =&gt; sum + i.price * i.quantity, 0);

    // Genera el evento pero no aplica directamente
    order.raiseEvent(new OrderCreated(orderId, customerId, items, total));
    return order;
  }

  // === REHYDRATION: Reconstruye estado desde eventos ===
  static rehydrate(events: DomainEvent[]): Order {
    if (events.length === 0) {
      throw new Error('Cannot rehydrate from empty event list');
    }

    const order = new Order();
    for (const event of events) {
      order.applyEvent(event);
    }
    // Limpiar uncommitted (rehydration no genera eventos nuevos)
    order.uncommittedEvents = [];
    return order;
  }

  // === MÉTODOS DE COMPORTAMIENTO: Validan + generan eventos ===
  confirm(paymentId: string): void {
    if (this._status !== 'DRAFT') {
      throw new Error(`Cannot confirm order in status ${this._status}`);
    }
    if (this._total === 0) {
      throw new Error('Cannot confirm empty order');
    }
    this.raiseEvent(new OrderConfirmed(this._id, paymentId));
  }

  cancel(reason: string, cancelledBy: string): void {
    if (this._status === 'SHIPPED') {
      throw new Error('Cannot cancel shipped order');
    }
    if (this._status === 'CANCELLED') {
      throw new Error('Order already cancelled');
    }
    this.raiseEvent(new OrderCancelled(this._id, reason, cancelledBy));
  }

  addItem(product: Product, quantity: number): void {
    if (this._status !== 'DRAFT') {
      throw new Error('Can only add items to draft orders');
    }
    if (quantity &lt;= 0) {
      throw new Error('Quantity must be positive');
    }
    this.raiseEvent(new OrderItemAdded(this._id, product.id, product.name, product.price, quantity));
  }

  // === EVENT RAISING: Encola + aplica ===
  private raiseEvent(event: DomainEvent): void {
    this.uncommittedEvents.push(event);
    this.applyEvent(event);
  }

  // === EVENT APPLICATION: Modifica estado según tipo de evento ===
  private applyEvent(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.applyOrderCreated(event as OrderCreated);
        break;
      case 'OrderConfirmed':
        this.applyOrderConfirmed(event as OrderConfirmed);
        break;
      case 'OrderCancelled':
        this.applyOrderCancelled(event as OrderCancelled);
        break;
      case 'OrderItemAdded':
        this.applyOrderItemAdded(event as OrderItemAdded);
        break;
    }
    this._version++;
  }

  private applyOrderCreated(e: OrderCreated): void {
    this._id = e.orderId;
    this._customerId = e.customerId;
    this._items = e.items.map(i =&gt; ({ ...i }));
    this._total = e.total;
    this._status = 'DRAFT';
  }

  private applyOrderConfirmed(e: OrderConfirmed): void {
    this._status = 'CONFIRMED';
  }

  private applyOrderCancelled(e: OrderCancelled): void {
    this._status = 'CANCELLED';
  }

  private applyOrderItemAdded(e: OrderItemAdded): void {
    this._items.push({
      productId: e.productId,
      name: e.productName,
      price: e.unitPrice,
      quantity: e.quantity,
    });
    this._total += e.unitPrice * e.quantity;
  }

  // === ACCESORS ===
  pullUncommittedEvents(): DomainEvent[] {
    const events = [...this.uncommittedEvents];
    this.uncommittedEvents = [];
    return events;
  }

  get version(): number { return this._version; }
  get id(): string { return this._id; }
  get status(): OrderStatus { return this._status; }
}

5.2. Repository para el Aggregate

export class EventSourcedOrderRepository {
  constructor(private client: EventStoreDBClient) {}

  private streamName(orderId: string): string {
    return `order-${orderId}`;
  }

  async save(order: Order): Promise<void> {
    const events = order.pullUncommittedEvents();
    if (events.length === 0) return;

    // Optimistic concurrency: solo persistir si nadie más modificó
    await this.client.appendToStream(
      this.streamName(order.id),
      events.map(toEventData),
      { expectedRevision: BigInt(order.version - events.length) }
    );
  }

  async load(orderId: string): Promise<Order | null> {
    const events: DomainEvent[] = [];

    try {
      for await (const { event } of this.client.readStream(this.streamName(orderId))) {
        if (event) events.push(deserializeEvent(event));
      }
    } catch (err: any) {
      if (err?.code === 5) return null;  // Stream not found
      throw err;
    }

    if (events.length === 0) return null;
    return Order.rehydrate(events);
  }
}

6. 📊 Projections & Read Models

6.1. Proyección a Read Model (PostgreSQL)

// Una proyección escucha eventos y construye una vista materializada
export class OrderListProjection {
  constructor(private db: Pool) {}

  async handle(event: DomainEvent): Promise<void> {
    // Cada handler debe ser idempotente
    switch (event.eventType) {
      case 'OrderCreated':
        await this.onOrderCreated(event as OrderCreated);
        break;
      case 'OrderConfirmed':
        await this.onOrderConfirmed(event as OrderConfirmed);
        break;
      case 'OrderCancelled':
        await this.onOrderCancelled(event as OrderCancelled);
        break;
    }
  }

  private async onOrderCreated(e: OrderCreated): Promise<void> {
    await this.db.query(
      `INSERT INTO read_orders (id, customer_id, status, total, created_at)
       VALUES ($1, $2, 'DRAFT', $3, $4)
       ON CONFLICT (id) DO NOTHING`,  // Idempotencia
      [e.orderId, e.customerId, e.total, e.occurredAt]
    );
  }

  private async onOrderConfirmed(e: OrderConfirmed): Promise<void> {
    await this.db.query(
      `UPDATE read_orders SET status = 'CONFIRMED', updated_at = $2 WHERE id = $1`,
      [e.orderId, e.confirmedAt]
    );
  }

  private async onOrderCancelled(e: OrderCancelled): Promise<void> {
    await this.db.query(
      `UPDATE read_orders SET status = 'CANCELLED', updated_at = $2 WHERE id = $1`,
      [e.orderId, e.occurredAt]
    );
  }
}

6.2. Subscribe to Category Stream

// Escuchar TODOS los eventos de una categoría (todos los orders)
async function subscribeToOrders(projection: OrderListProjection) {
  // $ce-order es una proyección built-in que concatena todos los streams order-*
  const subscription = client.subscribeToStream('$ce-order', {
    fromRevision: BigInt(0),
    resolveLinkTos: true,
  });

  for await (const { event } of subscription) {
    if (!event || event.streamId.startsWith('$')) continue;

    try {
      const domainEvent = deserializeEvent(event);
      await projection.handle(domainEvent);
    } catch (error) {
      // Log y enviar a DLQ, no detener la suscripción
      logger.error('Projection failed', { event, error });
      await deadLetterQueue.push({ event, error });
    }
  }
}

6.3. Proyecciones Múltiples desde los Mismos Eventos

// Un evento puede alimentar múltiples proyecciones independientes
const projections = {
  orderList: new OrderListProjection(db),           // Listado general
  orderDashboard: new OrderDashboardProjection(db), // Estadísticas agregadas
  customerOrders: new CustomerOrdersProjection(db), // Órdenes por cliente
  inventory: new InventoryProjection(db),           // Impacto en inventario
  auditLog: new AuditLogProjection(db),             // Auditoría completa
};

// Fan-out: cada evento se distribuye a todas las proyecciones interesadas
async function dispatchEvent(event: DomainEvent): Promise<void> {
  await Promise.all(
    Object.values(projections).map(p =&gt; p.handle(event).catch(err =&gt; {
      // Una proyección fallando no bloquea a las demás
      logger.error(`Projection ${p.constructor.name} failed`, err);
    }))
  );
}

7. 📸 Snapshots & Optimization

7.1. Cuándo Usar Snapshots

// Un stream con 10,000 eventos tarda en rehidratarse
// Snapshot: guardar el estado cada N eventos para reducir carga

interface Snapshot {
  aggregateId: string;
  version: number;              // Versión en la que se tomó
  state: unknown;               // Estado serializado
  createdAt: Date;
}

// Estrategia típica: snapshot cada 50-100 eventos
const SNAPSHOT_FREQUENCY = 100;

7.2. Guardar Snapshots

export class SnapshotRepository {
  constructor(private client: EventStoreDBClient) {}

  async save(order: Order): Promise<void> {
    const snapshot: Snapshot = {
      aggregateId: order.id,
      version: order.version,
      state: order.toState(),  // Método público que expone estado serializable
      createdAt: new Date(),
    };

    // Los snapshots también son eventos (en stream especial)
    await this.client.appendToStream(
      `$order-${order.id}-snapshot`,
      [jsonEvent({ type: 'OrderSnapshot', data: snapshot })]
    );
  }
}

7.3. Rehidratar con Snapshot

async function loadWithSnapshot(orderId: string): Promise<Order | null> {
  // 1. Buscar el snapshot más reciente
  let snapshot: Snapshot | null = null;
  try {
    for await (const { event } of client.readStream(`$order-${orderId}-snapshot`, {
      direction: direction.BACKWARDS,
      maxCount: 1,
    })) {
      if (event) snapshot = event.data as Snapshot;
    }
  } catch {
    // Sin snapshots todavía
  }

  // 2. Leer eventos DESDE la versión del snapshot
  const fromRevision = snapshot ? BigInt(snapshot.version + 1) : BigInt(0);
  const events: DomainEvent[] = [];

  try {
    for await (const { event } of client.readStream(`order-${orderId}`, {
      fromRevision,
    })) {
      if (event) events.push(deserializeEvent(event));
    }
  } catch (err: any) {
    if (err?.code === 5 && !snapshot) return null;  // Sin stream ni snapshot
    if (err?.code !== 5) throw err;
  }

  // 3. Si hay snapshot, recrear desde el estado
  if (snapshot) {
    return Order.rehydrateFromSnapshot(snapshot.state, events);
  }

  // 4. Si no hay snapshot, rehidratar desde cero
  if (events.length === 0) return null;
  return Order.rehydrate(events);
}

8. 🔄 Versioning & Schema Evolution

8.1. El Problema

Los esquemas de eventos evolucionan. Agregar campos es fácil; renombrar, eliminar o cambiar tipos es disruptivo. Los eventos viejos siguen en el stream y deben poder reprocesarse.

8.2. Reglas de Evolución

  • Permitido: Agregar campos opcionales nuevos.
  • Permitido: Agregar nuevos tipos de eventos.
  • ⚠️ Evitar: Renombrar campos (mejor: agregar nuevo + deprecar viejo).
  • Nunca: Eliminar campos existentes (rompe replay).
  • Nunca: Cambiar semántica de un campo existente.

8.3. Upcasting (Transformación de Eventos Antiguos)

// V1: OrderCreated tenía items como array de {productId, quantity}
// V2: OrderCreated agrega unitPrice dentro de cada item

interface OrderCreatedV1 {
  orderId: string;
  customerId: string;
  items: Array&lt;{ productId: string; quantity: number }>;
  total: number;
}

interface OrderCreatedV2 {
  orderId: string;
  customerId: string;
  items: Array&lt;{ productId: string; productName: string; quantity: number; unitPrice: number }>;
  total: number;
}

// Upcaster: transforma eventos viejos al formato actual
function upcastOrderCreated(raw: any): OrderCreatedV2 {
  // Si ya es V2, retornar tal cual
  if (raw.items[0]?.unitPrice !== undefined) {
    return raw as OrderCreatedV2;
  }

  // Si es V1, convertir a V2
  // Necesitamos el precio actual — lookup o fallback a 0
  const enrichedItems = raw.items.map((item: any) =&gt; ({
    ...item,
    productName: `Product ${item.productId}`,  // Placeholder
    unitPrice: raw.total / raw.items.reduce((s: number, i: any) =&gt; s + i.quantity, 0),
  }));

  return { ...raw, items: enrichedItems };
}

// Aplicar upcasting al deserializar
function deserializeEvent(record: any): DomainEvent {
  const data = record.data;
  switch (record.type) {
    case 'OrderCreated':
      const v2 = upcastOrderCreated(data);
      return new OrderCreated(v2.orderId, v2.customerId, v2.items, v2.total);
    // ...
  }
}

8.4. Versionado Explícito de Eventos

// Nombrar versiones en el tipo para claridad
type EventSchemaVersion = 'v1' | 'v2' | 'v3';

interface VersionedEvent {
  schemaVersion: EventSchemaVersion;
  // ... resto del evento
}

class OrderCreatedV3 implements DomainEvent {
  eventType = 'OrderCreated' as const;
  schemaVersion: EventSchemaVersion = 'v3';
  // ...
}

// En deserialización, router por versión
function deserializeOrderCreated(raw: any): OrderCreated {
  switch (raw.schemaVersion) {
    case 'v1': return fromV1(raw);
    case 'v2': return fromV2(raw);
    case 'v3': return fromV3(raw);
    default:   return fromV3(raw);  // Default al último
  }
}

9. 🔐 Concurrency & Idempotency

9.1. Optimistic Concurrency

// ExpectedRevision asegura que nadie modificó el stream entre load y save
import { AppendExpectedRevision, WrongExpectedVersionError } from '@eventstore/db-client';

async function saveWithConcurrencyCheck(order: Order, expectedVersion: number): Promise<void> {
  try {
    await client.appendToStream(
      `order-${order.id}`,
      order.pullUncommittedEvents().map(toEventData),
      { expectedRevision: BigInt(expectedVersion) }
    );
  } catch (err) {
    if (err instanceof WrongExpectedVersionError) {
      // Otro proceso modificó el stream → recargar y reintentar
      throw new ConcurrencyConflictError(
        `Order ${order.id} was modified by another process`,
        err
      );
    }
    throw err;
  }
}

// Patrón completo de retry con backoff
async function handleWithRetry(command: Command, maxRetries = 3): Promise<void> {
  for (let attempt = 0; attempt &lt; maxRetries; attempt++) {
    try {
      const order = await repository.load(command.orderId);
      if (!order) throw new NotFoundError();

      order.execute(command);
      await repository.save(order);
      return;
    } catch (err) {
      if (err instanceof ConcurrencyConflictError && attempt &lt; maxRetries - 1) {
        // Backoff exponencial: 100ms, 200ms, 400ms
        await sleep(100 * Math.pow(2, attempt));
        continue;
      }
      throw err;
    }
  }
}

9.2. Idempotency en Commands

// Evitar que un command se ejecute dos veces ante retries de red
interface IdempotentCommand {
  idempotencyKey: string;
  // ... resto del comando
}

export class CommandHandler {
  async handle(command: IdempotentCommand): Promise<void> {
    // 1. Verificar si ya fue procesado
    const existing = await this.db.query(
      `SELECT processed_at FROM idempotency_keys WHERE key = $1`,
      [command.idempotencyKey]
    );

    if (existing.rows.length > 0) {
      // Ya procesado — retornar sin hacer nada
      return;
    }

    // 2. Ejecutar lógica de negocio
    await this.executeBusinessLogic(command);

    // 3. Registrar key idempotente (dentro de la misma TX si es posible)
    await this.db.query(
      `INSERT INTO idempotency_keys (key, processed_at) VALUES ($1, $2)
       ON CONFLICT (key) DO NOTHING`,
      [command.idempotencyKey, new Date()]
    );
  }
}

9.3. Idempotency en Proyecciones

// Cada proyección debe poder procesar el mismo evento múltiples veces
class OrderListProjection {
  async onOrderCreated(e: OrderCreated): Promise<void> {
    // UPSERT idempotente, no INSERT
    await this.db.query(
      `INSERT INTO read_orders (id, customer_id, status, total, created_at, version)
       VALUES ($1, $2, 'DRAFT', $3, $4, 1)
       ON CONFLICT (id) DO UPDATE SET
         customer_id = EXCLUDED.customer_id,
         status = EXCLUDED.status,
         total = EXCLUDED.total,
         version = read_orders.version + 1`,
      [e.orderId, e.customerId, e.total, e.occurredAt]
    );
  }
}

10. 🔗 Integration with CQRS

10.1. Flujo Completo

┌──────────┐    Command     ┌──────────────┐
│  Client  │ ─────────────▶ │  Command     │
└──────────┘                │  Handler     │
                            └──────┬───────┘


                            ┌──────────────┐
                            │  Aggregate   │ ─── Domain Events
                            └──────┬───────┘       │
                                   │               │
                                   ▼               ▼
                            ┌──────────────┐  ┌──────────────┐
                            │ Event Store  │  │  Projections │
                            │ (Write DB)   │  │ (Read Models)│
                            └──────────────┘  └──────┬───────┘

                            ┌──────────┐             │
                            │  Client  │◀────────────┘
                            └──────────┘    Query

10.2. Command Handler Publica Eventos

class ConfirmOrderHandler {
  constructor(
    private repo: EventSourcedOrderRepository,
    private eventBus: EventBus,
  ) {}

  async handle(command: ConfirmOrderCommand): Promise<void> {
    const order = await this.repo.load(command.orderId);
    if (!order) throw new NotFoundError();

    order.confirm(command.paymentId);

    // 1. Persistir en Event Store (atomic)
    await this.repo.save(order);

    // 2. Publicar eventos al bus para proyecciones externas
    for (const event of order.pullDomainEvents()) {
      await this.eventBus.publish(event);
    }
  }
}

10.3. Query Handler Lee Read Model

// El query NO toca el Event Store, solo el read model
class GetOrderByIdHandler {
  constructor(private readRepo: OrderReadRepository) {}

  async handle(query: GetOrderByIdQuery): Promise<OrderDto | null> {
    return this.readRepo.findById(query.orderId);
  }
}

// Consistencia eventual: el read model se actualiza de forma asíncrona
// desde las proyecciones que escuchan el Event Store.

11. 🏭 Production Patterns

11.1. Catch-Up Subscriptions (Replay)

// Para reconstruir una proyección desde cero o tras caída
async function catchUpSubscription(
  checkpointStore: CheckpointStore,
  projection: Projection,
  streamName: string,
): Promise<void> {
  // 1. Recuperar última posición procesada
  const lastPosition = await checkpointStore.get(streamName);
  const fromRevision = lastPosition ? lastPosition + BigInt(1) : BigInt(0);

  // 2. Consumir eventos desde esa posición
  for await (const { event } of client.readStream(streamName, { fromRevision })) {
    if (!event) continue;

    try {
      const domainEvent = deserializeEvent(event);
      await projection.handle(domainEvent);

      // 3. Actualizar checkpoint tras éxito
      await checkpointStore.save(streamName, BigInt(event.revision));
    } catch (err) {
      // Log pero no detener — DLQ para reproceso manual
      await dlq.push({ event, error: err });
    }
  }
}

11.2. Dead Letter Queue (DLQ)

// Eventos fallidos se envían a DLQ para retry manual
class DeadLetterQueue {
  constructor(private db: Pool) {}

  async push(entry: { event: any; error: Error; retries?: number }): Promise<void> {
    await this.db.query(
      `INSERT INTO dlq (event_type, event_data, error_message, stack, retry_count, created_at)
       VALUES ($1, $2, $3, $4, $5, $6)`,
      [
        entry.event.eventType,
        JSON.stringify(entry.event),
        entry.error.message,
        entry.error.stack,
        entry.retries || 0,
        new Date(),
      ]
    );
  }

  async retryAll(): Promise<void> {
    const { rows } = await this.db.query(
      `SELECT * FROM dlq WHERE retry_count &lt; 5 ORDER BY created_at`
    );

    for (const row of rows) {
      try {
        const event = deserializeFromRaw(row.event_data);
        await projection.handle(event);
        await this.db.query(`DELETE FROM dlq WHERE id = $1`, [row.id]);
      } catch (err) {
        await this.db.query(
          `UPDATE dlq SET retry_count = retry_count + 1, last_error = $2 WHERE id = $1`,
          [row.id, (err as Error).message]
        );
      }
    }
  }
}

11.3. Checkpoint Store

// Persiste la posición procesada por proyección
class PostgresCheckpointStore {
  constructor(private db: Pool) {}

  async get(projectionId: string): Promise<bigint | null> {
    const { rows } = await this.db.query(
      `SELECT position FROM checkpoints WHERE projection_id = $1`,
      [projectionId]
    );
    return rows[0]?.position ? BigInt(rows[0].position) : null;
  }

  async save(projectionId: string, position: bigint): Promise<void> {
    await this.db.query(
      `INSERT INTO checkpoints (projection_id, position, updated_at)
       VALUES ($1, $2, $3)
       ON CONFLICT (projection_id) DO UPDATE SET position = $2, updated_at = $3`,
      [projectionId, position.toString(), new Date()]
    );
  }
}

11.4. Observabilidad

// Métricas críticas en sistemas Event Sourced
class EventSourcingMetrics {
  // Tiempo de rehidratación por agregado
  recordRehydration(aggregateType: string, eventCount: number, durationMs: number) {
    metrics.histogram('es.rehydration_duration_ms', durationMs, { aggregateType });
    metrics.histogram('es.events_per_rehydration', eventCount, { aggregateType });
  }

  // Lag de proyecciones (diferencia entre último evento y último procesado)
  recordProjectionLag(projectionId: string, lagMs: number) {
    metrics.gauge('es.projection_lag_ms', lagMs, { projectionId });
  }

  // Tasa de conflictos de concurrencia
  recordConcurrencyConflict(aggregateType: string) {
    metrics.counter('es.concurrency_conflicts_total', { aggregateType });
  }

  // Tamaño de streams (alertar si crece sin snapshots)
  recordStreamSize(streamCategory: string, eventCount: number) {
    metrics.gauge('es.stream_events_count', eventCount, { streamCategory });
  }
}

// Alertas recomendadas
// - projection_lag_ms > 30000 por 5min → Warning
// - projection_lag_ms > 300000 por 5min → Critical
// - stream_events_count > 1000 sin snapshot → Warning
// - concurrency_conflicts_total rate > 100/min → Warning

12. ⚠️ Common Mistakes & Pitfalls

  • Eventos con semántica de comandos: CreateOrder como evento.
    • Fix: Los eventos son hechos en pasado: OrderCreated. Los commands son intenciones en imperativo.
  • Eventos con comportamiento o lógica: OrderCancelledEvent con método calculateRefund().
    • Fix: Los eventos son datos planos. La lógica va en agregados o handlers.
  • Modificar eventos históricos: Editar un evento del mes pasado porque “tenía un error”.
    • Fix: Publicar evento correctivo (OrderTotalCorrected). La historia es inmutable por definición.
  • Streams gigantes sin snapshots: Rehidratar 50,000 eventos en cada load.
    • Fix: Snapshots cada 100-500 eventos. Medir latencia de rehidratación.
  • Rehidratar el aggregate desde una query: getOrderById() reconstituye el aggregate completo.
    • Fix: Queries leen read models (proyecciones). El aggregate solo se rehidrata para comandos.
  • Olvidar expectedRevision: Append sin control de concurrencia permite sobrescribir cambios.
    • Fix: Siempre pasar expectedRevision: aggregate.version - events.length al guardar.
  • Eventos acoplados a la estructura interna del aggregate: OrderAggregateStateChangeEvent con todo el estado.
    • Fix: Eventos pequeños, enfocados en un cambio específico. OrderItemAdded, OrderTotalUpdated.
  • Ignorar el versionado de eventos: Agregar campos obligatorios rompe replay de eventos viejos.
    • Fix: Nuevos campos opcionales. Upcasters para versiones antiguas. Tests de replay en CI.
  • Proyecciones no idempotentes: INSERT falla en retry por duplicado.
    • Fix: ON CONFLICT DO NOTHING o UPSERTs. Cada handler debe ser safe-to-retry.
  • Depender del orden de eventos entre streams distintos: Asumir que el evento A de stream X siempre llega antes que B de stream Y.
    • Fix: Solo garantizar orden DENTRO de un stream. Entre streams, usar correlationId y aceptar eventual consistency.
  • No publicar eventos dentro de la misma transacción que el cambio: Estado persistido pero evento perdido → inconsistencia.
    • Fix: Outbox pattern — escribir evento en tabla outbox dentro de la TX, poller publica al bus.
  • Eventos que referencian otros aggregates directamente: OrderCreated con el Customer completo embebido.
    • Fix: Solo IDs y datos necesarios. OrderCreated(orderId, customerId, ...). Customer es otra historia.
  • Testing sin verificar rehidratación: Tests que solo validan el método, no el ciclo completo.
    • Fix: Tests que crean un aggregate, lo serializan a eventos, lo rehidratan, y verifican el estado final.
  • Usar Event Sourcing para todo: Un CRUD de 5 entidades con 100 usuarios.
    • Fix: ES agrega complejidad. Úsalo solo cuando la auditabilidad, replay o evolución justifiquen el costo.
  • Proyecciones lentas que bloquean el stream: Un handler tarda 5s por evento.
    • Fix: Procesamiento asíncrono por evento. Handlers livianos. Worker pools separados por proyección.
  • Snapshots inconsistentes con eventos: Snapshot tomado en versión 50 pero eventos desde 51 no incluyen un cambio clave.
    • Fix: Snapshots deben capturar estado completo. Validar con tests de rehidratación tras cada cambio de schema.

13. 💡 Best Practices & Pro Tips

  • Eventos pequeños y cohesivos: Un evento = un hecho atómico. Mejor 5 eventos pequeños que uno gigante con todo.
  • Include what happened, not how to handle it: El evento describe el hecho; el handler decide qué hacer.
  • Naming conventions estrictas: {Aggregate}{VerbPast} (e.g., OrderConfirmed, PaymentRefunded). Consistencia reduce errores.
  • Idempotencia everywhere: Commands, handlers, proyecciones, consumers de cola. Todo debe poder ejecutarse dos veces sin efectos duplicados.
  • Tests de rehidratación como smoke test: Cargar un aggregate desde eventos, validar estado. Detecta problemas de evolve temprano.
  • Upcasters versionados y testeables: Cada upcaster tiene sus propios tests unitarios con fixtures de eventos viejos.
  • Schema registry para eventos: Avro/Protobuf con backward compatibility forzada en CI. Previene breaking changes accidentales.
  • Correlation + causation IDs: Todo evento lleva correlationId (trace global) y causationId (evento que lo causó). Debug forense posible.
  • Outbox + CDC en producción: Garantiza exactly-once delivery a consumidores externos sin doble escritura.
  • Snapshots cada N eventos + cada T tiempo: Combinar frecuencia por conteo y por tiempo. Evita streams calientes muy grandes.
  • Métricas de lag = SLA de proyección: “La orden aparece en el read model en ≤ 2s (p99)”. Si no se cumple, alertas.
  • Nunca loguear datos sensibles en eventos: Eventos son auditables por naturaleza. PII → encriptar o tokenizar antes de persistir.
  • DLQ obligatoria: Proyecciones fallan. Redes fallan. Sin DLQ, un evento mal formado detiene todo el sistema.
  • Feature flags para evolucionar proyecciones: Nueva proyección corre en paralelo a la vieja. Tráfico 5% → 100%. Swap atómico.
  • Replay completo como ritual de release: Antes de cada deploy mayor, hacer replay de todas las proyecciones en staging. Detecta incompatibilidades temprano.
  • Evitar lógica condicional basada en versión del evento en el aggregate: El aggregate solo debe aplicar eventos; upcasters hacen la traducción.
  • Separar eventos de integración de eventos de dominio: Los de dominio son internos. Los de integración son contratos públicos con otros bounded contexts.
  • Event Store como sistema de record, no de reporting: Para reporting pesado, proyectar a data warehouse (BigQuery, ClickHouse).
  • Versionar el stream name, no solo el evento: Si el contrato cambia radicalmente, migrar a order-v2-{id}.
  • Measure rehydration time, set budgets: Si un aggregate tarda > 50ms en rehidratarse, optimizar (snapshot, reducir eventos, refactor).
  • Usar herramientas específicas antes que custom: EventStoreDB, Axon, Marten resuelven problemas que son caros de implementar a mano.
  • Documenta cada evento: JSDoc con qué significa, qué campos, qué invariantes respeta. Los eventos son contratos a largo plazo.
  • Prueba el camino del replay: for each event from beginning → rehydrate → validate final state. Asegura que el sistema puede reconstruirse en cualquier momento.

Este cheatsheet proporciona una referencia completa para Event Sourcing, cubriendo los conceptos fundamentales de eventos y streams, definición inmutable de eventos, operaciones básicas de lectura y escritura, rehidratación de aggregates, proyecciones a read models, optimización con snapshots, versionado de esquemas con upcasting, concurrencia optimista e idempotencia, integración con CQRS, patrones de producción como catch-up subscriptions y DLQ, junto con los errores comunes y mejores prácticas para construir sistemas auditables, trazables y altamente evolucionables donde el estado se deriva de una secuencia inmutable de hechos de dominio.

Descarga completada