Como construir uma fila de jobs simples sem RabbitMQ ou Kafka

1. Fundamentos de uma fila de jobs in-memory

Uma fila de jobs é uma estrutura que permite o processamento assíncrono de tarefas, separando a produção (criação) do consumo (execução). Os conceitos fundamentais envolvem três componentes:

  • Produtor: responsável por adicionar jobs à fila
  • Fila: estrutura de dados que armazena jobs pendentes
  • Consumidor: processo que retira e executa jobs da fila

Optar por uma fila simples in-memory é adequado quando:
- O volume de jobs é moderado (centenas por minuto)
- A perda eventual de jobs é aceitável
- O sistema é single-process ou com concorrência controlada
- A complexidade operacional precisa ser mínima

Os trade-offs principais incluem: ausência de persistência robusta, falta de garantias de entrega (at-most-once), e limitações de escalabilidade horizontal.

2. Estrutura de dados para a fila

A implementação mais direta utiliza um array como fila FIFO (First In, First Out). Para cenários que exigem priorização, podemos usar um heap binário.

class FilaJobs {
    constructor() {
        this.fila = [];
        this.jobs = new Map(); // id -> job
        this.estados = {
            pending: new Set(),
            running: new Set(),
            completed: new Set(),
            failed: new Set()
        };
        this.idCounter = 0;
    }
}

Para gerenciamento de estado, cada job transita por: pendingrunningcompleted ou failed. Manter sets separados permite consultas rápidas e monitoramento.

3. Mecanismo de produtor: adicionando jobs à fila

A função enqueue valida o payload e atribui um identificador único:

enqueue(tipo, dados, opcoes = {}) {
    if (!dados || typeof dados !== 'object') {
        throw new Error('Payload inválido');
    }

    const id = ++this.idCounter;
    const job = {
        id,
        tipo,
        dados,
        status: 'pending',
        tentativas: 0,
        maxTentativas: opcoes.maxTentativas || 3,
        criadoEm: Date.now()
    };

    this.jobs.set(id, job);
    this.estados.pending.add(id);
    this.fila.push(id);

    this.emitter.emit('job:added', job);
    return id;
}

Para evitar duplicação, implemente idempotência via hash do payload ou campo idempotencyKey:

enqueueComIdempotencia(tipo, dados, chaveIdempotencia) {
    if (this.jobsProcessados.has(chaveIdempotencia)) {
        return this.jobsProcessados.get(chaveIdempotencia);
    }
    const id = this.enqueue(tipo, dados);
    this.jobsProcessados.set(chaveIdempotencia, id);
    return id;
}

4. Mecanismo de consumidor: processamento assíncrono

O loop de polling pode ser implementado com setInterval ou process.nextTick para maior eficiência:

iniciarConsumidor(concorrencia = 1) {
    this.concorrencia = concorrencia;
    this.ativo = true;

    const processarProximo = () => {
        if (!this.ativo) return;

        const emExecucao = this.estados.running.size;
        if (emExecucao >= this.concorrencia) return;

        const idJob = this.fila.shift();
        if (!idJob) return;

        this.executarJob(idJob);
        setImmediate(processarProximo);
    };

    setImmediate(processarProximo);
}

Para execução concorrente com limite de paralelismo:

async executarJob(idJob) {
    const job = this.jobs.get(idJob);
    job.status = 'running';
    this.estados.pending.delete(idJob);
    this.estados.running.add(idJob);

    try {
        const resultado = await this.workers[job.tipo](job.dados);
        job.status = 'completed';
        job.resultado = resultado;
        this.estados.running.delete(idJob);
        this.estados.completed.add(idJob);
        this.emitter.emit('job:completed', job);
    } catch (erro) {
        job.tentativas++;
        if (job.tentativas < job.maxTentativas) {
            job.status = 'pending';
            this.estados.running.delete(idJob);
            this.estados.pending.add(idJob);
            this.fila.push(idJob);
            this.emitter.emit('job:retry', job);
        } else {
            job.status = 'failed';
            job.erro = erro.message;
            this.estados.running.delete(idJob);
            this.estados.failed.add(idJob);
            this.emitter.emit('job:failed', job);
        }
    }
}

5. Gerenciamento de estado e notificações

Utilize EventEmitter para notificações em tempo real:

const EventEmitter = require('events');

class FilaJobs extends EventEmitter {
    constructor() {
        super();
        // ... inicialização
    }

    // Eventos emitidos:
    // 'job:added' - quando job é enfileirado
    // 'job:started' - quando processamento inicia
    // 'job:completed' - quando job é concluído
    // 'job:failed' - quando job falha após todas tentativas
    // 'job:retry' - quando job será retentado
}

Monitoramento básico:

obterMetricas() {
    return {
        pendentes: this.estados.pending.size,
        executando: this.estados.running.size,
        completados: this.estados.completed.size,
        falhos: this.estados.failed.size,
        totalJobs: this.jobs.size,
        taxaErro: this.estados.failed.size / (this.estados.completed.size + this.estados.failed.size) || 0
    };
}

