Releases: pellse/assembler

Assembler v0.7.5

02 Jul 03:33
4f5c87e

What's Changed

This release introduces the concept of an ID join. It fixes the case where a sub-level entity has no correlation ID pointing back to its top-level entity, so the two could not be related directly.

For example, before this release there was no way to express the relationship between a PostDetails and a User, because User has no postId field the way Reply does (see #33):

record PostDetails(Long id, Long userId, String content) {
}

record User(Long Id, String username) { // No correlation Id back to PostDetails
}

record Reply(Long id, Long postId, Long userId, String content) {
}

record Post(PostDetails post, User author, List<Reply> replies) {
}

Assembler<PostDetails, Post> assembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(XXXXX, oneToOne(call(PostDetails::userId, this::getUsersById))), // What should XXXXX be?
        rule(Reply::postId, oneToMany(Reply::id, call(this::getRepliesById))),
        Post::new)
    .build();

As of 0.7.5, this relationship can be expressed by passing both ID resolvers to rule():

Assembler<PostDetails, Post> assembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(User::Id, PostDetails::userId, oneToOne(call(this::getUsersById))), // ID Join
        rule(Reply::postId, oneToMany(Reply::id, call(this::getRepliesById))),
        Post::new)
    .build();

This is semantically equivalent to the following SQL query, assuming all entities were stored in the same relational database:

SELECT 
    p.id AS post_id,
    p.userId AS post_userId,
    p.content AS post_content,
    u.id AS author_id,
    u.username AS author_username,
    r.id AS reply_id,
    r.postId AS reply_postId,
    r.userId AS reply_userId,
    r.content AS reply_content
FROM 
    PostDetails p
JOIN 
    User u ON p.userId = u.id -- rule(User::Id, PostDetails::userId, ...)
LEFT JOIN 
    Reply r ON p.id = r.postId -- rule(Reply::postId, ...)
WHERE 
    p.id IN (1, 2, 3); -- withCorrelationIdResolver(PostDetails::id)
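
To make the join semantics concrete, here is a minimal plain-Java sketch of what the two rules compute. It deliberately does not use the Assembler API: the class name IdJoinSketch and the assemble() helper are hypothetical, everything is in memory and synchronous, and the User component is spelled id rather than Id. Unlike the inner JOIN in the SQL above, a post whose user is missing is kept here with a null author.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustration of the join semantics only; not the library's implementation.
class IdJoinSketch {

    record PostDetails(Long id, Long userId, String content) {}
    record User(Long id, String username) {}
    record Reply(Long id, Long postId, Long userId, String content) {}
    record Post(PostDetails post, User author, List<Reply> replies) {}

    static List<Post> assemble(List<PostDetails> posts, List<User> users, List<Reply> replies) {
        // ID join: index users by their own ID, then look them up through PostDetails::userId
        Map<Long, User> usersById = users.stream()
            .collect(Collectors.toMap(User::id, Function.identity()));

        // Regular correlation: replies carry the post ID, so group them by Reply::postId
        Map<Long, List<Reply>> repliesByPostId = replies.stream()
            .collect(Collectors.groupingBy(Reply::postId));

        return posts.stream()
            .map(p -> new Post(
                p,
                usersById.get(p.userId()),                        // may be null if the user is unknown
                repliesByPostId.getOrDefault(p.id(), List.of()))) // empty list when there are no replies
            .toList();
    }

    public static void main(String[] args) {
        var posts = assemble(
            List.of(new PostDetails(1L, 10L, "First post")),
            List.of(new User(10L, "alice")),
            List.of(new Reply(100L, 1L, 11L, "Nice post")));
        posts.forEach(System.out::println);
    }
}
```

The point of the sketch is that the user lookup is keyed by User's own ID and driven by a field of the top-level entity, which is exactly the case a single correlation ID could not describe.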

Assembler v0.7.4

25 Jun 01:15

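This release brings 2 new features:

  • Spring Cache integration through springCache() in SpringCacheFactory, as used in the example below: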

import org.springframework.cache.CacheManager;
import static io.github.pellse.assembler.caching.spring.SpringCacheFactory.springCache;
...

@Controller
public class SpO2MonitoringGraphQLController {

  record SpO2Reading(SpO2 spO2, Patient patient, BodyMeasurement bodyMeasurement) {
  }

  private final Assembler<SpO2, SpO2Reading> spO2ReadingAssembler;

  SpO2MonitoringGraphQLController(
      PatientService ps,
      BodyMeasurementService bms,
      CacheManager cacheManager) {

    final var patientCache = cacheManager.getCache("patientCache");
    final var bodyMeasurementCache = cacheManager.getCache("bodyMeasurementCache");

    spO2ReadingAssembler = assemblerOf(SpO2Reading.class)
      .withCorrelationIdResolver(SpO2::patientId)
      .withRules(
        rule(Patient::id, oneToOne(cached(call(SpO2::healthCardNumber, ps::findPatientsByHealthCardNumber), springCache(patientCache)))),
        rule(BodyMeasurement::patientId, oneToOne(cached(call(bms::getBodyMeasurements), springCache(bodyMeasurementCache)))),
        SpO2Reading::new)
      .build();
  }
}
  • Ability to configure read and write non-blocking bounded queues in ConcurrentCache
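
The release note does not show the configuration API for these queues, so the following is only a generic illustration of what "non-blocking bounded queue" means, using the JDK's ArrayBlockingQueue. The class BoundedQueueSketch and the method submitAll() are hypothetical names and are not part of the library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Generic JDK illustration; ConcurrentCache's actual queue configuration is not shown here.
class BoundedQueueSketch {

    // Tries to enqueue taskCount items and returns how many the bounded queue accepted
    static int submitAll(int capacity, int taskCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int task = 0; task < taskCount; task++) {
            // offer() never blocks: it returns false immediately when the queue is full
            if (queue.offer(task)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(submitAll(2, 5)); // prints 2
    }
}
```

A bound caps memory use under load, and the non-blocking offer leaves the caller free to decide what to do with rejected work instead of stalling the calling thread.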

Assembler v0.7.3

29 May 20:23

This release primarily aims to enhance performance. It features the following updates:

  • A redesigned Caching API that generically supports both single values and collections of values as cache entries, providing dual Map/MultiMap semantics.
  • A comprehensive overhaul of the caching concurrency logic, leading to a substantial performance improvement over the previous version.
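
As a rough picture of what dual Map/MultiMap semantics can look like, the sketch below uses one generic cache type whose merge function decides whether a new value replaces the old one (Map) or is appended to it (MultiMap). DualCacheSketch, singleValued() and multiValued() are hypothetical names chosen for this illustration; the library's redesigned Caching API is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// One cache type, two behaviours, selected by the merge function (illustrative only).
class DualCacheSketch<K, V> {

    private final Map<K, V> entries = new HashMap<>();
    private final BinaryOperator<V> merge;

    private DualCacheSketch(BinaryOperator<V> merge) {
        this.merge = merge;
    }

    // Map semantics: a new value replaces the previous one
    static <K, V> DualCacheSketch<K, V> singleValued() {
        return new DualCacheSketch<>((oldValue, newValue) -> newValue);
    }

    // MultiMap semantics: new values are appended to the existing ones
    static <K, E> DualCacheSketch<K, List<E>> multiValued() {
        return new DualCacheSketch<>((oldValues, newValues) -> {
            List<E> merged = new ArrayList<>(oldValues);
            merged.addAll(newValues);
            return merged;
        });
    }

    void put(K key, V value) {
        entries.merge(key, value, merge);
    }

    V get(K key) {
        return entries.get(key);
    }
}
```

With this shape, a oneToOne style cache and a oneToMany style cache share the same storage and concurrency code and differ only in how entries are combined.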

Assembler v0.7.2

25 Apr 21:51

This release introduces asynchronous caching for the default cache implementation and reverts the name of the library back to Assembler (previously CohereFlux).

What's Changed

Full Changelog: v0.7.1...v0.7.2

CohereFlux v0.7.1

07 Jun 21:16

This is a big release: the framework was re-architected so that query functions have access to the whole upstream entity instead of having to rely solely on IDs.

What's Changed

  • The framework is now called CohereFlux; the whole API was modified to reflect that change
  • Entities T are now passed down the entire processing chain instead of IDs
  • Adding RuleMapperSource.call() to invoke a queryFunction with a list of IDs instead of a collection of top-level entities
  • New BatchRule API
  • Adding factory methods in CaffeineCacheFactory for max size and cache access expiry duration
  • Replaced RC with Flux<R> in CohereFlux interface, type parameters are now CohereFlux<T, R>
  • FetchFunction in CacheFactory now returns an empty Map when no RuleMapperSource is defined
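
The relationship between "entities flow down the chain" and "call() still lets a query function receive IDs" can be pictured as a small adapter. The sketch below is plain Java with hypothetical names (CallAdapterSketch, callWithIds()); it is not the signature of RuleMapperSource.call(), and it uses a synchronous Function where the library works with reactive types.

```java
import java.util.List;
import java.util.function.Function;

// Adapter idea only; names and signatures are assumptions made for this illustration.
class CallAdapterSketch {

    record Order(Long id, Long customerId) {}

    // Wraps a query function that expects IDs so that it can be fed whole entities
    static <T, ID, R> Function<List<T>, R> callWithIds(
            Function<T, ID> idResolver,
            Function<List<ID>, R> queryFunction) {
        return entities -> queryFunction.apply(
            entities.stream().map(idResolver).toList());
    }

    public static void main(String[] args) {
        Function<List<Order>, String> query =
            CallAdapterSketch.<Order, Long, String>callWithIds(
                Order::customerId,
                ids -> "fetching customers " + ids);

        System.out.println(query.apply(List.of(new Order(1L, 42L), new Order(2L, 43L))));
    }
}
```

Query functions that need more than the ID can take the entities directly, while existing ID-based services are wrapped once at the rule definition.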

Assembler v0.6.6

30 May 19:42

What's Changed (from Git Commit messages)

  • Adding BatchRuleBuilder and BatchRule for easier integration with Spring GraphQL:

import io.github.pellse.reactive.assembler.Rule.BatchRule;

import static io.github.pellse.reactive.assembler.Rule.batchRule;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;

@Controller
public class PatientObservationGraphQLController {

    private final PatientService patientService;

    private final BatchRule<Patient, BodyMeasurement> bodyMeasurementBatchRule;
    private final BatchRule<Patient, List<SpO2>> spO2BatchRule;

    PatientObservationGraphQLController(PatientService ps, BodyMeasurementService bms, SpO2StreamingService spO2ss) {

        this.patientService = ps;

        this.bodyMeasurementBatchRule = batchRule(BodyMeasurement::patientId, oneToOne(cached(bms::retrieveBodyMeasurements)))
                .withIdExtractor(Patient::id);

        this.spO2BatchRule = batchRule(SpO2::patientId, oneToMany(SpO2::id, cached(autoCache(spO2ss::spO2Flux))))
                .withIdExtractor(Patient::id);
    }

    @QueryMapping
    Flux<Patient> patients() {
        return patientService.findAllPatients();
    }

    @BatchMapping
    Mono<Map<Patient, BodyMeasurement>> bodyMeasurement(List<Patient> patients) {
        return bodyMeasurementBatchRule.executeToMono(patients);
    }

    @BatchMapping
    Flux<List<SpO2>> spO2(List<Patient> patients) {
        return spO2BatchRule.executeToFlux(patients);
    }
}

Full Changelog: v0.6.5...v0.6.6

Assembler v0.6.5

19 May 02:57

This release focuses on performance and consistency optimization of caching, including the auto caching feature.

What's Changed (from Git Commit messages)

  • Cache concurrency optimization by switching to MULTIPLE_READERS strategy when caching fetchFunction is guaranteed to be empty
  • Ability to configure Retry strategies with specific Scheduler in concurrent cache, same Scheduler used for auto cache and retry strategies
  • Ref count instead of boolean flag to manage start/stop in concurrentLifeCycleEventListener()
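
The last item can be illustrated with a small reference-counted start/stop wrapper: with a boolean flag, the first stop() would shut down a resource that other callers still use, whereas a counter only stops it when the last user leaves. This is a sketch under assumptions, not the code behind concurrentLifeCycleEventListener(); RefCountedLifecycleSketch and its callbacks are hypothetical, and the callbacks themselves are not serialized against each other.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Reference-counted lifecycle, illustrative only.
class RefCountedLifecycleSketch {

    private final AtomicInteger refCount = new AtomicInteger();
    private final Runnable onStart;
    private final Runnable onStop;

    RefCountedLifecycleSketch(Runnable onStart, Runnable onStop) {
        this.onStart = onStart;
        this.onStop = onStop;
    }

    void start() {
        // Only the transition 0 -> 1 actually starts the underlying resource
        if (refCount.getAndIncrement() == 0) {
            onStart.run();
        }
    }

    void stop() {
        // Never go below zero, so an unbalanced stop() is ignored
        int previous = refCount.getAndUpdate(count -> count > 0 ? count - 1 : 0);
        // Only the transition 1 -> 0 actually stops the underlying resource
        if (previous == 1) {
            onStop.run();
        }
    }
}
```

Two components sharing the same cache can each call start() and stop() independently, and the resource stays up until both have stopped.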

Breaking API changes:

  • concurrency() factory methods renamed to xxxRetryStrategy() in AutoCacheFactoryBuilder:

var assembler = assemblerOf(Transaction.class)
  .withCorrelationIdExtractor(Customer::customerId)
  .withAssemblerRules(
    rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
      autoCacheBuilder(billingInfoFlux)
        .maxWindowSizeAndTime(100, ofSeconds(5))
        .errorHandler(error -> logger.log(WARNING, "Error in autoCache", error))
        .scheduler(newParallel("billing-info"))
        .maxRetryStrategy(50) // used to be named `concurrency()`
        .build()))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
      autoCacheBuilder(orderItemFlux)
        .maxWindowSize(50)
        .errorHandler(onErrorMap(MyException::new))
        .scheduler(newParallel("order-item"))
        .backoffRetryStrategy(100, ofMillis(10)) // used to be named `concurrency()`
        .build()))),
    Transaction::new)
  .build();

Dependencies upgrades

  • Project Reactor 3.5.6
  • Kotlin 1.8.21

Assembler v0.6.4

15 Apr 13:23

What's Changed

  • New errorHandler() methods on AutoCacheFactoryBuilder to simplify error handler config when we only want to log the error and continue processing
  • New concurrency() method to configure concurrency settings on AutoCacheFactoryBuilder for the underlying ConcurrentCache:

import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;
import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("logger");
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

Flux<BillingInfo> billingInfoFlux = ... // BillingInfo data coming from e.g. Kafka;
Flux<OrderItem> orderItemFlux = ... // OrderItem data coming from e.g. Kafka;

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(error -> logger.log(WARNING, error)) // New `errorHandler()` method
                .concurrency(20) // New `concurrency()` method -> maxAttempts = 20
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(logWarning) // New `errorHandler()` method
                .concurrency(20, ofMillis(10)) // New `concurrency()` method -> maxAttempts = 20, delay = 10 ms
                .build()))),
        Transaction::new)
    .build();
    
var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);
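
The comments on the two concurrency() calls above mention maxAttempts and delay. As a hedged illustration of those two knobs, the sketch below retries an operation up to a maximum number of attempts, pausing between failed attempts. It is plain Java with hypothetical names (RetrySketch, retry()); how ConcurrentCache actually retries, whether the first try counts as an attempt, and whether the delay is fixed or grows are not stated in the release notes, so a fixed delay and a counted first try are assumed here.

```java
import java.time.Duration;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Bounded retry with a fixed pause between attempts (assumed semantics, illustrative only).
class RetrySketch {

    // Returns the 1-based attempt number that succeeded, or -1 once maxAttempts is exhausted
    static int retry(BooleanSupplier attempt, int maxAttempts, Duration delay) {
        for (int attemptNumber = 1; attemptNumber <= maxAttempts; attemptNumber++) {
            if (attempt.getAsBoolean()) {
                return attemptNumber;
            }
            if (!delay.isZero()) {
                // Pause before the next attempt; parkNanos may return early, which is harmless here
                LockSupport.parkNanos(delay.toNanos());
            }
        }
        return -1;
    }
}
```

A call such as retry(cache::tryAcquire, 20, Duration.ofMillis(10)), where tryAcquire is a hypothetical method, would loosely mirror the second example: up to 20 attempts with 10 ms between them.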

Dependencies upgrades

  • Caffeine 3.1.6

Assembler v0.6.3

12 Apr 16:54

What's Changed

  • Renamed AutoCacheFactoryBuilder.autoCache() to AutoCacheFactoryBuilder.autoCacheBuilder():

import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;
import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("warning-logger");
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

Flux<BillingInfo> billingInfoFlux = ... // BillingInfo data coming from e.g. Kafka;
Flux<OrderItem> orderItemFlux = ... // OrderItem data coming from e.g. Kafka;

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux) // Used to be `autoCache()`, now `autoCacheBuilder()`
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(onErrorContinue(logWarning))
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux) // Used to be `autoCache()`, now `autoCacheBuilder()`
                .maxWindowSize(50)
                .errorHandler(onErrorContinue(logWarning))
                .build()))),
        Transaction::new)
    .build();
    
var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);

Dependencies upgrades

  • Project Reactor 3.5.5

Assembler v0.6.2

12 Apr 14:18

What's Changed

  • New autoCache() event-based helper method, which avoids having to use AutoCacheFactoryBuilder when the default windowing strategy, error handler, life cycle management and scheduler are sufficient:

import java.util.List;
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;
import io.github.pellse.reactive.assembler.caching.CacheFactory.CacheTransformer;
import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;

// Example of your custom domain events not known by the Assembler Library
sealed interface MyEvent<T> {
    T item();
}
record Add<T>(T item) implements MyEvent<T> {}
record Delete<T>(T item) implements MyEvent<T> {}

record MyOtherEvent<T>(T value, boolean isAddEvent) {}

// E.g. Flux coming from a CDC/Kafka source
Flux<MyOtherEvent<BillingInfo>> billingInfoFlux = Flux.just(
    new MyOtherEvent<>(billingInfo1, true), new MyOtherEvent<>(billingInfo2, true),
    new MyOtherEvent<>(billingInfo2, false), new MyOtherEvent<>(billingInfo3, false));

// E.g. Flux coming from a CDC/Kafka source
Flux<MyEvent<OrderItem>> orderItemFlux = Flux.just(
    new Add<>(orderItem11), new Add<>(orderItem12), new Add<>(orderItem13),
    new Delete<>(orderItem31), new Delete<>(orderItem32), new Delete<>(orderItem33));

CacheTransformer<Long, BillingInfo, BillingInfo> billingInfoAutoCache =
    autoCache(billingInfoFlux, MyOtherEvent::isAddEvent, MyOtherEvent::value); // New autoCache() method

CacheTransformer<Long, OrderItem, List<OrderItem>> orderItemAutoCache =
    autoCache(orderItemFlux, Add.class::isInstance, MyEvent::item); // New autoCache() method

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, billingInfoAutoCache))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, orderItemAutoCache))),
        Transaction::new)
    .build();
  • Replaced onErrorStop() with onErrorContinue() as the default autoCache() error handler
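
To show how the two items above fit together, here is a plain-Java sketch of an event-driven cache update: a predicate says whether an event is an add, an extractor pulls the value out of the event, and a failing event is reported and skipped rather than stopping the stream. It is synchronous and does not use Reactor or the library; AutoCacheSketch, apply() and the idResolver parameter are hypothetical names for this illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Event-driven cache update with continue-on-error behaviour (illustrative only).
class AutoCacheSketch {

    record Event<T>(T value, boolean isAddEvent) {}

    static <E, K, V> Map<K, V> apply(
            List<E> events,
            Predicate<E> isAddEvent,
            Function<E, V> valueExtractor,
            Function<V, K> idResolver,
            Consumer<Throwable> onError) {

        Map<K, V> cache = new HashMap<>();
        for (E event : events) {
            try {
                V value = valueExtractor.apply(event);
                K key = idResolver.apply(value);
                if (isAddEvent.test(event)) {
                    cache.put(key, value);   // add or update the entry
                } else {
                    cache.remove(key);       // any non-add event is treated as a delete
                }
            } catch (RuntimeException e) {
                // "Continue" behaviour: report the error and keep consuming events.
                // A "stop" handler would rethrow here and end processing.
                onError.accept(e);
            }
        }
        return cache;
    }
}
```

With a continue-style default, one malformed event from a CDC or Kafka source does not prevent later events from reaching the cache.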

Dependencies upgrades

  • Kotlin 1.8.20