Como aplicar o padrão competing consumers para processamento paralelo

1. Fundamentos do Padrão Competing Consumers

O padrão competing consumers é uma arquitetura de mensageria onde múltiplos consumidores competem por mensagens em uma mesma fila, garantindo que cada mensagem seja processada por exatamente um consumidor. O objetivo principal é balancear a carga de trabalho entre consumidores concorrentes, permitindo processamento paralelo e escalabilidade horizontal.

Diferentemente do padrão publish-subscribe, onde cada mensagem é entregue a todos os assinantes, no competing consumers cada mensagem é consumida uma única vez. No modelo point-to-point tradicional, um único consumidor processa a fila; no competing consumers, múltiplos consumidores dividem o trabalho.

Cenários típicos incluem:
- Processamento de pedidos em e-commerce
- Ingestão e transformação de logs em tempo real
- Processamento de jobs em lote
- Envio de notificações em massa

2. Arquitetura e Componentes Principais

A arquitetura do padrão competing consumers é composta por três elementos fundamentais:

Fila como ponto central de desacoplamento: A fila (ex: RabbitMQ, Apache Kafka, Amazon SQS) atua como buffer entre produtores e consumidores. Ela armazena mensagens até que sejam processadas, permitindo que produtores e consumidores operem em velocidades diferentes.

Papel do produtor: O produtor enfileira mensagens com payloads estruturados. É crucial implementar idempotência no lado do produtor para evitar duplicatas em caso de falhas de rede.

Papel do consumidor: Cada consumidor processa mensagens de forma independente, utilizando confirmação explícita (ack) para sinalizar sucesso ou rejeição (nack) para indicar falha.

3. Configuração de Concorrência e Escalabilidade

A configuração de concorrência depende do modelo de execução:

# Configuração de thread pool para consumidores
consumidores:
  numero_instancias: 3
  threads_por_instancia: 10
  prefetch_count: 5
  batch_size: 100
  timeout_segundos: 30

Estratégias de escalonamento horizontal:
- Adicionar consumidores quando a fila ultrapassar um threshold
- Remover consumidores ociosos para economizar recursos
- Utilizar auto-scaling groups em ambientes cloud

Controle de throughput:
- Prefetch count: Número máximo de mensagens não confirmadas por consumidor
- Batch size: Quantidade de mensagens processadas em lote
- Timeouts: Limite de tempo para processamento de cada mensagem

4. Garantia de Processamento e Tolerância a Falhas

Para garantir processamento confiável, implemente:

Mecanismos de retry e dead-letter queues (DLQ):

# Configuração de retry com backoff exponencial
retry:
  max_tentativas: 3
  backoff_inicial: 1s
  backoff_maximo: 30s
  dlq: "fila_erros"

Idempotência no consumidor: Utilize identificadores únicos (correlation IDs) para detectar e ignorar mensagens duplicadas durante reprocessamentos.

Tratamento de mensagens lentas: Implemente heartbeats para sinalizar que o consumidor ainda está ativo e configure redelivery automático para mensagens não confirmadas dentro do timeout.

5. Monitoramento e Observabilidade

Métricas essenciais para operação do padrão:

# Métricas de monitoramento
metricas:
  - nome: "fila_lag"
    descricao: "Número de mensagens não processadas"
    alerta: "> 10000"
  - nome: "taxa_processamento"
    descricao: "Mensagens processadas por segundo"
    alerta: "< 50"
  - nome: "erros_por_consumidor"
    descricao: "Taxa de erros por instância"
    alerta: "> 5%"
  - nome: "dlq_tamanho"
    descricao: "Mensagens na fila de dead letter"
    alerta: "> 100"

Logs estruturados com correlation IDs permitem rastrear o fluxo completo de cada mensagem desde a produção até o consumo.

Alertas para degradação: Configure notificações para consumidores parados, fila crescendo rapidamente ou DLQ acumulando mensagens.

