Como usar o Redis Streams para processamento de eventos leve
1. Introdução ao Redis Streams e conceitos fundamentais
Redis Streams é uma estrutura de dados introduzida no Redis 5.0 que implementa um log de mensagens imutável e ordenado. Diferente de listas tradicionais, cada mensagem recebe um ID único baseado em timestamp, permitindo leituras precisas e consumidores concorrentes.
Em comparação com Kafka e RabbitMQ, o Redis Streams oferece uma abordagem mais leve e de baixa latência para cenários que não exigem replicação multi-broker ou particionamento complexo. É ideal para aplicações que já utilizam Redis como cache e precisam de mensageria simples sem infraestrutura adicional.
Os principais componentes são:
- Entries: mensagens com um par chave-valor associado a um ID único
- Consumer Groups: grupos que permitem distribuir mensagens entre múltiplos consumidores
- Stream IDs: identificadores no formato <timestamp>-<sequência>
- Blocking operations: leituras que bloqueiam até novas mensagens chegarem
2. Configuração e primeiros passos com Redis Streams
Para começar, instale o Redis (versão 5.0+). Em ambientes Docker:
docker run -d --name redis-streams -p 6379:6379 redis:7-alpine
Comandos fundamentais no redis-cli:
# Adicionar uma entrada
XADD eventos * usuario "joao" acao "login"
# Ler entradas recentes
XREAD COUNT 2 STREAMS eventos 0
# Contar entradas
XLEN eventos
# Listar intervalo específico
XRANGE eventos - +
Exemplo prático com Node.js e ioredis:
const Redis = require('ioredis');
const redis = new Redis();
async function publicarEvento() {
const id = await redis.xadd(
'eventos',
'*',
'usuario', 'maria',
'acao', 'compra',
'valor', '150.00'
);
console.log(`Evento publicado com ID: ${id}`);
}
publicarEvento();
3. Consumidores e grupos de consumo
Consumer groups permitem que múltiplos consumidores processem mensagens de forma concorrente, cada um recebendo mensagens diferentes.
Criando um grupo de consumo:
# Criar grupo (deve existir pelo menos uma entrada)
XGROUP CREATE eventos processadores $ MKSTREAM
Leitura concorrente com XREADGROUP:
const Redis = require('ioredis');
const redis = new Redis();
async function consumir() {
const resultado = await redis.xreadgroup(
'GROUP', 'processadores', 'consumidor-1',
'COUNT', 1,
'BLOCK', 5000,
'STREAMS', 'eventos', '>'
);
if (resultado) {
const [stream, entries] = resultado[0];
const [id, campos] = entries[0];
console.log(`Processando evento ${id}:`, campos);
// Confirmar processamento
await redis.xack('eventos', 'processadores', id);
}
}
setInterval(consumir, 1000);
Gerenciamento de mensagens pendentes:
# Verificar pendências
XPENDING eventos processadores
# Reivindicar mensagens não processadas (timeout > 30000ms)
XCLAIM eventos processadores consumidor-2 30000 1650000000000-0
4. Processamento de eventos com garantias de entrega
O acknowledgment manual (XACK) é essencial para garantir que mensagens foram processadas com sucesso. Para idempotência, utilize IDs únicos nas mensagens.
Estratégia de processamento com retentativas:
const Redis = require('ioredis');
const redis = new Redis();
async function processarPedido(evento) {
const { pedido_id, acao } = evento;
try {
// Processamento do pedido
await executarAcao(pedido_id, acao);
// Confirmar sucesso
await redis.xack('pedidos', 'processadores', evento.id);
} catch (erro) {
// Registrar falha em dead-letter queue
await redis.xadd('pedidos-falhos', '*',
'pedido_id', pedido_id,
'erro', erro.message,
'original_id', evento.id
);
// Ou rejeitar para retentativa
await redis.xack('pedidos', 'processadores', evento.id);
}
}
Controle de idempotência com chave única:
async function evitarDuplicacao(eventoId) {
const chave = `processado:${eventoId}`;
const jaProcessado = await redis.setnx(chave, '1');
if (jaProcessado === 0) {
return false; // Evento já processado
}
await redis.expire(chave, 86400); // Expira em 24h
return true;
}
5. Operações avançadas e otimizações
Filtragem com bloqueio e timeout:
# Ler apenas eventos novos, bloqueando por 10 segundos
XREAD COUNT 10 BLOCK 10000 STREAMS eventos $
Gerenciamento de capacidade com XTRIM:
# Manter apenas as últimas 1000 entradas
XTRIM eventos MAXLEN ~ 1000
# Manter entradas dos últimos 7 dias
XTRIM eventos MINID ~ 1700000000000
Monitoramento com XINFO:
XINFO STREAM eventos
XINFO GROUPS eventos
XINFO CONSUMERS eventos processadores
6. Integração com aplicações Node.js e cenários reais
Pipeline de logs com encadeamento de streams:
const Redis = require('ioredis');
const redis = new Redis();
// Produtor de logs
async function logAplicacao(nivel, mensagem) {
await redis.xadd('logs-raw', '*',
'nivel', nivel,
'mensagem', mensagem,
'timestamp', Date.now()
);
}
// Consumidor que transforma e encadeia
async function processarLogs() {
const resultado = await redis.xreadgroup(
'GROUP', 'transformadores', 'worker-1',
'COUNT', 10,
'BLOCK', 1000,
'STREAMS', 'logs-raw', '>'
);
if (resultado) {
for (const [, entries] of resultado) {
for (const [id, campos] of entries) {
const logProcessado = {
...campos,
processado_em: new Date().toISOString()
};
// Encadear para próximo stream
await redis.xadd('logs-processados', '*', ...Object.entries(logProcessado).flat());
await redis.xack('logs-raw', 'transformadores', id);
}
}
}
}
Notificações em tempo real com WebSockets:
const WebSocket = require('ws');
const Redis = require('ioredis');
const redis = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
const consumerId = `ws-${Date.now()}`;
const interval = setInterval(async () => {
const resultado = await redis.xreadgroup(
'GROUP', 'notificacoes', consumerId,
'COUNT', 5,
'BLOCK', 100,
'STREAMS', 'notificacoes', '>'
);
if (resultado) {
ws.send(JSON.stringify(resultado));
}
}, 200);
ws.on('close', () => {
clearInterval(interval);
});
});
7. Comparação com alternativas e boas práticas
Quando usar Redis Streams:
- Carga leve (< 10.000 mensagens/segundo)
- Baixa latência (milissegundos)
- Infraestrutura Redis já existente
- Necessidade de persistência simples
Armadilhas comuns:
- Consumo duplicado: sempre usar XACK e controle de idempotência
- Perda de mensagens: configurar persistência AOF e replicação
- Escalabilidade: para altas cargas, considere Kafka
Checklist de boas práticas:
1. Nomes de streams: use prefixos (app:stream:tipo)
2. Tamanho de blocos: MAXLEN ~ 10000 para logs, ~ 100000 para eventos
3. Timeouts: BLOCK máximo de 30 segundos para evitar conexões órfãs
4. Persistência: ativar appendonly yes no redis.conf
5. Monitoramento: XINFO e RedisInsight para visualização
Redis Streams oferece uma solução elegante e de baixa complexidade para processamento de eventos leves, combinando a simplicidade do Redis com garantias de entrega adequadas para aplicações modernas.
Referências
- Documentação oficial do Redis Streams — Guia completo sobre comandos, consumer groups e exemplos práticos
- Tutorial Redis Streams com Node.js (ioredis) — Exemplos detalhados de implementação com a biblioteca ioredis
- Redis Streams vs Kafka: Comparação técnica — Análise aprofundada das diferenças e casos de uso ideais
- Padrões de mensageria com Redis Streams — Guia oficial com padrões de design e boas práticas
- Monitoramento e gerenciamento de Redis Streams — Documentação dos comandos XINFO para diagnóstico e métricas
- Redis Streams em produção: armadilhas e soluções — Artigo técnico sobre problemas comuns e como evitá-los