Filas de tarefas com Bull e Redis
1. Introdução às filas de tarefas no Node.js
No desenvolvimento de aplicações modernas, é comum enfrentarmos situações onde operações síncronas simplesmente não são viáveis. Processamento de imagens, envio de emails em massa, geração de relatórios complexos ou integrações com APIs externas podem bloquear o event loop do Node.js e degradar a experiência do usuário.
Filas de tarefas resolvem esses problemas ao permitir que operações pesadas sejam executadas de forma assíncrona, em segundo plano. Os conceitos fundamentais são simples: um produtor adiciona jobs (tarefas) a uma fila, e um ou mais workers (consumidores) processam esses jobs quando possível.
Por que Bull + Redis? O Bull é a biblioteca mais robusta para filas no ecossistema Node.js. Aliado ao Redis, oferece:
- Performance: Redis é um banco em memória extremamente rápido
- Persistência: jobs sobrevivem a reinicializações da aplicação
- Recursos avançados: agendamento, prioridades, retry automático, rate limiting
2. Configuração do ambiente e instalação
Primeiro, precisamos do Redis em execução. A forma mais prática é com Docker:
docker run -d -p 6379:6379 --name redis-bull redis:7-alpine
Verifique se está funcionando:
docker exec redis-bull redis-cli ping
# Resposta esperada: PONG
Agora, crie um novo projeto Node.js e instale o Bull:
mkdir filas-com-bull
cd filas-com-bull
npm init -y
npm install bull express socket.io
Estrutura inicial do projeto:
// queue.js
const Bull = require('bull');
const emailQueue = new Bull('email', {
redis: {
host: 'localhost',
port: 6379
}
});
module.exports = emailQueue;
3. Criando e gerenciando filas com Bull
Instanciar uma fila é simples. O Bull gerencia automaticamente a conexão com Redis:
const taskQueue = new Bull('tarefas', {
redis: { host: 'localhost', port: 6379 }
});
Para adicionar jobs, use o método add(). Você pode passar qualquer objeto serializável em JSON:
async function adicionarJob() {
const job = await taskQueue.add(
{
userId: 123,
tipo: 'processar_imagem',
caminho: '/uploads/foto.jpg'
},
{
delay: 5000, // Espera 5 segundos antes de processar
attempts: 3, // Tenta 3 vezes em caso de falha
removeOnComplete: true // Remove após concluir
}
);
console.log(`Job ${job.id} adicionado`);
}
4. Processamento de jobs com workers
O worker é a função que realmente executa o trabalho. Cada job passa por estados: waiting → active → completed ou failed.
taskQueue.process(async (job) => {
console.log(`Processando job ${job.id}: ${job.data.tipo}`);
// Simulando processamento pesado
await new Promise(resolve => setTimeout(resolve, 3000));
// Relata progresso (0-100)
await job.progress(50);
// Continua processando...
return { resultado: 'sucesso' };
});
Para controle de concorrência, especifique quantos jobs o worker pode processar simultaneamente:
// Processa até 5 jobs simultaneamente
taskQueue.process(5, async (job) => {
// ... lógica do worker
});
Tratamento de erros e retry automático:
taskQueue.process(async (job) => {
try {
const resultado = await operacaoArriscada(job.data);
return resultado;
} catch (error) {
// Se falhar, o Bull tentará novamente baseado na opção 'attempts'
throw error;
}
});
5. Eventos e monitoramento da fila
O Bull emite eventos que permitem monitorar cada aspecto da fila:
taskQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} concluído com resultado:`, result);
});
taskQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} falhou:`, err.message);
});
taskQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} está em ${progress}%`);
});
taskQueue.on('waiting', (jobId) => {
console.log(`Job ${jobId} aguardando processamento`);
});
6. Funcionalidades avançadas
Agendamento com cron:
const relatorioQueue = new Bull('relatorios');
// Executa toda segunda-feira às 8h
await relatorioQueue.add(
{ tipo: 'relatorio_semanal' },
{ repeat: { cron: '0 8 * * 1' } }
);
Prioridades entre jobs:
await taskQueue.add({ tipo: 'urgente' }, { priority: 1 });
await taskQueue.add({ tipo: 'normal' }, { priority: 10 });
Limpeza automática de jobs antigos:
// Remove jobs completados há mais de 24 horas
await taskQueue.clean(24 * 60 * 60 * 1000, 'completed');
// Ou configure na criação do job
await taskQueue.add(data, { removeOnComplete: { age: 86400 } });
Rate limiting para controlar throughput:
const taskQueue = new Bull('tarefas', {
limiter: {
max: 100, // máximo de jobs
duration: 1000 // por segundo
}
});
7. Integração com React: interface de gerenciamento
Vamos criar um backend Express que expõe endpoints para gerenciar a fila:
// server.js
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const taskQueue = require('./queue');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, { cors: { origin: '*' } });
app.use(express.json());
// Endpoint para adicionar job
app.post('/jobs', async (req, res) => {
const { tipo, dados } = req.body;
const job = await taskQueue.add({ tipo, ...dados });
res.json({ jobId: job.id });
});
// Endpoint para listar jobs
app.get('/jobs', async (req, res) => {
const waiting = await taskQueue.getWaiting();
const active = await taskQueue.getActive();
const completed = await taskQueue.getCompleted();
const failed = await taskQueue.getFailed();
res.json({ waiting, active, completed, failed });
});
// WebSocket para atualizações em tempo real
taskQueue.on('completed', (job, result) => {
io.emit('job-completed', { jobId: job.id, result });
});
taskQueue.on('progress', (job, progress) => {
io.emit('job-progress', { jobId: job.id, progress });
});
server.listen(3001, () => console.log('API rodando na porta 3001'));
Componente React para exibir os jobs em tempo real:
// JobDashboard.jsx
import React, { useState, useEffect } from 'react';
import io from 'socket.io-client';
const socket = io('http://localhost:3001');
function JobDashboard() {
const [jobs, setJobs] = useState([]);
const [formData, setFormData] = useState({ tipo: '', dados: '' });
useEffect(() => {
socket.on('job-completed', (data) => {
setJobs(prev => [...prev, { id: data.jobId, status: 'completed', ...data }]);
});
return () => socket.disconnect();
}, []);
const handleSubmit = async (e) => {
e.preventDefault();
await fetch('http://localhost:3001/jobs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(formData)
});
};
return (
<div>
<h2>Gerenciador de Filas</h2>
<form onSubmit={handleSubmit}>
<input
type="text"
placeholder="Tipo do job"
value={formData.tipo}
onChange={(e) => setFormData({...formData, tipo: e.target.value})}
/>
<button type="submit">Adicionar Job</button>
</form>
<table>
<thead>
<tr>
<th>ID</th>
<th>Status</th>
<th>Progresso</th>
</tr>
</thead>
<tbody>
{jobs.map(job => (
<tr key={job.id}>
<td>{job.id}</td>
<td>{job.status}</td>
<td>{job.progress || 0}%</td>
</tr>
))}
</tbody>
</table>
</div>
);
}
export default JobDashboard;
8. Boas práticas e troubleshooting
Tratamento de falhas no Redis:
const queue = new Bull('minha-fila', {
redis: {
host: 'localhost',
port: 6379,
maxRetriesPerRequest: 3,
retryStrategy: (times) => Math.min(times * 100, 3000)
}
});
queue.on('error', (error) => {
console.error('Erro na conexão Redis:', error.message);
});
Testes unitários com Jest:
// queue.test.js
const Queue = require('bull');
jest.mock('bull', () => {
return jest.fn().mockImplementation(() => ({
add: jest.fn().mockResolvedValue({ id: 'mock-job-123' }),
process: jest.fn(),
on: jest.fn()
}));
});
test('deve adicionar um job na fila', async () => {
const queue = new Queue();
const job = await queue.add({ tarefa: 'teste' });
expect(job.id).toBe('mock-job-123');
});
Monitoramento com Bull Board:
npm install @bull-board/express
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [new BullAdapter(taskQueue)],
serverAdapter
});
app.use('/admin/queues', serverAdapter.getRouter());
Considerações de deploy:
- Em produção, use Redis Cluster ou ElastiCache para alta disponibilidade
- Separe workers em processos ou containers independentes
- Configure variáveis de ambiente para conexão Redis
- Implemente health checks periódicos
Referências
- Documentação oficial do Bull — Repositório oficial com documentação completa, exemplos e guia de API
- Documentação do Redis — Guia oficial do Redis, incluindo configuração, comandos e boas práticas
- Bull Board - Dashboard para filas — Interface web para monitoramento visual de filas Bull
- Tutorial: Filas com Bull e Redis no Node.js — Artigo prático da LogRocket sobre implementação de filas
- Node.js Performance com Filas — Guia do freeCodeCamp sobre filas e processamento assíncrono
- Bull + React: Tempo Real — Artigo da Smashing Magazine sobre integração Bull com React e WebSockets
- Redis em Produção — Guia oficial de configuração do Redis para ambientes produtivos