Skip to content

Assembler v0.6.2

Compare
Choose a tag to compare
@pellse pellse released this 12 Apr 14:18
· 356 commits to main since this release

What's Changed

  • New autoCache() event based helper method to avoid using AutoCacheFactoryBuilder when using default windowing strategy, error handler, life cycle management and scheduler:
import io.github.pellse.reactive.assembler.Assembler;
import io.github.pellse.reactive.assembler.caching.CacheFactory.CacheTransformer;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;

// Example of your custom domain events not known by the Assembler Library
sealed interface MyEvent<T> {
    T item();
}
record Add<T>(T item) implements MyEvent<T> {}
record Delete<T>(T item) implements MyEvent<T> {}

record MyOtherEvent<T>(T value, boolean isAddEvent) {}

// E.g. Flux coming from a CDC/Kafka source
Flux<MyOtherEvent<BillingInfo>> billingInfoFlux = Flux.just(
    new MyOtherEvent<>(billingInfo1, true), new MyOtherEvent<>(billingInfo2, true),
    new MyOtherEvent<>(billingInfo2, false), new MyOtherEvent<>(billingInfo3, false));

// E.g. Flux coming from a CDC/Kafka source
Flux<MyEvent<OrderItem>> orderItemFlux = Flux.just(
    new Add<>(orderItem11), new Add<>(orderItem12), new Add<>(orderItem13),
    new Delete<>(orderItem31), new Delete<>(orderItem32), new Delete<>(orderItem33));

CacheTransformer<Long, BillingInfo, BillingInfo> billingInfoAutoCache =
    autoCache(billingInfoFlux, MyOtherEvent::isAddEvent, MyOtherEvent::value); // New autoCache() method

CacheTransformer<Long, OrderItem, List<OrderItem>> orderItemAutoCache =
    autoCache(orderItemFlux, Add.class::isInstance, MyEvent::item); // New autoCache() method

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, billingInfoAutoCache))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, orderItemAutoCache))),
        Transaction::new)
    .build();
  • Replaced onErrorStop() with onErrorContinue() as the default autoCache() error handler

Dependencies upgrades

  • Kotlin 1.8.20