Asyncio: programação assíncrona com async/await
1. Fundamentos da Programação Assíncrona em Python
A programação assíncrona em Python resolve um problema clássico: operações de I/O (entrada/saída) que bloqueiam a execução do programa. Quando seu código precisa ler um arquivo, fazer uma requisição HTTP ou consultar um banco de dados, o tempo de espera pode ser significativo — e nesse período, o processador fica ocioso.
O problema do I/O bloqueante
Imagine um servidor web que precisa atender 100 requisições simultâneas. Com código síncrono tradicional, cada requisição bloquearia a thread até que a resposta fosse completamente processada. Isso resulta em baixa utilização de recursos e tempo de resposta elevado.
Concorrência vs Paralelismo vs Threading
- Paralelismo (multiprocessing): Executa múltiplas tarefas simultaneamente em diferentes núcleos da CPU. Ideal para tarefas CPU-bound (processamento intensivo).
- Threading: Alterna entre threads no mesmo processo. Útil para I/O, mas sofre com contenção de recursos e o GIL (Global Interpreter Lock).
- Concorrência (asyncio): Usa um único thread com um loop de eventos que gerencia múltiplas tarefas de forma cooperativa. Cada tarefa cede o controle voluntariamente quando está esperando I/O.
O loop de eventos
O event loop é o coração do asyncio. Ele gerencia uma fila de tarefas (corrotinas), executa uma de cada vez, e quando uma tarefa encontra um await, ela "dorme" enquanto o loop passa para a próxima tarefa pronta para executar.
import asyncio
async def tarefa_demorada(nome, tempo):
print(f"Iniciando {nome}")
await asyncio.sleep(tempo) # Cede o controle aqui
print(f"Finalizando {nome}")
return f"Resultado de {nome}"
async def main():
# O loop gerencia ambas as tarefas concorrentemente
resultado = await asyncio.gather(
tarefa_demorada("A", 2),
tarefa_demorada("B", 1)
)
print(resultado)
asyncio.run(main())
2. Sintaxe Essencial: async/await e Corrotinas
Definindo corrotinas
Uma corrotina é definida com async def. Diferente de funções comuns, ela não executa imediatamente — retorna um objeto corrotina.
async def minha_corrotina():
return 42
# Isso NÃO executa a corrotina
corrotina_obj = minha_corrotina()
print(type(corrotina_obj)) # <class 'coroutine'>
# Para executar, precisamos de await ou asyncio.run()
resultado = await corrotina_obj # Funciona apenas dentro de outra corrotina
O poder do await
await pausa a execução da corrotina atual sem bloquear o loop de eventos. Enquanto uma corrotina espera, outras podem executar.
async def buscar_dados():
print("Buscando dados...")
await asyncio.sleep(2) # Simula I/O
return {"status": "ok"}
async def processar():
dados = await buscar_dados() # Aguarda sem bloquear
print(f"Dados recebidos: {dados}")
asyncio.run(processar())
asyncio.run() vs await
asyncio.run(): Cria um novo event loop, executa a corrotina e fecha o loop. Deve ser chamado apenas uma vez no ponto de entrada do programa.await: Usado dentro de corrotinas para aguardar outras corrotinas.
3. Gerenciamento de Tarefas (Tasks) e Concorrência
Criando tasks com create_task()
Tasks permitem executar múltiplas corrotinas concorrentemente:
async def tarefa_1():
await asyncio.sleep(2)
return "Tarefa 1 concluída"
async def tarefa_2():
await asyncio.sleep(1)
return "Tarefa 2 concluída"
async def main():
task1 = asyncio.create_task(tarefa_1())
task2 = asyncio.create_task(tarefa_2())
# Aguarda ambas as tasks
resultado1 = await task1
resultado2 = await task2
print(resultado1, resultado2)
asyncio.run(main())
asyncio.gather() e asyncio.wait()
gather() agrupa múltiplas corrotinas e retorna uma lista com todos os resultados:
async def main():
resultados = await asyncio.gather(
tarefa_1(),
tarefa_2(),
return_exceptions=True # Captura exceções sem interromper outras tasks
)
print(resultados)
Cancelamento e tratamento de exceções
async def tarefa_longa():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("Tarefa cancelada!")
raise
async def main():
task = asyncio.create_task(tarefa_longa())
await asyncio.sleep(1)
task.cancel() # Cancela a task
try:
await task
except asyncio.CancelledError:
print("Cancelamento tratado")
4. Operações Assíncronas de I/O na Prática
Leitura/escrita assíncrona com aiofiles
import aiofiles
async def ler_arquivo():
async with aiofiles.open('dados.txt', mode='r') as f:
conteudo = await f.read()
return conteudo
async def escrever_arquivo():
async with aiofiles.open('saida.txt', mode='w') as f:
await f.write("Dados assíncronos")
Requisições HTTP com aiohttp
import aiohttp
async def buscar_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://api.exemplo.com/dados1",
"https://api.exemplo.com/dados2"
]
async with aiohttp.ClientSession() as session:
tarefas = [buscar_url(session, url) for url in urls]
resultados = await asyncio.gather(*tarefas)
for resultado in resultados:
print(len(resultado))
asyncio.run(main())
Timeouts e delays
async def operacao_lenta():
await asyncio.sleep(5)
return "Concluído"
async def main():
try:
resultado = await asyncio.wait_for(
operacao_lenta(),
timeout=3.0 # Levanta TimeoutError após 3 segundos
)
except asyncio.TimeoutError:
print("Operação excedeu o tempo limite")
5. Sincronização e Compartilhamento de Estado
Locks assíncronos
lock = asyncio.Lock()
recurso_compartilhado = 0
async def incrementar():
global recurso_compartilhado
async with lock: # Garante acesso exclusivo
valor_atual = recurso_compartilhado
await asyncio.sleep(0.1) # Simula processamento
recurso_compartilhado = valor_atual + 1
Semáforos e eventos
# Limita a 3 conexões simultâneas
semaphore = asyncio.Semaphore(3)
async def acessar_api(id):
async with semaphore:
print(f"Requisição {id} iniciada")
await asyncio.sleep(1)
print(f"Requisição {id} finalizada")
# Evento para sincronização
evento = asyncio.Event()
async def produtor():
await asyncio.sleep(2)
evento.set() # Sinaliza que o evento ocorreu
async def consumidor():
await evento.wait() # Aguarda até o evento ser setado
print("Evento recebido!")
Filas assíncronas
async def produtor(fila):
for i in range(5):
await fila.put(f"Item {i}")
await asyncio.sleep(0.5)
async def consumidor(fila):
while True:
item = await fila.get()
print(f"Processando {item}")
fila.task_done()
async def main():
fila = asyncio.Queue(maxsize=10)
await asyncio.gather(
produtor(fila),
consumidor(fila)
)
6. Padrões Avançados e Boas Práticas
Context managers assíncronos
class ConexaoAsync:
async def __aenter__(self):
print("Abrindo conexão...")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Fechando conexão...")
await asyncio.sleep(0.5)
async def main():
async with ConexaoAsync() as conn:
print("Usando conexão")
Iteração assíncrona
class AsyncRange:
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.current - 1
async def main():
async for numero in AsyncRange(1, 5):
print(numero)
Evitando código bloqueante
import concurrent.futures
def operacao_cpu_intensiva():
# Código síncrono pesado
return sum(i * i for i in range(10**7))
async def main():
loop = asyncio.get_event_loop()
# Executa em thread separada para não bloquear o event loop
resultado = await loop.run_in_executor(
None, # Usa ThreadPoolExecutor padrão
operacao_cpu_intensiva
)
print(resultado)
7. Comparação com Alternativas e Limitações do asyncio
Quando usar cada abordagem
| Cenário | Abordagem Recomendada |
|---|---|
| Muitas conexões de rede simultâneas | asyncio |
| Processamento CPU intensivo | multiprocessing |
| I/O com bibliotecas que não suportam async | threading |
| Operações de I/O mistas com CPU | asyncio + run_in_executor |
Limitações do asyncio
- Tasks cooperativas: Se uma task demorar muito sem usar
await, todas as outras ficam bloqueadas - CPU-bound: Para processamento intensivo, use
multiprocessingourun_in_executor - GIL: Embora asyncio contorne o GIL para I/O, operações CPU-bound ainda sofrem com ele
- Curva de aprendizado: Requer mudança de mentalidade em relação ao código síncrono
Ferramentas complementares
# uvloop: substitui o event loop padrão por uma implementação mais rápida
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Modo debug para identificar problemas
asyncio.run(main(), debug=True)
Referências
- Documentação Oficial do asyncio — Referência completa da biblioteca asyncio, incluindo todas as APIs e exemplos oficiais.
- Real Python: Async IO in Python — Tutorial abrangente com exemplos práticos e explicações detalhadas sobre programação assíncrona.
- PEP 492 – Coroutines with async and await syntax — Especificação oficial que introduziu as palavras-chave async/await em Python.
- aiohttp Documentation — Documentação da biblioteca aiohttp para requisições HTTP assíncronas.
- uvloop: Fast asyncio Event Loop — Documentação do uvloop, uma alternativa de alto desempenho ao event loop padrão.
- Python Asyncio: The Complete Guide — Guia completo com exemplos avançados, padrões de design e boas práticas.