diff --git a/README.md b/README.md
index 4baa78f..a11b2cc 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# kafka-partition-availability-benchmark
-This repository contains a Kafka partition stress test. The goal of it is to make it easier to validate changes to Kafka
-with respect how many concurrent replicated partitions it can support.
+This repository contains a Kafka partition stress test. Its goal is to make it easier to validate changes to Kafka
+with respect to how many concurrent replicated partitions it can support.
We want to ensure that our Kafka users have the following guarantees:
@@ -24,7 +24,8 @@ mvn package
## Configuration
You can see all configurable parameters in `src/main/resources/kafka-partition-availability-benchmark.properties`
-The defaults are set to something you can run against a local single-broker installation of kafka. In most cases, you only probably need to set four things to put stress on a real test environment:
+The defaults are set to something you can run against a local single-broker installation of Kafka. In most cases, you
+probably only need to set four things to put stress on a real test environment:
```
cat > ~/.kafka-partition-availability-benchmark.properties << EOF
kafka.replication.factor = 3
@@ -37,11 +38,22 @@ EOF
Depending on how powerful your test runner host is, you might be able to bump up the number of topics past `4000`. In
our testing, `4000` was what an i3.2xlarge instance could bear before test results started to get skewed.
-To get past `4000` topics, we ran this tool on multiple test runners. We recommend setting the default topic prefix to something unique per test runner by doing something like this:
+To get past `4000` topics, we ran this tool on multiple test runners. We recommend setting the default topic prefix to
+something unique per test runner by doing something like this:
```
echo "default_topic_prefix = `hostname`" >> ~/.kafka-partition-availability-benchmark.properties
```
+### Measuring produce and consume at the same time
+
+By default, the benchmark produces only one message per partition and re-consumes that message every second. To
+continuously produce new messages instead of relying on re-consuming the same ones, set the following configuration
+option:
+```
+cat >> ~/.kafka-partition-availability-benchmark.properties << EOF
+keep_producing = true
+EOF
+```
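+
+If you enable `keep_producing`, two related settings control how many messages are written per pass and how long the
+producers and consumers wait between passes. The values below are the shipped defaults from
+`src/main/resources/kafka-partition-availability-benchmark.properties`; override them in
+`~/.kafka-partition-availability-benchmark.properties` in the same way if you need different behavior:
+```
+messages_per_batch = 120
+read_write_interval_ms = 1000
+```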
+
## Running
You can use your favorite configuration management tool such as Ansible to make the below more elegant. From a central
diff --git a/pom.xml b/pom.xml
index f0a4a78..3b8c05d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,7 @@
- ${artifactId}
+ ${project.artifactId}
diff --git a/src/main/java/com/salesforce/ConsumeTopic.java b/src/main/java/com/salesforce/ConsumeTopic.java
index 7247ccf..c31c0e9 100644
--- a/src/main/java/com/salesforce/ConsumeTopic.java
+++ b/src/main/java/com/salesforce/ConsumeTopic.java
@@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
class ConsumeTopic implements Callable<Exception> {
private static final Logger log = LoggerFactory.getLogger(ConsumeTopic.class);
@@ -35,26 +34,29 @@ class ConsumeTopic implements Callable {
private final int topicId;
private final String key;
- private final int consumerPollInterval;
+ private final int readWriteInterval;
private final AdminClient kafkaAdminClient;
    private final Map<String, Object> kafkaConsumerConfig;
private final short replicationFactor;
+ private final boolean keepProducing;
/**
- * @param topicId Each topic gets a numeric id
- * @param key Prefix for topics created by this tool
- * @param consumerPollInterval How long should we wait before polls for new messages?
+ * @param topicId Each topic gets a numeric id
+ * @param key Prefix for topics created by this tool
+     * @param readWriteInterval    How long to wait between polls when consuming new messages
* @param kafkaAdminClient
* @param kafkaConsumerConfig
+ * @param keepProducing Whether we are continuously producing messages rather than just producing once
*/
-    public ConsumeTopic(int topicId, String key, int consumerPollInterval, AdminClient kafkaAdminClient,
-                        Map<String, Object> kafkaConsumerConfig, short replicationFactor) {
+    public ConsumeTopic(int topicId, String key, int readWriteInterval, AdminClient kafkaAdminClient,
+                        Map<String, Object> kafkaConsumerConfig, short replicationFactor, boolean keepProducing) {
this.topicId = topicId;
this.key = key;
- this.consumerPollInterval = consumerPollInterval;
+ this.readWriteInterval = readWriteInterval;
this.kafkaAdminClient = kafkaAdminClient;
this.kafkaConsumerConfig = Collections.unmodifiableMap(kafkaConsumerConfig);
this.replicationFactor = replicationFactor;
+ this.keepProducing = keepProducing;
}
@Override
@@ -69,7 +71,6 @@ public Exception call() {
TopicPartition topicPartition = new TopicPartition(topicName, 0);
consumer.assign(Collections.singleton(topicPartition));
- AtomicInteger numMessages = new AtomicInteger();
while (true) {
            ConsumerRecords<Integer, Integer> messages;
Histogram.Timer consumerReceiveTimer = consumerReceiveTimeSecs.startTimer();
@@ -79,13 +80,15 @@ public Exception call() {
consumerReceiveTimer.observeDuration();
}
if (messages.count() == 0) {
+ if (keepProducing) {
+ Thread.sleep(readWriteInterval);
+ continue;
+ }
log.debug("Ran out of messages to process for topic {}; starting from beginning", topicName);
consumer.seekToBeginning(Collections.singleton(topicPartition));
- numMessages.set(0);
- Thread.sleep(consumerPollInterval);
+ Thread.sleep(readWriteInterval);
continue;
}
- numMessages.addAndGet(messages.count());
consumerCommitTimeSecs.time(consumer::commitSync);
@@ -94,7 +97,7 @@ public Exception call() {
log.debug("Last consumed message {}:{}, consumed {} messages, topic: {}",
lastMessage.key(), lastMessage.value(), messages.count(), topicName);
- Thread.sleep(consumerPollInterval);
+ Thread.sleep(readWriteInterval);
}
} catch (Exception e) {
log.error("Failed consume", e);
diff --git a/src/main/java/com/salesforce/CreateTopic.java b/src/main/java/com/salesforce/CreateTopic.java
new file mode 100644
index 0000000..525ad1f
--- /dev/null
+++ b/src/main/java/com/salesforce/CreateTopic.java
@@ -0,0 +1,53 @@
+package com.salesforce;
+
+import io.prometheus.client.Histogram;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class CreateTopic implements Callable<Exception> {
+ private static final Logger log = LoggerFactory.getLogger(CreateTopic.class);
+
+    private static final Histogram topicCreateTimeSecs = Histogram
+            .build("topicCreateTimeSecs", "Topic create time in seconds")
+            .register();
+
+ private final int topicId;
+ private final String key;
+ private final AdminClient kafkaAdminClient;
+ private final short replicationFactor;
+
+ public CreateTopic(int topicId, String key, AdminClient kafkaAdminClient, short replicationFactor) {
+ this.topicId = topicId;
+ this.key = key;
+ this.kafkaAdminClient = kafkaAdminClient;
+ this.replicationFactor = replicationFactor;
+ }
+
+ @Override
+ public Exception call() throws Exception {
+ String topicName = TopicName.createTopicName(key, topicId);
+
+ // TODO: Allow numPartitions to be changed
+        Set<NewTopic> topic = Collections.singleton(new NewTopic(topicName, 1, replicationFactor));
+ kafkaAdminClient.createTopics(topic);
+
+ // Wait for topic to be created and for leader election to happen
+ topicCreateTimeSecs.time(() -> {
+ try {
+ TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
+ } catch (InterruptedException e) {
+ log.error("Unable to record topic creation", e);
+ }
+ });
+
+        log.debug("Created topic {}", topicName);
+ return null;
+ }
+}
diff --git a/src/main/java/com/salesforce/Main.java b/src/main/java/com/salesforce/Main.java
index 7e6286e..d0f58de 100644
--- a/src/main/java/com/salesforce/Main.java
+++ b/src/main/java/com/salesforce/Main.java
@@ -34,7 +34,7 @@ public class Main {
public static void main(String[] args) throws Exception {
Properties settings = new Properties();
- try (InputStream defaults = Thread.currentThread().getContextClassLoader()
+ try (InputStream defaults = Thread.currentThread().getContextClassLoader()
.getResourceAsStream("kafka-partition-availability-benchmark.properties")) {
settings.load(defaults);
}
@@ -53,6 +53,7 @@ public static void main(String[] args) throws Exception {
new PrometheusMetricsServer(Integer.valueOf(settings.getProperty("prometheus_metrics_port"))));
Runtime.getRuntime().addShutdownHook(new Thread(CollectorRegistry.defaultRegistry::clear));
+ Integer numConcurrentTopicCreations = Integer.valueOf(settings.getProperty("num_concurrent_topic_creations"));
Integer numConcurrentConsumers = Integer.valueOf(settings.getProperty("num_concurrent_consumers"));
Integer numConcurrentProducers = Integer.valueOf(settings.getProperty("num_concurrent_producers"));
Integer numTopics = Integer.valueOf(settings.getProperty("num_topics"));
@@ -68,8 +69,16 @@ public static void main(String[] args) throws Exception {
log.error("Havoc will ensue if you have fewer concurrent producers than consumers");
System.exit(3);
}
+        if (numConcurrentTopicCreations > numTopics) {
+            log.error("Number of concurrent topic creations cannot exceed the total number of topics");
+            System.exit(4);
+        }
String topicPrefix = settings.getProperty("default_topic_prefix");
- Integer consumerPollIntervalMs = Integer.valueOf(settings.getProperty("consumer_poll_interval_ms"));
+ Integer readWriteIntervalMs = Integer.valueOf(settings.getProperty("read_write_interval_ms"));
+
+ Integer numMessagesToSendPerBatch = Integer.valueOf(settings.getProperty("messages_per_batch"));
+
+ Boolean keepProducing = Boolean.valueOf(settings.getProperty("keep_producing"));
// Admin settings
        Map<String, Object> kafkaAdminConfig = new HashMap<>();
@@ -91,6 +100,12 @@ public static void main(String[] args) throws Exception {
kafkaProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// Global counters
+ Counter topicsCreated = Counter
+ .build("numTopicsCreated", "Number of topics we've attempted to create")
+ .register();
+ Counter topicsCreateFailed = Counter
+ .build("numTopicsCreateFailed", "Number of topics we've failed to create")
+ .register();
Counter topicsProduced = Counter
.build("numTopicsProduced", "Number of topics we've attempted to produce to")
.register();
@@ -104,26 +119,37 @@ public static void main(String[] args) throws Exception {
.build("numTopicsConsumeFailed", "Number of topics we've failed to consume from")
.register();
-        try(AdminClient kafkaAdminClient = KafkaAdminClient.create(kafkaAdminConfig);
-            KafkaProducer<Integer, Integer> kafkaProducer = new KafkaProducer<>(kafkaProducerConfig)) {
+        try (AdminClient kafkaAdminClient = KafkaAdminClient.create(kafkaAdminConfig);
+             KafkaProducer<Integer, Integer> kafkaProducer = new KafkaProducer<>(kafkaProducerConfig)) {
+ ExecutorService createTopics = Executors.newFixedThreadPool(numConcurrentTopicCreations);
ExecutorService writeTopics = Executors.newFixedThreadPool(numConcurrentProducers);
            ExecutorService consumeTopics = Executors.newFixedThreadPool(numConcurrentConsumers);
-            BlockingQueue<Future<Exception>> consumerFutures = new ArrayBlockingQueue<>(numConcurrentConsumers);
-
+            BlockingQueue<Future<Exception>> createTopicFutures = new ArrayBlockingQueue<>(numConcurrentTopicCreations);
             BlockingQueue<Future<Exception>> writeFutures = new ArrayBlockingQueue<>(numConcurrentProducers);
+            BlockingQueue<Future<Exception>> consumerFutures = new ArrayBlockingQueue<>(numConcurrentConsumers);
log.info("Starting benchmark...");
for (int topic = 1; topic <= numTopics; topic++) {
+ createTopicFutures.put(createTopics.submit(new CreateTopic(topic, topicPrefix, kafkaAdminClient,
+ replicationFactor)));
+ topicsCreated.inc();
+ if (createTopicFutures.size() >= numConcurrentTopicCreations) {
+                    log.info("Created {} topics, ensuring success before creating more...", numConcurrentTopicCreations);
+ clearQueue(createTopicFutures, topicsCreateFailed);
+ }
+
writeFutures.put(writeTopics.submit(new WriteTopic(topic, topicPrefix, kafkaAdminClient,
- replicationFactor, kafkaProducer)));
+ replicationFactor, numMessagesToSendPerBatch,
+ keepProducing, kafkaProducer, readWriteIntervalMs)));
topicsProduced.inc();
if (writeFutures.size() >= numConcurrentProducers) {
log.info("Produced {} topics, ensuring success before producing more...", numConcurrentProducers);
clearQueue(writeFutures, topicsProduceFailed);
}
- consumerFutures.put(consumeTopics.submit(new ConsumeTopic(topic, topicPrefix, consumerPollIntervalMs,
- kafkaAdminClient, kafkaConsumerConfig, replicationFactor)));
+
+ consumerFutures.put(consumeTopics.submit(new ConsumeTopic(topic, topicPrefix, readWriteIntervalMs,
+ kafkaAdminClient, kafkaConsumerConfig, replicationFactor, keepProducing)));
topicsConsumed.inc();
if (consumerFutures.size() >= numConcurrentConsumers) {
log.debug("Consumed {} topics, clearing queue before consuming more...", numConcurrentConsumers);
@@ -131,8 +157,9 @@ public static void main(String[] args) throws Exception {
}
}
- writeTopics.shutdown();
+ createTopics.shutdown();
try {
+ clearQueue(writeFutures, topicsProduceFailed);
clearQueue(consumerFutures, topicsConsumeFailed);
} finally {
consumeTopics.shutdownNow();
diff --git a/src/main/java/com/salesforce/WriteTopic.java b/src/main/java/com/salesforce/WriteTopic.java
index 361a504..03f5115 100644
--- a/src/main/java/com/salesforce/WriteTopic.java
+++ b/src/main/java/com/salesforce/WriteTopic.java
@@ -9,16 +9,13 @@
import io.prometheus.client.Histogram;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
-import java.util.Collections;
import java.util.Date;
-import java.util.Set;
import java.util.concurrent.Callable;
class WriteTopic implements Callable<Exception> {
@@ -26,9 +23,6 @@ class WriteTopic implements Callable {
    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private static final Histogram topicCreateTimeSecs = Histogram
- .build("topicCreateTimeSecs", "Topic create time in ms")
- .register();
private static final Histogram firstMessageProduceTimeSecs = Histogram
.build("firstMessageProduceTimeSecs", "First message produce latency time in ms")
.register();
@@ -41,14 +35,34 @@ class WriteTopic implements Callable {
private final AdminClient kafkaAdminClient;
private final short replicationFactor;
    private final KafkaProducer<Integer, Integer> kafkaProducer;
+ private final int numMessagesToSendPerBatch;
+ private final boolean keepProducing;
+ private final int readWriteInterval;
+ /**
+     * Constructor for the message-producing thread
+ *
+ * @param topicId Unique identifier for topic
+ * @param key Key for the environment
+ * @param kafkaAdminClient
+     * @param replicationFactor         Replication factor for the topic
+     * @param numMessagesToSendPerBatch Number of messages to produce in each batch
+     * @param keepProducing             Whether to produce only one message or keep the producer thread alive and
+     *                                  produce a batch every readWriteInterval
+ * @param kafkaProducer
+ * @param readWriteInterval How long to wait between message production
+ */
    public WriteTopic(int topicId, String key, AdminClient kafkaAdminClient, short replicationFactor,
-                      KafkaProducer<Integer, Integer> kafkaProducer) {
+                      int numMessagesToSendPerBatch, boolean keepProducing,
+                      KafkaProducer<Integer, Integer> kafkaProducer, int readWriteInterval) {
this.topicId = topicId;
this.key = key;
this.kafkaAdminClient = kafkaAdminClient;
this.replicationFactor = replicationFactor;
+ this.numMessagesToSendPerBatch = numMessagesToSendPerBatch;
+ this.keepProducing = keepProducing;
this.kafkaProducer = kafkaProducer;
+ this.readWriteInterval = readWriteInterval;
}
@Override
@@ -56,35 +70,24 @@ public Exception call() {
String topicName = TopicName.createTopicName(key, topicId);
        try {
-            // TODO: Allow numPartitions to be changed
-            Set<NewTopic> topic = Collections.singleton(new NewTopic(topicName, 1, replicationFactor));
- kafkaAdminClient.createTopics(topic);
-
- // Wait for topic to be created and for leader election to happen
- topicCreateTimeSecs.time(() -> {
- try {
- TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
- } catch (InterruptedException e) {
- log.error("Unable to record topic creation", e);
- }
- });
- log.debug("Created topic {}", topic);
+ TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
// Produce one message to "warm" kafka up
- firstMessageProduceTimeSecs.time(() -> {
- kafkaProducer.send(new ProducerRecord<>(topicName, topicId, -1));
- });
+ firstMessageProduceTimeSecs.time(() ->
+ kafkaProducer.send(new ProducerRecord<>(topicName, topicId, -1)));
log.debug("Produced first message to topic {}", topicName);
- int numMessagesToSend = 120;
- produceMessageTimeSecs.time(() -> {
- // TODO: Get this from properties
- for (int i = 0; i < numMessagesToSend; i++) {
- kafkaProducer.send(new ProducerRecord<>(topicName, topicId, i));
- log.debug("{}: Produced message {}", formatter.format(new Date()), topicId);
- }
- });
- log.debug("Produce {} messages to topic {}", numMessagesToSend, topicName);
+ while (keepProducing) {
+ produceMessageTimeSecs.time(() -> {
+                    // numMessagesToSendPerBatch is read from the messages_per_batch property in Main
+ for (int i = 0; i < numMessagesToSendPerBatch; i++) {
+ kafkaProducer.send(new ProducerRecord<>(topicName, topicId, i));
+ log.debug("{}: Produced message {}", formatter.format(new Date()), topicId);
+ }
+ });
+ Thread.sleep(readWriteInterval);
+ }
+            log.debug("Done producing to topic {}", topicName);
// TODO: Also keep producers around and periodically publish new messages
return null;
diff --git a/src/main/resources/kafka-partition-availability-benchmark.properties b/src/main/resources/kafka-partition-availability-benchmark.properties
index 48376fc..72a217f 100644
--- a/src/main/resources/kafka-partition-availability-benchmark.properties
+++ b/src/main/resources/kafka-partition-availability-benchmark.properties
@@ -8,10 +8,14 @@
default_topic_prefix = default
num_topics = 15
+num_concurrent_topic_creations = 1
num_concurrent_producers = 15
num_concurrent_consumers = 15
-consumer_poll_interval_ms = 1000
+read_write_interval_ms = 1000
+
+messages_per_batch = 120
+keep_producing = false
prometheus_metrics_port = 9800