Como projetar filas de processamento assíncrono

1. Fundamentos do processamento assíncrono com filas

O processamento assíncrono com filas é um padrão arquitetural fundamental para sistemas distribuídos modernos. Diferentemente do processamento síncrono, onde o solicitante aguarda a conclusão da operação, o modelo assíncrono introduz um intermediário — a fila — que desacopla produtores e consumidores.

Diferenças críticas:
- Latência: No síncrono, a latência total é a soma de todas as operações. No assíncrono, o produtor retorna imediatamente após enfileirar a mensagem.
- Acoplamento: Síncrono exige que ambos os sistemas estejam disponíveis simultaneamente. Assíncrono permite que consumidores fiquem offline temporariamente.
- Escalabilidade: Filas permitem escalar produtores e consumidores independentemente.

Casos de uso típicos:
- Envio de e-mails transacionais
- Processamento de imagens e vídeos
- Processamento de pagamentos
- Notificações push
- Sincronização de dados entre microsserviços

Métricas-chave para monitoramento:
- Throughput (mensagens/segundo)
- Latência de enfileiramento (tempo entre produção e consumo)
- Taxa de falhas e retentativas
- Profundidade da fila (backlog)

2. Arquitetura de uma fila de mensagens

Uma arquitetura típica de fila de mensagens possui quatro componentes essenciais:

Produtor → [Broker (Fila)] → Consumidor

Componentes:
- Produtor: Aplicação que cria e envia mensagens
- Fila: Estrutura de dados que armazena mensagens temporariamente
- Consumidor: Aplicação que processa as mensagens
- Broker: Serviço que gerencia as filas (RabbitMQ, Apache Kafka, Amazon SQS)

Modelos de entrega:

Ponto a ponto (Queue):

Produtor A → [Fila Pedidos] → Consumidor 1
                              → Consumidor 2

Cada mensagem é processada por exatamente um consumidor. Ideal para workloads onde cada tarefa deve ser executada uma única vez.

Publish-Subscribe (Topic):

Produtor A → [Topic Eventos] → Consumidor 1 (log)
                              → Consumidor 2 (notificação)
                              → Consumidor 3 (análise)

Cada mensagem é entregue a todos os consumidores inscritos. Ideal para eventos de domínio.

Garantias de entrega:

Garantia Descrição Trade-off
At-most-once Mensagem entregue no máximo uma vez Pode perder mensagens
At-least-once Mensagem entregue pelo menos uma vez Pode duplicar processamento
Exactly-once Mensagem entregue exatamente uma vez Maior latência e complexidade

Implementação de exactly-once requer idempotência no consumidor e confirmações em duas fases:

# Pseudocódigo para exactly-once com idempotência
função processarMensagem(mensagem):
    se mensagem.id já processado:
        retornar SUCESSO
    processarOperacao(mensagem)
    marcarComoProcessado(mensagem.id)
    retornar SUCESSO

3. Estratégias de roteamento e particionamento

Roteamento baseado em chave:
Para garantir ordenação por entidade (ex.: todas as transações de um mesmo usuário):

# Roteamento por hash da chave
chave = mensagem.usuario_id
particao = hash(chave) % NUMERO_PARTICOES
enviarParaParticao(mensagem, particao)

Particionamento:
- Hash consistente: Minimiza redistribuição quando consumidores entram/saem
- Round-robin: Distribuição uniforme, mas sem garantia de ordenação

Dead Letter Queue (DLQ):
Mensagens que falharam após todas as tentativas são movidas para uma fila especial:

# Configuração de DLQ no consumidor
tentativas = 0
MÁXIMO_TENTATIVAS = 3

enquanto tentativas < MÁXIMO_TENTATIVAS:
    try:
        processar(mensagem)
        confirmar(mensagem)
        break
    except ErroPermanente:
        moverParaDLQ(mensagem, "Erro permanente")
        break
    except ErroTemporario:
        tentativas++
        aguardar(backoff(tentativas))

4. Projeto do consumidor e controle de concorrência

Modelos de consumo:

Polling:

enquanto verdadeiro:
    mensagens = fila.receber(max_mensagens=10, timeout=20)
    para cada msg em mensagens:
        processar(msg)
        confirmar(msg)
    aguardar(1 segundo)  # Evita polling excessivo

