Como implementar rate limiting por IP com Redis e tolerância a proxies

1. Fundamentos do Rate Limiting e o Papel do Redis

Rate limiting é uma técnica de controle de tráfego que limita o número de requisições que um cliente pode fazer em um determinado intervalo de tempo. É essencial para prevenir abusos, ataques de força bruta, e garantir a estabilidade de APIs e serviços web. Sem ele, um único cliente mal-intencionado ou com um bug pode sobrecarregar o servidor, afetando todos os outros usuários.

O Redis é a ferramenta ideal para implementar rate limiting por várias razões:

  • Operações atômicas: Comandos como INCR e EXPIRE garantem que não haja condições de corrida.
  • Baixa latência: Opera em memória, com respostas na ordem de microssegundos.
  • Expiração automática: Chaves podem ter TTL (time-to-live), eliminando a necessidade de limpeza manual.
  • Estruturas de dados versáteis: Strings, Sorted Sets e Hashes permitem implementar diferentes estratégias.

As estratégias mais comuns incluem:

  • Fixed Window: Divide o tempo em janelas fixas (ex: 100 requisições por minuto). Simples, mas pode permitir picos no final de uma janela e início da próxima.
  • Sliding Window Log: Mantém um log de timestamps de cada requisição. Preciso, mas consome mais memória.
  • Sliding Window Counter: Combina dois contadores (janela atual e anterior) para aproximar o sliding window com menor uso de memória.
  • Token Bucket: Um bucket com tokens que são consumidos a cada requisição e recarregados a uma taxa constante.

2. Identificação Correta do IP Real em Ambientes com Proxies

Em arquiteturas modernas, as requisições raramente chegam diretamente ao servidor. Elas passam por proxies reversos (Nginx, HAProxy), CDNs (Cloudflare, Akamai) e balanceadores de carga. O IP que o servidor vê (REMOTE_ADDR) é o do último proxy, não o do cliente real.

Os cabeçalhos HTTP que carregam o IP original são:

  • X-Forwarded-For: Lista de IPs separados por vírgula. O primeiro é o cliente original, os demais são proxies.
  • X-Real-IP: Usado por alguns proxies (ex: Nginx) para enviar o IP real diretamente.

O desafio é que esses cabeçalhos podem ser falsificados por clientes maliciosos. A solução é confiar apenas em proxies conhecidos (whitelist) e extrair o IP correto:

def get_real_ip(request, trusted_proxies):
    """
    Extrai o IP real do cliente considerando proxies confiáveis.

    Args:
        request: Objeto de requisição com headers e remote_addr
        trusted_proxies: Lista de IPs de proxies confiáveis (CIDR ou IPs)

    Returns:
        String com o IP real do cliente
    """
    # Verifica se há X-Forwarded-For
    xff = request.headers.get('X-Forwarded-For')
    if xff:
        # Divide a lista de IPs
        ips = [ip.strip() for ip in xff.split(',')]

        # Remove proxies confiáveis do final da lista
        while ips and ips[-1] in trusted_proxies:
            ips.pop()

        # Se sobrou algum IP, o primeiro é o cliente original
        if ips:
            return ips[0]

    # Fallback para X-Real-IP (se presente)
    xri = request.headers.get('X-Real-IP')
    if xri and xri not in trusted_proxies:
        return xri

    # Último recurso: IP da conexão direta
    return request.remote_addr

3. Implementação Básica com Sliding Window Log no Redis

A estratégia Sliding Window Log usa um Sorted Set no Redis para armazenar timestamps de cada requisição. A chave é o IP do cliente, e cada membro do set é um timestamp único (ex: requisição:timestamp). O score é o próprio timestamp, permitindo consultas por intervalo.

