Skip to content

Commit

Permalink
OK-611 Siirretty ConcurrencyLimiter omaan luokkaan
Browse files Browse the repository at this point in the history
  • Loading branch information
jkorri committed Nov 29, 2024
1 parent 6266569 commit a61e92a
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package fi.vm.sade.valinta.kooste.valintalaskenta.actor;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Luokka jonka avulla voi hallita lähtötietojen haun rinnakkaisuutta taustajärjestelmistä. Perustuu
* semaforiin, ts. siihen että rinnakkaisille kutsuille on tarjolla tietty määrä lupia, joita kutsut
* tarvitsevat. Lupia voi tarvita joko yhden per kutsu, tai sitten suuremman määrän riippuen
* haettavan data määrästä (esim. suoritusrekisteri). Tämä luokka pitää myös kirjaa odotukseen ja
* kutsujen suorittamiseen käytetystä ajasta jotta tämä voidaan raportoida CloudWatchiin.
*/
public class ConcurrencyLimiter {

private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyLimiter.class);

public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1);
public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2);

private int maxPermits;
private final String nimi;
private final Semaphore semaphore;
private final ExecutorService executor;
private final AtomicInteger waiting;
private final AtomicInteger active;

/**
* Luo uuden limitterin
*
* @param permits kuinka monta rinnakkaista "lupaa" taustajärjestelmäkutsuihin on tarjolla (yksi
* kutsu saattaa tarvita useamman luvan)
* @param nimi limitterin nimi
* @param executor limitterissä käytettävä {@link Executor}
*/
public ConcurrencyLimiter(int permits, String nimi, ExecutorService executor) {
this.nimi = nimi;
this.maxPermits = permits;
this.semaphore = new Semaphore(permits, true);
this.executor = executor;
this.waiting = new AtomicInteger(0);
this.active = new AtomicInteger(0);
}

/**
* Asettaa rinnakkaisten "lupien" määrän
*
* @param newPermits uusi lupien määrä
*/
public void setMaxPermits(int newPermits) {
this.maxPermits = newPermits;
int dPermits = newPermits - this.maxPermits;
if (dPermits == 0) {
return;
}
if (dPermits > 0) {
LOG.info("Lisätään vaiheen " + this.nimi + " limitteriin " + dPermits + " permittiä.");
this.semaphore.release(dPermits);
} else {
LOG.info("Vähennetään vaiheen " + this.nimi + " limitteristä " + dPermits + " permittiä.");
this.executor.submit(() -> this.semaphore.acquireUninterruptibly(dPermits));
}
}

/**
* Palauttaa kuinka monta pyyntöä odottaa lupaa
*
* @return
*/
public int getWaiting() {
return this.waiting.get();
}

/**
* Palauttaa kuinka montaa pyyntöä suoritetaan
*
* @return
*/
public int getActive() {
return this.active.get();
}

/**
* Palauttaa limitterin nimen
*
* @return
*/
public String getNimi() {
return this.nimi;
}

public static String asDurationString(Duration duration) {
if (duration == TIMEOUT) {
return "timeout";
} else if (duration == ERROR) {
return "error";
}
return duration.toMillis() + "";
}

/**
* Suoritetaan ratelimitoitu pyyntö
*
* @param permits kuinka monta "lupaa" pyyntöön tarvitaan
* @param waitDurations mappi johon tallennetaan odotuksen kesto
* @param invokeDurations mappi johon tallennetaan suorituksen kesto
* @param supplier {@link Supplier} joka palauttaa suoritettavan pyynnön
* @return {@link CompletableFuture} joka palauttaa pyynnön paluuarvon
*/
public <T> CompletableFuture<T> withConcurrencyLimit(
int permits,
Map<String, Duration> waitDurations,
Map<String, Duration> invokeDurations,
Supplier<CompletableFuture<T>> supplier) {

Instant waitStart = Instant.now();
return CompletableFuture.supplyAsync(
() -> {
// haetaan lupa suorittaa pyyntö ja tallennetaan odottamiseen mennyt aika
this.waiting.incrementAndGet();
try {
this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits));
} finally {
this.waiting.decrementAndGet();
}
Instant invokeStart = Instant.now();
waitDurations.put(this.nimi, Duration.between(waitStart, invokeStart));

// suoritetaan pyyntö
this.active.incrementAndGet();
try {
T result =
supplier
.get()
.exceptionallyAsync(
e -> {
if (e instanceof TimeoutException) {
invokeDurations.put(this.nimi, TIMEOUT);
} else {
invokeDurations.put(this.nimi, ERROR);
}
throw new CompletionException(e);
},
this.executor)
.join();
invokeDurations.put(this.nimi, Duration.between(invokeStart, Instant.now()));
return result;
} finally {
semaphore.release(permits);
this.active.decrementAndGet();
}
},
this.executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -126,102 +125,6 @@ public LaskentaResurssiProvider(
this.lueParametrit();
}

