Como usar o Argo Workflows para pipelines de dados e ML no Kubernetes

1. Introdução ao Argo Workflows e seu papel em pipelines de dados e ML

O Argo Workflows é um motor de workflow nativo do Kubernetes que permite orquestrar tarefas complexas como sequências de contêineres. Diferente de ferramentas tradicionais, ele opera diretamente sobre o cluster Kubernetes, aproveitando sua escalabilidade e resiliência. Para pipelines de dados e Machine Learning, o Argo oferece paralelismo nativo, suporte a DAGs (Directed Acyclic Graphs) e integração profunda com o ecossistema K8s.

Comparado ao Apache Airflow, o Argo é mais leve e não requer banco de dados externo. Em relação ao Kubeflow, o Argo foca exclusivamente na orquestração, sendo mais flexível para integrar com ferramentas de MLOps como MLflow ou Seldon. O Prefect, por sua vez, tem sintaxe Python-first, mas o Argo se destaca quando o ambiente já é Kubernetes-native.

2. Instalação e configuração básica do Argo Workflows no cluster

A instalação pode ser feita via Helm ou manifestos YAML. Via Helm, execute:

helm repo add argo https://argoproj.github.io/argo-helm
helm install argo-workflows argo/argo-workflows --namespace argo --create-namespace

Para acesso via CLI, instale o binário argo:

curl -sLO https://github.com/argoproj/argo-workflows/releases/latest/download/argo-linux-amd64.gz
gunzip argo-linux-amd64.gz
chmod +x argo-linux-amd64 && sudo mv argo-linux-amd64 /usr/local/bin/argo

Configure RBAC criando um ServiceAccount com permissões adequadas:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo-workflows-sa
  namespace: argo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-workflows-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
  - kind: ServiceAccount
    name: argo-workflows-sa
    namespace: argo

Acesse a interface web via port-forward:

kubectl port-forward svc/argo-workflows-server -n argo 2746:2746

3. Estrutura fundamental de um Workflow: templates, steps e DAGs

Um Workflow no Argo é composto por Workflow, Template, Steps e DAG. Templates definem tarefas reutilizáveis. Steps executam sequencialmente, enquanto DAGs permitem paralelismo.

Exemplo de pipeline ETL simples:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: etl-pipeline-
spec:
  entrypoint: etl
  templates:
  - name: extract
    container:
      image: python:3.9
      command: [python, -c]
      args: ["print('Extraindo dados da fonte...')"]
  - name: transform
    container:
      image: python:3.9
      command: [python, -c]
      args: ["print('Transformando dados...')"]
  - name: load
    container:
      image: python:3.9
      command: [python, -c]
      args: ["print('Carregando dados no destino...')"]
  - name: etl
    steps:
    - - name: extract-step
        template: extract
    - - name: transform-step
        template: transform
    - - name: load-step
        template: load

4. Execução de tarefas de dados com containers customizados e volumes

Para tarefas de ML, use imagens Docker customizadas com bibliotecas como TensorFlow ou PyTorch. Monte volumes persistentes (PVCs) para datasets e modelos.

Exemplo com PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-training-
spec:
  entrypoint: train-model
  volumes:
  - name: data-volume
    persistentVolumeClaim:
      claimName: data-pvc
  templates:
  - name: train-model
    container:
      image: tensorflow/tensorflow:2.12.0
      command: [python, /scripts/train.py]
      volumeMounts:
      - name: data-volume
        mountPath: /data

Para gerenciar artefatos, use volumes compartilhados ou integração com S3/GCS via sidecar containers.

5. Pipelines de Machine Learning: treinamento, validação e deploy

Um pipeline de ML típico inclui pré-processamento, treinamento, avaliação e registro do modelo. Use DAGs para paralelizar buscas de hiperparâmetros.

