What's Changed
- New
autoCache()
event based helper method to avoid using AutoCacheFactoryBuilder
when using default windowing strategy, error handler, life cycle management and scheduler:
import io.github.pellse.reactive.assembler.Assembler;
import io.github.pellse.reactive.assembler.caching.CacheFactory.CacheTransformer;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;
// Example of your custom domain events not known by the Assembler Library
sealed interface MyEvent<T> {
T item();
}
record Add<T>(T item) implements MyEvent<T> {}
record Delete<T>(T item) implements MyEvent<T> {}
record MyOtherEvent<T>(T value, boolean isAddEvent) {}
// E.g. Flux coming from a CDC/Kafka source
Flux<MyOtherEvent<BillingInfo>> billingInfoFlux = Flux.just(
new MyOtherEvent<>(billingInfo1, true), new MyOtherEvent<>(billingInfo2, true),
new MyOtherEvent<>(billingInfo2, false), new MyOtherEvent<>(billingInfo3, false));
// E.g. Flux coming from a CDC/Kafka source
Flux<MyEvent<OrderItem>> orderItemFlux = Flux.just(
new Add<>(orderItem11), new Add<>(orderItem12), new Add<>(orderItem13),
new Delete<>(orderItem31), new Delete<>(orderItem32), new Delete<>(orderItem33));
CacheTransformer<Long, BillingInfo, BillingInfo> billingInfoAutoCache =
autoCache(billingInfoFlux, MyOtherEvent::isAddEvent, MyOtherEvent::value); // New autoCache() method
CacheTransformer<Long, OrderItem, List<OrderItem>> orderItemAutoCache =
autoCache(orderItemFlux, Add.class::isInstance, MyEvent::item); // New autoCache() method
Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
.withCorrelationIdExtractor(Customer::customerId)
.withAssemblerRules(
rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, billingInfoAutoCache))),
rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, orderItemAutoCache))),
Transaction::new)
.build();
- Replaced onErrorStop() with onErrorContinue() as the default autoCache() error handler
Dependencies upgrades