Como implementar filas de mensagens com RabbitMQ

1. Fundamentos do RabbitMQ e Mensageria Assíncrona

RabbitMQ é um broker de mensagens open-source que implementa o protocolo AMQP (Advanced Message Queuing Protocol). Sua arquitetura permite que sistemas distribuídos se comuniquem de forma assíncrona, desacoplando produtores de consumidores. Os componentes fundamentais incluem:

  • Produtor: aplicação que envia mensagens
  • Consumidor: aplicação que recebe e processa mensagens
  • Fila: buffer que armazena mensagens até serem consumidas
  • Exchange: roteador que distribui mensagens para filas baseado em regras

Casos de uso típicos incluem desacoplamento entre microsserviços, balanceamento de carga em processamento de tarefas e pipelines de dados assíncronos.

2. Instalação e Configuração do Ambiente

A maneira mais rápida de iniciar é usando Docker:

docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management

A interface de gerenciamento estará disponível em http://localhost:15672 (credenciais padrão: guest/guest). Após o login, crie um vhost e usuário dedicado:

# Via interface web:
Admin > Virtual Hosts > Add a new virtual host > "meu_vhost"
Admin > Users > Add a user > "dev_user" / "dev_pass" > Set permission para "meu_vhost"

3. Tipos de Exchanges e Roteamento de Mensagens

Exchange Direct

Roteia mensagens para filas cuja routing key corresponde exatamente à chave definida na publicação.

# Exemplo de configuração
Exchange: logs_direct (tipo direct)
Fila A: binding com routing key "error"
Fila B: binding com routing key "info"

Mensagem com routing key "error" → apenas Fila A recebe

Exchange Topic

Permite padrões com curingas: * substitui uma palavra, # substitui zero ou mais palavras.

Exchange: logs_topic (tipo topic)
Fila A: binding "*.critical"
Fila B: binding "app.#"

Mensagem "app.error.critical" → ambas filas recebem
Mensagem "auth.warning" → nenhuma fila recebe

Exchange Fanout

Ignora routing keys e envia cópia da mensagem para todas as filas vinculadas.

Exchange: broadcast (tipo fanout)
Fila 1, Fila 2, Fila 3 vinculadas

Qualquer mensagem publicada → todas as 3 filas recebem

4. Implementação de Produtores e Consumidores

Usaremos Python com a biblioteca pika. Instalação:

pip install pika

Produtor básico

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=pika.PlainCredentials('dev_user', 'dev_pass'), virtual_host='meu_vhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='Processar pedido #123',
    properties=pika.BasicProperties(
        delivery_mode=2,  # mensagem persistente
        priority=5,
        expiration='60000'  # TTL de 60 segundos
    ))

print("Mensagem enviada")
connection.close()

Consumidor com confirmação manual

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=pika.PlainCredentials('dev_user', 'dev_pass'), virtual_host='meu_vhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(f"Recebido: {body}")
    # Simula processamento
    import time
    time.sleep(2)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print("Aguardando mensagens...")
channel.start_consuming()

5. Padrões de Resiliência e Tratamento de Falhas

Dead Letter Queue (DLQ)

Configure uma fila para receber mensagens rejeitadas ou expiradas:

# Declarar DLQ e exchange principal
channel.exchange_declare(exchange='main_exchange', exchange_type='direct')
channel.queue_declare(queue='dlq', durable=True)

# Fila principal com argumentos DLQ
arguments = {
    'x-dead-letter-exchange': '',
    'x-dead-letter-routing-key': 'dlq'
}
channel.queue_declare(queue='main_queue', durable=True, arguments=arguments)
channel.queue_bind(queue='main_queue', exchange='main_exchange', routing_key='task')

Retry com atraso usando TTL

# Fila de retry com TTL de 30 segundos
arguments_retry = {
    'x-dead-letter-exchange': '',
    'x-dead-letter-routing-key': 'retry_to_main',
    'x-message-ttl': 30000
}
channel.queue_declare(queue='retry_queue', durable=True, arguments=arguments_retry)

Publisher Confirms

channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body='Mensagem crítica',
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True)
    print("Mensagem confirmada pelo broker")
except pika.exceptions.UnroutableError:
    print("Mensagem não roteada")
except pika.exceptions.NackError:
    print("Mensagem rejeitada pelo broker")

6. Estratégias de Escalabilidade e Performance

Prefetch Count

Controla quantas mensagens são enviadas simultaneamente a um consumidor:

# No consumidor: processa 1 mensagem por vez
channel.basic_qos(prefetch_count=1)

Cluster RabbitMQ

Para alta disponibilidade, configure um cluster com 3 nós:

# No docker-compose.yml
rabbitmq1:
  image: rabbitmq:3-management
  hostname: rabbit1
  environment:
    - RABBITMQ_ERLANG_COOKIE=secret_cookie
    - RABBITMQ_NODENAME=rabbit@rabbit1

rabbitmq2:
  image: rabbitmq:3-management
  hostname: rabbit2
  environment:
    - RABBITMQ_ERLANG_COOKIE=secret_cookie
    - RABBITMQ_NODENAME=rabbit@rabbit2
  depends_on: [rabbitmq1]

# Comando para juntar ao cluster
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app

7. Integração com Microsserviços e Pipelines de Dados

Comunicação entre serviços

# Serviço de pedidos (produtor)
def criar_pedido(dados):
    channel.basic_publish(
        exchange='pedidos_exchange',
        routing_key='pedido.criado',
        body=json.dumps(dados),
        properties=pika.BasicProperties(
            content_type='application/json',
            delivery_mode=2))

# Serviço de estoque (consumidor)
def processar_pedido(ch, method, properties, body):
    dados = json.loads(body)
    atualizar_estoque(dados['produto_id'], dados['quantidade'])
    ch.basic_ack(method.delivery_tag)

Integração com Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
import pika

def publicar_mensagem():
    connection = pika.BlockingConnection(...)
    channel = connection.channel()
    channel.basic_publish(
        exchange='airflow_tasks',
        routing_key='etl.iniciar',
        body='Iniciar pipeline de dados')
    connection.close()

with DAG('pipeline_etl', schedule_interval='@daily') as dag:
    task_publicar = PythonOperator(
        task_id='publicar_mensagem_rabbitmq',
        python_callable=publicar_mensagem)

8. Boas Práticas e Troubleshooting

Versionamento de mensagens

Inclua um campo version no payload para compatibilidade futura:

{
  "version": "1.0",
  "event": "pedido.criado",
  "data": { "pedido_id": "123" }
}

Segurança

# Ativar TLS no RabbitMQ
rabbitmq-plugins enable rabbitmq_management
# Configurar certificados no rabbitmq.conf
ssl_options.certfile = /path/to/cert.pem
ssl_options.keyfile = /path/to/key.pem

Troubleshooting comum

Problema: Filas acumuladas sem consumidores
Solução: Verificar se consumidores estão ativos e se prefetch_count não está muito alto.

Problema: Mensagens duplicadas
Solução: Implementar idempotência no consumidor usando IDs únicos nas mensagens.

Problema: Consumidores lentos
Solução: Aumentar número de consumidores paralelos ou otimizar processamento.

Referências