Como usar streaming de respostas HTTP para processamento eficiente em lote
1. Fundamentos do Streaming HTTP em APIs de Alto Tráfego
O streaming de respostas HTTP é uma técnica que permite enviar dados ao cliente de forma incremental, chunk por chunk, sem esperar que todo o payload seja gerado. Diferente do modelo tradicional (buffered), onde o servidor monta a resposta completa em memória antes de enviá-la, o streaming utiliza chunked transfer encoding para transmitir dados assim que disponíveis.
Principais vantagens para operações em lote:
- Latência reduzida: o cliente começa a processar resultados em milissegundos, sem aguardar a conclusão total
- Memória eficiente: servidor e cliente mantêm apenas um buffer pequeno (~64KB por chunk)
- Throughput superior: permite processar conjuntos de dados que excederiam a RAM disponível
Quando evitar streaming:
- Operações que exigem atomicidade total (rollback completo em caso de erro)
- Cenários onde a ordem de processamento depende de validação cruzada entre todos os itens
- APIs que precisam garantir idempotência estrita sem checkpointing
2. Arquitetura de Streaming para Processamento em Lote
O padrão produtor-consumidor com canais assíncronos é a base arquitetural mais eficaz. O servidor atua como produtor, gerando chunks de dados, enquanto o cliente consome e processa cada lote.
Server-Sent Events (SSE) vs. NDJSON:
- SSE (text/event-stream): ideal para notificações em tempo real e eventos unidirecionais
- NDJSON (application/x-ndjson): mais adequado para lotes de dados estruturados, com cada linha sendo um JSON válido
Backpressure é implementado controlando a taxa de envio de chunks. No servidor, usa-se buffers limitados; no cliente, pausa-se a leitura quando o processamento interno fica sobrecarregado.
3. Implementação de Endpoint de Streaming em APIs
Configuração de Headers HTTP
HTTP/1.1 200 OK
Content-Type: application/x-ndjson
Transfer-Encoding: chunked
Connection: keep-alive
Exemplo em Python com FastAPI
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def generate_lotes(ids: list[int]):
for i in range(0, len(ids), 100):
lote = ids[i:i+100]
# Simula processamento em lote
await asyncio.sleep(0.1)
resultado = {"indice_inicio": i, "itens": lote}
yield json.dumps(resultado) + "\n"
@app.get("/stream/lotes")
async def stream_lotes(ids: str):
ids_list = [int(x) for x in ids.split(",")]
return StreamingResponse(
generate_lotes(ids_list),
media_type="application/x-ndjson",
headers={"Transfer-Encoding": "chunked"}
)
Tratamento de Erros Parciais
# No meio do stream, enviamos um chunk de erro
{
"tipo": "erro",
"indice": 450,
"mensagem": "Falha ao processar item 450",
"acao": "ignorar"
}
4. Consumo Eficiente do Stream no Cliente
Leitura Incremental com JavaScript (fetch)
async function consumirStream(url) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const linhas = buffer.split("\n");
buffer = linhas.pop(); // Mantém linha incompleta no buffer
for (const linha of linhas) {
if (linha.trim()) {
const lote = JSON.parse(linha);
await processarLote(lote); // Processamento assíncrono
}
}
}
}
Agrupamento de Chunks no Cliente
Para reduzir chamadas de banco, acumule múltiplos chunks antes de persistir:
let acumulador = [];
const TAMANHO_LOTE = 500;
async function processarLote(lote) {
acumulador.push(...lote.itens);
if (acumulador.length >= TAMANHO_LOTE) {
await salvarNoBanco(acumulador);
acumulador = [];
}
}
Estratégia de Retry por Índices
async function consumirComRetry(url, indicesFalhos = []) {
// Reenvia apenas os lotes com falha
const urlRetry = `${url}&retry=${indicesFalhos.join(",")}`;
return consumirStream(urlRetry);
}
5. Otimizações de Performance e Escalabilidade
Compressão gzip em tempo real: ative Content-Encoding: gzip no servidor. Para streams longos, a compressão incremental reduz o tráfego em até 70%.
Workers paralelos no servidor:
import concurrent.futures
def gerar_stream_paralelo(ids):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for i in range(0, len(ids), 250):
lote = ids[i:i+250]
futures.append(executor.submit(processar_lote, lote, i))
for future in concurrent.futures.as_completed(futures):
yield future.result()
Métricas essenciais:
- Throughput: itens/segundo = total_itens / tempo_total
- Latência por chunk: tempo entre início do processamento e envio do chunk
- Taxa de retry: chunks_reenviados / total_chunks
6. Casos de Uso e Exemplos Práticos
Exportação de Relatórios com Milhões de Registros
GET /relatorios/vendas?formato=ndjson&data_inicio=2024-01-01&data_fim=2024-12-31
# Resposta em streaming com 2 milhões de linhas
# Cliente processa e salva em arquivo sem carregar tudo em memória
Pipeline de Transformação Contínua
1. Servidor A lê dados do banco (stream)
2. Transforma cada lote (enriquecimento)
3. Envia para Servidor B via NDJSON
4. Servidor B persiste no data warehouse
Integração com Filas (Kafka)
async def stream_para_kafka(url_stream, producer, topico):
async for lote in consumir_stream(url_stream):
await producer.send(topico, value=lote)
# Confirmação assíncrona do Kafka
7. Boas Práticas e Armadilhas Comuns
Timeouts e Keep-Alive:
- Configure write_timeout como 0 (desabilitado) em streams longos
- Use Connection: keep-alive com timeout=300 segundos
Detecção de Desconexão:
# No servidor FastAPI
async def generate():
try:
while True:
chunk = await obter_proximo_chunk()
yield chunk
except asyncio.CancelledError:
# Cliente desconectou
await interromper_processamento()
Versionamento de Formato:
{
"version": "2.0",
"indice": 100,
"tipo": "dados",
"dados": [...]
}
Armadilhas comuns:
- Não enviar \n ao final de cada chunk NDJSON
- Ignorar backpressure no cliente (buffer cresce indefinidamente)
- Usar streaming para payloads menores que 10MB (overhead desnecessário)
Referências
- MDN Web Docs: Transfer-Encoding — Documentação oficial sobre chunked transfer encoding e headers HTTP para streaming
- FastAPI: StreamingResponse — Guia oficial de implementação de respostas streaming em Python com FastAPI
- Node.js: Stream API Documentation — Documentação completa sobre streams nativos do Node.js para processamento eficiente
- JSON Lines (NDJSON) Specification — Especificação oficial do formato NDJSON utilizado em streams de dados estruturados
- HTTP Streaming with Fetch API — Tutorial do Google Web Dev sobre consumo de streams HTTP no navegador com fetch API
- Backpressure in Stream Processing — Artigo técnico da Confluent sobre estratégias de controle de fluxo em sistemas de streaming
- gzip Compression for HTTP Streaming — Documentação do Apache sobre compressão gzip em tempo real para respostas HTTP