From 6bd0d5313ddfbc2477487c2032f2c647c9df9c99 Mon Sep 17 00:00:00 2001 From: Jinsong Wang <64242752+jinsongo@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:44:58 -0700 Subject: [PATCH] Improve the aggregation logic for tokens and duration (#63) * Improve the aggregation logic for tokens and duration * Use 10000 times the cost value * Add aiSystem as part of modelKey --- llm/config/config.yaml | 4 +- .../com/instana/dc/llm/AbstractLLMDc.java | 47 ++-- .../com/instana/dc/llm/DataCollector.java | 7 +- .../com/instana/dc/llm/LLMDcRegistry.java | 4 +- .../java/com/instana/dc/llm/LLMDcUtil.java | 3 +- .../instana/dc/llm/LLMRawMetricRegistry.java | 25 +- .../com/instana/dc/llm/impl/llm/LLMDc.java | 201 ++++++-------- .../llm/impl/llm/MetricsCollectorService.java | 245 ++++++++++++++---- 8 files changed, 331 insertions(+), 205 deletions(-) diff --git a/llm/config/config.yaml b/llm/config/config.yaml index 8bf6c8f..924bd6d 100644 --- a/llm/config/config.yaml +++ b/llm/config/config.yaml @@ -1,7 +1,9 @@ llm.application: LLM_DC instances: - - otel.backend.url: http://localhost:4317 + - otel.agentless.mode: true + otel.backend.url: http://:4317 + callback.interval: 10 otel.service.name: DC1 otel.service.port: 8000 #Only configure the settings of the AI provider you are using diff --git a/llm/src/main/java/com/instana/dc/llm/AbstractLLMDc.java b/llm/src/main/java/com/instana/dc/llm/AbstractLLMDc.java index b60fd69..9c02b23 100644 --- a/llm/src/main/java/com/instana/dc/llm/AbstractLLMDc.java +++ b/llm/src/main/java/com/instana/dc/llm/AbstractLLMDc.java @@ -4,14 +4,14 @@ */ package com.instana.dc.llm; -import com.instana.dc.AbstractDc; -import com.instana.dc.llm.DataCollector.CustomDcConfig; -import com.instana.dc.resources.ContainerResource; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import 
io.opentelemetry.sdk.resources.Resource; +import static com.instana.dc.DcUtil.CALLBACK_INTERVAL; +import static com.instana.dc.DcUtil.INSTANA_PLUGIN; +import static com.instana.dc.DcUtil.OTEL_BACKEND_URL; +import static com.instana.dc.DcUtil.OTEL_BACKEND_USING_HTTP; +import static com.instana.dc.DcUtil.OTEL_SERVICE_NAME; +import static com.instana.dc.DcUtil.POLLING_INTERVAL; +import static com.instana.dc.DcUtil.mergeResourceAttributesFromEnv; +import static io.opentelemetry.api.common.AttributeKey.stringKey; import java.net.InetAddress; import java.net.UnknownHostException; @@ -21,9 +21,16 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -import static com.instana.dc.DcUtil.*; -import static com.instana.dc.llm.LLMDcUtil.*; -import static io.opentelemetry.api.common.AttributeKey.stringKey; +import com.instana.dc.AbstractDc; +import com.instana.dc.llm.DataCollector.CustomDcConfig; +import com.instana.dc.resources.ContainerResource; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.ResourceAttributes; public abstract class AbstractLLMDc extends AbstractDc { private static final Logger logger = Logger.getLogger(AbstractLLMDc.class.getName()); @@ -36,9 +43,8 @@ public abstract class AbstractLLMDc extends AbstractDc { private String serviceInstanceId; private CustomDcConfig cdcConfig; - // Used the fixed 10 seconds for poll interval - public static final int LLM_POLL_INTERVAL = 10; - public static final int LLM_CLBK_INTERVAL = 10; + public static final int DEFAULT_LLM_POLL_INTERVAL = 10; + public static final int DEFAULT_LLM_CLBK_INTERVAL = 10; private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); @@ -52,10 +58,8 @@ public String getHostName() { public AbstractLLMDc(Map properties, 
CustomDcConfig cdcConfig) { super(new LLMRawMetricRegistry().getMap()); - // pollInterval = (Integer) properties.getOrDefault(POLLING_INTERVAL, DEFAULT_POLL_INTERVAL); - // callbackInterval = (Integer) properties.getOrDefault(CALLBACK_INTERVAL, DEFAULT_CALLBACK_INTERVAL); - pollInterval = LLM_POLL_INTERVAL; - callbackInterval = LLM_CLBK_INTERVAL; + callbackInterval = (Integer) properties.getOrDefault(CALLBACK_INTERVAL, DEFAULT_LLM_CLBK_INTERVAL); + pollInterval = (Integer) properties.getOrDefault(POLLING_INTERVAL, callbackInterval); otelBackendUrl = (String) properties.get(OTEL_BACKEND_URL); otelUsingHttp = (Boolean) properties.getOrDefault(OTEL_BACKEND_USING_HTTP, Boolean.FALSE); serviceName = (String) properties.get(OTEL_SERVICE_NAME); @@ -68,12 +72,11 @@ public Resource getResourceAttributes() { Resource resource = Resource.getDefault() .merge(Resource.create(Attributes.of( stringKey("llm.platform"), "LLM", - stringKey(SERVICE_NAME), serviceName, - stringKey(SERVICE_INSTANCE_ID), serviceInstanceId + ResourceAttributes.SERVICE_NAME, serviceName, + ResourceAttributes.SERVICE_INSTANCE_ID, serviceInstanceId ))) .merge(Resource.create(Attributes.of( - stringKey("INSTANA_PLUGIN"), // com.instana.agent.sensorsdk.semconv.ResourceAttributes.INSTANA_PLUGIN - "llmonitor" // com.instana.agent.sensorsdk.semconv.ResourceAttributes.LLM + stringKey(INSTANA_PLUGIN), "llmonitor" ))); resource = resource.merge(ContainerResource.get()); diff --git a/llm/src/main/java/com/instana/dc/llm/DataCollector.java b/llm/src/main/java/com/instana/dc/llm/DataCollector.java index 8a8f575..4b85465 100644 --- a/llm/src/main/java/com/instana/dc/llm/DataCollector.java +++ b/llm/src/main/java/com/instana/dc/llm/DataCollector.java @@ -4,6 +4,10 @@ */ package com.instana.dc.llm; +import static com.instana.dc.DcUtil.CONFIG_ENV; +import static com.instana.dc.DcUtil.CONFIG_YAML; +import static com.instana.dc.DcUtil.LOGGING_PROP; + import java.io.File; import java.util.ArrayList; import java.util.List; @@ 
-14,9 +18,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import static com.instana.dc.DcUtil.CONFIG_ENV; -import static com.instana.dc.DcUtil.CONFIG_YAML; -import static com.instana.dc.DcUtil.LOGGING_PROP; import com.instana.dc.IDc; public class DataCollector { diff --git a/llm/src/main/java/com/instana/dc/llm/LLMDcRegistry.java b/llm/src/main/java/com/instana/dc/llm/LLMDcRegistry.java index c62c71a..c2e5c54 100644 --- a/llm/src/main/java/com/instana/dc/llm/LLMDcRegistry.java +++ b/llm/src/main/java/com/instana/dc/llm/LLMDcRegistry.java @@ -4,12 +4,10 @@ */ package com.instana.dc.llm; -import com.instana.dc.llm.AbstractLLMDc; -import com.instana.dc.llm.impl.llm.LLMDc; - import java.util.HashMap; import java.util.Map; +import com.instana.dc.llm.impl.llm.LLMDc; public class LLMDcRegistry { /* Add all DataCollector implementations here: **/ diff --git a/llm/src/main/java/com/instana/dc/llm/LLMDcUtil.java b/llm/src/main/java/com/instana/dc/llm/LLMDcUtil.java index ea70c09..dba4725 100644 --- a/llm/src/main/java/com/instana/dc/llm/LLMDcUtil.java +++ b/llm/src/main/java/com/instana/dc/llm/LLMDcUtil.java @@ -13,8 +13,6 @@ public class LLMDcUtil { */ public static final String DEFAULT_INSTRUMENTATION_SCOPE = "instana.sensor-sdk.dc.llm"; public static final String DEFAULT_INSTRUMENTATION_SCOPE_VER = "1.0.0"; - public static final String SERVICE_NAME = "service.name"; - public static final String SERVICE_INSTANCE_ID = "service.instance.id"; public final static String WATSONX_PRICE_PROMPT_TOKES_PER_KILO = "watsonx.price.prompt.tokens.per.kilo"; public final static String WATSONX_PRICE_COMPLETE_TOKES_PER_KILO = "watsonx.price.complete.tokens.per.kilo"; public final static String OPENAI_PRICE_PROMPT_TOKES_PER_KILO = "openai.price.prompt.tokens.per.kilo"; @@ -24,6 +22,7 @@ public class LLMDcUtil { public final static String 
BEDROCK_PRICE_PROMPT_TOKES_PER_KILO = "bedrock.price.prompt.tokens.per.kilo"; public final static String BEDROCK_PRICE_COMPLETE_TOKES_PER_KILO = "bedrock.price.complete.tokens.per.kilo"; public final static String SERVICE_LISTEN_PORT = "otel.service.port"; + public final static String OTEL_AGENTLESS_MODE = "otel.agentless.mode"; /* Configurations for Metrics: */ diff --git a/llm/src/main/java/com/instana/dc/llm/LLMRawMetricRegistry.java b/llm/src/main/java/com/instana/dc/llm/LLMRawMetricRegistry.java index d66d828..5caea05 100644 --- a/llm/src/main/java/com/instana/dc/llm/LLMRawMetricRegistry.java +++ b/llm/src/main/java/com/instana/dc/llm/LLMRawMetricRegistry.java @@ -4,14 +4,31 @@ */ package com.instana.dc.llm; -import com.instana.dc.RawMetric; +import static com.instana.dc.InstrumentType.GAUGE; +import static com.instana.dc.InstrumentType.UPDOWN_COUNTER; +import static com.instana.dc.llm.LLMDcUtil.LLM_COST_DESC; +import static com.instana.dc.llm.LLMDcUtil.LLM_COST_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_COST_UNIT; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_DESC; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_MAX_DESC; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_MAX_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_MAX_UNIT; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_UNIT; +import static com.instana.dc.llm.LLMDcUtil.LLM_REQ_COUNT_DESC; +import static com.instana.dc.llm.LLMDcUtil.LLM_REQ_COUNT_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_REQ_COUNT_UNIT; +import static com.instana.dc.llm.LLMDcUtil.LLM_STATUS_DESC; +import static com.instana.dc.llm.LLMDcUtil.LLM_STATUS_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_STATUS_UNIT; +import static com.instana.dc.llm.LLMDcUtil.LLM_TOKEN_DESC; +import static com.instana.dc.llm.LLMDcUtil.LLM_TOKEN_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_TOKEN_UNIT; import 
java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static com.instana.dc.InstrumentType.GAUGE; -import static com.instana.dc.InstrumentType.UPDOWN_COUNTER; -import static com.instana.dc.llm.LLMDcUtil.*; +import com.instana.dc.RawMetric; public class LLMRawMetricRegistry { private final Map map = new ConcurrentHashMap() {{ diff --git a/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java b/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java index d1065a5..4d0d666 100644 --- a/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java +++ b/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java @@ -4,6 +4,8 @@ */ package com.instana.dc.llm.impl.llm; +import static com.instana.dc.DcUtil.CALLBACK_INTERVAL; +import static com.instana.dc.DcUtil.POLLING_INTERVAL; import static com.instana.dc.llm.LLMDcUtil.ANTHROPIC_PRICE_COMPLETE_TOKES_PER_KILO; import static com.instana.dc.llm.LLMDcUtil.ANTHROPIC_PRICE_PROMPT_TOKES_PER_KILO; import static com.instana.dc.llm.LLMDcUtil.BEDROCK_PRICE_COMPLETE_TOKES_PER_KILO; @@ -16,6 +18,7 @@ import static com.instana.dc.llm.LLMDcUtil.LLM_TOKEN_NAME; import static com.instana.dc.llm.LLMDcUtil.OPENAI_PRICE_COMPLETE_TOKES_PER_KILO; import static com.instana.dc.llm.LLMDcUtil.OPENAI_PRICE_PROMPT_TOKES_PER_KILO; +import static com.instana.dc.llm.LLMDcUtil.OTEL_AGENTLESS_MODE; import static com.instana.dc.llm.LLMDcUtil.SERVICE_LISTEN_PORT; import static com.instana.dc.llm.LLMDcUtil.WATSONX_PRICE_COMPLETE_TOKES_PER_KILO; import static com.instana.dc.llm.LLMDcUtil.WATSONX_PRICE_PROMPT_TOKES_PER_KILO; @@ -40,9 +43,11 @@ public class LLMDc extends AbstractLLMDc { private static final Logger logger = Logger.getLogger(LLMDc.class.getName()); - public static final String SENSOR_NAME = "com.instana.plugin.watsonx"; private HashMap modelAggrMap = new HashMap<>(); private MetricsCollectorService metricsCollector = new MetricsCollectorService(); + private Boolean otelAgentlessMode = Boolean.FALSE; + private Integer 
callbackInterval = DEFAULT_LLM_CLBK_INTERVAL; + private Integer otelPollInterval = DEFAULT_LLM_POLL_INTERVAL; private Double watsonxPricePromptTokens = 0.0; private Double watsonxPriceCompleteTokens = 0.0; private Double openaiPricePromptTokens = 0.0; @@ -61,15 +66,11 @@ public class LLMDc extends AbstractLLMDc { private class ModelAggregation { private final String modelId; private final String aiSystem; - private long deltaPromptTokens; - private long deltaCompleteTokens; + private long deltaInputTokens; + private long deltaOutputTokens; private long deltaDuration; - private long deltaReqCount; - private long maxDuration; - private long lastTotalPromptTokens; - private long lastTotalCompleteTokens; - private long lastTotalDuration; - private long lastTotalReqCount; + private long deltaRequestCount; + private long maxDurationSoFar; public ModelAggregation(String modelId, String aiSystem) { this.modelId = modelId; @@ -81,90 +82,50 @@ public String getModelId() { public String getAiSystem() { return aiSystem; } - public void addDeltaPromptTokens(long currTokens, long reqCount) { - if(currTokens == 0) { - return; - } - long diffPromptTokens = 0; - if(reqCount == 1) { - diffPromptTokens = currTokens; - } else if(currTokens > lastTotalPromptTokens && lastTotalPromptTokens != 0) { - diffPromptTokens = currTokens - lastTotalPromptTokens; - } - lastTotalPromptTokens = currTokens; - deltaPromptTokens += diffPromptTokens; + public long getDeltaInputTokens() { + return deltaInputTokens; } - - public long getDeltaPromptTokens() { - return deltaPromptTokens; - } - public void addDeltaCompleteTokens(long currTokens, long reqCount) { - if(currTokens == 0) { - return; - } - long diffCompleteTokens = 0; - if(reqCount == 1) { - diffCompleteTokens = currTokens; - } else if(currTokens > lastTotalCompleteTokens && lastTotalCompleteTokens != 0) { - diffCompleteTokens = currTokens - lastTotalCompleteTokens; - } - lastTotalCompleteTokens = currTokens; - deltaCompleteTokens += 
diffCompleteTokens; - } - public long getDeltaCompleteTokens() { - return deltaCompleteTokens; - } - public void addDeltaDuration(long currDuration, long reqCount) { - if(currDuration == 0) { - return; - } - long diffDuration = 0; - if(reqCount == 1) { - diffDuration = currDuration; - } else if(currDuration > lastTotalDuration && lastTotalDuration != 0) { - diffDuration = currDuration - lastTotalDuration; - } - lastTotalDuration = currDuration; - deltaDuration += diffDuration; + public long getDeltaOutputTokens() { + return deltaOutputTokens; } public long getDeltaDuration() { return deltaDuration; } - public void setMaxDuration(long maxDuration) { - this.maxDuration = maxDuration; + public long getDeltaRequestCount() { + return deltaRequestCount; } - public long getMaxDuration() { - return maxDuration; + public long getMaxDurationSoFar() { + return maxDurationSoFar; } - public void addDeltaReqCount(long currCount) { - if(currCount == 0) { - return; - } - long diffReqCount = 0; - if(currCount == 1) { - diffReqCount = currCount; - } else if(currCount > lastTotalReqCount && lastTotalReqCount != 0) { - diffReqCount = currCount - lastTotalReqCount; - } - lastTotalReqCount = currCount; - deltaReqCount += diffReqCount; + public void addDeltaInputTokens(long inputTokens) { + deltaInputTokens += inputTokens; + } + public void addDeltaOutputTokens(long outputTokens) { + deltaOutputTokens += outputTokens; + } + public void addDeltaDuration(long duration) { + deltaDuration += duration; } - public long getDeltaReqCount() { - return deltaReqCount; + public void addDeltaRequestCount(long requestCount) { + deltaRequestCount += requestCount; } - public long getCurrentReqCount() { - return lastTotalReqCount; + public void setMaxDurationSoFar(long maxDuration) { + maxDurationSoFar = maxDuration; } - public void resetMetrics() { - deltaPromptTokens = 0; - deltaCompleteTokens = 0; + + public void resetDeltaValues() { + deltaInputTokens = 0; + deltaOutputTokens = 0; deltaDuration = 0; 
- deltaReqCount = 0; + deltaRequestCount = 0; } } public LLMDc(Map properties, CustomDcConfig cdcConfig) throws Exception { super(properties, cdcConfig); + otelAgentlessMode = (Boolean) properties.getOrDefault(OTEL_AGENTLESS_MODE, Boolean.FALSE); + callbackInterval = (Integer) properties.getOrDefault(CALLBACK_INTERVAL, DEFAULT_LLM_CLBK_INTERVAL); + otelPollInterval = (Integer) properties.getOrDefault(POLLING_INTERVAL, callbackInterval); watsonxPricePromptTokens = (Double) properties.getOrDefault(WATSONX_PRICE_PROMPT_TOKES_PER_KILO, 0.0); watsonxPriceCompleteTokens = (Double) properties.getOrDefault(WATSONX_PRICE_COMPLETE_TOKES_PER_KILO, 0.0); openaiPricePromptTokens = (Double) properties.getOrDefault(OPENAI_PRICE_PROMPT_TOKES_PER_KILO, 0.0); @@ -187,7 +148,7 @@ public void initOnce() throws ClassNotFoundException { .service( "/", (ctx, req) -> { - var requests = metricsCollector.getMetrics(); + var requests = metricsCollector.getDeltaMetricsList(); if (requests != null) { return HttpResponse.of( HttpStatus.OK, MediaType.JSON, HttpData.wrap("OK".getBytes())); @@ -215,31 +176,30 @@ public void collectData() { for(Map.Entry entry : modelAggrMap.entrySet()){ ModelAggregation aggr = entry.getValue(); - aggr.resetMetrics(); + aggr.resetDeltaValues(); } - List otelMetrics = metricsCollector.getMetrics(); - metricsCollector.clearMetrics(); + List otelMetrics = metricsCollector.getDeltaMetricsList(); + metricsCollector.resetMetricsDetla(); + for (OtelMetric metric : otelMetrics) { try { String modelId = metric.getModelId(); String aiSystem = metric.getAiSystem(); - long promptTokens = metric.getPromtTokens(); - long completeTokens = metric.getCompleteTokens(); - double duration = metric.getDuration(); - long requestCount = metric.getReqCount(); + long inputTokens = metric.getDeltaInputTokens(); + long outputTokens = metric.getDeltaOutputTokens(); + long duration = metric.getDeltaDuration(); + long requestCount = metric.getDeltaRequestCount(); ModelAggregation modelAggr = 
modelAggrMap.get(modelId); if (modelAggr == null) { modelAggr = new ModelAggregation(modelId, aiSystem); modelAggrMap.put(modelId, modelAggr); } - // Always handle duration first! - modelAggr.addDeltaDuration((long)(duration*1000), requestCount); - modelAggr.addDeltaReqCount(requestCount); - long currentReqCount = modelAggr.getCurrentReqCount(); - modelAggr.addDeltaPromptTokens(promptTokens, currentReqCount); - modelAggr.addDeltaCompleteTokens(completeTokens, currentReqCount); + modelAggr.addDeltaInputTokens(inputTokens); + modelAggr.addDeltaOutputTokens(outputTokens); + modelAggr.addDeltaDuration(duration); + modelAggr.addDeltaRequestCount(requestCount); } catch (Exception e) { e.printStackTrace(); @@ -251,39 +211,44 @@ public void collectData() { ModelAggregation aggr = entry.getValue(); String modelId = aggr.getModelId(); String aiSystem = aggr.getAiSystem(); - long deltaRequestCount = aggr.getDeltaReqCount(); + long deltaInputTokens = aggr.getDeltaInputTokens(); + long deltaOutputTokens = aggr.getDeltaOutputTokens(); long deltaDuration = aggr.getDeltaDuration(); - long deltaPromptTokens = aggr.getDeltaPromptTokens(); - long deltaCompleteTokens = aggr.getDeltaCompleteTokens(); - long maxDuration = aggr.getMaxDuration(); + long deltaRequestCount = aggr.getDeltaRequestCount(); + long maxDurationSoFar = aggr.getMaxDurationSoFar(); + aggr.resetDeltaValues(); - long avgDuration = deltaRequestCount == 0 ? 0 : deltaDuration/deltaRequestCount; - if(avgDuration > maxDuration) { - maxDuration = avgDuration; - aggr.setMaxDuration(maxDuration); + long avgDurationPerReq = deltaRequestCount == 0 ? 0 : deltaDuration/deltaRequestCount; + if(avgDurationPerReq > maxDurationSoFar) { + maxDurationSoFar = avgDurationPerReq; + aggr.setMaxDurationSoFar(maxDurationSoFar); } + int divisor = otelAgentlessMode? 
1:otelPollInterval; - int intervalSeconds = LLM_POLL_INTERVAL; - String agentLess = System.getenv("AGENTLESS_MODE_ENABLED"); - if (agentLess != null) { - intervalSeconds = 1; - } + double priceInputTokens = getPricePromptTokens(aiSystem); + double priceOutputTokens = getPriceCompleteTokens(aiSystem); - double pricePromptTokens = getPricePromptTokens(aiSystem); - double priceCompleteTokens = getPriceCompleteTokens(aiSystem); + double intervalReqCount = (double)deltaRequestCount/divisor; + double intervalInputTokens = (double)deltaInputTokens/divisor; + double intervalOutputTokens = (double)deltaOutputTokens/divisor; + double intervalTotalTokens = intervalInputTokens + intervalOutputTokens; - double intervalReqCount = (double)deltaRequestCount/intervalSeconds; - double intervalPromptTokens = (double)deltaPromptTokens/intervalSeconds; - double intervalCompleteTokens = (double)deltaCompleteTokens/intervalSeconds; - double intervalTotalTokens = intervalPromptTokens + intervalCompleteTokens; - double intervalPromptCost = (intervalPromptTokens/1000) * pricePromptTokens; - double intervalCompleteCost = (intervalCompleteTokens/1000) * priceCompleteTokens; - double intervalTotalCost = intervalPromptCost + intervalCompleteCost; - aggr.resetMetrics(); + double intervalInputCost = intervalInputTokens/1000 * priceInputTokens; + double intervalOutputCost = intervalOutputTokens/1000 * priceOutputTokens; + double intervalTotalCost = intervalInputCost + intervalOutputCost; + // These costs are 10000 times the actual value to prevent very small numbers from being rounded off. + // They will be adjusted back to the correct value on the UI. 
+ String backwardCompatible = System.getenv("FORCE_BACKWARD_COMPATIBLE"); + if (backwardCompatible != null) { + System.out.printf("FORCE_BACKWARD_COMPATIBLE is set."); + } else { + intervalTotalCost = intervalTotalCost * 10000; + } + System.out.printf("Metrics for model %s of %s:%n", modelId, aiSystem); - System.out.println(" - Average Duration : " + avgDuration + " ms"); - System.out.println(" - Maximum Duration : " + maxDuration + " ms"); + System.out.println(" - Average Duration : " + avgDurationPerReq + " ms"); + System.out.println(" - Maximum Duration : " + maxDurationSoFar + " ms"); System.out.println(" - Interval Tokens : " + intervalTotalTokens); System.out.println(" - Interval Cost : " + intervalTotalCost); System.out.println(" - Interval Request : " + intervalReqCount); @@ -292,8 +257,8 @@ public void collectData() { attributes.put("model_id", modelId); attributes.put("ai_system", aiSystem); getRawMetric(LLM_STATUS_NAME).setValue(1); - getRawMetric(LLM_DURATION_NAME).getDataPoint(modelId).setValue(avgDuration, attributes); - getRawMetric(LLM_DURATION_MAX_NAME).getDataPoint(modelId).setValue(maxDuration, attributes); + getRawMetric(LLM_DURATION_NAME).getDataPoint(modelId).setValue(avgDurationPerReq, attributes); + getRawMetric(LLM_DURATION_MAX_NAME).getDataPoint(modelId).setValue(maxDurationSoFar, attributes); getRawMetric(LLM_COST_NAME).getDataPoint(modelId).setValue(intervalTotalCost, attributes); getRawMetric(LLM_TOKEN_NAME).getDataPoint(modelId).setValue(intervalTotalTokens, attributes); getRawMetric(LLM_REQ_COUNT_NAME).getDataPoint(modelId).setValue(intervalReqCount, attributes); diff --git a/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java b/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java index 73f0ad1..32e42eb 100644 --- a/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java +++ b/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java @@ -1,15 +1,12 @@ 
package com.instana.dc.llm.impl.llm; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.Objects; import java.util.logging.Logger; import com.google.common.collect.ImmutableList; -import java.util.HashMap; -import java.util.Map; - import io.grpc.stub.StreamObserver; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; @@ -29,70 +26,163 @@ class MetricsCollectorService extends MetricsServiceGrpc.MetricsServiceImplBase public class OtelMetric { private String modelId; - private long promptTokens; - private long completeTokens; - private double duration; + private double inputTokenSum; + private double outputTokenSum; + private double durationSum; private long requestCount; private String aiSystem; + private long durationStartTime; + private long inputTokenStartTime; + private long outputTokenStartTime; + private long deltaInputTokens; + private long deltaOutputTokens; + private long deltaDuration; + private long deltaRequestCount; + + public OtelMetric() {} + + public OtelMetric(OtelMetric other) { + this.modelId = other.modelId; + this.inputTokenSum = other.inputTokenSum; + this.outputTokenSum = other.outputTokenSum; + this.durationSum = other.durationSum; + this.requestCount = other.requestCount; + this.aiSystem = other.aiSystem; + this.durationStartTime = other.durationStartTime; + this.inputTokenStartTime = other.inputTokenStartTime; + this.outputTokenStartTime = other.outputTokenStartTime; + this.deltaInputTokens = other.deltaInputTokens; + this.deltaOutputTokens = other.deltaOutputTokens; + this.deltaDuration = other.deltaDuration; + this.deltaRequestCount = other.deltaRequestCount; + } public String getModelId() { return modelId; } - public long getPromtTokens() { - return promptTokens; + public String getAiSystem() { + return aiSystem; } - 
public long getCompleteTokens() { - return completeTokens; + public double getLastInputTokenSum() { + return inputTokenSum; } - public double getDuration() { - return duration; + public double getLastOutputTokenSum() { + return outputTokenSum; } - public long getReqCount() { + public double getLastDurationSum() { + return durationSum; + } + + public long getLastRequestCount() { return requestCount; } - public String getAiSystem() { - return aiSystem; + public long getDeltaInputTokens() { + return deltaInputTokens; + } + + public long getDeltaOutputTokens() { + return deltaOutputTokens; + } + + public long getDeltaDuration() { + return deltaDuration; + } + + public long getDeltaRequestCount() { + return deltaRequestCount; + } + + public long getLastDurationStartTime() { + return durationStartTime; + } + + public long getLastInputTokenStartTime() { + return inputTokenStartTime; + } + + public long getLastOutputTokenStartTime() { + return outputTokenStartTime; } public void setModelId(String modelId) { this.modelId = modelId; } - public void setPromptTokens(long promptTokens) { - this.promptTokens = promptTokens; + public void setAiSystem(String aiSystem) { + this.aiSystem = aiSystem; } - public void setCompleteTokens(long completeTokens) { - this.completeTokens = completeTokens; + public void setLastInputTokenSum(double inputTokenSum) { + this.inputTokenSum = inputTokenSum; } - public void setDuration(double duration) { - this.duration = duration; + public void setLastOutputTokenSum(double outputTokenSum) { + this.outputTokenSum = outputTokenSum; } - public void setReqCount(long requestCount) { + public void setLastDurationSum(double durationSum) { + this.durationSum = durationSum; + } + + public void setLastRequestCount(long requestCount) { this.requestCount = requestCount; } - public void setAiSystem(String aiSystem) { - this.aiSystem = aiSystem; + public void addDeltaInputTokens(double deltaInputTokens) { + this.deltaInputTokens += (long)deltaInputTokens; + } + + 
public void addDeltaOutputTokens(double deltaOutputTokens) { + this.deltaOutputTokens += (long)deltaOutputTokens; + } + + public void addDeltaDuration(double deltaDuration) { + this.deltaDuration += (long)(deltaDuration*1000); // seconds to milliseconds + } + + public void addDeltaRequestCount(long deltaRequestCount) { + this.deltaRequestCount += deltaRequestCount; + } + + public void setLastDurationStartTime(long startTime) { + this.durationStartTime = startTime; + } + + public void setLastInputTokenStartTime(long startTime) { + this.inputTokenStartTime = startTime; + } + + public void setLastOutputTokenStartTime(long startTime) { + this.outputTokenStartTime = startTime; + } + + public void resetDeltaValues() { + this.deltaInputTokens = 0; + this.deltaOutputTokens = 0; + this.deltaDuration = 0; + this.deltaRequestCount = 0; } } private HashMap exportMetrics = new HashMap<>(); - public List getMetrics() { + public List getDeltaMetricsList() { synchronized (mutex) { - return ImmutableList.copyOf(exportMetrics.values()); + return exportMetrics.values().stream() + .filter(Objects::nonNull) + .map(OtelMetric::new) + .collect(ImmutableList.toImmutableList()); } } - public void clearMetrics() { - exportMetrics.clear(); + public void resetMetricsDetla() { + for (OtelMetric metric : exportMetrics.values()) { + metric.resetDeltaValues(); + } } @Override @@ -103,6 +193,14 @@ public void export( synchronized (mutex) { List allResourceMetrics = request.getResourceMetricsList(); for (ResourceMetrics resourceMetrics : allResourceMetrics) { + Resource resource = resourceMetrics.getResource(); + String serviceName = ""; + for (KeyValue kv : resource.getAttributesList()) { + if (kv.getKey().compareTo("service.name") == 0) { + serviceName = kv.getValue().getStringValue(); + System.out.println("Recv Metric --- Service Name: " + serviceName); + } + } for (ScopeMetrics scoMetrics : resourceMetrics.getScopeMetricsList()) { InstrumentationScope instrumentationScope = 
scoMetrics.getScope(); instrumentationScope.getAttributesList(); @@ -112,7 +210,7 @@ public void export( System.out.println("Recv Metric --- Scope Desc: " + metric.getDescription()); switch (metric.getDataCase()) { case HISTOGRAM: - processHistogramMetrics(metric); + processHistogramMetrics(metric, serviceName); break; case SUM: case GAUGE: @@ -121,6 +219,7 @@ public void export( System.out.println("Skip Metric DataCase: " + metric.getDataCase()); } } + System.out.println(""); } } } @@ -128,15 +227,15 @@ public void export( responseObserver.onCompleted(); } - private void processHistogramMetrics(Metric metric) { + private void processHistogramMetrics(Metric metric, String serviceName) { if (metric.getName().compareTo("gen_ai.client.token.usage") == 0) { - processTokenMetric(metric); + processTokenMetric(metric, serviceName); } else if (metric.getName().compareTo("gen_ai.client.operation.duration") == 0) { - processDurationMetric(metric); + processDurationMetric(metric, serviceName); } } - private void processTokenMetric(Metric metric) { + private void processTokenMetric(Metric metric, String serviceName) { List histDataPoints = metric.getHistogram().getDataPointsList(); for (HistogramDataPoint dataPoint : histDataPoints) { List kvList = dataPoint.getAttributesList(); @@ -152,30 +251,54 @@ private void processTokenMetric(Metric metric) { System.out.println("Recv Metric --- AI System: " + aiSystem); } else if (kv.getKey().compareTo("gen_ai.token.type") == 0) { tokenType = kv.getValue().getStringValue(); + System.out.println("Recv Metric --- Token Type: " + tokenType); } } if (!modelId.isEmpty()) { - OtelMetric otelMetric = exportMetrics.get(modelId); + double tokenSum = dataPoint.getSum(); + long requestCount = dataPoint.getCount(); + long startTime = dataPoint.getStartTimeUnixNano(); + long endTime = dataPoint.getTimeUnixNano(); + System.out.println("Recv Metric --- Token Sum: " + tokenSum); + System.out.println("Recv Metric --- Request Count: " + requestCount); + 
System.out.println("Recv Metric --- Start Time : " + startTime); + System.out.println("Recv Metric --- End Time : " + endTime); + + String modelKey = String.format("%s:%s:%s", serviceName, aiSystem, modelId); + OtelMetric otelMetric = exportMetrics.get(modelKey); if (otelMetric == null ) { otelMetric = new OtelMetric(); - exportMetrics.put(modelId, otelMetric); + exportMetrics.put(modelKey, otelMetric); otelMetric.setModelId(modelId); otelMetric.setAiSystem(aiSystem); } + if (tokenType.compareTo("input") == 0) { - long inputTokens = (long) dataPoint.getSum(); - otelMetric.setPromptTokens(inputTokens); - System.out.println("Recv Metric --- Token Input: " + inputTokens); + long lastStartTime = otelMetric.getLastInputTokenStartTime(); + if (startTime != lastStartTime) { + otelMetric.setLastInputTokenStartTime(startTime); + otelMetric.addDeltaInputTokens(tokenSum); + } else { + double lastInputTokenSum = otelMetric.getLastInputTokenSum(); + otelMetric.addDeltaInputTokens(tokenSum - lastInputTokenSum); + } + otelMetric.setLastInputTokenSum(tokenSum); } else if (tokenType.compareTo("output") == 0) { - long outputTokens = (long) dataPoint.getSum(); - otelMetric.setCompleteTokens(outputTokens); - System.out.println("Recv Metric --- Token Output: " + outputTokens); + long lastStartTime = otelMetric.getLastOutputTokenStartTime(); + if (startTime != lastStartTime) { + otelMetric.setLastOutputTokenStartTime(startTime); + otelMetric.addDeltaOutputTokens(tokenSum); + } else { + double lastOutputTokenSum = otelMetric.getLastOutputTokenSum(); + otelMetric.addDeltaOutputTokens(tokenSum - lastOutputTokenSum); + } + otelMetric.setLastOutputTokenSum(tokenSum); } } } } - private void processDurationMetric(Metric metric) { + private void processDurationMetric(Metric metric, String serviceName) { List histDataPoints = metric.getHistogram().getDataPointsList(); for (HistogramDataPoint dataPoint : histDataPoints) { List kvList = dataPoint.getAttributesList(); @@ -191,19 +314,37 @@ private 
void processDurationMetric(Metric metric) { } } if (!modelId.isEmpty()) { - OtelMetric otelMetric = exportMetrics.get(modelId); + double durationSum = dataPoint.getSum(); + long requestCount = dataPoint.getCount(); + long startTime = dataPoint.getStartTimeUnixNano(); + long endTime = dataPoint.getTimeUnixNano(); + System.out.println("Recv Metric --- Duration Sum: " + durationSum); + System.out.println("Recv Metric --- Request Count: " + requestCount); + System.out.println("Recv Metric --- Start Time : " + startTime); + System.out.println("Recv Metric --- End Time : " + endTime); + + String modelKey = String.format("%s:%s:%s", serviceName, aiSystem, modelId); + OtelMetric otelMetric = exportMetrics.get(modelKey); if (otelMetric == null ) { otelMetric = new OtelMetric(); - exportMetrics.put(modelId, otelMetric); + exportMetrics.put(modelKey, otelMetric); otelMetric.setModelId(modelId); otelMetric.setAiSystem(aiSystem); } - Double durationSum = dataPoint.getSum(); - long requestCount = dataPoint.getCount(); - otelMetric.setDuration(durationSum); - otelMetric.setReqCount(requestCount); - System.out.println("Recv Metric --- Duration Sum: " + durationSum); - System.out.println("Recv Metric --- Duration Count: " + requestCount); + + long lastStartTime = otelMetric.getLastDurationStartTime(); + double lastDurationSum = otelMetric.getLastDurationSum(); + long lastRequestCount = otelMetric.getLastRequestCount(); + if (startTime != lastStartTime) { + otelMetric.setLastDurationStartTime(startTime); + otelMetric.addDeltaDuration(durationSum); + otelMetric.addDeltaRequestCount(requestCount); + } else { + otelMetric.addDeltaDuration(durationSum - lastDurationSum); + otelMetric.addDeltaRequestCount(requestCount - lastRequestCount); + } + otelMetric.setLastDurationSum(durationSum); + otelMetric.setLastRequestCount(requestCount); } } }