Exemplo de Grid Search paralelo:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hyperparameter-search-
spec:
  entrypoint: grid-search
  templates:
  - name: train-with-params
    inputs:
      parameters:
      - name: learning-rate
      - name: batch-size
    container:
      image: python:3.9
      command: [python, -c]
      args: ["print('Treinando com lr={{inputs.parameters.learning-rate}} e batch={{inputs.parameters.batch-size}}')"]
  - name: grid-search
    dag:
      tasks:
      - name: train-lr001-bs32
        template: train-with-params
        arguments:
          parameters:
          - name: learning-rate
            value: "0.001"
          - name: batch-size
            value: "32"
      - name: train-lr01-bs64
        template: train-with-params
        arguments:
          parameters:
          - name: learning-rate
            value: "0.01"
          - name: batch-size
            value: "64"

Para deploy, integre com MLflow para registro de modelos e Kserve para servir o modelo como API.

6. Otimização e boas práticas para pipelines de dados e ML

Controle recursos com limites de CPU/memória e solicitação de GPUs:

resources:
  requests:
    memory: "4Gi"
    cpu: "2"
    nvidia.com/gpu: "1"
  limits:
    memory: "8Gi"
    cpu: "4"

Configure retry e timeout para tarefas críticas:

retryStrategy:
  limit: 3
  backoff:
    duration: "10s"
    factor: 2
activeDeadlineSeconds: 3600

Use WorkflowTemplate para reutilizar pipelines e CronWorkflow para agendamento recorrente:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: daily-training
spec:
  schedule: "0 2 * * *"
  workflowSpec:
    entrypoint: ml-pipeline
    templates:
    - name: ml-pipeline
      steps:
      - - name: preprocess
          template: preprocess
      - - name: train
          template: train

7. Monitoramento, logging e depuração de workflows

No Argo UI, visualize o DAG completo e logs de cada step. Para métricas avançadas, exporte para Prometheus:

helm upgrade argo-workflows argo/argo-workflows \
  --set server.metrics.enabled=true \
  --set server.metrics.prometheus.enabled=true

Crie dashboards no Grafana para monitorar execuções, tempos de step e uso de recursos. Para debug, execute workflows localmente com argo submit --watch e simule falhas com failFast: false.

8. Caso prático completo: pipeline de classificação de imagens

Pipeline completo para classificação de imagens com TensorFlow:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: image-classification-
spec:
  entrypoint: full-pipeline
  volumes:
  - name: data
    persistentVolumeClaim:
      claimName: ml-data-pvc
  - name: models
    persistentVolumeClaim:
      claimName: ml-models-pvc
  templates:
  - name: download-dataset
    container:
      image: python:3.9
      command: [python, -c]
      args: ["import urllib.request; urllib.request.urlretrieve('https://example.com/dataset.zip', '/data/dataset.zip')"]
      volumeMounts:
      - name: data
        mountPath: /data
  - name: preprocess
    container:
      image: tensorflow/tensorflow:2.12.0
      command: [python, /scripts/preprocess.py]
      volumeMounts:
      - name: data
        mountPath: /data
  - name: train-model
    container:
      image: tensorflow/tensorflow:2.12.0-gpu
      command: [python, /scripts/train.py]
      resources:
        requests:
          nvidia.com/gpu: "1"
      volumeMounts:
      - name: data
        mountPath: /data
      - name: models
        mountPath: /models
  - name: evaluate-model
    container:
      image: tensorflow/tensorflow:2.12.0
      command: [python, /scripts/evaluate.py]
      volumeMounts:
      - name: data
        mountPath: /data
      - name: models
        mountPath: /models
  - name: register-model
    container:
      image: python:3.9
      command: [python, -c]
      args: ["print('Modelo registrado no MLflow')"]
  - name: deploy-model
    container:
      image: python:3.9
      command: [python, -c]
      args: ["print('Modelo deployado como serviço Kserve')"]
  - name: full-pipeline
    dag:
      tasks:
      - name: download
        template: download-dataset
      - name: preprocess
        template: preprocess
        dependencies: [download]
      - name: train
        template: train-model
        dependencies: [preprocess]
      - name: evaluate
        template: evaluate-model
        dependencies: [train]
      - name: register
        template: register-model
        dependencies: [evaluate]
      - name: deploy
        template: deploy-model
        dependencies: [register]

Este workflow executa download, pré-processamento, treinamento com GPU, avaliação, registro e deploy do modelo como serviço no cluster.

Referências