diff --git a/src/main/java/com/salesforce/WriteTopic.java b/src/main/java/com/salesforce/WriteTopic.java index 294a948..169d372 100644 --- a/src/main/java/com/salesforce/WriteTopic.java +++ b/src/main/java/com/salesforce/WriteTopic.java @@ -83,13 +83,13 @@ public Exception call() { while (keepProducing) { - produceMessageTimeSecs.time(() -> { - // TODO: Get this from properties - for (int i = 0; i < numMessagesToSendPerBatch; i++) { - kafkaProducer.send(new ProducerRecord<>(topicName, topicId, i)); - log.debug("{}: Produced message {}", formatter.format(new Date()), topicId); - } - }); + // TODO: Get this from properties + for (int i = 0; i < numMessagesToSendPerBatch; i++) { + Histogram.Timer requestTimer = produceMessageTimeSecs.startTimer(); + kafkaProducer.send(new ProducerRecord<>(topicName, topicId, i)); + requestTimer.observeDuration(); + log.debug("{}: Produced message {}", formatter.format(new Date()), topicId); + } threadsAwaitingMessageProduce.dec(); Thread.sleep(readWriteInterval); threadsAwaitingMessageProduce.inc();