Skip to content

Commit

Permalink
Adding support to read headers from message
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitor Campos authored and jlavallee committed Feb 12, 2017
1 parent 2137342 commit b157526
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStateListener {
private static final int DEFAULT_PREFETCH_COUNT = 0; // unlimited
Expand All @@ -31,6 +32,10 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat
private static final String PURGE_QUEUE = "AMQPConsumer.PurgeQueue";
private static final String AUTO_ACK = "AMQPConsumer.AutoAck";
private static final String RECEIVE_TIMEOUT = "AMQPConsumer.ReceiveTimeout";
public static final String TIMESTAMP_PARAMETER = "Timestamp";
public static final String EXCHANGE_PARAMETER = "Exchange";
public static final String ROUTING_KEY_PARAMETER = "Routing Key";
public static final String DELIVERY_TAG_PARAMETER = "Delivery Tag";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";
Expand Down Expand Up @@ -58,7 +63,7 @@ public SampleResult sample(Entry entry) {
try {
initChannel();

// only do this once per thread. Otherwise it slows down the consumption by appx 50%
// only do this once per thread. Otherwise it slows down the consumption by appx 50%
if (consumer == null) {
log.info("Creating consumer");
consumer = new QueueingConsumer(channel);
Expand Down Expand Up @@ -114,6 +119,8 @@ public SampleResult sample(Entry entry) {

result.setResponseData("OK", null);
result.setDataType(SampleResult.TEXT);
result.setResponseHeaders(delivery != null ? formatHeaders(delivery) : null);


result.setResponseCodeOK();

Expand Down Expand Up @@ -223,7 +230,7 @@ public String getPrefetchCount() {
}

public void setPrefetchCount(String prefetchCount) {
setProperty(PREFETCH_COUNT, prefetchCount);
setProperty(PREFETCH_COUNT, prefetchCount);
}

public int getPrefetchCountAsInt() {
Expand Down Expand Up @@ -308,7 +315,7 @@ public void cleanup() {

try {
if (consumerTag != null) {
channel.basicCancel(consumerTag);
channel.basicCancel(consumerTag);
}
} catch(IOException e) {
log.error("Couldn't safely cancel the sample " + consumerTag, e);
Expand Down Expand Up @@ -336,4 +343,20 @@ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, Ke
}
return ret;
}

private String formatHeaders(QueueingConsumer.Delivery delivery){
Map<String, Object> headers = delivery.getProperties().getHeaders();
StringBuilder sb = new StringBuilder();
sb.append(TIMESTAMP_PARAMETER).append(": ")
.append(delivery.getProperties().getTimestamp() != null && delivery.getProperties().getTimestamp() != null ?
delivery.getProperties().getTimestamp().getTime() : "")
.append("\n");
sb.append(EXCHANGE_PARAMETER).append(": ").append(delivery.getEnvelope().getExchange()).append("\n");
sb.append(ROUTING_KEY_PARAMETER).append(": ").append(delivery.getEnvelope().getRoutingKey()).append("\n");
sb.append(DELIVERY_TAG_PARAMETER).append(": ").append(delivery.getEnvelope().getDeliveryTag()).append("\n");
for (String key : headers.keySet()) {
sb.append(key).append(": ").append(headers.get(key)).append("\n");
}
return sb.toString();
}
}

0 comments on commit b157526

Please sign in to comment.