Como aplicar backpressure em sistemas de alto throughput

1. Fundamentos do backpressure em sistemas distribuídos

Backpressure é o mecanismo de controle de fluxo que permite que sistemas de alto throughput regulem a taxa de processamento entre produtores e consumidores. Em vez de permitir que um produtor sobrecarregue um consumidor, o backpressure sinaliza ao produtor que reduza sua taxa de envio ou pare temporariamente.

Princípios fundamentais:
- Controle reativo: O consumidor informa sua capacidade atual ao produtor
- Controle proativo: O produtor ajusta sua taxa baseado em limites pré-definidos
- Bufferização controlada: Filas com tamanho máximo definido evitam estouro de memória

Problemas comuns em alto throughput sem backpressure:
- Sobrecarga de memória (OOM)
- Perda de dados por timeouts
- Degradação de latência (efeito "cadeira de rodas")
- Queda em cascata de serviços

2. Estratégias de backpressure no nível de aplicação

Implementação com semáforos e limites de concorrência

import threading
import time

class SemaphoreBackpressure:
    def __init__(self, max_concurrent=100):
        self.semaphore = threading.Semaphore(max_concurrent)
        self.active_requests = 0

    def process_with_backpressure(self, request):
        acquired = self.semaphore.acquire(blocking=False)
        if not acquired:
            # Backpressure ativo: rejeitar ou fazer backoff
            return {"status": "rejected", "reason": "backpressure"}

        try:
            # Processamento real
            result = self._process(request)
            return {"status": "success", "data": result}
        finally:
            self.semaphore.release()

Filas com tamanho fixo e política de rejeição

from collections import deque
import asyncio

class BoundedQueue:
    def __init__(self, max_size=1000, policy="drop"):
        self.queue = deque(maxlen=max_size)
        self.policy = policy  # drop, backoff, retry

    async def produce(self, item):
        if len(self.queue) >= self.queue.maxlen:
            if self.policy == "drop":
                # Descartar item mais antigo
                self.queue.popleft()
            elif self.policy == "backoff":
                # Esperar até haver espaço
                while len(self.queue) >= self.queue.maxlen:
                    await asyncio.sleep(0.01)
        self.queue.append(item)

Padrão de desacoplamento com canais (Go channels)

package main

import "fmt"

func producer(ch chan<- int, limit int) {
    for i := 0; i < limit; i++ {
        select {
        case ch <- i:
            // Sucesso no envio
        default:
            // Canal cheio - backpressure
            fmt.Printf("Backpressure: item %d dropped\n", i)
        }
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for item := range ch {
        // Processar item
        fmt.Printf("Processing: %d\n", item)
    }
}

func main() {
    channel := make(chan int, 10) // Buffer limitado
    go producer(channel, 100)
    consumer(channel)
}

3. Backpressure em sistemas de mensageria e streaming

Configuração de prefetch count no RabbitMQ

# RabbitMQ consumer com prefetch controlado
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Define quantas mensagens o consumidor pode ter não-acknowledged
channel.basic_qos(prefetch_count=10)

def callback(ch, method, properties, body):
    # Processa a mensagem
    process_message(body)
    # Só faz ack após processamento bem-sucedido
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='high_throughput_queue',
    on_message_callback=callback)

Rate limiting adaptativo no consumidor Kafka

from kafka import KafkaConsumer
import time

class AdaptiveConsumer:
    def __init__(self, topic, bootstrap_servers):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            max_poll_records=100,  # Limite por poll
            enable_auto_commit=False
        )
        self.processing_rate = 1000  # msgs/segundo
        self.last_poll_time = time.time()

    def consume_with_backpressure(self):
        while True:
            messages = self.consumer.poll(timeout_ms=1000)
            current_time = time.time()
            elapsed = current_time - self.last_poll_time

            # Ajusta taxa baseado no tempo de processamento
            if elapsed < 0.5:  # Processou rápido demais
                self.processing_rate *= 1.1
            elif elapsed > 1.0:  # Processou devagar
                self.processing_rate *= 0.9

            for message in messages:
                process_message(message)
                self.consumer.commit()

            self.last_poll_time = current_time

Backpressure nativo em Project Reactor

import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

