Programação assíncrona em Python com asyncio

1. Fundamentos do Modelo Assíncrono em Python

A programação assíncrona em Python representa uma mudança paradigmática na forma como lidamos com operações de I/O. Para compreendê-la, é essencial distinguir três conceitos frequentemente confundidos:

Concorrência é a capacidade de lidar com múltiplas tarefas alternando sua execução. Paralelismo envolve executar tarefas simultaneamente em múltiplos núcleos de CPU. Assincronismo é um modelo de concorrência onde uma única thread gerencia múltiplas operações, suspendendo e retomando tarefas conforme necessário.

O GIL (Global Interpreter Lock) do Python impede que múltiplas threads executem bytecode simultaneamente. O asyncio contorna essa limitação não através de paralelismo real, mas sim utilizando um event loop — um gerenciador central que coordena a execução de corrotinas. O event loop funciona como um maestro: ele mantém uma fila de tarefas, executa cada uma até encontrar um await, então suspende essa tarefa e passa para a próxima. Esse ciclo de vida inclui as fases de agendamento, execução, suspensão e retomada.

import asyncio

async def exemplo_loop():
    print("Início do event loop")
    await asyncio.sleep(1)
    print("Fim do event loop")

asyncio.run(exemplo_loop())

2. Corrotinas e Await: A Base da Programação Assíncrona

Corrotinas são funções definidas com async def que podem ser suspensas e retomadas. A palavra-chave await indica pontos onde a corrotina pode ceder o controle para o event loop.

import asyncio

async def saudacao(nome):
    await asyncio.sleep(1)
    return f"Olá, {nome}!"

async def main():
    # Executando uma corrotina diretamente
    resultado = await saudacao("Maria")
    print(resultado)

    # Criando tarefas para execução concorrente
    tarefa1 = asyncio.create_task(saudacao("João"))
    tarefa2 = asyncio.create_task(saudacao("Ana"))

    resultados = await asyncio.gather(tarefa1, tarefa2)
    print(resultados)

asyncio.run(main())

Boas práticas: Evite usar await em operações bloqueantes síncronas como time.sleep() ou leitura de arquivos com open(). Isso bloquearia todo o event loop. Prefira asyncio.sleep() e bibliotecas assíncronas.

3. Gerenciamento de Tarefas e Concorrência

O asyncio oferece ferramentas poderosas para gerenciar múltiplas tarefas concorrentes:

import asyncio

async def tarefa_demorada(id_tarefa, tempo):
    print(f"Tarefa {id_tarefa} iniciada")
    await asyncio.sleep(tempo)
    print(f"Tarefa {id_tarefa} concluída")
    return id_tarefa

async def main():
    # gather: executa todas e aguarda todas
    resultados = await asyncio.gather(
        tarefa_demorada(1, 2),
        tarefa_demorada(2, 1),
        tarefa_demorada(3, 3)
    )
    print(f"Resultados: {resultados}")

    # wait: controle mais granular com timeout
    tarefas = [
        asyncio.create_task(tarefa_demorada(4, 5)),
        asyncio.create_task(tarefa_demorada(5, 1))
    ]

    try:
        done, pending = await asyncio.wait_for(
            asyncio.wait(tarefas, timeout=2),
            timeout=3
        )
        print(f"Concluídas: {len(done)}, Pendentes: {len(pending)}")
    except asyncio.TimeoutError:
        print("Timeout atingido")

asyncio.run(main())

4. Sincronização e Compartilhamento de Estado

Em ambientes assíncronos, race conditions podem ocorrer mesmo em uma única thread. O asyncio fornece primitivas de sincronização:

import asyncio

async def exemplo_lock():
    recurso_compartilhado = 0
    lock = asyncio.Lock()

    async def incrementar():
        nonlocal recurso_compartilhado
        async with lock:
            valor_atual = recurso_compartilhado
            await asyncio.sleep(0.1)  # Simula operação de I/O
            recurso_compartilhado = valor_atual + 1

    tarefas = [incrementar() for _ in range(10)]
    await asyncio.gather(*tarefas)
    print(f"Valor final: {recurso_compartilhado}")  # Sempre 10

asyncio.run(exemplo_lock())

Filas assíncronas são essenciais para comunicação entre produtores e consumidores:

async def produtor_consumidor():
    fila = asyncio.Queue()

    async def produtor():
        for i in range(5):
            await fila.put(f"Item {i}")
            await asyncio.sleep(0.5)

    async def consumidor():
        while True:
            item = await fila.get()
            print(f"Processado: {item}")
            fila.task_done()

    produtor_task = asyncio.create_task(produtor())
    consumidor_task = asyncio.create_task(consumidor())

    await produtor_task
    await fila.join()  # Aguarda processamento de todos os itens
    consumidor_task.cancel()

