Skip to content

Commit

Permalink
add kamon-pekko-connectors-kafka module (#1367)
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning authored Oct 10, 2024
1 parent 28e2310 commit 07436ee
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
16 changes: 16 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,21 @@ lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc
)
)).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test")

lazy val `kamon-pekko-connectors-kafka` = (project in file("instrumentation/kamon-pekko-connectors-kafka"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % "provided",
"org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided",
scalatest % "test",
logbackClassic % "test"
)
).dependsOn(`kamon-core`, `kamon-pekko`, `kamon-testkit` % "test")

lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
.disablePlugins(AssemblyPlugin)
Expand Down Expand Up @@ -1156,6 +1171,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`,
`kamon-pekko-connectors-kafka`,
`kamon-tapir`,
`kamon-alpakka-kafka`
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# ==================================================== #
# Kamon Pekko Connectors Kafka Reference Configuration #
# ==================================================== #

kanela {
modules {
pekko-connectors-kafka {

name = "Apache Pekko Connectors Kafka Instrumentation"
description = "PREVIEW. Provides context propagation for Apache Pekko Connectors Kafka applications"
instrumentations = [
"kamon.instrumentation.pekko.connectors.kafka.ProducerMessageInstrumentation"
]

within = [
"org.apache.pekko.kafka.ProducerMessage\\$Message",
"org.apache.pekko.kafka.ProducerMessage\\$MultiMessage",
"org.apache.pekko.kafka.internal.DefaultProducerStageLogic"
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* ==========================================================================================
* Copyright © 2013-2022 The Kamon Project <https://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* ==========================================================================================
*/

package kamon
package instrumentation
package pekko
package connectors
package kafka

import kamon.Kamon
import kamon.context.Storage
import kamon.context.Storage.Scope
import kamon.instrumentation.context.HasContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

class ProducerMessageInstrumentation extends InstrumentationBuilder {

/**
* Captures the current context the a Message or MultiMessage is created and restores it while
* the ProducerLogic is running, so the proper context gets propagated to the Kafka Producer.
*/
onTypes("org.apache.pekko.kafka.ProducerMessage$Message", "org.apache.pekko.kafka.ProducerMessage$MultiMessage")
.mixin(classOf[HasContext.MixinWithInitializer])

onTypes(
"org.apache.pekko.kafka.internal.DefaultProducerStageLogic",
"org.apache.pekko.kafka.internal.CommittingProducerSinkStageLogic"
)
.advise(method("produce"), ProduceWithEnvelopeContext)
}

object ProduceWithEnvelopeContext {

@Advice.OnMethodEnter
def enter(@Advice.Argument(0) envelope: Any): Storage.Scope = {
envelope match {
case hasContext: HasContext => Kamon.storeContext(hasContext.context)
case _ => Scope.Empty
}
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable])
def exit(@Advice.Enter scope: Storage.Scope): Unit =
scope.close()
}

0 comments on commit 07436ee

Please sign in to comment.