Programação reativa com RxJS e Reactor
1. Fundamentos da Programação Reativa
A programação reativa representa uma mudança paradigmática na forma como construímos sistemas que lidam com fluxos de dados assíncronos. Diferentemente da programação imperativa tradicional, onde o código executa sequencialmente e o programador controla explicitamente o fluxo, a abordagem reativa trata dados como streams contínuos que podem ser observados, transformados e combinados.
No modelo imperativo, uma requisição HTTP bloqueia a thread até obter resposta. No paradigma reativo, definimos um pipeline de processamento que reage automaticamente à chegada dos dados. O Manifesto Reativo estabelece quatro pilares fundamentais: responsividade (respostas rápidas), resiliência (recuperação de falhas), elasticidade (adaptação à carga) e orientação a mensagens (comunicação assíncrona).
// Imperativo tradicional
const resultado = dados.filter(item => item.ativo).map(item => item.nome);
// Reativo com RxJS
from(dados).pipe(
filter(item => item.ativo),
map(item => item.nome)
).subscribe(console.log);
2. RxJS: Reactive Extensions para JavaScript
RxJS implementa o padrão Observer com operadores funcionais. O trio central é composto por Observable (fonte de dados), Observer (consumidor) e Subscription (controle do ciclo de vida).
import { fromEvent, of } from 'rxjs';
import { map, debounceTime, switchMap } from 'rxjs/operators';
// Observable de eventos de input
const inputBusca = document.getElementById('busca');
const busca$ = fromEvent(inputBusca, 'input').pipe(
map(event => event.target.value),
debounceTime(300),
switchMap(termo => buscarDados(termo))
);
busca$.subscribe(resultados => {
atualizarUI(resultados);
});
Operadores essenciais:
- map: transforma cada valor emitido
- filter: seleciona valores que atendem a condição
- mergeMap: mapeia para múltiplos observables internos simultaneamente
- switchMap: cancela observable anterior ao receber novo valor
- debounceTime: aguarda pausa nas emissões antes de propagar
3. Reactor: Programação Reativa no Ecossistema Java
Reactor é a implementação reativa para JVM, baseada em Reactive Streams. Seus dois tipos principais são Mono (0 ou 1 elemento) e Flux (N elementos).
Flux<String> fluxoNomes = Flux.just("Ana", "Bruno", "Carla")
.map(nome -> nome.toUpperCase())
.filter(nome -> nome.startsWith("C"));
Mono<User> usuario = Mono.fromCallable(() -> buscarUsuario(1))
.timeout(Duration.ofSeconds(5))
.onErrorResume(error -> Mono.just(new User("anonimo")));
Exemplo completo com Spring WebFlux e R2DBC:
@RestController
public class ProdutoController {
@GetMapping("/produtos")
public Flux<Produto> listarProdutos() {
return produtoRepository.findAll()
.filter(produto -> produto.getPreco() > 100)
.flatMap(produto -> calcularDesconto(produto))
.doOnNext(produto -> log.info("Produto processado: {}", produto.getId()));
}
private Mono<Produto> calcularDesconto(Produto produto) {
return Mono.just(produto)
.map(p -> {
p.setPreco(p.getPreco() * 0.9);
return p;
});
}
}
4. Padrões Comuns em Fluxos Reativos
Backpressure: controla a velocidade de processamento quando produtor é mais rápido que consumidor.
// RxJS - backpressure com buffer
fromEvent(document, 'mousemove').pipe(
bufferTime(200),
filter(clicks => clicks.length > 0)
).subscribe(console.log);
// Reactor - backpressure com limitRate
Flux.range(1, 1000)
.limitRate(10)
.subscribe(System.out::println);
Tratamento de erros:
// RxJS
observable$.pipe(
catchError(err => of(valorFallback)),
retry(3)
);
// Reactor
fluxo
.onErrorResume(ex -> Flux.just("fallback1", "fallback2"))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
Combinação de fluxos:
// RxJS - combineLatest
const combinado$ = combineLatest([
observable1$,
observable2$
]).pipe(map(([val1, val2]) => val1 + val2));
// Reactor - zip
Flux.zip(fluxoA, fluxoB, (a, b) -> a + " " + b);
5. Comparação entre RxJS e Reactor
Ambas bibliotecas compartilham conceitos fundamentais: Observable/Flux, operadores transformadores e schedulers para controle de concorrência. As principais diferenças residem no ecossistema:
- RxJS: tipagem dinâmica, ideal para frontend com Angular, manipulação de eventos DOM e requisições HTTP
- Reactor: tipagem forte, integração nativa com Spring Boot, suporte a backpressure via Reactive Streams
RxJS oferece maior flexibilidade em cenários de UI, enquanto Reactor proporciona segurança de tipos e performance otimizada para servidores.
6. Integração com Frameworks Populares
Angular + RxJS:
@Injectable()
export class UserService {
constructor(private http: HttpClient) {}
getUsers(): Observable<User[]> {
return this.http.get<User[]>('/api/users').pipe(
map(users => users.filter(u => u.active)),
catchError(error => {
console.error('Erro ao buscar usuários', error);
return of([]);
})
);
}
}
Spring Boot + Reactor:
@Configuration
public class WebFluxConfig {
@Bean
public RouterFunction<ServerResponse> rotas() {
return route()
.GET("/api/eventos", request ->
ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(fluxoEventos(), Evento.class))
.build();
}
}
7. Boas Práticas e Armadilhas Comuns
Evitando memory leaks:
// RxJS - gerenciamento de subscriptions
const subscription = observable$.subscribe();
subscription.unsubscribe(); // ou use takeUntil
// Reactor - evitar bloqueios
Mono.fromCallable(this::operacaoBloqueante)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Testando código reativo:
// RxJS - marble testing
const source$ = cold('--a--b|', { a: 1, b: 2 });
const expected$ = cold('--A--B|', { A: 2, B: 4 });
expect(source$.pipe(map(x => x * 2))).toBeObservable(expected$);
// Reactor - StepVerifier
StepVerifier.create(fluxo)
.expectNext("A", "B", "C")
.expectComplete()
.verify();
Debugging:
// RxJS
observable$.pipe(
tap(console.log),
map(transformar)
);
// Reactor
fluxo
.doOnNext(item -> log.debug("Processando: {}", item))
.flatMap(this::processar);
Referências
- Documentação oficial do RxJS — Guia completo sobre Observables, operadores e práticas recomendadas para programação reativa em JavaScript
- Reactor Reference Guide — Documentação oficial do Reactor com Mono, Flux, operadores e integração com Spring
- Reactive Manifesto — Os princípios fundamentais da programação reativa: responsivo, resiliente, elástico e orientado a mensagens
- Spring WebFlux Documentation — Guia oficial para construção de aplicações web reativas com Spring Boot e Reactor
- RxJS Marble Testing Guide — Tutorial detalhado sobre testes com diagramas marble para verificar comportamento de streams
- R2DBC Specification — Especificação para acesso reativo a bancos de dados relacionais, complementar ao Reactor em aplicações Spring