Como usar o Bull e BullMQ para filas de jobs em Node.js com Redis

1. Introdução ao Bull e BullMQ: Conceitos Fundamentais

Filas de jobs são estruturas essenciais em aplicações Node.js que precisam processar tarefas assíncronas de forma confiável. Em vez de executar operações pesadas durante uma requisição HTTP, você enfileira a tarefa e a processa em segundo plano. Isso melhora a responsividade do sistema e permite escalar o processamento horizontalmente.

Bull e BullMQ são bibliotecas populares para gerenciamento de filas em Node.js com Redis como backend. A diferença fundamental entre elas:

  • Bull (versão clássica) usa redis como cliente Redis e oferece uma API madura e estável.
  • BullMQ (versão moderna) usa ioredis e foi reescrita com suporte nativo a Promises, tipagem TypeScript e melhor desempenho.

O Redis atua como um broker de mensagens persistente, garantindo que jobs não sejam perdidos mesmo após reinicializações do servidor. Ele armazena as filas em estruturas de dados otimizadas (listas, sorted sets, hashes) que permitem operações atômicas e de alta performance.

2. Configuração do Ambiente e Instalação

Pré-requisitos: Node.js (v16+), Redis rodando localmente ou via Docker.

# Usando Docker para Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine

Instalação das bibliotecas:

npm install bull bullmq ioredis
# ou
yarn add bull bullmq ioredis

Configuração básica de conexão com Redis:

const IORedis = require('ioredis');

const connection = new IORedis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: null,
  enableReadyCheck: false
});

// Para ambientes com senha ou TLS:
const connectionSecure = new IORedis({
  host: 'redis-cluster.example.com',
  port: 6380,
  password: 'sua-senha',
  tls: {},
  retryStrategy: (times) => Math.min(times * 50, 2000)
});

3. Criando e Enfileirando Jobs

Definição de uma fila com BullMQ:

const { Queue } = require('bullmq');

const emailQueue = new Queue('email-service', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
});

// Adicionando jobs com diferentes opções
async function enfileirarJobs() {
  // Job simples
  await emailQueue.add('send-welcome', {
    to: 'usuario@exemplo.com',
    template: 'boas-vindas'
  });

  // Job com prioridade e atraso
  await emailQueue.add('send-recovery', {
    to: 'admin@exemplo.com',
    token: 'abc123'
  }, {
    priority: 10,
    delay: 5000, // 5 segundos
    jobId: 'recovery-abc123' // Evita duplicatas
  });

  // Job com versionamento no nome
  await emailQueue.add('send-notification:v2', {
    userId: 42,
    message: 'Sua conta foi atualizada'
  });
}

4. Processamento de Jobs com Workers

Criação de um worker para consumir jobs:

const { Worker } = require('bullmq');

const worker = new Worker('email-service', async (job) => {
  console.log(`Processando job ${job.id} - ${job.name}`);

  switch (job.name) {
    case 'send-welcome':
      await enviarEmailBemVindo(job.data);
      break;
    case 'send-recovery':
      await enviarEmailRecuperacao(job.data);
      break;
    case 'send-notification:v2':
      await enviarNotificacaoV2(job.data);
      break;
    default:
      throw new Error(`Tipo de job desconhecido: ${job.name}`);
  }
}, {
  connection,
  concurrency: 5, // Processa 5 jobs simultaneamente
  limiter: {
    max: 10,       // Máximo de jobs
    duration: 1000 // Por segundo
  }
});

worker.on('completed', (job) => {
  console.log(`Job ${job.id} concluído com sucesso`);
});

worker.on('failed', (job, err) => {
  console.error(`Job ${job.id} falhou: ${err.message}`);
});

Função de processamento assíncrona com controle de progresso:

const { Worker } = require('bullmq');

const workerProgress = new Worker('processamento-arquivo', async (job) => {
  const arquivo = job.data.arquivo;
  const linhas = arquivo.split('\n');
  const total = linhas.length;

  for (let i = 0; i < total; i++) {
    await processarLinha(linhas[i]);
    await job.updateProgress(Math.round(((i + 1) / total) * 100));
  }

  return { linhasProcessadas: total };
}, { connection });

5. Tratamento de Erros e Retentativas

Configuração avançada de retentativas:

const { Queue, Worker } = require('bullmq');

const pedidosQueue = new Queue('processamento-pedidos', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    removeOnComplete: {
      age: 3600, // Remove após 1 hora
      count: 100 // Mantém apenas 100 registros
    },
    removeOnFail: {
      age: 86400 // Remove após 24 horas
    }
  }
});

const workerPedidos = new Worker('processamento-pedidos', async (job) => {
  try {
    const resultado = await processarPedido(job.data);
    return resultado;
  } catch (error) {
    if (error.code === 'TEMPORARY_FAILURE') {
      // Erro recuperável - deixa o BullMQ tentar novamente
      throw error;
    }

    // Erro fatal - move para DLQ manualmente
    await deadLetterQueue.add('pedido-falho', {
      ...job.data,
      erroOriginal: error.message,
      jobIdOriginal: job.id
    });

    // Finaliza o job como concluído para não tentar novamente
    return { status: 'fatal_error', error: error.message };
  }
}, { connection });

