Skip to content

Commit

Permalink
added
Browse files Browse the repository at this point in the history
    pub/sub : Aggregate sample results to make it more like the JMS sampler
    pub: Persistent Message Option to set easily set message type, Use publisher Confirms to fsync messages
    sub: Read response, prefetch count
  • Loading branch information
sgunapu authored and jlavallee committed May 9, 2013
1 parent 13a1c1c commit 8066e1d
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 43 deletions.
124 changes: 98 additions & 26 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat

private static final Logger log = LoggingManager.getLoggerForClass();

public static int DEFAULT_PREFETCH_COUNT = 0; // unlimited
public static String DEFAULT_PREFETCH_COUNT_STRING = Integer.toString(DEFAULT_PREFETCH_COUNT);
private final static String PREFETCH_COUNT = "AMQPConsumer.PrefetchCount";

public static boolean DEFAULT_READ_RESPONSE = true;
private final static String READ_RESPONSE = "AMQPConsumer.ReadResponse";


//++ These are JMX names, and must not be changed
private final static String PURGE_QUEUE = "AMQPConsumer.PurgeQueue";
private final static String AUTO_ACK = "AMQPConsumer.AutoAck";
Expand All @@ -41,77 +49,94 @@ public SampleResult sample(Entry entry) {
result.setSuccessful(false);
result.setResponseCode("500");

QueueingConsumer consumer;
String consumerTag;

trace("AMQPConsumer.sample()");

try {
initChannel();

consumer = new QueueingConsumer(channel);
channel.basicQos(1); // TODO: make prefetchCount configurable?
consumerTag = channel.basicConsume(getQueue(), autoAck(), consumer);
} catch (IOException ex) {
log.error("Failed to initialize channel", ex);
result.setResponseMessage(ex.getMessage());
return result;
}

QueueingConsumer consumer = new QueueingConsumer(channel);
//channel.basicQos(1); // TODO: make prefetchCount configurable?
String consumerTag = null;
try {
consumerTag = channel.basicConsume(getQueue(), autoAck(), consumer);
} catch (IOException ex) {
log.error("Failed to consume from channel", ex);
result.setResponseMessage(ex.getMessage());
return result;
}

result.setSampleLabel(getTitle());
/*
* Perform the sampling
*/
int loop = getIterationsAsInt();
result.sampleStart(); // Start timing
QueueingConsumer.Delivery delivery = null;
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(getReceiveTimeoutAsInt());

if(delivery == null){
log.warn("nextDelivery timed out");
return result;
for (int idx = 0; idx < loop; idx++) {
delivery = consumer.nextDelivery(getReceiveTimeoutAsInt());

if(delivery == null){
log.warn("nextDelivery timed out");
return result;
}

/*
* Set up the sample result details
*/
if (getReadResponseAsBoolean()) {
result.setSamplerData(new String(delivery.getBody()));
}
else {
result.setSamplerData("Read response is false.");
}

if(!autoAck())
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

/*
* Set up the sample result details
*/
result.setSamplerData(new String(delivery.getBody()));

result.setResponseData("OK", null);
result.setDataType(SampleResult.TEXT);

result.setResponseCodeOK();
result.setResponseMessage("OK");
result.setSuccessful(true);

if(!autoAck())
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

} catch (ShutdownSignalException e) {
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("400");
result.setResponseMessage(e.toString());
result.setResponseMessage(e.getMessage());
interrupt();
} catch (ConsumerCancelledException e) {
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("300");
result.setResponseMessage(e.toString());
result.setResponseMessage(e.getMessage());
interrupt();
} catch (InterruptedException e) {
log.info("interuppted while attempting to consume");
result.setResponseCode("200");
result.setResponseMessage(e.toString());
result.setResponseMessage(e.getMessage());
} catch (IOException e) {
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("100");
result.setResponseMessage(e.toString());
result.setResponseMessage(e.getMessage());
} finally {
result.sampleEnd(); // End timimg
try {
channel.basicCancel(consumerTag);
if (delivery != null) {
channel.basicCancel(consumerTag);
}
} catch(IOException e) {
log.error("Couldn't safely cancel the sample's consumer", e);
log.error("Couldn't safely cancel the sample " + consumerTag, e);
}
}

result.sampleEnd(); // End timimg
trace("AMQPConsumer.sample ended");

return result;
Expand Down Expand Up @@ -181,6 +206,47 @@ public void setReceiveTimeout(String s) {
setProperty(RECEIVE_TIMEOUT, s);
}

public String getPrefetchCount() {
return getPropertyAsString(PREFETCH_COUNT, DEFAULT_PREFETCH_COUNT_STRING);
}

public void setPrefetchCount(String prefetchCount) {
setProperty(PREFETCH_COUNT, prefetchCount);
}

public int getPrefetchCountAsInt() {
return getPropertyAsInt(PREFETCH_COUNT);
}

/**
* set whether the sampler should read the response or not
*
* @param read whether the sampler should read the response or not
*/
public void setReadResponse(Boolean read) {
setProperty(READ_RESPONSE, read);
}

/**
* return whether the sampler should read the response
*
* @return whether the sampler should read the response
*/
public String getReadResponse() {
return getPropertyAsString(READ_RESPONSE);
}

/**
* return whether the sampler should read the response as a boolean value
*
* @return whether the sampler should read the response as a boolean value
*/
public boolean getReadResponseAsBoolean() {
return getPropertyAsBoolean(READ_RESPONSE);
}



@Override
public boolean interrupt() {
testEnded();
Expand Down Expand Up @@ -226,4 +292,10 @@ private void trace(String s) {
String th = this.toString();
log.debug(tn + " " + tl + " " + s + " " + th);
}

protected boolean initChannel() throws IOException {
boolean ret = super.initChannel();
channel.basicQos(getPrefetchCountAsInt());
return ret;
}
}
55 changes: 48 additions & 7 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.rabbitmq.client.AMQP;
import java.io.IOException;

import com.rabbitmq.client.MessageProperties;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
Expand Down Expand Up @@ -34,6 +35,14 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible {
private final static String REPLY_TO_QUEUE = "AMQPPublisher.ReplyToQueue";
private final static String CORRELATION_ID = "AMQPPublisher.CorrelationId";

public static boolean DEFAULT_USE_PUBLISHER_CONFIRMS = false;
//public static String DEFAULT_USE_PUBLISHER_CONFIRMS_STRING = Boolean.toString(DEFAULT_USE_PUBLISHER_CONFIRMS);
private final static String USE_PUBLISHER_CONFIRMS = "AMQPConsumer.UsePublisherConfirms";

public static boolean DEFAULT_PERSISTENT = false;
//public static String DEFAULT_PERSISTENT_STRING = Boolean.toString(DEFAULT_PERSISTENT);
private final static String PERSISTENT = "AMQPConsumer.Persistent";

private transient Channel channel;

public AMQPPublisher() {
Expand Down Expand Up @@ -63,9 +72,15 @@ public SampleResult sample(Entry e) {
/*
* Perform the sampling
*/

int loop = getIterationsAsInt();
result.sampleStart(); // Start timing
try {
channel.basicPublish(getExchange(), getMessageRoutingKey(), getProperties(), getMessageBytes());

for (int idx = 0; idx < loop; idx++) {
channel.basicPublish(getExchange(), getMessageRoutingKey(), getProperties(), getMessageBytes());
}

/*
* Set up the sample result details
*/
Expand All @@ -81,8 +96,9 @@ public SampleResult sample(Entry e) {
result.setResponseCode("000");
result.setResponseMessage(ex.toString());
}

result.sampleEnd(); // End timimg
finally {
result.sampleEnd(); // End timimg
}

return result;
}
Expand Down Expand Up @@ -147,6 +163,24 @@ public void setCorrelationId(String content) {
setProperty(CORRELATION_ID, content);
}


public Boolean getPersistent() {
return getPropertyAsBoolean(PERSISTENT, DEFAULT_PERSISTENT);
}

public void setPersistent(Boolean persistent) {
setProperty(PERSISTENT, persistent);
}

public Boolean getUsePublisherConfirms() {
return getPropertyAsBoolean(USE_PUBLISHER_CONFIRMS, DEFAULT_USE_PUBLISHER_CONFIRMS);
}

public void setUsePublisherConfirms(Boolean usePublisherConfirms) {
setProperty(USE_PUBLISHER_CONFIRMS, usePublisherConfirms);
}


@Override
public boolean interrupt() {
cleanup();
Expand All @@ -166,15 +200,22 @@ protected void setChannel(Channel channel) {
@Override
protected AMQP.BasicProperties getProperties() {
AMQP.BasicProperties parentProps = super.getProperties();
AMQP.BasicProperties publishProperties =

AMQP.BasicProperties publishProperties =
new AMQP.BasicProperties(parentProps.getContentType(), parentProps.getContentEncoding(),
parentProps.getHeaders(), parentProps.getDeliveryMode(), parentProps.getPriority(),
parentProps.getHeaders(), getPersistent() ? 2 : parentProps.getDeliveryMode(), parentProps.getPriority(),
getCorrelationId(), getReplyToQueue(), parentProps.getExpiration(),
parentProps.getMessageId(), parentProps.getTimestamp(), getMessageType(),
parentProps.getUserId(), parentProps.getAppId(), parentProps.getClusterId());

return publishProperties;
}

protected boolean initChannel() throws IOException{
boolean ret = super.initChannel();
if (getUsePublisherConfirms()) {
channel.confirmSelect();
}
return ret;
}
}
Loading

0 comments on commit 8066e1d

Please sign in to comment.