Como implementar event sourcing em sistemas distribuídos

1. Fundamentos do Event Sourcing em Sistemas Distribuídos

1.1. Conceitos essenciais: eventos imutáveis, armazenamento de eventos e reconstrução de estado

Event sourcing é um padrão arquitetural onde o estado atual de um sistema é derivado de uma sequência imutável de eventos passados. Em sistemas distribuídos, isso significa que cada alteração de estado é capturada como um evento que nunca é modificado ou excluído. O armazenamento de eventos funciona como um log de auditoria completo, permitindo reconstruir o estado de qualquer agregado em qualquer ponto do tempo.

// Estrutura básica de um evento
{
  "eventId": "evt_8a7b6c5d-1234-5678-9abc-def012345678",
  "eventType": "OrderPlaced",
  "timestamp": "2025-03-15T14:30:00.000Z",
  "aggregateId": "order_987654",
  "version": 3,
  "payload": {
    "orderId": "order_987654",
    "customerId": "cust_123456",
    "items": ["item_001", "item_002"],
    "totalAmount": 250.00
  },
  "metadata": {
    "sourceService": "order-service",
    "correlationId": "corr_abc123",
    "causationId": "cmd_xyz789"
  }
}

1.2. Diferenças entre event sourcing e registro de logs tradicionais em ambientes distribuídos

Enquanto logs tradicionais são descartáveis e focam em depuração, o event sourcing trata eventos como fonte única da verdade. Em sistemas distribuídos, essa distinção é crítica: logs tradicionais geralmente são armazenados localmente e perdidos em falhas de nó, enquanto eventos em event sourcing são replicados e persistidos de forma durável.

// Comparação de abordagens
Log tradicional:
  [2025-03-15] INFO: Order 987654 updated to status "shipped"
  (informação volátil, sem estrutura padronizada)

Event sourcing:
  EventStore.OrderStream:
    Event #1: OrderCreated (version 1)
    Event #2: PaymentProcessed (version 2)
    Event #3: OrderShipped (version 3)
  (eventos imutáveis, estruturados e replicados)

1.3. Benefícios específicos para sistemas distribuídos: rastreabilidade, auditoria e consistência eventual

Em ambientes distribuídos, o event sourcing oferece rastreabilidade completa de todas as operações, suporte a auditoria regulatória e a capacidade de alcançar consistência eventual sem bloqueios distribuídos complexos. Cada microsserviço pode processar eventos no seu próprio ritmo, mantendo independência.

2. Projetando o Armazenamento de Eventos Distribuído

2.1. Escolha do banco de dados de eventos: Apache Kafka, EventStoreDB ou bancos relacionais com append-only

A escolha do armazenamento depende dos requisitos de escala e consistência:

  • Apache Kafka: Ideal para alta throughput e streaming em tempo real, com particionamento nativo.
  • EventStoreDB: Especializado em event sourcing, com suporte a projeções e subscriptions.
  • Bancos relacionais com append-only: Adequado para sistemas menores que já usam PostgreSQL ou MySQL, implementando tabelas de eventos imutáveis.
// Configuração de tópico Kafka para eventos
kafka-topics.sh --create \
  --topic order-events \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=86400000

2.2. Estrutura do evento: identificador único, tipo, payload, metadados de origem e timestamp distribuído

Cada evento deve conter campos essenciais para operação distribuída:

// Estrutura completa de evento com timestamp híbrido
{
  "eventId": "uuid_v4",
  "eventType": "string (namespace:version)",
  "aggregateId": "string",
  "aggregateType": "string",
  "version": "integer (sequencial por agregado)",
  "timestamp": "hybrid_logical_clock",
  "payload": "json (dados da mudança)",
  "metadata": {
    "sourceService": "string",
    "sourceNode": "string",
    "correlationId": "uuid",
    "causationId": "uuid",
    "traceId": "uuid for distributed tracing"
  }
}

2.3. Particionamento e sharding de streams de eventos para escalabilidade horizontal

O particionamento por aggregateId garante ordenação por agregado sem necessidade de ordenação global. Para sistemas de alto volume, use hash consistente:

// Estratégia de particionamento
partition = hash(aggregateId) % numberOfPartitions

// Exemplo com Kafka: producer config
{
  "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
  "partitioner.class": "org.apache.kafka.clients.producer.UniformStickyPartitioner"
}

3. Consistência e Ordenação de Eventos em Múltiplos Nós

3.1. Garantia de ordenação global vs. ordenação por stream ou agregado

Em sistemas distribuídos, ordenação global é impraticável. A abordagem recomendada é ordenação por agregado: todos os eventos de um mesmo aggregateId são garantidamente sequenciais dentro de uma partição.

// Verificação de versão para evitar conflitos
function appendEvent(eventStore, aggregateId, expectedVersion, newEvent):
    currentVersion = eventStore.getCurrentVersion(aggregateId)
    if currentVersion != expectedVersion:
        throw ConcurrencyException("Versão desatualizada")
    eventStore.append(aggregateId, newEvent)

3.2. Uso de relógios vetoriais e timestamps híbridos para resolução de concorrência

Relógios vetoriais permitem detectar conflitos em operações concorrentes. Timestamps híbridos combinam tempo físico com lógico para ordenação aproximada:

// Timestamp híbrido (HLC)
{
  "physicalTime": 1742041800000,  // Unix timestamp ms
  "logicalCounter": 42             // Contador lógico para mesmo ms
}

3.3. Tratamento de eventos duplicados e idempotência em produtores e consumidores

Idempotência é crucial. Produtores devem usar identificadores únicos e consumidores devem implementar deduplicação:

