Como implementar filas de mensagens com RabbitMQ
1. Fundamentos do RabbitMQ e Mensageria Assíncrona
RabbitMQ é um broker de mensagens open-source que implementa o protocolo AMQP (Advanced Message Queuing Protocol). Sua arquitetura permite que sistemas distribuídos se comuniquem de forma assíncrona, desacoplando produtores de consumidores. Os componentes fundamentais incluem:
- Produtor: aplicação que envia mensagens
- Consumidor: aplicação que recebe e processa mensagens
- Fila: buffer que armazena mensagens até serem consumidas
- Exchange: roteador que distribui mensagens para filas baseado em regras
Casos de uso típicos incluem desacoplamento entre microsserviços, balanceamento de carga em processamento de tarefas e pipelines de dados assíncronos.
2. Instalação e Configuração do Ambiente
A maneira mais rápida de iniciar é usando Docker:
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-management
A interface de gerenciamento estará disponível em http://localhost:15672 (credenciais padrão: guest/guest). Após o login, crie um vhost e usuário dedicado:
# Via interface web:
Admin > Virtual Hosts > Add a new virtual host > "meu_vhost"
Admin > Users > Add a user > "dev_user" / "dev_pass" > Set permission para "meu_vhost"
3. Tipos de Exchanges e Roteamento de Mensagens
Exchange Direct
Roteia mensagens para filas cuja routing key corresponde exatamente à chave definida na publicação.
# Exemplo de configuração
Exchange: logs_direct (tipo direct)
Fila A: binding com routing key "error"
Fila B: binding com routing key "info"
Mensagem com routing key "error" → apenas Fila A recebe
Exchange Topic
Permite padrões com curingas: * substitui uma palavra, # substitui zero ou mais palavras.
Exchange: logs_topic (tipo topic)
Fila A: binding "*.critical"
Fila B: binding "app.#"
Mensagem "app.error.critical" → ambas filas recebem
Mensagem "auth.warning" → nenhuma fila recebe
Exchange Fanout
Ignora routing keys e envia cópia da mensagem para todas as filas vinculadas.
Exchange: broadcast (tipo fanout)
Fila 1, Fila 2, Fila 3 vinculadas
Qualquer mensagem publicada → todas as 3 filas recebem
4. Implementação de Produtores e Consumidores
Usaremos Python com a biblioteca pika. Instalação:
pip install pika
Produtor básico
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=pika.PlainCredentials('dev_user', 'dev_pass'), virtual_host='meu_vhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='Processar pedido #123',
properties=pika.BasicProperties(
delivery_mode=2, # mensagem persistente
priority=5,
expiration='60000' # TTL de 60 segundos
))
print("Mensagem enviada")
connection.close()
Consumidor com confirmação manual
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=pika.PlainCredentials('dev_user', 'dev_pass'), virtual_host='meu_vhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(f"Recebido: {body}")
# Simula processamento
import time
time.sleep(2)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print("Aguardando mensagens...")
channel.start_consuming()
5. Padrões de Resiliência e Tratamento de Falhas
Dead Letter Queue (DLQ)
Configure uma fila para receber mensagens rejeitadas ou expiradas:
# Declarar DLQ e exchange principal
channel.exchange_declare(exchange='main_exchange', exchange_type='direct')
channel.queue_declare(queue='dlq', durable=True)
# Fila principal com argumentos DLQ
arguments = {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'dlq'
}
channel.queue_declare(queue='main_queue', durable=True, arguments=arguments)
channel.queue_bind(queue='main_queue', exchange='main_exchange', routing_key='task')
Retry com atraso usando TTL
# Fila de retry com TTL de 30 segundos
arguments_retry = {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'retry_to_main',
'x-message-ttl': 30000
}
channel.queue_declare(queue='retry_queue', durable=True, arguments=arguments_retry)
Publisher Confirms
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='Mensagem crítica',
properties=pika.BasicProperties(delivery_mode=2),
mandatory=True)
print("Mensagem confirmada pelo broker")
except pika.exceptions.UnroutableError:
print("Mensagem não roteada")
except pika.exceptions.NackError:
print("Mensagem rejeitada pelo broker")
6. Estratégias de Escalabilidade e Performance
Prefetch Count
Controla quantas mensagens são enviadas simultaneamente a um consumidor:
# No consumidor: processa 1 mensagem por vez
channel.basic_qos(prefetch_count=1)
Cluster RabbitMQ
Para alta disponibilidade, configure um cluster com 3 nós:
# No docker-compose.yml
rabbitmq1:
image: rabbitmq:3-management
hostname: rabbit1
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
- RABBITMQ_NODENAME=rabbit@rabbit1
rabbitmq2:
image: rabbitmq:3-management
hostname: rabbit2
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
- RABBITMQ_NODENAME=rabbit@rabbit2
depends_on: [rabbitmq1]
# Comando para juntar ao cluster
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app
7. Integração com Microsserviços e Pipelines de Dados
Comunicação entre serviços
# Serviço de pedidos (produtor)
def criar_pedido(dados):
channel.basic_publish(
exchange='pedidos_exchange',
routing_key='pedido.criado',
body=json.dumps(dados),
properties=pika.BasicProperties(
content_type='application/json',
delivery_mode=2))
# Serviço de estoque (consumidor)
def processar_pedido(ch, method, properties, body):
dados = json.loads(body)
atualizar_estoque(dados['produto_id'], dados['quantidade'])
ch.basic_ack(method.delivery_tag)
Integração com Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
import pika
def publicar_mensagem():
connection = pika.BlockingConnection(...)
channel = connection.channel()
channel.basic_publish(
exchange='airflow_tasks',
routing_key='etl.iniciar',
body='Iniciar pipeline de dados')
connection.close()
with DAG('pipeline_etl', schedule_interval='@daily') as dag:
task_publicar = PythonOperator(
task_id='publicar_mensagem_rabbitmq',
python_callable=publicar_mensagem)
8. Boas Práticas e Troubleshooting
Versionamento de mensagens
Inclua um campo version no payload para compatibilidade futura:
{
"version": "1.0",
"event": "pedido.criado",
"data": { "pedido_id": "123" }
}
Segurança
# Ativar TLS no RabbitMQ
rabbitmq-plugins enable rabbitmq_management
# Configurar certificados no rabbitmq.conf
ssl_options.certfile = /path/to/cert.pem
ssl_options.keyfile = /path/to/key.pem
Troubleshooting comum
Problema: Filas acumuladas sem consumidores
Solução: Verificar se consumidores estão ativos e se prefetch_count não está muito alto.
Problema: Mensagens duplicadas
Solução: Implementar idempotência no consumidor usando IDs únicos nas mensagens.
Problema: Consumidores lentos
Solução: Aumentar número de consumidores paralelos ou otimizar processamento.
Referências
- RabbitMQ Official Documentation — Documentação completa do RabbitMQ, incluindo tutoriais, guias de instalação e referência da API AMQP.
- RabbitMQ Tutorials - Python (pika) — Série de tutoriais oficiais para implementar filas com Python e pika, do básico ao avançado.
- Pika Documentation — Documentação oficial da biblioteca pika para Python, com exemplos de conexão, publicação e consumo.
- RabbitMQ Best Practices — Artigo da CloudAMQP com boas práticas de design, performance e segurança no RabbitMQ.
- Dead Letter Exchanges and Queues — Guia oficial sobre Dead Letter Exchanges, configuração e casos de uso para tratamento de falhas.
- Clustering and High Availability — Documentação oficial sobre clusterização do RabbitMQ para alta disponibilidade e escalabilidade.