Como implementar event sourcing em sistemas distribuídos
1. Fundamentos do Event Sourcing em Sistemas Distribuídos
1.1. Conceitos essenciais: eventos imutáveis, armazenamento de eventos e reconstrução de estado
Event sourcing é um padrão arquitetural onde o estado atual de um sistema é derivado de uma sequência imutável de eventos passados. Em sistemas distribuídos, isso significa que cada alteração de estado é capturada como um evento que nunca é modificado ou excluído. O armazenamento de eventos funciona como um log de auditoria completo, permitindo reconstruir o estado de qualquer agregado em qualquer ponto do tempo.
// Estrutura básica de um evento
{
"eventId": "evt_8a7b6c5d-1234-5678-9abc-def012345678",
"eventType": "OrderPlaced",
"timestamp": "2025-03-15T14:30:00.000Z",
"aggregateId": "order_987654",
"version": 3,
"payload": {
"orderId": "order_987654",
"customerId": "cust_123456",
"items": ["item_001", "item_002"],
"totalAmount": 250.00
},
"metadata": {
"sourceService": "order-service",
"correlationId": "corr_abc123",
"causationId": "cmd_xyz789"
}
}
1.2. Diferenças entre event sourcing e registro de logs tradicionais em ambientes distribuídos
Enquanto logs tradicionais são descartáveis e focam em depuração, o event sourcing trata eventos como fonte única da verdade. Em sistemas distribuídos, essa distinção é crítica: logs tradicionais geralmente são armazenados localmente e perdidos em falhas de nó, enquanto eventos em event sourcing são replicados e persistidos de forma durável.
// Comparação de abordagens
Log tradicional:
[2025-03-15] INFO: Order 987654 updated to status "shipped"
(informação volátil, sem estrutura padronizada)
Event sourcing:
EventStore.OrderStream:
Event #1: OrderCreated (version 1)
Event #2: PaymentProcessed (version 2)
Event #3: OrderShipped (version 3)
(eventos imutáveis, estruturados e replicados)
1.3. Benefícios específicos para sistemas distribuídos: rastreabilidade, auditoria e consistência eventual
Em ambientes distribuídos, o event sourcing oferece rastreabilidade completa de todas as operações, suporte a auditoria regulatória e a capacidade de alcançar consistência eventual sem bloqueios distribuídos complexos. Cada microsserviço pode processar eventos no seu próprio ritmo, mantendo independência.
2. Projetando o Armazenamento de Eventos Distribuído
2.1. Escolha do banco de dados de eventos: Apache Kafka, EventStoreDB ou bancos relacionais com append-only
A escolha do armazenamento depende dos requisitos de escala e consistência:
- Apache Kafka: Ideal para alta throughput e streaming em tempo real, com particionamento nativo.
- EventStoreDB: Especializado em event sourcing, com suporte a projeções e subscriptions.
- Bancos relacionais com append-only: Adequado para sistemas menores que já usam PostgreSQL ou MySQL, implementando tabelas de eventos imutáveis.
// Configuração de tópico Kafka para eventos
kafka-topics.sh --create \
--topic order-events \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.compaction.lag.ms=86400000
2.2. Estrutura do evento: identificador único, tipo, payload, metadados de origem e timestamp distribuído
Cada evento deve conter campos essenciais para operação distribuída:
// Estrutura completa de evento com timestamp híbrido
{
"eventId": "uuid_v4",
"eventType": "string (namespace:version)",
"aggregateId": "string",
"aggregateType": "string",
"version": "integer (sequencial por agregado)",
"timestamp": "hybrid_logical_clock",
"payload": "json (dados da mudança)",
"metadata": {
"sourceService": "string",
"sourceNode": "string",
"correlationId": "uuid",
"causationId": "uuid",
"traceId": "uuid for distributed tracing"
}
}
2.3. Particionamento e sharding de streams de eventos para escalabilidade horizontal
O particionamento por aggregateId garante ordenação por agregado sem necessidade de ordenação global. Para sistemas de alto volume, use hash consistente:
// Estratégia de particionamento
partition = hash(aggregateId) % numberOfPartitions
// Exemplo com Kafka: producer config
{
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"partitioner.class": "org.apache.kafka.clients.producer.UniformStickyPartitioner"
}
3. Consistência e Ordenação de Eventos em Múltiplos Nós
3.1. Garantia de ordenação global vs. ordenação por stream ou agregado
Em sistemas distribuídos, ordenação global é impraticável. A abordagem recomendada é ordenação por agregado: todos os eventos de um mesmo aggregateId são garantidamente sequenciais dentro de uma partição.
// Verificação de versão para evitar conflitos
function appendEvent(eventStore, aggregateId, expectedVersion, newEvent):
currentVersion = eventStore.getCurrentVersion(aggregateId)
if currentVersion != expectedVersion:
throw ConcurrencyException("Versão desatualizada")
eventStore.append(aggregateId, newEvent)
3.2. Uso de relógios vetoriais e timestamps híbridos para resolução de concorrência
Relógios vetoriais permitem detectar conflitos em operações concorrentes. Timestamps híbridos combinam tempo físico com lógico para ordenação aproximada:
// Timestamp híbrido (HLC)
{
"physicalTime": 1742041800000, // Unix timestamp ms
"logicalCounter": 42 // Contador lógico para mesmo ms
}
3.3. Tratamento de eventos duplicados e idempotência em produtores e consumidores
Idempotência é crucial. Produtores devem usar identificadores únicos e consumidores devem implementar deduplicação:
// Consumidor com deduplicação
function processEvent(event):
if redis.exists("processed:" + event.eventId):
return // Evento já processado
try:
applyEvent(event)
redis.set("processed:" + event.eventId, "1", ttl=86400)
catch Exception:
// Retry ou dead letter queue
4. Reconstrução de Estado e Snapshots em Larga Escala
4.1. Técnicas de replay de eventos e materialização de agregados
Para reconstruir um agregado, faça replay de todos os eventos desde o início ou desde o último snapshot:
// Reconstrução de agregado
function rebuildAggregate(aggregateId):
state = AggregateState()
events = eventStore.getEvents(aggregateId)
for event in events:
state.apply(event)
return state
4.2. Estratégias de snapshots periódicos para evitar replay completo em sistemas de alto volume
Snapshots reduzem o custo de reconstrução. Armazene o estado completo do agregado a cada N eventos:
// Lógica de snapshot
function appendEvent(eventStore, aggregateId, event):
eventStore.append(aggregateId, event)
eventCount = eventStore.getEventCount(aggregateId)
if eventCount % SNAPSHOT_INTERVAL == 0:
state = rebuildAggregate(aggregateId)
snapshotStore.save(aggregateId, eventCount, state)
4.3. Cache distribuído de agregados e invalidação baseada em versões de eventos
Use Redis ou Memcached para cache de agregados, invalidando quando novos eventos são recebidos:
// Cache com versão
function getAggregate(aggregateId):
cached = redis.get("aggregate:" + aggregateId)
if cached:
return cached
state = rebuildAggregate(aggregateId)
redis.set("aggregate:" + aggregateId, state, ttl=300)
return state
5. Projeções e Leituras Otimizadas em Ambientes Distribuídos
5.1. Criação de projeções assíncronas para views de leitura (CQRS)
Projeções transformam eventos em views otimizadas para consulta:
// Projeção de resumo de pedidos
function orderSummaryProjection(event):
if event.type == "OrderPlaced":
db.update("INSERT INTO order_summary (id, customer, total) VALUES (?, ?, ?)",
[event.aggregateId, event.payload.customerId, event.payload.totalAmount])
elif event.type == "OrderShipped":
db.update("UPDATE order_summary SET status = 'shipped' WHERE id = ?",
[event.aggregateId])
5.2. Sincronização de projeções entre microsserviços com garantia de pelo menos uma vez
Use acknowledgments e offsets para garantir processamento:
// Consumidor Kafka com commit após processamento
while True:
records = consumer.poll(timeout=1000)
for record in records:
processEvent(record.value)
consumer.commit() // Garante pelo menos uma vez
5.3. Tratamento de projeções defasadas e reconciliação de inconsistências temporárias
Projeções podem ficar defasadas. Implemente verificações periódicas de consistência:
// Verificação de consistência
function reconcileProjection(aggregateId):
expectedState = rebuildAggregate(aggregateId)
actualState = projectionStore.getProjection(aggregateId)
if expectedState != actualState:
projectionStore.update(aggregateId, expectedState)
6. Integração com Padrões de Resiliência Distribuída
6.1. Implementação de circuit breaker em produtores e consumidores de eventos
Proteja o sistema contra falhas em cascata:
// Circuit breaker para produtor
circuitBreaker = CircuitBreaker(failureThreshold=5, resetTimeout=30000)
function publishEvent(event):
if circuitBreaker.isOpen():
return Error("Circuit breaker open")
try:
kafkaProducer.send(event)
circuitBreaker.recordSuccess()
except Exception:
circuitBreaker.recordFailure()
throw
6.2. Retry com backoff exponencial e dead letter queues para eventos com falha
Eventos que falham repetidamente devem ir para DLQ:
// Retry com backoff
function processWithRetry(event, maxRetries=3):
for attempt in range(maxRetries):
try:
processEvent(event)
return
except Exception:
wait = 2^attempt * 1000 // 1s, 2s, 4s
sleep(wait)
// Falhou após retries: enviar para DLQ
deadLetterQueue.send(event)
6.3. Estratégias de rollback e compensação em workflows baseados em eventos
Para workflows distribuídos, implemente eventos de compensação:
// Saga com compensação
function processOrderSaga(orderId):
try:
publishEvent("PaymentRequested", {orderId})
publishEvent("InventoryReserved", {orderId})
publishEvent("OrderConfirmed", {orderId})
except Exception:
publishEvent("PaymentRefunded", {orderId})
publishEvent("InventoryReleased", {orderId})
publishEvent("OrderCancelled", {orderId})
7. Governança, Versionamento e Migração de Eventos
7.1. Versionamento de esquemas de eventos com upcasting e downcasting
Evolua eventos sem quebrar consumidores existentes:
// Upcasting: converte evento antigo para novo formato
function upcastEvent(event):
if event.version == 1:
return {
...event,
version: 2,
payload: {
...event.payload,
customerEmail: event.payload.email // renomeado
}
}
return event
7.2. Estratégias de migração de streams de eventos sem interrupção de serviço
Use blue-green deployment para streams:
// Migração com dual-write
function appendEvent(event):
oldStream.append(event) // Stream antigo
newStream.append(event) // Novo stream
// Após migração completa, desativar oldStream
7.3. Políticas de retenção, compactação e expurgo de eventos em clusters distribuídos
Defina políticas claras de gerenciamento do ciclo de vida:
// Política de retenção no Kafka
{
"retention.ms": 604800000, // 7 dias
"retention.bytes": 1073741824, // 1 GB por partição
"cleanup.policy": "compact,delete",
"delete.retention.ms": 86400000 // 1 dia para marcadores de exclusão
}
Referências
-
Event Sourcing Pattern - Microsoft Azure Architecture Center — Documentação oficial da Microsoft sobre o padrão event sourcing, incluindo exemplos práticos e considerações de design para sistemas distribuídos.
-
Event Store Documentation - Getting Started — Guia oficial do EventStoreDB, banco de dados especializado em event sourcing com suporte nativo a projeções e subscriptions.
-
Apache Kafka Documentation - Event Sourcing — Documentação oficial do Apache Kafka sobre como usar o sistema de mensageria para implementar event sourcing em larga escala.
-
CQRS and Event Sourcing - Martin Fowler — Artigo clássico de Martin Fowler explicando os conceitos fundamentais de event sourcing e sua relação com CQRS.
-
Implementing Event Sourcing with PostgreSQL — Tutorial oficial do PostgreSQL sobre como implementar armazenamento de eventos usando tabelas append-only e funções de trigger.