Filas com RabbitMQ e php-amqplib
1. Introdução às Filas de Mensagens e RabbitMQ
Filas de mensagens são um padrão arquitetural fundamental para sistemas distribuídos. O RabbitMQ, implementação do protocolo AMQP (Advanced Message Queuing Protocol), é um dos message brokers mais populares no ecossistema PHP. Seus conceitos básicos incluem:
- Produtor: aplicação que envia mensagens
- Consumidor: aplicação que recebe e processa mensagens
- Fila: buffer que armazena mensagens até serem processadas
- Exchange: roteador que distribui mensagens para filas baseado em regras
- Binding: ligação entre exchange e fila com uma chave de roteamento
Por que usar RabbitMQ em aplicações PHP? Ele oferece desacoplamento entre serviços, escalabilidade horizontal (múltiplos consumidores processando em paralelo) e resiliência (mensagens persistem mesmo se consumidores caírem). A biblioteca php-amqplib é a implementação AMQP mais madura para PHP.
2. Configuração do Ambiente
Instalação do RabbitMQ com Docker
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: user
RABBITMQ_DEFAULT_PASS: password
Instalação da biblioteca php-amqplib
composer require php-amqplib/php-amqplib
Conexão básica
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
try {
$connection = new AMQPStreamConnection(
'localhost', // host
5672, // port
'user', // user
'password', // password
'/' // vhost
);
$channel = $connection->channel();
echo "Conectado ao RabbitMQ!\n";
} catch (AMQPConnectionClosedException $e) {
echo "Erro de conexão: " . $e->getMessage() . "\n";
}
3. Publicando Mensagens (Produtor)
Produtor básico com exchange direct
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'user', 'password');
$channel = $connection->channel();
// Declara exchange do tipo direct
$channel->exchange_declare(
'email_exchange', // nome
'direct', // tipo
false, // passive
true, // durable
false // auto_delete
);
// Declara fila durável
$channel->queue_declare(
'email_queue',
false,
true, // durable
false,
false
);
// Binding entre exchange e fila
$channel->queue_bind('email_queue', 'email_exchange', 'email.send');
// Cria mensagem com propriedades
$data = json_encode([
'to' => 'user@example.com',
'subject' => 'Bem-vindo!',
'body' => 'Conteúdo do e-mail'
]);
$message = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'expiration' => '3600000', // TTL de 1 hora em ms
'priority' => 5
]);
// Publisher confirm (confirmação de entrega)
$channel->set_ack_handler(function (AMQPMessage $message) {
echo "Mensagem confirmada: " . $message->getBody() . "\n";
});
$channel->confirm_select();
$channel->basic_publish($message, 'email_exchange', 'email.send');
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
4. Consumindo Mensagens (Consumidor)
Consumidor com acknowledgment e QoS
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'user', 'password');
$channel = $connection->channel();
// Configura QoS: processa 1 mensagem por vez
$channel->basic_qos(
null, // tamanho da mensagem (0 = ilimitado)
1, // prefetch count
null // global
);
$callback = function (AMQPMessage $message) {
$data = json_decode($message->getBody(), true);
try {
// Simula processamento
echo "Processando e-mail para: " . $data['to'] . "\n";
// Confirma processamento bem-sucedido
$message->ack();
} catch (\Exception $e) {
echo "Erro: " . $e->getMessage() . "\n";
// Rejeita e encaminha para dead letter queue
$message->nack(false, true);
}
};
$channel->basic_consume(
'email_queue', // fila
'', // consumer tag
false, // no_local
false, // no_ack (false = manual ack)
false, // exclusive
false, // no_wait
$callback // callback
);
echo "Aguardando mensagens...\n";
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
5. Tópicos Avançados de Roteamento
Exchange topic com wildcards
<?php
// Produtor com exchange topic
$channel->exchange_declare('logs_topic', 'topic', false, true, false);
// Publica mensagens com routing keys variadas
$messages = [
['routing_key' => 'log.error.user', 'body' => 'Erro no módulo de usuário'],
['routing_key' => 'log.warning.payment', 'body' => 'Aviso no pagamento'],
['routing_key' => 'log.info.email', 'body' => 'Informação de e-mail']
];
foreach ($messages as $msg) {
$message = new AMQPMessage($msg['body'], ['delivery_mode' => 2]);
$channel->basic_publish($message, 'logs_topic', $msg['routing_key']);
}
// Consumidor: fila que recebe todos os logs de erro
$channel->queue_declare('error_logs', false, true, false, false);
$channel->queue_bind('error_logs', 'logs_topic', 'log.error.*');
// Consumidor: fila que recebe todos os logs
$channel->queue_declare('all_logs', false, true, false, false);
$channel->queue_bind('all_logs', 'logs_topic', 'log.#');
6. Padrões de Mensageria com PHP
Work Queue com retry e backoff exponencial
<?php
function processWithRetry(AMQPMessage $message, int $retryCount = 0): void {
$maxRetries = 3;
$data = json_decode($message->getBody(), true);
try {
// Processamento que pode falhar
if (rand(0, 1) === 0) {
throw new \RuntimeException('Falha temporária');
}
echo "Processado com sucesso: " . $data['id'] . "\n";
$message->ack();
} catch (\Exception $e) {
if ($retryCount < $maxRetries) {
$delay = pow(2, $retryCount) * 1000; // 1s, 2s, 4s
echo "Retry " . ($retryCount + 1) . " em {$delay}ms\n";
// Reencaminha com delay
$newMessage = new AMQPMessage($message->getBody(), [
'delivery_mode' => 2,
'application_headers' => new AMQPTable([
'x-retry-count' => $retryCount + 1
])
]);
// Publica em exchange com delay (requer plugin rabbitmq_delayed_message_exchange)
$channel->basic_publish($newMessage, 'delayed_exchange', 'delayed');
$message->ack();
} else {
echo "Falha definitiva, enviando para DLQ\n";
$message->nack(false, false); // Rejeita sem reencaminhar
}
}
}
7. Monitoramento e Boas Práticas
Dead Letter Exchange (DLX)
<?php
// Declara exchange para mensagens mortas
$channel->exchange_declare('dlx_exchange', 'direct', false, true, false);
// Declara fila de mensagens mortas
$channel->queue_declare('dead_letter_queue', false, true, false, false);
$channel->queue_bind('dead_letter_queue', 'dlx_exchange', 'dead');
// Declara fila principal com DLX configurado
$channel->queue_declare(
'main_queue',
false,
true,
false,
false,
false,
new AMQPTable([
'x-dead-letter-exchange' => 'dlx_exchange',
'x-dead-letter-routing-key' => 'dead',
'x-message-ttl' => 30000 // 30 segundos
])
);
Gerenciamento de conexões
<?php
// Singleton para reutilizar conexão
class RabbitMQConnection {
private static ?AMQPStreamConnection $connection = null;
public static function getConnection(): AMQPStreamConnection {
if (self::$connection === null || !self::$connection->isConnected()) {
self::$connection = new AMQPStreamConnection(
$_ENV['RABBITMQ_HOST'],
$_ENV['RABBITMQ_PORT'],
$_ENV['RABBITMQ_USER'],
$_ENV['RABBITMQ_PASS'],
$_ENV['RABBITMQ_VHOST'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
3.0, // connection_timeout
3.0, // read_write_timeout
null, // context
false, // keepalive
60 // heartbeat
);
}
return self::$connection;
}
}
8. Integração com Frameworks PHP
Exemplo com Laravel (configuração personalizada)
<?php
// config/queue.php
'connections' => [
'rabbitmq' => [
'driver' => 'rabbitmq',
'host' => env('RABBITMQ_HOST', 'localhost'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASS', 'guest'),
'queue' => env('RABBITMQ_QUEUE', 'default'),
'options' => [
'exchange' => [
'name' => 'laravel_exchange',
'type' => 'direct',
'passive' => false,
'durable' => true,
'auto_delete' => false,
],
],
],
];
// Job de exemplo
class SendEmailJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function handle()
{
Mail::to($this->user->email)->send(new WelcomeMail($this->user));
}
}
Testes unitários com mock
<?php
use PHPUnit\Framework\TestCase;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
class EmailServiceTest extends TestCase
{
public function testSendEmailViaQueue()
{
$channelMock = $this->createMock(AMQPChannel::class);
$channelMock->expects($this->once())
->method('basic_publish')
->with(
$this->callback(function (AMQPMessage $message) {
$data = json_decode($message->getBody(), true);
return $data['to'] === 'test@example.com';
}),
$this->equalTo('email_exchange'),
$this->equalTo('email.send')
);
$service = new EmailService($channelMock);
$service->sendEmail('test@example.com', 'Assunto', 'Corpo');
}
}
Referências
- RabbitMQ Official Documentation — Documentação completa do RabbitMQ, incluindo tutoriais AMQP, configuração e boas práticas
- php-amqplib GitHub Repository — Repositório oficial da biblioteca com exemplos de código, issues e documentação da API
- RabbitMQ Tutorials - PHP Edition — Seis tutoriais oficiais do RabbitMQ implementados em PHP, cobrindo todos os padrões básicos
- Laravel Queues Documentation — Documentação oficial do Laravel sobre filas, incluindo drivers personalizados e RabbitMQ
- Symfony Messenger Component Documentation — Documentação do componente Messenger do Symfony, com suporte nativo a RabbitMQ e AMQP
- CloudAMQP PHP Guide — Guia prático de instalação e uso do php-amqplib com exemplos reais de produtores e consumidores
- RabbitMQ Best Practices — Artigo com boas práticas de uso do RabbitMQ em produção, incluindo gerenciamento de conexões e DLX