6. Estratégias de persistência simples

Para persistência básica, serialize o estado da fila em JSON:

async salvarSnapshot(caminho = './fila_snapshot.json') {
    const snapshot = {
        jobs: Array.from(this.jobs.entries()),
        fila: this.fila,
        estados: {
            pending: Array.from(this.estados.pending),
            running: Array.from(this.estados.running),
            completed: Array.from(this.estados.completed),
            failed: Array.from(this.estados.failed)
        },
        idCounter: this.idCounter
    };

    const fs = require('fs');
    await fs.promises.writeFile(caminho, JSON.stringify(snapshot, null, 2));
}

async restaurarSnapshot(caminho = './fila_snapshot.json') {
    const fs = require('fs');
    const dados = JSON.parse(await fs.promises.readFile(caminho, 'utf8'));

    this.jobs = new Map(dados.jobs);
    this.fila = dados.fila;
    this.estados.pending = new Set(dados.estados.pending);
    this.estados.running = new Set(dados.estados.running);
    this.estados.completed = new Set(dados.estados.completed);
    this.estados.failed = new Set(dados.estados.failed);
    this.idCounter = dados.idCounter;
}

Para salvamento incremental, implemente um append-only log:

async registrarOperacao(operacao) {
    const linha = JSON.stringify({ timestamp: Date.now(), ...operacao }) + '\n';
    await fs.promises.appendFile('./fila_log.txt', linha);
}

7. Exemplo prático: fila de envio de e-mails

const fila = new FilaJobs();

// Registrar worker para tipo 'email'
fila.registrarWorker('email', async (dados) => {
    console.log(`Enviando email para ${dados.destinatario}...`);
    await new Promise(resolve => setTimeout(resolve, 1000));

    if (Math.random() < 0.2) {
        throw new Error('Falha no envio SMTP');
    }

    return { enviado: true, timestamp: Date.now() };
});

// Adicionar jobs
fila.enqueue('email', { 
    destinatario: 'usuario1@exemplo.com', 
    assunto: 'Bem-vindo!', 
    corpo: 'Olá, seja bem-vindo ao sistema.' 
});

fila.enqueue('email', { 
    destinatario: 'usuario2@exemplo.com', 
    assunto: 'Confirmação', 
    corpo: 'Sua conta foi confirmada.' 
});

fila.enqueue('email', { 
    destinatario: 'usuario3@exemplo.com', 
    assunto: 'Notificação', 
    corpo: 'Você tem uma nova mensagem.' 
});

fila.enqueue('email', { 
    destinatario: 'usuario4@exemplo.com', 
    assunto: 'Alerta', 
    corpo: 'Seu plano está próximo do limite.' 
});

fila.enqueue('email', { 
    destinatario: 'usuario5@exemplo.com', 
    assunto: 'Recibo', 
    corpo: 'Segue em anexo o recibo da compra.' 
});

// Eventos
fila.on('job:completed', (job) => {
    console.log(`Job ${job.id} concluído: email para ${job.dados.destinatario}`);
});

fila.on('job:failed', (job) => {
    console.error(`Job ${job.id} falhou após ${job.tentativas} tentativas`);
});

fila.on('job:retry', (job) => {
    console.log(`Job ${job.id} será retentado (tentativa ${job.tentativas})`);
});

// Iniciar processamento com concorrência 2
fila.iniciarConsumidor(2);

// Monitorar após 5 segundos
setTimeout(() => {
    console.log('\nMétricas finais:', fila.obterMetricas());
    fila.pararConsumidor();
}, 6000);

8. Limitações e quando migrar para soluções robustas

A implementação in-memory apresenta limitações significativas:

  • Garantias de entrega: apenas at-most-once; jobs podem ser perdidos em falhas
  • Concorrência: não suporta múltiplos processos ou servidores
  • Persistência: snapshots manuais não garantem consistência transacional
  • Escalabilidade: limitada à memória de um único processo

Critérios para migrar para soluções robustas:

Cenário Solução recomendada
Necessidade de persistência confiável Redis Bull
Alta throughput e particionamento Apache Kafka
Roteamento complexo de mensagens RabbitMQ
Agendamento avançado Sidekiq (Ruby) ou Celery (Python)

Sinais de que é hora de migrar:
- Perda de jobs causa impacto financeiro
- Necessidade de processamento distribuído
- Volume excede 10.000 jobs/minuto
- Requisitos de at-least-once delivery

A fila simples in-memory é uma excelente ferramenta para prototipação, sistemas internos com baixo volume, ou como componente de testes. Para produção crítica, invista em soluções como Bull (Redis), RabbitMQ ou Kafka, que oferecem garantias robustas de entrega, persistência e escalabilidade.

Referências