Skip to content

Commit

Permalink
seems to work
Browse files Browse the repository at this point in the history
  • Loading branch information
hughsimpson committed Aug 22, 2023
1 parent 1abb8b3 commit 403cab8
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
def hasNext: Boolean = it.hasNext || !showedLast

def next(): T = if (it.hasNext) it.next() else if (!showedLast) {
showedLast = true; last
showedLast = true
last
} else throw new RuntimeException("Next on empty Iterator")
}

Expand All @@ -127,26 +128,27 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
val lowerBoundaryIterator: Iterator[Double] = ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator
val valuesIterator = new ItWithLast[Distribution.Bucket](s.value.bucketsIterator, new Distribution.Bucket {
def value: Long = Long.MaxValue

def frequency: Long = 0
})
var fromLowerBound = valuesIterator.next()
var fromUpperBound = valuesIterator.next()
var toLowerBound = lowerBoundaryIterator.next()
var toUpperBound = lowerBoundaryIterator.next()
var zeroCount = 0d
var countInBucket = 0d
var zeroCount: JLong = 0L
var countInBucket = 0L

val negativeCounts = ArrayBuffer.newBuilder[JDouble]
val positiveCounts = ArrayBuffer.newBuilder[JDouble]
val negativeCounts = ArrayBuffer.newBuilder[JLong]
val positiveCounts = ArrayBuffer.newBuilder[JLong]

def iterFrom: Double = {
val d = fromLowerBound.frequency.toDouble
def iterFrom: JLong = {
val d = fromLowerBound.frequency
fromLowerBound = fromUpperBound
fromUpperBound = valuesIterator.next()
d
}

def iterTo: Double = {
def iterTo: JLong = {
toLowerBound = toUpperBound
toUpperBound = lowerBoundaryIterator.next()
val res = countInBucket
Expand All @@ -172,7 +174,7 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
countInBucket += iterFrom
positiveCounts += iterTo
} else if (fromUpperBound.value > toUpperBound) {
val firstBonus: JDouble = countInBucket
val firstBonus: JLong = countInBucket
var negBuckets = 0
var zeroBuckets = 0
var posBuckets = 0
Expand All @@ -183,14 +185,16 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
toLowerBound = toUpperBound
toUpperBound = lowerBoundaryIterator.next()
}
val totalBuckets = negBuckets + zeroBuckets + posBuckets
val avg = JDouble valueOf iterFrom / totalBuckets
negativeCounts ++= (if (negBuckets > 0) JDouble.valueOf(firstBonus + avg) +: Array.fill(negBuckets - 1)(avg) else Nil)
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JDouble.valueOf(firstBonus + avg) else if (zeroBuckets == 1) avg else JDouble.valueOf(0))
val total = iterFrom
// Not sure about this... everything's going into the first bucket, even though we might be spanning multiple target buckets.
// Might be better to do something like push the avg.floor into each bucket, interpolating the remainder.
// OTOH it may not really come up much in practice, since the internal histos are likely to have similar or finer granularity
negativeCounts ++= (if (negBuckets > 0) JLong.valueOf(firstBonus + total) +: Array.fill(negBuckets - 1)(JLong.valueOf(0)) else Nil)
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JLong.valueOf(firstBonus + total) else JLong.valueOf(0))
positiveCounts ++= (
if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0)
JDouble.valueOf(firstBonus + avg) +: Array.fill(posBuckets - 1)(avg)
else Array.fill(posBuckets)(avg))
JLong.valueOf(firstBonus + total) +: Array.fill(posBuckets - 1)(JLong.valueOf(0))
else Array.fill(posBuckets)(JLong.valueOf(0)))
} else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match {
case 1 => zeroCount += iterFrom
case _ => countInBucket += iterFrom
Expand All @@ -216,38 +220,34 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
if (!usedLastValue) countInBucket += fromLowerBound.frequency
positiveCounts += countInBucket

val negBucket = new ExponentialHistogramBuckets {
val negBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets {
val getOffset: Int = -maxBucketCount
private val doubles: ArrayBuffer[JLong] = negativeCounts.result().map(JLong valueOf _.toLong) // TODO: toLong here loses things
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
private val longs: ArrayBuffer[JLong] = negativeCounts.result()
val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava)
val getTotalCount: Long = longs.foldLeft(0L)(_ + _)
}
val posBucket = new ExponentialHistogramBuckets {
val posBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets {
val getOffset: Int = 1
private val doubles: ArrayBuffer[JLong] = positiveCounts.result().map(JLong valueOf _.toLong) // TODO: we should normalise at avg
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
private val longs: ArrayBuffer[JLong] = positiveCounts.result()
val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava)
val getTotalCount: Long = longs.foldLeft(0L)(_ + _)
}
(negBucket, zeroCount.longValue(), posBucket) // TODO: instead of having these toLongs
(negBucket, zeroCount, posBucket)
}

