This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit: Add continuous produce mode
afalko authored Feb 15, 2018
1 parent 37d6cb6 commit 46838a3
Showing 7 changed files with 163 additions and 61 deletions.
20 changes: 16 additions & 4 deletions README.md
@@ -1,7 +1,7 @@
# kafka-partition-availability-benchmark

This repository contains a Kafka partition stress test. The goal of it is to make it easier to validate changes to Kafka
with respect to how many concurrent replicated partitions it can support.
This repository contains a Kafka partition stress test. The goal of it is to make it easier to validate changes to
Kafka with respect to how many concurrent replicated partitions it can support.

We want to ensure that our Kafka users have the following guarantees:

@@ -24,7 +24,8 @@ mvn package
## Configuration
You can see all configurable parameters in `src/main/resources/kafka-partition-availability-benchmark.properties`

The defaults are set to something you can run against a local single-broker installation of Kafka. In most cases, you probably only need to set four things to put stress on a real test environment:
The defaults are set to something you can run against a local single-broker installation of Kafka. In most cases, you
probably only need to set four things to put stress on a real test environment:
```
cat > ~/.kafka-partition-availability-benchmark.properties << EOF
kafka.replication.factor = 3
@@ -37,11 +38,22 @@ EOF
Depending on how powerful your test runner host is, you might be able to bump up the number of topics past `4000`. In
our testing, `4000` was what an i3.2xlarge instance could bear before test results started to get skewed.

To get past `4000` topics, we ran this tool on multiple test runners. We recommend setting the default topic prefix to something unique per test runner by doing something like this:
To get past `4000` topics, we ran this tool on multiple test runners. We recommend setting the default topic prefix to
something unique per test runner by doing something like this:
```
echo "default_topic_prefix = `hostname`" >> ~/.kafka-partition-availability-benchmark.properties
```

### Measuring produce and consume at the same time

By default, the benchmark will only produce one message per partition and re-consume those messages every second. To
produce continuously in the same fashion instead of relying on re-consuming the same messages, set the following
configuration option:
```
cat >> ~/.kafka-partition-availability-benchmark.properties << EOF
keep_producing = true
EOF
```
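
Continuous mode is paced by two further settings that `Main.java` reads alongside `keep_producing` (see its diff below): `read_write_interval_ms` and `messages_per_batch`. The values here are illustrative, not the shipped defaults, which live in `src/main/resources/kafka-partition-availability-benchmark.properties`:
```
cat >> ~/.kafka-partition-availability-benchmark.properties << EOF
read_write_interval_ms = 1000
messages_per_batch = 1
EOF
```
With `keep_producing = true`, an empty poll no longer makes the consumer seek back to the beginning of the partition; it simply sleeps for the read/write interval and polls again (see `ConsumeTopic.java` below).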

## Running

You can use your favorite configuration management tool such as Ansible to make the below more elegant. From a central
2 changes: 1 addition & 1 deletion pom.xml
@@ -100,7 +100,7 @@
</manifestEntries>
</transformer>
</transformers>
<finalName>${artifactId}</finalName>
<finalName>${project.artifactId}</finalName>
</configuration>
</execution>
</executions>
29 changes: 16 additions & 13 deletions src/main/java/com/salesforce/ConsumeTopic.java
@@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class ConsumeTopic implements Callable<Exception> {
private static final Logger log = LoggerFactory.getLogger(ConsumeTopic.class);
@@ -35,26 +34,29 @@ class ConsumeTopic implements Callable<Exception> {

private final int topicId;
private final String key;
private final int consumerPollInterval;
private final int readWriteInterval;
private final AdminClient kafkaAdminClient;
private final Map<String, Object> kafkaConsumerConfig;
private final short replicationFactor;
private final boolean keepProducing;

/**
* @param topicId Each topic gets a numeric id
* @param key Prefix for topics created by this tool
* @param consumerPollInterval How long should we wait before polls for new messages?
* @param topicId Each topic gets a numeric id
* @param key Prefix for topics created by this tool
* @param readWriteInterval How long to wait between polls for new messages
* @param kafkaAdminClient
* @param kafkaConsumerConfig
* @param keepProducing Whether we are continuously producing messages rather than just producing once
*/
public ConsumeTopic(int topicId, String key, int consumerPollInterval, AdminClient kafkaAdminClient,
Map<String, Object> kafkaConsumerConfig, short replicationFactor) {
public ConsumeTopic(int topicId, String key, int readWriteInterval, AdminClient kafkaAdminClient,
Map<String, Object> kafkaConsumerConfig, short replicationFactor, boolean keepProducing) {
this.topicId = topicId;
this.key = key;
this.consumerPollInterval = consumerPollInterval;
this.readWriteInterval = readWriteInterval;
this.kafkaAdminClient = kafkaAdminClient;
this.kafkaConsumerConfig = Collections.unmodifiableMap(kafkaConsumerConfig);
this.replicationFactor = replicationFactor;
this.keepProducing = keepProducing;
}

@Override
@@ -69,7 +71,6 @@ public Exception call() {
TopicPartition topicPartition = new TopicPartition(topicName, 0);
consumer.assign(Collections.singleton(topicPartition));

AtomicInteger numMessages = new AtomicInteger();
while (true) {
ConsumerRecords<Integer, Integer> messages;
Histogram.Timer consumerReceiveTimer = consumerReceiveTimeSecs.startTimer();
@@ -79,13 +80,15 @@ public Exception call() {
consumerReceiveTimer.observeDuration();
}
if (messages.count() == 0) {
if (keepProducing) {
Thread.sleep(readWriteInterval);
continue;
}
log.debug("Ran out of messages to process for topic {}; starting from beginning", topicName);
consumer.seekToBeginning(Collections.singleton(topicPartition));
numMessages.set(0);
Thread.sleep(consumerPollInterval);
Thread.sleep(readWriteInterval);
continue;
}
numMessages.addAndGet(messages.count());

consumerCommitTimeSecs.time(consumer::commitSync);

@@ -94,7 +97,7 @@ public Exception call() {

log.debug("Last consumed message {}:{}, consumed {} messages, topic: {}",
lastMessage.key(), lastMessage.value(), messages.count(), topicName);
Thread.sleep(consumerPollInterval);
Thread.sleep(readWriteInterval);
}
} catch (Exception e) {
log.error("Failed consume", e);
53 changes: 53 additions & 0 deletions src/main/java/com/salesforce/CreateTopic.java
@@ -0,0 +1,53 @@
package com.salesforce;

import io.prometheus.client.Histogram;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;

public class CreateTopic implements Callable<Exception> {
    private static final Logger log = LoggerFactory.getLogger(CreateTopic.class);

    private static final Histogram topicCreateTimeSecs = Histogram
            .build("topicCreateTimeSecs", "Topic create time in seconds")
            .register();

    private final int topicId;
    private final String key;
    private final AdminClient kafkaAdminClient;
    private final short replicationFactor;

    public CreateTopic(int topicId, String key, AdminClient kafkaAdminClient, short replicationFactor) {
        this.topicId = topicId;
        this.key = key;
        this.kafkaAdminClient = kafkaAdminClient;
        this.replicationFactor = replicationFactor;
    }

    @Override
    public Exception call() throws Exception {
        String topicName = TopicName.createTopicName(key, topicId);

        // TODO: Allow numPartitions to be changed
        Set<NewTopic> topic = Collections.singleton(new NewTopic(topicName, 1, replicationFactor));
        kafkaAdminClient.createTopics(topic);

        // Wait for topic to be created and for leader election to happen
        topicCreateTimeSecs.time(() -> {
            try {
                TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
            } catch (InterruptedException e) {
                log.error("Unable to record topic creation", e);
            }
        });

        log.debug("Created topic {}", topicName);
        return null;
    }
}
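
`TopicVerifier.checkTopic` is called above, but its source is not part of this commit view. A minimal sketch of what it plausibly does, assuming it polls `AdminClient#describeTopics` until every partition of the topic has an elected leader and the expected replica count; the class body, readiness condition, and poll interval are all assumptions:
```java
package com.salesforce;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for the real TopicVerifier, which is not shown in this diff.
final class TopicVerifierSketch {
    static void checkTopic(AdminClient kafkaAdminClient, String topicName, short replicationFactor)
            throws InterruptedException {
        while (true) {
            try {
                TopicDescription description = kafkaAdminClient
                        .describeTopics(Collections.singleton(topicName))
                        .all().get().get(topicName);
                // Ready once every partition has an elected leader and the
                // expected number of replicas.
                boolean ready = description.partitions().stream()
                        .allMatch(p -> p.leader() != null
                                && p.replicas().size() == replicationFactor);
                if (ready) {
                    return;
                }
            } catch (ExecutionException e) {
                // Topic metadata may not have propagated to all brokers yet; retry.
            }
            Thread.sleep(100);
        }
    }
}
```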
47 changes: 37 additions & 10 deletions src/main/java/com/salesforce/Main.java
@@ -34,7 +34,7 @@ public class Main {

public static void main(String[] args) throws Exception {
Properties settings = new Properties();
try (InputStream defaults = Thread.currentThread().getContextClassLoader()
try (InputStream defaults = Thread.currentThread().getContextClassLoader()
.getResourceAsStream("kafka-partition-availability-benchmark.properties")) {
settings.load(defaults);
}
@@ -53,6 +53,7 @@ public static void main(String[] args) throws Exception {
new PrometheusMetricsServer(Integer.valueOf(settings.getProperty("prometheus_metrics_port"))));
Runtime.getRuntime().addShutdownHook(new Thread(CollectorRegistry.defaultRegistry::clear));

Integer numConcurrentTopicCreations = Integer.valueOf(settings.getProperty("num_concurrent_topic_creations"));
Integer numConcurrentConsumers = Integer.valueOf(settings.getProperty("num_concurrent_consumers"));
Integer numConcurrentProducers = Integer.valueOf(settings.getProperty("num_concurrent_producers"));
Integer numTopics = Integer.valueOf(settings.getProperty("num_topics"));
@@ -68,8 +69,16 @@ public static void main(String[] args) throws Exception {
log.error("Havoc will ensue if you have fewer concurrent producers than consumers");
System.exit(3);
}
if (numConcurrentTopicCreations > numTopics) {
log.error("You cannot concurrently create more topics than desired");
System.exit(4);
}
String topicPrefix = settings.getProperty("default_topic_prefix");
Integer consumerPollIntervalMs = Integer.valueOf(settings.getProperty("consumer_poll_interval_ms"));
Integer readWriteIntervalMs = Integer.valueOf(settings.getProperty("read_write_interval_ms"));

Integer numMessagesToSendPerBatch = Integer.valueOf(settings.getProperty("messages_per_batch"));

Boolean keepProducing = Boolean.valueOf(settings.getProperty("keep_producing"));

// Admin settings
Map<String, Object> kafkaAdminConfig = new HashMap<>();
@@ -91,6 +100,12 @@ public static void main(String[] args) throws Exception {
kafkaProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());

// Global counters
Counter topicsCreated = Counter
.build("numTopicsCreated", "Number of topics we've attempted to create")
.register();
Counter topicsCreateFailed = Counter
.build("numTopicsCreateFailed", "Number of topics we've failed to create")
.register();
Counter topicsProduced = Counter
.build("numTopicsProduced", "Number of topics we've attempted to produce to")
.register();
@@ -104,35 +119,47 @@ public static void main(String[] args) throws Exception {
.build("numTopicsConsumeFailed", "Number of topics we've failed to consume from")
.register();

try(AdminClient kafkaAdminClient = KafkaAdminClient.create(kafkaAdminConfig);
KafkaProducer<Integer, Integer> kafkaProducer = new KafkaProducer<>(kafkaProducerConfig)) {
try (AdminClient kafkaAdminClient = KafkaAdminClient.create(kafkaAdminConfig);
KafkaProducer<Integer, Integer> kafkaProducer = new KafkaProducer<>(kafkaProducerConfig)) {
ExecutorService createTopics = Executors.newFixedThreadPool(numConcurrentTopicCreations);
ExecutorService writeTopics = Executors.newFixedThreadPool(numConcurrentProducers);
ExecutorService consumeTopics = Executors.newFixedThreadPool(numConcurrentConsumers);

BlockingQueue<Future<Exception>> consumerFutures = new ArrayBlockingQueue<>(numConcurrentConsumers);

BlockingQueue<Future<Exception>> createTopicFutures = new ArrayBlockingQueue<>(numConcurrentTopicCreations);
BlockingQueue<Future<Exception>> writeFutures = new ArrayBlockingQueue<>(numConcurrentProducers);
BlockingQueue<Future<Exception>> consumerFutures = new ArrayBlockingQueue<>(numConcurrentConsumers);

log.info("Starting benchmark...");
for (int topic = 1; topic <= numTopics; topic++) {
createTopicFutures.put(createTopics.submit(new CreateTopic(topic, topicPrefix, kafkaAdminClient,
replicationFactor)));
topicsCreated.inc();
if (createTopicFutures.size() >= numConcurrentTopicCreations) {
log.info("Created {} topics, ensuring success before producing more...", numConcurrentTopicCreations);
clearQueue(createTopicFutures, topicsCreateFailed);
}

writeFutures.put(writeTopics.submit(new WriteTopic(topic, topicPrefix, kafkaAdminClient,
replicationFactor, kafkaProducer)));
replicationFactor, numMessagesToSendPerBatch,
keepProducing, kafkaProducer, readWriteIntervalMs)));
topicsProduced.inc();
if (writeFutures.size() >= numConcurrentProducers) {
log.info("Produced {} topics, ensuring success before producing more...", numConcurrentProducers);
clearQueue(writeFutures, topicsProduceFailed);
}
consumerFutures.put(consumeTopics.submit(new ConsumeTopic(topic, topicPrefix, consumerPollIntervalMs,
kafkaAdminClient, kafkaConsumerConfig, replicationFactor)));

consumerFutures.put(consumeTopics.submit(new ConsumeTopic(topic, topicPrefix, readWriteIntervalMs,
kafkaAdminClient, kafkaConsumerConfig, replicationFactor, keepProducing)));
topicsConsumed.inc();
if (consumerFutures.size() >= numConcurrentConsumers) {
log.debug("Consumed {} topics, clearing queue before consuming more...", numConcurrentConsumers);
clearQueue(consumerFutures, topicsConsumeFailed);
}
}

writeTopics.shutdown();
createTopics.shutdown();
try {
clearQueue(writeFutures, topicsProduceFailed);
clearQueue(consumerFutures, topicsConsumeFailed);
} finally {
consumeTopics.shutdownNow();
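
`clearQueue` is collapsed out of this view. Judging from its call sites, which pass a `BlockingQueue<Future<Exception>>` and a Prometheus failure counter, it plausibly drains every pending future and counts the tasks that returned an exception; a minimal sketch under that assumption:
```java
import io.prometheus.client.Counter;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class ClearQueueSketch {
    // Drain every pending future; tasks signal failure by returning a non-null Exception.
    static void clearQueue(BlockingQueue<Future<Exception>> futures, Counter failedCounter)
            throws InterruptedException, ExecutionException {
        while (!futures.isEmpty()) {
            Exception result = futures.take().get();
            if (result != null) {
                failedCounter.inc();
            }
        }
    }
}
```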
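Two of the seven changed files did not load in this capture. One of them is WriteTopic.java: its new constructor signature, taking the batch size, the `keepProducing` flag, and the read/write interval, is visible in the Main.java hunk above. A minimal sketch of what its continuous-produce loop plausibly looks like; the field names, key/value choice, and batching logic here are assumptions, not the committed code:
```java
package com.salesforce;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

// Hypothetical reconstruction of WriteTopic; the real diff did not load in this capture.
class WriteTopicSketch implements Callable<Exception> {
    private static final Logger log = LoggerFactory.getLogger(WriteTopicSketch.class);

    private final int topicId;
    private final String key;
    private final AdminClient kafkaAdminClient;
    private final short replicationFactor;
    private final int numMessagesToSendPerBatch;
    private final boolean keepProducing;
    private final KafkaProducer<Integer, Integer> kafkaProducer;
    private final int readWriteInterval;

    WriteTopicSketch(int topicId, String key, AdminClient kafkaAdminClient, short replicationFactor,
                     int numMessagesToSendPerBatch, boolean keepProducing,
                     KafkaProducer<Integer, Integer> kafkaProducer, int readWriteInterval) {
        this.topicId = topicId;
        this.key = key;
        this.kafkaAdminClient = kafkaAdminClient;
        this.replicationFactor = replicationFactor;
        this.numMessagesToSendPerBatch = numMessagesToSendPerBatch;
        this.keepProducing = keepProducing;
        this.kafkaProducer = kafkaProducer;
        this.readWriteInterval = readWriteInterval;
    }

    @Override
    public Exception call() {
        String topicName = TopicName.createTopicName(key, topicId);
        try {
            // Make sure the topic exists and has a leader before producing to it.
            TopicVerifier.checkTopic(kafkaAdminClient, topicName, replicationFactor);
            do {
                // Send one batch of integer messages, blocking on each ack.
                for (int i = 0; i < numMessagesToSendPerBatch; i++) {
                    kafkaProducer.send(new ProducerRecord<>(topicName, topicId, i)).get();
                }
                if (keepProducing) {
                    // Continuous mode: pace the next batch by the read/write interval.
                    Thread.sleep(readWriteInterval);
                }
            } while (keepProducing);
        } catch (Exception e) {
            log.error("Failed produce", e);
            return e;
        }
        return null;
    }
}
```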
