Como construir pipelines de dados com Apache Airflow
1. Fundamentos do Apache Airflow e conceitos essenciais
Apache Airflow é uma plataforma de orquestração de workflows criada pela Airbnb em 2014 e posteriormente doada à Apache Software Foundation. Sua principal função é programar, monitorar e gerenciar pipelines de dados complexos como DAGs (Directed Acyclic Graphs). Diferente de ferramentas como Luigi (mais simples, sem scheduler nativo robusto), Prefect (foco em estado e observabilidade) e Dagster (ênfase em tipagem e asset management), o Airflow se destaca pela maturidade, ecossistema extenso de integrações e interface web rica.
Os componentes centrais do Airflow incluem:
- DAGs: grafos acíclicos que definem a ordem de execução das tarefas
- Tasks: unidades individuais de trabalho dentro de uma DAG
- Operators: templates para tarefas comuns (PythonOperator, BashOperator, etc.)
- Scheduler: processo que monitora e dispara DAGs conforme agendamento
- Executor: mecanismo que define como as tarefas são executadas (Local, Celery, Kubernetes)
2. Configuração do ambiente e instalação
Para instalar o Airflow, utilize pip em um ambiente virtual:
pip install apache-airflow
A estrutura de diretórios padrão inclui:
- dags/: onde seus arquivos de DAG são armazenados
- plugins/: para custom operators e hooks
- logs/: logs de execução de tarefas
- airflow.cfg: arquivo de configuração principal
Configurações essenciais no airflow.cfg:
[core]
dags_folder = /home/user/airflow/dags
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost/airflow
load_examples = False
[scheduler]
dag_dir_list_interval = 30
Para iniciar, configure o banco de dados de metadados. SQLite é usado para desenvolvimento, mas para produção recomenda-se PostgreSQL ou MySQL:
airflow db init
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com
airflow webserver --port 8080 &
airflow scheduler &
3. Criação da primeira DAG
Uma DAG básica possui default_args, schedule_interval e catchup. Exemplo de pipeline ETL simples:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'etl_pipeline',
default_args=default_args,
description='Pipeline ETL simples',
schedule_interval='0 6 * * *', # Executa diariamente às 6h
catchup=False,
)
def extract():
import json
# Simula extração de dados de uma API
data = {'users': [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]}
with open('/tmp/extracted_data.json', 'w') as f:
json.dump(data, f)
return 'Extração concluída'
def transform():
import json
with open('/tmp/extracted_data.json', 'r') as f:
data = json.load(f)
# Transformação: adiciona campo uppercase
for user in data['users']:
user['name_upper'] = user['name'].upper()
with open('/tmp/transformed_data.json', 'w') as f:
json.dump(data, f)
return 'Transformação concluída'
extract_task = PythonOperator(
task_id='extract',
python_callable=extract,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform,
dag=dag,
)
load_task = BashOperator(
task_id='load',
bash_command='cp /tmp/transformed_data.json /data/final/',
dag=dag,
)
extract_task >> transform_task >> load_task
4. Gerenciamento de dependências e execução de tarefas
As dependências entre tasks podem ser definidas usando operadores de deslocamento:
task1 >> task2 >> task3 # Sequencial
task1 >> [task2, task3] # Paralelo
task1.set_downstream(task2) # Alternativa
Para fluxos condicionais, use BranchPythonOperator:
from airflow.operators.python import BranchPythonOperator
def decide_branch(**kwargs):
execution_date = kwargs['execution_date']
if execution_date.day % 2 == 0:
return 'process_even'
else:
return 'process_odd'
branch_task = BranchPythonOperator(
task_id='branch_decision',
python_callable=decide_branch,
provide_context=True,
dag=dag,
)
Trigger rules controlam quando uma task deve ser executada:
task_final = DummyOperator(
task_id='final',
trigger_rule='all_done', # Executa independente de sucesso/falha
dag=dag,
)
5. Operadores avançados e integrações
Operadores para bancos de dados e cloud são cruciais em pipelines reais:
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id='postgres_default',
sql='''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT NOW()
);
''',
dag=dag,
)
wait_for_file = S3KeySensor(
task_id='wait_for_s3_file',
bucket_key='data/input.csv',
bucket_name='my-data-bucket',
aws_conn_id='aws_default',
timeout=600,
poke_interval=30,
dag=dag,
)
load_to_bigquery = BigQueryInsertJobOperator(
task_id='load_to_bq',
configuration={
'load': {
'sourceUris': ['gs://bucket/data.csv'],
'destinationTable': {'projectId': 'my-project', 'datasetId': 'analytics', 'tableId': 'raw_data'},
}
},
dag=dag,
)
6. Monitoramento, logging e boas práticas
O Airflow UI oferece múltiplas visualizações:
- Grid View: visão geral das execuções ao longo do tempo
- Graph View: visualização do grafo de dependências
- Logs: logs detalhados de cada task (acessíveis clicando na task)
Configuração de retry e notificações:
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': True,
'email': ['data-team@company.com'],
}
Para notificações via Slack, instale o provider e configure:
pip install apache-airflow-providers-slack
Boas práticas incluem:
- Versionamento de DAGs em Git
- Uso de variáveis de ambiente para configurações sensíveis
- Organização de código reutilizável em plugins/
7. Escalabilidade e execução em produção
A escolha do executor impacta diretamente a escalabilidade:
- LocalExecutor: executa tasks em paralelo no mesmo host (adequado para ambientes pequenos)
- CeleryExecutor: distribui tasks para workers via fila de mensagens (RabbitMQ/Redis) — ideal para médio porte
- KubernetesExecutor: cada task é executada em um pod Kubernetes separado — máxima escalabilidade e isolamento
Configuração de pools para controle de concorrência:
# No UI: Admin > Pools
# Nome: heavy_tasks, Slots: 2
with DAG(...) as dag:
heavy_task = PythonOperator(
task_id='heavy_task',
pool='heavy_tasks',
...
)
Manutenção essencial em produção:
- Backup regular do banco de metadados
- Limpeza de logs antigos: airflow db clean --clean-before-timestamp 2024-01-01
- Monitoramento do scheduler com health checks
8. Padrões de design e resolução de problemas comuns
Pipeline incremental com sensores:
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.utils.dates import days_ago
def get_last_run(**context):
last_run = context['dag_run'].conf.get('last_run', '2024-01-01')
return last_run
start_sensor = TimeDeltaSensor(
task_id='wait_for_data',
delta=timedelta(hours=1),
dag=dag,
)
Gerenciamento de dados sensíveis usando conexões criptografadas:
# No UI: Admin > Connections
# Conn ID: postgres_prod
# Conn Type: Postgres
# Host: prod-db.company.com
# Login: readonly_user
# Password: [criptografado automaticamente]
Testes unitários para DAGs:
# test_dag.py
from airflow.models import DagBag
import unittest
class TestETLDAG(unittest.TestCase):
def setUp(self):
self.dagbag = DagBag(dag_folder='dags/')
def test_dag_loaded(self):
dag = self.dagbag.get_dag('etl_pipeline')
self.assertIsNotNone(dag)
self.assertEqual(len(dag.tasks), 3)
def test_dag_structure(self):
dag = self.dagbag.get_dag('etl_pipeline')
tasks = dag.tasks
# Verifica dependências
self.assertIn(dag.get_task('extract'), dag.get_task('transform').upstream_list)
Referências
- Documentação Oficial do Apache Airflow — Guia completo de instalação, configuração e todos os operadores disponíveis
- Airflow: A Complete Guide for Beginners — Tutorial passo a passo da DataCamp cobrindo conceitos fundamentais e exemplos práticos
- Best Practices for Apache Airflow — Documentação oficial com recomendações de design, testes e produção
- Airflow DAGs: Patterns and Anti-patterns — Artigo no Medium sobre padrões comuns e armadilhas na construção de DAGs
- Deploying Airflow at Scale with Kubernetes — Guia oficial para deploy do Airflow em clusters Kubernetes usando Helm chart