private def toExponentialHistogramData(maxBucketCount: Int, distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
case nonEmpty =>
val mapped = nonEmpty.flatMap { s =>
s.value match {
case zigZag: Distribution.ZigZagCounts =>
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
Some(ExponentialHistogramPointData.create(
scale, zigZag.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
))
case _ =>
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
None
}
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)

// Could also calculate an 'offset' here, but defaulting to offset = 1 for simplicity
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
Some(ExponentialHistogramPointData.create(
scale, s.value.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
))
}
if (mapped.nonEmpty) Some(ImmutableExponentialHistogramData.create(AggregationTemporality.DELTA, mapped.asJava))
else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package kamon.otel

import com.typesafe.config.{Config, ConfigValue, ConfigValueFactory}
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramData
import kamon.Kamon
import kamon.Kamon.config
import kamon.metric._
Expand All @@ -37,10 +39,10 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec
with Matchers with Reconfigure {
reconfigure =>

private def openTelemetryMetricsReporter(): (OpenTelemetryMetricsReporter, MockMetricsService) = {
private def openTelemetryMetricsReporter(newConfig: Config = config): (OpenTelemetryMetricsReporter, MockMetricsService) = {
val metricsService = new MockMetricsService()
val reporter = new OpenTelemetryMetricsReporter(_ => metricsService)(ExecutionContext.global)
reporter.reconfigure(config)
reporter.reconfigure(newConfig)
(reporter, metricsService)
}

Expand Down Expand Up @@ -148,6 +150,51 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec
points.head.getBoundaries.asScala shouldEqual Seq[JDouble](1d, 2d, 3d, 4d, 10d)
points.head.getCounts.asScala shouldEqual Seq[JDouble](2d, 2d, 3d, 0d, 1d, 1d)
}
"send exponential histogram metrics" in {
val newConfig = config.withValue("kamon.otel.metrics.histogram-format", ConfigValueFactory.fromAnyRef("base2_exponential_bucket_histogram"))
val (reporter, mockService) = openTelemetryMetricsReporter(newConfig)
val now = Instant.now()
reporter.reportPeriodSnapshot(
PeriodSnapshot.apply(
now.minusMillis(1000),
now,
Nil,
Nil,
MetricSnapshot.ofDistributions(
"test.histogram",
"test",
Metric.Settings.ForDistributionInstrument(MeasurementUnit.none, java.time.Duration.ZERO, DynamicRange.Default),
Instrument.Snapshot(
TagSet.from(Map("tag1" -> "value1")),
buildHistogramDist(Seq(1L -> 2L, 2L -> 2L, 3L -> 3L, 5L -> 1L, 15L -> 1L))
) :: Nil) :: Nil,
Nil,
Nil
)
)
// basic sanity
mockService.exportMetricsServiceRequest should not be empty
mockService.exportMetricsServiceRequest.get should have size 1
val exportedMetrics: Seq[MetricData] = mockService.exportMetricsServiceRequest.get.asScala.toSeq
exportedMetrics should have size 1
val metricData = exportedMetrics.head


// check value
metricData.getName should equal("test.histogram")
metricData.getDescription should equal("test")
val sumData = ExponentialHistogramData.fromMetricData(metricData)
val points = sumData.getPoints.asScala.toSeq
points should have size 1
points.head.getAttributes should have size 1
points.head.getAttributes.get(AttributeKey.stringKey("tag1")) should equal("value1")
points.head.getScale shouldEqual 5
points.head.getNegativeBuckets.getTotalCount shouldEqual 0L
points.head.getZeroCount shouldEqual 2L
points.head.getPositiveBuckets.getTotalCount shouldEqual 7L
points.head.getSum shouldEqual 35L
points.head.getCount shouldEqual 9L
}

"calculate sensible scales for values" in {
def randomDouble = Random.nextInt(10) match {
Expand Down

0 comments on commit 403cab8

Please sign in to comment.