Boas práticas de gestão de jobs assíncronos e filas de tarefas

1. Fundamentos de filas de tarefas e jobs assíncronos

1.1. Conceitos básicos: produtor, consumidor, broker e fila

Uma fila de tarefas é composta por quatro elementos fundamentais:

  • Produtor: sistema que cria e envia jobs para a fila
  • Broker: middleware responsável por armazenar e gerenciar as mensagens
  • Fila: estrutura onde os jobs aguardam processamento
  • Consumidor (worker): processo que executa os jobs

1.2. Diferença entre processamento síncrono, assíncrono e batch

Tipo Característica Exemplo
Síncrono Resposta imediata, bloqueante Requisição HTTP
Assíncrono Processamento em background Envio de e-mail
Batch Processamento agendado em lote Geração de relatórios

1.3. Casos de uso típicos

text
# Exemplo: Envio de e-mail assíncrono com Redis/Bull
const Queue = require('bull');
const emailQueue = new Queue('email', 'redis://localhost:6379');

// Produtor
app.post('/send-email', async (req, res) => {
  await emailQueue.add({
    to: req.body.email,
    subject: 'Bem-vindo!',
    template: 'welcome'
  });
  res.status(202).json({ message: 'Email enfileirado' });
});

// Consumidor
emailQueue.process(async (job) => {
  await sendEmail(job.data.to, job.data.subject);
});

2. Escolha da tecnologia e arquitetura da fila

2.1. Comparação entre tecnologias

Tecnologia Persistência Throughput Caso de uso ideal
Redis/Bull Em memória com persistência opcional Alto Aplicações Node.js
RabbitMQ Disco e memória Médio-alto Sistemas empresariais
Amazon SQS Totalmente gerenciado Alto Aplicações AWS
Kafka Logs em disco Muito alto Stream processing

2.2. Definição de prioridades, TTL e retenção

text
// Configuração de job com prioridade e TTL
const imageQueue = new Queue('image-processing', {
  defaultJobOptions: {
    priority: 10,
    timeout: 30000, // 30 segundos
    removeOnComplete: true,
    removeOnFail: 1000
  }
});

// Job com prioridade alta
await imageQueue.add({ imageId: 123 }, { priority: 1 });

2.3. Padrões de roteamento

text
// Filas separadas por tipo de job
const emailQueue = new Queue('email-processing');
const imageQueue = new Queue('image-processing');
const reportQueue = new Queue('report-generation');

// Workers especializados
emailQueue.process(5, processEmailJob);
imageQueue.process(3, processImageJob);
reportQueue.process(1, processReportJob);

3. Design e estruturação dos jobs

3.1. Idempotência

text
// Job idempotente com verificação de processamento
const paymentQueue = new Queue('payment');

paymentQueue.process(async (job) => {
  const { paymentId } = job.data;
  const processed = await checkIfProcessed(paymentId);

  if (processed) {
    console.log(`Job duplicado ignorado: ${paymentId}`);
    return { skipped: true };
  }

  await processPayment(paymentId);
  await markAsProcessed(paymentId);
  return { processed: true };
});

3.2. Serialização e versionamento de payloads

text
// Schema do payload com versionamento
const JOB_SCHEMA_V1 = {
  version: 1,
  userId: 'string',
  action: 'string',
  metadata: 'object'
};

// Validação do payload
function validateJobPayload(data) {
  if (!data.version) throw new Error('Versão do payload ausente');

  switch (data.version) {
    case 1:
      if (!data.userId || !data.action) {
        throw new Error('Payload V1 inválido');
      }
      break;
    default:
      throw new Error(`Versão desconhecida: ${data.version}`);
  }
}

3.3. Granularidade: jobs atômicos vs. compostos

text
// Job atômico: processa uma única imagem
const imageQueue = new Queue('single-image');
imageQueue.process(async (job) => {
  await resizeImage(job.data.imageId, 800, 600);
});

// Job composto: processa um lote de imagens
const batchQueue = new Queue('image-batch');
batchQueue.process(async (job) => {
  for (const imageId of job.data.imageIds) {
    await resizeImage(imageId, 800, 600);
  }
});

// Encadeamento de jobs
const chainQueue = new Queue('processing-chain');
chainQueue.process(async (job) => {
  const result = await step1(job.data);
  await step2(result);
  await step3(result);
});

4. Estratégias de retry, falha e dead letter queues

