Como usar o módulo asyncio em Python para I/O concorrente eficiente

1. Fundamentos do asyncio: Event Loop e Corrotinas

O módulo asyncio introduz um modelo de concorrência baseado em event loop (laço de eventos). Diferente do threading tradicional, onde múltiplas threads compartilham a CPU com preempção, o asyncio utiliza um único thread que gerencia uma fila de tarefas cooperativas. O event loop alterna entre essas tarefas quando elas encontram operações de I/O bloqueantes, permitindo que o programa continue executando outras tarefas enquanto aguarda respostas externas.

Uma corrotina é definida com async def e pode ser pausada com await. O exemplo abaixo mostra a estrutura básica:

import asyncio

async def saudacao():
    print("Olá...")
    await asyncio.sleep(1)  # Simula I/O não bloqueante
    print("Mundo!")

# Execução
asyncio.run(saudacao())

A principal diferença entre concorrência assíncrona e paralelismo é que o asyncio não executa código simultaneamente em múltiplos núcleos. Ele gerencia a alternância entre tarefas de forma cooperativa, enquanto o paralelismo (com multiprocessing) utiliza múltiplos processos para execução real simultânea. O threading, por sua vez, usa múltiplas threads que podem sofrer com condições de corrida e overhead de contexto.

2. Gerenciamento do Event Loop e Tarefas

O método asyncio.run() é a forma recomendada para iniciar o event loop desde Python 3.7+. Ele cria, gerencia e fecha o loop automaticamente. Para agendar múltiplas tarefas, usamos asyncio.create_task() ou asyncio.gather():

import asyncio

async def tarefa(nome, tempo):
    print(f"Iniciando {nome}")
    await asyncio.sleep(tempo)
    print(f"Finalizando {nome}")
    return f"{nome} concluída"

async def main():
    # Criando tarefas explicitamente
    t1 = asyncio.create_task(tarefa("Tarefa A", 2))
    t2 = asyncio.create_task(tarefa("Tarefa B", 1))

    # gather executa todas e retorna resultados
    resultados = await asyncio.gather(t1, t2)
    print(resultados)

    # Cancelamento de tarefa
    t3 = asyncio.create_task(tarefa("Tarefa C", 3))
    await asyncio.sleep(0.5)
    t3.cancel()
    try:
        await t3
    except asyncio.CancelledError:
        print("Tarefa C foi cancelada")

asyncio.run(main())

O asyncio.shield() protege uma tarefa de cancelamento externo, útil para operações críticas:

async def operacao_critica():
    await asyncio.sleep(5)
    return "Dado importante"

async def main():
    tarefa = asyncio.create_task(operacao_critica())
    # Mesmo que tarefa seja cancelada, shield protege
    protegida = asyncio.shield(tarefa)
    await asyncio.sleep(1)
    tarefa.cancel()
    resultado = await protegida  # Ainda obtém o resultado
    print(resultado)

3. Operações de I/O Assíncronas: Awaitables e Futures

Operações de I/O tradicionalmente bloqueantes podem ser substituídas por versões assíncronas. O asyncio.sleep() é o exemplo mais simples, mas também temos asyncio.open_connection() para redes:

import asyncio

async def consultar_google():
    reader, writer = await asyncio.open_connection(
        'google.com', 80
    )
    request = (
        "GET / HTTP/1.1\r\n"
        "Host: google.com\r\n"
        "Connection: close\r\n\r\n"
    ).encode()
    writer.write(request)
    await writer.drain()

    resposta = await reader.read(1024)
    print(resposta[:200].decode(errors='ignore'))
    writer.close()

asyncio.run(consultar_google())

asyncio.Future e asyncio.Task são awaitables que representam resultados futuros. Enquanto Task é uma subclasse de Future associada a uma corrotina, Future pode ser usado para resultados arbitrários:

async def usar_future():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Aguardando múltiplos awaitables com as_completed
    async def tarefa_rapida():
        await asyncio.sleep(0.5)
        return "Rápida"

    async def tarefa_lenta():
        await asyncio.sleep(2)
        return "Lenta"

    tarefas = [tarefa_rapida(), tarefa_lenta()]
    for coro in asyncio.as_completed(tarefas):
        resultado = await coro
        print(f"Resultado: {resultado}")

4. Sincronização entre Corrotinas

