Como usar message deduplication em filas para garantir exatamente uma entrega
1. Fundamentos da Deduplicação de Mensagens em Filas
1.1. Definição de "exatamente uma entrega" vs. "pelo menos uma"
Em sistemas de mensageria, existem três modelos principais de garantia de entrega:
- At-most-once (no máximo uma): a mensagem é entregue zero ou uma vez. Pode ser perdida, mas nunca duplicada.
- At-least-once (pelo menos uma): a mensagem é entregue uma ou mais vezes. Pode haver duplicatas, mas nunca perda.
- Exactly-once (exatamente uma): a mensagem é entregue exatamente uma vez. É o modelo mais difícil de implementar, pois exige coordenação entre produtor, fila e consumidor.
A deduplicação de mensagens é a técnica central para transformar um sistema "at-least-once" em "exactly-once".
1.2. Causas de duplicação
Duplicatas ocorrem por diversas razões:
Causas comuns de duplicação em filas:
- Retries do produtor após timeout de confirmação
- Falhas de rede que ocultam confirmações bem-sucedidas
- Rebalanceamento de consumidores em grupos
- Processamento lento que excede o visibility timeout
- Falhas no consumidor após processamento mas antes do ACK
1.3. Trade-offs
| Abordagem | Desempenho | Latência | Garantia |
|---|---|---|---|
| Sem deduplicação | Alto | Baixa | At-least-once |
| Deduplicação em cache | Médio | Média | Exactly-once (com janela) |
| Deduplicação em BD | Baixo | Alta | Exactly-once (persistente) |
2. Identificação Única de Mensagens (Message ID)
2.1. Geração de IDs únicos
O primeiro passo é criar um identificador único para cada mensagem.
Exemplo de geração de Message ID em Python:
import uuid
import hashlib
import time
# Abordagem 1: UUID aleatório
message_id = str(uuid.uuid4())
# Abordagem 2: Hash baseado no conteúdo
content_hash = hashlib.sha256(payload.encode()).hexdigest()
# Abordagem 3: ID sequencial com origem
message_id = f"order-service-{timestamp}-{sequence_number}"
2.2. Inclusão do ID no cabeçalho ou payload
Estrutura de mensagem com ID de deduplicação:
{
"messageId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"timestamp": 1700000000,
"payload": {
"orderId": 12345,
"amount": 99.90
}
}
2.3. Tratamento de mensagens sem ID explícito
Quando o sistema legado não fornece IDs, use chaves compostas:
Chave composta para deduplicação:
{origem_sistema}:{tipo_evento}:{timestamp_unico}:{sequencia}
Exemplo: "payment-gateway:charge:1700000000:42"
3. Armazenamento de Estado de Deduplicação
3.1. Cache distribuído com TTL
Redis é a escolha mais comum para armazenamento temporário:
Configuração de deduplicação no Redis:
# Conectar ao Redis
redis_client = Redis(host='cache-cluster', port=6379)
# Verificar se mensagem já foi processada
def is_duplicate(message_id):
return redis_client.exists(f"dedup:{message_id}")
# Marcar como processada com TTL de 24 horas
def mark_processed(message_id):
redis_client.setex(f"dedup:{message_id}", 86400, "1")
3.2. Banco de dados relacional persistente
Para cenários que exigem histórico permanente:
Tabela de mensagens processadas:
CREATE TABLE processed_messages (
message_id VARCHAR(64) PRIMARY KEY,
consumer_id VARCHAR(64),
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(20),
payload_hash VARCHAR(64),
INDEX idx_processed_at (processed_at)
);
3.3. Mecanismos de expiração
Limpeza automática de registros antigos:
-- Remover registros com mais de 7 dias
DELETE FROM processed_messages
WHERE processed_at < NOW() - INTERVAL 7 DAY;
4. Implementação no Produtor
4.1. Geração e anexação do ID único
Produtor com deduplicação integrada:
import boto3
import uuid
import json
sqs = boto3.client('sqs', region_name='us-east-1')
def send_message_with_dedup(queue_url, payload):
message_id = str(uuid.uuid4())
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
"messageId": message_id,
"payload": payload
}),
MessageDeduplicationId=message_id # AWS SQS nativo
)
return response
4.2. Lógica de retry com idempotência
Retry idempotente no produtor:
def send_with_retry(queue_url, payload, max_retries=3):
message_id = str(uuid.uuid4())
for attempt in range(max_retries):
try:
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
"messageId": message_id,
"payload": payload
}),
MessageDeduplicationId=message_id
)
return response
except Exception as e:
if attempt == max_retries - 1:
raise e
time.sleep(2 ** attempt) # Exponential backoff
4.3. Uso de filas com suporte nativo
AWS SQS oferece deduplicação nativa com FIFO queues:
Configuração de fila FIFO no AWS SQS:
- Tipo: FIFO
- Content-based deduplication: ativado
- Deduplication scope: queue
- Message retention period: 4 dias
- Visibility timeout: 30 segundos
5. Implementação no Consumidor
5.1. Verificação de duplicação antes do processamento
Consumidor com verificação de duplicação:
def process_message(message):
message_id = json.loads(message['Body'])['messageId']
# Verificar duplicação
if redis_client.exists(f"dedup:{message_id}"):
print(f"Mensagem {message_id} já processada. Ignorando.")
return True # ACK sem processar
# Processar mensagem
try:
process_business_logic(message)
mark_processed(message_id)
return True # ACK bem-sucedido
except Exception as e:
return False # Não marca como processada
5.2. Transação atômica
Transação atômica com banco de dados:
def process_with_transaction(message):
message_id = json.loads(message['Body'])['messageId']
with db.transaction():
# Verificar duplicação dentro da transação
if db.exists("SELECT 1 FROM processed_messages WHERE message_id = ?", message_id):
return
# Executar lógica de negócio
update_inventory(message['payload'])
# Marcar como processada na mesma transação
db.execute(
"INSERT INTO processed_messages (message_id) VALUES (?)",
message_id
)
5.3. Tratamento de falhas
Estratégia de falhas com DLQ:
def consumer_loop(queue_url, dlq_url):
while True:
messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
for msg in messages.get('Messages', []):
try:
if not process_message(msg):
# Falha de processamento - enviar para DLQ
sqs.send_message(
QueueUrl=dlq_url,
MessageBody=msg['Body']
)
except Exception as e:
# Erro crítico - enviar para DLQ
sqs.send_message(
QueueUrl=dlq_url,
MessageBody=json.dumps({
"originalMessage": msg['Body'],
"error": str(e)
})
)
6. Padrões de Arquitetura para Alta Disponibilidade
6.1. Combinação com Dead Letter Queue
Arquitetura com DLQ para deduplicação:
[Produtor] -> [Fila Principal] -> [Consumidor]
| |
v v
[Redrive Policy] [DLQ - Duplicatas]
6.2. Estratégias de particionamento
Particionamento do estado de deduplicação:
# Hash ring para distribuir estado
def get_cache_node(message_id):
hash_value = hash(message_id)
return cache_nodes[hash_value % len(cache_nodes)]
6.3. Monitoramento de taxa de duplicação
Métricas importantes para monitorar:
- Taxa de duplicação: (mensagens duplicadas / total) * 100
- Latência de verificação: tempo médio de consulta ao cache
- Taxa de expiração: mensagens que expiraram antes do processamento
- Tamanho do cache de deduplicação
7. Casos de Uso e Exemplos Práticos
7.1. Processamento de pagamentos
Cenário: Processamento de cobrança de cartão de crédito
1. Usuário clica em "Comprar"
2. Sistema gera messageId = "payment:user123:order456:timestamp"
3. Mensagem enviada para fila FIFO do SQS
4. Consumidor verifica se messageId já foi processado
5. Se não, executa cobrança e marca como processado
6. Se sim, ignora (evita cobrança duplicada)
7.2. Atualização de inventário
Cenário: Atualização de estoque em e-commerce
message_id = hash(f"inventory:sku123:qty-5:warehouse-1")
7.3. Envio de notificações
Cenário: Notificação de confirmação de pedido
message_id = f"email:user@example.com:order-789:1700000000"
8. Desafios e Boas Práticas
8.1. Limitações de TTL
Mensagens que chegam após o TTL do cache não serão detectadas como duplicatas. Defina TTL baseado no SLA máximo de processamento.
8.2. Consistência eventual vs. consistência forte
Escolha baseada no caso de uso:
- Pagamentos: consistência forte (transação atômica)
- Notificações: consistência eventual (aceitável perder algumas)
- Logs: consistência eventual (duplicatas toleráveis)
8.3. Testes de idempotência
Teste de cenário de falha:
1. Enviar mensagem A com ID único
2. Consumidor inicia processamento
3. Simular falha antes do ACK
4. Mensagem A é reentregue
5. Verificar se processamento ocorre apenas uma vez
Referências
- AWS SQS FIFO Queues Documentation — Documentação oficial sobre filas FIFO com suporte nativo a deduplicação de mensagens
- Redis as a Distributed Cache for Message Deduplication — Guia prático sobre uso de Redis para armazenamento de estado de deduplicação
- Exactly-Once Processing in Apache Kafka — Documentação oficial do Apache Kafka sobre garantias de entrega exatamente uma vez
- RabbitMQ Idempotent Consumer Pattern — Padrões de consumidor idempotente para RabbitMQ
- Message Deduplication Best Practices — Artigo de Martin Fowler sobre padrões de deduplicação em sistemas distribuídos