Padrão fan-out e fan-in

1. Introdução ao padrão fan-out/fan-in

O padrão fan-out e fan-in é uma técnica de concorrência em Go que permite distribuir trabalho entre múltiplas goroutines e depois agregar os resultados. O fan-out consiste em dividir tarefas de um único canal de entrada para vários workers paralelos, enquanto o fan-in combina múltiplos canais de saída em um único canal consolidado.

A diferença fundamental está na direção do fluxo: fan-out expande o processamento (um canal para muitos workers), fan-in contrai os resultados (muitos canais para um consumidor). Casos de uso comuns incluem processamento paralelo de lotes de dados, agregação de resultados de APIs externas, e transformação concorrente de streams.

2. Fan-out: Distribuindo trabalho entre múltiplas goroutines

O fan-out começa com um canal de entrada compartilhado. Múltiplas goroutines workers consomem desse mesmo canal, cada uma processando itens independentemente:

func fanOut(input <-chan int, numWorkers int) []<-chan int {
    channels := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        channels[i] = worker(input, i)
    }
    return channels
}

func worker(input <-chan int, id int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for value := range input {
            result := value * 2 // processamento
            output <- result
            fmt.Printf("Worker %d processou %d\n", id, value)
        }
    }()
    return output
}

Cada worker possui seu próprio canal de saída. Para garantir finalização correta, usamos defer close(output) e iteramos até o canal de entrada ser fechado.

3. Fan-in: Agregando resultados de múltiplas fontes

O fan-in multiplexa vários canais de saída em um único canal. Implementamos com goroutines dedicadas que escutam cada canal de entrada:

func fanIn(channels ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for value := range c {
                output <- value
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(output)
    }()

    return output
}

O sync.WaitGroup coordena a coleta: quando todas as goroutines de merge terminam, fechamos o canal de saída.

4. Exemplo prático completo: Processamento de números

Vamos processar uma lista de números calculando seus quadrados em paralelo:

package main

import (
    "fmt"
    "sync"
)

func generateNumbers(numbers []int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range numbers {
            out <- n
        }
    }()
    return out
}

func squareWorker(input <-chan int, id int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for num := range input {
            result := num * num
            fmt.Printf("Worker %d: %d² = %d\n", id, num, result)
            output <- result
        }
    }()
    return output
}

func merge(channels ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                output <- val
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(output)
    }()

    return output
}

func main() {
    numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    // Fan-out: distribui números entre 3 workers
    input := generateNumbers(numbers)
    channels := fanOut(input, 3)

    // Fan-in: agrega resultados
    results := merge(channels...)

    // Consome resultados
    for result := range results {
        fmt.Printf("Resultado: %d\n", result)
    }
}

func fanOut(input <-chan int, numWorkers int) []<-chan int {
    channels := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        channels[i] = squareWorker(input, i+1)
    }
    return channels
}

A saída exibe os resultados na ordem de chegada, não necessariamente sequencial.

5. Gerenciamento de erros e cancelamento

Para propagar erros, usamos um canal de erro separado:

type Result struct {
    Value int
    Err   error
}

func safeWorker(input <-chan int, ctx context.Context) <-chan Result {
    output := make(chan Result)
    go func() {
        defer close(output)
        for num := range input {
            select {
            case <-ctx.Done():
                output <- Result{Err: ctx.Err()}
                return
            default:
                if num < 0 {
                    output <- Result{Err: fmt.Errorf("número negativo: %d", num)}
                    continue
                }
                output <- Result{Value: num * num}
            }
        }
    }()
    return output
}

O context.Context permite cancelamento antecipado. O padrão de canal de erro separado evita bloqueios, pois o consumidor pode tratar erros sem interromper o fluxo principal.

6. Considerações de desempenho e boas práticas

O número ideal de workers depende dos recursos do sistema. Uma boa prática é usar runtime.GOMAXPROCS(0) como base:

numWorkers := runtime.GOMAXPROCS(0) // número de CPUs lógicas

Para evitar vazamento de goroutines:
- Sempre feche canais quando o produtor terminar
- Use defer close() em goroutines que produzem dados
- Garanta que consumidores esvaziem canais até o fechamento
- Utilize sync.WaitGroup para sincronizar finalização

Limpeza graciosa envolve:

func gracefulShutdown(input chan int, cancel context.CancelFunc) {
    // Sinal para workers pararem
    cancel()
    // Fecha canal de entrada
    close(input)
    // Aguarda workers finalizarem (via WaitGroup)
}

7. Comparação com alternativas

Fan-out/fan-in vs. Pipeline de stages:
- Pipeline é linear (stage1 → stage2 → stage3), cada stage pode ter múltiplos workers
- Fan-out/fan-in é mais flexível para distribuição não-linear e agregação arbitrária
- Pipeline é melhor para transformações sequenciais obrigatórias

sync.WaitGroup vs. canais para sincronização:
- WaitGroup é mais simples para coordenar finalização de grupos
- Canais são melhores para comunicação de dados e sinais
- Combinar ambos (como no fan-in) é prática comum

Trade-offs:
- Fan-out/fan-in adiciona complexidade de gerenciamento de canais
- Oferece controle fino sobre concorrência
- Pode ser substituído por errgroup.Group para casos mais simples
- Exige cuidado com deadlocks e vazamento de goroutines

O padrão é ideal quando você precisa processar dados independentes em paralelo e depois consolidar resultados, oferecendo um equilíbrio entre performance e clareza de código.

Referências