Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparing for producer diagnostics #1452

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion project/MimaSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ object MimaSettings {
Seq(
mimaPreviousArtifacts := Set(organization.value %% name.value % binCompatVersion),
mimaBinaryIssueFilters ++= Seq(
exclude[Problem]("zio.kafka.consumer.internal.*")
exclude[Problem]("zio.kafka.consumer.internal.*"),
exclude[Problem]("zio.kafka.diagnostics.internal.*")
),
mimaFailOnProblem := failOnProblem
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.kafka.bench
import org.openjdk.jmh.annotations._
import zio.kafka.admin.AdminClient.NewTopic
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.Consumer.NoDiagnostics
import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription }
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
Expand Down Expand Up @@ -40,7 +40,7 @@ class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka] {
`max.poll.records` = 1000
)
.map(_.withPartitionPreFetchBufferLimit(8192))
consumer <- Consumer.make(settings, Diagnostics.NoOp)
consumer <- Consumer.make(settings, NoDiagnostics)
} yield consumer

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde
import zio.logging.backend.SLF4J
Expand Down Expand Up @@ -67,7 +66,7 @@ object Main extends ZIOAppDefault {
runConsumerStream
.provide(
consumerSettings,
ZLayer.succeed(Diagnostics.NoOp),
ZLayer.succeed(Consumer.NoDiagnostics),
Consumer.live,
MyKafka.embedded
)
Expand Down
168 changes: 85 additions & 83 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, CommitTimeout, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
ConsumerFinalized,
RunloopFinalized,
SubscriptionFinalized
}
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, CommitTimeout, ConsumerDiagnostics, OffsetRetrieval }
import zio.kafka.consumer.ConsumerDiagnosticEvent.{ ConsumerFinalized, RunloopFinalized, SubscriptionFinalized }
import zio.kafka.diagnostics.SlidingDiagnostics
import zio.kafka.producer.TransactionalProducer
import zio.kafka.serde.Serde
import zio.kafka.testkit.{ Kafka, KafkaRandom, KafkaTestUtils }
Expand Down Expand Up @@ -625,64 +620,62 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val partitionCount = 6

