Releases: pellse/assembler
Assembler v0.7.5
What's Changed
This release fixes an issue where there is no direct correlation ID between a top-level entity and a sub-level entity by introducing the concept of an ID join.
For example, before this release, there was no way to express the relationship between e.g. a `PostDetails` and a `User` because `User` doesn't have a `postId` field like `Reply` does, as described in #33.
```java
record PostDetails(Long id, Long userId, String content) {
}

record User(Long Id, String username) { // No correlation Id back to PostDetails
}

record Reply(Long id, Long postId, Long userId, String content) {
}

record Post(PostDetails post, User author, List<Reply> replies) {
}

Assembler<PostDetails, Post> assembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(XXXXX, oneToOne(call(PostDetails::userId, this::getUsersById))), // What should XXXXX be?
        rule(Reply::postId, oneToMany(Reply::id, call(this::getRepliesById))),
        Post::new)
    .build();
```
Since 0.7.5, this relationship can now be expressed:
```java
Assembler<PostDetails, Post> assembler = assemblerOf(Post.class)
    .withCorrelationIdResolver(PostDetails::id)
    .withRules(
        rule(User::Id, PostDetails::userId, oneToOne(call(this::getUsersById))), // ID Join
        rule(Reply::postId, oneToMany(Reply::id, call(this::getRepliesById))),
        Post::new)
    .build();
```
This would be semantically equivalent to the following SQL query if all entities were stored in the same relational database:
```sql
SELECT
    p.id AS post_id,
    p.userId AS post_userId,
    p.content AS post_content,
    u.id AS author_id,
    u.username AS author_username,
    r.id AS reply_id,
    r.postId AS reply_postId,
    r.userId AS reply_userId,
    r.content AS reply_content
FROM
    PostDetails p
JOIN
    User u ON p.userId = u.id -- rule(User::Id, PostDetails::userId, ...)
LEFT JOIN
    Reply r ON p.id = r.postId -- rule(Reply::postId, ...)
WHERE
    p.id IN (1, 2, 3); -- withCorrelationIdResolver(PostDetails::id)
```
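To make the ID join more concrete, here is a minimal plain-Java sketch of the matching step, written without the Assembler API or Reactor. The `IdJoinSketch` class, its trimmed-down records, the `PostWithAuthor` type and the in-memory `getUsersById` stub are all assumptions made for illustration; this is not how the library implements the rule internally.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

final class IdJoinSketch {

    // Simplified, hypothetical versions of the entities used in the release notes
    record PostDetails(Long id, Long userId, String content) {}
    record User(Long id, String username) {}
    record PostWithAuthor(PostDetails post, User author) {}

    // Hypothetical stand-in for a batch query: one call for all requested user IDs
    static List<User> getUsersById(Set<Long> userIds) {
        return List.of(new User(10L, "alice"), new User(20L, "bob")).stream()
            .filter(user -> userIds.contains(user.id()))
            .toList();
    }

    static List<PostWithAuthor> joinAuthors(
            List<PostDetails> posts,
            Function<Set<Long>, List<User>> queryFunction) {

        // Join key taken from the top-level entity (PostDetails::userId)
        Set<Long> userIds = posts.stream()
            .map(PostDetails::userId)
            .collect(Collectors.toSet());

        // Sub-level entities indexed by their own ID (User::id)
        Map<Long, User> usersById = queryFunction.apply(userIds).stream()
            .collect(Collectors.toMap(User::id, Function.identity()));

        // Each post is matched to its author; the author is null when no user was found
        return posts.stream()
            .map(post -> new PostWithAuthor(post, usersById.get(post.userId())))
            .toList();
    }
}
```

In this sketch two posts that share the same `userId` resolve to the same `User`, which is the case a plain correlation ID on `PostDetails::id` could not express.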
Assembler v0.7.4
This release brings 2 new features:
- Support for Spring Caching, allowing the use of any caching implementation supported by the Spring Framework (example taken from demo project https://github.com/pellse/assembler-spring-graphql-example):
```java
import org.springframework.cache.CacheManager;
import static io.github.pellse.assembler.caching.spring.SpringCacheFactory.springCache;
...

@Controller
public class SpO2MonitoringGraphQLController {

    record SpO2Reading(SpO2 spO2, Patient patient, BodyMeasurement bodyMeasurement) {
    }

    private final Assembler<SpO2, SpO2Reading> spO2ReadingAssembler;

    SpO2MonitoringGraphQLController(
            PatientService ps,
            BodyMeasurementService bms,
            CacheManager cacheManager) {

        final var patientCache = cacheManager.getCache("patientCache");
        final var bodyMeasurementCache = cacheManager.getCache("bodyMeasurementCache");

        spO2ReadingAssembler = assemblerOf(SpO2Reading.class)
            .withCorrelationIdResolver(SpO2::patientId)
            .withRules(
                rule(Patient::id, oneToOne(cached(call(SpO2::healthCardNumber, ps::findPatientsByHealthCardNumber), springCache(patientCache)))),
                rule(BodyMeasurement::patientId, oneToOne(cached(call(bms::getBodyMeasurements), springCache(bodyMeasurementCache)))),
                SpO2Reading::new)
            .build();
    }
}
```
- Ability to configure read and write non-blocking bounded queues in `ConcurrentCache`
Assembler v0.7.3
This release primarily aims to enhance performance. It features the following updates:
- A redesigned Caching API that generically supports both single values and collections of values as cache entries, providing dual Map/MultiMap semantics.
- A comprehensive overhaul of the caching concurrency logic, leading to a substantial performance improvement over the previous version.
Assembler v0.7.2
This release introduces asynchronous caching for the default cache implementation, and reverts the name of the library back to `Assembler` (previously `CohereFlux`).
What's Changed
Full Changelog: v0.7.1...v0.7.2
CohereFlux v0.7.1
This is a big release, with a re-architecture of the framework that gives query functions access to the whole upstream entity instead of having to rely solely on IDs.
What's Changed
- The framework is now called CohereFlux, the whole API was modified to reflect that change
- Now passing entities T down the entire processing chain instead of ID
- Adding `RuleMapperSource.call()` to invoke a queryFunction with list of IDs instead of Collection of top level entities
- New `BatchRule` API
- Adding factory methods in `CaffeineCacheFactory` for max size and cache access expiry duration
- Replaced `RC` with `Flux<R>` in CohereFlux interface, type parameters are now `CohereFlux<T, R>`
- FetchFunction in `CacheFactory` now returns an empty Map when no `RuleMapperSource` is defined
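The relationship between passing whole entities down the chain and the `call()` helper can be pictured with a small adapter. The sketch below is synchronous and uses plain `java.util.function` types, whereas the real API is reactive; the `CallAdapterSketch` class, the `Customer` record and the exact signature of this `call` method are assumptions for illustration only.

```java
import java.util.List;
import java.util.function.Function;

final class CallAdapterSketch {

    // Hypothetical top-level entity
    record Customer(Long customerId, String name) {}

    // Wraps a query function that expects a list of IDs so that it can be
    // fed the top-level entities flowing through the processing chain
    static <T, ID, R> Function<List<T>, R> call(
            Function<T, ID> idResolver,
            Function<List<ID>, R> queryFunction) {

        return entities -> queryFunction.apply(
            entities.stream().map(idResolver).toList());
    }
}
```

A query function that needs more than the ID (for example another field of the entity) would skip the adapter and receive the entities directly, which is the flexibility this release describes.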
Assembler v0.6.6
What's Changed (from Git Commit messages)
- Adding `BatchRuleBuilder` and `BatchRule` for easier integration with Spring GraphQL
```java
import io.github.pellse.reactive.assembler.Rule.BatchRule;

import static io.github.pellse.reactive.assembler.Rule.batchRule;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;

@Controller
public class PatientObservationGraphQLController {

    private final PatientService patientService;
    private final BatchRule<Patient, BodyMeasurement> bodyMeasurementBatchRule;
    private final BatchRule<Patient, List<SpO2>> spO2BatchRule;

    PatientObservationGraphQLController(PatientService ps, BodyMeasurementService bms, SpO2StreamingService spO2ss) {

        this.patientService = ps;

        this.bodyMeasurementBatchRule = batchRule(BodyMeasurement::patientId, oneToOne(cached(bms::retrieveBodyMeasurements)))
            .withIdExtractor(Patient::id);

        this.spO2BatchRule = batchRule(SpO2::patientId, oneToMany(SpO2::id, cached(autoCache(spO2ss::spO2Flux))))
            .withIdExtractor(Patient::id);
    }

    @QueryMapping
    Flux<Patient> patients() {
        return patientService.findAllPatients();
    }

    @BatchMapping
    Mono<Map<Patient, BodyMeasurement>> bodyMeasurement(List<Patient> patients) {
        return bodyMeasurementBatchRule.executeToMono(patients);
    }

    @BatchMapping
    Flux<List<SpO2>> spO2(List<Patient> patients) {
        return spO2BatchRule.executeToFlux(patients);
    }
}
```
Full Changelog: v0.6.5...v0.6.6
Assembler v0.6.5
This release focuses on performance and consistency optimization of caching, including the auto caching feature.
What's Changed (from Git Commit messages)
- Cache concurrency optimization by switching to `MULTIPLE_READERS` strategy when caching fetchFunction is guaranteed to be empty
- Ability to configure Retry strategies with specific `Scheduler` in concurrent cache, same `Scheduler` used for auto cache and retry strategies
- Ref count instead of boolean flag to manage start/stop in `concurrentLifeCycleEventListener()`
Breaking API changes:
`concurrency()` factory methods renamed to `xxxRetryStrategy()` in `AutoCacheFactoryBuilder`
```java
var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(error -> logger.log(WARNING, "Error in autoCache", error))
                .scheduler(newParallel("billing-info"))
                .maxRetryStrategy(50) // used to be named `concurrency()`
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(onErrorMap(MyException::new))
                .scheduler(newParallel("order-item"))
                .backoffRetryStrategy(100, ofMillis(10)) // used to be named `concurrency()`
                .build()))),
        Transaction::new)
    .build();
```
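The move from a boolean flag to a ref count for start/stop can be pictured as follows: with several users sharing one resource, a boolean would stop it on the first `stop()`, while a counter stops it only when the last user is done. This is a minimal sketch of that general technique; the `RefCountedLifeCycle` class and its callbacks are hypothetical names, not the code behind `concurrentLifeCycleEventListener()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountedLifeCycle {

    private final AtomicInteger refCount = new AtomicInteger();
    private final Runnable onStart;
    private final Runnable onStop;

    RefCountedLifeCycle(Runnable onStart, Runnable onStop) {
        this.onStart = onStart;
        this.onStop = onStop;
    }

    void start() {
        // Only the first caller (transition 0 -> 1) actually starts the resource
        if (refCount.getAndIncrement() == 0) {
            onStart.run();
        }
    }

    void stop() {
        // Decrement without going below zero, so extra stop() calls are harmless
        int previous = refCount.getAndUpdate(count -> count > 0 ? count - 1 : 0);

        // Only the last caller (transition 1 -> 0) actually stops the resource
        if (previous == 1) {
            onStop.run();
        }
    }
}
```

The counter update is atomic, but the callbacks themselves are not serialized here; a production version would also need to order a concurrent `onStart` and `onStop`.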
Dependency upgrades
- Project Reactor 3.5.6
- Kotlin 1.8.21
Assembler v0.6.4
What's Changed
- New `errorHandler()` methods on `AutoCacheFactoryBuilder` to simplify error handler config when we only want to log the error and continue processing
- New `concurrency()` method, ability to configure concurrency settings on `AutoCacheFactoryBuilder` for the underlying `ConcurrentCache`
```java
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;

import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("logger");
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

Flux<BillingInfo> billingInfoFlux = ... // BillingInfo data coming from e.g. Kafka;
Flux<OrderItem> orderItemFlux = ... // OrderItem data coming from e.g. Kafka;

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux)
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(error -> logger.log(WARNING, error)) // New `errorHandler()` method
                .concurrency(20) // New `concurrency()` method -> maxAttempts = 20
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux)
                .maxWindowSize(50)
                .errorHandler(logWarning) // New `errorHandler()` method
                .concurrency(20, ofMillis(10)) // New `concurrency()` method -> maxAttempts = 20, delay = 10 ms
                .build()))),
        Transaction::new)
    .build();

var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);
```
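The code comments above describe `concurrency(20, ofMillis(10))` as `maxAttempts = 20, delay = 10 ms`. As a rough picture of what a max-attempts-with-fixed-delay setting usually means, here is a generic retry helper. It is a sketch under assumptions: the `FixedDelayRetrySketch` class and the modelling of an attempt as a `Supplier<Optional<T>>` (empty meaning "not acquired, try again") are hypothetical, and the release notes do not show how `ConcurrentCache` applies these values internally.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

final class FixedDelayRetrySketch {

    // Runs the attempt up to maxAttempts times, sleeping for the given delay
    // between tries. An empty Optional from the attempt means "try again".
    static <T> Optional<T> retry(int maxAttempts, Duration delay, Supplier<Optional<T>> attempt) {
        for (int i = 1; i <= maxAttempts; i++) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and give up
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        // All attempts exhausted
        return Optional.empty();
    }
}
```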
Dependency upgrades
- Caffeine 3.1.6
Assembler v0.6.3
What's Changed
- Renamed `AutoCacheFactoryBuilder.autoCache()` to `AutoCacheFactoryBuilder.autoCacheBuilder()`:
```java
import reactor.core.publisher.Flux;
import io.github.pellse.reactive.assembler.Assembler;

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactoryBuilder.autoCacheBuilder;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.OnErrorContinue.onErrorContinue;

import static java.time.Duration.*;
import static java.lang.System.Logger.Level.WARNING;
import static java.lang.System.getLogger;

var logger = getLogger("warning-logger");
Consumer<Throwable> logWarning = error -> logger.log(WARNING, "Error in autoCache", error);

Flux<BillingInfo> billingInfoFlux = ... // BillingInfo data coming from e.g. Kafka;
Flux<OrderItem> orderItemFlux = ... // OrderItem data coming from e.g. Kafka;

var assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo,
            autoCacheBuilder(billingInfoFlux) // Used to be `autoCache()`, now `autoCacheBuilder()`
                .maxWindowSizeAndTime(100, ofSeconds(5))
                .errorHandler(onErrorContinue(logWarning))
                .build()))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders,
            autoCacheBuilder(orderItemFlux) // Used to be `autoCache()`, now `autoCacheBuilder()`
                .maxWindowSize(50)
                .errorHandler(onErrorContinue(logWarning))
                .build()))),
        Transaction::new)
    .build();

var transactionFlux = getCustomers()
    .window(3)
    .flatMapSequential(assembler::assemble);
```
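The closing `window(3)` / `flatMapSequential(...)` pair splits the customer stream into batches of at most three, so each `assemble` call queries for a small group of IDs, and the results are emitted in the original batch order. The batching step can be sketched on a plain list as below; `WindowSketch` is a hypothetical helper operating on `java.util.List` rather than on a `Flux`.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowSketch {

    // Splits items into consecutive batches of at most `size` elements,
    // preserving the original order; the last batch may be smaller
    static <T> List<List<T>> window(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            windows.add(List.copyOf(items.subList(start, end)));
        }
        return windows;
    }
}
```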
Dependency upgrades
- Project Reactor 3.5.5
Assembler v0.6.2
What's Changed
- New `autoCache()` event based helper method to avoid using `AutoCacheFactoryBuilder` when using default windowing strategy, error handler, life cycle management and scheduler:
```java
import io.github.pellse.reactive.assembler.Assembler;
import io.github.pellse.reactive.assembler.caching.CacheFactory.CacheTransformer;

import static io.github.pellse.reactive.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToMany;
import static io.github.pellse.reactive.assembler.RuleMapper.oneToOne;
import static io.github.pellse.reactive.assembler.Rule.rule;
import static io.github.pellse.reactive.assembler.caching.CacheFactory.cached;
import static io.github.pellse.reactive.assembler.caching.AutoCacheFactory.autoCache;

// Example of your custom domain events not known by the Assembler Library
sealed interface MyEvent<T> {
    T item();
}

record Add<T>(T item) implements MyEvent<T> {}
record Delete<T>(T item) implements MyEvent<T> {}

record MyOtherEvent<T>(T value, boolean isAddEvent) {}

// E.g. Flux coming from a CDC/Kafka source
Flux<MyOtherEvent<BillingInfo>> billingInfoFlux = Flux.just(
    new MyOtherEvent<>(billingInfo1, true), new MyOtherEvent<>(billingInfo2, true),
    new MyOtherEvent<>(billingInfo2, false), new MyOtherEvent<>(billingInfo3, false));

// E.g. Flux coming from a CDC/Kafka source
Flux<MyEvent<OrderItem>> orderItemFlux = Flux.just(
    new Add<>(orderItem11), new Add<>(orderItem12), new Add<>(orderItem13),
    new Delete<>(orderItem31), new Delete<>(orderItem32), new Delete<>(orderItem33));

CacheTransformer<Long, BillingInfo, BillingInfo> billingInfoAutoCache =
    autoCache(billingInfoFlux, MyOtherEvent::isAddEvent, MyOtherEvent::value); // New autoCache() method

CacheTransformer<Long, OrderItem, List<OrderItem>> orderItemAutoCache =
    autoCache(orderItemFlux, Add.class::isInstance, MyEvent::item); // New autoCache() method

Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class)
    .withCorrelationIdExtractor(Customer::customerId)
    .withAssemblerRules(
        rule(BillingInfo::customerId, oneToOne(cached(this::getBillingInfo, billingInfoAutoCache))),
        rule(OrderItem::customerId, oneToMany(OrderItem::id, cached(this::getAllOrders, orderItemAutoCache))),
        Transaction::new)
    .build();
```
- Replaced onErrorStop() with onErrorContinue() as the default autoCache() error handler
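The two extra arguments passed to `autoCache()` above, a predicate telling whether an event is an add and a function extracting the payload, are enough to drive a cache from arbitrary domain events. The following plain-Java sketch shows that idea on an in-memory list of events for the one-to-one case. `EventCacheSketch`, its `applyEvents` method and the simplified `BillingInfo` record are assumptions for illustration, not the library's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

final class EventCacheSketch {

    // Custom event type unknown to the cache, as in the release notes
    record MyOtherEvent<T>(T value, boolean isAddEvent) {}

    // Hypothetical, simplified payload
    record BillingInfo(Long customerId, String creditCard) {}

    // Replays events into a map keyed by correlation ID:
    // add events insert or overwrite the entry, every other event removes it
    static <E, ID, V> Map<ID, V> applyEvents(
            List<E> events,
            Predicate<E> isAddEvent,
            Function<E, V> toValue,
            Function<V, ID> idResolver) {

        Map<ID, V> cache = new HashMap<>();
        for (E event : events) {
            V value = toValue.apply(event);
            ID id = idResolver.apply(value);
            if (isAddEvent.test(event)) {
                cache.put(id, value);
            } else {
                cache.remove(id);
            }
        }
        return cache;
    }
}
```

Because the cache only sees the predicate and the extractor, the same method works for `MyOtherEvent` with its boolean flag and for a sealed `Add`/`Delete` hierarchy tested with `Add.class::isInstance`.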
Dependency upgrades
- Kotlin 1.8.20