4.1. Políticas de retry com backoff exponencial

text
// Configuração de retry com backoff exponencial
const apiQueue = new Queue('api-calls', {
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 2000 // 2s, 4s, 8s, 16s, 32s
    }
  }
});

apiQueue.process(async (job) => {
  try {
    await callExternalAPI(job.data);
  } catch (error) {
    if (error.status === 429) { // Rate limit
      throw error; // Retry automático
    }
    if (error.status >= 500) { // Erro servidor
      await job.discard(); // Não retry
    }
  }
});

4.2. Dead letter queues (DLQ)

text
// Configuração de DLQ
const dlq = new Queue('dead-letter-queue');

const mainQueue = new Queue('main-queue', {
  defaultJobOptions: {
    attempts: 3
  }
});

mainQueue.on('failed', async (job, err) => {
  if (job.attemptsMade >= 3) {
    await dlq.add({
      originalJob: job.data,
      error: err.message,
      failedAt: new Date().toISOString()
    });
  }
});

4.3. Monitoramento de jobs falhos

text
// Dashboard de monitoramento
const Bull = require('bull');
const Queue = require('bull');

async function monitorQueues() {
  const queues = [emailQueue, imageQueue, reportQueue];

  for (const queue of queues) {
    const [waiting, active, failed, completed] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getFailedCount(),
      queue.getCompletedCount()
    ]);

    console.log({
      queue: queue.name,
      waiting,
      active,
      failed,
      completed,
      errorRate: (failed / (completed + failed)) * 100
    });
  }
}

setInterval(monitorQueues, 60000); // A cada minuto

5. Controle de concorrência e escalabilidade

5.1. Configuração de workers

text
// Worker com controle de concorrência
const emailWorker = new Queue('email', {
  limiter: {
    max: 10, // Jobs por vez
    duration: 1000 // Por segundo
  }
});

// Múltiplos workers em processos diferentes
// worker1.js
emailQueue.process(5, processEmail);

// worker2.js
emailQueue.process(5, processEmail);

// worker3.js
emailQueue.process(5, processEmail);

5.2. Rate limiting para APIs externas

text
// Rate limiter por API externa
const rateLimiter = new Map();

async function callWithRateLimit(apiName, fn) {
  const now = Date.now();
  const windowMs = 60000; // 1 minuto
  const maxCalls = 100;

  if (!rateLimiter.has(apiName)) {
    rateLimiter.set(apiName, []);
  }

  const timestamps = rateLimiter.get(apiName);
  const windowStart = now - windowMs;

  // Remove timestamps antigos
  const recent = timestamps.filter(t => t > windowStart);

  if (recent.length >= maxCalls) {
    const waitMs = recent[0] + windowMs - now;
    await new Promise(resolve => setTimeout(resolve, waitMs));
  }

  recent.push(now);
  rateLimiter.set(apiName, recent);

  return fn();
}

5.3. Balanceamento de carga

text
// Filas paralelas para balanceamento
const queues = [
  new Queue('worker-pool-1'),
  new Queue('worker-pool-2'),
  new Queue('worker-pool-3')
];

// Distribuição round-robin
let currentQueue = 0;
async function addBalancedJob(data) {
  const queue = queues[currentQueue];
  currentQueue = (currentQueue + 1) % queues.length;
  return queue.add(data);
}

6. Observabilidade e debugging de jobs

6.1. Métricas de performance

text
// Instrumentação com Prometheus
const prometheus = require('prom-client');

const jobDuration = new prometheus.Histogram({
  name: 'job_duration_seconds',
  help: 'Duração do processamento de jobs',
  labelNames: ['queue', 'status'],
  buckets: [0.1, 0.5, 1, 2, 5, 10]
});

queue.process(async (job) => {
  const start = Date.now();
  try {
    await processJob(job);
    jobDuration.labels(job.queue.name, 'success').observe((Date.now() - start) / 1000);
  } catch (error) {
    jobDuration.labels(job.queue.name, 'error').observe((Date.now() - start) / 1000);
    throw error;
  }
});

6.2. Logs estruturados com correlation IDs

text
// Log estruturado com correlation ID
const correlationId = require('uuid').v4();

