Como construir pipelines de dados com Apache Airflow

1. Fundamentos do Apache Airflow e conceitos essenciais

Apache Airflow é uma plataforma de orquestração de workflows criada pela Airbnb em 2014 e posteriormente doada à Apache Software Foundation. Sua principal função é programar, monitorar e gerenciar pipelines de dados complexos como DAGs (Directed Acyclic Graphs). Diferente de ferramentas como Luigi (mais simples, sem scheduler nativo robusto), Prefect (foco em estado e observabilidade) e Dagster (ênfase em tipagem e asset management), o Airflow se destaca pela maturidade, ecossistema extenso de integrações e interface web rica.

Os componentes centrais do Airflow incluem:
- DAGs: grafos acíclicos que definem a ordem de execução das tarefas
- Tasks: unidades individuais de trabalho dentro de uma DAG
- Operators: templates para tarefas comuns (PythonOperator, BashOperator, etc.)
- Scheduler: processo que monitora e dispara DAGs conforme agendamento
- Executor: mecanismo que define como as tarefas são executadas (Local, Celery, Kubernetes)

2. Configuração do ambiente e instalação

Para instalar o Airflow, utilize pip em um ambiente virtual:

pip install apache-airflow

A estrutura de diretórios padrão inclui:
- dags/: onde seus arquivos de DAG são armazenados
- plugins/: para custom operators e hooks
- logs/: logs de execução de tarefas
- airflow.cfg: arquivo de configuração principal

Configurações essenciais no airflow.cfg:

[core]
dags_folder = /home/user/airflow/dags
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost/airflow
load_examples = False

[scheduler]
dag_dir_list_interval = 30

Para iniciar, configure o banco de dados de metadados. SQLite é usado para desenvolvimento, mas para produção recomenda-se PostgreSQL ou MySQL:

airflow db init
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com
airflow webserver --port 8080 &
airflow scheduler &

3. Criação da primeira DAG

Uma DAG básica possui default_args, schedule_interval e catchup. Exemplo de pipeline ETL simples:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='Pipeline ETL simples',
    schedule_interval='0 6 * * *',  # Executa diariamente às 6h
    catchup=False,
)

def extract():
    import json
    # Simula extração de dados de uma API
    data = {'users': [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]}
    with open('/tmp/extracted_data.json', 'w') as f:
        json.dump(data, f)
    return 'Extração concluída'

def transform():
    import json
    with open('/tmp/extracted_data.json', 'r') as f:
        data = json.load(f)
    # Transformação: adiciona campo uppercase
    for user in data['users']:
        user['name_upper'] = user['name'].upper()
    with open('/tmp/transformed_data.json', 'w') as f:
        json.dump(data, f)
    return 'Transformação concluída'

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

load_task = BashOperator(
    task_id='load',
    bash_command='cp /tmp/transformed_data.json /data/final/',
    dag=dag,
)

extract_task >> transform_task >> load_task

4. Gerenciamento de dependências e execução de tarefas

As dependências entre tasks podem ser definidas usando operadores de deslocamento:

task1 >> task2 >> task3  # Sequencial
task1 >> [task2, task3]  # Paralelo
task1.set_downstream(task2)  # Alternativa

Para fluxos condicionais, use BranchPythonOperator:

from airflow.operators.python import BranchPythonOperator

def decide_branch(**kwargs):
    execution_date = kwargs['execution_date']
    if execution_date.day % 2 == 0:
        return 'process_even'
    else:
        return 'process_odd'

branch_task = BranchPythonOperator(
    task_id='branch_decision',
    python_callable=decide_branch,
    provide_context=True,
    dag=dag,
)

Trigger rules controlam quando uma task deve ser executada:

task_final = DummyOperator(
    task_id='final',
    trigger_rule='all_done',  # Executa independente de sucesso/falha
    dag=dag,
)

5. Operadores avançados e integrações

Operadores para bancos de dados e cloud são cruciais em pipelines reais:

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres_default',
    sql='''
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            created_at TIMESTAMP DEFAULT NOW()
        );
    ''',
    dag=dag,
)

wait_for_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_key='data/input.csv',
    bucket_name='my-data-bucket',
    aws_conn_id='aws_default',
    timeout=600,
    poke_interval=30,
    dag=dag,
)

load_to_bigquery = BigQueryInsertJobOperator(
    task_id='load_to_bq',
    configuration={
        'load': {
            'sourceUris': ['gs://bucket/data.csv'],
            'destinationTable': {'projectId': 'my-project', 'datasetId': 'analytics', 'tableId': 'raw_data'},
        }
    },
    dag=dag,
)

6. Monitoramento, logging e boas práticas

O Airflow UI oferece múltiplas visualizações:
- Grid View: visão geral das execuções ao longo do tempo
- Graph View: visualização do grafo de dependências
- Logs: logs detalhados de cada task (acessíveis clicando na task)

Configuração de retry e notificações:

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
}

Para notificações via Slack, instale o provider e configure:

pip install apache-airflow-providers-slack

Boas práticas incluem:
- Versionamento de DAGs em Git
- Uso de variáveis de ambiente para configurações sensíveis
- Organização de código reutilizável em plugins/

7. Escalabilidade e execução em produção

A escolha do executor impacta diretamente a escalabilidade:

  • LocalExecutor: executa tasks em paralelo no mesmo host (adequado para ambientes pequenos)
  • CeleryExecutor: distribui tasks para workers via fila de mensagens (RabbitMQ/Redis) — ideal para médio porte
  • KubernetesExecutor: cada task é executada em um pod Kubernetes separado — máxima escalabilidade e isolamento

Configuração de pools para controle de concorrência:

# No UI: Admin > Pools
# Nome: heavy_tasks, Slots: 2

with DAG(...) as dag:
    heavy_task = PythonOperator(
        task_id='heavy_task',
        pool='heavy_tasks',
        ...
    )

Manutenção essencial em produção:
- Backup regular do banco de metadados
- Limpeza de logs antigos: airflow db clean --clean-before-timestamp 2024-01-01
- Monitoramento do scheduler com health checks

8. Padrões de design e resolução de problemas comuns

Pipeline incremental com sensores:

from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.utils.dates import days_ago

def get_last_run(**context):
    last_run = context['dag_run'].conf.get('last_run', '2024-01-01')
    return last_run

start_sensor = TimeDeltaSensor(
    task_id='wait_for_data',
    delta=timedelta(hours=1),
    dag=dag,
)

Gerenciamento de dados sensíveis usando conexões criptografadas:

# No UI: Admin > Connections
# Conn ID: postgres_prod
# Conn Type: Postgres
# Host: prod-db.company.com
# Login: readonly_user
# Password: [criptografado automaticamente]

Testes unitários para DAGs:

# test_dag.py
from airflow.models import DagBag
import unittest

class TestETLDAG(unittest.TestCase):
    def setUp(self):
        self.dagbag = DagBag(dag_folder='dags/')

    def test_dag_loaded(self):
        dag = self.dagbag.get_dag('etl_pipeline')
        self.assertIsNotNone(dag)
        self.assertEqual(len(dag.tasks), 3)

    def test_dag_structure(self):
        dag = self.dagbag.get_dag('etl_pipeline')
        tasks = dag.tasks
        # Verifica dependências
        self.assertIn(dag.get_task('extract'), dag.get_task('transform').upstream_list)

Referências