Como usar streaming de respostas HTTP para processamento eficiente em lote

1. Fundamentos do Streaming HTTP em APIs de Alto Tráfego

O streaming de respostas HTTP é uma técnica que permite enviar dados ao cliente de forma incremental, chunk por chunk, sem esperar que todo o payload seja gerado. Diferente do modelo tradicional (buffered), onde o servidor monta a resposta completa em memória antes de enviá-la, o streaming utiliza chunked transfer encoding para transmitir dados assim que disponíveis.

Principais vantagens para operações em lote:
- Latência reduzida: o cliente começa a processar resultados em milissegundos, sem aguardar a conclusão total
- Memória eficiente: servidor e cliente mantêm apenas um buffer pequeno (~64KB por chunk)
- Throughput superior: permite processar conjuntos de dados que excederiam a RAM disponível

Quando evitar streaming:
- Operações que exigem atomicidade total (rollback completo em caso de erro)
- Cenários onde a ordem de processamento depende de validação cruzada entre todos os itens
- APIs que precisam garantir idempotência estrita sem checkpointing

2. Arquitetura de Streaming para Processamento em Lote

O padrão produtor-consumidor com canais assíncronos é a base arquitetural mais eficaz. O servidor atua como produtor, gerando chunks de dados, enquanto o cliente consome e processa cada lote.

Server-Sent Events (SSE) vs. NDJSON:
- SSE (text/event-stream): ideal para notificações em tempo real e eventos unidirecionais
- NDJSON (application/x-ndjson): mais adequado para lotes de dados estruturados, com cada linha sendo um JSON válido

Backpressure é implementado controlando a taxa de envio de chunks. No servidor, usa-se buffers limitados; no cliente, pausa-se a leitura quando o processamento interno fica sobrecarregado.

3. Implementação de Endpoint de Streaming em APIs

Configuração de Headers HTTP

HTTP/1.1 200 OK
Content-Type: application/x-ndjson
Transfer-Encoding: chunked
Connection: keep-alive

Exemplo em Python com FastAPI

from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def generate_lotes(ids: list[int]):
    for i in range(0, len(ids), 100):
        lote = ids[i:i+100]
        # Simula processamento em lote
        await asyncio.sleep(0.1)
        resultado = {"indice_inicio": i, "itens": lote}
        yield json.dumps(resultado) + "\n"

@app.get("/stream/lotes")
async def stream_lotes(ids: str):
    ids_list = [int(x) for x in ids.split(",")]
    return StreamingResponse(
        generate_lotes(ids_list),
        media_type="application/x-ndjson",
        headers={"Transfer-Encoding": "chunked"}
    )

Tratamento de Erros Parciais

# No meio do stream, enviamos um chunk de erro
{
  "tipo": "erro",
  "indice": 450,
  "mensagem": "Falha ao processar item 450",
  "acao": "ignorar"
}

4. Consumo Eficiente do Stream no Cliente

Leitura Incremental com JavaScript (fetch)

async function consumirStream(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const linhas = buffer.split("\n");
    buffer = linhas.pop(); // Mantém linha incompleta no buffer

    for (const linha of linhas) {
      if (linha.trim()) {
        const lote = JSON.parse(linha);
        await processarLote(lote); // Processamento assíncrono
      }
    }
  }
}

Agrupamento de Chunks no Cliente

Para reduzir chamadas de banco, acumule múltiplos chunks antes de persistir:

let acumulador = [];
const TAMANHO_LOTE = 500;

async function processarLote(lote) {
  acumulador.push(...lote.itens);
  if (acumulador.length >= TAMANHO_LOTE) {
    await salvarNoBanco(acumulador);
    acumulador = [];
  }
}

Estratégia de Retry por Índices

async function consumirComRetry(url, indicesFalhos = []) {
  // Reenvia apenas os lotes com falha
  const urlRetry = `${url}&retry=${indicesFalhos.join(",")}`;
  return consumirStream(urlRetry);
}

5. Otimizações de Performance e Escalabilidade

Compressão gzip em tempo real: ative Content-Encoding: gzip no servidor. Para streams longos, a compressão incremental reduz o tráfego em até 70%.

Workers paralelos no servidor:

import concurrent.futures

def gerar_stream_paralelo(ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for i in range(0, len(ids), 250):
            lote = ids[i:i+250]
            futures.append(executor.submit(processar_lote, lote, i))

        for future in concurrent.futures.as_completed(futures):
            yield future.result()

Métricas essenciais:
- Throughput: itens/segundo = total_itens / tempo_total
- Latência por chunk: tempo entre início do processamento e envio do chunk
- Taxa de retry: chunks_reenviados / total_chunks

6. Casos de Uso e Exemplos Práticos

Exportação de Relatórios com Milhões de Registros

GET /relatorios/vendas?formato=ndjson&data_inicio=2024-01-01&data_fim=2024-12-31

# Resposta em streaming com 2 milhões de linhas
# Cliente processa e salva em arquivo sem carregar tudo em memória

Pipeline de Transformação Contínua

1. Servidor A lê dados do banco (stream)
2. Transforma cada lote (enriquecimento)
3. Envia para Servidor B via NDJSON
4. Servidor B persiste no data warehouse

Integração com Filas (Kafka)

async def stream_para_kafka(url_stream, producer, topico):
    async for lote in consumir_stream(url_stream):
        await producer.send(topico, value=lote)
        # Confirmação assíncrona do Kafka

7. Boas Práticas e Armadilhas Comuns

Timeouts e Keep-Alive:
- Configure write_timeout como 0 (desabilitado) em streams longos
- Use Connection: keep-alive com timeout=300 segundos

Detecção de Desconexão:

# No servidor FastAPI
async def generate():
    try:
        while True:
            chunk = await obter_proximo_chunk()
            yield chunk
    except asyncio.CancelledError:
        # Cliente desconectou
        await interromper_processamento()

Versionamento de Formato:

{
  "version": "2.0",
  "indice": 100,
  "tipo": "dados",
  "dados": [...]
}

Armadilhas comuns:
- Não enviar \n ao final de cada chunk NDJSON
- Ignorar backpressure no cliente (buffer cresce indefinidamente)
- Usar streaming para payloads menores que 10MB (overhead desnecessário)


Referências