Filas com RabbitMQ e php-amqplib

1. Introdução às Filas de Mensagens e RabbitMQ

Filas de mensagens são um padrão arquitetural fundamental para sistemas distribuídos. O RabbitMQ, implementação do protocolo AMQP (Advanced Message Queuing Protocol), é um dos message brokers mais populares no ecossistema PHP. Seus conceitos básicos incluem:

  • Produtor: aplicação que envia mensagens
  • Consumidor: aplicação que recebe e processa mensagens
  • Fila: buffer que armazena mensagens até serem processadas
  • Exchange: roteador que distribui mensagens para filas baseado em regras
  • Binding: ligação entre exchange e fila com uma chave de roteamento

Por que usar RabbitMQ em aplicações PHP? Ele oferece desacoplamento entre serviços, escalabilidade horizontal (múltiplos consumidores processando em paralelo) e resiliência (mensagens persistem mesmo se consumidores caírem). A biblioteca php-amqplib é a implementação AMQP mais madura para PHP.

2. Configuração do Ambiente

Instalação do RabbitMQ com Docker

# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password

Instalação da biblioteca php-amqplib

composer require php-amqplib/php-amqplib

Conexão básica

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;

try {
    $connection = new AMQPStreamConnection(
        'localhost', // host
        5672,        // port
        'user',      // user
        'password',  // password
        '/'          // vhost
    );

    $channel = $connection->channel();
    echo "Conectado ao RabbitMQ!\n";
} catch (AMQPConnectionClosedException $e) {
    echo "Erro de conexão: " . $e->getMessage() . "\n";
}

3. Publicando Mensagens (Produtor)

Produtor básico com exchange direct

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'user', 'password');
$channel = $connection->channel();

// Declara exchange do tipo direct
$channel->exchange_declare(
    'email_exchange', // nome
    'direct',         // tipo
    false,            // passive
    true,             // durable
    false             // auto_delete
);

// Declara fila durável
$channel->queue_declare(
    'email_queue',
    false,
    true,  // durable
    false,
    false
);

// Binding entre exchange e fila
$channel->queue_bind('email_queue', 'email_exchange', 'email.send');

// Cria mensagem com propriedades
$data = json_encode([
    'to' => 'user@example.com',
    'subject' => 'Bem-vindo!',
    'body' => 'Conteúdo do e-mail'
]);

$message = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'content_type' => 'application/json',
    'expiration' => '3600000', // TTL de 1 hora em ms
    'priority' => 5
]);

// Publisher confirm (confirmação de entrega)
$channel->set_ack_handler(function (AMQPMessage $message) {
    echo "Mensagem confirmada: " . $message->getBody() . "\n";
});

$channel->confirm_select();
$channel->basic_publish($message, 'email_exchange', 'email.send');

$channel->wait_for_pending_acks();
$channel->close();
$connection->close();

4. Consumindo Mensagens (Consumidor)

Consumidor com acknowledgment e QoS

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'user', 'password');
$channel = $connection->channel();

// Configura QoS: processa 1 mensagem por vez
$channel->basic_qos(
    null,  // tamanho da mensagem (0 = ilimitado)
    1,     // prefetch count
    null   // global
);

$callback = function (AMQPMessage $message) {
    $data = json_decode($message->getBody(), true);

    try {
        // Simula processamento
        echo "Processando e-mail para: " . $data['to'] . "\n";

        // Confirma processamento bem-sucedido
        $message->ack();
    } catch (\Exception $e) {
        echo "Erro: " . $e->getMessage() . "\n";

        // Rejeita e encaminha para dead letter queue
        $message->nack(false, true);
    }
};

$channel->basic_consume(
    'email_queue', // fila
    '',            // consumer tag
    false,         // no_local
    false,         // no_ack (false = manual ack)
    false,         // exclusive
    false,         // no_wait
    $callback      // callback
);

echo "Aguardando mensagens...\n";
while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

5. Tópicos Avançados de Roteamento

Exchange topic com wildcards

<?php
// Produtor com exchange topic
$channel->exchange_declare('logs_topic', 'topic', false, true, false);

// Publica mensagens com routing keys variadas
$messages = [
    ['routing_key' => 'log.error.user', 'body' => 'Erro no módulo de usuário'],
    ['routing_key' => 'log.warning.payment', 'body' => 'Aviso no pagamento'],
    ['routing_key' => 'log.info.email', 'body' => 'Informação de e-mail']
];

foreach ($messages as $msg) {
    $message = new AMQPMessage($msg['body'], ['delivery_mode' => 2]);
    $channel->basic_publish($message, 'logs_topic', $msg['routing_key']);
}

// Consumidor: fila que recebe todos os logs de erro
$channel->queue_declare('error_logs', false, true, false, false);
$channel->queue_bind('error_logs', 'logs_topic', 'log.error.*');

