diff --git a/.gitignore b/.gitignore index a5fdfe8..9593f1c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .gradle/ .idea/ build/ +*.class diff --git a/llm/.vscode/settings.json b/llm/.vscode/settings.json new file mode 100644 index 0000000..7b016a8 --- /dev/null +++ b/llm/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.compile.nullAnalysis.mode": "automatic" +} \ No newline at end of file diff --git a/llm/src/main/java/com/instana/dc/llm/DataCollector.java b/llm/src/main/java/com/instana/dc/llm/DataCollector.java index ca807dc..8a8f575 100644 --- a/llm/src/main/java/com/instana/dc/llm/DataCollector.java +++ b/llm/src/main/java/com/instana/dc/llm/DataCollector.java @@ -4,12 +4,6 @@ */ package com.instana.dc.llm; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.instana.dc.IDc; -import com.instana.dc.llm.LLMDcRegistry; - import java.io.File; import java.util.ArrayList; import java.util.List; @@ -17,7 +11,13 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static com.instana.dc.DcUtil.*; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import static com.instana.dc.DcUtil.CONFIG_ENV; +import static com.instana.dc.DcUtil.CONFIG_YAML; +import static com.instana.dc.DcUtil.LOGGING_PROP; +import com.instana.dc.IDc; public class DataCollector { static { diff --git a/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java b/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java index 91428d5..4eb0306 100644 --- a/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java +++ b/llm/src/main/java/com/instana/dc/llm/impl/llm/LLMDc.java @@ -4,13 +4,27 @@ */ package com.instana.dc.llm.impl.llm; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + import com.instana.dc.llm.AbstractLLMDc; import com.instana.dc.llm.DataCollector.CustomDcConfig; +import static com.instana.dc.llm.LLMDcUtil.ANTHROPIC_PRICE_COMPLETE_TOKES_PER_KILO; +import static com.instana.dc.llm.LLMDcUtil.ANTHROPIC_PRICE_PROMPT_TOKES_PER_KILO; +import static com.instana.dc.llm.LLMDcUtil.LLM_COST_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_MAX_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_DURATION_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_REQ_COUNT_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_STATUS_NAME; +import static com.instana.dc.llm.LLMDcUtil.LLM_TOKEN_NAME; +import static com.instana.dc.llm.LLMDcUtil.OPENAI_PRICE_COMPLETE_TOKES_PER_KILO; +import static com.instana.dc.llm.LLMDcUtil.OPENAI_PRICE_PROMPT_TOKES_PER_KILO; +import static com.instana.dc.llm.LLMDcUtil.SERVICE_LISTEN_PORT; +import static com.instana.dc.llm.LLMDcUtil.WATSONX_PRICE_COMPLETE_TOKES_PER_KILO; +import static com.instana.dc.llm.LLMDcUtil.WATSONX_PRICE_PROMPT_TOKES_PER_KILO; import com.instana.dc.llm.impl.llm.MetricsCollectorService.OtelMetric; - -import java.util.logging.Logger; -import java.util.*; - import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; @@ -19,9 +33,6 @@ import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.healthcheck.HealthCheckService; -//import static com.instana.agent.sensorsdk.semconv.SemanticAttributes.*; -import static com.instana.dc.llm.LLMDcUtil.*; - @SuppressWarnings("null") public class LLMDc extends AbstractLLMDc { private static final Logger logger = Logger.getLogger(LLMDc.class.getName()); @@ -204,12 +215,11 @@ public void collectData() { metricsCollector.clearMetrics(); for (OtelMetric metric : otelMetrics) { try { - double duration = metric.getDuration(); - if(duration == 0.0) { - continue; - } String modelId = metric.getModelId(); String aiSystem = metric.getAiSystem(); + long promptTokens = metric.getPromtTokens(); + long completeTokens = metric.getCompleteTokens(); + double duration = metric.getDuration(); long requestCount = metric.getReqCount(); ModelAggregation modelAggr = modelAggrMap.get(modelId); @@ -217,33 +227,13 @@ public void collectData() { modelAggr = new ModelAggregation(modelId, aiSystem); modelAggrMap.put(modelId, modelAggr); } + // Always handle duration first! modelAggr.addDeltaDuration((long)(duration*1000), requestCount); modelAggr.addDeltaReqCount(requestCount); - } catch (Exception e) { - e.printStackTrace(); - } - } - for (OtelMetric metric : otelMetrics) { - try { - String modelId = metric.getModelId(); - String aiSystem = metric.getAiSystem(); - long promptTokens = metric.getPromtTokens(); - long completeTokens = metric.getCompleteTokens(); - if(promptTokens == 0 && completeTokens == 0) { - continue; - } - ModelAggregation modelAggr = modelAggrMap.get(modelId); - if (modelAggr == null) { - modelAggr = new ModelAggregation(modelId, aiSystem); - modelAggrMap.put(modelId, modelAggr); - } long currentReqCount = modelAggr.getCurrentReqCount(); - if(promptTokens > 0) { - modelAggr.addDeltaPromptTokens(promptTokens, currentReqCount); - } - if(completeTokens > 0) { - modelAggr.addDeltaCompleteTokens(completeTokens, currentReqCount); - } + modelAggr.addDeltaPromptTokens(promptTokens, currentReqCount); + modelAggr.addDeltaCompleteTokens(completeTokens, currentReqCount); + } catch (Exception e) { e.printStackTrace(); } @@ -260,7 +250,7 @@ public void collectData() { long deltaCompleteTokens = aggr.getDeltaCompleteTokens(); long maxDuration = aggr.getMaxDuration(); - long avgDuration = deltaDuration/(deltaRequestCount==0?1:deltaRequestCount); + long avgDuration = deltaRequestCount == 0 ? 0 : deltaDuration/deltaRequestCount; if(avgDuration > maxDuration) { maxDuration = avgDuration; aggr.setMaxDuration(maxDuration); @@ -272,18 +262,9 @@ public void collectData() { intervalSeconds = 1; } - double pricePromptTokens = 0.0; - double priceCompleteTokens = 0.0; - if (aiSystem.compareTo("watsonx") == 0) { - pricePromptTokens = watsonxPricePromptTokens; - priceCompleteTokens = watsonxPriceCompleteTokens; - } else if (aiSystem.compareTo("openai") == 0) { - pricePromptTokens = openaiPricePromptTokens; - priceCompleteTokens = openaiPriceCompleteTokens; - } else if (aiSystem.compareTo("anthropic") == 0) { - pricePromptTokens = anthropicPricePromptTokens; - priceCompleteTokens = anthropicPriceCompleteTokens; - } + double pricePromptTokens = getPricePromptTokens(aiSystem); + double priceCompleteTokens = getPriceCompleteTokens(aiSystem); + double intervalReqCount = (double)deltaRequestCount/intervalSeconds; double intervalPromptTokens = (double)deltaPromptTokens/intervalSeconds; double intervalCompleteTokens = (double)deltaCompleteTokens/intervalSeconds; @@ -293,13 +274,12 @@ public void collectData() { double intervalTotalCost = intervalPromptCost + intervalCompleteCost; aggr.resetMetrics(); - logger.info("ModelId : " + modelId); - logger.info("AiSystem : " + aiSystem); - logger.info("AvgDuration : " + avgDuration); - logger.info("MaxDuration : " + maxDuration); - logger.info("IntervalTokens : " + intervalTotalTokens); - logger.info("IntervalCost : " + intervalTotalCost); - logger.info("IntervalRequest : " + intervalReqCount); + System.out.printf("Metrics for model %s of %s:%n", modelId, aiSystem); + System.out.println(" - Average Duration : " + avgDuration + " ms"); + System.out.println(" - Maximum Duration : " + maxDuration + " ms"); + System.out.println(" - Interval Tokens : " + intervalTotalTokens); + System.out.println(" - Interval Cost : " + intervalTotalCost); + System.out.println(" - Interval Request : " + intervalReqCount); Map attributes = new HashMap<>(); attributes.put("model_id", modelId); @@ -313,4 +293,22 @@ public void collectData() { } logger.info("-----------------------------------------"); } + + private double getPricePromptTokens(String aiSystem) { + switch (aiSystem) { + case "watsonx": return watsonxPricePromptTokens; + case "openai": return openaiPricePromptTokens; + case "anthropic": return anthropicPricePromptTokens; + default: return 0.0; + } + } + + private double getPriceCompleteTokens(String aiSystem) { + switch (aiSystem) { + case "watsonx": return watsonxPriceCompleteTokens; + case "openai": return openaiPriceCompleteTokens; + case "anthropic": return anthropicPriceCompleteTokens; + default: return 0.0; + } + } } diff --git a/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java b/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java index 3ba3b18..73f0ad1 100644 --- a/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java +++ b/llm/src/main/java/com/instana/dc/llm/impl/llm/MetricsCollectorService.java @@ -1,24 +1,27 @@ package com.instana.dc.llm.impl.llm; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.logging.Logger; + import com.google.common.collect.ImmutableList; + +import java.util.HashMap; +import java.util.Map; + import io.grpc.stub.StreamObserver; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; -import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; import io.opentelemetry.proto.resource.v1.Resource; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.logging.Logger; - class MetricsCollectorService extends MetricsServiceGrpc.MetricsServiceImplBase { private static final Logger logger = Logger.getLogger(MetricsCollectorService.class.getName()); @@ -80,12 +83,11 @@ public void setAiSystem(String aiSystem) { this.aiSystem = aiSystem; } } - - private final BlockingQueue exportMetrics = new LinkedBlockingDeque<>(); + private HashMap exportMetrics = new HashMap<>(); public List getMetrics() { synchronized (mutex) { - return ImmutableList.copyOf(exportMetrics); + return ImmutableList.copyOf(exportMetrics.values()); } } @@ -98,150 +100,111 @@ public void export( ExportMetricsServiceRequest request, StreamObserver responseObserver) { - logger.info("--------------------------------------------------------"); - synchronized (mutex) { - List allResourceMetrics = request.getResourceMetricsList(); for (ResourceMetrics resourceMetrics : allResourceMetrics) { - - Resource resource = resourceMetrics.getResource(); - for (KeyValue reskv : resource.getAttributesList()) { - logger.info("Received metric --- Resource attrKey: " + reskv.getKey()); - logger.info("Received metric --- Resource attrVal: " + reskv.getValue().getStringValue()); - } - for (ScopeMetrics scoMetrics : resourceMetrics.getScopeMetricsList()) { InstrumentationScope instrumentationScope = scoMetrics.getScope(); instrumentationScope.getAttributesList(); - for (KeyValue inskv : instrumentationScope.getAttributesList()) { - logger.info("Received metric --- Scope attrKey: " + inskv.getKey()); - logger.info("Received metric --- Scope attrVal: " + inskv.getValue().getStringValue()); - } - for (Metric metric : scoMetrics.getMetricsList()) { - logger.info("Received metric --- Scope Name: " + metric.getName()); - logger.info("Received metric --- Scope Desc: " + metric.getDescription()); - logger.info("Received metric --- Scope Unit: " + metric.getUnit()); - logger.info("Received metric --- Scope Case: " + metric.getDataCase().getNumber()); - + System.out.println("-----------------"); + System.out.println("Recv Metric --- Scope Name: " + metric.getName()); + System.out.println("Recv Metric --- Scope Desc: " + metric.getDescription()); switch (metric.getDataCase()) { - case SUM: - if (metric.getName().compareTo("llm.watsonx.completions.tokens") == 0 || - metric.getName().compareTo("llm.openai.chat_completions.tokens") == 0 || - metric.getName().compareTo("llm.anthropic.completion.tokens") == 0 || - metric.getName().compareTo("gen_ai.client.token.usage") == 0) { - - List sumDataPoints = metric.getSum().getDataPointsList(); - for (NumberDataPoint dataPoint : sumDataPoints) { - - List kvList = dataPoint.getAttributesList(); - - String modelId = ""; - String tokenType = ""; - String aiSystem = ""; - for (KeyValue kv : kvList) { - logger.info("Received metric --- Tokens attrKey: " + kv.getKey()); - logger.info("Received metric --- Tokens attrVal: " - + kv.getValue().getStringValue()); - if (kv.getKey().compareTo("llm.response.model") == 0 || kv.getKey().compareTo("gen_ai.response.model") == 0) { - modelId = kv.getValue().getStringValue(); - } else if (kv.getKey().compareTo("llm.usage.token_type") == 0 || kv.getKey().compareTo("gen_ai.token.type") == 0) { - tokenType = kv.getValue().getStringValue(); - } else if (kv.getKey().compareTo("gen_ai.system") == 0) { - aiSystem = kv.getValue().getStringValue(); - } - } - if (aiSystem.isEmpty() && metric.getName().compareTo("gen_ai.client.token.usage") != 0) { - String[] parts = metric.getName().split("\\.", 3); - aiSystem = parts[1]; - } else { - aiSystem = "n/a"; - } - - long promptTokens = 0; - long completeTokens = 0; - if (tokenType.compareTo("prompt") == 0 || tokenType.compareTo("input") == 0) { - promptTokens = dataPoint.getAsInt(); - logger.info("Received metric --- Prompt Value: " + promptTokens); - } else if (tokenType.compareTo("completion") == 0 || tokenType.compareTo("output") == 0) { - completeTokens = dataPoint.getAsInt(); - logger.info("Received metric --- Complete Value: " + completeTokens); - } - - if (!modelId.isEmpty()) { - OtelMetric otelMetric = new OtelMetric(); - otelMetric.setModelId(modelId); - otelMetric.setAiSystem(aiSystem); - if(promptTokens > 0) { - otelMetric.setPromptTokens(promptTokens); - } - if(completeTokens > 0) { - otelMetric.setCompleteTokens(completeTokens); - } - exportMetrics.add(otelMetric); - } - } - } - break; case HISTOGRAM: - if (metric.getName().compareTo("llm.watsonx.completions.duration") == 0 || - metric.getName().compareTo("llm.openai.chat_completions.duration") == 0 || - metric.getName().compareTo("llm.anthropic.completion.duration") == 0 || - metric.getName().compareTo("gen_ai.client.operation.duration") == 0) { - - List histDataPoints = metric.getHistogram().getDataPointsList(); - for (HistogramDataPoint dataPoint : histDataPoints) { - - List kvList = dataPoint.getAttributesList(); - - String modelId = ""; - String aiSystem = ""; - for (KeyValue kv : kvList) { - logger.info("Received metric --- Duration attrKey: " + kv.getKey()); - logger.info("Received metric --- Duration attrVal: " - + kv.getValue().getStringValue()); - if (kv.getKey().compareTo("llm.response.model") == 0 || kv.getKey().compareTo("gen_ai.response.model") == 0) { - modelId = kv.getValue().getStringValue(); - } else if (kv.getKey().compareTo("gen_ai.system") == 0) { - aiSystem = kv.getValue().getStringValue(); - } - } - if (aiSystem.isEmpty() && metric.getName().compareTo("gen_ai.client.token.usage") != 0) { - String[] parts = metric.getName().split("\\.", 3); - aiSystem = parts[1]; - } else { - aiSystem = "n/a"; - } - - Double durationSum = dataPoint.getSum(); - long requestCount = dataPoint.getCount(); - logger.info("Received metric --- Duration Sum Value: " + durationSum); - logger.info("Received metric --- Duration Count Value: " + requestCount); - - if (!modelId.isEmpty()) { - OtelMetric otelMetric = new OtelMetric(); - otelMetric.setModelId(modelId); - otelMetric.setAiSystem(aiSystem); - otelMetric.setDuration(durationSum); - otelMetric.setReqCount(requestCount); - exportMetrics.add(otelMetric); - } - } - } + processHistogramMetrics(metric); break; + case SUM: case GAUGE: case SUMMARY: default: - logger.info("Unsupported metric DataCase: " + metric.getDataCase()); - throw new AssertionError("Unsupported metric DataCase: " + metric.getDataCase()); + System.out.println("Skip Metric DataCase: " + metric.getDataCase()); } } } } } - responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance()); responseObserver.onCompleted(); } + + private void processHistogramMetrics(Metric metric) { + if (metric.getName().compareTo("gen_ai.client.token.usage") == 0) { + processTokenMetric(metric); + } else if (metric.getName().compareTo("gen_ai.client.operation.duration") == 0) { + processDurationMetric(metric); + } + } + + private void processTokenMetric(Metric metric) { + List histDataPoints = metric.getHistogram().getDataPointsList(); + for (HistogramDataPoint dataPoint : histDataPoints) { + List kvList = dataPoint.getAttributesList(); + String modelId = ""; + String tokenType = ""; + String aiSystem = ""; + for (KeyValue kv : kvList) { + if (kv.getKey().compareTo("gen_ai.response.model") == 0) { + modelId = kv.getValue().getStringValue(); + System.out.println("Recv Metric --- Model ID: " + modelId); + } else if (kv.getKey().compareTo("gen_ai.system") == 0) { + aiSystem = kv.getValue().getStringValue(); + System.out.println("Recv Metric --- AI System: " + aiSystem); + } else if (kv.getKey().compareTo("gen_ai.token.type") == 0) { + tokenType = kv.getValue().getStringValue(); + } + } + if (!modelId.isEmpty()) { + OtelMetric otelMetric = exportMetrics.get(modelId); + if (otelMetric == null ) { + otelMetric = new OtelMetric(); + exportMetrics.put(modelId, otelMetric); + otelMetric.setModelId(modelId); + otelMetric.setAiSystem(aiSystem); + } + if (tokenType.compareTo("input") == 0) { + long inputTokens = (long) dataPoint.getSum(); + otelMetric.setPromptTokens(inputTokens); + System.out.println("Recv Metric --- Token Input: " + inputTokens); + } else if (tokenType.compareTo("output") == 0) { + long outputTokens = (long) dataPoint.getSum(); + otelMetric.setCompleteTokens(outputTokens); + System.out.println("Recv Metric --- Token Output: " + outputTokens); + } + } + } + } + + private void processDurationMetric(Metric metric) { + List histDataPoints = metric.getHistogram().getDataPointsList(); + for (HistogramDataPoint dataPoint : histDataPoints) { + List kvList = dataPoint.getAttributesList(); + String modelId = ""; + String aiSystem = ""; + for (KeyValue kv : kvList) { + if (kv.getKey().compareTo("gen_ai.response.model") == 0) { + modelId = kv.getValue().getStringValue(); + System.out.println("Recv Metric --- Model ID: " + modelId); + } else if (kv.getKey().compareTo("gen_ai.system") == 0) { + aiSystem = kv.getValue().getStringValue(); + System.out.println("Recv Metric --- AI System: " + aiSystem); + } + } + if (!modelId.isEmpty()) { + OtelMetric otelMetric = exportMetrics.get(modelId); + if (otelMetric == null ) { + otelMetric = new OtelMetric(); + exportMetrics.put(modelId, otelMetric); + otelMetric.setModelId(modelId); + otelMetric.setAiSystem(aiSystem); + } + Double durationSum = dataPoint.getSum(); + long requestCount = dataPoint.getCount(); + otelMetric.setDuration(durationSum); + otelMetric.setReqCount(requestCount); + System.out.println("Recv Metric --- Duration Sum: " + durationSum); + System.out.println("Recv Metric --- Duration Count: " + requestCount); + } + } + } }