// Consumidor com deduplicação
function processEvent(event):
    if redis.exists("processed:" + event.eventId):
        return  // Evento já processado
    try:
        applyEvent(event)
        redis.set("processed:" + event.eventId, "1", ttl=86400)
    catch Exception:
        // Retry ou dead letter queue

4. Reconstrução de Estado e Snapshots em Larga Escala

4.1. Técnicas de replay de eventos e materialização de agregados

Para reconstruir um agregado, faça replay de todos os eventos desde o início ou desde o último snapshot:

// Reconstrução de agregado
function rebuildAggregate(aggregateId):
    state = AggregateState()
    events = eventStore.getEvents(aggregateId)
    for event in events:
        state.apply(event)
    return state

4.2. Estratégias de snapshots periódicos para evitar replay completo em sistemas de alto volume

Snapshots reduzem o custo de reconstrução. Armazene o estado completo do agregado a cada N eventos:

// Lógica de snapshot
function appendEvent(eventStore, aggregateId, event):
    eventStore.append(aggregateId, event)
    eventCount = eventStore.getEventCount(aggregateId)
    if eventCount % SNAPSHOT_INTERVAL == 0:
        state = rebuildAggregate(aggregateId)
        snapshotStore.save(aggregateId, eventCount, state)

4.3. Cache distribuído de agregados e invalidação baseada em versões de eventos

Use Redis ou Memcached para cache de agregados, invalidando quando novos eventos são recebidos:

// Cache com versão
function getAggregate(aggregateId):
    cached = redis.get("aggregate:" + aggregateId)
    if cached:
        return cached
    state = rebuildAggregate(aggregateId)
    redis.set("aggregate:" + aggregateId, state, ttl=300)
    return state

5. Projeções e Leituras Otimizadas em Ambientes Distribuídos

5.1. Criação de projeções assíncronas para views de leitura (CQRS)

Projeções transformam eventos em views otimizadas para consulta:

// Projeção de resumo de pedidos
function orderSummaryProjection(event):
    if event.type == "OrderPlaced":
        db.update("INSERT INTO order_summary (id, customer, total) VALUES (?, ?, ?)",
                  [event.aggregateId, event.payload.customerId, event.payload.totalAmount])
    elif event.type == "OrderShipped":
        db.update("UPDATE order_summary SET status = 'shipped' WHERE id = ?",
                  [event.aggregateId])

5.2. Sincronização de projeções entre microsserviços com garantia de pelo menos uma vez

Use acknowledgments e offsets para garantir processamento:

// Consumidor Kafka com commit após processamento
while True:
    records = consumer.poll(timeout=1000)
    for record in records:
        processEvent(record.value)
        consumer.commit()  // Garante pelo menos uma vez

5.3. Tratamento de projeções defasadas e reconciliação de inconsistências temporárias

Projeções podem ficar defasadas. Implemente verificações periódicas de consistência:

// Verificação de consistência
function reconcileProjection(aggregateId):
    expectedState = rebuildAggregate(aggregateId)
    actualState = projectionStore.getProjection(aggregateId)
    if expectedState != actualState:
        projectionStore.update(aggregateId, expectedState)

6. Integração com Padrões de Resiliência Distribuída

6.1. Implementação de circuit breaker em produtores e consumidores de eventos

Proteja o sistema contra falhas em cascata:

// Circuit breaker para produtor
circuitBreaker = CircuitBreaker(failureThreshold=5, resetTimeout=30000)

function publishEvent(event):
    if circuitBreaker.isOpen():
        return Error("Circuit breaker open")
    try:
        kafkaProducer.send(event)
        circuitBreaker.recordSuccess()
    except Exception:
        circuitBreaker.recordFailure()
        throw

6.2. Retry com backoff exponencial e dead letter queues para eventos com falha

Eventos que falham repetidamente devem ir para DLQ:

// Retry com backoff
function processWithRetry(event, maxRetries=3):
    for attempt in range(maxRetries):
        try:
            processEvent(event)
            return
        except Exception:
            wait = 2^attempt * 1000  // 1s, 2s, 4s
            sleep(wait)
    // Falhou após retries: enviar para DLQ
    deadLetterQueue.send(event)

6.3. Estratégias de rollback e compensação em workflows baseados em eventos

Para workflows distribuídos, implemente eventos de compensação:

// Saga com compensação
function processOrderSaga(orderId):
    try:
        publishEvent("PaymentRequested", {orderId})
        publishEvent("InventoryReserved", {orderId})
        publishEvent("OrderConfirmed", {orderId})
    except Exception:
        publishEvent("PaymentRefunded", {orderId})
        publishEvent("InventoryReleased", {orderId})
        publishEvent("OrderCancelled", {orderId})

7. Governança, Versionamento e Migração de Eventos

7.1. Versionamento de esquemas de eventos com upcasting e downcasting

Evolua eventos sem quebrar consumidores existentes:

// Upcasting: converte evento antigo para novo formato
function upcastEvent(event):
    if event.version == 1:
        return {
            ...event,
            version: 2,
            payload: {
                ...event.payload,
                customerEmail: event.payload.email  // renomeado
            }
        }
    return event

7.2. Estratégias de migração de streams de eventos sem interrupção de serviço

Use blue-green deployment para streams:

// Migração com dual-write
function appendEvent(event):
    oldStream.append(event)  // Stream antigo
    newStream.append(event)  // Novo stream
    // Após migração completa, desativar oldStream

7.3. Políticas de retenção, compactação e expurgo de eventos em clusters distribuídos

Defina políticas claras de gerenciamento do ciclo de vida:

// Política de retenção no Kafka
{
  "retention.ms": 604800000,     // 7 dias
  "retention.bytes": 1073741824, // 1 GB por partição
  "cleanup.policy": "compact,delete",
  "delete.retention.ms": 86400000 // 1 dia para marcadores de exclusão
}

Referências