What's Changed
- New `errorHandler()` methods on `AutoCacheFactoryBuilder` to simplify error handler configuration when we only want to log the error and continue processing
- New `concurrency()` method: adds the ability to configure concurrency settings on `AutoCacheFactoryBuilder` for the underlying `ConcurrentCache`
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;
import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;
// System.Logger shared by the error handlers in this example.
var logger = getLogger("logger");

// Reusable error handler: logs the failure at WARNING level, passing the Throwable
// to the logger so the backend can report it; it does not rethrow.
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

// Source streams fed to autoCacheBuilder(...); `...` stands in for the real publisher.
// The terminating ';' must precede the trailing comment, otherwise it is commented out.
Flux<BillingInfo> billingInfoFlux = ...; // BillingInfo data coming from e.g. Kafka
Flux<OrderItem> orderItemFlux = ...; // OrderItem data coming from e.g. Kafka
// Builds an assembler producing Transaction objects: each rule is keyed on customerId,
// BillingInfo is mapped one-to-one and OrderItem one-to-many, and both query functions
// are wrapped in a cache configured from the corresponding Flux via autoCacheBuilder(...).
var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                // Use the (Level, String, Throwable) overload: log(Level, Object) would
                // only log error.toString() and drop the Throwable itself.
                .errorHandler(error -> logger.log(WARNING, "Error in autoCache", error)) // New `errorHandler()` method
                .concurrency(20) // New `concurrency()` method -> maxAttempts = 20
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(logWarning) // New `errorHandler()` method
                .concurrency(20, ofMillis(10)) // New `concurrency()` method -> maxAttempts = 20, delay = 10 ms
                .build()))),
        Transaction::new)
    .build();
// Group the customers into windows of 3 and hand each window to the assembler.
// NOTE(review): assumes getCustomers() returns a Reactor Flux, in which case
// flatMapSequential emits the assembled results in source order — confirm.
var customerWindows = getCustomers().window(3);
var transactionFlux = customerWindows.flatMapSequential(assembler::assemble);
Dependency upgrades