Boas práticas de gestão de jobs assíncronos e filas de tarefas
1. Fundamentos de filas de tarefas e jobs assíncronos
1.1. Conceitos básicos: produtor, consumidor, broker e fila
Uma fila de tarefas é composta por quatro elementos fundamentais:
- Produtor: sistema que cria e envia jobs para a fila
- Broker: middleware responsável por armazenar e gerenciar as mensagens
- Fila: estrutura onde os jobs aguardam processamento
- Consumidor (worker): processo que executa os jobs
1.2. Diferença entre processamento síncrono, assíncrono e batch
| Tipo |
Característica |
Exemplo |
| Síncrono |
Resposta imediata, bloqueante |
Requisição HTTP |
| Assíncrono |
Processamento em background |
Envio de e-mail |
| Batch |
Processamento agendado em lote |
Geração de relatórios |
1.3. Casos de uso típicos
text
# Exemplo: Envio de e-mail assíncrono com Redis/Bull
const Queue = require('bull');
const emailQueue = new Queue('email', 'redis://localhost:6379');
// Produtor
app.post('/send-email', async (req, res) => {
await emailQueue.add({
to: req.body.email,
subject: 'Bem-vindo!',
template: 'welcome'
});
res.status(202).json({ message: 'Email enfileirado' });
});
// Consumidor
emailQueue.process(async (job) => {
await sendEmail(job.data.to, job.data.subject);
});
2. Escolha da tecnologia e arquitetura da fila
2.1. Comparação entre tecnologias
| Tecnologia |
Persistência |
Throughput |
Caso de uso ideal |
| Redis/Bull |
Em memória com persistência opcional |
Alto |
Aplicações Node.js |
| RabbitMQ |
Disco e memória |
Médio-alto |
Sistemas empresariais |
| Amazon SQS |
Totalmente gerenciado |
Alto |
Aplicações AWS |
| Kafka |
Logs em disco |
Muito alto |
Stream processing |
2.2. Definição de prioridades, TTL e retenção
text
// Configuração de job com prioridade e TTL
const imageQueue = new Queue('image-processing', {
defaultJobOptions: {
priority: 10,
timeout: 30000, // 30 segundos
removeOnComplete: true,
removeOnFail: 1000
}
});
// Job com prioridade alta
await imageQueue.add({ imageId: 123 }, { priority: 1 });
2.3. Padrões de roteamento
text
// Filas separadas por tipo de job
const emailQueue = new Queue('email-processing');
const imageQueue = new Queue('image-processing');
const reportQueue = new Queue('report-generation');
// Workers especializados
emailQueue.process(5, processEmailJob);
imageQueue.process(3, processImageJob);
reportQueue.process(1, processReportJob);
3. Design e estruturação dos jobs
3.1. Idempotência
text
// Job idempotente com verificação de processamento
const paymentQueue = new Queue('payment');
paymentQueue.process(async (job) => {
const { paymentId } = job.data;
const processed = await checkIfProcessed(paymentId);
if (processed) {
console.log(`Job duplicado ignorado: ${paymentId}`);
return { skipped: true };
}
await processPayment(paymentId);
await markAsProcessed(paymentId);
return { processed: true };
});
3.2. Serialização e versionamento de payloads
text
// Schema do payload com versionamento
const JOB_SCHEMA_V1 = {
version: 1,
userId: 'string',
action: 'string',
metadata: 'object'
};
// Validação do payload
function validateJobPayload(data) {
if (!data.version) throw new Error('Versão do payload ausente');
switch (data.version) {
case 1:
if (!data.userId || !data.action) {
throw new Error('Payload V1 inválido');
}
break;
default:
throw new Error(`Versão desconhecida: ${data.version}`);
}
}
3.3. Granularidade: jobs atômicos vs. compostos
text
// Job atômico: processa uma única imagem
const imageQueue = new Queue('single-image');
imageQueue.process(async (job) => {
await resizeImage(job.data.imageId, 800, 600);
});
// Job composto: processa um lote de imagens
const batchQueue = new Queue('image-batch');
batchQueue.process(async (job) => {
for (const imageId of job.data.imageIds) {
await resizeImage(imageId, 800, 600);
}
});
// Encadeamento de jobs
const chainQueue = new Queue('processing-chain');
chainQueue.process(async (job) => {
const result = await step1(job.data);
await step2(result);
await step3(result);
});
4. Estratégias de retry, falha e dead letter queues
text
// Configuração de retry com backoff exponencial
const apiQueue = new Queue('api-calls', {
defaultJobOptions: {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000 // 2s, 4s, 8s, 16s, 32s
}
}
});
apiQueue.process(async (job) => {
try {
await callExternalAPI(job.data);
} catch (error) {
if (error.status === 429) { // Rate limit
throw error; // Retry automático
}
if (error.status >= 500) { // Erro servidor
await job.discard(); // Não retry
}
}
});
4.2. Dead letter queues (DLQ)
text
// Configuração de DLQ
const dlq = new Queue('dead-letter-queue');
const mainQueue = new Queue('main-queue', {
defaultJobOptions: {
attempts: 3
}
});
mainQueue.on('failed', async (job, err) => {
if (job.attemptsMade >= 3) {
await dlq.add({
originalJob: job.data,
error: err.message,
failedAt: new Date().toISOString()
});
}
});
4.3. Monitoramento de jobs falhos
text
// Dashboard de monitoramento
const Bull = require('bull');
const Queue = require('bull');
async function monitorQueues() {
const queues = [emailQueue, imageQueue, reportQueue];
for (const queue of queues) {
const [waiting, active, failed, completed] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getFailedCount(),
queue.getCompletedCount()
]);
console.log({
queue: queue.name,
waiting,
active,
failed,
completed,
errorRate: (failed / (completed + failed)) * 100
});
}
}
setInterval(monitorQueues, 60000); // A cada minuto
5. Controle de concorrência e escalabilidade
5.1. Configuração de workers
text
// Worker com controle de concorrência
const emailWorker = new Queue('email', {
limiter: {
max: 10, // Jobs por vez
duration: 1000 // Por segundo
}
});
// Múltiplos workers em processos diferentes
// worker1.js
emailQueue.process(5, processEmail);
// worker2.js
emailQueue.process(5, processEmail);
// worker3.js
emailQueue.process(5, processEmail);
5.2. Rate limiting para APIs externas
text
// Rate limiter por API externa
const rateLimiter = new Map();
async function callWithRateLimit(apiName, fn) {
const now = Date.now();
const windowMs = 60000; // 1 minuto
const maxCalls = 100;
if (!rateLimiter.has(apiName)) {
rateLimiter.set(apiName, []);
}
const timestamps = rateLimiter.get(apiName);
const windowStart = now - windowMs;
// Remove timestamps antigos
const recent = timestamps.filter(t => t > windowStart);
if (recent.length >= maxCalls) {
const waitMs = recent[0] + windowMs - now;
await new Promise(resolve => setTimeout(resolve, waitMs));
}
recent.push(now);
rateLimiter.set(apiName, recent);
return fn();
}
5.3. Balanceamento de carga
text
// Filas paralelas para balanceamento
const queues = [
new Queue('worker-pool-1'),
new Queue('worker-pool-2'),
new Queue('worker-pool-3')
];
// Distribuição round-robin
let currentQueue = 0;
async function addBalancedJob(data) {
const queue = queues[currentQueue];
currentQueue = (currentQueue + 1) % queues.length;
return queue.add(data);
}
6. Observabilidade e debugging de jobs
text
// Instrumentação com Prometheus
const prometheus = require('prom-client');
const jobDuration = new prometheus.Histogram({
name: 'job_duration_seconds',
help: 'Duração do processamento de jobs',
labelNames: ['queue', 'status'],
buckets: [0.1, 0.5, 1, 2, 5, 10]
});
queue.process(async (job) => {
const start = Date.now();
try {
await processJob(job);
jobDuration.labels(job.queue.name, 'success').observe((Date.now() - start) / 1000);
} catch (error) {
jobDuration.labels(job.queue.name, 'error').observe((Date.now() - start) / 1000);
throw error;
}
});
text
// Log estruturado com correlation ID
const correlationId = require('uuid').v4();
queue.process(async (job) => {
const logger = {
correlationId: correlationId,
jobId: job.id,
queue: job.queue.name,
timestamp: new Date().toISOString()
};
console.log(JSON.stringify({
...logger,
level: 'info',
message: 'Iniciando processamento do job',
data: job.data
}));
try {
await processJob(job);
console.log(JSON.stringify({
...logger,
level: 'info',
message: 'Job processado com sucesso'
}));
} catch (error) {
console.error(JSON.stringify({
...logger,
level: 'error',
message: 'Falha no processamento',
error: error.message,
stack: error.stack
}));
}
});
6.3. Ferramentas de inspeção visual
text
// Bull Board para monitoramento visual
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullAdapter(emailQueue),
new BullAdapter(imageQueue),
new BullAdapter(reportQueue)
],
serverAdapter
});
app.use('/admin/queues', serverAdapter.getRouter());
7. Segurança e boas práticas operacionais
7.1. Autenticação no broker
text
// Conexão segura com Redis
const Queue = require('bull');
const queue = new Queue('secure-queue', {
redis: {
host: 'redis.example.com',
port: 6379,
password: process.env.REDIS_PASSWORD,
tls: {
rejectUnauthorized: true
}
}
});
7.2. Validação e sanitização
text
// Sanitização de dados sensíveis
function sanitizeJobData(data) {
const sensitiveFields = ['password', 'creditCard', 'ssn'];
const sanitized = { ...data };
for (const field of sensitiveFields) {
if (sanitized[field]) {
sanitized[field] = '***REDACTED***';
}
}
return sanitized;
}
queue.on('completed', (job) => {
console.log('Job completed:', sanitizeJobData(job.data));
});
7.3. Backup e recuperação
text
// Estratégia de backup de filas
async function backupQueue(queue) {
const jobs = await queue.getJobs(['waiting', 'active', 'delayed']);
const backup = {
queueName: queue.name,
timestamp: new Date().toISOString(),
jobs: jobs.map(job => ({
id: job.id,
data: job.data,
opts: job.opts,
timestamp: job.timestamp
}))
};
await fs.writeFile(`backup-${queue.name}-${Date.now()}.json`,
JSON.stringify(backup, null, 2));
}
// Restauração de backup
async function restoreQueue(queue, backupFile) {
const backup = JSON.parse(await fs.readFile(backupFile, 'utf8'));
for (const job of backup.jobs) {
await queue.add(job.data, job.opts);
}
}
8. Anti-padrões e armadilhas comuns
8.1. Jobs bloqueantes e dependências circulares
text
// ❌ Anti-padrão: dependência circular
const queueA = new Queue('queue-a');
const queueB = new Queue('queue-b');
queueA.process(async (job) => {
await queueB.add({ fromA: true });
});
queueB.process(async (job) => {
await queueA.add({ fromB: true }); // Loop infinito!
});
// ✅ Solução: usar timeout máximo
queueA.process(async (job) => {
const result = await Promise.race([
queueB.add({ fromA: true }),
new Promise((_, reject) => setTimeout(() => reject(new Error('Timeout')), 5000))
]);
});
8.2. Overflow de memória
text
// ❌ Anti-padrão: jobs que nunca expiram
const queue = new Queue('memory-leak');
queue.process(async (job) => {
// Job nunca é removido
// Acumula memória indefinidamente
});
// ✅ Solução: remover jobs após conclusão
const queue = new Queue('memory-safe', {
defaultJobOptions: {
removeOnComplete: 100, // Mantém últimos 100
removeOnFail: 50, // Mantém últimos 50 falhos
timeout: 60000 // Timeout de 1 minuto
}
});
8.3. Acumulação de jobs órfãos
text
// Monitoramento de jobs órfãos
async function checkOrphanJobs(queue) {
const failed = await queue.getFailed();
const stalled = await queue.getStalled();
console.log({
queue: queue.name,
failedJobs: failed.length,
stalledJobs: stalled.length,
oldestFailed: failed[0]?.failedReason,
oldestStalled: stalled[0]?.data
});
// Limpeza automática de jobs órfãos antigos
const oneWeekAgo = Date.now() - 7 * 24 * 60 * 60 * 1000;
for (const job of failed) {
if (job.timestamp < oneWeekAgo) {
await job.remove();
console.log(`Job órfão removido: ${job.id}`);
}
}
}
setInterval(() => checkOrphanJobs(queue), 3600000); // A cada hora
Referências