static class ConcurrencyLimiter {

public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1);
public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2);

private int maxPermits;
private final String vaihe;
private final Semaphore semaphore;
private final ExecutorService executor;
private final AtomicInteger waiting;
private final AtomicInteger active;

public ConcurrencyLimiter(int permits, String vaihe, ExecutorService executor) {
this.vaihe = vaihe;
this.maxPermits = permits;
this.semaphore = new Semaphore(permits, true);
this.executor = executor;
this.waiting = new AtomicInteger(0);
this.active = new AtomicInteger(0);
}

public void setMaxPermits(int newPermits) {
this.maxPermits = newPermits;
int dPermits = newPermits - this.maxPermits;
if (dPermits == 0) {
return;
}
if (dPermits > 0) {
this.semaphore.release(dPermits);
} else {
this.executor.submit(() -> this.semaphore.acquireUninterruptibly(dPermits));
}
}

public int getWaiting() {
return this.waiting.get();
}

public int getActive() {
return this.active.get();
}

public String getVaihe() {
return this.vaihe;
}

public static String asLabel(Duration duration) {
if (duration == TIMEOUT) {
return "timeout";
} else if (duration == ERROR) {
return "error";
}
return duration.toMillis() + "";
}

public <T> CompletableFuture<T> withConcurrencyLimit(
int permits,
Map<String, Duration> waitDurations,
Map<String, Duration> invokeDurations,
Supplier<CompletableFuture<T>> supplier) {

Instant waitStart = Instant.now();
this.waiting.incrementAndGet();
return CompletableFuture.supplyAsync(
() -> {
this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits));
this.waiting.decrementAndGet();
this.active.incrementAndGet();
try {
Instant invokeStart = Instant.now();
waitDurations.put(this.vaihe, Duration.between(waitStart, invokeStart));
T result =
supplier
.get()
.exceptionallyAsync(
e -> {
if (e instanceof TimeoutException) {
invokeDurations.put(this.vaihe, TIMEOUT);
} else {
invokeDurations.put(this.vaihe, ERROR);
}
throw new CompletionException(e);
},
this.executor)
.join();
invokeDurations.put(this.vaihe, Duration.between(invokeStart, Instant.now()));
return result;
} finally {
semaphore.release(permits);
this.active.decrementAndGet();
}
},
this.executor);
}
}

@Scheduled(initialDelay = 15, fixedDelay = 15, timeUnit = TimeUnit.SECONDS)
public void lueParametrit() {
this.executor.submit(
Expand Down Expand Up @@ -309,15 +212,15 @@ private void tallennaLokitJaMetriikat(
+ hakukohdeOid
+ ": "
+ waitDurations.entrySet().stream()
.map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue()))
.map(e -> e.getKey() + ":" + ConcurrencyLimiter.asDurationString(e.getValue()))
.collect(Collectors.joining(", ")));

LOG.info(
"Kestot: Hakukohde: "
+ hakukohdeOid
+ ": "
+ invokeDurations.entrySet().stream()
.map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue()))
.map(e -> e.getKey() + ":" + ConcurrencyLimiter.asDurationString(e.getValue()))
.collect(Collectors.joining(", ")));
}

Expand All @@ -336,10 +239,7 @@ public void tallennaMaarat() {
.storageResolution(1)
.dimensions(
List.of(
Dimension.builder()
.name("vaihe")
.value(limiter.getVaihe())
.build()))
Dimension.builder().name("vaihe").value(limiter.getNimi()).build()))
.timestamp(Instant.now())
.unit(StandardUnit.COUNT)
.build())
Expand All @@ -356,10 +256,7 @@ public void tallennaMaarat() {
.storageResolution(1)
.dimensions(
List.of(
Dimension.builder()
.name("vaihe")
.value(limiter.getVaihe())
.build()))
Dimension.builder().name("vaihe").value(limiter.getNimi()).build()))
.timestamp(Instant.now())
.unit(StandardUnit.COUNT)
.build())
Expand Down

0 comments on commit a61e92a

Please sign in to comment.