Para evitar condições de corrida ao acessar recursos compartilhados, o asyncio oferece primitivas de sincronização:

import asyncio

async def exemplo_lock():
    lock = asyncio.Lock()
    recurso_compartilhado = 0

    async def atualizar():
        nonlocal recurso_compartilhado
        async with lock:  # Garante acesso exclusivo
            valor_atual = recurso_compartilhado
            await asyncio.sleep(0.1)
            recurso_compartilhado = valor_atual + 1

    await asyncio.gather(atualizar(), atualizar(), atualizar())
    print(f"Valor final: {recurso_compartilhado}")  # 3

asyncio.run(exemplo_lock())

Semáforos limitam concorrência, e filas assíncronas permitem comunicação produtor-consumidor:

async def exemplo_fila():
    fila = asyncio.Queue(maxsize=3)

    async def produtor():
        for i in range(5):
            await fila.put(f"Item {i}")
            await asyncio.sleep(0.2)

    async def consumidor():
        while True:
            item = await fila.get()
            print(f"Processando {item}")
            fila.task_done()

    prod = asyncio.create_task(produtor())
    cons = asyncio.create_task(consumidor())
    await prod
    await fila.join()  # Aguarda até fila esvaziar
    cons.cancel()

5. I/O com Arquivos e Sockets Assíncronos

Para operações de arquivo, a biblioteca aiofiles fornece versões assíncronas:

import asyncio
import aiofiles

async def ler_arquivo_grande():
    async with aiofiles.open('dados.txt', 'r') as f:
        async for linha in f:
            print(linha.strip())

asyncio.run(ler_arquivo_grande())

Para sockets, podemos criar servidores TCP:

import asyncio

async def handle_client(reader, writer):
    data = await reader.read(100)
    mensagem = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"Recebido de {addr}: {mensagem}")

    writer.write(f"Eco: {mensagem}".encode())
    await writer.drain()
    writer.close()

async def servidor_echo():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888
    )
    async with server:
        await server.serve_forever()

# asyncio.run(servidor_echo())  # Descomentar para executar

6. Integração com Bibliotecas e Código Síncrono

Para executar código bloqueante sem travar o event loop, use run_in_executor:

import asyncio
import time

def funcao_bloqueante():
    time.sleep(2)  # Simula operação pesada
    return "Pronto!"

async def main():
    loop = asyncio.get_running_loop()
    resultado = await loop.run_in_executor(None, funcao_bloqueante)
    print(resultado)

asyncio.run(main())

Bibliotecas assíncronas populares incluem aiohttp para HTTP, asyncpg para PostgreSQL e aiomysql para MySQL:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, 'https://example.com')
        print(f"Tamanho do HTML: {len(html)} bytes")

asyncio.run(main())

7. Boas Práticas e Padrões de Projeto

Sempre use await em operações de I/O para evitar bloqueios acidentais. Gerencie contexto com async with:

import asyncio

class RecursoAssincrono:
    async def __aenter__(self):
        print("Abrindo recurso...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        print("Fechando recurso...")
        await asyncio.sleep(0.1)

async def main():
    async with RecursoAssincrono() as recurso:
        print("Usando recurso")

asyncio.run(main())

Para timeouts, use asyncio.timeout() (Python 3.11+):

async def operacao_lenta():
    await asyncio.sleep(10)
    return "Resultado"

async def main():
    try:
        async with asyncio.timeout(2):
            resultado = await operacao_lenta()
    except TimeoutError:
        print("Operação excedeu o tempo limite!")

asyncio.run(main())

8. Comparação com Alternativas e Limitações

Característica asyncio threading multiprocessing
Concorrência real Não (cooperativa) Sim (preemptiva) Sim (paralela)
Overhead Baixo Médio Alto
Compartilhamento Fácil (mesmo thread) Cuidadoso (locks) Difícil (IPC)
CPU-bound Não recomendado Não (GIL) Recomendado

O asyncio é ideal para I/O-bound tasks (requisições HTTP, bancos de dados, leitura de arquivos), mas não para tarefas pesadas de CPU. Alternativas como curio e trio oferecem modelos de concorrência estruturada, enquanto uvloop substitui o event loop padrão por uma implementação mais rápida em C:

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# A partir daqui, asyncio.run() usará uvloop automaticamente

Referências