Worker pools e job queues em Go

1. Fundamentos de Worker Pools em Go

O padrão worker pool em Go é uma aplicação direta do conceito de fan-out/fan-in: múltiplas goroutines (workers) consomem tarefas de um canal compartilhado (fan-out) e enviam resultados para um canal de saída (fan-in). Esse modelo é nativo da linguagem, graças à sua filosofia de "não se comunique por memória compartilhada; compartilhe memória se comunicando".

O papel central das goroutines é permitir execução concorrente leve. Diferente de threads de SO, goroutines têm pilhas pequenas (alguns KB) e são multiplexadas em poucas threads do sistema operacional pelo runtime de Go. Isso torna viável criar milhares de workers sem esgotar recursos.

Os buffered channels atuam como fila de tarefas interna. Um canal com buffer de tamanho N permite que o dispatcher envie até N jobs sem bloquear, mesmo que todos os workers estejam ocupados. O sync.WaitGroup é essencial para coordenar o término dos workers: cada worker incrementa o contador ao iniciar e decrementa ao finalizar, permitindo que a função principal aguarde a conclusão de todos.

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d processando job %d\n", id, job)
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    var wg sync.WaitGroup

    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, &wg)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    wg.Wait()
}

2. Implementando uma Job Queue Simples

Vamos definir uma estrutura Job com ID e Payload. O dispatcher cria os jobs e os envia para o canal. Os workers consomem desse canal e processam cada job.

type Job struct {
    ID      int
    Payload string
}

func dispatcher(jobs chan<- Job, numJobs int) {
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{ID: i, Payload: fmt.Sprintf("tarefa-%d", i)}
    }
    close(jobs)
}

func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d: processando %s\n", id, job.Payload)
    }
}

3. Gerenciamento do Ciclo de Vida dos Workers

Para um graceful shutdown, usamos context.Context com cancelamento. Quando o contexto é cancelado (por exemplo, por um sinal do SO), os workers finalizam o job atual e param de aceitar novos.

func workerCtx(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                return // canal fechado
            }
            // processa o job
            results <- Result{JobID: job.ID, Err: nil}
        case <-ctx.Done():
            fmt.Printf("Worker %d: recebeu sinal de parada\n", id)
            return
        }
    }
}

Para capturar sinais do SO:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigCh
        fmt.Println("Sinal recebido, iniciando shutdown...")
        cancel()
    }()

    // ... iniciar workers e dispatcher
}

4. Coleta de Resultados e Tratamento de Erros

Um canal de Result permite que os workers comuniquem resultados e erros de volta à goroutine principal.

type Result struct {
    JobID int
    Err   error
    Data  string
}

func workerResult(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        data, err := processJob(job)
        results <- Result{JobID: job.ID, Err: err, Data: data}
    }
}

Para jobs com falha, podemos implementar retry:

func processWithRetry(job Job, maxRetries int) (string, error) {
    var err error
    for attempt := 1; attempt <= maxRetries; attempt++ {
        var data string
        data, err = processJob(job)
        if err == nil {
            return data, nil
        }
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return "", fmt.Errorf("job %d falhou após %d tentativas: %w", job.ID, maxRetries, err)
}

Jobs que falham definitivamente podem ser enviados para uma dead letter queue (outro canal ou fila externa) para análise posterior.

5. Limitação de Taxa e Backpressure

O semaphore pattern com canal bufferizado limita o número de execuções concorrentes:

type WorkerPool struct {
    semaphore chan struct{}
    jobs      chan Job
}

func NewWorkerPool(maxWorkers int) *WorkerPool {
    return &WorkerPool{
        semaphore: make(chan struct{}, maxWorkers),
        jobs:      make(chan Job, 100),
    }
}

func (wp *WorkerPool) Submit(job Job) {
    wp.jobs <- job
}

func (wp *WorkerPool) Start(ctx context.Context) {
    for {
        select {
        case job := <-wp.jobs:
            wp.semaphore <- struct{}{} // adquire slot
            go func(j Job) {
                defer func() { <-wp.semaphore }() // libera slot
                processJob(j)
            }(job)
        case <-ctx.Done():
            return
        }
    }
}

Para controle de throughput com rate.Limiter:

import "golang.org/x/time/rate"

limiter := rate.NewLimiter(rate.Limit(10), 1) // 10 jobs por segundo

for job := range jobs {
    err := limiter.Wait(ctx)
    if err != nil {
        break
    }
    go processJob(job)
}

6. Integração com Filas Externas (Redis, RabbitMQ, NATS)

Com Redis, usamos BRPOP para bloquear até que um job esteja disponível:

import "github.com/go-redis/redis/v8"

func redisWorker(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        result, err := rdb.BRPop(ctx, 0, "job_queue").Result()
        if err != nil {
            return
        }
        var job Job
        json.Unmarshal([]byte(result[1]), &job)
        processJob(job)
    }
}

A serialização com JSON permite interoperabilidade. Diferente de canais em memória, filas externas persistem dados mesmo se a aplicação reiniciar, oferecendo resiliência e distribuição entre múltiplas instâncias.

7. Padrões Avançados e Boas Práticas

Worker pool dinâmico — ajuste o número de workers baseado na carga:

type DynamicPool struct {
    mu       sync.Mutex
    workers  int
    maxWorkers int
    jobs     chan Job
}

func (dp *DynamicPool) Scale(workers int) {
    dp.mu.Lock()
    defer dp.mu.Unlock()
    // ajusta workers dinamicamente
}

Pipeline de workers — encadeie estágios com canais tipados:

func stage1(input <-chan Job) <-chan Job {
    output := make(chan Job)
    go func() {
        for job := range input {
            job.Payload = strings.ToUpper(job.Payload)
            output <- job
        }
        close(output)
    }()
    return output
}

Monitoramento — exponha métricas com expvar ou Prometheus:

var (
    jobsProcessed = expvar.NewInt("jobs_processed")
    jobsFailed    = expvar.NewInt("jobs_failed")
)

func monitoredWorker(jobs <-chan Job) {
    for job := range jobs {
        err := processJob(job)
        if err != nil {
            jobsFailed.Add(1)
        } else {
            jobsProcessed.Add(1)
        }
    }
}

Worker pools bem projetados em Go oferecem concorrência eficiente, controle de recursos e resiliência. A escolha entre fila em memória ou externa depende dos requisitos de persistência, distribuição e tolerância a falhas do sistema.

Referências