Outbox pattern: garantindo entrega de eventos com transações

1. O Problema da Entrega de Eventos em Sistemas Distribuídos

1.1. A dualidade entre transações de banco de dados e publicação de mensagens

Em sistemas distribuídos modernos, uma operação de negócio frequentemente precisa realizar duas ações atômicas: persistir dados no banco e publicar um evento para outros serviços. O desafio surge porque essas operações ocorrem em sistemas diferentes — um banco de dados relacional e um message broker — cada um com suas próprias garantias transacionais.

1.2. Cenários típicos de falha: evento perdido vs. evento duplicado

Considere um serviço de pedidos que precisa:
1. Salvar o pedido no banco de dados
2. Publicar um evento "PedidoCriado" no Kafka

Se o sistema publicar o evento antes de salvar no banco, uma falha após a publicação pode resultar em um evento órfão — o pedido nunca foi persistido, mas outros serviços já reagiram ao evento. Se publicar depois, uma falha entre o salvamento e a publicação significa que o evento nunca será enviado, deixando o sistema em estado inconsistente.

1.3. Limitações de abordagens ingênuas

Abordagens como "tentar publicar, depois salvar" ou "salvar, depois publicar" são inerentemente frágeis. Transações distribuídas (2PC) são complexas e nem sempre suportadas. O Outbox Pattern surge como uma solução elegante para esse problema.

2. O Outbox Pattern: Conceitos Fundamentais

2.1. Definição e objetivo principal

O Outbox Pattern garante atomicidade entre a escrita no banco de dados e a emissão de eventos. A ideia central é simples: em vez de publicar o evento diretamente para o message broker, o sistema escreve o evento em uma tabela especial (outbox) dentro da mesma transação do banco de dados.

2.2. Arquitetura básica

Uma tabela outbox atua como repositório temporário de eventos. Um processo separado (publisher) lê dessa tabela e publica os eventos no broker. Como a escrita na outbox está na mesma transação da operação de negócio, ou ambos persistem, ou nenhum persiste.

Transação de Negócio
     |
     +---> INSERT na tabela de domínio (ex.: pedidos)
     |
     +---> INSERT na tabela outbox (ex.: evento PedidoCriado)

Processo Publisher (polling ou CDC)
     |
     +---> Lê registros pendentes da outbox
     |
     +---> Publica no Kafka/RabbitMQ
     |
     +---> Marca como publicado ou remove

2.3. Fluxo de execução

O fluxo completo envolve:
1. Iniciar transação no banco
2. Executar operações de negócio (INSERT/UPDATE nas tabelas de domínio)
3. Inserir um ou mais registros na tabela outbox
4. Confirmar a transação (COMMIT)
5. Processo assíncrono lê a outbox e publica os eventos

3. Implementação da Tabela Outbox

3.1. Estrutura mínima da tabela

CREATE TABLE outbox (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(100) NOT NULL,
    aggregate_id    VARCHAR(100) NOT NULL,
    event_type      VARCHAR(100) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMP NOT NULL DEFAULT NOW(),
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
    version         INTEGER NOT NULL DEFAULT 1
);

CREATE INDEX idx_outbox_status_created 
    ON outbox (status, created_at) 
    WHERE status = 'pending';

3.2. Estratégias de serialização do payload

O payload pode ser serializado em diferentes formatos:
- JSON: Simples, legível, mas sem schema enforcement
- Avro: Compacto, com schema evolution, requer Schema Registry
- Protobuf: Eficiente, tipado, ideal para alta performance

Para a maioria dos casos, JSON é suficiente inicialmente, podendo migrar para Avro ou Protobuf conforme a necessidade de performance e governança de schemas.

3.3. Garantias de ordenação e identificação

O campo version permite versionamento do agregado. Combinado com aggregate_type e aggregate_id, é possível rastrear a ordem correta dos eventos. O id UUID garante identificação única, essencial para idempotência no consumidor.

4. Mecanismos de Leitura e Publicação da Outbox

4.1. Polling-based

