Como usar o módulo threading e multiprocessing em Python com segurança

1. Fundamentos: Threads vs Processos no Contexto do GIL

O Global Interpreter Lock (GIL) é um mutex que protege o interpretador Python, garantindo que apenas uma thread execute bytecode por vez. Isso significa que threads em Python não executam código em paralelo real para operações CPU-bound. Para operações I/O-bound (como leitura de arquivos, requisições HTTP), o GIL é liberado durante operações de I/O, permitindo concorrência eficiente.

import threading
import time

def operacao_io_bound():
    time.sleep(2)  # Simula operação de I/O
    return "Concluído"

def operacao_cpu_bound():
    soma = 0
    for i in range(10**7):
        soma += i
    return soma

# Threading é eficiente para I/O-bound
threads = [threading.Thread(target=operacao_io_bound) for _ in range(4)]
inicio = time.time()
for t in threads: t.start()
for t in threads: t.join()
print(f"Tempo I/O-bound com threads: {time.time() - inicio:.2f}s")

Para operações CPU-bound, multiprocessing cria processos com espaços de memória isolados, contornando o GIL e permitindo paralelismo real.

import multiprocessing as mp

def operacao_cpu_paralela():
    soma = 0
    for i in range(10**7):
        soma += i
    return soma

with mp.Pool(4) as pool:
    resultados = pool.map(operacao_cpu_paralela, range(4))

2. Sincronização Segura com threading.Lock e threading.RLock

O Lock evita condições de corrida quando múltiplas threads acessam recursos compartilhados.

import threading

contador = 0
lock = threading.Lock()

def incrementar_seguro():
    global contador
    with lock:  # Padrão seguro
        contador += 1

def incrementar_inseguro():
    global contador
    contador += 1  # Condição de corrida!

threads = [threading.Thread(target=incrementar_seguro) for _ in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Contador seguro: {contador}")  # Sempre 1000

RLock (Reentrant Lock) permite que a mesma thread adquira o lock múltiplas vezes sem deadlock, útil em chamadas aninhadas.

rlock = threading.RLock()

def funcao_aninhada():
    with rlock:
        # Já possui o lock, pode adquirir novamente
        with rlock:
            print("Reentrada segura")

# Com Lock comum, isso causaria deadlock
threading.Thread(target=funcao_aninhada).start()

3. Evitando Deadlocks e Starvation em Múltiplos Locks

A hierarquia de locks é a técnica mais eficaz contra deadlocks: sempre adquirir locks na mesma ordem.

lock_a = threading.Lock()
lock_b = threading.Lock()

def tarefa_segura():
    with lock_a:
        with lock_b:  # Ordem consistente: A -> B
            print("Operação segura")

def tarefa_perigosa():
    with lock_b:
        with lock_a:  # Ordem invertida: B -> A (risco de deadlock)
            print("Operação perigosa")

Timeouts ajudam a detectar bloqueios:

def tarefa_com_timeout():
    if lock_a.acquire(timeout=5):
        try:
            # Operação crítica
            pass
        finally:
            lock_a.release()
    else:
        print("Timeout ao adquirir lock_a")

4. Comunicação Segura entre Processos com multiprocessing.Queue e Pipe

Queue é thread-safe e process-safe, ideal para padrão produtor-consumidor.

import multiprocessing as mp

def produtor(queue):
    for i in range(10):
        queue.put(f"Item {i}")

def consumidor(queue):
    while True:
        item = queue.get()
        if item is None:  # Sinal de término
            break
        print(f"Processado: {item}")

queue = mp.Queue()
p1 = mp.Process(target=produtor, args=(queue,))
p2 = mp.Process(target=consumidor, args=(queue,))
p1.start(); p2.start()
p1.join()
queue.put(None)  # Sinaliza término
p2.join()

Pipe oferece comunicação bidirecional, mas requer cuidados com fechamento de extremidades.

conexao_pai, conexao_filho = mp.Pipe()

def processo_filho(conn):
    conn.send("Mensagem do filho")
    conn.close()

p = mp.Process(target=processo_filho, args=(conexao_filho,))
p.start()
mensagem = conexao_pai.recv()
print(f"Recebido: {mensagem}")
conexao_pai.close()
p.join()

5. Gerenciamento de Estado Compartilhado com multiprocessing.Manager

Manager cria objetos compartilhados entre processos com sincronização implícita.

import multiprocessing as mp

def trabalhador(dicionario, chave, valor):
    dicionario[chave] = valor

with mp.Manager() as manager:
    dicionario_compartilhado = manager.dict()
    processos = []
    for i in range(5):
        p = mp.Process(target=trabalhador, args=(dicionario_compartilhado, f"chave_{i}", i))
        processos.append(p)
        p.start()
    for p in processos:
        p.join()
    print(dicionario_compartilhado)

Para maior performance, use multiprocessing.Value e multiprocessing.Array com locks embutidos.

contador_compartilhado = mp.Value('i', 0)
array_compartilhado = mp.Array('d', [0.0, 0.0, 0.0])

def incrementar(valor):
    with valor.get_lock():  # Lock embutido
        valor.value += 1

6. Pool de Trabalhadores e Tratamento de Exceções

multiprocessing.Pool e concurrent.futures.ThreadPoolExecutor simplificam o gerenciamento de trabalhadores.

from concurrent.futures import ThreadPoolExecutor, as_completed

def tarefa_com_erro(x):
    if x == 5:
        raise ValueError(f"Erro no item {x}")
    return x * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(tarefa_com_erro, i): i for i in range(10)}
    for future in as_completed(futures):
        try:
            resultado = future.result()
            print(f"Sucesso: {resultado}")
        except Exception as e:
            print(f"Exceção capturada: {e}")

