Como usar CQRS com projeções incrementais em bancos de leitura otimizados

1. Fundamentos do CQRS e Projeções Incrementais

O padrão CQRS (Command Query Responsibility Segregation) propõe a separação radical entre operações de comando (escrita) e consulta (leitura). Em sistemas tradicionais, um único modelo de dados atende ambas as finalidades, gerando complexidade e gargalos de desempenho. Com CQRS, cada operação possui seu próprio modelo, permitindo otimizações específicas.

As projeções incrementais são o mecanismo que mantém os bancos de leitura atualizados. Elas processam eventos gerados pelo lado de comando e aplicam transformações contínuas para construir visões de dados otimizadas. Diferente de abordagens batch, as projeções incrementais atualizam o banco de leitura em tempo real ou quase real, reduzindo drasticamente a latência entre a escrita e a disponibilidade dos dados para consulta.

As vantagens práticas incluem:
- Redução de latência de consulta em até 80% em cenários com alta concorrência
- Consistência eventual controlada, com janelas de atualização configuráveis
- Possibilidade de múltiplas visões do mesmo dado sem impacto no modelo de escrita

2. Arquitetura Base para Projeções Incrementais

A arquitetura típica envolve três componentes principais:

Event Store: Armazena todos os eventos gerados pelos comandos, servindo como fonte única da verdade.

Processadores de Eventos: Consomem eventos e executam lógica de projeção, atualizando o banco de leitura.

Banco de Leitura Otimizado: Estrutura de dados projetada exclusivamente para consultas rápidas.

O fluxo básico segue esta sequência:

Comando → Event Store → Fila de Eventos → Processador de Projeção → Banco de Leitura

Para garantir idempotência, cada evento deve conter um identificador único. O processador verifica se o evento já foi processado antes de aplicar a projeção:

function processarEvento(evento):
  if evento.id já processado:
    return SKIP
  aplicarProjecao(evento)
  marcarComoProcessado(evento.id)

O versionamento de eventos é crítico: cada evento carrega um número de sequência que permite reprocessamento parcial e detecção de eventos perdidos.

3. Modelagem de Projeções para Diferentes Padrões de Consulta

Projeções planas são ideais para consultas simples que exigem acesso rápido a dados desnormalizados. Por exemplo, uma projeção de pedidos para um dashboard de vendas:

CREATE TABLE projecao_pedidos_resumo (
  pedido_id UUID PRIMARY KEY,
  cliente_nome TEXT,
  produto_nome TEXT,
  valor_total DECIMAL,
  status TEXT,
  data_criacao TIMESTAMP,
  ultima_atualizacao TIMESTAMP
);

Projeções agregadas são melhores para consultas analíticas que envolvem cálculos em tempo real:

CREATE TABLE projecao_vendas_diarias (
  data DATE,
  total_pedidos INTEGER,
  receita_total DECIMAL,
  ticket_medio DECIMAL,
  PRIMARY KEY (data)
);

Para dashboards em tempo real, use projeções com índices compostos:

CREATE INDEX idx_projecao_pedidos_cliente_status 
ON projecao_pedidos_resumo (cliente_nome, status);

4. Implementação de Processadores de Eventos com Tolerância a Falhas

Filas confiáveis como Kafka ou RabbitMQ garantem entrega durável de eventos. O processador deve implementar checkpointing para retomar de onde parou em caso de falha:

processarEventos():
  ultimoOffset = recuperarCheckpoint()
  while true:
    eventos = fila.consumir(desde=ultimoOffset, max=100)
    for evento in eventos:
      try:
        aplicarProjecao(evento)
        ultimoOffset = evento.offset
      except Exception as e:
        registrarFalha(evento, e)
        continuarProcessamento()
    salvarCheckpoint(ultimoOffset)

A garantia de ordenação é obtida particionando eventos por chave de agregação. Para cenários que exigem "pelo menos uma vez", use identificadores de evento para detectar duplicatas.

5. Otimização de Bancos de Leitura com Projeções Incrementais

A escolha do banco depende do padrão de consulta:

  • Colunar (ClickHouse, Redshift): Ideal para agregações analíticas e consultas de larga escala
  • Documental (MongoDB, Couchbase): Flexível para projeções com esquemas variáveis
  • Relacional (PostgreSQL, MySQL): Excelente para consultas com joins e transações complexas

Técnicas de batch update reduzem operações de escrita:

atualizarProjecaoEmLote(eventos):
  upserts = []
  for evento in eventos:
    upserts.append(gerarUpsert(evento))
  banco.executarEmTransacao(upserts)

Índices materiais aceleram consultas frequentes:

CREATE MATERIALIZED VIEW mv_vendas_mensais AS
SELECT 
  date_trunc('month', data) as mes,
  sum(receita_total) as receita
FROM projecao_vendas_diarias
GROUP BY mes;

Cache de consultas frequentes com TTL configurável:

respostaCache = cache.obter(chaveConsulta)
if respostaCache:
  return respostaCache
resultado = banco.executar(consulta)
cache.armazenar(chaveConsulta, resultado, ttl=60)
return resultado

6. Sincronização e Consistência entre Escrita e Leitura

A consistência eventual é aceitável na maioria dos cenários, desde que a latência seja previsível. Configure notificações para clientes que exigem atualização imediata:

notificarClientes(evento):
  for cliente in inscritos[evento.tipo]:
    webhook.enviar(cliente.url, evento)

Para rollbacks, projeções devem ser reversíveis:

reverterProjecao(evento):
  if evento.tipo == 'PEDIDO_CRIADO':
    banco.deletar('projecao_pedidos', where={'id': evento.pedido_id})
  elif evento.tipo == 'PAGAMENTO_REALIZADO':
    banco.atualizar('projecao_pedidos', 
      set={'status': 'cancelado', 'valor_total': 0},
      where={'id': evento.pedido_id})

7. Monitoramento e Evolução de Projeções Incrementais

Métricas essenciais para acompanhamento:

# Prometheus metrics
projecao_eventos_processados_total{tipo="pedido_criado"} 1024
projecao_tempo_processamento_seconds 0.045
projecao_latencia_leitura_seconds 0.012
projecao_eventos_na_fila 234

Para versionamento de esquemas sem downtime:

migrarProjecao(novaVersao):
  criarNovaTabela('projecao_vendas_v2')
  processador.alterarDestino('projecao_vendas_v2')
  reprocessarEventosDesde(ultimoCheckpoint)
  # Após verificação, trocar consultas para nova tabela
  banco.renomear('projecao_vendas', 'projecao_vendas_v1')
  banco.renomear('projecao_vendas_v2', 'projecao_vendas')

Testes de integração validam a consistência:

testeConsistencia():
  evento = gerarEventoTeste()
  processador.processar(evento)
  resultado = bancoLeitura.consultar(evento.id)
  assert resultado.dados == esperado

Conclusão

CQRS com projeções incrementais oferece uma abordagem robusta para construir bancos de leitura otimizados que atendem requisitos de performance e escalabilidade. A separação de responsabilidades permite que cada lado do sistema evolua independentemente, enquanto as projeções incrementais garantem dados atualizados com latência controlada.

A implementação bem-sucedida depende de escolhas arquiteturais consistentes: versionamento de eventos, processadores tolerantes a falhas, bancos adequados ao padrão de consulta e monitoramento contínuo. Com essas práticas, é possível construir sistemas que processam milhões de eventos por dia com latência de leitura inferior a 10ms.

Referências