Stream trait e processamento assíncrono de dados

1. Introdução ao Stream trait

O Stream trait é a abstração fundamental do ecossistema Rust para processamento assíncrono de sequências de valores. Assim como Iterator permite iterar sobre coleções de forma síncrona, Stream permite consumir valores que chegam ao longo do tempo de forma assíncrona.

A definição central do trait é surpreendentemente simples:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

As diferenças cruciais entre Stream e Iterator são:
- Lazy evaluation vs async polling: Enquanto Iterator produz valores sob demanda, Stream requer que o executor asíncrono agende o polling
- Contexto assíncrono: poll_next recebe um Context para registro de wakers
- Pin: A auto-referência é necessária para segurança em operações assíncronas

2. Implementando Stream manualmente

Vamos implementar um gerador assíncrono de números Fibonacci:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

struct FibonacciStream {
    current: u64,
    next: u64,
    state: FibonacciState,
}

enum FibonacciState {
    Ready,
    Waiting,
    Done,
}

impl FibonacciStream {
    fn new() -> Self {
        FibonacciStream {
            current: 0,
            next: 1,
            state: FibonacciState::Ready,
        }
    }
}

impl Stream for FibonacciStream {
    type Item = u64;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.state {
            FibonacciState::Ready => {
                let result = this.current;
                let new_next = this.current.checked_add(this.next);

                match new_next {
                    Some(sum) => {
                        this.current = this.next;
                        this.next = sum;
                        Poll::Ready(Some(result))
                    }
                    None => {
                        this.state = FibonacciState::Done;
                        Poll::Ready(Some(result))
                    }
                }
            }
            FibonacciState::Waiting => {
                this.state = FibonacciState::Ready;
                Poll::Pending
            }
            FibonacciState::Done => Poll::Ready(None),
        }
    }
}

3. Combinadores e operações com Stream

O trait StreamExt fornece métodos de extensão poderosos:

use futures::StreamExt;
use tokio_stream::StreamExt as _;

#[tokio::main]
async fn main() {
    let fib = FibonacciStream::new();

    // Combinadores básicos
    let processed: Vec<u64> = fib
        .take(10)
        .map(|x| x * 2)
        .filter(|x| x % 3 == 0)
        .collect()
        .await;

    println!("Processados: {:?}", processed);

    // Combinação de streams
    let stream1 = futures::stream::iter(vec![1, 2, 3]);
    let stream2 = futures::stream::iter(vec![4, 5, 6]);

    let merged = stream1.merge(stream2);
    let zipped = stream1.zip(stream2);
    let chained = stream1.chain(stream2);
}

4. Processamento com backpressure e buffers

O controle de fluxo é essencial em pipelines assíncronos:

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

async fn process_item(item: u32) -> u32 {
    sleep(Duration::from_millis(100)).await;
    item * 2
}

#[tokio::main]
async fn main() {
    let items = stream::iter(0..100);

    // Buffer com paralelismo controlado
    let buffered: Vec<u32> = items
        .map(|x| process_item(x))
        .buffer_unordered(10) // Máximo 10 operações concorrentes
        .collect()
        .await;

    // Processamento concorrente com for_each_concurrent
    stream::iter(0..50)
        .for_each_concurrent(5, |item| async move {
            let result = process_item(item).await;
            println!("Processado: {}", result);
        })
        .await;
}

5. Streams infinitos e cancelamento

Streams infinitos requerem cuidado especial com cancelamento:

use futures::StreamExt;
use tokio::time::{interval, Duration, timeout};
use tokio_stream::wrappers::IntervalStream;

async fn infinite_stream() {
    let mut interval_stream = IntervalStream::new(interval(Duration::from_millis(100)));

    // Timeout em cada item
    while let Some(item) = interval_stream.next().await {
        match timeout(Duration::from_millis(50), async {
            // Operação que pode demorar
            tokio::time::sleep(Duration::from_millis(30)).await;
        }).await {
            Ok(_) => println!("Item processado: {:?}", item),
            Err(_) => println!("Timeout no item"),
        }
    }
}

// Cancelamento seguro com select!
use tokio::select;

