🎯 CQRS (Command Query Responsibility Segregation) — Complete Cheatsheet 🎯
CQRS es un patrón de arquitectura que segrega las operaciones de lectura (queries) de las de escritura (commands) en modelos independientes, permitiendo optimizar cada lado por separado. Popularizado por Greg Young en 2010, no impone el uso de dos bases de datos ni de Event Sourcing, aunque combina naturalmente con ambos. Este cheatsheet cubre desde los fundamentos teóricos y la sintaxis básica de commands/queries hasta la integración con Event Sourcing, proyecciones asíncronas, consistencia eventual, optimización de lecturas, testing, despliegue en producción y patrones de escalabilidad. Ideal para arquitectos y desarrolladores backend que trabajan con dominios complejos, asimetrías de carga lectura/escritura o sistemas distribuidos donde el modelo clásico CRUD no escala.
1. 🌟 Conceptos Fundamentales
- Segregación Comando/Consulta: Principio de Bertrand Meyer adaptado a arquitectura. Una operación o modifica estado (command) o retorna datos (query), nunca ambos.
- Por qué importa: Elimina métodos como
getUserAndIncrementLoginCount()que mezclan lectura y escritura, causando bugs sutiles y dificultando el escalado.
- Por qué importa: Elimina métodos como
- Command: Objeto inmutable que representa una intención de cambio en el sistema. No retorna datos de negocio, solo confirmación (éxito/fallo). Se nombra en imperativo:
CreateOrder,CancelSubscription. - Query: Objeto inmutable que representa una solicitud de datos. No tiene side effects. Se nombra como pregunta o sustantivo:
GetUserById,ListActiveOrders. - Modelos Separados:
- No Implica Dos Bases de Datos: CQRS puede implementarse con una sola BD, separando solo los modelos. La separación física (write DB + read DB) es un caso avanzado.
- Consistencia Eventual: En implementaciones distribuidas, los read models pueden estar temporalmente desactualizados respecto al write model. Los usuarios deben aceptarlo.
- Escalado Independiente: Lecturas y escrituras pueden escalarse por separado (más réplicas de lectura, menos de escritura, diferentes tipos de BD).
- Ortogonal a Event Sourcing: CQRS puede usarse sin ES (almacenando estado actual en write DB). Pero juntos forman una combinación poderosa: eventos como fuente de verdad, proyecciones como read models.
2.
Setup y Estructura de Proyecto
2.1. Estructura Típica (TypeScript/Node.js)
src/
├── application/
│ ├── commands/ # Commands + handlers
│ │ ├── create-order/
│ │ │ ├── CreateOrderCommand.ts
│ │ │ ├── CreateOrderHandler.ts
│ │ │ └── index.ts
│ │ └── cancel-order/
│ ├── queries/ # Queries + handlers
│ │ ├── get-order-by-id/
│ │ │ ├── GetOrderByIdQuery.ts
│ │ │ ├── GetOrderByIdHandler.ts
│ │ │ └── index.ts
│ │ └── list-customer-orders/
│ └── dtos/ # Read models / proyecciones
│ ├── OrderDto.ts
│ └── OrderSummaryDto.ts
├── domain/ # Agregados, value objects, eventos
│ ├── order/
│ │ ├── Order.ts # Write model (agregado)
│ │ ├── OrderItem.ts
│ │ └── OrderCreatedEvent.ts
│ └── customer/
├── infrastructure/
│ ├── repositories/ # Write repositories
│ ├── read-models/ # Read repositories (SQL, ES, Redis)
│ ├── event-bus/ # Publicación/suscripción
│ └── projections/ # Handlers de eventos → read models
└── presentation/
├── controllers/ # HTTP endpoints
└── subscribers/ # Event listeners
2.2. Dependencias Comunes
# Node.js / TypeScript
npm install @nestjs/cqrs # NestJS: implementación popular
npm install mediatr # Inspirado en MediatR de .NET
npm install @eventstore/db-client # Event Store DB
npm install bullmq # Colas para proyecciones async
# .NET
dotnet add package MediatR # Estándar de facto
dotnet add package MassTransit # Bus de mensajes
dotnet add package Wolverine # Alternativa moderna
# Java
# implementation("org.axonframework:axon-spring-boot-starter:4.9.0")
# Python
pip install py-cqrs # Ligero
pip install eventsourcing # CQRS + Event Sourcing
3. 📝 Commands, Queries y Handlers
3.1. Definición de un Command
// application/commands/create-order/CreateOrderCommand.ts
export class CreateOrderCommand {
constructor(
public readonly customerId: string,
public readonly items: Array<{ productId: string; quantity: number }>,
public readonly shippingAddress: Address,
) {}
}
// Inmutable, con todos los datos necesarios para ejecutar la acción.
// No contiene lógica. Es un "DTO con intención".
3.2. Handler del Command
// application/commands/create-order/CreateOrderHandler.ts
import { OrderRepository } from '../../../infrastructure/repositories/OrderRepository';
import { EventBus } from '../../../infrastructure/event-bus/EventBus';
export class CreateOrderHandler {
constructor(
private orderRepo: OrderRepository,
private eventBus: EventBus,
) {}
async handle(command: CreateOrderCommand): Promise<string> {
// 1. Cargar o crear agregado (write model)
const order = Order.create(command.customerId, command.items);
// ↑ Aquí ocurren las validaciones de dominio: stock, precio, etc.
// 2. Persistir (write model)
await this.orderRepo.save(order);
// 3. Publicar eventos de dominio
for (const event of order.pullDomainEvents()) {
await this.eventBus.publish(event);
}
// 4. Retornar identificador (no datos completos)
return order.id;
}
}
3.3. Definición de una Query
// application/queries/get-order-by-id/GetOrderByIdQuery.ts
export class GetOrderByIdQuery {
constructor(public readonly orderId: string) {}
}
3.4. Handler de la Query (usa Read Model)
// application/queries/get-order-by-id/GetOrderByIdHandler.ts
import { OrderReadRepository } from '../../../infrastructure/read-models/OrderReadRepository';
import { OrderDto } from '../../dtos/OrderDto';
export class GetOrderByIdHandler {
constructor(private orderReadRepo: OrderReadRepository) {}
async handle(query: GetOrderByIdQuery): Promise<OrderDto | null> {
// Lee directamente del read model (denormalizado, optimizado)
// Sin cargar agregado, sin validaciones de dominio
return this.orderReadRepo.findById(query.orderId);
}
}
3.5. Dispatcher / Mediator (patrón mediador)
// application/mediator/Mediator.ts
type Handler<T, R> = { handle(request: T): Promise<R> };
export class Mediator {
private commandHandlers = new Map<Function, Handler<any, any>>();
private queryHandlers = new Map<Function, Handler<any, any>>();
registerCommand<T, R>(commandClass: new (...args: any[]) => T, handler: Handler<T, R>) {
this.commandHandlers.set(commandClass, handler);
}
registerQuery<T, R>(queryClass: new (...args: any[]) => T, handler: Handler<T, R>) {
this.queryHandlers.set(queryClass, handler);
}
async send<T, R>(request: T): Promise<R> {
const handler = this.commandHandlers.get(request.constructor);
if (!handler) throw new Error(`No handler for command ${request.constructor.name}`);
return handler.handle(request);
}
async ask<T, R>(query: T): Promise<R> {
const handler = this.queryHandlers.get(query.constructor);
if (!handler) throw new Error(`No handler for query ${query.constructor.name}`);
return handler.handle(query);
}
}
3.6. Uso desde un Controller
// presentation/controllers/OrderController.ts
app.post('/orders', async (req, res) => {
const command = new CreateOrderCommand(
req.body.customerId,
req.body.items,
req.body.shippingAddress,
);
const orderId = await mediator.send(command);
res.status(201).json({ orderId });
});
app.get('/orders/:id', async (req, res) => {
const query = new GetOrderByIdQuery(req.params.id);
const order = await mediator.ask(query);
if (!order) return res.status(404).send();
res.json(order);
});
4. 🗂️ Write Models vs Read Models
4.1. Write Model (Agregado DDD)
// domain/order/Order.ts
export class Order {
private domainEvents: DomainEvent[] = [];
private constructor(
public readonly id: string,
public readonly customerId: string,
private items: OrderItem[],
private status: OrderStatus,
private createdAt: Date,
) {}
static create(customerId: string, items: OrderItemDto[]): Order {
if (items.length === 0) throw new Error('Order must have at least one item');
const order = new Order(
generateUuid(),
customerId,
items.map(i => OrderItem.create(i.productId, i.quantity)),
'PENDING',
new Date(),
);
order.domainEvents.push(new OrderCreatedEvent(order.id, customerId));
return order;
}
confirm(paymentId: string): void {
if (this.status !== 'PENDING') {
throw new Error(`Cannot confirm order in status ${this.status}`);
}
this.status = 'CONFIRMED';
this.domainEvents.push(new OrderConfirmedEvent(this.id, paymentId));
}
cancel(reason: string): void {
if (this.status === 'SHIPPED') throw new Error('Cannot cancel shipped order');
this.status = 'CANCELLED';
this.domainEvents.push(new OrderCancelledEvent(this.id, reason));
}
pullDomainEvents(): DomainEvent[] {
const events = [...this.domainEvents];
this.domainEvents = [];
return events;
}
}
4.2. Read Model (Proyección plana)
// application/dtos/OrderDto.ts
export interface OrderDto {
id: string;
customerId: string;
customerEmail: string; // ← Denormalizado (JOIN evitado)
status: string;
total: number; // ← Calculado y cacheado
itemCount: number; // ← Calculado
items: Array<{
productId: string;
productName: string; // ← Denormalizado
quantity: number;
unitPrice: number;
}>;
createdAt: string;
updatedAt: string;
}
// Optimizado para el caso de uso: "ver detalle de pedido en la UI".
// No tiene métodos, no tiene invariantes, no se comporta.
// Solo datos listos para serializar.
4.3. Comparativa Clave
| Aspecto | Write Model | Read Model |
|---|---|---|
| Propósito | Ejecutar lógica de negocio | Servir datos a UI/API |
| Estructura | Rica, con comportamiento | Plana, denormalizada |
| Estado | Transaccional, consistente | Eventual, optimizado |
| Origen | Agregados, entidades | Proyecciones, vistas materializadas |
| Validación | Fuerte (invariantes) | Mínima (solo formatos) |
| Ejemplos | Order, User, Account | OrderDto, DashboardView, SearchResult |
5. 🔄 CQRS + Event Sourcing (Combinación Natural)
5.1. Flujo Completo
[Command] → [Handler] → [Aggregate] → [Domain Events] → [Event Store]
↓
[Projection Handlers]
↓
[Read Model DB]
↑
[Query] ← [Handler] ← [Read Repository] ←─────────────────────┘
5.2. Aggregate que Persiste Eventos
// domain/order/Order.ts (con Event Sourcing)
export class Order {
private uncommittedEvents: DomainEvent[] = [];
private version = 0;
// Recreate from history
static rehydrate(events: DomainEvent[]): Order {
const order = new Order();
for (const event of events) {
order.apply(event);
}
return order;
}
static create(customerId: string, items: OrderItemDto[]): Order {
const order = new Order();
order.raise(new OrderCreatedEvent(generateUuid(), customerId, items));
return order;
}
confirm(paymentId: string): void {
if (this.status !== 'PENDING') throw new Error('Invalid status');
this.raise(new OrderConfirmedEvent(this.id, paymentId));
}
private raise(event: DomainEvent): void {
this.apply(event); // Aplica el cambio de estado
this.uncommittedEvents.push(event); // Lo encola para persistir
}
private apply(event: DomainEvent): void {
switch (event.type) {
case 'OrderCreated':
this.id = event.orderId;
this.customerId = event.customerId;
this.status = 'PENDING';
break;
case 'OrderConfirmed':
this.status = 'CONFIRMED';
this.paymentId = event.paymentId;
break;
}
this.version++;
}
}
5.3. Event Store Repository
// infrastructure/repositories/EventSourcedOrderRepository.ts
export class EventSourcedOrderRepository implements OrderRepository {
constructor(private eventStore: EventStore) {}
async save(order: Order): Promise<void> {
const events = order.pullUncommittedEvents();
if (events.length === 0) return;
// Atomic append + optimistic concurrency via version
await this.eventStore.appendToStream(`order-${order.id}`, events, order.version);
}
async findById(id: string): Promise<Order | null> {
const events = await this.eventStore.readStream(`order-${id}`);
if (events.length === 0) return null;
return Order.rehydrate(events);
}
}
5.4. Proyección a Read Model
// infrastructure/projections/OrderReadProjection.ts
export class OrderReadProjection {
constructor(private readDb: OrderReadRepository) {}
@Subscribe('OrderCreated')
async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readDb.insert({
id: event.orderId,
customerId: event.customerId,
customerEmail: await this.lookupEmail(event.customerId),
status: 'PENDING',
items: event.items,
total: calculateTotal(event.items),
itemCount: event.items.length,
createdAt: event.occurredAt,
updatedAt: event.occurredAt,
});
}
@Subscribe('OrderConfirmed')
async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
await this.readDb.updateStatus(event.orderId, 'CONFIRMED', event.occurredAt);
}
@Subscribe('OrderCancelled')
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.readDb.updateStatus(event.orderId, 'CANCELLED', event.occurredAt);
}
}
6. 📦 Buses de Mensajes y Comunicación
6.1. In-Memory (Simple CQRS)
// MediatR en .NET, mediator-ts en TypeScript
// Sincrónico, ideal para monolitos modulares
// NestJS + @nestjs/cqrs
@Injectable()
export class OrderController {
constructor(private commandBus: CommandBus, private queryBus: QueryBus) {}
@Post()
async create(@Body() dto: CreateOrderDto) {
return this.commandBus.execute(new CreateOrderCommand(dto));
}
@Get(':id')
async getById(@Param('id') id: string) {
return this.queryBus.execute(new GetOrderByIdQuery(id));
}
}
6.2. Distributed (Microservicios)
// RabbitMQ / Kafka / Azure Service Bus
// Asincrónico, con colas de comandos y topics de eventos
// Publicar command a cola
await messageBus.sendCommand('orders.create', {
customerId,
items,
}, {
idempotencyKey: requestId, // ¡Importante!
replyTo: 'orders.responses',
});
// Consumir command (worker)
messageBus.onCommand('orders.create', async (msg) => {
await handler.handle(new CreateOrderCommand(msg.data));
await messageBus.ack(msg);
});
6.3. Outbox Pattern (Consistencia Garantizada)
// Escribir comando + evento en la misma transacción de BD
await db.transaction(async (tx) => {
// 1. Persistir estado (write model)
await tx.orders.save(order);
// 2. Escribir evento en tabla outbox
await tx.outbox.insert({
eventType: 'OrderCreated',
payload: JSON.stringify(event),
aggregateId: order.id,
status: 'PENDING',
});
});
// Worker separado publica el outbox al broker
// Garantiza: o ambos se guardan, o ninguno
7.
Optimización de Queries
7.1. Proyecciones Especializadas por Caso de Uso
// Cada caso de uso tiene su propio read model, no uno genérico
// Para listado: mínimo
interface OrderListItemDto {
id: string;
createdAt: string;
status: string;
total: number;
}
// Para detalle: completo
interface OrderDetailDto {
id: string;
customerId: string;
customerEmail: string;
items: Array<{...}>;
payments: Array<{...}>;
shipments: Array<{...}>;
timeline: Array<{...}>;
}
// Para dashboard: agregado
interface OrderDashboardDto {
todayTotal: number;
todayCount: number;
byStatus: Record<string, number>;
topProducts: Array<{name: string; count: number}>;
}
7.2. Bases de Datos Especializadas para Lectura
// Write: PostgreSQL (transaccional)
// Read: diferentes opciones según caso de uso
class OrderReadRepository {
// Búsqueda full-text → Elasticsearch
async search(criteria: SearchCriteria): Promise<OrderListItemDto[]> {
return this.elastic.search({ index: 'orders', body: criteria.toEsQuery() });
}
// Conteos en tiempo real → Redis
async getOrderCountByStatus(): Promise<Record<string, number>> {
const statuses = ['PENDING', 'CONFIRMED', 'SHIPPED', 'DELIVERED'];
const results = await Promise.all(
statuses.map(s => this.redis.get(`orders:count:${s}`))
);
return Object.fromEntries(statuses.map((s, i) => [s, Number(results[i]) || 0]));
}
// Consultas relacionales complejas → PostgreSQL replica
async getMonthlyReport(year: number, month: number): Promise<ReportDto> {
return this.pgReplica.query(/* SQL */);
}
// Time-series → TimescaleDB / InfluxDB
async getOrderRatePerMinute(lastMinutes: number): Promise<number[]> {
return this.timescale.query(/* SQL */);
}
}
7.3. Vistas Materializadas
-- PostgreSQL: vista materializada para reportes
CREATE MATERIALIZED VIEW order_daily_summary AS
SELECT
DATE(created_at) AS order_date,
status,
COUNT(*) AS order_count,
SUM(total) AS revenue,
AVG(total) AS avg_ticket
FROM orders
GROUP BY DATE(created_at), status;
CREATE UNIQUE INDEX idx_order_daily_summary
ON order_daily_summary (order_date, status);
-- Refresh periódico (cada 5 min) o por trigger
REFRESH MATERIALIZED VIEW CONCURRENTLY order_daily_summary;
7.4. CQRS con Caché
class CachedGetOrderByIdHandler {
constructor(
private inner: GetOrderByIdHandler,
private cache: RedisCache,
) {}
async handle(query: GetOrderByIdQuery): Promise<OrderDto | null> {
const cacheKey = `order:${query.orderId}`;
// 1. Intentar caché
const cached = await this.cache.get<OrderDto>(cacheKey);
if (cached) return cached;
// 2. Fallback a read model
const order = await this.inner.handle(query);
if (order) {
// 3. Cachear con TTL
await this.cache.set(cacheKey, order, { ttl: 60 });
}
return order;
}
}
// Invalidación cuando el write model cambia
@Subscribe('OrderConfirmed')
async onOrderConfirmed(event: OrderConfirmedEvent) {
await this.cache.del(`order:${event.orderId}`);
}
8. 🎯 Consistencia Eventual y Sincronización
8.1. Entendiendo el Lag
t0: Usuario crea orden (write model actualizado)
t1: Evento "OrderCreated" publicado al broker
t2: Proyector recibe el evento
t3: Read model actualizado
t4: Usuario hace GET /orders/:id → ve la orden
Gap = t4 - t0 = latencia de propagación (típicamente 10ms-5s)
8.2. Estrategias de Manejo
A. Read-through from Write Model (lectura crítica)
// Para casos donde la consistencia inmediata es obligatoria
class GetOrderByIdHandler {
async handle(query: GetOrderByIdQuery): Promise<OrderDto> {
// 1. Intentar read model
const fromRead = await this.readRepo.findById(query.orderId);
if (fromRead) return fromRead;
// 2. Fallback al write model (más lento pero consistente)
const aggregate = await this.writeRepo.findById(query.orderId);
if (!aggregate) throw new NotFoundException();
return OrderMapper.toDto(aggregate);
}
}
B. WaitForProjection (espera a proyección)
// Tras ejecutar command, esperar a que la proyección alcance la versión
async function createOrderAndWait(command: CreateOrderCommand): Promise<OrderDto> {
const orderId = await mediator.send(command);
// Poll hasta que el read model tenga la orden
const deadline = Date.now() + 5000;
while (Date.now() < deadline) {
const order = await readRepo.findById(orderId);
if (order) return order;
await sleep(50);
}
throw new ProjectionTimeoutError();
}
C. Return 202 Accepted
// API asíncrona: retornar aceptación inmediata
app.post('/orders', async (req, res) => {
const commandId = await mediator.send(new CreateOrderCommand(req.body));
res.status(202).json({
commandId,
statusUrl: `/commands/${commandId}/status`,
});
});
// Cliente pollea status o usa WebSocket para notificación
8.3. Monitoreo del Lag
// Métrica crítica: tiempo entre evento escrito y proyección aplicada
class LagMonitor {
@Subscribe('*')
async onAnyEvent(event: DomainEvent): Promise<void> {
const lag = Date.now() - event.occurredAt.getTime();
metrics.histogram('cqrs.projection_lag_ms', lag, {
eventType: event.type,
projector: this.name,
});
if (lag > 30000) {
alerting.warn(`Projection lag > 30s for ${event.type}`);
}
}
}
9. 🧪 Testing en CQRS
9.1. Unit Test de Handlers
describe('CreateOrderHandler', () => {
let handler: CreateOrderHandler;
let mockRepo: jest.Mocked<OrderRepository>;
let mockBus: jest.Mocked<EventBus>;
beforeEach(() => {
mockRepo = { save: jest.fn(), findById: jest.fn() } as any;
mockBus = { publish: jest.fn() } as any;
handler = new CreateOrderHandler(mockRepo, mockBus);
});
it('should persist order and publish event', async () => {
const command = new CreateOrderCommand('cust-1', [
{ productId: 'p-1', quantity: 2 },
], validAddress);
const orderId = await handler.handle(command);
expect(orderId).toMatch(/^ord-/);
expect(mockRepo.save).toHaveBeenCalledTimes(1);
expect(mockBus.publish).toHaveBeenCalledWith(
expect.objectContaining({ type: 'OrderCreated' })
);
});
it('should reject empty items', async () => {
const command = new CreateOrderCommand('cust-1', [], validAddress);
await expect(handler.handle(command)).rejects.toThrow('at least one item');
});
});
9.2. Test de Proyecciones
describe('OrderReadProjection', () => {
let projection: OrderReadProjection;
let mockReadDb: jest.Mocked<OrderReadRepository>;
beforeEach(() => {
mockReadDb = {
insert: jest.fn(),
updateStatus: jest.fn(),
} as any;
projection = new OrderReadProjection(mockReadDb);
});
it('should insert read model on OrderCreated', async () => {
const event: OrderCreatedEvent = {
orderId: 'ord-1',
customerId: 'cust-1',
items: [{ productId: 'p-1', quantity: 2, unitPrice: 10 }],
occurredAt: new Date('2026-06-01'),
};
await projection.onOrderCreated(event);
expect(mockReadDb.insert).toHaveBeenCalledWith(
expect.objectContaining({
id: 'ord-1',
status: 'PENDING',
total: 20,
itemCount: 1,
})
);
});
});
9.3. Test de Integración (End-to-End)
describe('CQRS Integration', () => {
it('should project order to read model after creation', async () => {
// 1. Ejecutar command
const orderId = await mediator.send(new CreateOrderCommand(
'cust-1',
[{ productId: 'p-1', quantity: 1 }],
validAddress,
));
// 2. Esperar proyección
await waitForProjection(() => readRepo.findById(orderId), 5000);
// 3. Ejecutar query
const dto = await mediator.ask(new GetOrderByIdQuery(orderId));
expect(dto).not.toBeNull();
expect(dto!.status).toBe('PENDING');
expect(dto!.itemCount).toBe(1);
});
});
async function waitForProjection<T>(fn: () => Promise<T>, timeoutMs: number): Promise<T> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const result = await fn();
if (result) return result;
await sleep(50);
}
throw new Error('Projection timeout');
}
10. 🏗️ Integración con DDD y Bounded Contexts
10.1. Cómo encajan los Patrones
┌─────────────────────────────────────────────┐
│ Bounded Context (Orders) │
│ │
│ ┌──────────────┐ ┌────────────────┐ │
│ │ Commands │ │ Queries │ │
│ │ (Application│ │ (Application │ │
│ │ Layer) │ │ Layer) │ │
│ └──────┬───────┘ └───────┬────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌────────────────┐ │
│ │ Write Model │ │ Read Model │ │
│ │ (Aggregates) │ │ (Projections) │ │
│ └──────┬───────┘ └───────▲────────┘ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────────────┴────────┐ │
│ │ Domain Events │ │
│ └───────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
10.2. Comunicación entre Contextos vía Eventos
// Orders context publica eventos
await eventBus.publish(new OrderShippedEvent(orderId, trackingNumber));
// Shipping context se suscribe
class ShippingProjection {
@Subscribe('OrderShipped')
async onOrderShipped(event: OrderShippedEvent) {
await this.shipmentRepo.create({
orderId: event.orderId,
trackingNumber: event.trackingNumber,
status: 'IN_TRANSIT',
});
}
}
// Invoicing context también reacciona
class InvoiceProjection {
@Subscribe('OrderShipped')
async onOrderShipped(event: OrderShippedEvent) {
await this.invoiceService.finalizeForOrder(event.orderId);
}
}
10.3. Anti-Corruption Layer en Queries
// Cuando el contexto externo no habla nuestro lenguaje
class ExternalCustomerAdapter {
async getCustomerForOrder(customerId: string): Promise<CustomerContext> {
const raw = await externalCrmApi.getCustomer(customerId);
// Traduce al modelo de nuestro contexto
return {
id: raw.id,
fullName: `${raw.first_name} ${raw.last_name}`,
email: raw.contact_email,
segment: this.mapSegment(raw.tier),
};
}
}
11.
Patrones de Producción
11.1. Escalado Horizontal de Proyectores
// Kafka: particionar eventos por aggregateId para orden dentro de la misma entidad
const projectorConsumer = new KafkaConsumer({
groupId: 'order-projection-v1',
topic: 'order-events',
partitionAssignmentStrategy: 'range',
});
// Cada partición → un consumidor → orden garantizado para un agregado
// Diferentes proyectores pueden consumir la misma partición en paralelo
11.2. Rebuild de Read Models desde Cero
// Cuando el read model se corrompe o cambia el schema
async function rebuildReadModel(): Promise<void> {
// 1. Crear nueva tabla/índice con versión
await readDb.createCollection('orders_v2');
// 2. Replay todos los eventos desde el inicio
const projectorV2 = new OrderProjectionV2(readDb);
for await (const event of eventStore.readAllFromBeginning()) {
await projectorV2.handle(event);
}
// 3. Swap atómico
await readDb.swapCollections('orders_v1', 'orders_v2');
// 4. Continuar consumiendo eventos nuevos (catch-up)
}
11.3. Versionado de Read Models
// Los read models evolucionan independientemente del write model
class OrderDtoV1 {
constructor(public id: string, public total: number) {}
}
class OrderDtoV2 {
constructor(
public id: string,
public total: number,
public currency: string, // ← nuevo campo
public subtotal: number, // ← nuevo campo
) {}
}
// Proyector decide qué versión generar según consumidor
@Subscribe('OrderCreated')
async onOrderCreated(event: OrderCreatedEvent) {
await readRepo.insertV2({
id: event.orderId,
total: event.total,
currency: event.currency,
subtotal: event.subtotal,
});
}
11.4. Observabilidad
// Métricas recomendadas
metrics.histogram('cqrs.command_duration_ms', duration, { commandType });
metrics.histogram('cqrs.query_duration_ms', duration, { queryType });
metrics.counter('cqrs.command_failures_total', { commandType, errorType });
metrics.gauge('cqrs.projection_lag_ms', lag, { projector });
metrics.counter('cqrs.events_projected_total', { eventType, projector });
// Tracing con OpenTelemetry
const span = tracer.startSpan(`command.${command.constructor.name}`);
span.setAttribute('cqrs.aggregate_id', aggregateId);
span.setAttribute('cqrs.command_type', command.constructor.name);
try {
const result = await handler.handle(command);
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
span.setStatus({ code: SpanStatusCode.ERROR });
span.recordException(error);
throw error;
} finally {
span.end();
}
11.5. Multi-Tenancy
// Cada tenant tiene sus propios read models (isolación lógica o física)
class TenantAwareOrderReadRepository {
async findById(tenantId: string, orderId: string): Promise<OrderDto | null> {
// Opción 1: Filtro en query (mismo índice/tabla)
return this.db.orders.findOne({ tenantId, id: orderId });
// Opción 2: Colección por tenant (mejor aislamiento)
return this.db.collection(`orders_${tenantId}`).findById(orderId);
// Opción 3: Esquema por tenant (Postgres schemas)
return this.db.setSchema(tenantId).orders.findById(orderId);
}
}
12. ⚠️ Errores Comunes y Trampas
- Aplicar CQRS a un CRUD simple: Sistema con 5 entidades, lecturas = escrituras, 100 usuarios.
- Fix: Empieza con transacciones clásicas. Introduce CQRS solo cuando haya asimetría o complejidad real. El 80% de las apps no necesitan CQRS.
- Confundir CQRS con dos bases de datos obligatorias:
- Fix: “Simple CQRS” usa una sola BD con modelos separados. La separación física es un caso avanzado, no un requisito.
- Read models con lógica de negocio:
if (order.total > 1000) applyDiscount(dto).- Fix: Read models son pasivos. La lógica va en el write model o en comandos.
- Ignorar la consistencia eventual: Usuarios se quejan de que “la orden que acabo de crear no aparece”.
- Fix: Documentar el lag esperado. Ofrecer “read-through from write model” para casos críticos. Retornar 202 + polling para operaciones largas.
- Commands que retornan datos de negocio:
CreateOrderHandlerretorna elOrderDtocompleto.- Fix: Retorna solo el ID. El cliente hace una query separada si necesita datos.
- Queries que modifican estado:
GetUserByIdAndIncrementLoginCount.- Fix: Separa en
GetUserByIdQuery+IncrementLoginCountCommand.
- Fix: Separa en
- Proyecciones síncronas dentro del handler del command:
- Fix: Publicar evento y dejar que un consumidor asíncrono actualice el read model. Permite escalar y aislar fallos.
- Read models sin invalidación tras cambios en el write model:
- Fix: Cada cambio en el write model publica un evento. Cada proyector escucha y actualiza. Cache invalidada por eventos, no por TTL ciego.
- Idempotencia olvidada en commands: Retry de red ejecuta
CreateOrderdos veces → dos órdenes.- Fix: Cada command lleva un
idempotencyKey. Handler verifica en BD antes de ejecutar.
- Fix: Cada command lleva un
- Eventos con estructura del write model acoplada:
OrderCreatedEventexpone campos internos del agregado.- Fix: Eventos son contratos públicos. Refactor del agregado no debe romperlos. Versionar eventos (
OrderCreatedV1,OrderCreatedV2).
- Fix: Eventos son contratos públicos. Refactor del agregado no debe romperlos. Versionar eventos (
- Over-engineering desde el día 1: CQRS + Event Sourcing + Kafka + 5 BDs para una app de 100 usuarios.
- Fix: Evoluciona incrementalmente. Empieza con separación de modelos. Añade Event Sourcing si hace falta. Separa BDs solo cuando el escalado lo exija.
- Proyectores sin DLQ (Dead Letter Queue): Un evento mal formado rompe la proyección y detiene todo.
- Fix: Eventos fallidos van a DLQ. Alerta al equipo. Replay manual tras fix.
- Olvidar el versionado de esquemas de eventos:
- Fix: Schema Registry (Avro, Protobuf) o upcasting en el event store para migrar versiones antiguas.
- Comandos con demasiados parámetros:
CreateOrderCommand(customerId, item1, item2, ..., item20, addr, card, ...).- Fix: Agrupar en value objects:
items: OrderItemsList,shipping: ShippingInfo,payment: PaymentInfo.
- Fix: Agrupar en value objects:
13.
Mejores Prácticas y Consejos de Experto
- Empieza con “Simple CQRS”: Misma BD, solo separa modelos y handlers. Evita distribuir lo que puedes mantener monolítico.
- Los read models son descartables: Puedes reconstruirlos desde cero en cualquier momento. Si no puedes, algo estás haciendo mal.
- Nombra commands en imperativo, queries en forma de pregunta:
CreateOrder,CancelSubscription,GetUserById,ListActiveOrders. Refleja la intención. - Handlers delgados: El handler coordina, no decide. Lógica de negocio vive en agregados/servicios de dominio.
- Un command = un agregado modificado: Si un command toca dos agregados, probablemente sea dos commands con eventos de dominio intermedios.
- Valida en el borde, confía en el interior: DTOs validados con Zod/Joi en el controller. Commands ya son válidos. Agregados aplican invariantes.
- Usa MediatR / mediator-ts: Patrón mediator elimina acoplamiento entre controllers y handlers. Testeable y extensible.
- Eventos de dominio como contratos públicos: Una vez publicados, no se rompen. Añadir campos opcionales sí; renombrar o eliminar, no.
- Outbox pattern siempre: Nunca publiques eventos tras commit exitoso. Escribe en tabla outbox dentro de la transacción.
- Métricas de lag son críticas: Define SLOs para proyecciones (p99 < 2s). Alerta cuando se violen.
- Caching estratégico, no indiscriminado: Cachea queries frecuentes y costosas. Invalide por eventos, no por TTL corto.
- Testea proyecciones como unidades puras: Input: evento. Output: cambio en read model. Sin BD real, sin broker.
- Replay desde evento cero debe ser posible: Diseña proyectores idempotentes. Cada evento debe poder reprocesarse sin efectos secundarios duplicados.
- No expongas el write model directamente a la API: Siempre pasa por commands. Evita que clientes muten estado arbitrariamente.
- Separa la base de datos de lectura cuando escale: Si los reads son 100x las writes, replica PostgreSQL, o migra reads a Elasticsearch/Redis.
- Dead Letter Queue obligatoria: Proyectores fallan. Redes fallan. Eventos malformados aparecen. DLQ + replay manual salva production.
- Documenta el lag esperado: “La orden aparece en el listado en ≤ 2s”. Los stakeholders aceptan eventual consistency si está documentada.
- Feature flags para evolucionar read models: Nuevo read model en paralelo al viejo. Tráfico 5% → 50% → 100%. Cero downtime en migración.
- Idempotencia a todos los niveles: Commands idempotentes, handlers idempotentes, proyectores idempotentes, consumidores de cola idempotentes.
- Distributed tracing con
sagaId/correlationId: Traza un command desde HTTP hasta la proyección final. Indispensable para debuggear lag o errores en producción. - Evita la tentación de CQRS “por moda”: Si tu equipo no entiende por qué lo necesita, no lo necesita. Empieza simple, evoluciona con datos reales de rendimiento.
Este cheatsheet proporciona una referencia exhaustiva para CQRS, cubriendo los fundamentos de segregación comando/consulta, implementación práctica con handlers y mediadores, separación de modelos de escritura y lectura, integración con Event Sourcing, buses de mensajes y outbox pattern, optimización de queries con proyecciones y caché, manejo de consistencia eventual, testing unitario e integración, patterns de producción con escalado y observabilidad, junto con los errores comunes y mejores prácticas para implementar este patrón de forma pragmática en sistemas reales, evitando el over-engineering y aprovechando sus beneficios solo cuando el dominio y la escala realmente lo justifican.