function pollOutbox():
    while true:
        eventos = SELECT * FROM outbox 
                  WHERE status = 'pending' 
                  ORDER BY created_at ASC 
                  LIMIT 100

        for each evento in eventos:
            try:
                publicarNoKafka(evento)
                UPDATE outbox SET status = 'published' 
                WHERE id = evento.id
            catch Exception:
                logError("Falha ao publicar evento: " + evento.id)
                // Retry será tratado na próxima iteração

        sleep(intervalo_configurado)  // Ex.: 100ms

Vantagens: Simples de implementar, sem dependências externas
Desvantagens: Latência adicional (polling interval), overhead de queries constantes

4.2. Transaction log tailing (CDC)

Ferramentas como Debezium capturam mudanças no log de transações do banco de dados (PostgreSQL WAL, MySQL binlog) e as convertem em eventos. Isso elimina a necessidade de polling, oferecendo latência quase zero.

// Configuração Debezium para PostgreSQL
{
    "name": "outbox-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.dbname": "pedidos",
        "table.include.list": "public.outbox",
        "plugin.name": "pgoutput",
        "transforms": "unwrap,extractNewDocumentState",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}

4.3. Trade-offs

Característica Polling CDC
Complexidade Baixa Alta
Latência Média (depende do intervalo) Baixa (quase real-time)
Overhead no banco Queries constantes Mínimo (lê log)
Dependências Nenhuma Debezium + Kafka Connect
Manutenção Simples Complexa (configuração, monitoramento)

5. Tratamento de Falhas e Consistência Eventual

5.1. Remoção ou marcação de eventos

Após publicação bem-sucedida, o registro pode ser:
- Marcado como 'published': Mantém histórico, permite auditoria
- Removido: Economiza espaço, mas perde rastreabilidade
- Arquivado em tabela separada: Melhor prática para ambientes de produção

5.2. Retry com backoff

function processarEventos():
    eventos = SELECT * FROM outbox 
              WHERE status = 'pending' 
              AND (tentativas < 5 OR 
                   (tentativas >= 5 AND 
                    NOW() > ultima_tentativa + INTERVAL '5 minutes'))
              ORDER BY created_at ASC

    for each evento in eventos:
        try:
            publicarNoKafka(evento)
            UPDATE outbox SET status = 'published' WHERE id = evento.id
        catch:
            UPDATE outbox SET 
                tentativas = tentativas + 1,
                ultima_tentativa = NOW(),
                erro = 'Falha na publicação'
            WHERE id = evento.id

5.3. Dead letter queue

Eventos que excedem o limite de tentativas devem ser movidos para uma dead letter queue (DLQ):

UPDATE outbox SET status = 'failed' 
WHERE tentativas >= 10 AND status = 'pending';

// Notificar equipe de operações sobre eventos na DLQ

6. Relação com Padrões Vizinhos na Série

6.1. Event versioning

Versionar o payload na outbox permite evolução de schema sem quebrar consumidores existentes. Cada versão do evento carrega metadados sobre o schema utilizado.

6.2. Schema registry

Antes de inserir na outbox, validar o payload contra um schema registrado. Isso garante que apenas eventos válidos sejam publicados.

6.3. Idempotent consumers

Consumidores idempotentes são essenciais para o Outbox Pattern. Como o publisher pode publicar o mesmo evento mais de uma vez (em caso de falha após publicação, mas antes de marcar como published), o consumidor deve ser capaz de processar o mesmo evento múltiplas vezes sem efeitos colaterais.

// Exemplo de consumidor idempotente
function processarEvento(evento):
    if not eventoJaProcessado(evento.id):
        processarLogicaDeNegocio(evento)
        marcarComoProcessado(evento.id)

7. Boas Práticas e Armadilhas Comuns

7.1. Cuidado com transações longas

Manter transações curtas é crucial. A inserção na outbox deve ser rápida. Evite processamento pesado dentro da transação.

7.2. Escolha do nível de isolamento

Para leitura da outbox, READ COMMITTED é suficiente. Evite SERIALIZABLE, que pode causar deadlocks desnecessários.

7.3. Monitoramento

Métricas essenciais:
- Tamanho da fila outbox (quantidade de registros 'pending')
- Latência entre criação e publicação do evento
- Taxa de falhas por tipo de evento
- Número de eventos na DLQ