Comandos Redis essenciais:

  • ZADD: Adiciona um membro com score
  • ZREMRANGEBYSCORE: Remove membros com score em um intervalo
  • ZCOUNT: Conta membros com score em um intervalo
  • EXPIRE: Define TTL para a chave
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def is_rate_limited_sliding_window(ip, limit, window_seconds):
    """
    Verifica se o IP excedeu o limite de requisições usando Sliding Window Log.

    Args:
        ip: IP do cliente
        limit: Número máximo de requisições permitidas
        window_seconds: Tamanho da janela em segundos

    Returns:
        Boolean: True se estiver limitado, False caso contrário
    """
    key = f"ratelimit:sliding:{ip}"
    now = time.time()
    window_start = now - window_seconds

    # Pipeline para atomicidade
    pipe = r.pipeline()

    # Remove timestamps antigos (fora da janela)
    pipe.zremrangebyscore(key, 0, window_start)

    # Conta requisições na janela atual
    pipe.zcard(key)

    # Adiciona timestamp atual
    pipe.zadd(key, {f"req:{now}": now})

    # Define TTL para limpeza automática
    pipe.expire(key, window_seconds)

    results = pipe.execute()
    current_count = results[1]  # Resultado de zcard

    # Se já excedeu o limite, remove a requisição que acabamos de adicionar
    if current_count > limit:
        pipe2 = r.pipeline()
        pipe2.zrem(key, f"req:{now}")
        pipe2.execute()
        return True

    return False

4. Estratégia Avançada: Sliding Window Counters com Redis

O Sliding Window Log é preciso, mas consome muita memória para IPs com muitas requisições. O Sliding Window Counter resolve isso usando apenas dois contadores por janela: um para a janela atual e outro para a anterior.

A ideia é dividir o tempo em janelas fixas (ex: 1 segundo). Para cada requisição, calculamos em qual janela ela cai. Mantemos um contador para a janela atual e outro para a anterior. Quando uma requisição chega, calculamos um peso proporcional ao tempo decorrido na janela atual:

def is_rate_limited_sliding_counter(ip, limit, window_seconds):
    """
    Verifica se o IP excedeu o limite usando Sliding Window Counter.

    Args:
        ip: IP do cliente
        limit: Número máximo de requisições permitidas
        window_seconds: Tamanho da janela em segundos (ex: 60)

    Returns:
        Boolean: True se estiver limitado, False caso contrário
    """
    key = f"ratelimit:counter:{ip}"
    now = time.time()

    # Divide o tempo em janelas de 1 segundo (granularidade ajustável)
    current_window = int(now)
    previous_window = current_window - 1

    # Pesos para a janela anterior (proporcional ao tempo decorrido)
    elapsed = now - current_window
    weight_previous = 1 - elapsed  # Quanto mais perto do final, menor o peso

    pipe = r.pipeline()

    # Incrementa contador da janela atual
    pipe.hincrby(key, current_window, 1)

    # Obtém contadores de ambas as janelas
    pipe.hget(key, current_window)
    pipe.hget(key, previous_window)

    # Define TTL para a chave (2 janelas para segurança)
    pipe.expire(key, window_seconds + 2)

    results = pipe.execute()
    current_count = int(results[1] or 0)
    previous_count = int(results[2] or 0)

    # Calcula requisições estimadas na janela deslizante
    estimated = current_count + (previous_count * weight_previous)

    if estimated > limit:
        # Reverte o incremento
        r.hincrby(key, current_window, -1)
        return True

    return False

5. Tolerância a Proxies: Whitelist e Headers Dinâmicos

Para lidar com proxies de forma confiável, precisamos de uma whitelist de IPs confiáveis. Isso inclui:

  • IPs internos da infraestrutura (ex: range 10.0.0.0/8)
  • IPs de CDNs conhecidos (Cloudflare, Akamai, Fastly)
  • IPs de balanceadores de carga

A estratégia de fallback deve considerar que o cabeçalho X-Forwarded-For pode estar ausente ou ter sido falsificado:

# Whitelist de proxies confiáveis (exemplo com Cloudflare)
TRUSTED_PROXIES = {
    '173.245.48.0/20',
    '103.21.244.0/22',
    '103.22.200.0/22',
    # ... outros ranges
}

