Padrão fan-out e fan-in
1. Introdução ao padrão fan-out/fan-in
O padrão fan-out e fan-in é uma técnica de concorrência em Go que permite distribuir trabalho entre múltiplas goroutines e depois agregar os resultados. O fan-out consiste em dividir tarefas de um único canal de entrada para vários workers paralelos, enquanto o fan-in combina múltiplos canais de saída em um único canal consolidado.
A diferença fundamental está na direção do fluxo: fan-out expande o processamento (um canal para muitos workers), fan-in contrai os resultados (muitos canais para um consumidor). Casos de uso comuns incluem processamento paralelo de lotes de dados, agregação de resultados de APIs externas, e transformação concorrente de streams.
2. Fan-out: Distribuindo trabalho entre múltiplas goroutines
O fan-out começa com um canal de entrada compartilhado. Múltiplas goroutines workers consomem desse mesmo canal, cada uma processando itens independentemente:
func fanOut(input <-chan int, numWorkers int) []<-chan int {
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
channels[i] = worker(input, i)
}
return channels
}
func worker(input <-chan int, id int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for value := range input {
result := value * 2 // processamento
output <- result
fmt.Printf("Worker %d processou %d\n", id, value)
}
}()
return output
}
Cada worker possui seu próprio canal de saída. Para garantir finalização correta, usamos defer close(output) e iteramos até o canal de entrada ser fechado.
3. Fan-in: Agregando resultados de múltiplas fontes
O fan-in multiplexa vários canais de saída em um único canal. Implementamos com goroutines dedicadas que escutam cada canal de entrada:
func fanIn(channels ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for value := range c {
output <- value
}
}(ch)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
O sync.WaitGroup coordena a coleta: quando todas as goroutines de merge terminam, fechamos o canal de saída.
4. Exemplo prático completo: Processamento de números
Vamos processar uma lista de números calculando seus quadrados em paralelo:
package main
import (
"fmt"
"sync"
)
func generateNumbers(numbers []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range numbers {
out <- n
}
}()
return out
}
func squareWorker(input <-chan int, id int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for num := range input {
result := num * num
fmt.Printf("Worker %d: %d² = %d\n", id, num, result)
output <- result
}
}()
return output
}
func merge(channels ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
output <- val
}
}(ch)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
func main() {
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
// Fan-out: distribui números entre 3 workers
input := generateNumbers(numbers)
channels := fanOut(input, 3)
// Fan-in: agrega resultados
results := merge(channels...)
// Consome resultados
for result := range results {
fmt.Printf("Resultado: %d\n", result)
}
}
func fanOut(input <-chan int, numWorkers int) []<-chan int {
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
channels[i] = squareWorker(input, i+1)
}
return channels
}
A saída exibe os resultados na ordem de chegada, não necessariamente sequencial.
5. Gerenciamento de erros e cancelamento
Para propagar erros, usamos um canal de erro separado:
type Result struct {
Value int
Err error
}
func safeWorker(input <-chan int, ctx context.Context) <-chan Result {
output := make(chan Result)
go func() {
defer close(output)
for num := range input {
select {
case <-ctx.Done():
output <- Result{Err: ctx.Err()}
return
default:
if num < 0 {
output <- Result{Err: fmt.Errorf("número negativo: %d", num)}
continue
}
output <- Result{Value: num * num}
}
}
}()
return output
}
O context.Context permite cancelamento antecipado. O padrão de canal de erro separado evita bloqueios, pois o consumidor pode tratar erros sem interromper o fluxo principal.
6. Considerações de desempenho e boas práticas
O número ideal de workers depende dos recursos do sistema. Uma boa prática é usar runtime.GOMAXPROCS(0) como base:
numWorkers := runtime.GOMAXPROCS(0) // número de CPUs lógicas
Para evitar vazamento de goroutines:
- Sempre feche canais quando o produtor terminar
- Use defer close() em goroutines que produzem dados
- Garanta que consumidores esvaziem canais até o fechamento
- Utilize sync.WaitGroup para sincronizar finalização
Limpeza graciosa envolve:
func gracefulShutdown(input chan int, cancel context.CancelFunc) {
// Sinal para workers pararem
cancel()
// Fecha canal de entrada
close(input)
// Aguarda workers finalizarem (via WaitGroup)
}
7. Comparação com alternativas
Fan-out/fan-in vs. Pipeline de stages:
- Pipeline é linear (stage1 → stage2 → stage3), cada stage pode ter múltiplos workers
- Fan-out/fan-in é mais flexível para distribuição não-linear e agregação arbitrária
- Pipeline é melhor para transformações sequenciais obrigatórias
sync.WaitGroup vs. canais para sincronização:
- WaitGroup é mais simples para coordenar finalização de grupos
- Canais são melhores para comunicação de dados e sinais
- Combinar ambos (como no fan-in) é prática comum
Trade-offs:
- Fan-out/fan-in adiciona complexidade de gerenciamento de canais
- Oferece controle fino sobre concorrência
- Pode ser substituído por errgroup.Group para casos mais simples
- Exige cuidado com deadlocks e vazamento de goroutines
O padrão é ideal quando você precisa processar dados independentes em paralelo e depois consolidar resultados, oferecendo um equilíbrio entre performance e clareza de código.
Referências
- Go Concurrency Patterns: Pipelines and cancellation — Artigo oficial da equipe Go sobre pipelines, fan-out e fan-in com exemplos práticos
- Go by Example: Worker Pools — Tutorial interativo demonstrando implementação de pool de workers com fan-out
- Concurrency in Go: Fan-Out and Fan-In Pattern — Explicação detalhada com diagramas e código comentado
- Understanding Fan-Out and Fan-In Patterns in Go — Artigo no Dev.to com exemplos de merge e cancelamento
- Go Concurrency Patterns: Context — Documentação oficial sobre uso de context para cancelamento em padrões concorrentes
- Advanced Go Concurrency Patterns — Palestra de Rob Pike sobre padrões avançados incluindo fan-out/fan-in
- Fan-Out/Fan-In Pattern in Go: A Complete Guide — Guia completo com implementação passo a passo e tratamento de erros