async fn process_with_cancellation() {
    let mut stream = IntervalStream::new(interval(Duration::from_millis(100)));
    let cancel_token = tokio_util::sync::CancellationToken::new();
    let cancelled = cancel_token.clone();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        cancelled.cancel();
    });

    loop {
        select! {
            _ = cancel_token.cancelled() => {
                println!("Operação cancelada");
                break;
            }
            Some(_) = stream.next() => {
                println!("Processando...");
            }
        }
    }
}

6. Integração com o ecossistema Tokio

Tokio oferece wrappers para converter suas primitivas em Streams:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    // Convertendo canal mpsc em Stream
    let (tx, rx) = mpsc::channel::<String>(32);
    let mut stream = ReceiverStream::new(rx);

    tokio::spawn(async move {
        tx.send("Hello".to_string()).await.unwrap();
        tx.send("World".to_string()).await.unwrap();
    });

    while let Some(msg) = stream.next().await {
        println!("Recebido: {}", msg);
    }

    // Lendo de socket TCP como Stream
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);

    incoming
        .take(10)
        .for_each_concurrent(5, |stream| async move {
            match stream {
                Ok(mut stream) => {
                    use tokio::io::AsyncReadExt;
                    let mut buf = [0; 1024];
                    let n = stream.read(&mut buf).await.unwrap();
                    println!("Recebido {} bytes", n);
                }
                Err(e) => eprintln!("Erro: {}", e),
            }
        })
        .await;
}

7. Tratamento de erros em pipelines de Stream

Erros em Streams assíncronos requerem estratégias específicas:

use futures::stream::{self, StreamExt};
use thiserror::Error;

#[derive(Error, Debug)]
enum ProcessingError {
    #[error("Falha no processamento: {0}")]
    Processing(String),
    #[error("Timeout")]
    Timeout,
}

async fn fallible_operation(x: u32) -> Result<u32, ProcessingError> {
    if x % 3 == 0 {
        Err(ProcessingError::Processing(format!("Erro em {}", x)))
    } else {
        Ok(x * 2)
    }
}

#[tokio::main]
async fn main() {
    let items = stream::iter(0..20);

    // Propagação de erros
    let results: Vec<Result<u32, ProcessingError>> = items
        .map(|x| fallible_operation(x))
        .collect()
        .await;

    // Estratégias de recuperação
    let successful: Vec<u32> = stream::iter(0..20)
        .map(|x| fallible_operation(x))
        .filter_map(|result| async move {
            match result {
                Ok(val) => Some(val),
                Err(_) => None, // Skip errors
            }
        })
        .collect()
        .await;

    // Combinadores de erro
    stream::iter(0..20)
        .map(|x| fallible_operation(x))
        .map_err(|e| {
            eprintln!("Erro encontrado: {}", e);
            e
        })
        .for_each(|result| async move {
            if let Ok(val) = result {
                println!("Sucesso: {}", val);
            }
        })
        .await;
}

8. Padrões avançados e boas práticas

Para performance e manutenibilidade:

use futures::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};

// Stream infinito com estado mutável
struct CounterStream {
    count: u64,
}

impl Stream for CounterStream {
    type Item = u64;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.count += 1;
        Poll::Ready(Some(this.count))
    }
}

// Evitando alocações desnecessárias
fn create_stream() -> impl Stream<Item = u32> {
    futures::stream::iter(0..100)
        .map(|x| x * 2)
        .filter(|x| *x > 50)
}

#[tokio::main]
async fn main() {
    // Preferir impl Stream a Box::pin quando possível
    let mut stream = create_stream();

    while let Some(item) = stream.next().await {
        println!("Item: {}", item);
    }

    // Debugging: Use tracing para profiling
    tracing_subscriber::fmt::init();

    let stream = futures::stream::iter(0..100)
        .inspect(|x| tracing::debug!("Processando {}", x));

    let result: Vec<_> = stream.collect().await;
    println!("Resultado final: {:?}", result.len());
}

O Stream trait é uma ferramenta poderosa para processamento assíncrono em Rust. Combinado com o ecossistema Tokio, permite construir pipelines de dados complexos, resilientes e performáticos. A chave está em entender o modelo de polling assíncrono, gerenciar corretamente o estado interno com Pin, e utilizar os combinadores apropriados para cada cenário.

Referências