Push (via webhook ou callback):

# Configuração de push no broker
fila.registrarConsumidor(
    url="https://meuservico.com/processar",
    prefetch_count=5  # Máximo de mensagens simultâneas
)

Worker pools com concorrência controlada:

POOL_TAMANHO = 10
semáforo = Semáforo(POOL_TAMANHO)

função consumir():
    enquanto verdadeiro:
        mensagem = fila.receber()
        semáforo.adquirir()
        executarEmParalelo(() => {
            processar(mensagem)
            semáforo.liberar()
        })

Idempotência no consumidor:

função processarPedido(mensagem):
    transação = iniciarTransacao()
    try:
        # Verificar se já processou
        se banco.existe("pedido", mensagem.pedido_id):
            transação.confirmar()
            retornar

        # Processar pedido
        criarPedido(mensagem)
        debitarEstoque(mensagem.itens)
        transação.confirmar()
    except:
        transação.reverter()
        lançarErro()

5. Tratamento de falhas e retentativas

Backoff exponencial com jitter:

função calcularBackoff(tentativa):
    base = 1000  # 1 segundo
    maximo = 60000  # 1 minuto
    expoente = min(2 ** tentativa, maximo / base)
    jitter = aleatorio(0, base * 0.1)
    retornar min(base * expoente + jitter, maximo)

Circuit breaker para proteção:

se taxaErro > 50% nos últimos 60 segundos:
    abrirCircuito()  # Rejeita requisições imediatamente
    agendarReabertura(apos=30 segundos)

Monitoramento essencial:

# Métricas a serem coletadas
métricas = {
    "fila_profundidade": fila.tamanhoAtual(),
    "mensagem_mais_antiga": fila.idadeMaxima(),
    "taxa_consumo": fila.mensagensPorSegundo(),
    "taxa_erro": fila.errosPorSegundo(),
    "tempo_medio_processamento": consumidor.tempoMedio()
}

6. Escalabilidade e resiliência do sistema de filas

Escalonamento horizontal de consumidores:

# Estratégia de rebalanceamento
grupoConsumidores = ["consumer-1", "consumer-2", "consumer-3"]
particoes = ["p0", "p1", "p2", "p3", "p4"]

# Quando consumer-4 entra:
grupoConsumidores = ["consumer-1", "consumer-2", "consumer-3", "consumer-4"]
# Partições são redistribuídas:
# consumer-1: p0, p4
# consumer-2: p1
# consumer-3: p2
# consumer-4: p3

Garantia de ordenação vs. escalabilidade:
- Para ordenação estrita: usar uma única partição por chave
- Para máxima escalabilidade: aceitar ordenação parcial

Persistência e replicação:

# Configuração de durabilidade no RabbitMQ
fila_declarar("pedidos", durable=True)  # Sobrevive a restart do broker
mensagem_enviar("pedidos", corpo, delivery_mode=2)  # Persistente em disco

7. Práticas avançadas e anti-patterns

Filas temporárias para respostas assíncronas:

# Padrão Request-Reply com fila temporária
correlationId = gerarUUID()
filaResposta = criarFilaTemporaria(exclusiva=True)
mensagem = {
    "correlationId": correlationId,
    "payload": dados,
    "replyTo": filaResposta.nome
}
filaPedidos.enviar(mensagem)
resposta = filaResposta.aguardar(timeout=30)

Anti-patterns a evitar:

  1. Fila como banco de dados: Não armazenar estado permanente em filas
  2. Polling excessivo: Usar push ou long polling quando possível
  3. Mensagens muito grandes: Filas não são para transferência de arquivos (>1MB)
  4. Processamento síncrono disfarçado: Aguardar resposta da fila no mesmo thread

Integração com sagas:

# Saga para pedido com pagamento e estoque
passos = [
    {"ação": "reservarEstoque", "compensação": "liberarEstoque"},
    {"ação": "processarPagamento", "compensação": "reembolsar"},
    {"ação": "confirmarPedido", "compensação": "cancelarPedido"}
]

para passo em passos:
    try:
        executar(passo.ação)
        enfileirarPróximoPasso()
    except:
        para passoAnterior em passos.reverso():
            executar(passoAnterior.compensação)
        lançarErro()

Referências