5. Operações de I/O Assíncronas na Prática

Para I/O de arquivos, utilizamos aiofiles:

import asyncio
import aiofiles

async def ler_arquivo_grande():
    async with aiofiles.open('dados.txt', 'r') as arquivo:
        conteudo = await arquivo.read()
        return conteudo

async def escrever_arquivo():
    async with aiofiles.open('saida.txt', 'w') as arquivo:
        await arquivo.write("Dados assíncronos\n")

Para requisições HTTP, aiohttp e httpx são as principais opções:

import aiohttp
import asyncio

async def buscar_dados():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.exemplo.com/dados') as response:
            dados = await response.json()
            return dados

async def multiplas_requisicoes():
    urls = [
        'https://api.exemplo.com/recurso1',
        'https://api.exemplo.com/recurso2',
        'https://api.exemplo.com/recurso3'
    ]

    async with aiohttp.ClientSession() as session:
        tarefas = [session.get(url) for url in urls]
        respostas = await asyncio.gather(*tarefas)

        resultados = []
        for resp in respostas:
            dados = await resp.json()
            resultados.append(dados)

        return resultados

Para bancos de dados, drivers como asyncpg (PostgreSQL) e aiomysql oferecem operações assíncronas nativas.

6. Debugging, Profiling e Otimização de Código Assíncrono

O modo debug do asyncio é ativado com:

import asyncio
import os

async def main():
    # Ativar debug
    loop = asyncio.get_event_loop()
    loop.set_debug(True)

    # Exemplo com logging detalhado
    os.environ['PYTHONASYNCIODEBUG'] = '1'

    tarefa = asyncio.create_task(tarefa_demorada(1, 2))
    await tarefa

asyncio.run(main(), debug=True)

Para profiling, asyncio.Task permite rastrear pendências:

async def monitorar_tarefas():
    tarefas = []
    for i in range(5):
        tarefa = asyncio.create_task(tarefa_demorada(i, i))
        tarefas.append(tarefa)

    # Verificar tarefas pendentes
    pendentes = [t for t in tarefas if not t.done()]
    print(f"Tarefas pendentes: {len(pendentes)}")

    # as_completed para processar resultados à medida que chegam
    for coro in asyncio.as_completed(tarefas):
        resultado = await coro
        print(f"Tarefa concluída: {resultado}")

Padrões de otimização: Evite await desnecessários dentro de loops — prefira criar todas as tarefas primeiro e depois aguardá-las com gather() ou as_completed().

7. Padrões Avançados e Casos de Uso Reais

Produtor-consumidor com fila:

async def sistema_processamento():
    fila = asyncio.Queue(maxsize=10)

    async def produtor():
        for i in range(20):
            await fila.put(f"Dado {i}")
            print(f"Produzido: Dado {i}")
            await asyncio.sleep(0.1)

    async def consumidor(id_consumidor):
        while True:
            dado = await fila.get()
            print(f"Consumidor {id_consumidor} processando: {dado}")
            await asyncio.sleep(0.3)  # Simula processamento
            fila.task_done()

    produtor_task = asyncio.create_task(produtor())
    consumidores = [asyncio.create_task(consumidor(i)) for i in range(3)]

    await produtor_task
    await fila.join()  # Aguarda processamento de todos os itens

    for c in consumidores:
        c.cancel()

Timeouts e retentativas com backoff exponencial:

import random

async def requisicao_com_retry(url, max_tentativas=3):
    for tentativa in range(max_tentativas):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=5) as response:
                    return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if tentativa == max_tentativas - 1:
                raise
            espera = 2 ** tentativa + random.uniform(0, 1)
            print(f"Tentativa {tentativa + 1} falhou. Aguardando {espera:.2f}s")
            await asyncio.sleep(espera)

Integração com código síncrono usando run_in_executor:

import concurrent.futures

async def operacao_cpu_bound():
    def processamento_pesado():
        # Operação CPU-bound que bloquearia o event loop
        total = 0
        for i in range(10**7):
            total += i
        return total

    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        resultado = await loop.run_in_executor(pool, processamento_pesado)
        return resultado

A programação assíncrona com asyncio transforma a forma como construímos aplicações Python que lidam com I/O intensivo. Ao dominar corrotinas, tarefas e primitivas de sincronização, você pode criar sistemas altamente eficientes e responsivos. Lembre-se: asyncio não é sobre paralelismo real, mas sobre gerenciamento inteligente de espera — enquanto uma operação aguarda I/O, outras podem progredir.

Referências