Comunicação entre processos e filas
1. Introdução à Comunicação entre Processos (IPC)
Em Python, cada processo possui seu próprio espaço de memória isolado, o que significa que variáveis e objetos não são compartilhados entre processos por padrão. Essa característica, embora traga segurança e estabilidade, cria a necessidade de mecanismos de Comunicação entre Processos (IPC) para que processos possam trocar dados e coordenar suas ações.
A comunicação entre processos é essencial quando:
- Precisamos distribuir tarefas pesadas entre múltiplos núcleos da CPU
- Desejamos construir sistemas produtor-consumidor
- Requeremos processamento paralelo com troca de resultados intermediários
O módulo multiprocessing do Python oferece diversos mecanismos de IPC, sendo as filas (Queue) e os pipes (Pipe) os mais comuns. Diferente de threading, onde o GIL (Global Interpreter Lock) limita a execução paralela em CPU-bound tasks, o multiprocessamento contorna essa limitação criando processos independentes.
2. O Módulo multiprocessing: Pilares da Comunicação
O módulo multiprocessing fornece APIs similares ao threading, mas com processos ao invés de threads. Vamos explorar seus componentes principais:
from multiprocessing import Process, Value, Array, Manager
# Compartilhamento de estado simples com Value e Array
contador = Value('i', 0) # Inteiro compartilhado
numeros = Array('d', [1.0, 2.0, 3.0]) # Array de doubles
# Gerenciador para objetos complexos
manager = Manager()
dicionario_compartilhado = manager.dict()
lista_compartilhada = manager.list()
3. Filas (Queues) para Troca de Mensagens
As filas são o mecanismo mais seguro e flexível para comunicação entre processos. O módulo multiprocessing oferece a classe Queue, que é thread-safe e process-safe.
Queue Básica
from multiprocessing import Process, Queue
def produtor(fila):
for i in range(5):
fila.put(f"Item {i}")
fila.put(None) # Sinal de término
def consumidor(fila):
while True:
item = fila.get()
if item is None:
break
print(f"Processado: {item}")
if __name__ == "__main__":
fila = Queue()
p1 = Process(target=produtor, args=(fila,))
p2 = Process(target=consumidor, args=(fila,))
p1.start()
p2.start()
p1.join()
p2.join()
JoinableQueue para Sincronização
A JoinableQueue estende a Queue adicionando métodos task_done() e join() para sincronização:
from multiprocessing import Process, JoinableQueue
def worker(fila):
while True:
tarefa = fila.get()
if tarefa is None:
fila.task_done()
break
resultado = tarefa ** 2
print(f"Resultado: {resultado}")
fila.task_done()
if __name__ == "__main__":
fila = JoinableQueue()
# Iniciar workers
workers = [Process(target=worker, args=(fila,)) for _ in range(3)]
for w in workers:
w.start()
# Alimentar tarefas
for i in range(10):
fila.put(i)
# Sinais de término
for _ in workers:
fila.put(None)
fila.join() # Aguarda todas as tarefas serem processadas
Comparação entre Tipos de Filas
multiprocessing.Queue: Thread-safe e process-safe, suporta objetos serializáveisqueue.Queue: Apenas thread-safe, não funciona entre processosSimpleQueue: Mais leve que Queue, sem funcionalidades extras
4. Pipes e Conexões Bidirecionais
Pipes oferecem comunicação direta entre dois processos, sendo mais rápidos que filas para comunicação ponto a ponto.
from multiprocessing import Process, Pipe
def processador(conn):
"""Processo filho que recebe e envia dados"""
dados = conn.recv()
print(f"Recebido: {dados}")
conn.send(f"Processado: {dados * 2}")
conn.close()
if __name__ == "__main__":
# Pipe() retorna duas extremidades: (parent_conn, child_conn)
parent_conn, child_conn = Pipe()
p = Process(target=processador, args=(child_conn,))
p.start()
parent_conn.send(42)
resultado = parent_conn.recv()
print(f"Resultado final: {resultado}")
p.join()
5. Sincronização e Controle de Concorrência
Para evitar condições de corrida e garantir acesso seguro a recursos compartilhados, o módulo multiprocessing oferece primitivas de sincronização:
from multiprocessing import Process, Lock, Semaphore, Event, Barrier
# Lock para exclusão mútua
lock = Lock()
def tarefa_segura(lock, recurso):
with lock:
# Acesso exclusivo ao recurso
recurso['valor'] += 1
# Semáforo para controlar acesso a recursos limitados
semaforo = Semaphore(2) # Permite 2 acessos simultâneos
# Evento para sincronização
evento = Event()
def esperar_evento(evento):
print("Aguardando evento...")
evento.wait()
print("Evento recebido!")
# Barreira para sincronizar múltiplos processos
barreira = Barrier(3) # 3 processos devem chegar antes de prosseguir
6. Padrões Práticos com Filas e Processos
Padrão Produtor-Consumidor Avançado
from multiprocessing import Process, Queue
import time
import random
def produtor(fila, id_produtor):
for i in range(3):
item = f"P{id_produtor}-Item{i}"
fila.put(item)
print(f"Produzido: {item}")
time.sleep(random.random())
def consumidor(fila, id_consumidor):
while True:
item = fila.get()
if item is "FIM":
fila.put("FIM") # Repassar para outros consumidores
break
print(f"Consumidor {id_consumidor} processou: {item}")
time.sleep(random.random())
if __name__ == "__main__":
fila = Queue(maxsize=5)
produtores = [Process(target=produtor, args=(fila, i)) for i in range(2)]
consumidores = [Process(target=consumidor, args=(fila, i)) for i in range(3)]
for p in produtores:
p.start()
for c in consumidores:
c.start()
for p in produtores:
p.join()
fila.put("FIM") # Sinal para consumidores
for c in consumidores:
c.join()
Pool de Workers com Fila de Resultados
from multiprocessing import Process, Queue
import time
def worker(tarefas, resultados):
while True:
tarefa = tarefas.get()
if tarefa is None:
break
resultado = tarefa ** 2
resultados.put(resultado)
time.sleep(0.1)
if __name__ == "__main__":
tarefas = Queue()
resultados = Queue()
# Criar workers
workers = [Process(target=worker, args=(tarefas, resultados))
for _ in range(4)]
for w in workers:
w.start()
# Enviar tarefas
for i in range(20):
tarefas.put(i)
# Sinal de término
for _ in workers:
tarefas.put(None)
# Coletar resultados
for _ in range(20):
resultado = resultados.get()
print(f"Resultado coletado: {resultado}")
for w in workers:
w.join()
7. Boas Práticas e Armadilhas Comuns
Deadlocks em Filas
# Evite deadlocks definindo timeout
try:
item = fila.get(timeout=1)
except:
print("Timeout ao aguardar item")
# Sempre defina maxsize para filas longas
fila = Queue(maxsize=100)
Gerenciamento de Recursos
from multiprocessing import Process, Queue
import atexit
def cleanup(fila, processos):
"""Limpeza segura de recursos"""
for _ in processos:
fila.put(None) # Sinal para workers pararem
for p in processos:
p.join(timeout=2)
if p.is_alive():
p.terminate()
# Use context managers ou try/finally para garantir limpeza
Dicas Importantes
- Serialização: Objetos enviados via filas/pipe são serializados com pickle; classes complexas precisam ser serializáveis
- Memória: Filas armazenam objetos em memória; evite itens muito grandes
- Debugging: Use
multiprocessing.log_to_stderr()para logs detalhados - Testes: Isole o código IPC para facilitar testes unitários
Referências
- Documentação oficial do módulo multiprocessing — Referência completa sobre criação de processos, filas, pipes e sincronização
- Python Multiprocessing: The Complete Guide — Guia prático com exemplos detalhados de IPC usando multiprocessing
- Comunicação entre Processos com Queue e Pipe — Tutorial do Real Python sobre concorrência e IPC
- PEP 371 – Addition of the multiprocessing package — Proposta original e especificações do módulo multiprocessing
- Effective Python: 90 Specific Ways to Write Better Python — Item 55: "Use multiprocessing for CPU-bound tasks" com exemplos práticos de IPC
- Python Multiprocessing Pool vs Process — Comparação detalhada entre Pool e Process para comunicação entre processos