Streaming de respostas de LLMs no backend com Node.js e FastAPI
1. Fundamentos do Streaming de LLMs
1.1. O que é streaming de respostas e por que é essencial para UX em chatbots
Streaming de respostas é a técnica de enviar dados incrementalmente do servidor para o cliente conforme são gerados, em vez de aguardar a conclusão completa do processamento. Em chatbots baseados em LLMs, isso permite que o usuário veja o texto sendo "digitado" em tempo real, reduzindo a latência percebida de 5-15 segundos para milissegundos iniciais. Estudos de UX mostram que a exibição progressiva aumenta a taxa de retenção em até 40% e melhora a sensação de interatividade.
1.2. Diferença entre respostas completas (batch) e respostas incrementais (stream)
No modo batch, o servidor aguarda a geração completa do texto (ex: 500 tokens) e envia a resposta inteira de uma vez. No modo stream, cada token gerado pelo LLM é enviado imediatamente ao cliente. A diferença técnica está na forma como a conexão HTTP é gerenciada: batch usa uma resposta única, enquanto stream mantém a conexão aberta enviando chunks sucessivos.
// Exemplo de resposta batch (completa)
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 245
{"response": "Olá! Como posso ajudar você hoje com suas dúvidas sobre programação?"}
// Exemplo de resposta stream (incremental)
HTTP/1.1 200 OK
Content-Type: text/event-stream
Transfer-Encoding: chunked
data: {"token": "Olá"}
data: {"token": "!"}
data: {"token": " Como"}
1.3. Protocolos de streaming: SSE vs WebSockets vs chunked transfer encoding
- Server-Sent Events (SSE): Unidirecional (servidor→cliente), usa texto puro com formato
data:, suporte nativo em navegadores viaEventSource. Ideal para notificações e streams de texto. - WebSockets: Bidirecional, baixa latência, mas requer gerenciamento de estado. Adequado para aplicações que precisam enviar comandos do cliente durante o stream.
- Chunked Transfer Encoding: Mecanismo HTTP/1.1 que permite enviar resposta em partes sem saber o tamanho total. Usado internamente por SSE e streaming de arquivos.
2. Arquitetura de Streaming no Backend com FastAPI (Python)
2.1. Implementação de endpoints assíncronos com StreamingResponse do Starlette
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def token_generator():
"""Simula geração token a token de um LLM"""
frase = "Olá! Como posso ajudar você hoje?"
for palavra in frase.split():
yield f"data: {palavra}\n\n"
await asyncio.sleep(0.3) # Simula tempo de geração
@app.get("/stream")
async def stream_response():
return StreamingResponse(
token_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
2.2. Integração com clientes LLM (OpenAI, Anthropic, modelos locais)
from openai import AsyncOpenAI
from fastapi import HTTPException
client = AsyncOpenAI(api_key="sua-chave")
async def openai_stream_generator(prompt: str):
try:
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
yield f"data: {token}\n\n"
except Exception as e:
yield f"event: error\ndata: {str(e)}\n\n"
2.3. Tratamento de erros, timeouts e cancelamento de streams
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def managed_stream(stream_generator):
try:
async with asyncio.timeout(30): # Timeout de 30 segundos
yield stream_generator
except asyncio.TimeoutError:
yield iter(["event: error\ndata: Timeout na geração da resposta\n\n"])
except Exception as e:
yield iter([f"event: error\ndata: {str(e)}\n\n"])
finally:
# Limpeza de recursos
pass
3. Streaming no Backend com Node.js (Express/Fastify)
3.1. Uso de ReadableStream, AsyncIterators e res.write() para enviar chunks
const express = require('express');
const app = express();
app.get('/stream', async (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
const frase = "Olá! Como posso ajudar você hoje?";
const palavras = frase.split(' ');
for (const palavra of palavras) {
res.write(`data: ${palavra}\n\n`);
await new Promise(resolve => setTimeout(resolve, 300));
}
res.end();
});
3.2. Consumo de APIs de streaming de LLMs via fetch com response.body.getReader()
async function consumeOpenAIStream(prompt, res) {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`
},
body: JSON.stringify({
model: 'gpt-4',
messages: [{ role: 'user', content: prompt }],
stream: true
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// Processa e reenvia para o cliente
res.write(`data: ${chunk}\n\n`);
}
res.end();
}
3.3. Gerenciamento de conexões longas e backpressure
const { Transform } = require('stream');
class BackpressureStream extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.highWaterMark = 16; // Limita buffer interno
}
_transform(chunk, encoding, callback) {
// Aplica backpressure se o cliente estiver lento
if (this.writableLength > this.highWaterMark) {
this.pause();
setTimeout(() => this.resume(), 100);
}
this.push(chunk);
callback();
}
}
4. Padrões de Comunicação Cliente-Servidor para Streaming
4.1. Implementação de SSE (Server-Sent Events)
// Cliente JavaScript
const eventSource = new EventSource('/stream');
eventSource.onmessage = (event) => {
console.log('Token recebido:', event.data);
};
eventSource.onerror = (error) => {
console.error('Erro no stream:', error);
};
4.2. WebSockets para comunicação bidirecional
# FastAPI com WebSocket
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
async for token in generate_tokens(data):
await websocket.send_text(token)
4.3. Estratégias de fallback: polling
async function fallbackPolling(prompt) {
const response = await fetch('/generate', {
method: 'POST',
body: JSON.stringify({ prompt })
});
const { responseId } = await response.json();
// Polling a cada 500ms
const interval = setInterval(async () => {
const status = await fetch(`/status/${responseId}`);
const data = await status.json();
if (data.complete) {
clearInterval(interval);
console.log('Resposta completa:', data.response);
} else if (data.tokens) {
console.log('Tokens parciais:', data.tokens);
}
}, 500);
}
5. Otimização de Performance e Escalabilidade
5.1. Uso de buffers e throttling
import asyncio
async def throttled_stream(generator, max_tokens_per_second=10):
buffer = []
last_send_time = asyncio.get_event_loop().time()
async for token in generator:
buffer.append(token)
current_time = asyncio.get_event_loop().time()
if len(buffer) >= max_tokens_per_second or (current_time - last_send_time) >= 0.1:
yield ''.join(buffer)
buffer = []
last_send_time = current_time
await asyncio.sleep(0.1) # Rate limiting
5.2. Gerenciamento de conexões simultâneas
const rateLimit = require('express-rate-limit');
const streamLimiter = rateLimit({
windowMs: 60 * 1000, // 1 minuto
max: 10, // Máximo de 10 streams por minuto
message: 'Muitas requisições de stream. Tente novamente mais tarde.'
});
app.use('/stream', streamLimiter);
5.3. Cache de tokens parciais
from functools import lru_cache
@lru_cache(maxsize=100)
def get_cached_prefix(prompt_hash: str) -> str:
"""Recupera prefixos de respostas comuns para acelerar streaming"""
return ""
6. Segurança e Boas Práticas no Streaming
6.1. Validação de entrada e sanitização
from pydantic import BaseModel, validator
import html
class StreamRequest(BaseModel):
prompt: str
@validator('prompt')
def sanitize_prompt(cls, v):
# Remove caracteres perigosos
v = html.escape(v)
if len(v) > 1000:
raise ValueError('Prompt muito longo')
return v
6.2. Proteção contra injeção de conteúdo
function sanitizeToken(token) {
// Remove possíveis tags HTML/JS maliciosas
return token.replace(/<[^>]*>/g, '')
.replace(/[<>]/g, '')
.substring(0, 100); // Limita tamanho do token
}
6.3. Autenticação em conexões persistentes
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
security = HTTPBearer()
@app.get("/secure-stream")
async def secure_stream(token: str = Depends(security)):
if not validate_token(token.credentials):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido"
)
return StreamingResponse(token_generator())
7. Exemplos Práticos: FastAPI + Node.js em Ação
7.1. Endpoint FastAPI completo com GPT-4
# fastapi_stream.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio
app = FastAPI()
client = AsyncOpenAI()
@app.get("/chat/stream")
async def chat_stream(prompt: str):
async def generate():
try:
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=500
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield f"data: {chunk.choices[0].delta.content}\n\n"
await asyncio.sleep(0.01) # Controle de fluxo
yield "event: done\ndata: [DONE]\n\n"
except Exception as e:
yield f"event: error\ndata: {str(e)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no"
}
)
7.2. Node.js consumindo o stream do FastAPI
// node_proxy.js
const express = require('express');
const fetch = require('node-fetch');
const app = express();
app.get('/proxy-stream', async (req, res) => {
const prompt = req.query.prompt || 'Olá';
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
try {
const response = await fetch(
`http://localhost:8000/chat/stream?prompt=${encodeURIComponent(prompt)}`
);
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
res.write(chunk);
}
} catch (error) {
res.write(`event: error\ndata: ${error.message}\n\n`);
} finally {
res.end();
}
});
app.listen(3000, () => {
console.log('Proxy rodando em http://localhost:3000');
});
7.3. Demonstração com bibliotecas populares
<!-- index.html - Cliente com EventSource -->
<!DOCTYPE html>
<html>
<body>
<div id="response"></div>
<button onclick="startStream()">Iniciar Stream</button>
<script>
function startStream() {
const eventSource = new EventSource('/proxy-stream?prompt=Explique streaming');
eventSource.onmessage = (event) => {
document.getElementById('response').innerHTML += event.data;
};
eventSource.addEventListener('done', () => {
eventSource.close();
console.log('Stream completo');
});
eventSource.onerror = (error) => {
console.error('Erro:', error);
eventSource.close();
};
}
</script>
</body>
</html>
7. Conclusão
O streaming de respostas de LLMs é uma técnica fundamental para criar experiências de usuário responsivas e naturais em aplicações de IA conversacional. Tanto FastAPI (Python) quanto Node.js oferecem ferramentas maduras para implementar esses padrões, com SSE sendo a escolha mais simples para comunicação unidirecional e WebSockets para casos que exigem interação bidirecional. A escolha entre as tecnologias depende do ecossistema existente e requisitos específicos de escalabilidade.
Referências
- FastAPI StreamingResponse Documentation — Documentação oficial sobre implementação de respostas streaming em FastAPI com Starlette.
- Node.js Stream API Official Guide — Guia completo da API de streams do Node.js, incluindo Readable, Writable e Transform streams.
- OpenAI Streaming API Documentation — Documentação oficial da OpenAI sobre como consumir respostas streaming de modelos GPT.
- Server-Sent Events (SSE) MDN Web Docs — Referência completa sobre o protocolo SSE, incluindo EventSource API e formatos de evento.
- Express.js Response Streaming Patterns — Padrões recomendados pela equipe Express para implementar streaming eficiente em servidores Node.js.
- Fastify Response Streaming Examples — Exemplos práticos de streaming com Fastify, incluindo manipulação de backpressure e timeouts.