Canais: mpsc e comunicação entre threads
1. Introdução aos Canais em Rust
Em sistemas concorrentes, um dos maiores desafios é permitir que threads troquem informações sem incorrer em condições de corrida ou acesso simultâneo a dados mutáveis. Rust oferece uma abordagem elegante para esse problema através do módulo std::sync::mpsc (Multiple Producer, Single Consumer).
O conceito de canal é simples: um transmissor (Sender) envia mensagens para um receptor (Receiver), que as recebe em ordem FIFO. Essa abstração elimina a necessidade de compartilhar estado mutável entre threads — em vez disso, os dados são enviados através do canal, transferindo a propriedade (ownership) de uma thread para outra.
O módulo mpsc implementa o padrão "múltiplos produtores, um único consumidor", permitindo que várias threads enviem mensagens para uma única thread receptora. Essa arquitetura é ideal para pipelines de processamento, sistemas de eventos e distribuição de tarefas.
2. Criando e Utilizando um Canal Básico
A função mpsc::channel() retorna um par (Sender, Receiver). Vamos criar um exemplo simples com uma thread produtora e uma consumidora:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
let mensagens = vec!["Olá", "do", "produtor", "Rust!"];
for msg in mensagens {
sender.send(msg).unwrap();
thread::sleep(Duration::from_millis(500));
}
});
for recebida in receiver {
println!("Recebido: {}", recebida);
}
}
O método recv() bloqueia a thread atual até receber uma mensagem, enquanto try_recv() tenta receber sem bloquear, retornando Result<T, TryRecvError>. Quando o Sender é dropado (todas as threads produtoras encerradas), o canal é fechado e receiver.iter() ou receiver.recv() retornam None/Err(RecvError).
3. Produtores Múltiplos: Clonando o Sender
O Sender<T> implementa Clone, permitindo que múltiplas threads compartilhem o mesmo canal de saída:
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
let mut handles = vec![];
for id in 0..4 {
let thread_sender = sender.clone();
handles.push(thread::spawn(move || {
thread_sender.send(format!("Mensagem da thread {}", id)).unwrap();
}));
}
// Drop do sender original para fechar o canal quando todas as threads terminarem
drop(sender);
for recebida in receiver {
println!("{}", recebida);
}
for handle in handles {
handle.join().unwrap();
}
}
Cada thread produtora clona o Sender e envia suas mensagens. O receptor coleta todas as mensagens em ordem de chegada. É crucial dropar o Sender original após distribuir as cópias para que o canal seja fechado corretamente quando todas as threads terminarem.
4. Tipos de Mensagens e Tratamento de Erros
Canais podem transportar qualquer tipo que implemente Send. Structs e enums são perfeitamente válidos:
use std::sync::mpsc;
use std::thread;
#[derive(Debug)]
enum Comando {
Soma(i32, i32),
Subtracao(i32, i32),
Sair,
}
fn main() {
let (sender, receiver) = mpsc::channel::<Comando>();
thread::spawn(move || {
loop {
match receiver.recv() {
Ok(Comando::Soma(a, b)) => println!("Soma: {}", a + b),
Ok(Comando::Subtracao(a, b)) => println!("Subtração: {}", a - b),
Ok(Comando::Sair) => {
println!("Encerrando...");
break;
}
Err(_) => {
println!("Canal fechado");
break;
}
}
}
});
sender.send(Comando::Soma(10, 5)).unwrap();
sender.send(Comando::Subtracao(20, 8)).unwrap();
sender.send(Comando::Sair).unwrap();
}
Erros comuns incluem SendError<T> (quando o receptor foi dropado) e RecvError (quando o canal está vazio e todos os transmissores foram dropados). Sempre trate esses erros para evitar panics em produção.
5. Iteração sobre Mensagens e Controle de Fluxo
O método receiver.iter() cria um iterador que bloqueia esperando por novas mensagens e termina quando o canal é fechado:
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in 1..=5 {
sender.send(i).unwrap();
}
});
for valor in receiver.iter() {
println!("Processando: {}", valor);
}
}
Para controle de fluxo, Rust oferece sync_channel(n) que cria um canal com capacidade limitada. Quando o buffer está cheio, send() bloqueia até que haja espaço:
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::sync_channel(2); // buffer de 2 mensagens
thread::spawn(move || {
for i in 1..=5 {
sender.send(i).unwrap();
println!("Enviado: {}", i);
}
});
for valor in receiver {
println!("Recebido: {}", valor);
thread::sleep_ms(1000); // simula processamento lento
}
}
Canais assíncronos (channel()) têm buffer ilimitado, enquanto síncronos (sync_channel(n)) limitam o buffer e fornecem backpressure natural.
6. Padrões Avançados de Comunicação
Fan-in (múltiplos produtores, um consumidor) é o uso natural do mpsc, como vimos na seção 3. Fan-out (um produtor, múltiplos consumidores) requer um design diferente, já que o Receiver não pode ser clonado. Uma solução é usar Arc<Mutex<Receiver>> ou broadcast channels de crates externas.
Para cenários complexos, combine canais com Mutex<T> e Arc<T>:
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut handles = vec![];
for _ in 0..3 {
let receiver = Arc::clone(&receiver);
handles.push(thread::spawn(move || {
loop {
let msg = receiver.lock().unwrap().recv();
match msg {
Ok(valor) => println!("Thread processou: {}", valor),
Err(_) => break,
}
}
}));
}
for i in 0..10 {
sender.send(i).unwrap();
}
drop(sender);
for handle in handles {
handle.join().unwrap();
}
}
7. Boas Práticas e Armadilhas Comuns
Evitando deadlocks: Canais podem causar deadlock se o produtor e consumidor dependerem um do outro. Sempre projete pipelines lineares ou use timeouts com recv_timeout().
Gerenciamento de transmissores: Mantenha apenas as referências necessárias ao Sender. Drop transmissores não utilizados para que o canal seja fechado corretamente e o receptor não fique bloqueado indefinidamente.
Performance: Canais são geralmente mais eficientes que Mutex<T> para comunicação one-way, pois evitam contenção de lock. Para workloads intensivos, considere o uso de canais com buffer síncrono para limitar o consumo de memória.
Testes: Canais facilitam testes de concorrência — você pode simular produtores e consumidores em testes unitários sem threads reais, usando apenas o canal:
#[test]
fn test_canal() {
let (sender, receiver) = mpsc::channel();
sender.send(42).unwrap();
assert_eq!(receiver.recv().unwrap(), 42);
}
Canais são uma ferramenta poderosa no arsenal de concorrência do Rust. Quando combinados com o sistema de tipos e ownership, eles permitem comunicação segura e eficiente entre threads sem os riscos de dados compartilhados. Dominar o padrão mpsc é essencial para construir sistemas concorrentes robustos em Rust.
Referências
- Documentação oficial do módulo std::sync::mpsc — Referência completa com todos os tipos, traits e funções do módulo mpsc.
- The Rust Programming Language - Capítulo 16: Canais — Seção do livro oficial sobre passagem de mensagens com canais.
- Rust by Example: Canais — Exemplos práticos e didáticos de uso de canais em Rust.
- Rust Cookbook: Comunicação entre threads com canais — Receitas práticas para comunicação entre threads usando canais.
- Rust Nomicon: Canais e sincronização — Discussão avançada sobre sincronização e canais no contexto de código inseguro.