// Consumidor: fila que recebe todos os logs
$channel->queue_declare('all_logs', false, true, false, false);
$channel->queue_bind('all_logs', 'logs_topic', 'log.#');

6. Padrões de Mensageria com PHP

Work Queue com retry e backoff exponencial

<?php
function processWithRetry(AMQPMessage $message, int $retryCount = 0): void {
    $maxRetries = 3;
    $data = json_decode($message->getBody(), true);

    try {
        // Processamento que pode falhar
        if (rand(0, 1) === 0) {
            throw new \RuntimeException('Falha temporária');
        }
        echo "Processado com sucesso: " . $data['id'] . "\n";
        $message->ack();
    } catch (\Exception $e) {
        if ($retryCount < $maxRetries) {
            $delay = pow(2, $retryCount) * 1000; // 1s, 2s, 4s
            echo "Retry " . ($retryCount + 1) . " em {$delay}ms\n";

            // Reencaminha com delay
            $newMessage = new AMQPMessage($message->getBody(), [
                'delivery_mode' => 2,
                'application_headers' => new AMQPTable([
                    'x-retry-count' => $retryCount + 1
                ])
            ]);

            // Publica em exchange com delay (requer plugin rabbitmq_delayed_message_exchange)
            $channel->basic_publish($newMessage, 'delayed_exchange', 'delayed');
            $message->ack();
        } else {
            echo "Falha definitiva, enviando para DLQ\n";
            $message->nack(false, false); // Rejeita sem reencaminhar
        }
    }
}

7. Monitoramento e Boas Práticas

Dead Letter Exchange (DLX)

<?php
// Declara exchange para mensagens mortas
$channel->exchange_declare('dlx_exchange', 'direct', false, true, false);

// Declara fila de mensagens mortas
$channel->queue_declare('dead_letter_queue', false, true, false, false);
$channel->queue_bind('dead_letter_queue', 'dlx_exchange', 'dead');

// Declara fila principal com DLX configurado
$channel->queue_declare(
    'main_queue',
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-dead-letter-exchange' => 'dlx_exchange',
        'x-dead-letter-routing-key' => 'dead',
        'x-message-ttl' => 30000 // 30 segundos
    ])
);

Gerenciamento de conexões

<?php
// Singleton para reutilizar conexão
class RabbitMQConnection {
    private static ?AMQPStreamConnection $connection = null;

    public static function getConnection(): AMQPStreamConnection {
        if (self::$connection === null || !self::$connection->isConnected()) {
            self::$connection = new AMQPStreamConnection(
                $_ENV['RABBITMQ_HOST'],
                $_ENV['RABBITMQ_PORT'],
                $_ENV['RABBITMQ_USER'],
                $_ENV['RABBITMQ_PASS'],
                $_ENV['RABBITMQ_VHOST'],
                false,       // insist
                'AMQPLAIN',  // login_method
                null,        // login_response
                'en_US',     // locale
                3.0,         // connection_timeout
                3.0,         // read_write_timeout
                null,        // context
                false,       // keepalive
                60           // heartbeat
            );
        }
        return self::$connection;
    }
}

8. Integração com Frameworks PHP

Exemplo com Laravel (configuração personalizada)

<?php
// config/queue.php
'connections' => [
    'rabbitmq' => [
        'driver' => 'rabbitmq',
        'host' => env('RABBITMQ_HOST', 'localhost'),
        'port' => env('RABBITMQ_PORT', 5672),
        'vhost' => env('RABBITMQ_VHOST', '/'),
        'login' => env('RABBITMQ_USER', 'guest'),
        'password' => env('RABBITMQ_PASS', 'guest'),
        'queue' => env('RABBITMQ_QUEUE', 'default'),
        'options' => [
            'exchange' => [
                'name' => 'laravel_exchange',
                'type' => 'direct',
                'passive' => false,
                'durable' => true,
                'auto_delete' => false,
            ],
        ],
    ],
];

// Job de exemplo
class SendEmailJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function handle()
    {
        Mail::to($this->user->email)->send(new WelcomeMail($this->user));
    }
}

Testes unitários com mock

<?php
use PHPUnit\Framework\TestCase;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class EmailServiceTest extends TestCase
{
    public function testSendEmailViaQueue()
    {
        $channelMock = $this->createMock(AMQPChannel::class);

        $channelMock->expects($this->once())
            ->method('basic_publish')
            ->with(
                $this->callback(function (AMQPMessage $message) {
                    $data = json_decode($message->getBody(), true);
                    return $data['to'] === 'test@example.com';
                }),
                $this->equalTo('email_exchange'),
                $this->equalTo('email.send')
            );

        $service = new EmailService($channelMock);
        $service->sendEmail('test@example.com', 'Assunto', 'Corpo');
    }
}

Referências