# Cache Redis para evitar parsing repetitivo
PROXY_CACHE_KEY = "proxy_cache"

def resolve_client_ip(request):
    """
    Resolve o IP real do cliente com cache em Redis.
    """
    # Chave de cache baseada no IP do proxy
    proxy_ip = request.remote_addr
    cache_key = f"{PROXY_CACHE_KEY}:{proxy_ip}"

    # Verifica cache
    cached = r.get(cache_key)
    if cached:
        return cached

    # Processa headers
    real_ip = get_real_ip(request, TRUSTED_PROXIES)

    # Armazena em cache por 5 minutos
    r.setex(cache_key, 300, real_ip)

    return real_ip

6. Tratamento de Exceções e Cenários de Borda

IPs Privados e IPv6

IPs privados (RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) nunca devem ser considerados como IPs de clientes reais em produção. IPv6 mapeados em IPv4 (ex: ::ffff:192.168.1.1) devem ser normalizados.

Granularidade: Rota vs. Global

Rate limiting pode ser aplicado globalmente (por IP) ou por rota (endpoint). A abordagem por rota é mais justa, mas requer chaves compostas:

def rate_limit_by_route(ip, route, limit, window):
    key = f"ratelimit:route:{route}:{ip}"
    # ... mesma lógica dos exemplos anteriores

Respostas Adequadas

Quando uma requisição é bloqueada, a resposta deve incluir:

  • Status HTTP: 429 Too Many Requests
  • Cabeçalho Retry-After: Tempo em segundos até poder tentar novamente
  • Logging: Registrar o IP, rota e timestamp para análise
def rate_limit_response(retry_after):
    return {
        'status': 429,
        'headers': {
            'Retry-After': str(retry_after),
            'Content-Type': 'application/json'
        },
        'body': {
            'error': 'Too Many Requests',
            'retry_after': retry_after
        }
    }

7. Monitoramento e Ajuste Dinâmico dos Limites

Métricas no Redis

Para monitorar a eficácia do rate limiting, armazene métricas em chaves separadas:

# Incrementa contadores de métricas
r.incr(f"metrics:ratelimit:blocked:{ip}")
r.incr(f"metrics:ratelimit:allowed:{ip}")

# Taxa de bloqueio por IP
blocked = int(r.get(f"metrics:ratelimit:blocked:{ip}") or 0)
allowed = int(r.get(f"metrics:ratelimit:allowed:{ip}") or 0)
rate = blocked / (blocked + allowed) if (blocked + allowed) > 0 else 0

Adaptive Rate Limiting

Limites podem ser ajustados dinamicamente com base na carga do servidor ou no comportamento do cliente:

def adaptive_limit(ip, base_limit, server_load):
    """
    Ajusta o limite baseado na carga do servidor (0.0 a 1.0).
    """
    if server_load > 0.8:
        return int(base_limit * 0.5)  # Reduz 50%
    elif server_load > 0.6:
        return int(base_limit * 0.8)  # Reduz 20%
    return base_limit

8. Boas Práticas de Segurança e Performance

Pool de Conexões Redis

Use um pool de conexões para evitar overhead de criação de conexões:

pool = redis.ConnectionPool(host='localhost', port=6379, max_connections=20)
r = redis.Redis(connection_pool=pool)

Proteção contra Ataques ao Redis

  • Limite o tamanho de Sorted Sets: remova entradas antigas frequentemente
  • Use MAXMEMORY no Redis para evitar estouro de memória
  • Implemente circuit breaker: se o Redis ficar indisponível, permita requisições temporariamente

Versionamento de Chaves

Para migrar entre estratégias sem interromper o serviço:

STRATEGY_VERSION = 2
key = f"ratelimit:v{STRATEGY_VERSION}:{ip}"

Isso permite que versões antigas expirem naturalmente enquanto a nova estratégia é adotada gradualmente.


Referências