From a61e92a6738968a78a885600411a600c075b4baa Mon Sep 17 00:00:00 2001 From: Santeri Korri Date: Fri, 29 Nov 2024 14:14:22 +0200 Subject: [PATCH] OK-611 Siirretty ConcurrencyLimiter omaan luokkaan --- .../actor/ConcurrencyLimiter.java | 160 ++++++++++++++++++ .../actor/LaskentaResurssiProvider.java | 111 +----------- 2 files changed, 164 insertions(+), 107 deletions(-) create mode 100644 src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/ConcurrencyLimiter.java diff --git a/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/ConcurrencyLimiter.java b/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/ConcurrencyLimiter.java new file mode 100644 index 000000000..59f1c2bb0 --- /dev/null +++ b/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/ConcurrencyLimiter.java @@ -0,0 +1,160 @@ +package fi.vm.sade.valinta.kooste.valintalaskenta.actor; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Luokka jonka avulla voi hallita lähtötietojen haun rinnakkaisuutta taustajärjestelmistä. Perustuu + * semaforiin, ts. siihen että rinnakkaisille kutsuille on tarjolla tietty määrä lupia, joita kutsut + * tarvitsevat. Lupia voi tarvita joko yhden per kutsu, tai sitten suuremman määrän riippuen + * haettavan data määrästä (esim. suoritusrekisteri). Tämä luokka pitää myös kirjaa odotukseen ja + * kutsujen suorittamiseen käytetystä ajasta jotta tämä voidaan raportoida CloudWatchiin. + */ +public class ConcurrencyLimiter { + + private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyLimiter.class); + + public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1); + public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2); + + private int maxPermits; + private final String nimi; + private final Semaphore semaphore; + private final ExecutorService executor; + private final AtomicInteger waiting; + private final AtomicInteger active; + + /** + * Luo uuden limitterin + * + * @param permits kuinka monta rinnakkaista "lupaa" taustajärjestelmäkutsuihin on tarjolla (yksi + * kutsu saattaa tarvita useamman luvan) + * @param nimi limitterin nimi + * @param executor limitterissä käytettävä {@link Executor} + */ + public ConcurrencyLimiter(int permits, String nimi, ExecutorService executor) { + this.nimi = nimi; + this.maxPermits = permits; + this.semaphore = new Semaphore(permits, true); + this.executor = executor; + this.waiting = new AtomicInteger(0); + this.active = new AtomicInteger(0); + } + + /** + * Asettaa rinnakkaisten "lupien" määrän + * + * @param newPermits uusi lupien määrä + */ + public void setMaxPermits(int newPermits) { + this.maxPermits = newPermits; + int dPermits = newPermits - this.maxPermits; + if (dPermits == 0) { + return; + } + if (dPermits > 0) { + LOG.info("Lisätään vaiheen " + this.nimi + " limitteriin " + dPermits + " permittiä."); + this.semaphore.release(dPermits); + } else { + LOG.info("Vähennetään vaiheen " + this.nimi + " limitteristä " + dPermits + " permittiä."); + this.executor.submit(() -> this.semaphore.acquireUninterruptibly(dPermits)); + } + } + + /** + * Palauttaa kuinka monta pyyntöä odottaa lupaa + * + * @return + */ + public int getWaiting() { + return this.waiting.get(); + } + + /** + * Palauttaa kuinka montaa pyyntöä suoritetaan + * + * @return + */ + public int getActive() { + return this.active.get(); + } + + /** + * Palauttaa limitterin nimen + * + * @return + */ + public String getNimi() { + return this.nimi; + } + + public static String asDurationString(Duration duration) { + if (duration == TIMEOUT) { + return "timeout"; + } else if (duration == ERROR) { + return "error"; + } + return duration.toMillis() + ""; + } + + /** + * Suoritetaan ratelimitoitu pyyntö + * + * @param permits kuinka monta "lupaa" pyyntöön tarvitaan + * @param waitDurations mappi johon tallennetaan odotuksen kesto + * @param invokeDurations mappi johon tallennetaan suorituksen kesto + * @param supplier {@link Supplier} joka palauttaa suoritettavan pyynnön + * @return {@link CompletableFuture} joka palauttaa pyynnön paluuarvon + */ + public CompletableFuture withConcurrencyLimit( + int permits, + Map waitDurations, + Map invokeDurations, + Supplier> supplier) { + + Instant waitStart = Instant.now(); + return CompletableFuture.supplyAsync( + () -> { + // haetaan lupa suorittaa pyyntö ja tallennetaan odottamiseen mennyt aika + this.waiting.incrementAndGet(); + try { + this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits)); + } finally { + this.waiting.decrementAndGet(); + } + Instant invokeStart = Instant.now(); + waitDurations.put(this.nimi, Duration.between(waitStart, invokeStart)); + + // suoritetaan pyyntö + this.active.incrementAndGet(); + try { + T result = + supplier + .get() + .exceptionallyAsync( + e -> { + if (e instanceof TimeoutException) { + invokeDurations.put(this.nimi, TIMEOUT); + } else { + invokeDurations.put(this.nimi, ERROR); + } + throw new CompletionException(e); + }, + this.executor) + .join(); + invokeDurations.put(this.nimi, Duration.between(invokeStart, Instant.now())); + return result; + } finally { + semaphore.release(permits); + this.active.decrementAndGet(); + } + }, + this.executor); + } +} diff --git a/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java b/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java index 262833c31..7974456da 100644 --- a/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java +++ b/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java @@ -28,7 +28,6 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -126,102 +125,6 @@ public LaskentaResurssiProvider( this.lueParametrit(); } - static class ConcurrencyLimiter { - - public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1); - public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2); - - private int maxPermits; - private final String vaihe; - private final Semaphore semaphore; - private final ExecutorService executor; - private final AtomicInteger waiting; - private final AtomicInteger active; - - public ConcurrencyLimiter(int permits, String vaihe, ExecutorService executor) { - this.vaihe = vaihe; - this.maxPermits = permits; - this.semaphore = new Semaphore(permits, true); - this.executor = executor; - this.waiting = new AtomicInteger(0); - this.active = new AtomicInteger(0); - } - - public void setMaxPermits(int newPermits) { - this.maxPermits = newPermits; - int dPermits = newPermits - this.maxPermits; - if (dPermits == 0) { - return; - } - if (dPermits > 0) { - this.semaphore.release(dPermits); - } else { - this.executor.submit(() -> this.semaphore.acquireUninterruptibly(dPermits)); - } - } - - public int getWaiting() { - return this.waiting.get(); - } - - public int getActive() { - return this.active.get(); - } - - public String getVaihe() { - return this.vaihe; - } - - public static String asLabel(Duration duration) { - if (duration == TIMEOUT) { - return "timeout"; - } else if (duration == ERROR) { - return "error"; - } - return duration.toMillis() + ""; - } - - public CompletableFuture withConcurrencyLimit( - int permits, - Map waitDurations, - Map invokeDurations, - Supplier> supplier) { - - Instant waitStart = Instant.now(); - this.waiting.incrementAndGet(); - return CompletableFuture.supplyAsync( - () -> { - this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits)); - this.waiting.decrementAndGet(); - this.active.incrementAndGet(); - try { - Instant invokeStart = Instant.now(); - waitDurations.put(this.vaihe, Duration.between(waitStart, invokeStart)); - T result = - supplier - .get() - .exceptionallyAsync( - e -> { - if (e instanceof TimeoutException) { - invokeDurations.put(this.vaihe, TIMEOUT); - } else { - invokeDurations.put(this.vaihe, ERROR); - } - throw new CompletionException(e); - }, - this.executor) - .join(); - invokeDurations.put(this.vaihe, Duration.between(invokeStart, Instant.now())); - return result; - } finally { - semaphore.release(permits); - this.active.decrementAndGet(); - } - }, - this.executor); - } - } - @Scheduled(initialDelay = 15, fixedDelay = 15, timeUnit = TimeUnit.SECONDS) public void lueParametrit() { this.executor.submit( @@ -309,7 +212,7 @@ private void tallennaLokitJaMetriikat( + hakukohdeOid + ": " + waitDurations.entrySet().stream() - .map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue())) + .map(e -> e.getKey() + ":" + ConcurrencyLimiter.asDurationString(e.getValue())) .collect(Collectors.joining(", "))); LOG.info( @@ -317,7 +220,7 @@ private void tallennaLokitJaMetriikat( + hakukohdeOid + ": " + invokeDurations.entrySet().stream() - .map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue())) + .map(e -> e.getKey() + ":" + ConcurrencyLimiter.asDurationString(e.getValue())) .collect(Collectors.joining(", "))); } @@ -336,10 +239,7 @@ public void tallennaMaarat() { .storageResolution(1) .dimensions( List.of( - Dimension.builder() - .name("vaihe") - .value(limiter.getVaihe()) - .build())) + Dimension.builder().name("vaihe").value(limiter.getNimi()).build())) .timestamp(Instant.now()) .unit(StandardUnit.COUNT) .build()) @@ -356,10 +256,7 @@ public void tallennaMaarat() { .storageResolution(1) .dimensions( List.of( - Dimension.builder() - .name("vaihe") - .value(limiter.getVaihe()) - .build())) + Dimension.builder().name("vaihe").value(limiter.getNimi()).build())) .timestamp(Instant.now()) .unit(StandardUnit.COUNT) .build())