Como implementar change data capture (CDC) com Debezium e Kafka

1. Fundamentos do Change Data Capture (CDC)

Change Data Capture (CDC) é uma técnica de engenharia de dados que permite capturar e propagar alterações ocorridas em bancos de dados em tempo real. Em vez de realizar consultas periódicas (polling) ou depender de gatilhos complexos, o CDC observa diretamente o log de transações do banco, detectando inserts, updates e deletes no momento exato em que ocorrem.

Os casos de uso mais comuns incluem: sincronização entre bancos de dados replicados, alimentação de data lakes com dados atualizados, integração entre microsserviços e manutenção de caches consistentes. Existem três mecanismos principais de CDC:

  • Log-based (WAL): Utiliza o Write-Ahead Log ou o log binário do banco. É o método mais eficiente e não intrusivo, pois não requer alterações na estrutura das tabelas.
  • Trigger-based: Cria triggers no banco para capturar mudanças. Pode impactar a performance do banco de origem.
  • Polling: Consulta a tabela periodicamente por registros modificados (usando timestamps ou flags). Simples, mas com latência maior.

A combinação Debezium + Kafka se destaca por oferecer conectores prontos para diversos bancos (MySQL, PostgreSQL, MongoDB, SQL Server), escalabilidade horizontal via Kafka, baixa latência (sub-segundos) e integração nativa com o ecossistema de streaming.

2. Arquitetura e Componentes da Solução

A arquitetura típica de uma solução CDC com Debezium e Kafka envolve:

  • Kafka Connect (distributed mode): Framework que gerencia a execução dos conectores. O Debezium atua como um conector fonte (source connector) dentro do Kafka Connect.
  • Debezium: Conector especializado que lê o log de transações do banco de dados e publica cada mudança como uma mensagem em tópicos Kafka.
  • Apache Kafka: Barramento de eventos distribuído que armazena e entrega as mensagens de forma durável e ordenada.
  • Consumidores downstream: Aplicações, data lakes ou outros sistemas que processam os eventos em tempo real.

O fluxo de dados segue esta sequência: banco de origem → log de transações → Debezium → tópicos Kafka → consumidores. Cada tabela monitorada gera um tópico Kafka específico, facilitando o roteamento.

3. Preparação do Ambiente de Infraestrutura

Para iniciar, vamos preparar o ambiente usando Docker Compose. Crie um arquivo docker-compose.yml:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.5
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status

Para o banco de origem, ative o log binário no MySQL. Edite o arquivo my.cnf:

[mysqld]
server-id=1
log_bin=mysql-bin
binlog_format=row
binlog_row_image=full
expire_logs_days=7

Reinicie o MySQL e crie um usuário para o Debezium:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'senha123';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

4. Configuração do Conector Debezium

Com o ambiente rodando, registre o conector Debezium via REST API. Para MySQL, crie o arquivo mysql-connector.json:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "senha123",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "minha_base",
    "table.include.list": "minha_base.clientes,minha_base.pedidos",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.minha_base",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "true"
  }
}

Registre o conector com:

curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors

Para PostgreSQL, use configuração similar:

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "senha123",
    "database.dbname": "minha_base",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.clientes,public.pedidos",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "snapshot.mode": "initial"
  }
}

Parâmetros críticos:
- snapshot.mode: initial faz snapshot completo antes de capturar mudanças; schema_only captura apenas mudanças futuras.
- tombstones.on.delete: quando true, gera uma mensagem tombstone (valor nulo) após o delete, permitindo compactação de tópicos.
- database.history.kafka.topic: armazena o histórico de schemas para evolução.

5. Monitoramento e Gerenciamento de Eventos

Após o conector iniciar, inspecione os tópicos criados:

kafka-topics --bootstrap-server localhost:9092 --list

Você verá tópicos como dbserver1.minha_base.clientes e dbserver1.minha_base.pedidos. Consuma eventos com:

kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.minha_base.clientes --from-beginning

A estrutura de cada evento CDC segue o formato:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "before", "type": "struct", "fields": [...]},
      {"field": "after", "type": "struct", "fields": [...]},
      {"field": "source", "type": "struct", "fields": [...]},
      {"field": "op", "type": "string"},
      {"field": "ts_ms", "type": "int64"}
    ]
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "nome": "João Silva",
      "email": "joao@email.com"
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1700000000000,
      "snapshot": "false",
      "db": "minha_base",
      "table": "clientes"
    },
    "op": "c",
    "ts_ms": 1700000000001
  }
}

O campo op indica: c (create), u (update), d (delete), r (snapshot read). Monitore o status do conector:

curl http://localhost:8083/connectors/mysql-connector/status

6. Consumo e Processamento dos Streams de Dados

Consumimos os eventos com Kafka Streams (Java) para processamento em tempo real. Exemplo de aplicação que mantém um cache atualizado:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CDCConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdc-consumer");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("dbserver1.minha_base.clientes");

        stream.foreach((key, value) -> {
            try {
                ObjectMapper mapper = new ObjectMapper();
                JsonNode payload = mapper.readTree(value).get("payload");
                String operacao = payload.get("op").asText();
                JsonNode after = payload.get("after");

                if ("c".equals(operacao) || "u".equals(operacao)) {
                    System.out.println("UPSERT: " + after.toString());
                    // Atualizar cache ou banco de dados destino
                } else if ("d".equals(operacao)) {
                    System.out.println("DELETE: key=" + key);
                    // Remover do cache
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Para roteamento por tabela, use SMT (Single Message Transform) na configuração do conector:

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)",
"transforms.route.replacement": "cdc.$1"

7. Boas Práticas, Limitações e Troubleshooting

Boas práticas:
- Utilize tópicos com múltiplas partições (ex: 6-12) para paralelizar o consumo.
- Configure tasks.max igual ao número de partições para balanceamento.
- Use Avro com Schema Registry para evolução segura de schemas.
- Implemente idempotência nos consumidores para garantir exactly-once.

Limitações:
- Snapshot inicial pode ser lento para tabelas grandes (milhões de registros).
- Eventos de delete geram tombstones que exigem compactação de tópicos.
- Ordenação global não é garantida; apenas por chave primária dentro de uma partição.

Troubleshooting comum:
- Falha de conexão: verifique se o banco de origem permite conexões externas e se o log binário está ativo.
- Offset reset: utilize curl -X DELETE http://localhost:8083/connectors/mysql-connector/offsets para reiniciar o offset.
- Conflitos de schema: configure schema.history.internal.kafka.topic com cleanup.policy=compact e retention.ms=604800000 (7 dias).
- Recuperação pós-crash: o Debezium salva offsets no Kafka; ao reiniciar, ele retoma do último offset commitado.

Para verificar a saúde do conector em produção, monitore as métricas JMX (ex: debezium-mysql:type=connector-metrics,context=streaming) e configure alertas para MilliSecondsBehindSource > 5000 (5 segundos de atraso).

Referências