Skip to content

Assembler v0.6.4

Compare
Choose a tag to compare
@pellse pellse released this 15 Apr 13:23
· 333 commits to main since this release

What's Changed

  • New errorHandler() methods on AutoCacheFactoryBuilder to simplify error handler config when we only want to log the error and continue processing
  • New concurrency() method, ability to configure concurrency settings on AutoCacheFactoryBuilder for the underlying ConcurrentCache
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;
import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("logger");
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

Flux<BillingInfo> billingInfoFlux = ... // BillingInfo data coming from e.g. Kafka;
Flux<OrderItem> orderItemFlux = ... // OrderItem data coming from e.g. Kafka;

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(error -> logger.log(WARNING, error)) // New `errorHandler()` method
                .concurrency(20) // New `concurrency()` method -> maxAttempts = 20
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(logWarning) // New `errorHandler()` method
                .concurrency(20, ofMillis(10)) // New `concurrency()` method -> maxAttempts = 20, delay = 10 ms
                .build()))),
        Transaction::new)
    .build();
    
var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);

Dependencies upgrades

  • Caffeine 3.1.6