Skip to content

Commit

Permalink
OK-611 Refaktoroitu metriikoita
Browse files Browse the repository at this point in the history
  • Loading branch information
jkorri committed Nov 25, 2024
1 parent 7ceff44 commit 6266569
Showing 1 changed file with 95 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,23 @@ public LaskentaResurssiProvider(

static class ConcurrencyLimiter {

public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1);
public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2);

private int maxPermits;
private final String vaihe;
private final Semaphore semaphore;
private final ExecutorService executor;
private final AtomicInteger ongoing;
private final AtomicInteger waiting;
private final AtomicInteger active;

public ConcurrencyLimiter(int permits, String vaihe, ExecutorService executor) {
this.vaihe = vaihe;
this.maxPermits = permits;
this.semaphore = new Semaphore(permits, true);
this.executor = executor;
this.ongoing = new AtomicInteger(0);
this.waiting = new AtomicInteger(0);
this.active = new AtomicInteger(0);
}

public void setMaxPermits(int newPermits) {
Expand All @@ -155,36 +160,62 @@ public void setMaxPermits(int newPermits) {
}
}

public int getOngoing() {
return this.ongoing.get();
public int getWaiting() {
return this.waiting.get();
}

public int getActive() {
return this.active.get();
}

public String getVaihe() {
return this.vaihe;
}

public static String asLabel(Duration duration) {
if (duration == TIMEOUT) {
return "timeout";
} else if (duration == ERROR) {
return "error";
}
return duration.toMillis() + "";
}

public <T> CompletableFuture<T> withConcurrencyLimit(
int permits,
Map<String, Optional<Duration>> waitDurations,
Map<String, Optional<Duration>> invokeDurations,
Map<String, Duration> waitDurations,
Map<String, Duration> invokeDurations,
Supplier<CompletableFuture<T>> supplier) {
invokeDurations.put(this.vaihe, Optional.empty()); // invoket voi timeoutata

Instant waitStart = Instant.now();
this.ongoing.incrementAndGet();
this.waiting.incrementAndGet();
return CompletableFuture.supplyAsync(
() -> {
this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits));
this.waiting.decrementAndGet();
this.active.incrementAndGet();
try {
Instant invokeStart = Instant.now();
waitDurations.put(this.vaihe, Optional.of(Duration.between(waitStart, invokeStart)));
T result = supplier.get().join();
invokeDurations.put(
this.vaihe, Optional.of(Duration.between(invokeStart, Instant.now())));
waitDurations.put(this.vaihe, Duration.between(waitStart, invokeStart));
T result =
supplier
.get()
.exceptionallyAsync(
e -> {
if (e instanceof TimeoutException) {
invokeDurations.put(this.vaihe, TIMEOUT);
} else {
invokeDurations.put(this.vaihe, ERROR);
}
throw new CompletionException(e);
},
this.executor)
.join();
invokeDurations.put(this.vaihe, Duration.between(invokeStart, Instant.now()));
return result;
} finally {
semaphore.release(permits);
this.ongoing.decrementAndGet();
this.active.decrementAndGet();
}
},
this.executor);
Expand All @@ -208,16 +239,16 @@ public void lueParametrit() {

private void tallennaLokitJaMetriikat(
String hakukohdeOid,
Map<String, Optional<Duration>> waitDurations,
Map<String, Optional<Duration>> invokeDurations) {
Map<String, Duration> waitDurations,
Map<String, Duration> invokeDurations) {
Collection<MetricDatum> datums = new ArrayList<>();
datums.addAll(
waitDurations.entrySet().stream()
.map(
e ->
MetricDatum.builder()
.metricName("odotus")
.value((double) e.getValue().get().toMillis())
.value((double) e.getValue().toMillis())
.storageResolution(60)
.dimensions(
List.of(Dimension.builder().name("vaihe").value(e.getKey()).build()))
Expand All @@ -230,17 +261,17 @@ private void tallennaLokitJaMetriikat(
invokeDurations.entrySet().stream()
.map(
e -> {
if (e.getValue().isPresent()) {
if (e.getValue() == ConcurrencyLimiter.ERROR) {
return MetricDatum.builder()
.metricName("kesto")
.value((double) e.getValue().get().toMillis())
.metricName("errors")
.value(1.0)
.storageResolution(60)
.dimensions(
List.of(Dimension.builder().name("vaihe").value(e.getKey()).build()))
.timestamp(Instant.now())
.unit(StandardUnit.MILLISECONDS)
.unit(StandardUnit.COUNT)
.build();
} else {
} else if (e.getValue() == ConcurrencyLimiter.TIMEOUT) {
return MetricDatum.builder()
.metricName("timeouts")
.value(1.0)
Expand All @@ -250,6 +281,16 @@ private void tallennaLokitJaMetriikat(
.timestamp(Instant.now())
.unit(StandardUnit.COUNT)
.build();
} else {
return MetricDatum.builder()
.metricName("kesto")
.value((double) e.getValue().toMillis())
.storageResolution(60)
.dimensions(
List.of(Dimension.builder().name("vaihe").value(e.getKey()).build()))
.timestamp(Instant.now())
.unit(StandardUnit.MILLISECONDS)
.build();
}
})
.collect(Collectors.toList()));
Expand All @@ -268,36 +309,50 @@ private void tallennaLokitJaMetriikat(
+ hakukohdeOid
+ ": "
+ waitDurations.entrySet().stream()
.map(
e ->
e.getKey()
+ ":"
+ e.getValue().map(d -> d.toMillis() + "").orElse("timeout"))
.map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue()))
.collect(Collectors.joining(", ")));

LOG.info(
"Kestot: Hakukohde: "
+ hakukohdeOid
+ ": "
+ invokeDurations.entrySet().stream()
.map(
e ->
e.getKey()
+ ":"
+ e.getValue().map(d -> d.toMillis() + "").orElse("timeout"))
.map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue()))
.collect(Collectors.joining(", ")));
}

@Scheduled(initialDelay = 15, fixedDelay = 15, timeUnit = TimeUnit.SECONDS)
public void tallennaMaarat() {
Collection<MetricDatum> datums =
Collection<MetricDatum> datums = new ArrayList<>();

datums.addAll(
this.limiters.values().stream()
.filter(limiter -> limiter.getActive() > 0)
.map(
limiter ->
MetricDatum.builder()
.metricName("active")
.value((double) limiter.getActive())
.storageResolution(1)
.dimensions(
List.of(
Dimension.builder()
.name("vaihe")
.value(limiter.getVaihe())
.build()))
.timestamp(Instant.now())
.unit(StandardUnit.COUNT)
.build())
.toList());

datums.addAll(
this.limiters.values().stream()
.filter(limiter -> limiter.getOngoing() > 0)
.filter(limiter -> limiter.getWaiting() > 0)
.map(
limiter ->
MetricDatum.builder()
.metricName("ongoing")
.value((double) limiter.getOngoing())
.metricName("waiting")
.value((double) limiter.getWaiting())
.storageResolution(1)
.dimensions(
List.of(
Expand All @@ -308,7 +363,7 @@ public void tallennaMaarat() {
.timestamp(Instant.now())
.unit(StandardUnit.COUNT)
.build())
.toList();
.toList());

if (!datums.isEmpty()) {
CompletableFuture.supplyAsync(
Expand Down Expand Up @@ -464,8 +519,8 @@ public CompletableFuture<LaskeDTO> fetchResourcesForOneLaskenta(
Date nyt) {

Instant start = Instant.now();
Map<String, Optional<Duration>> waitDurations = new ConcurrentHashMap<>();
Map<String, Optional<Duration>> invokeDurations = new ConcurrentHashMap<>();
Map<String, Duration> waitDurations = new ConcurrentHashMap<>();
Map<String, Duration> invokeDurations = new ConcurrentHashMap<>();

final CompletableFuture<ParametritDTO> parametritDTOFuture =
this.parametritLimiter.withConcurrencyLimit(
Expand Down Expand Up @@ -673,7 +728,7 @@ public CompletableFuture<LaskeDTO> fetchResourcesForOneLaskenta(
.thenApplyAsync(
laskeDTO -> {
laskeDTO.populoiSuoritustiedotHakemuksille(suoritustiedotDTO);
invokeDurations.put("Total", Optional.of(Duration.between(start, Instant.now())));
invokeDurations.put("Total", Duration.between(start, Instant.now()));
LOG.info(
"Haettiin lähtötiedot hakukohteelle "
+ hakukohdeOid
Expand All @@ -689,8 +744,7 @@ public CompletableFuture<LaskeDTO> fetchResourcesForOneLaskenta(
.exceptionally(
ex -> {
if (ex instanceof TimeoutException) {
invokeDurations.put(
"Total (timeout)", Optional.of(Duration.between(start, Instant.now())));
invokeDurations.put("Total (timeout)", Duration.between(start, Instant.now()));
this.tallennaLokitJaMetriikat(hakukohdeOid, waitDurations, invokeDurations);
}
throw new RuntimeException(ex);
Expand Down

0 comments on commit 6266569

Please sign in to comment.