Como usar o Flink para processamento de eventos em tempo real
1. Introdução ao Apache Flink e processamento de eventos
O Apache Flink é uma plataforma de processamento de streams distribuída e de código aberto, desenvolvida originalmente na Universidade Técnica de Berlim como parte do projeto Stratosphere. Lançado como projeto de nível superior da Apache Software Foundation em 2015, o Flink se destaca por oferecer processamento de dados em tempo real com baixa latência, alta throughput e garantias de consistência rigorosas.
No contexto de processamento de eventos, três conceitos são fundamentais:
- Eventos: unidades atômicas de dados que representam algo que ocorreu em um sistema (uma transação, um clique, uma leitura de sensor).
- Streams: sequências ordenadas e potencialmente infinitas de eventos.
- Tempo de evento vs tempo de processamento: o tempo de evento é quando o fato realmente ocorreu (timestamp embutido no dado), enquanto o tempo de processamento é quando o Flink processa o evento. O Flink oferece suporte nativo a ambos, sendo o tempo de evento essencial para aplicações que exigem ordenação correta.
Comparado a outras ferramentas, o Flink oferece vantagens significativas: ao contrário do Spark Streaming (que opera em micro-batches), o Flink processa evento por evento, resultando em latências menores. Diferente do Kafka Streams (biblioteca embutida no Kafka), o Flink possui gerenciamento de estado mais sofisticado e suporte a windowing complexo. Em relação ao Storm, o Flink oferece garantias exactly-once mais robustas e API mais expressiva.
2. Arquitetura e modelo de execução do Flink
A arquitetura do Flink é composta por três componentes principais:
- JobManager: coordena a execução distribuída, gerencia checkpoints e recuperação de falhas.
- TaskManager: executa as tarefas (operators) em slots de tarefas (task slots).
- Slots de tarefas: unidades de paralelismo que determinam quantos sub-tasks podem executar concorrentemente.
O modelo de paralelismo permite que operadores sejam distribuídos entre múltiplos TaskManagers, com cada operador podendo ter seu próprio grau de paralelismo. O Flink suporta três garantias de processamento: at-most-once, at-least-once e exactly-once, sendo esta última alcançada através de checkpointing baseado em barreiras distribuídas (algoritmo de Chandy-Lamport).
3. Configuração do ambiente e primeiros passos
Para iniciar, baixe o Flink em flink.apache.org e extraia o arquivo. Para modo standalone local:
# Iniciar cluster Flink local
./bin/start-cluster.sh
# Verificar se está rodando
curl http://localhost:8081
A estrutura básica de um job Flink inclui ambiente de execução, fonte de dados, transformações e sumidouro. Exemplo prático com socket TCP:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Socket Word Count")
}
}
Execute com nc -lk 9999 em outro terminal e envie palavras para testar.
4. Fontes de dados (Sources) e sumidouros (Sinks) em tempo real
O Flink oferece conectores integrados para diversas fontes:
Fontes comuns:
- Kafka: FlinkKafkaConsumer para ler tópicos em tempo real
- Kinesis: FlinkKinesisConsumer para AWS
- RabbitMQ: RMQSource para filas AMQP
- Sockets: socketTextStream para testes
- Arquivos: readTextFile para processamento de arquivos (modo streaming ou batch)
Sumidouros comuns:
- Kafka: FlinkKafkaProducer para escrever em tópicos
- JDBC: JDBCOutputFormat para bancos relacionais
- Elasticsearch: ElasticsearchSink para indexação
- File systems: StreamingFileSink para HDFS, S3, etc.
Exemplo de fonte Kafka com sumidouro Elasticsearch:
val kafkaSource = new FlinkKafkaConsumer[String](
"eventos",
new SimpleStringSchema(),
properties
)
val esSink = new ElasticsearchSink.Builder[String](
httpHosts,
new ElasticsearchSinkFunction[String] {
def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
indexer.add(createIndexRequest(element))
}
}
).build()
env.addSource(kafkaSource)
.addSink(esSink)
5. Processamento de streams: transformações e operações
Operações básicas:
val stream = env.fromElements(1, 2, 3, 4, 5)
// map: transforma cada elemento
val dobrados = stream.map(_ * 2)
// flatMap: zero ou mais saídas por entrada
val palavras = stream.flatMap { n =>
if (n % 2 == 0) Seq(s"par_$n") else Seq()
}
// filter: seleciona elementos
val pares = stream.filter(_ % 2 == 0)
// keyBy + reduce: agregação por chave
val streamChaveado = stream.keyBy(_ % 2).reduce(_ + _)
Operações baseadas em tempo:
Windowing é crucial para processamento de eventos. Três tipos principais:
- Tumbling window: janelas fixas sem sobreposição
- Sliding window: janelas com sobreposição
- Session window: janelas baseadas em gaps de inatividade
// Tumbling window de 1 minuto
stream
.keyBy(event => event.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new CountAggregate())
// Watermarking para eventos atrasados
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
.withTimestampAssigner((event, _) => event.timestamp)
stream
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(_.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateOutputTag)
6. Gerenciamento de estado e tolerância a falhas
O Flink gerencia estado de forma eficiente com tipos especializados:
- ValueState: estado simples de valor único
- ListState: lista de valores
- MapState: mapa chave-valor
- State TTL: expiração automática de estado
class MeuProcessFunction extends KeyedProcessFunction[String, Event, Result] {
private var estado: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
val desc = new ValueStateDescriptor[Long](
"contador",
TypeInformation.of(classOf[Long])
)
desc.enableTimeToLive(StateTtlConfig
.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build())
estado = getRuntimeContext.getState(desc)
}
override def processElement(
value: Event,
ctx: KeyedProcessFunction[String, Event, Result]#Context,
out: Collector[Result]
): Unit = {
val atual = Option(estado.value()).getOrElse(0L) + 1
estado.update(atual)
out.collect(Result(value.key, atual))
}
}
Checkpointing é configurado no ambiente:
env.enableCheckpointing(5000) // checkpoint a cada 5 segundos
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
7. Otimização de desempenho e boas práticas
Ajustes de configuração:
- Paralelismo:
env.setParallelism(n)ou por operador - Buffer timeout:
env.setBufferTimeout(100)(milissegundos) - Rede: ajustar
taskmanager.network.memorypara evitar backpressure
Monitoramento: a Flink Web UI (porta 8081) mostra métricas como throughput, latência, backpressure e uso de memória. Para métricas customizadas:
val contador = getRuntimeContext.getMetricGroup.counter("eventos_processados")
contador.inc()
Boas práticas:
- Use ValueState em vez de variáveis mutáveis para consistência
- Configure watermarks adequadamente para evitar perda de dados
- Teste com env.fromElements() antes de conectar fontes reais
- Utilize savepoints para atualizações de jobs sem perda de estado
8. Casos de uso reais e integração com ecossistema
Detecção de fraudes em tempo real:
val transacoes = env.addSource(kafkaConsumer)
transacoes
.keyBy(_.contaId)
.process(new FraudDetector())
.addSink(alertaSink)
class FraudDetector extends KeyedProcessFunction[String, Transacao, Alerta] {
private var ultimasTransacoes: ListState[Transacao] = _
override def processElement(
transacao: Transacao,
ctx: Context,
out: Collector[Alerta]
): Unit = {
val historico = ultimasTransacoes.get().asScala.toList
if (historico.size >= 3 &&
historico.last.valor > 10000 &&
transacao.valor > 50000) {
out.collect(Alerta(transacao.contaId, "Movimentação suspeita"))
}
ultimasTransacoes.add(transacao)
}
}
Monitoramento de infraestrutura: pipelines que consomem métricas de CPU, memória e disco de servidores, aplicam janelas para calcular médias móveis e disparam alertas no Grafana via Elasticsearch sink.
Integrações comuns:
- Kafka para ingestão e saída de eventos
- Hadoop/S3 para armazenamento de checkpoints
- Grafana + Elasticsearch para visualização de métricas
- Prometheus para monitoramento do próprio Flink
Referências
- Documentação Oficial do Apache Flink — Guia completo de conceitos, APIs, configuração e deploy do Flink
- Apache Flink: Processamento de Streams em Tempo Real — Livro técnico sobre processamento de streams com Flink (versão online)
- Tutorial: Primeiros Passos com Flink — Guia oficial para instalação local e execução do primeiro job
- Flink + Kafka: Integração Prática — Artigo da Confluent sobre como integrar Flink com Kafka para streaming
- Checkpointing e Tolerância a Falhas no Flink — Explicação detalhada sobre checkpointing, estado e garantias exactly-once
- Flink no Kubernetes: Guia de Deploy — Tutorial oficial sobre execução de jobs Flink em clusters Kubernetes
- Padrões de Processamento de Eventos com Flink — Blog da Ververica (criadores do Flink) sobre tempo de evento, watermarks e windowing