Corrotinas e tasks no asyncio

1. Introdução ao asyncio e ao Modelo de Corrotinas

O módulo asyncio (introduzido no Python 3.4 e amadurecido em versões posteriores) é a biblioteca padrão para escrever código concorrente usando a sintaxe async/await. No coração desse modelo estão as corrotinas — funções assíncronas definidas com async def que podem suspender sua execução sem bloquear a thread principal.

Diferentemente de funções síncronas tradicionais, uma corrotina não executa imediatamente quando chamada. Em vez disso, ela retorna um objeto corrotina que precisa ser agendado em um event loop. O event loop é o núcleo do asyncio: ele gerencia a fila de tarefas, coordena a execução concorrente e lida com operações de I/O de forma não bloqueante.

# Função síncrona tradicional
def saudacao_sincrona():
    print("Olá, mundo!")

# Corrotina assíncrona
async def saudacao_assincrona():
    print("Olá, mundo assíncrono!")

2. Criando e Executando Corrotinas

Para definir uma corrotina, usamos async def. Dentro dela, utilizamos await para suspender a execução até que outra corrotina ou operação assíncrona seja concluída.

async def buscar_dados():
    print("Buscando dados...")
    await asyncio.sleep(2)  # Simula operação de I/O
    print("Dados obtidos!")
    return {"status": "ok"}

Para executar uma corrotina, usamos asyncio.run(), que cria um novo event loop, executa a corrotina e fecha o loop ao finalizar.

import asyncio

async def main():
    print("Iniciando...")
    await asyncio.sleep(1)
    print("Finalizado!")

asyncio.run(main())

Erro comum: esquecer de await uma corrotina. Isso retorna um aviso do tipo RuntimeWarning: coroutine '...' was never awaited.

async def tarefa():
    return 42

# ERRADO: a corrotina nunca é executada
resultado = tarefa()  # Apenas cria o objeto corrotina

# CORRETO
resultado = await tarefa()

3. Tasks: Agendamento e Concorrência

Uma task é um wrapper que agenda uma corrotina para execução concorrente no event loop. Enquanto uma corrotina precisa ser aguardada imediatamente para executar, uma task pode ser criada e executada em paralelo com outras operações.

async def processar_item(item):
    print(f"Processando item {item}...")
    await asyncio.sleep(1)
    print(f"Item {item} processado!")
    return item * 2

async def main():
    # Criando tasks para execução concorrente
    task1 = asyncio.create_task(processar_item(1))
    task2 = asyncio.create_task(processar_item(2))
    task3 = asyncio.create_task(processar_item(3))

    # Aguarda todas as tasks
    resultados = await asyncio.gather(task1, task2, task3)
    print(f"Resultados: {resultados}")

asyncio.run(main())

A diferença fundamental: uma corrotina chamada diretamente executa sequencialmente, enquanto tasks permitem concorrência real.

4. Aguardando Tasks: await, gather e as_completed

Temos três formas principais de aguardar tasks:

await individual — aguarda uma única task:

task = asyncio.create_task(minha_corrotina())
resultado = await task

asyncio.gather() — aguarda múltiplas tasks simultaneamente e retorna os resultados na mesma ordem:

async def main():
    tasks = [asyncio.create_task(operacao(i)) for i in range(5)]
    resultados = await asyncio.gather(*tasks)
    print(resultados)

asyncio.as_completed() — processa tasks conforme elas terminam, útil para resultados parciais:

async def main():
    tasks = [asyncio.create_task(operacao(i)) for i in range(5)]
    for coro in asyncio.as_completed(tasks):
        resultado = await coro
        print(f"Task concluída: {resultado}")

5. Cancelamento e Timeout de Tasks

Cancelar tasks é essencial para controle de execução. Usamos task.cancel() e tratamos asyncio.CancelledError:

async def tarefa_longa():
    try:
        print("Iniciando tarefa longa...")
        await asyncio.sleep(10)
        print("Tarefa concluída!")
    except asyncio.CancelledError:
        print("Tarefa foi cancelada!")
        raise

async def main():
    task = asyncio.create_task(tarefa_longa())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Cancelamento tratado no main")

asyncio.run(main())

Para timeouts, usamos asyncio.wait_for() ou asyncio.timeout() (Python 3.11+):

async def operacao_demorada():
    await asyncio.sleep(5)
    return "Feito!"

async def main():
    try:
        resultado = await asyncio.wait_for(operacao_demorada(), timeout=3)
        print(resultado)
    except asyncio.TimeoutError:
        print("Operação excedeu o tempo limite!")

6. Gerenciamento de Estado e Dependências entre Tasks

Compartilhar dados entre tasks requer cuidado. Use estruturas thread-safe ou mecanismos de sincronização:

asyncio.Lock — evita condições de corrida:

async def atualizar_recurso(lock, recurso):
    async with lock:
        recurso["valor"] += 1
        await asyncio.sleep(0.1)

async def main():
    lock = asyncio.Lock()
    recurso = {"valor": 0}
    tasks = [asyncio.create_task(atualizar_recurso(lock, recurso)) for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Valor final: {recurso['valor']}")

asyncio.Event — sincroniza tasks baseado em eventos:

async def produtor(evento):
    await asyncio.sleep(2)
    print("Produziu dado!")
    evento.set()

async def consumidor(evento):
    print("Aguardando dado...")
    await evento.wait()
    print("Dado recebido! Processando...")

Encadeamento com callbackstask.add_done_callback:

def callback(task):
    print(f"Task concluída com resultado: {task.result()}")

async def main():
    task = asyncio.create_task(calcular())
    task.add_done_callback(callback)
    await task

7. Boas Práticas e Erros Comuns

Evite bloqueios síncronos: nunca use time.sleep() dentro de corrotinas — use await asyncio.sleep().

# ERRADO: bloqueia o event loop
async def errado():
    import time
    time.sleep(1)

# CORRETO: cede controle ao event loop
async def certo():
    await asyncio.sleep(1)

Use asyncio.sleep(0) para ceder controle intencionalmente:

async def processar_lote(dados):
    for item in dados:
        await processar(item)
        await asyncio.sleep(0)  # Permite que outras tasks executem

Debugging de tasks: para rastrear tasks pendentes, use asyncio.all_tasks():

async def main():
    task = asyncio.create_task(tarefa())
    await asyncio.sleep(0.1)
    pendentes = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Tasks pendentes: {len(pendentes)}")

Cuidado com loops infinitos: sempre inclua pontos de await em loops para evitar starving de outras tasks:

async def loop_seguro():
    while True:
        await realizar_operacao()
        await asyncio.sleep(0)  # Cede controle

Referências