6. Exemplo Prático com Código

Estrutura do produtor:

# produtor.py
import json
import uuid
from fila import FilaMensageria

fila = FilaMensageria("fila_pedidos")

def produzir_mensagem(pedido):
    mensagem = {
        "correlation_id": str(uuid.uuid4()),
        "tipo": "pedido_criado",
        "payload": {
            "pedido_id": pedido["id"],
            "cliente": pedido["cliente"],
            "itens": pedido["itens"],
            "total": pedido["total"]
        },
        "timestamp": "2024-01-15T10:30:00Z"
    }

    fila.enfileirar(json.dumps(mensagem))
    print(f"Mensagem enfileirada: {mensagem['correlation_id']}")

# Exemplo de uso
pedido_exemplo = {
    "id": 12345,
    "cliente": "Maria Silva",
    "itens": ["produto_a", "produto_b"],
    "total": 150.00
}
produzir_mensagem(pedido_exemplo)

Estrutura do consumidor:

# consumidor.py
import json
import time
from fila import FilaMensageria
from threading import Thread

fila = FilaMensageria("fila_pedidos")

def processar_mensagem(mensagem):
    try:
        dados = json.loads(mensagem)
        correlation_id = dados["correlation_id"]

        print(f"Processando mensagem {correlation_id}")

        # Simula processamento
        time.sleep(0.5)

        # Confirma processamento bem-sucedido
        fila.confirmar(mensagem)
        print(f"Mensagem {correlation_id} processada com sucesso")

    except Exception as erro:
        print(f"Erro ao processar mensagem: {erro}")
        fila.rejeitar(mensagem, requeue=False)

def consumir_mensagens():
    while True:
        mensagem = fila.receber(prefetch_count=5)
        if mensagem:
            processar_mensagem(mensagem)
        else:
            time.sleep(1)

# Configuração de concorrência com 10 threads
NUM_THREADS = 10
threads = []

for i in range(NUM_THREADS):
    thread = Thread(target=consumir_mensagens, name=f"Consumidor-{i}")
    thread.daemon = True
    threads.append(thread)
    thread.start()

# Aguarda threads (em produção, use graceful shutdown)
for thread in threads:
    thread.join()

Configuração de concorrência:

# config.yaml
fila:
  nome: "fila_pedidos"
  tipo: "rabbitmq"
  host: "localhost"
  porta: 5672

consumidor:
  threads: 10
  prefetch_count: 5
  batch_size: 100
  timeout: 30
  retry_max: 3
  dlq: "fila_pedidos_dlq"

7. Desafios e Boas Práticas Avançadas

Ordenação parcial de mensagens: Utilize partições por chave (sharding) para garantir que mensagens relacionadas sejam processadas em ordem:

# Configuração de partições por chave
particoes:
  - chave: "cliente_id"
  - chave: "pedido_id"
  - numero_particoes: 10

Evitar starvation: Implemente fairness entre consumidores utilizando prioridades ou pesos:

# Configuração de fairness
fairness:
  modo: "round_robin"
  pesos:
    consumidor_alta_prioridade: 3
    consumidor_baixa_prioridade: 1
  timeout_starvation: 60

Testes de resiliência: Simule falhas de consumidor e picos de carga:

# Testes de resiliência
testes:
  - nome: "falha_consumidor"
    descricao: "Simular queda de 2 consumidores"
    acao: "matar_processo"

  - nome: "pico_carga"
    descricao: "Produzir 10000 mensagens em 1 segundo"
    acao: "aumentar_producao"

  - nome: "lentidao"
    descricao: "Consumidor com delay de 10s por mensagem"
    acao: "injetar_lentidao"

Boas práticas adicionais:
- Implemente circuit breaker para evitar sobrecarga
- Utilize health checks para detectar consumidores mortos
- Configure graceful shutdown para drenar mensagens em processamento
- Monitore a taxa de rejeição para identificar problemas no consumidor

Referências