queue.process(async (job) => {
  const logger = {
    correlationId: correlationId,
    jobId: job.id,
    queue: job.queue.name,
    timestamp: new Date().toISOString()
  };

  console.log(JSON.stringify({
    ...logger,
    level: 'info',
    message: 'Iniciando processamento do job',
    data: job.data
  }));

  try {
    await processJob(job);
    console.log(JSON.stringify({
      ...logger,
      level: 'info',
      message: 'Job processado com sucesso'
    }));
  } catch (error) {
    console.error(JSON.stringify({
      ...logger,
      level: 'error',
      message: 'Falha no processamento',
      error: error.message,
      stack: error.stack
    }));
  }
});

6.3. Ferramentas de inspeção visual

text
// Bull Board para monitoramento visual
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullAdapter(emailQueue),
    new BullAdapter(imageQueue),
    new BullAdapter(reportQueue)
  ],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());

7. Segurança e boas práticas operacionais

7.1. Autenticação no broker

text
// Conexão segura com Redis
const Queue = require('bull');

const queue = new Queue('secure-queue', {
  redis: {
    host: 'redis.example.com',
    port: 6379,
    password: process.env.REDIS_PASSWORD,
    tls: {
      rejectUnauthorized: true
    }
  }
});

7.2. Validação e sanitização

text
// Sanitização de dados sensíveis
function sanitizeJobData(data) {
  const sensitiveFields = ['password', 'creditCard', 'ssn'];

  const sanitized = { ...data };
  for (const field of sensitiveFields) {
    if (sanitized[field]) {
      sanitized[field] = '***REDACTED***';
    }
  }

  return sanitized;
}

queue.on('completed', (job) => {
  console.log('Job completed:', sanitizeJobData(job.data));
});

7.3. Backup e recuperação

text
// Estratégia de backup de filas
async function backupQueue(queue) {
  const jobs = await queue.getJobs(['waiting', 'active', 'delayed']);

  const backup = {
    queueName: queue.name,
    timestamp: new Date().toISOString(),
    jobs: jobs.map(job => ({
      id: job.id,
      data: job.data,
      opts: job.opts,
      timestamp: job.timestamp
    }))
  };

  await fs.writeFile(`backup-${queue.name}-${Date.now()}.json`, 
    JSON.stringify(backup, null, 2));
}

// Restauração de backup
async function restoreQueue(queue, backupFile) {
  const backup = JSON.parse(await fs.readFile(backupFile, 'utf8'));

  for (const job of backup.jobs) {
    await queue.add(job.data, job.opts);
  }
}

8. Anti-padrões e armadilhas comuns

8.1. Jobs bloqueantes e dependências circulares

text
// ❌ Anti-padrão: dependência circular
const queueA = new Queue('queue-a');
const queueB = new Queue('queue-b');

queueA.process(async (job) => {
  await queueB.add({ fromA: true });
});

queueB.process(async (job) => {
  await queueA.add({ fromB: true }); // Loop infinito!
});

// ✅ Solução: usar timeout máximo
queueA.process(async (job) => {
  const result = await Promise.race([
    queueB.add({ fromA: true }),
    new Promise((_, reject) => setTimeout(() => reject(new Error('Timeout')), 5000))
  ]);
});

8.2. Overflow de memória

text
// ❌ Anti-padrão: jobs que nunca expiram
const queue = new Queue('memory-leak');

queue.process(async (job) => {
  // Job nunca é removido
  // Acumula memória indefinidamente
});

// ✅ Solução: remover jobs após conclusão
const queue = new Queue('memory-safe', {
  defaultJobOptions: {
    removeOnComplete: 100, // Mantém últimos 100
    removeOnFail: 50,      // Mantém últimos 50 falhos
    timeout: 60000         // Timeout de 1 minuto
  }
});

8.3. Acumulação de jobs órfãos

text
// Monitoramento de jobs órfãos
async function checkOrphanJobs(queue) {
  const failed = await queue.getFailed();
  const stalled = await queue.getStalled();

  console.log({
    queue: queue.name,
    failedJobs: failed.length,
    stalledJobs: stalled.length,
    oldestFailed: failed[0]?.failedReason,
    oldestStalled: stalled[0]?.data
  });

  // Limpeza automática de jobs órfãos antigos
  const oneWeekAgo = Date.now() - 7 * 24 * 60 * 60 * 1000;

  for (const job of failed) {
    if (job.timestamp < oneWeekAgo) {
      await job.remove();
      console.log(`Job órfão removido: ${job.id}`);
    }
  }
}

setInterval(() => checkOrphanJobs(queue), 3600000); // A cada hora

Referências