Para processos, encerramento gracioso é essencial:

with mp.Pool(4) as pool:
    try:
        resultados = pool.map(tarefa_com_erro, range(10))
    except Exception as e:
        print(f"Erro no pool: {e}")
        pool.terminate()  # Encerra imediatamente
    finally:
        pool.join()  # Aguarda término

7. Padrões Avançados de Segurança: Barrier, Semaphore e Event

Barrier sincroniza múltiplos pontos de encontro:

barreira = threading.Barrier(3)

def tarefa_sincronizada(nome):
    print(f"{nome} chegou na barreira")
    barreira.wait()  # Aguarda todas as 3 threads
    print(f"{nome} passou da barreira")

threads = [threading.Thread(target=tarefa_sincronizada, args=(f"Thread {i}",)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()

Semaphore controla acesso a recursos limitados:

semaforo = threading.Semaphore(3)  # Máximo 3 conexões simultâneas

def conexao_banco(id_conexao):
    with semaforo:
        print(f"Conexão {id_conexao} ativa")
        time.sleep(1)
        print(f"Conexão {id_conexao} liberada")

Event para sinalização sem polling:

evento = threading.Event()

def trabalhador():
    print("Aguardando sinal...")
    evento.wait()  # Bloqueia até evento ser setado
    print("Sinal recebido!")

def sinalizador():
    time.sleep(2)
    evento.set()  # Libera todos os workers

threading.Thread(target=trabalhador).start()
threading.Thread(target=sinalizador).start()

8. Testes e Debug de Concorrência em Python

Para reproduzir condições de corrida, use time.sleep() controlado:

import time

contador_teste = 0
lock_teste = threading.Lock()

def operacao_lenta():
    global contador_teste
    with lock_teste:
        temp = contador_teste
        time.sleep(0.001)  # Força condição de corrida
        contador_teste = temp + 1

Ferramentas úteis para debug:

import faulthandler
faulthandler.enable()  # Dump de stacks em caso de deadlock

# Para detectar vazamento de threads
import threading
print(f"Threads ativas: {threading.active_count()}")
for thread in threading.enumerate():
    print(f"Thread: {thread.name}")

Referências