Event Sourcing: o estado como sequência de eventos

1. Fundamentos do Event Sourcing

1.1. Definição: armazenar eventos de mudança de estado em vez do estado atual

Event Sourcing é um padrão arquitetural onde, em vez de persistir o estado atual de uma entidade, armazenamos uma sequência imutável de eventos que representam todas as mudanças que ocorreram nessa entidade ao longo do tempo. O estado atual é derivado — ou "reconstruído" — a partir da aplicação sequencial desses eventos.

1.2. Diferença fundamental para o modelo CRUD tradicional

No modelo CRUD tradicional, atualizamos registros diretamente:

-- Abordagem CRUD: UPDATE sobrescreve o estado anterior
UPDATE carrinhos SET status = 'finalizado', total = 250.00 WHERE id = 123;

No Event Sourcing, apenas anexamos novos eventos:

-- Abordagem Event Sourcing: APPEND de um novo evento
INSERT INTO eventos_carrinho (aggregate_id, tipo, dados, versao, timestamp)
VALUES ('123', 'CarrinhoFinalizado', '{"total": 250.00}', 5, '2024-01-15T10:30:00Z');

1.3. Propriedades: imutabilidade dos eventos, append-only log, event store como fonte da verdade

Os eventos são imutáveis — uma vez escritos, nunca são alterados ou excluídos. O event store funciona como um log append-only, tornando-se a única fonte da verdade sobre o histórico completo do sistema.

2. Arquitetura e Componentes Centrais

2.1. Event Store

O Event Store é um banco de dados especializado para armazenar eventos. Exemplos incluem EventStoreDB, ou implementações sobre PostgreSQL:

-- PostgreSQL como event store
CREATE TABLE eventos (
    id SERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    tipo_evento VARCHAR(100) NOT NULL,
    dados JSONB NOT NULL,
    versao INT NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(aggregate_id, versao)
);

2.2. Aggregate e Event Stream

Os eventos são organizados por aggregate ID, formando um stream de eventos para cada entidade:

Stream do Carrinho #123:
[versao 1] ItemAdicionado { produto: "Camiseta", qtd: 2, preco: 49.90 }
[versao 2] ItemAdicionado { produto: "Calça", qtd: 1, preco: 89.90 }
[versao 3] ItemRemovido { produto: "Camiseta", qtd: 1 }
[versao 4] CarrinhoFinalizado { total: 189.70 }

2.3. Projeções (Projections)

Projeções materializam visões de leitura a partir do stream de eventos:

-- Projeção: carrinhos_ativos
CREATE TABLE carrinhos_ativos AS
SELECT 
    aggregate_id,
    SUM((dados->>'preco')::numeric * (dados->>'qtd')::int) as total,
    MAX(timestamp) as ultima_atualizacao
FROM eventos
WHERE tipo_evento IN ('ItemAdicionado', 'ItemRemovido')
  AND aggregate_id NOT IN (
      SELECT aggregate_id FROM eventos WHERE tipo_evento = 'CarrinhoFinalizado'
  )
GROUP BY aggregate_id;

3. Ciclo de Vida de um Evento

3.1. Geração do evento

Antes de emitir um evento, o aggregate valida as regras de negócio:

class Carrinho {
    constructor(eventos) {
        this.itens = [];
        this.status = 'ativo';
        this.versao = 0;
        this.aplicarEventos(eventos);
    }

    adicionarItem(produto, quantidade, preco) {
        if (this.status !== 'ativo') {
            throw new Error('Carrinho já finalizado');
        }
        this.emitirEvento(new ItemAdicionado(produto, quantidade, preco));
    }
}

3.2. Persistência atômica

A gravação usa controle de concorrência otimista:

-- Persistência com verificação de versão
INSERT INTO eventos (aggregate_id, aggregate_type, tipo_evento, dados, versao, timestamp)
SELECT '123', 'Carrinho', 'ItemAdicionado', '{"produto": "Camiseta", "qtd": 1}', 6, NOW()
WHERE NOT EXISTS (
    SELECT 1 FROM eventos 
    WHERE aggregate_id = '123' AND versao >= 6
);

3.3. Publicação do evento

Após persistir, o evento é publicado para sistemas downstream:

// Publicação via message broker
eventBus.publish(new EventoPublicado(
    aggregateId: '123',
    tipoEvento: 'ItemAdicionado',
    dados: { produto: 'Camiseta', qtd: 1 }
));

4. Reconstrução de Estado (Rehydration)

4.1. Processo de replay

Para obter o estado atual de um aggregate, aplicamos todos os eventos do stream:

function reconstruirCarrinho(aggregateId) {
    const eventos = eventStore.obterEventos(aggregateId);
    let carrinho = new Carrinho();

    for (const evento of eventos) {
        switch(evento.tipo) {
            case 'ItemAdicionado':
                carrinho.itens.push(evento.dados);
                break;
            case 'ItemRemovido':
                carrinho.itens = carrinho.itens.filter(
                    i => i.produto !== evento.dados.produto
                );
                break;
            case 'CarrinhoFinalizado':
                carrinho.status = 'finalizado';
                break;
        }
    }
    return carrinho;
}

