Make benchmarks more accurate #1484

Open. Wants to merge 5 commits into base `master`.
15 changes: 11 additions & 4 deletions zio-kafka-bench/README.md
the worst possible case for zio-kafka. This is because these consumers only count the consumed records and do no
further processing. This makes the comparison look bad for zio-kafka because zio-kafka programs normally process
records in parallel, while other Kafka consumers process records serially.

All benchmarks use [embedded-kafka](https://github.com/embeddedkafka/embedded-kafka). This means that the network
overhead of reaching out to Kafka is almost zero. Because of that, an extra network call has no impact on these
benchmarks, even though it does impact real applications. To make extra network calls visible, we need to simulate the
network.<br>
Because there is no easy way to simulate network behavior within the JVM, we add an
[artificial delay of 5ms](//zio-kafka-bench/src/main/scala/zio/kafka/bench/SlowKafkaConsumer.scala)
to each call to the underlying Java consumer (also in the non-zio benchmarks), except when we know the call does not
result in network traffic. This is only a crude approximation, since the calls to the underlying Java consumer do not
correspond 1:1 to network traffic. Nevertheless, we expect that a change in the number of consumer calls will become
visible in the benchmark results.

All consumer benchmarks send 50k ~512 byte records per run.
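
As a back-of-envelope check (assuming the `max.poll.records = 1000` setting from `Layers.scala` and the 5 ms per-call delay described above; real runs also add delayed commit and metadata calls), the simulated delay on polls alone adds at least a quarter of a second per run:

```scala
// Rough lower bound on the simulated network delay per benchmark run.
// Assumptions: 50_000 records per run, `max.poll.records` = 1000, 5 ms delay per call.
val records        = 50000
val maxPollRecords = 1000
val delayMs        = 5
val minPolls       = records / maxPollRecords // at least 50 poll calls per run
val minOverheadMs  = minPolls * delayMs       // at least 250 ms of simulated delay
println(s"$minPolls polls, >= $minOverheadMs ms simulated network delay")
```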

#### zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput
The simplest possible Kafka client that subscribes to a topic. It directly calls

Same as above, but now using partition assignment instead of topic subscription.

#### zio.kafka.bench.comparison.ZioKafkaBenchmarks.zioKafka

Does the same as `zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput`.

#### zio.kafka.bench.comparison.ZioKafkaBenchmarks.manualZioKafka

Does the same as `zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput`, but uses a partition assignment instead of a
topic subscription.
68 changes: 68 additions & 0 deletions zio-kafka-bench/src/main/scala/zio/kafka/bench/Layers.scala
package zio.kafka.bench

import org.apache.kafka.clients.consumer.{ Consumer => IConsumer, KafkaConsumer }
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings }
import zio.kafka.testkit.{ Kafka, KafkaTestUtils }
import zio.{ Scope => _, _ }

import scala.jdk.CollectionConverters._

object Layers {

  type LowLevelKafka = IConsumer[Array[Byte], Array[Byte]]

  val embeddedKafka: ZLayer[Any, Nothing, Kafka] =
    Kafka.embedded.orDie

  def makeJavaKafkaConsumer(consumerSettings: ConsumerSettings): ZIO[zio.Scope, Throwable, LowLevelKafka] =
    ZIO.acquireRelease {
      ZIO.attemptBlocking {
        val wrapped = new KafkaConsumer[Array[Byte], Array[Byte]](
          consumerSettings.driverSettings.asJava,
          new ByteArrayDeserializer(),
          new ByteArrayDeserializer()
        )
        new SlowKafkaConsumer(wrapped, 5.millis)
      }
    }(c => ZIO.attemptBlocking(c.close()).orDie)

  def javaKafkaConsumer: ZLayer[ConsumerSettings, Nothing, LowLevelKafka] =
    ZLayer.scoped {
      for {
        settings <- ZIO.service[ConsumerSettings]
        consumer <- makeJavaKafkaConsumer(settings)
      } yield consumer
    }.orDie

  def makeConsumerSettings: ZIO[Kafka, Nothing, ConsumerSettings] =
    KafkaTestUtils
      .consumerSettings(
        clientId = randomThing("client"),
        groupId = Some(randomThing("client")),
        `max.poll.records` = 1000
      )
      .map(_.withPartitionPreFetchBufferLimit(8192))

  def consumerSettings: ZLayer[Kafka, Nothing, ConsumerSettings] =
    ZLayer.fromZIO(makeConsumerSettings)

  val makeConsumer: ZIO[zio.Scope & Kafka, Throwable, Consumer] =
    for {
      // Weird workaround: without the following line, the
      // zio.kafka.bench.ZioKafkaConsumerBenchmark.throughputWithCommits
      // benchmark locks up after the first iteration:
      _ <- ZIO.addFinalizer(ZIO.sleep(10.millis))

      settings     <- Layers.makeConsumerSettings
      javaConsumer <- Layers.makeJavaKafkaConsumer(settings)
      access       <- Semaphore.make(1L)
      consumer     <- Consumer.fromJavaConsumerWithPermit(javaConsumer, settings, access, Diagnostics.NoOp)
    } yield consumer

  val consumerLayer: ZLayer[Kafka, Nothing, Consumer] =
    ZLayer.scoped[Kafka](makeConsumer).orDie

}
274 additions & 0 deletions zio-kafka-bench/src/main/scala/zio/kafka/bench/SlowKafkaConsumer.scala
package zio.kafka.bench

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common._

import java.time.Duration
import java.util.regex.Pattern
import java.util.{ Collection => JavaCollection, List => JavaList, Map => JavaMap, OptionalLong, Set => JavaSet }
import java.lang.{ Long => JavaLong }
import scala.annotation.nowarn

/**
 * A consumer that starts each call with the given delay, and then calls the wrapped consumer. This can be used as a
 * rough approximation of network delay.
 */
class SlowKafkaConsumer[K, V](wrapped: Consumer[K, V], delay: Duration) extends Consumer[K, V] {
  private val delayMillis = delay.toMillis

  override def assignment(): JavaSet[TopicPartition] =
    // No network traffic, no delay
    wrapped.assignment()

  override def subscription(): JavaSet[String] = {
    Thread.sleep(delayMillis)
    wrapped.subscription()
  }

  override def subscribe(topics: JavaCollection[String]): Unit = {
    Thread.sleep(delayMillis)
    wrapped.subscribe(topics)
  }

  override def subscribe(topics: JavaCollection[String], callback: ConsumerRebalanceListener): Unit = {
    Thread.sleep(delayMillis)
    wrapped.subscribe(topics, callback)
  }

  override def assign(partitions: JavaCollection[TopicPartition]): Unit = {
    Thread.sleep(delayMillis)
    wrapped.assign(partitions)
  }

  override def subscribe(pattern: Pattern, callback: ConsumerRebalanceListener): Unit = {
    Thread.sleep(delayMillis)
    wrapped.subscribe(pattern, callback)
  }

  override def subscribe(pattern: Pattern): Unit = {
    Thread.sleep(delayMillis)
    wrapped.subscribe(pattern)
  }

  override def unsubscribe(): Unit = {
    Thread.sleep(delayMillis)
    wrapped.unsubscribe()
  }

  //noinspection ScalaDeprecation
  @nowarn("msg=deprecated")
  override def poll(timeout: Long): ConsumerRecords[K, V] = {
    Thread.sleep(delayMillis)
    wrapped.poll(timeout)
  }

  override def commitSync(): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitSync()
  }

  override def commitSync(timeout: Duration): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitSync(timeout)
  }

  override def commitSync(offsets: JavaMap[TopicPartition, OffsetAndMetadata]): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitSync(offsets)
  }

  override def commitSync(offsets: JavaMap[TopicPartition, OffsetAndMetadata], timeout: Duration): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitSync(offsets, timeout)
  }

  override def commitAsync(): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitAsync()
  }

  override def commitAsync(callback: OffsetCommitCallback): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitAsync(callback)
  }

  override def commitAsync(
    offsets: JavaMap[TopicPartition, OffsetAndMetadata],
    callback: OffsetCommitCallback
  ): Unit = {
    Thread.sleep(delayMillis)
    wrapped.commitAsync(offsets, callback)
  }

  override def seek(partition: TopicPartition, offset: Long): Unit =
    // No network traffic, no delay
    wrapped.seek(partition, offset)

  override def seek(partition: TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit =
    // No network traffic, no delay
    wrapped.seek(partition, offsetAndMetadata)

  override def seekToBeginning(partitions: JavaCollection[TopicPartition]): Unit =
    // No network traffic, no delay
    wrapped.seekToBeginning(partitions)

  override def seekToEnd(partitions: JavaCollection[TopicPartition]): Unit =
    // No network traffic, no delay
    wrapped.seekToEnd(partitions)

  override def position(partition: TopicPartition): Long = {
    Thread.sleep(delayMillis)
    wrapped.position(partition)
  }

  override def position(partition: TopicPartition, timeout: Duration): Long = {
    Thread.sleep(delayMillis)
    wrapped.position(partition, timeout)
  }

  //noinspection ScalaDeprecation
  @nowarn("msg=deprecated")
  override def committed(partition: TopicPartition): OffsetAndMetadata = {
    Thread.sleep(delayMillis)
    wrapped.committed(partition)
  }

  //noinspection ScalaDeprecation
  @nowarn("msg=deprecated")
  override def committed(partition: TopicPartition, timeout: Duration): OffsetAndMetadata = {
    Thread.sleep(delayMillis)
    wrapped.committed(partition, timeout)
  }

  override def poll(timeout: Duration): ConsumerRecords[K, V] = {
    Thread.sleep(delayMillis)
    wrapped.poll(timeout)
  }

  override def committed(partitions: JavaSet[TopicPartition]): JavaMap[TopicPartition, OffsetAndMetadata] = {
    Thread.sleep(delayMillis)
    wrapped.committed(partitions)
  }

  override def committed(
    partitions: JavaSet[TopicPartition],
    timeout: Duration
  ): JavaMap[TopicPartition, OffsetAndMetadata] = {
    Thread.sleep(delayMillis)
    wrapped.committed(partitions, timeout)
  }

  override def clientInstanceId(timeout: Duration): Uuid = {
    Thread.sleep(delayMillis)
    wrapped.clientInstanceId(timeout)
  }

  override def metrics(): JavaMap[MetricName, _ <: Metric] =
    // No network traffic, no delay
    wrapped.metrics()

  override def partitionsFor(topic: String): JavaList[PartitionInfo] = {
    Thread.sleep(delayMillis)
    wrapped.partitionsFor(topic)
  }

  override def partitionsFor(topic: String, timeout: Duration): JavaList[PartitionInfo] = {
    Thread.sleep(delayMillis)
    wrapped.partitionsFor(topic, timeout)
  }

  override def listTopics(): JavaMap[String, JavaList[PartitionInfo]] = {
    Thread.sleep(delayMillis)
    wrapped.listTopics()
  }

  override def listTopics(timeout: Duration): JavaMap[String, JavaList[PartitionInfo]] = {
    Thread.sleep(delayMillis)
    wrapped.listTopics(timeout)
  }

  override def paused(): JavaSet[TopicPartition] =
    // No network traffic, no delay
    wrapped.paused()

  override def pause(partitions: JavaCollection[TopicPartition]): Unit =
    // No network traffic, no delay
    wrapped.pause(partitions)

  override def resume(partitions: JavaCollection[TopicPartition]): Unit =
    // No network traffic, no delay
    wrapped.resume(partitions)

  override def offsetsForTimes(
    timestampsToSearch: JavaMap[TopicPartition, JavaLong]
  ): JavaMap[TopicPartition, OffsetAndTimestamp] = {
    Thread.sleep(delayMillis)
    wrapped.offsetsForTimes(timestampsToSearch)
  }

  override def offsetsForTimes(
    timestampsToSearch: JavaMap[TopicPartition, JavaLong],
    timeout: Duration
  ): JavaMap[TopicPartition, OffsetAndTimestamp] = {
    Thread.sleep(delayMillis)
    wrapped.offsetsForTimes(timestampsToSearch, timeout)
  }

  override def beginningOffsets(partitions: JavaCollection[TopicPartition]): JavaMap[TopicPartition, JavaLong] = {
    Thread.sleep(delayMillis)
    wrapped.beginningOffsets(partitions)
  }

  override def beginningOffsets(
    partitions: JavaCollection[TopicPartition],
    timeout: Duration
  ): JavaMap[TopicPartition, JavaLong] = {
    Thread.sleep(delayMillis)
    wrapped.beginningOffsets(partitions, timeout)
  }

  override def endOffsets(partitions: JavaCollection[TopicPartition]): JavaMap[TopicPartition, JavaLong] = {
    Thread.sleep(delayMillis)
    wrapped.endOffsets(partitions)
  }

  override def endOffsets(
    partitions: JavaCollection[TopicPartition],
    timeout: Duration
  ): JavaMap[TopicPartition, JavaLong] = {
    Thread.sleep(delayMillis)
    wrapped.endOffsets(partitions, timeout)
  }

  override def currentLag(topicPartition: TopicPartition): OptionalLong = {
    Thread.sleep(delayMillis)
    wrapped.currentLag(topicPartition)
  }

  override def groupMetadata(): ConsumerGroupMetadata =
    // No network traffic, no delay
    wrapped.groupMetadata()

  override def enforceRebalance(): Unit = {
    Thread.sleep(delayMillis)
    wrapped.enforceRebalance()
  }

  override def enforceRebalance(reason: String): Unit = {
    Thread.sleep(delayMillis)
    wrapped.enforceRebalance(reason)
  }

  override def close(): Unit = {
    Thread.sleep(delayMillis)
    wrapped.close()
  }

  override def close(timeout: Duration): Unit = {
    Thread.sleep(delayMillis)
    wrapped.close(timeout)
  }

  override def wakeup(): Unit =
    // No network traffic, no delay
    wrapped.wakeup()
}
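
The delay-by-delegation pattern used by `SlowKafkaConsumer` can be demonstrated without Kafka. The sketch below uses a hypothetical `Store` trait (not part of this PR) standing in for the consumer interface: the wrapper sleeps before every delegated call, so each operation pays a fixed simulated latency.

```scala
import java.time.Duration

// Hypothetical interface standing in for the Kafka Consumer API.
trait Store {
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
}

// Fast in-process implementation, analogous to embedded-kafka's near-zero overhead.
final class InMemoryStore extends Store {
  private val data = scala.collection.mutable.Map.empty[String, String]
  def get(key: String): Option[String]      = data.get(key)
  def put(key: String, value: String): Unit = data(key) = value
}

// Sleeps before each delegated call, mirroring SlowKafkaConsumer's approach.
final class SlowStore(wrapped: Store, delay: Duration) extends Store {
  private val delayMillis = delay.toMillis
  def get(key: String): Option[String] = {
    Thread.sleep(delayMillis)
    wrapped.get(key)
  }
  def put(key: String, value: String): Unit = {
    Thread.sleep(delayMillis)
    wrapped.put(key, value)
  }
}

val store = new SlowStore(new InMemoryStore, Duration.ofMillis(5))
val t0    = System.nanoTime()
store.put("k", "v")
val v         = store.get("k")
val elapsedMs = (System.nanoTime() - t0) / 1000000
println(s"value=$v, elapsed=$elapsedMs ms") // two calls, so at least ~10 ms
```

Because the wrapper adds a fixed cost per call, a change in the *number* of calls shows up directly in the measured time, which is exactly the effect the benchmarks want to expose.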