Flux.range(1, 100000)
    .publishOn(Schedulers.boundedElastic())
    .limitRate(500)  // Backpressure: máximo 500 itens por ciclo
    .map(this::processItem)
    .subscribe(
        item -> System.out.println("Processed: " + item),
        error -> System.err.println("Error: " + error)
    )

4. Mecanismos de backpressure em bancos de dados e APIs

Pooling de conexões com backpressure

import psycopg2
from psycopg2 import pool

class BackpressureConnectionPool:
    def __init__(self, min_conn=5, max_conn=20):
        self.pool = pool.ThreadedConnectionPool(
            min_conn, max_conn, 
            database="mydb", user="user", password="pass"
        )
        self.active_connections = 0

    def get_connection(self, timeout=5):
        if self.active_connections >= self.pool.maxconn:
            # Backpressure: esperar conexão disponível
            start = time.time()
            while self.active_connections >= self.pool.maxconn:
                if time.time() - start > timeout:
                    raise Exception("Backpressure timeout")
                time.sleep(0.1)
        conn = self.pool.getconn()
        self.active_connections += 1
        return conn

Throttling em API REST com token bucket

import time

class TokenBucket:
    def __init__(self, rate=100, burst=200):
        self.rate = rate  # tokens por segundo
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()

    def consume(self, tokens=1):
        self._refill()
        if self.tokens < tokens:
            return False  # Backpressure ativo
        self.tokens -= tokens
        return True

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.burst, 
            self.tokens + elapsed * self.rate
        )
        self.last_refill = now

# Uso na API
bucket = TokenBucket(rate=100, burst=200)

@app.route('/api/data')
def handle_request():
    if not bucket.consume():
        return {"error": "rate_limit_exceeded"}, 429
    # Processa requisição
    return process_data()

5. Monitoramento e ajuste dinâmico de backpressure

Métricas críticas para monitorar

import prometheus_client as prom

# Métricas de backpressure
backpressure_rejections = prom.Counter(
    'backpressure_rejections_total',
    'Total de requisições rejeitadas por backpressure'
)

queue_latency = prom.Histogram(
    'queue_latency_seconds',
    'Latência na fila de processamento',
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

active_backpressure = prom.Gauge(
    'active_backpressure',
    'Indica se backpressure está ativo (1=sim, 0=não)'
)

resource_utilization = prom.Gauge(
    'resource_utilization_percent',
    'Utilização de recursos do sistema'
)

Algoritmo AIMD adaptativo

class AIMDController:
    def __init__(self, initial_rate=1000):
        self.rate = initial_rate
        self.add_increment = 50
        self.mult_decrement = 0.5
        self.error_threshold = 0.1  # 10% de erro

    def adjust_rate(self, error_rate):
        if error_rate < self.error_threshold:
            # Additive Increase
            self.rate += self.add_increment
        else:
            # Multiplicative Decrease
            self.rate *= self.mult_decrement
        return max(1, int(self.rate))

6. Estudo de caso: Pipeline de alto throughput (100k req/s)

Implementação combinada

class HighThroughputPipeline:
    def __init__(self):
        self.queue = BoundedQueue(max_size=10000, policy="backoff")
        self.circuit_breaker = CircuitBreaker(failure_threshold=50)
        self.rate_limiter = TokenBucket(rate=100000, burst=150000)
        self.aimd = AIMDController(initial_rate=50000)

    async def process_event(self, event):
        if not self.rate_limiter.consume():
            self.aimd.adjust_rate(0.3)  # Alta taxa de erro
            return {"status": "backpressure"}

        if not self.circuit_breaker.is_open():
            try:
                await self.queue.produce(event)
                result = await self._process(event)
                self.circuit_breaker.record_success()
                self.aimd.adjust_rate(0.01)  # Baixa taxa de erro
                return {"status": "success", "data": result}
            except Exception as e:
                self.circuit_breaker.record_failure()
                return {"status": "error", "detail": str(e)}
        else:
            # Circuit breaker aberto - backpressure total
            return {"status": "circuit_open"}

Resultados obtidos:
- Perda de dados reduzida em 99% (de 15% para 0.15%)
- Latência estável abaixo de 200ms (p95)
- Utilização de memória controlada em 70% do limite
- Zero quedas em cascata durante picos de 150k req/s

Referências