From 6266569a7dbef110d002ee8388f0cc9039612356 Mon Sep 17 00:00:00 2001 From: Santeri Korri Date: Mon, 25 Nov 2024 14:18:34 +0200 Subject: [PATCH] OK-611 Refaktoroitu metriikoita --- .../actor/LaskentaResurssiProvider.java | 136 ++++++++++++------ 1 file changed, 95 insertions(+), 41 deletions(-) diff --git a/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java b/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java index 14a3b4948..262833c31 100644 --- a/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java +++ b/src/main/java/fi/vm/sade/valinta/kooste/valintalaskenta/actor/LaskentaResurssiProvider.java @@ -128,18 +128,23 @@ public LaskentaResurssiProvider( static class ConcurrencyLimiter { + public static final Duration TIMEOUT = Duration.ofMillis(Long.MIN_VALUE + 1); + public static final Duration ERROR = Duration.ofMillis(Long.MIN_VALUE + 2); + private int maxPermits; private final String vaihe; private final Semaphore semaphore; private final ExecutorService executor; - private final AtomicInteger ongoing; + private final AtomicInteger waiting; + private final AtomicInteger active; public ConcurrencyLimiter(int permits, String vaihe, ExecutorService executor) { this.vaihe = vaihe; this.maxPermits = permits; this.semaphore = new Semaphore(permits, true); this.executor = executor; - this.ongoing = new AtomicInteger(0); + this.waiting = new AtomicInteger(0); + this.active = new AtomicInteger(0); } public void setMaxPermits(int newPermits) { @@ -155,36 +160,62 @@ public void setMaxPermits(int newPermits) { } } - public int getOngoing() { - return this.ongoing.get(); + public int getWaiting() { + return this.waiting.get(); + } + + public int getActive() { + return this.active.get(); } public String getVaihe() { return this.vaihe; } + public static String asLabel(Duration duration) { + if (duration == TIMEOUT) { + return "timeout"; + } else if (duration == ERROR) { + return "error"; + } + return duration.toMillis() + ""; + } + public CompletableFuture withConcurrencyLimit( int permits, - Map> waitDurations, - Map> invokeDurations, + Map waitDurations, + Map invokeDurations, Supplier> supplier) { - invokeDurations.put(this.vaihe, Optional.empty()); // invoket voi timeoutata Instant waitStart = Instant.now(); - this.ongoing.incrementAndGet(); + this.waiting.incrementAndGet(); return CompletableFuture.supplyAsync( () -> { this.semaphore.acquireUninterruptibly(Math.min(this.maxPermits, permits)); + this.waiting.decrementAndGet(); + this.active.incrementAndGet(); try { Instant invokeStart = Instant.now(); - waitDurations.put(this.vaihe, Optional.of(Duration.between(waitStart, invokeStart))); - T result = supplier.get().join(); - invokeDurations.put( - this.vaihe, Optional.of(Duration.between(invokeStart, Instant.now()))); + waitDurations.put(this.vaihe, Duration.between(waitStart, invokeStart)); + T result = + supplier + .get() + .exceptionallyAsync( + e -> { + if (e instanceof TimeoutException) { + invokeDurations.put(this.vaihe, TIMEOUT); + } else { + invokeDurations.put(this.vaihe, ERROR); + } + throw new CompletionException(e); + }, + this.executor) + .join(); + invokeDurations.put(this.vaihe, Duration.between(invokeStart, Instant.now())); return result; } finally { semaphore.release(permits); - this.ongoing.decrementAndGet(); + this.active.decrementAndGet(); } }, this.executor); @@ -208,8 +239,8 @@ public void lueParametrit() { private void tallennaLokitJaMetriikat( String hakukohdeOid, - Map> waitDurations, - Map> invokeDurations) { + Map waitDurations, + Map invokeDurations) { Collection datums = new ArrayList<>(); datums.addAll( waitDurations.entrySet().stream() @@ -217,7 +248,7 @@ private void tallennaLokitJaMetriikat( e -> MetricDatum.builder() .metricName("odotus") - .value((double) e.getValue().get().toMillis()) + .value((double) e.getValue().toMillis()) .storageResolution(60) .dimensions( List.of(Dimension.builder().name("vaihe").value(e.getKey()).build())) @@ -230,17 +261,17 @@ private void tallennaLokitJaMetriikat( invokeDurations.entrySet().stream() .map( e -> { - if (e.getValue().isPresent()) { + if (e.getValue() == ConcurrencyLimiter.ERROR) { return MetricDatum.builder() - .metricName("kesto") - .value((double) e.getValue().get().toMillis()) + .metricName("errors") + .value(1.0) .storageResolution(60) .dimensions( List.of(Dimension.builder().name("vaihe").value(e.getKey()).build())) .timestamp(Instant.now()) - .unit(StandardUnit.MILLISECONDS) + .unit(StandardUnit.COUNT) .build(); - } else { + } else if (e.getValue() == ConcurrencyLimiter.TIMEOUT) { return MetricDatum.builder() .metricName("timeouts") .value(1.0) @@ -250,6 +281,16 @@ private void tallennaLokitJaMetriikat( .timestamp(Instant.now()) .unit(StandardUnit.COUNT) .build(); + } else { + return MetricDatum.builder() + .metricName("kesto") + .value((double) e.getValue().toMillis()) + .storageResolution(60) + .dimensions( + List.of(Dimension.builder().name("vaihe").value(e.getKey()).build())) + .timestamp(Instant.now()) + .unit(StandardUnit.MILLISECONDS) + .build(); } }) .collect(Collectors.toList())); @@ -268,11 +309,7 @@ private void tallennaLokitJaMetriikat( + hakukohdeOid + ": " + waitDurations.entrySet().stream() - .map( - e -> - e.getKey() - + ":" - + e.getValue().map(d -> d.toMillis() + "").orElse("timeout")) + .map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue())) .collect(Collectors.joining(", "))); LOG.info( @@ -280,24 +317,42 @@ private void tallennaLokitJaMetriikat( + hakukohdeOid + ": " + invokeDurations.entrySet().stream() - .map( - e -> - e.getKey() - + ":" - + e.getValue().map(d -> d.toMillis() + "").orElse("timeout")) + .map(e -> e.getKey() + ":" + ConcurrencyLimiter.asLabel(e.getValue())) .collect(Collectors.joining(", "))); } @Scheduled(initialDelay = 15, fixedDelay = 15, timeUnit = TimeUnit.SECONDS) public void tallennaMaarat() { - Collection datums = + Collection datums = new ArrayList<>(); + + datums.addAll( + this.limiters.values().stream() + .filter(limiter -> limiter.getActive() > 0) + .map( + limiter -> + MetricDatum.builder() + .metricName("active") + .value((double) limiter.getActive()) + .storageResolution(1) + .dimensions( + List.of( + Dimension.builder() + .name("vaihe") + .value(limiter.getVaihe()) + .build())) + .timestamp(Instant.now()) + .unit(StandardUnit.COUNT) + .build()) + .toList()); + + datums.addAll( this.limiters.values().stream() - .filter(limiter -> limiter.getOngoing() > 0) + .filter(limiter -> limiter.getWaiting() > 0) .map( limiter -> MetricDatum.builder() - .metricName("ongoing") - .value((double) limiter.getOngoing()) + .metricName("waiting") + .value((double) limiter.getWaiting()) .storageResolution(1) .dimensions( List.of( @@ -308,7 +363,7 @@ public void tallennaMaarat() { .timestamp(Instant.now()) .unit(StandardUnit.COUNT) .build()) - .toList(); + .toList()); if (!datums.isEmpty()) { CompletableFuture.supplyAsync( @@ -464,8 +519,8 @@ public CompletableFuture fetchResourcesForOneLaskenta( Date nyt) { Instant start = Instant.now(); - Map> waitDurations = new ConcurrentHashMap<>(); - Map> invokeDurations = new ConcurrentHashMap<>(); + Map waitDurations = new ConcurrentHashMap<>(); + Map invokeDurations = new ConcurrentHashMap<>(); final CompletableFuture parametritDTOFuture = this.parametritLimiter.withConcurrencyLimit( @@ -673,7 +728,7 @@ public CompletableFuture fetchResourcesForOneLaskenta( .thenApplyAsync( laskeDTO -> { laskeDTO.populoiSuoritustiedotHakemuksille(suoritustiedotDTO); - invokeDurations.put("Total", Optional.of(Duration.between(start, Instant.now()))); + invokeDurations.put("Total", Duration.between(start, Instant.now())); LOG.info( "Haettiin lähtötiedot hakukohteelle " + hakukohdeOid @@ -689,8 +744,7 @@ public CompletableFuture fetchResourcesForOneLaskenta( .exceptionally( ex -> { if (ex instanceof TimeoutException) { - invokeDurations.put( - "Total (timeout)", Optional.of(Duration.between(start, Instant.now()))); + invokeDurations.put("Total (timeout)", Duration.between(start, Instant.now())); this.tallennaLokitJaMetriikat(hakukohdeOid, waitDurations, invokeDurations); } throw new RuntimeException(ex);