-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
OK-611 Siirretty ConcurrencyLimiter omaan luokkaan
- Loading branch information
Showing
2 changed files
with
164 additions
and
101 deletions.
There are no files selected for viewing
160 changes: 160 additions & 0 deletions
160
src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/ConcurrencyLimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package fi.vm.sade.valinta.kooste.valintalaskenta.actor; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.Map; | ||
import java.util.concurrent.*; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Luokka jonka avulla voi hallita lähtötietojen haun rinnakkaisuutta taustajärjestelmistä. Perustuu semaforiin, ts. | ||
* siihen että rinnakkaisille kutsuille on tarjolla tietty määrä lupia, joita kutsut tarvitsevat. Lupia voi tarvita joko | ||
* yhden per kutsu, tai sitten suuremman määrän riippuen haettavan data määrästä (esim. suoritusrekisteri). Tämä luokka | ||
* pitää myös kirjaa odotukseen ja kutsujen suorittamiseen käytetystä ajasta jotta tämä voidaan raportoida CloudWatchiin. | ||
*/ | ||
public class ConcurrencyLimiter { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyLimiter.class); | ||
|
||
public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1); | ||
public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2); | ||
|
||
private int maxPermits; | ||
private final String nimi; | ||
private final Semaphore semaphore; | ||
private final ExecutorService executor; | ||
private final AtomicInteger waiting; | ||
private final AtomicInteger active; | ||
|
||
/** | ||
* Luo uuden limitterin | ||
* | ||
* @param permits kuinka monta rinnakkaista "lupaa" taustajärjestelmäkutsuihin on tarjolla (yksi kutsu saattaa tarvita | ||
* useamman luvan) | ||
* @param nimi limitterin nimi | ||
* @param executor limitterissä käytettävä {@link Executor} | ||
*/ | ||
public ConcurrencyLimiter(int permits, String nimi, ExecutorService executor) { | ||
this.nimi = nimi; | ||
this.maxPermits = permits; | ||
this.semaphore = new Semaphore(permits, true); | ||
this.executor = executor; | ||
this.waiting = new AtomicInteger(0); | ||
this.active = new AtomicInteger(0); | ||
} | ||
|
||
/** | ||
* Asettaa rinnakkaisten "lupien" määrän | ||
* | ||
* @param newPermits uusi lupien määrä | ||
*/ | ||
public void setMaxPermits(int newPermits) { | ||
this.maxPermits = newPermits; | ||
int dPermits = newPermits - this.maxPermits; | ||
if (dPermits == 0) { | ||
return; | ||
} | ||
if (dPermits > 0) { | ||
LOG.info("Lisätään vaiheen " + this.nimi + " limitteriin " + dPermits + " permittiä."); | ||
this.semaphore.release(dPermits); | ||
} else { | ||
LOG.info("Vähennetään vaiheen " + this.nimi + " limitteristä " + dPermits + " permittiä."); | ||
this.executor.submit(() -> this.semaphore.acquireUninterruptibly(dPermits)); | ||
} | ||
} | ||
|
||
/** | ||
* Palauttaa kuinka monta pyyntöä odottaa lupaa | ||
* | ||
* @return | ||
*/ | ||
public int getWaiting() { | ||
return this.waiting.get(); | ||
} | ||
|
||
/** | ||
* Palauttaa kuinka montaa pyyntöä suoritetaan | ||
* | ||
* @return | ||
*/ | ||
public int getActive() { | ||
return this.active.get(); | ||
} | ||
|
||
/** | ||
* Palauttaa limitterin nimen | ||
* | ||
* @return | ||
*/ | ||
public String getNimi() { | ||
return this.nimi; | ||
} | ||
|
||
public static String asDurationString(Duration duration) { | ||
if (duration == TIMEOUT) { | ||
return "timeout"; | ||
} else if (duration == ERROR) { | ||
return "error"; | ||
} | ||
return duration.toMillis() + ""; | ||
} | ||
|
||
/** | ||
* Suoritetaan ratelimitoitu pyyntö | ||
* | ||
* @param permits kuinka monta "lupaa" pyyntöön tarvitaan | ||
* @param waitDurations mappi johon tallennetaan odotuksen kesto | ||
* @param invokeDurations mappi johon tallennetaan suorituksen kesto | ||
* @param supplier {@link Supplier} joka palauttaa suoritettavan pyynnön | ||
* @return {@link CompletableFuture} joka palauttaa pyynnön paluuarvon | ||
*/ | ||
public <T> CompletableFuture<T> withConcurrencyLimit( | ||
int permits, | ||
Map<String, Duration> waitDurations, | ||
Map<String, Duration> invokeDurations, | ||
Supplier<CompletableFuture<T>> supplier) { | ||
|
||
Instant waitStart = Instant.now(); | ||
return CompletableFuture.supplyAsync( | ||
() -> { | ||
// haetaan lupa suorittaa pyyntö ja tallennetaan odottamiseen mennyt aika | ||
this.waiting.incrementAndGet(); | ||
try { | ||
this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits)); | ||
} finally { | ||
this.waiting.decrementAndGet(); | ||
} | ||
Instant invokeStart = Instant.now(); | ||
waitDurations.put(this.nimi, Duration.between(waitStart, invokeStart)); | ||
|
||
// suoritetaan pyyntö | ||
this.active.incrementAndGet(); | ||
try { | ||
T result = | ||
supplier | ||
.get() | ||
.exceptionallyAsync( | ||
e -> { | ||
if (e instanceof TimeoutException) { | ||
invokeDurations.put(this.nimi, TIMEOUT); | ||
} else { | ||
invokeDurations.put(this.nimi, ERROR); | ||
} | ||
throw new CompletionException(e); | ||
}, | ||
this.executor) | ||
.join(); | ||
invokeDurations.put(this.nimi, Duration.between(invokeStart, Instant.now())); | ||
return result; | ||
} finally { | ||
semaphore.release(permits); | ||
this.active.decrementAndGet(); | ||
} | ||
}, | ||
this.executor); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters