Canais: mpsc e comunicação entre threads

1. Introdução aos Canais em Rust

Em sistemas concorrentes, um dos maiores desafios é permitir que threads troquem informações sem incorrer em condições de corrida ou acesso simultâneo a dados mutáveis. Rust oferece uma abordagem elegante para esse problema através do módulo std::sync::mpsc (Multiple Producer, Single Consumer).

O conceito de canal é simples: um transmissor (Sender) envia mensagens para um receptor (Receiver), que as recebe em ordem FIFO. Essa abstração elimina a necessidade de compartilhar estado mutável entre threads — em vez disso, os dados são enviados através do canal, transferindo a propriedade (ownership) de uma thread para outra.

O módulo mpsc implementa o padrão "múltiplos produtores, um único consumidor", permitindo que várias threads enviem mensagens para uma única thread receptora. Essa arquitetura é ideal para pipelines de processamento, sistemas de eventos e distribuição de tarefas.

2. Criando e Utilizando um Canal Básico

A função mpsc::channel() retorna um par (Sender, Receiver). Vamos criar um exemplo simples com uma thread produtora e uma consumidora:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        let mensagens = vec!["Olá", "do", "produtor", "Rust!"];
        for msg in mensagens {
            sender.send(msg).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });

    for recebida in receiver {
        println!("Recebido: {}", recebida);
    }
}

O método recv() bloqueia a thread atual até receber uma mensagem, enquanto try_recv() tenta receber sem bloquear, retornando Result<T, TryRecvError>. Quando o Sender é dropado (todas as threads produtoras encerradas), o canal é fechado e receiver.iter() ou receiver.recv() retornam None/Err(RecvError).

3. Produtores Múltiplos: Clonando o Sender

O Sender<T> implementa Clone, permitindo que múltiplas threads compartilhem o mesmo canal de saída:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();
    let mut handles = vec![];

    for id in 0..4 {
        let thread_sender = sender.clone();
        handles.push(thread::spawn(move || {
            thread_sender.send(format!("Mensagem da thread {}", id)).unwrap();
        }));
    }

    // Drop do sender original para fechar o canal quando todas as threads terminarem
    drop(sender);

    for recebida in receiver {
        println!("{}", recebida);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Cada thread produtora clona o Sender e envia suas mensagens. O receptor coleta todas as mensagens em ordem de chegada. É crucial dropar o Sender original após distribuir as cópias para que o canal seja fechado corretamente quando todas as threads terminarem.

4. Tipos de Mensagens e Tratamento de Erros

Canais podem transportar qualquer tipo que implemente Send. Structs e enums são perfeitamente válidos:

use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
enum Comando {
    Soma(i32, i32),
    Subtracao(i32, i32),
    Sair,
}

fn main() {
    let (sender, receiver) = mpsc::channel::<Comando>();

    thread::spawn(move || {
        loop {
            match receiver.recv() {
                Ok(Comando::Soma(a, b)) => println!("Soma: {}", a + b),
                Ok(Comando::Subtracao(a, b)) => println!("Subtração: {}", a - b),
                Ok(Comando::Sair) => {
                    println!("Encerrando...");
                    break;
                }
                Err(_) => {
                    println!("Canal fechado");
                    break;
                }
            }
        }
    });

    sender.send(Comando::Soma(10, 5)).unwrap();
    sender.send(Comando::Subtracao(20, 8)).unwrap();
    sender.send(Comando::Sair).unwrap();
}

Erros comuns incluem SendError<T> (quando o receptor foi dropado) e RecvError (quando o canal está vazio e todos os transmissores foram dropados). Sempre trate esses erros para evitar panics em produção.

5. Iteração sobre Mensagens e Controle de Fluxo

O método receiver.iter() cria um iterador que bloqueia esperando por novas mensagens e termina quando o canal é fechado:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        for i in 1..=5 {
            sender.send(i).unwrap();
        }
    });

    for valor in receiver.iter() {
        println!("Processando: {}", valor);
    }
}

Para controle de fluxo, Rust oferece sync_channel(n) que cria um canal com capacidade limitada. Quando o buffer está cheio, send() bloqueia até que haja espaço:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::sync_channel(2); // buffer de 2 mensagens

    thread::spawn(move || {
        for i in 1..=5 {
            sender.send(i).unwrap();
            println!("Enviado: {}", i);
        }
    });

    for valor in receiver {
        println!("Recebido: {}", valor);
        thread::sleep_ms(1000); // simula processamento lento
    }
}

Canais assíncronos (channel()) têm buffer ilimitado, enquanto síncronos (sync_channel(n)) limitam o buffer e fornecem backpressure natural.

6. Padrões Avançados de Comunicação

Fan-in (múltiplos produtores, um consumidor) é o uso natural do mpsc, como vimos na seção 3. Fan-out (um produtor, múltiplos consumidores) requer um design diferente, já que o Receiver não pode ser clonado. Uma solução é usar Arc<Mutex<Receiver>> ou broadcast channels de crates externas.

Para cenários complexos, combine canais com Mutex<T> e Arc<T>:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();
    let receiver = Arc::new(Mutex::new(receiver));
    let mut handles = vec![];

    for _ in 0..3 {
        let receiver = Arc::clone(&receiver);
        handles.push(thread::spawn(move || {
            loop {
                let msg = receiver.lock().unwrap().recv();
                match msg {
                    Ok(valor) => println!("Thread processou: {}", valor),
                    Err(_) => break,
                }
            }
        }));
    }

    for i in 0..10 {
        sender.send(i).unwrap();
    }
    drop(sender);

    for handle in handles {
        handle.join().unwrap();
    }
}

7. Boas Práticas e Armadilhas Comuns

Evitando deadlocks: Canais podem causar deadlock se o produtor e consumidor dependerem um do outro. Sempre projete pipelines lineares ou use timeouts com recv_timeout().

Gerenciamento de transmissores: Mantenha apenas as referências necessárias ao Sender. Drop transmissores não utilizados para que o canal seja fechado corretamente e o receptor não fique bloqueado indefinidamente.

Performance: Canais são geralmente mais eficientes que Mutex<T> para comunicação one-way, pois evitam contenção de lock. Para workloads intensivos, considere o uso de canais com buffer síncrono para limitar o consumo de memória.

Testes: Canais facilitam testes de concorrência — você pode simular produtores e consumidores em testes unitários sem threads reais, usando apenas o canal:

#[test]
fn test_canal() {
    let (sender, receiver) = mpsc::channel();
    sender.send(42).unwrap();
    assert_eq!(receiver.recv().unwrap(), 42);
}

Canais são uma ferramenta poderosa no arsenal de concorrência do Rust. Quando combinados com o sistema de tipos e ownership, eles permitem comunicação segura e eficiente entre threads sem os riscos de dados compartilhados. Dominar o padrão mpsc é essencial para construir sistemas concorrentes robustos em Rust.

Referências