Batch processing no K8s: Kubernetes Batch e KubeFlow
1. Fundamentos do Batch Processing no Kubernetes
1.1 O que são workloads batch e por que rodá-los no K8s
Workloads batch são tarefas computacionais que executam de início ao fim, processam um volume finito de dados e, ao concluir, encerram-se automaticamente. Diferentemente de serviços web que permanecem ativos 24/7, jobs batch consomem recursos apenas durante sua execução.
Rodar batch processing no Kubernetes oferece vantagens estratégicas:
- Infraestrutura unificada: mesmo cluster para microsserviços e jobs batch
- Elasticidade nativa: escalonamento horizontal sob demanda
- Gerenciamento de falhas integrado: restart policies, backoff limits e TTLs
- Isolamento por contêineres: dependências encapsuladas, reprodutibilidade
1.2 Diferenças entre jobs interativos, serviços contínuos e jobs batch
| Tipo | Comportamento | Exemplo |
|---|---|---|
| Serviço contínuo (Deployment) | Mantém N pods sempre ativos | API REST, web server |
| Job interativo (Pod direto) | CLI ou debug temporário | kubectl run -it --rm |
| Job batch | Executa até completar, então termina | ETL diário, treinamento de ML |
1.3 Arquitetura do scheduler do Kubernetes para jobs: limitações e oportunidades
O scheduler padrão do K8s aloca pods individualmente. Para jobs batch, isso gera limitações:
- Scheduling de gang: jobs distribuídos (ex: MPI, PyTorch DDP) exigem que todos os pods iniciem simultaneamente
- Priorização: jobs batch podem competir com serviços críticos
- Escalonamento de recursos: jobs podem exigir alocação explícita de GPUs ou grandes volumes de memória
Oportunidades surgem com projetos como Volcano, Kueue e KubeFlow, que estendem o scheduler nativo.
2. Kubernetes Batch API: Jobs, CronJobs e Work Queues
2.1 Criando e gerenciando Jobs: paralelismo, completions e backoff
Um Job Kubernetes executa um ou mais pods até que um número específico de conclusões bem-sucedidas seja atingido.
apiVersion: batch/v1
kind: Job
metadata:
name: processador-lotes
spec:
completions: 10
parallelism: 3
backoffLimit: 4
ttlSecondsAfterFinished: 3600
template:
spec:
containers:
- name: worker
image: alpine:3.19
command: ["sh", "-c", "sleep 10 && echo 'Lote processado'"]
restartPolicy: Never
completions: 10— total de execuções bem-sucedidasparallelism: 3— até 3 pods simultâneosbackoffLimit: 4— número máximo de tentativas antes de marcar como FailedttlSecondsAfterFinished— limpeza automática após conclusão
2.2 CronJobs: agendamento recorrente de tarefas batch
CronJobs criam Jobs em intervalos definidos por expressão cron.
apiVersion: batch/v1
kind: CronJob
metadata:
name: backup-diario
spec:
schedule: "0 2 * * *"
startingDeadlineSeconds: 300
concurrencyPolicy: Forbid
jobTemplate:
spec:
template:
spec:
containers:
- name: backup
image: busybox:1.36
command: ["sh", "-c", "pg_dump -h db -U user mydb > /backup/dump.sql"]
restartPolicy: Never
concurrencyPolicy: Forbid— impede execução simultâneastartingDeadlineSeconds— tolerância máxima de atraso
2.3 Padrões avançados: work queues com Redis/RabbitMQ e index jobs
Work Queue com Redis: jobs consomem tarefas de uma fila externa.
apiVersion: batch/v1
kind: Job
metadata:
name: redis-worker
spec:
parallelism: 5
completions: 1
template:
spec:
containers:
- name: worker
image: redis:7-alpine
command: ["redis-cli", "-h", "redis-service", "brpop", "task-queue", "0"]
restartPolicy: Never
Index Jobs: cada pod recebe um índice único via variável de ambiente JOB_COMPLETION_INDEX.
apiVersion: batch/v1
kind: Job
metadata:
name: processa-particoes
spec:
completions: 4
parallelism: 2
completionMode: Indexed
template:
spec:
containers:
- name: worker
image: alpine:3.19
env:
- name: JOB_COMPLETION_INDEX
valueFrom:
fieldRef:
fieldPath: metadata.labels['batch.kubernetes.io/job-completion-index']
command: ["sh", "-c", "echo 'Processando partição $JOB_COMPLETION_INDEX'"]
restartPolicy: Never
3. Gerenciamento de Recursos e Escalabilidade para Batch
3.1 Resource Quotas, LimitRanges e prioridades para workloads batch
ResourceQuota por namespace:
apiVersion: v1
kind: ResourceQuota
metadata:
name: batch-quota
namespace: batch-ns
spec:
hard:
requests.cpu: "10"
requests.memory: "32Gi"
limits.cpu: "20"
limits.memory: "64Gi"
count/jobs.batch: "50"
PriorityClass para priorizar jobs sobre serviços:
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: batch-high
value: 1000
globalDefault: false
description: "Prioridade alta para jobs batch críticos"
3.2 Escalonamento eficiente: Kueue, Volcano e scheduling de gang
Volcano implementa scheduling de gang para jobs distribuídos:
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: treinamento-distribuido
spec:
minAvailable: 4
schedulerName: volcano
tasks:
- replicas: 4
name: worker
template:
spec:
containers:
- name: worker
image: pytorch/pytorch:2.0.1-cuda11.7
command: ["python", "train.py"]
resources:
requests:
nvidia.com/gpu: 1
minAvailable: 4— todos os 4 workers devem ser alocados simultaneamenteschedulerName: volcano— delega ao scheduler Volcano
3.3 Tratamento de falhas, retry policies e TTLs para jobs
spec:
backoffLimit: 6
activeDeadlineSeconds: 3600
ttlSecondsAfterFinished: 86400
backoffLimit: tentativas exponenciais (1s, 2s, 4s, 8s...)activeDeadlineSeconds: timeout total do jobttlSecondsAfterFinished: limpeza automática após sucesso ou falha
4. KubeFlow: Plataforma de ML Pipelines e Batch
4.1 Visão geral do ecossistema KubeFlow
KubeFlow é uma plataforma MLOps que integra:
- Pipelines: orquestração de DAGs de componentes batch
- Training Operator: suporte nativo a TensorFlow, PyTorch, MXNet, MPI
- Katib: AutoML e hyperparameter tuning
- KFServing: deploy serverless de modelos
4.2 KubeFlow Pipelines: orquestração de DAGs batch
Pipeline declarado em Python com o SDK do KubeFlow:
import kfp
from kfp import dsl, components
@dsl.component
def preprocess(data_path: str) -> str:
return f"/processed/{data_path.split('/')[-1]}"
@dsl.component
def train(processed_data: str, epochs: int) -> str:
return f"/models/model_{epochs}e.pkl"
@dsl.pipeline(name="batch-ml-pipeline")
def ml_pipeline(data_path: str = "/data/raw", epochs: int = 10):
prep = preprocess(data_path=data_path)
train_model = train(processed_data=prep.output, epochs=epochs)
kfp.compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")
4.3 KubeFlow Training Operator: suporte a frameworks
PyTorchJob para treinamento distribuído:
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-dist-train
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
template:
spec:
containers:
- name: pytorch
image: pytorch/pytorch:2.0.1-cuda11.7
command: ["python", "-m", "torch.distributed.run", "train.py"]
Worker:
replicas: 3
template:
spec:
containers:
- name: pytorch
image: pytorch/pytorch:2.0.1-cuda11.7
command: ["python", "-m", "torch.distributed.run", "train.py"]
resources:
limits:
nvidia.com/gpu: 1
5. Exemplos Práticos com Código
5.1 Deploy de um Job Kubernetes simples com paralelismo
# job-paralelo.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: etl-diario
spec:
completions: 20
parallelism: 5
backoffLimit: 3
template:
spec:
containers:
- name: etl-worker
image: python:3.11-slim
command: ["python", "-c", "
import time, os
idx = os.environ.get('JOB_COMPLETION_INDEX', '0')
print(f'Worker {idx}: iniciando ETL...')
time.sleep(5)
print(f'Worker {idx}: concluído')
"]
restartPolicy: Never
kubectl apply -f job-paralelo.yaml
kubectl get jobs --watch
kubectl logs job/etl-diario
5.2 Pipeline KubeFlow completo: pré-processamento, treino, avaliação
# pipeline_completo.py
import kfp
from kfp import dsl
@dsl.component(base_image="python:3.11-slim")
def load_data(url: str) -> str:
return f"/data/raw/{url.split('/')[-1]}"
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "scikit-learn"])
def preprocess(data_path: str) -> str:
import pandas as pd
df = pd.read_csv(data_path)
df.to_parquet("/data/processed/dataset.parquet")
return "/data/processed/dataset.parquet"
@dsl.component(base_image="python:3.11-slim", packages_to_install=["scikit-learn", "joblib"])
def train(processed_path: str, model_path: str) -> str:
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import joblib
df = pd.read_parquet(processed_path)
model = RandomForestClassifier().fit(df.drop("target", axis=1), df["target"])
joblib.dump(model, f"{model_path}/model.pkl")
return f"{model_path}/model.pkl"
@dsl.pipeline(name="ml-batch-pipeline")
def ml_batch(url: str = "https://example.com/dataset.csv"):
data = load_data(url=url)
processed = preprocess(data_path=data.output)
trained = train(processed_path=processed.output, model_path="/models")
kfp.compiler.Compiler().compile(ml_batch, "ml_batch_pipeline.yaml")
5.3 Uso de Volcano para scheduling de gang em jobs distribuídos
# volcano-gang-job.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: mpi-gang
spec:
minAvailable: 4
schedulerName: volcano
queue: default
tasks:
- replicas: 1
name: launcher
template:
spec:
containers:
- image: mpioperator/openmpi:latest
name: mpi-launcher
command: ["mpirun", "--allow-run-as-root", "-np", "3", "/app/mpi_hello"]
- replicas: 3
name: worker
template:
spec:
containers:
- image: mpioperator/openmpi:latest
name: mpi-worker
command: ["sleep", "infinity"]
resources:
requests:
cpu: "2"
memory: "4Gi"
6. Monitoramento, Logs e Observabilidade em Batch
6.1 Coleta de logs e métricas específicas para jobs batch
Prometheus + kube-state-metrics: métricas como kube_job_status_succeeded, kube_job_status_failed, kube_job_status_active.
Fluentd para logs centralizados:
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-batch-config
data:
fluent.conf: |
<source>
@type tail
path /var/log/containers/*.log
tag kubernetes.*
<parse>
@type json
</parse>
</source>
<match kubernetes.var.log.containers.**batch**>
@type elasticsearch
host elasticsearch-logging
port 9200
logstash_format true
</match>
6.2 Alertas e notificações para falhas e deadlines de jobs
# prometheus-alert.yaml
groups:
- name: batch-alerts
rules:
- alert: JobFailing
expr: kube_job_status_failed > 0
for: 5m
annotations:
summary: "Job {{ $labels.job_name }} falhou"
- alert: JobStuckActive
expr: kube_job_status_active > 0 and time() - kube_job_status_start_time > 3600
annotations:
summary: "Job {{ $labels.job_name }} ativo por mais de 1 hora"
6.3 Visualização de pipelines KubeFlow no Kubeflow Dashboard
O Kubeflow Dashboard oferece:
- Visualização gráfica de DAGs de pipelines
- Histórico de execuções com artefatos e parâmetros
- Comparação de experimentos e runs
7. Casos de Uso e Considerações de Produção
7.1 Exemplos reais
- ETL: extração, transformação e carga de dados em data warehouses
- Treinamento de modelos: jobs distribuídos com GPU para deep learning
- Processamento de imagens/vídeo: transcodificação, análise frame-a-frame
- Relatórios financeiros: cálculos batch noturnos com deadlines rígidos
7.2 Estratégias de custo e otimização
Spot instances via priorityClassName:
apiVersion: v1
kind: PriorityClass
metadata:
name: batch-spot
value: 100
globalDefault: false
description: "Jobs tolerantes a interrupção"
# Node affinity para spot nodes
spec:
affinity:
nodeAffinity:
preferredDuringScheduling:
- weight: 100
preference:
matchExpressions:
- key: "spot"
operator: In
values: ["true"]
7.3 Integração com armazenamento persistente
PVC compartilhado entre jobs:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: batch-data
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
storageClassName: nfs-client
MinIO para dados de entrada/saída:
env:
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: S3_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-creds
key: access-key
- name: S3_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-creds
key: secret-key
Referências
- Kubernetes Jobs Documentation — Documentação oficial sobre Jobs, CronJobs e padrões de batch processing no Kubernetes
- KubeFlow Pipelines Documentation — Guia completo de orquestração de pipelines batch com KubeFlow
- Volcano: Batch Scheduling in Kubernetes — Documentação oficial do Volcano para scheduling de gang e gerenciamento de jobs batch
- Kueue: Kubernetes Queue Management — Projeto oficial para gerenciamento de filas e quotas em workloads batch
- Training Operator for Kubeflow — Document
ação sobre operadores de treinamento distribuído (TFJob, PyTorchJob, MPIJob)
Conclusão
Batch processing no Kubernetes evoluiu significativamente desde os simples Jobs e CronJobs. Com ferramentas como Kueue, Volcano e KubeFlow, é possível orquestrar desde tarefas ETL simples até pipelines complexos de machine learning com centenas de workers distribuídos.
A escolha entre Kubernetes Batch nativo e KubeFlow depende do cenário:
- Kubernetes Batch é ideal para jobs independentes, cronogramas fixos e workloads que não exigem orquestração complexa de DAGs
- KubeFlow brilha em cenários de ML, com pipelines reutilizáveis, experimentação e integração nativa com frameworks de treinamento
Para produção, considere:
- Utilizar Volcano ou Kueue para evitar starvation e melhorar utilização de recursos
- Implementar monitoramento com Prometheus e alertas para falhas
- Adotar spot instances para reduzir custos em jobs tolerantes a interrupção
- Garantir persistência de dados com PVCs compartilhados ou armazenamento externo (S3/MinIO)
Com essas ferramentas e padrões, sua plataforma Kubernetes estará pronta para executar workloads batch de forma eficiente, escalável e resiliente.