4.2. Snapshots

Snapshots evitam replay completo em streams longos:

-- Estrutura de snapshot
CREATE TABLE snapshots_carrinho (
    aggregate_id UUID PRIMARY KEY,
    estado JSONB NOT NULL,
    versao INT NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL
);

-- Reconstrução com snapshot
function reconstruirComSnapshot(aggregateId) {
    const snapshot = obterSnapshot(aggregateId);
    const eventos = eventStore.obterEventosDesde(
        aggregateId, snapshot.versao + 1
    );
    return aplicarEventos(snapshot.estado, eventos);
}

4.3. Estratégias de snapshot

  • Periódico: a cada 24 horas
  • Por número de eventos: a cada 100 eventos
  • Baseado em versão: a cada 50 versões do aggregate

5. Vantagens Arquiteturais do Event Sourcing

5.1. Auditoria completa e rastreabilidade

Cada mudança é registrada com timestamp, versão e dados completos, permitindo auditoria forense detalhada.

5.2. Capacidade de "time travel"

Podemos reconstruir o estado em qualquer ponto do passado:

function estadoNoMomento(aggregateId, timestamp) {
    const eventos = eventStore.obterEventosAte(aggregateId, timestamp);
    return reconstruirCarrinho(eventos);
}

5.3. Desacoplamento temporal

Eventos futuros podem ser analisados e bugs podem ser corrigidos retrospectivamente, aplicando novos eventos corretivos.

6. Desafios e Padrões de Mitigação

6.1. Evolução do esquema de eventos

Para versionamento, usamos upcasting:

-- Upcaster: versão 1 -> versão 2
function upcastItemAdicionadoV1(evento) {
    return {
        ...evento,
        dados: {
            ...evento.dados,
            precoUnitario: evento.dados.preco,
            moeda: 'BRL'
        }
    };
}

6.2. Consistência eventual e latência de projeções

Projeções podem ter latência de segundos a minutos. Para leituras críticas, use leitura direta do event store com cache.

6.3. Complexidade operacional

Gerencie retenção de dados com políticas claras: eventos principais mantidos permanentemente, snapshots compactados periodicamente.

7. Relação com Padrões Vizinhos

7.1. Event Sourcing + CQRS

Separação natural: event store para escritas, projeções materializadas para leituras.

7.2. Event Sourcing + Domain Events

Eventos de domínio (Domain Events) são a base natural para o event store.

7.3. Quando Event Sourcing não é adequado

Evite em cenários de baixa necessidade de auditoria, alta latência crítica, ou quando o volume de eventos é extremamente alto sem necessidade de histórico.

8. Exemplo Prático: Sistema de Carrinho de Compras

8.1. Eventos do domínio

Eventos do Carrinho:
ItemAdicionado { produtoId, nome, quantidade, precoUnitario }
ItemRemovido { produtoId }
QuantidadeAlterada { produtoId, novaQuantidade }
CarrinhoFinalizado { total, formaPagamento }
CarrinhoCancelado { motivo }

8.2. Reconstrução do aggregate

const eventos = [
    { tipo: 'ItemAdicionado', dados: { produto: 'Camiseta', qtd: 2, preco: 49.90 }},
    { tipo: 'ItemAdicionado', dados: { produto: 'Calça', qtd: 1, preco: 89.90 }},
    { tipo: 'QuantidadeAlterada', dados: { produto: 'Camiseta', qtd: 3 }},
    { tipo: 'CarrinhoFinalizado', dados: { total: 239.60 }}
];

const carrinho = reconstruirCarrinho(eventos);
// Resultado: { itens: [{Camiseta, 3, 49.90}, {Calça, 1, 89.90}], status: 'finalizado', total: 239.60 }

8.3. Projeção para consulta

-- Projeção: carrinhos_ativos_para_interface
CREATE MATERIALIZED VIEW carrinhos_ativos_view AS
SELECT 
    c.aggregate_id,
    json_agg(json_build_object(
        'produto', e.dados->>'produto',
        'quantidade', (e.dados->>'qtd')::int,
        'preco', (e.dados->>'preco')::numeric
    )) as itens,
    SUM((e.dados->>'preco')::numeric * (e.dados->>'qtd')::int) as total
FROM eventos e
JOIN (
    SELECT aggregate_id, MAX(versao) as max_versao
    FROM eventos
    GROUP BY aggregate_id
) c ON e.aggregate_id = c.aggregate_id
WHERE NOT EXISTS (
    SELECT 1 FROM eventos 
    WHERE aggregate_id = e.aggregate_id 
    AND tipo_evento IN ('CarrinhoFinalizado', 'CarrinhoCancelado')
)
GROUP BY c.aggregate_id;

Referências