ZIO.scoped {
Diagnostics.SlidingQueue
.make()
.flatMap { diagnostics =>
for {
// Produce messages on several partitions
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient
SlidingDiagnostics.make[ConsumerDiagnosticEvent]().flatMap { diagnostics =>
for {
// Produce messages on several partitions
topic <- randomTopic
group <- randomGroup
client1 <- randomClient
client2 <- randomClient

_ <- KafkaTestUtils.createCustomTopic(topic, partitionCount)
producer <- KafkaTestUtils.makeProducer
_ <- ZIO.foreachDiscard(1 to nrMessages) { i =>
KafkaTestUtils
.produceMany(producer, topic, partition = i % partitionCount, kvs = List(s"key$i" -> s"msg$i"))
}

consumer1 <- KafkaTestUtils.makeConsumer(client1, Some(group), diagnostics = diagnostics)
consumer2 <- KafkaTestUtils.makeConsumer(client2, Some(group))

// Consume messages
subscription = Subscription.topics(topic)
consumer1Ready <- Promise.make[Nothing, Unit]
assignedPartitionsRef <- Ref.make(Set.empty[Int]) // Set of partition numbers
consumer1Fib <- consumer1
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(partitionCount) { case (tp, partition) =>
ZStream
.fromZIO(
consumer1Ready
.succeed(())
.whenZIO(
assignedPartitionsRef
.updateAndGet(_ + tp.partition())
.map(_.size >= (partitionCount / 2))
) *>
partition.runDrain
)
.as(tp)
}
.take(partitionCount.toLong / 2)
.runDrain
.fork
diagnosticStream <- ZStream
.fromQueue(diagnostics.queue)
.collect { case rebalance: DiagnosticEvent.Rebalance => rebalance }
.runCollect
.fork
_ <- consumer1Ready.await
consumer2Fib <- consumer2
.partitionedStream(subscription, Serde.string, Serde.string)
.take(partitionCount.toLong / 2)
.runDrain
.fork
_ <- consumer1Fib.join
_ <- consumer2Fib.join
} yield diagnosticStream.join
}
_ <- KafkaTestUtils.createCustomTopic(topic, partitionCount)
producer <- KafkaTestUtils.makeProducer
_ <- ZIO.foreachDiscard(1 to nrMessages) { i =>
KafkaTestUtils
.produceMany(producer, topic, partition = i % partitionCount, kvs = List(s"key$i" -> s"msg$i"))
}

consumer1 <- KafkaTestUtils.makeConsumer(client1, Some(group), diagnostics = diagnostics)
consumer2 <- KafkaTestUtils.makeConsumer(client2, Some(group))

// Consume messages
subscription = Subscription.topics(topic)
consumer1Ready <- Promise.make[Nothing, Unit]
assignedPartitionsRef <- Ref.make(Set.empty[Int]) // Set of partition numbers
consumer1Fib <- consumer1
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMapPar(partitionCount) { case (tp, partition) =>
ZStream
.fromZIO(
consumer1Ready
.succeed(())
.whenZIO(
assignedPartitionsRef
.updateAndGet(_ + tp.partition())
.map(_.size >= (partitionCount / 2))
) *>
partition.runDrain
)
.as(tp)
}
.take(partitionCount.toLong / 2)
.runDrain
.fork
diagnosticStream <- ZStream
.fromQueue(diagnostics.queue)
.collect { case rebalance: ConsumerDiagnosticEvent.Rebalance => rebalance }
.runCollect
.fork
_ <- consumer1Ready.await
consumer2Fib <- consumer2
.partitionedStream(subscription, Serde.string, Serde.string)
.take(partitionCount.toLong / 2)
.runDrain
.fork
_ <- consumer1Fib.join
_ <- consumer2Fib.join
} yield diagnosticStream.join
}
}.flatten
.map(diagnosticEvents => assert(diagnosticEvents.size)(isGreaterThanEqualTo(2)))
},
Expand Down Expand Up @@ -975,7 +968,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
clientId: String,
groupId: String,
rebalanceSafeCommits: Boolean,
diagnostics: Diagnostics
diagnostics: ConsumerDiagnostics
): ZIO[Scope with Kafka, Throwable, Consumer] =
for {
settings <- KafkaTestUtils.consumerSettings(
Expand Down Expand Up @@ -1009,12 +1002,12 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.fork
_ <- ZIO.logDebug("Starting consumer 1")
rebalanceEndTimePromise <- Promise.make[Nothing, Instant]
c1Diagnostics = new Diagnostics {
override def emit(event: => DiagnosticEvent): UIO[Unit] = event match {
case r: DiagnosticEvent.Rebalance if r.assigned.size == 1 =>
c1Diagnostics = new ConsumerDiagnostics {
override def emit(event: => ConsumerDiagnosticEvent): UIO[Unit] = event match {
case r: ConsumerDiagnosticEvent.Rebalance if r.assigned.size == 1 =>
ZIO.logDebug(s"Rebalance finished: $r") *>
Clock.instant.flatMap(rebalanceEndTimePromise.succeed).unit
case r: DiagnosticEvent.Rebalance =>
case r: ConsumerDiagnosticEvent.Rebalance =>
ZIO.logDebug(s"Rebalance finished: $r")
case _ => ZIO.unit
}
Expand Down Expand Up @@ -1048,7 +1041,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.fork
_ <- c1Started.await
_ <- ZIO.logDebug("Starting consumer 2")
c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false, Diagnostics.NoOp)
c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false, Consumer.NoDiagnostics)
rebalanceStartTime <- Clock.instant
_ <- ZIO
.logAnnotate("consumer", "2") {
Expand Down Expand Up @@ -1592,7 +1585,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
test(
"Booting a Consumer to do something else than consuming should not fail with a timeout exception"
) {
def test(diagnostics: Diagnostics) =
def test(diagnostics: ConsumerDiagnostics) =
for {
clientId <- randomClient
settings <- KafkaTestUtils.consumerSettings(
Expand All @@ -1604,36 +1597,36 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertCompletes

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
diagnostics <- SlidingDiagnostics.make[ConsumerDiagnosticEvent](1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isFinalizationEvent))
} yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
suite(
"Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock"
)(
test("When not consuming, the Runloop is not started so only the Consumer is finalized") {
def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
def test(diagnostics: ConsumerDiagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
settings <- KafkaTestUtils.consumerSettings(clientId = clientId)
_ <- Consumer.make(settings, diagnostics = diagnostics)
} yield assertCompletes

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
diagnostics <- SlidingDiagnostics.make[ConsumerDiagnosticEvent](1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isFinalizationEvent))
} yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
test("When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock") {
// This test ensures that we're not inadvertently introducing a deadlock by changing the order of finalizers.

def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
def test(diagnostics: ConsumerDiagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
Expand All @@ -1655,11 +1648,11 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assert(consumed0)(equalTo(1L)) && assert(consumed1)(equalTo(1L))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
diagnostics <- SlidingDiagnostics.make[ConsumerDiagnosticEvent](1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isFinalizationEvent))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
Expand All @@ -1680,7 +1673,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val messagesToConsumeBeforeStop = 1000 // Adjust this threshold as needed
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
def test(diagnostics: ConsumerDiagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
Expand Down Expand Up @@ -1720,11 +1713,11 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
assert(consumed1)(equalTo(numberOfMessages.toLong))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
diagnostics <- SlidingDiagnostics.make[ConsumerDiagnosticEvent](1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isFinalizationEvent))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
Expand Down Expand Up @@ -1765,4 +1758,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Kafka.embedded
) @@ withLiveClock @@ timeout(2.minutes) @@ TestAspect.timed

/** Test-only syntax enrichment for [[ConsumerDiagnosticEvent]] used by the assertions above. */
private final implicit class ConsumerDiagnosticsOps(private val event: ConsumerDiagnosticEvent) extends AnyVal {
  /**
   * True when this event marks the finalization of one of the consumer's components
   * (subscription, runloop, or the consumer itself); false for every other event.
   */
  def isFinalizationEvent: Boolean = event match {
    case SubscriptionFinalized | RunloopFinalized | ConsumerFinalized => true
    case _                                                            => false
  }
}

}
Loading
Loading