Logical replication: filtrando e transformando dados em trânsito
1. Fundamentos da Replicação Lógica
A replicação lógica no PostgreSQL é um mecanismo que permite copiar dados de um servidor (publisher) para um ou mais servidores (subscribers) com granularidade fina. Diferente da replicação física, que replica o banco inteiro no nível do disco, a replicação lógica opera no nível das transações e permite selecionar quais tabelas, linhas e colunas serão replicadas.
O fluxo básico é: o publisher escreve as alterações no Write-Ahead Log (WAL). Um processo de decodificação lógica converte essas alterações em um formato compreensível (como SQL ou JSON). O subscriber então aplica essas alterações em suas tabelas locais.
A configuração se dá através de dois objetos principais:
- Publication: define o que será publicado (tabelas, operações, filtros)
- Subscription: define onde e como os dados serão recebidos
-- No publisher:
CREATE PUBLICATION vendas_publication FOR TABLE vendas;
-- No subscriber:
CREATE SUBSCRIPTION vendas_subscription
CONNECTION 'host=publisher_host dbname=prod user=replicator'
PUBLICATION vendas_publication;
2. Filtragem de Dados na Publicação
Filtragem por tabela
Você pode publicar apenas um subconjunto de tabelas do banco:
-- Publica apenas tabelas específicas
CREATE PUBLICATION pub_clientes FOR TABLE clientes, pedidos, itens_pedido;
-- Publica todas as tabelas (menos comum, mas possível)
CREATE PUBLICATION pub_tudo FOR ALL TABLES;
Filtragem por operação
Por padrão, todas as operações (INSERT, UPDATE, DELETE, TRUNCATE) são replicadas. Você pode limitar:
-- Apenas inserts e updates (sem deletes)
CREATE PUBLICATION pub_auditoria FOR TABLE log_atividades
WITH (publish = 'insert, update');
Filtragem por colunas
Para evitar replicar colunas sensíveis ou desnecessárias:
-- Publica apenas nome e email, omitindo senha e cpf
CREATE PUBLICATION pub_usuarios_parcial FOR TABLE usuarios (id, nome, email);
3. Filtragem por Linhas com Expressões WHERE
Este é um dos recursos mais poderosos da replicação lógica: replicar apenas linhas que atendem a uma condição.
-- Replicar apenas pedidos da região Sul
CREATE PUBLICATION pub_pedidos_sul FOR TABLE pedidos
WHERE (regiao = 'SUL');
-- Replicar apenas clientes ativos
CREATE PUBLICATION pub_clientes_ativos FOR TABLE clientes
WHERE (status = 'ativo');
-- Replicar apenas vendas do mês corrente
CREATE PUBLICATION pub_vendas_mes FOR TABLE vendas
WHERE (data_venda >= date_trunc('month', CURRENT_DATE));
Limitações importantes:
- A expressão WHERE deve usar apenas funções imutáveis (não pode depender de
now()ourandom()) - Todas as colunas usadas no filtro precisam estar incluídas na publicação
- O filtro é avaliado no momento da replicação, não no momento da inserção original
-- ERRADO: uso de função volátil
CREATE PUBLICATION pub_erro FOR TABLE vendas
WHERE (data_venda = CURRENT_DATE); -- CURRENT_DATE é estável, mas não imutável
-- CORRETO: uso de função imutável
CREATE PUBLICATION pub_correto FOR TABLE vendas
WHERE (data_venda >= '2025-01-01');
4. Transformação de Dados com Triggers no Assinante
No lado do assinante, você pode usar triggers para transformar os dados antes de serem inseridos ou atualizados.
Mascaramento de dados sensíveis
-- No subscriber, criar trigger para mascarar CPF
CREATE OR REPLACE FUNCTION mascarar_cpf()
RETURNS TRIGGER AS $$
BEGIN
NEW.cpf := substring(NEW.cpf, 1, 3) || '.***.***-**';
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_mascarar_cpf
BEFORE INSERT OR UPDATE ON clientes
FOR EACH ROW EXECUTE FUNCTION mascarar_cpf();
Normalização de dados
-- Converter todas as strings para maiúsculas no destino
CREATE OR REPLACE FUNCTION normalizar_nome()
RETURNS TRIGGER AS $$
BEGIN
NEW.nome := UPPER(TRIM(NEW.nome));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_normalizar_nome
BEFORE INSERT OR UPDATE ON clientes
FOR EACH ROW EXECUTE FUNCTION normalizar_nome();
Enriquecimento com dados locais
-- Adicionar timestamp de recebimento no assinante
CREATE OR REPLACE FUNCTION adicionar_metadados()
RETURNS TRIGGER AS $$
BEGIN
NEW.recebido_em := NOW();
NEW.origem := 'replicacao_logica';
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
5. Transformação com Funções de Decodificação Customizadas
O PostgreSQL padrão usa o plugin pgoutput para decodificar o WAL. Plugins alternativos como wal2json permitem transformações mais elaboradas.
Usando wal2json para transformar dados
-- No postgresql.conf do publisher
wal_level = logical
max_replication_slots = 5
-- Criar slot de replicação com wal2json
SELECT pg_create_logical_replication_slot('slot_json', 'wal2json');
Plugin de decodificação customizado (exemplo conceitual)
Para cenários avançados, você pode criar um plugin em C que transforma os dados antes de enviá-los. Isso é útil quando:
- Você precisa normalizar dados antes da replicação (ex: converter fusos horários)
- Quer reduzir o volume de dados enviados pela rede
- Precisa aplicar regras de negócio complexas no publisher
Quando transformar no publisher vs. subscriber:
| Cenário | Publisher | Subscriber |
|---|---|---|
| Reduzir tráfego de rede | ✅ | ❌ |
| Manter dados originais no publisher | ❌ | ✅ |
| Regra de negócio global | ✅ | ❌ |
| Regra específica do subscriber | ❌ | ✅ |
6. Estratégias de Sincronização e Consistência
Tratamento de conflitos
Conflitos ocorrem quando o subscriber tenta inserir uma chave que já existe ou atualizar um registro divergente.
-- Verificar conflitos no subscriber
SELECT * FROM pg_stat_subscription_workers;
-- Opções de resolução:
-- 1. Ignorar conflitos (padrão)
ALTER SUBSCRIPTION sub_vendas SET (failover = true);
-- 2. Replicar novamente a partir de um LSN específico
ALTER SUBSCRIPTION sub_vendas REFRESH PUBLICATION;
Monitoramento de lag
-- Verificar lag da replicação
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_bytes,
ROUND(EXTRACT(EPOCH FROM (NOW() - confirmed_flush_lsn::text::timestamp))) AS lag_segundos
FROM pg_replication_slots
WHERE slot_type = 'logical';
Reaplicação após falha
-- Forçar reaplicação de todas as alterações pendentes
ALTER SUBSCRIPTION sub_vendas ENABLE;
-- Em caso de corrupção, recriar a subscription
DROP SUBSCRIPTION IF EXISTS sub_vendas;
CREATE SUBSCRIPTION sub_vendas
CONNECTION 'host=...'
PUBLICATION pub_vendas;
7. Casos de Uso Avançados e Boas Práticas
Replicação seletiva para staging/teste
-- Publisher: publica apenas dados de teste
CREATE PUBLICATION pub_staging FOR TABLE clientes, pedidos
WHERE (ambiente = 'staging');
Data warehouse incremental
-- Replicar apenas inserts para o DW
CREATE PUBLICATION pub_dw_incremental FOR TABLE transacoes
WITH (publish = 'insert');
Segurança com filtros e transformações
-- Publisher: filtra colunas sensíveis
CREATE PUBLICATION pub_segura FOR TABLE funcionarios (id, nome, cargo)
WHERE (cargo != 'DIRETOR');
-- Subscriber: mascara dados remanescentes
CREATE TRIGGER trg_mascarar_salario
BEFORE INSERT ON funcionarios
FOR EACH ROW EXECUTE FUNCTION mascarar_salario();
Boas práticas finais:
- Sempre teste filtros e transformações em ambiente de homologação
- Monitore o lag de replicação com alertas automáticos
- Documente todas as publicações e subscriptions criadas
- Use schemas separados para dados replicados quando possível
- Evite triggers no publisher que possam causar loops de replicação
Referências
- PostgreSQL Documentation: Logical Replication — Documentação oficial sobre replicação lógica no PostgreSQL, incluindo criação de publicações, subscriptions e filtros.
- PostgreSQL Documentation: CREATE PUBLICATION — Referência completa da sintaxe de CREATE PUBLICATION com exemplos de filtros por linha e coluna.
- PostgreSQL Documentation: Logical Decoding — Explicação detalhada sobre o processo de decodificação lógica e plugins de saída como pgoutput e wal2json.
- 2ndQuadrant: Logical Replication Filters and Transformations — Artigo técnico avançado sobre filtros e transformações em replicação lógica, com exemplos práticos.
- PostgreSQL WIKI: Logical Replication — Guia colaborativo com dicas de configuração, monitoramento e resolução de problemas em replicação lógica.