Stream trait e processamento assíncrono de dados
1. Introdução ao Stream trait
O Stream trait é a abstração fundamental do ecossistema Rust para processamento assíncrono de sequências de valores. Assim como Iterator permite iterar sobre coleções de forma síncrona, Stream permite consumir valores que chegam ao longo do tempo de forma assíncrona.
A definição central do trait é surpreendentemente simples:
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
As diferenças cruciais entre Stream e Iterator são:
- Lazy evaluation vs async polling: Enquanto Iterator produz valores sob demanda, Stream requer que o executor asíncrono agende o polling
- Contexto assíncrono: poll_next recebe um Context para registro de wakers
- Pin: A auto-referência é necessária para segurança em operações assíncronas
2. Implementando Stream manualmente
Vamos implementar um gerador assíncrono de números Fibonacci:
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
struct FibonacciStream {
current: u64,
next: u64,
state: FibonacciState,
}
enum FibonacciState {
Ready,
Waiting,
Done,
}
impl FibonacciStream {
fn new() -> Self {
FibonacciStream {
current: 0,
next: 1,
state: FibonacciState::Ready,
}
}
}
impl Stream for FibonacciStream {
type Item = u64;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.state {
FibonacciState::Ready => {
let result = this.current;
let new_next = this.current.checked_add(this.next);
match new_next {
Some(sum) => {
this.current = this.next;
this.next = sum;
Poll::Ready(Some(result))
}
None => {
this.state = FibonacciState::Done;
Poll::Ready(Some(result))
}
}
}
FibonacciState::Waiting => {
this.state = FibonacciState::Ready;
Poll::Pending
}
FibonacciState::Done => Poll::Ready(None),
}
}
}
3. Combinadores e operações com Stream
O trait StreamExt fornece métodos de extensão poderosos:
use futures::StreamExt;
use tokio_stream::StreamExt as _;
#[tokio::main]
async fn main() {
let fib = FibonacciStream::new();
// Combinadores básicos
let processed: Vec<u64> = fib
.take(10)
.map(|x| x * 2)
.filter(|x| x % 3 == 0)
.collect()
.await;
println!("Processados: {:?}", processed);
// Combinação de streams
let stream1 = futures::stream::iter(vec![1, 2, 3]);
let stream2 = futures::stream::iter(vec![4, 5, 6]);
let merged = stream1.merge(stream2);
let zipped = stream1.zip(stream2);
let chained = stream1.chain(stream2);
}
4. Processamento com backpressure e buffers
O controle de fluxo é essencial em pipelines assíncronos:
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn process_item(item: u32) -> u32 {
sleep(Duration::from_millis(100)).await;
item * 2
}
#[tokio::main]
async fn main() {
let items = stream::iter(0..100);
// Buffer com paralelismo controlado
let buffered: Vec<u32> = items
.map(|x| process_item(x))
.buffer_unordered(10) // Máximo 10 operações concorrentes
.collect()
.await;
// Processamento concorrente com for_each_concurrent
stream::iter(0..50)
.for_each_concurrent(5, |item| async move {
let result = process_item(item).await;
println!("Processado: {}", result);
})
.await;
}
5. Streams infinitos e cancelamento
Streams infinitos requerem cuidado especial com cancelamento:
use futures::StreamExt;
use tokio::time::{interval, Duration, timeout};
use tokio_stream::wrappers::IntervalStream;
async fn infinite_stream() {
let mut interval_stream = IntervalStream::new(interval(Duration::from_millis(100)));
// Timeout em cada item
while let Some(item) = interval_stream.next().await {
match timeout(Duration::from_millis(50), async {
// Operação que pode demorar
tokio::time::sleep(Duration::from_millis(30)).await;
}).await {
Ok(_) => println!("Item processado: {:?}", item),
Err(_) => println!("Timeout no item"),
}
}
}
// Cancelamento seguro com select!
use tokio::select;
async fn process_with_cancellation() {
let mut stream = IntervalStream::new(interval(Duration::from_millis(100)));
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancelled = cancel_token.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
cancelled.cancel();
});
loop {
select! {
_ = cancel_token.cancelled() => {
println!("Operação cancelada");
break;
}
Some(_) = stream.next() => {
println!("Processando...");
}
}
}
}
6. Integração com o ecossistema Tokio
Tokio oferece wrappers para converter suas primitivas em Streams:
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
// Convertendo canal mpsc em Stream
let (tx, rx) = mpsc::channel::<String>(32);
let mut stream = ReceiverStream::new(rx);
tokio::spawn(async move {
tx.send("Hello".to_string()).await.unwrap();
tx.send("World".to_string()).await.unwrap();
});
while let Some(msg) = stream.next().await {
println!("Recebido: {}", msg);
}
// Lendo de socket TCP como Stream
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
incoming
.take(10)
.for_each_concurrent(5, |stream| async move {
match stream {
Ok(mut stream) => {
use tokio::io::AsyncReadExt;
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await.unwrap();
println!("Recebido {} bytes", n);
}
Err(e) => eprintln!("Erro: {}", e),
}
})
.await;
}
7. Tratamento de erros em pipelines de Stream
Erros em Streams assíncronos requerem estratégias específicas:
use futures::stream::{self, StreamExt};
use thiserror::Error;
#[derive(Error, Debug)]
enum ProcessingError {
#[error("Falha no processamento: {0}")]
Processing(String),
#[error("Timeout")]
Timeout,
}
async fn fallible_operation(x: u32) -> Result<u32, ProcessingError> {
if x % 3 == 0 {
Err(ProcessingError::Processing(format!("Erro em {}", x)))
} else {
Ok(x * 2)
}
}
#[tokio::main]
async fn main() {
let items = stream::iter(0..20);
// Propagação de erros
let results: Vec<Result<u32, ProcessingError>> = items
.map(|x| fallible_operation(x))
.collect()
.await;
// Estratégias de recuperação
let successful: Vec<u32> = stream::iter(0..20)
.map(|x| fallible_operation(x))
.filter_map(|result| async move {
match result {
Ok(val) => Some(val),
Err(_) => None, // Skip errors
}
})
.collect()
.await;
// Combinadores de erro
stream::iter(0..20)
.map(|x| fallible_operation(x))
.map_err(|e| {
eprintln!("Erro encontrado: {}", e);
e
})
.for_each(|result| async move {
if let Ok(val) = result {
println!("Sucesso: {}", val);
}
})
.await;
}
8. Padrões avançados e boas práticas
Para performance e manutenibilidade:
use futures::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};
// Stream infinito com estado mutável
struct CounterStream {
count: u64,
}
impl Stream for CounterStream {
type Item = u64;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
this.count += 1;
Poll::Ready(Some(this.count))
}
}
// Evitando alocações desnecessárias
fn create_stream() -> impl Stream<Item = u32> {
futures::stream::iter(0..100)
.map(|x| x * 2)
.filter(|x| *x > 50)
}
#[tokio::main]
async fn main() {
// Preferir impl Stream a Box::pin quando possível
let mut stream = create_stream();
while let Some(item) = stream.next().await {
println!("Item: {}", item);
}
// Debugging: Use tracing para profiling
tracing_subscriber::fmt::init();
let stream = futures::stream::iter(0..100)
.inspect(|x| tracing::debug!("Processando {}", x));
let result: Vec<_> = stream.collect().await;
println!("Resultado final: {:?}", result.len());
}
O Stream trait é uma ferramenta poderosa para processamento assíncrono em Rust. Combinado com o ecossistema Tokio, permite construir pipelines de dados complexos, resilientes e performáticos. A chave está em entender o modelo de polling assíncrono, gerenciar corretamente o estado interno com Pin, e utilizar os combinadores apropriados para cada cenário.
Referências
- Stream trait documentation (Tokio) — Documentação oficial do trait StreamExt com todos os combinadores disponíveis
- Futures crate documentation — Documentação completa do módulo stream da crate futures
- Tokio Stream tutorial — Tutorial oficial do Tokio sobre uso de Streams no ecossistema Tokio
- Async Processing with Streams (Rust Book) — Capítulo do Async Book sobre processamento assíncrono com Streams
- Rust Stream Combinators Guide — Guia prático sobre combinadores de Stream em Rust com exemplos detalhados
- Building Async Pipelines with Streams — Artigo técnico sobre construção de pipelines assíncronos com Streams em Rust