8. Exemplo Prático: Outbox com PostgreSQL e Kafka

8.1. Estrutura de tabelas e trigger

-- Tabela de domínio
CREATE TABLE pedidos (
    id UUID PRIMARY KEY,
    cliente_id VARCHAR(100) NOT NULL,
    valor DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'criado',
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Tabela outbox
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    tentativas INTEGER NOT NULL DEFAULT 0,
    ultima_tentativa TIMESTAMP,
    erro TEXT
);

-- Função trigger para inserir na outbox automaticamente
CREATE OR REPLACE FUNCTION insere_outbox_pedido()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'pedido',
        NEW.id::text,
        'PedidoCriado',
        jsonb_build_object(
            'pedido_id', NEW.id,
            'cliente_id', NEW.cliente_id,
            'valor', NEW.valor,
            'status', NEW.status
        )
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trigger_outbox_pedido
    AFTER INSERT ON pedidos
    FOR EACH ROW
    EXECUTE FUNCTION insere_outbox_pedido();

8.2. Código do publisher (polling)

import psycopg2
from kafka import KafkaProducer
import json
import time
import uuid

def criar_conexoes():
    conn = psycopg2.connect(
        host="localhost",
        database="pedidos",
        user="app",
        password="secret"
    )
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    return conn, producer

def publicar_eventos():
    conn, producer = criar_conexoes()

    while True:
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT id, aggregate_type, aggregate_id, 
                           event_type, payload
                    FROM outbox
                    WHERE status = 'pending'
                    AND (tentativas < 3 OR 
                         (tentativas >= 3 AND 
                          NOW() > ultima_tentativa + INTERVAL '1 minute'))
                    ORDER BY created_at ASC
                    LIMIT 50
                    FOR UPDATE SKIP LOCKED
                """)

                eventos = cursor.fetchall()

                for evento in eventos:
                    event_id, agg_type, agg_id, event_type, payload = evento

                    try:
                        future = producer.send(
                            topic=agg_type,
                            key=agg_id.encode(),
                            value={
                                'event_id': str(event_id),
                                'event_type': event_type,
                                'aggregate_id': agg_id,
                                'payload': payload,
                                'timestamp': time.time()
                            }
                        )
                        future.get(timeout=5)

                        cursor.execute("""
                            UPDATE outbox 
                            SET status = 'published' 
                            WHERE id = %s
                        """, (event_id,))

                    except Exception as e:
                        cursor.execute("""
                            UPDATE outbox 
                            SET tentativas = tentativas + 1,
                                ultima_tentativa = NOW(),
                                erro = %s
                            WHERE id = %s
                        """, (str(e), event_id))

                conn.commit()

        except Exception as e:
            print(f"Erro no ciclo de publicação: {e}")
            conn.rollback()
            time.sleep(5)

        time.sleep(0.1)  # 100ms entre ciclos

if __name__ == "__main__":
    publicar_eventos()

8.3. Fluxo completo

  1. Transação de negócio: A aplicação insere um pedido na tabela pedidos. O trigger automaticamente insere um evento na outbox.
  2. Outbox: O evento fica com status 'pending' no banco.
  3. CDC (Debezium): Captura a inserção na outbox através do WAL do PostgreSQL.
  4. Kafka Connect: Publica o evento no tópico 'pedidos' do Kafka.
  5. Consumidor idempotente: Processa o evento, verificando se já foi processado pelo event_id.
// Exemplo de evento publicado no Kafka
{
    "event_id": "a1b2c3d4-...",
    "event_type": "PedidoCriado",
    "aggregate_id": "pedido-123",
    "payload": {
        "pedido_id": "pedido-123",
        "cliente_id": "cliente-456",
        "valor": 150.00,
        "status": "criado"
    },
    "timestamp": 1699000000.123
}

O Outbox Pattern resolve elegantemente o problema da entrega confiável de eventos, garantindo que nenhum evento seja perdido e que a consistência entre banco e mensageria seja mantida. Combinado com boas práticas de monitoramento e tratamento de falhas, torna-se uma ferramenta indispensável na arquitetura de sistemas distribuídos.

Referências