// Fila para jobs que falharam definitivamente
const deadLetterQueue = new Queue('pedidos-dlq', { connection });

6. Monitoramento e Eventos da Fila

Eventos lifecycle para observabilidade:

const { QueueEvents } = require('bullmq');

const queueEvents = new QueueEvents('email-service', { connection });

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completou com:`, returnvalue);
  registrarMetrica('email_enviado', { jobId });
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} falhou:`, failedReason);
  registrarMetrica('email_falhou', { jobId, motivo: failedReason });
});

queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progresso: ${data}%`);
});

queueEvents.on('stalled', ({ jobId }) => {
  console.warn(`Job ${jobId} foi marcado como stalled`);
  notificarTime('job_stalled', { jobId });
});

// Para Bull (versão clássica)
const QueueBull = require('bull');
const filaBull = new QueueBull('notificacoes', 'redis://localhost:6379');

filaBull.on('global:completed', (jobId, result) => {
  console.log(`Bull: Job ${jobId} completou`);
});

Uso do Bull Board para visualização:

const { createBullBoard } = require('@bull-board/api');
const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/filas');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(pedidosQueue),
    new BullMQAdapter(deadLetterQueue)
  ],
  serverAdapter
});

app.use('/admin/filas', serverAdapter.getRouter());

7. Padrões Avançados e Boas Práticas

Encadeamento de filas (workflows):

const { Queue, Worker } = require('bullmq');

const filaUpload = new Queue('upload-arquivos', { connection });
const filaProcessamento = new Queue('processamento-arquivos', { connection });
const filaNotificacao = new Queue('notificacao-usuarios', { connection });

// Worker da fila de upload que encadeia para processamento
const workerUpload = new Worker('upload-arquivos', async (job) => {
  const arquivoSalvo = await salvarArquivo(job.data.arquivo);

  // Enfileira próximo job na fila de processamento
  await filaProcessamento.add('processar', {
    arquivoId: arquivoSalvo.id,
    usuarioId: job.data.usuarioId
  }, {
    parent: { id: job.id, queue: 'upload-arquivos' }
  });

  return arquivoSalvo;
}, { connection });

// Worker que notifica após processamento
const workerProcessamento = new Worker('processamento-arquivos', async (job) => {
  const resultado = await processarArquivo(job.data.arquivoId);

  await filaNotificacao.add('enviar-notificacao', {
    usuarioId: job.data.usuarioId,
    mensagem: `Arquivo ${resultado.nome} processado com sucesso`
  });

  return resultado;
}, { connection });

Escalabilidade com múltiplos workers e filas por prioridade:

// Fila de alta prioridade
const filaAlta = new Queue('jobs-alta-prioridade', {
  connection,
  defaultJobOptions: { priority: 1 }
});

// Fila de baixa prioridade
const filaBaixa = new Queue('jobs-baixa-prioridade', {
  connection,
  defaultJobOptions: { priority: 10 }
});

// Workers dedicados para cada fila
new Worker('jobs-alta-prioridade', processadorAlta, { 
  connection, 
  concurrency: 10 
});

new Worker('jobs-baixa-prioridade', processadorBaixa, { 
  connection, 
  concurrency: 2 
});

8. Migração do Bull para BullMQ e Considerações de Produção

Diferenças de API para migração gradual:

// Bull (clássico)
const QueueBull = require('bull');
const filaAntiga = new Queue('fila-legado', 'redis://localhost:6379');

filaAntiga.add({ dados: 'exemplo' }, { 
  attempts: 3,
  backoff: { type: 'fixed', delay: 5000 }
});

// BullMQ (moderno)
const { Queue } = require('bullmq');
const filaNova = new Queue('fila-moderna', { 
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'fixed', delay: 5000 }
  }
});

filaNova.add('job-exemplo', { dados: 'exemplo' });

Gerenciamento de memória e limpeza:

// Limpeza programada de jobs antigos
const { Queue } = require('bullmq');

const fila = new Queue('fila-principal', { connection });

async function limpezaProgramada() {
  // Remove jobs completados há mais de 7 dias
  await fila.clean(7 * 24 * 3600 * 1000, 1000, 'completed');

  // Remove jobs falhos há mais de 30 dias
  await fila.clean(30 * 24 * 3600 * 1000, 1000, 'failed');

  // Remove jobs stalled
  await fila.clean(3600 * 1000, 100, 'stalled');
}

// Agendar limpeza diária
setInterval(limpezaProgramada, 24 * 60 * 60 * 1000);

Segurança em produção:

const IORedis = require('ioredis');

// Conexão segura com Redis em produção
const connectionProd = new IORedis({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT),
  password: process.env.REDIS_PASSWORD,
  tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
  retryStrategy: (times) => {
    if (times > 10) return null; // Desiste após 10 tentativas
    return Math.min(times * 200, 3000);
  },
  maxRetriesPerRequest: null,
  enableReadyCheck: false
});

// Filas isoladas por ambiente
const ambiente = process.env.NODE_ENV || 'development';
const filaUsuario = new Queue(`fila-usuario:${ambiente}`, { 
  connection: connectionProd 
});

Referências