Como implementar workflows de aprovação assíncronos com máquinas de estado persistentes
1. Fundamentos de workflows de aprovação assíncronos
1.1. Conceitos básicos: eventos, estados, transições e ações
Workflows de aprovação assíncronos são sistemas onde o fluxo de decisão não ocorre em tempo real, permitindo que participantes ajam em momentos distintos. Os elementos fundamentais incluem:
- Eventos: estímulos externos ou internos que disparam transições (ex:
pedido_submetido,aprovador_designado) - Estados: condições estáveis do workflow (ex:
pendente,em_analise) - Transições: movimentos válidos entre estados, acionados por eventos
- Ações: efeitos colaterais executados durante transições (ex:
notificar_aprovador)
1.2. Diferenças entre workflows síncronos e assíncronos
| Característica | Workflow Síncrono | Workflow Assíncrono |
|---|---|---|
| Resiliência | Baixa — falha interrompe processo | Alta — estados persistidos permitem retomada |
| Escalabilidade | Limitada por bloqueios | Alta — processamento paralelo via filas |
| Tolerância a falhas | Baixa — perda de contexto | Alta — recuperação via replay |
1.3. Casos de uso típicos
- Aprovação de pedidos de compra com múltiplos níveis hierárquicos
- Revisão de documentos com prazos de expiração
- Liberação de deploys em ambientes de produção
- Fluxos de compliance com auditoria obrigatória
2. Modelagem da máquina de estados persistente
2.1. Definição dos estados do workflow
ESTADOS_VALIDOS = {
"pendente": "Aguardando submissão inicial",
"em_analise": "Sob revisão do aprovador",
"aprovado": "Aprovado sem ressalvas",
"rejeitado": "Rejeitado pelo aprovador",
"cancelado": "Cancelado pelo solicitante",
"expirado": "Prazo de análise esgotado"
}
2.2. Mapeamento de transições permitidas e regras de guarda
TRANSICOES = {
"pendente": {
"submeter": { "destino": "em_analise", "guarda": "usuario_tem_permissao" }
},
"em_analise": {
"aprovar": { "destino": "aprovado", "guarda": "aprovador_autorizado" },
"rejeitar": { "destino": "rejeitado", "guarda": "motivo_obrigatorio" },
"cancelar": { "destino": "cancelado", "guarda": "solicitante_ou_admin" },
"expiracao": { "destino": "expirado", "guarda": "prazo_excedido" }
}
}
2.3. Representação formal: tabela de transições
| Estado Atual | Evento | Guarda | Estado Destino | Ação |
|----------------|--------------|-----------------------|----------------|-----------------------|
| pendente | submeter | permissao_valida | em_analise | notificar_aprovador |
| em_analise | aprovar | aprovador_autorizado | aprovado | liberar_pedido |
| em_analise | rejeitar | motivo_preenchido | rejeitado | notificar_solicitante |
| em_analise | expiracao | timeout_atingido | expirado | escalar_para_gestor |
3. Persistência do estado: armazenamento e recuperação
3.1. Estratégias de persistência
-- Tabela relacional para armazenamento de estados
CREATE TABLE workflow_estados (
workflow_id UUID PRIMARY KEY,
estado_atual VARCHAR(20) NOT NULL,
versao INTEGER NOT NULL DEFAULT 1,
dados_json JSONB,
criado_em TIMESTAMP DEFAULT NOW(),
atualizado_em TIMESTAMP DEFAULT NOW()
);
CREATE TABLE workflow_eventos (
evento_id UUID PRIMARY KEY,
workflow_id UUID REFERENCES workflow_estados(workflow_id),
tipo_evento VARCHAR(50) NOT NULL,
payload JSONB,
criado_em TIMESTAMP DEFAULT NOW()
);
3.2. Versionamento de estado e concorrência
-- Atualização com optimistic locking
UPDATE workflow_estados
SET estado_atual = 'em_analise',
versao = versao + 1,
atualizado_em = NOW()
WHERE workflow_id = $1 AND versao = $2
RETURNING versao;
3.3. Recuperação de estado após falhas
funcao recuperar_estado(workflow_id):
estado_atual = buscar_ultimo_snapshot(workflow_id)
eventos_pendentes = buscar_eventos_apos_snapshot(workflow_id, estado_atual.versao)
para cada evento em eventos_pendentes:
estado_atual = aplicar_evento(estado_atual, evento)
retornar estado_atual
4. Implementação do motor de workflow assíncrono
4.1. Estrutura do motor
class MotorWorkflow:
def __init__(self, fila_eventos, repositorio_estados):
self.fila = fila_eventos
self.repositorio = repositorio_estados
self.executores = {}
def processar_evento(self, evento):
workflow = self.repositorio.carregar(evento.workflow_id)
transicao = workflow.obter_transicao(evento.tipo)
if transicao.guarda_valida():
with self.repositorio.lock(workflow.id):
workflow.aplicar_transicao(transicao)
self.despachar_acoes(transicao.acoes)
self.repositorio.salvar(workflow)
4.2. Uso de filas e mensageria
# Publicação de evento no Kafka
producer.send('workflow-eventos', {
'workflow_id': 'abc-123',
'tipo_evento': 'submeter',
'payload': {'usuario': 'joao', 'pedido_id': 456},
'timestamp': datetime.now().isoformat()
})
# Consumidor processando eventos
@consumer(topic='workflow-eventos', group_id='motor-workflow')
def processar_evento(evento):
motor.processar_evento(evento)
4.3. Tratamento de timeouts e expiração de estados
# Scheduler para verificar expirações
agendar_tarefa_recorrente(intervalo=60) # a cada minuto
def verificar_expirados():
workflows_expirados = buscar_workflows_em_analise_com_prazo_vencido()
for wf in workflows_expirados:
publicar_evento({
'workflow_id': wf.id,
'tipo_evento': 'expiracao',
'payload': {'motivo': 'prazo_excedido'}
})
5. Notificações e ações em cada transição
5.1. Gatilhos para notificações
ACOES_POR_TRANSICAO = {
'pendente->em_analise': [
{'tipo': 'email', 'destinatario': 'aprovador', 'template': 'nova_solicitacao'},
{'tipo': 'webhook', 'url': 'https://api.exemplo.com/notificar'}
],
'em_analise->aprovado': [
{'tipo': 'email', 'destinatario': 'solicitante', 'template': 'aprovado'},
{'tipo': 'push', 'canal': 'mobile_app'}
]
}
5.2. Execução de ações colaterais
def executar_acoes_colaterais(workflow, transicao):
for acao in transicao.acoes:
if acao.tipo == 'atualizar_sistema_externo':
chamar_api_externa(acao.url, workflow.dados)
elif acao.tipo == 'registrar_auditoria':
inserir_log_auditoria(workflow, transicao)
5.3. Políticas de compensação
def compensar_acoes(workflow, motivo_rejeicao):
if workflow.estado_atual == 'rejeitado':
# Reverter ações executadas durante aprovação
reverter_reserva_estoque(workflow.pedido_id)
liberar_credito_cliente(workflow.cliente_id)
6. Tratamento de concorrência e consistência eventual
6.1. Garantia de idempotência em transições
funcao aplicar_evento_se_unico(workflow_id, evento_id, transicao):
if not evento_ja_processado(evento_id):
with lock_transacional(workflow_id):
workflow = carregar_workflow(workflow_id)
workflow.aplicar(transicao)
marcar_evento_processado(evento_id)
salvar_workflow(workflow)
6.2. Consistência eventual vs consistência forte
# Abordagem de consistência eventual com filas
# Vantagens: alta disponibilidade, baixa latência
# Desvantagens: leituras podem retornar estado obsoleto por segundos
def ler_workflow_consistente(workflow_id):
# Tenta cache primeiro, depois banco
estado_cache = cache.get(f'workflow:{workflow_id}')
if estado_cache:
return estado_cache
return banco.buscar(workflow_id)
6.3. Resolução de conflitos com saga patterns
class SagaCoordenador:
def executar_saga(self, workflow_id, passos):
passos_executados = []
try:
for passo in passos:
passo.executar()
passos_executados.append(passo)
except Exception as e:
# Compensar passos executados em ordem reversa
for passo in reversed(passos_executados):
passo.compensar()
raise
7. Monitoramento e observabilidade do workflow
7.1. Métricas-chave
# Métricas Prometheus
workflow_tempo_aprovacao_seconds = Histogram(
'workflow_tempo_aprovacao_seconds',
'Tempo entre submissão e aprovação',
buckets=[60, 300, 600, 1800, 3600]
)
workflow_taxa_rejeicao = Counter(
'workflow_transicoes_total',
'Total de transições por resultado',
['estado_destino']
)
7.2. Logging estruturado e rastreamento distribuído
import structlog
logger = structlog.get_logger()
def processar_evento(evento):
logger.info("processando_evento",
workflow_id=evento.workflow_id,
tipo=evento.tipo,
trace_id=evento.trace_id,
usuario=evento.payload.get('usuario'))
7.3. Health checks específicos
@app.route('/health/workflow-motor')
def health_check():
status = {
'fila_consumidores': verificar_consumidores_ativos(),
'repositorio_estados': verificar_conexao_banco(),
'scheduler_timeouts': verificar_scheduler_ativo()
}
return status, 200 if all(status.values()) else 503
8. Exemplo prático: workflow de aprovação de pedidos
8.1. Implementação passo a passo
# Definição completa do workflow
workflow_pedido = {
'estados': ['pendente', 'em_analise', 'aprovado', 'rejeitado', 'cancelado', 'expirado'],
'transicoes': {
('pendente', 'submeter'): {
'destino': 'em_analise',
'guardas': ['pedido_valido', 'limite_credito_suficiente'],
'acoes': ['notificar_aprovador', 'reservar_estoque']
},
('em_analise', 'aprovar'): {
'destino': 'aprovado',
'guardas': ['aprovador_autorizado'],
'acoes': ['confirmar_pedido', 'notificar_cliente', 'cobrar_cartao']
},
('em_analise', 'rejeitar'): {
'destino': 'rejeitado',
'guardas': ['motivo_preenchido'],
'acoes': ['notificar_cliente_rejeicao', 'liberar_estoque']
}
},
'timeout': {
'em_analise': {'duracao': '24h', 'acao': 'escalar_para_gestor'}
}
}
8.2. Código da máquina de estados
class MaquinaEstadosWorkflow:
def __init__(self, workflow_id, definicao, repositorio):
self.id = workflow_id
self.definicao = definicao
self.repositorio = repositorio
self.estado_atual = 'pendente'
self.versao = 1
self.historico = []
def processar_evento(self, evento):
transicao = self.definicao['transicoes'].get((self.estado_atual, evento.tipo))
if not transicao:
raise TransicaoInvalida(f"{self.estado_atual} -> {evento.tipo}")
if not self._validar_guardas(transicao['guardas'], evento):
raise GuardaFalhou("Condições não atendidas")
estado_anterior = self.estado_atual
self.estado_atual = transicao['destino']
self.versao += 1
self.historico.append({
'de': estado_anterior,
'para': self.estado_atual,
'evento': evento.tipo,
'timestamp': datetime.now()
})
self._executar_acoes(transicao['acoes'], evento)
self.repositorio.salvar(self)
return self.estado_atual
# Uso do motor
motor = MotorWorkflow(
fila=KafkaProducer(bootstrap_servers='localhost:9092'),
repositorio=RepositorioPostgres(connection_string='dbname=workflows')
)
# Submeter novo pedido
motor.processar_evento(Evento(
workflow_id='wf-001',
tipo='submeter',
payload={'pedido_id': 123, 'valor': 5000, 'cliente': 'Maria'}
))
8.3. Testes de integração e cenários de falha
def test_workflow_aprovacao_completo():
# Cenário: aprovação bem-sucedida
wf = MaquinaEstadosWorkflow('test-1', workflow_pedido, repositorio_mock)
assert wf.processar_evento(Evento('submeter', {'pedido_id': 1})) == 'em_analise'
assert wf.processar_evento(Evento('aprovar', {'aprovador': 'joao'})) == 'aprovado'
# Cenário: rejeição com compensação
wf2 = MaquinaEstadosWorkflow('test-2', workflow_pedido, repositorio_mock)
wf2.processar_evento(Evento('submeter', {'pedido_id': 2}))
with patch('api_externa.liberar_estoque') as mock_liberar:
wf2.processar_evento(Evento('rejeitar', {'motivo': 'sem_estoque'}))
assert wf2.estado_atual == 'rejeitado'
mock_liberar.assert_called_once()
# Cenário: timeout e expiração
with freeze_time('2024-01-01 10:00:00'):
wf3 = MaquinaEstadosWorkflow('test-3', workflow_pedido, repositorio_mock)
wf3.processar_evento(Evento('submeter', {'pedido_id': 3}))
with freeze_time('2024-01-02 11:00:00'): # Após 25h
wf3.processar_evento(Evento('expiracao', {}))
assert wf3.estado_atual == 'expirado'
Referências
- AWS Step Functions Documentation — Documentação oficial da AWS sobre workflows de estado persistente, incluindo exemplos de aprovação assíncrona
- Temporal.io — Workflow as Code — Plataforma open-source para workflows duráveis com máquinas de estado persistentes e tolerância a falhas
- Event Sourcing Pattern — Martin Fowler — Artigo fundamental sobre
event sourcing, base para implementação de máquinas de estado persistentes com replay de eventos
- Microsoft — Saga Pattern — Guia prático sobre implementação de sagas para gerenciamento de transações distribuídas em workflows assíncronos
- Redis — Streams Documentation — Documentação sobre uso de streams como fila de eventos para workflows assíncronos
- PostgreSQL — Advisory Locks — Técnica de locking otimista para controle de concorrência em persistência de estados
Conclusão
Implementar workflows de aprovação assíncronos com máquinas de estado persistentes é uma abordagem robusta para lidar com processos longos, distribuídos e sujeitos a falhas. A combinação de modelagem formal de estados, persistência resiliente, filas de mensagens e tratamento cuidadoso de concorrência permite construir sistemas que escalam horizontalmente, recuperam-se automaticamente de falhas e mantêm consistência eventual.
Os principais aprendizados deste artigo são:
- Modelagem clara: Definir estados, transições, guardas e ações de forma explícita é essencial para evitar comportamentos inesperados.
- Persistência adequada: Escolher entre banco relacional, event store ou banco de documentos depende dos requisitos de consistência e volume de dados.
- Assincronia com filas: Kafka, RabbitMQ ou Redis Streams desacoplam os componentes e permitem processamento escalável.
- Tratamento de falhas: Timeouts, retentativas com backoff, sagas e compensações garantem resiliência mesmo em cenários adversos.
- Observabilidade: Métricas, logs estruturados e health checks são fundamentais para operar o motor de workflow em produção.
Com esses conceitos e exemplos práticos, você está preparado para implementar seu próprio motor de workflow assíncrono, adaptando-o às necessidades específicas do seu domínio de negócio.