Skip to content

Commit

Permalink
removed publisher confirms.
Browse files Browse the repository at this point in the history
improved consumer consumption rates by caching consumer.
  • Loading branch information
sgunapu authored and jlavallee committed May 9, 2013
1 parent 8066e1d commit 1357746
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 87 deletions.
51 changes: 29 additions & 22 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat

private transient Channel channel;

private QueueingConsumer consumer;
private String consumerTag;

public AMQPConsumer(){
super();
}
Expand All @@ -49,25 +52,21 @@ public SampleResult sample(Entry entry) {
result.setSuccessful(false);
result.setResponseCode("500");


trace("AMQPConsumer.sample()");

try {
initChannel();
} catch (IOException ex) {
log.error("Failed to initialize channel", ex);
result.setResponseMessage(ex.getMessage());
return result;
}

QueueingConsumer consumer = new QueueingConsumer(channel);
//channel.basicQos(1); // TODO: make prefetchCount configurable?
String consumerTag = null;
try {
consumerTag = channel.basicConsume(getQueue(), autoAck(), consumer);
} catch (IOException ex) {
log.error("Failed to consume from channel", ex);
result.setResponseMessage(ex.getMessage());
// only do this once per thread. Otherwise it slows down the consumption by appx 50%
if (consumer == null) {
consumer = new QueueingConsumer(channel);
}
if (consumerTag == null) {
consumerTag = channel.basicConsume(getQueue(), autoAck(), consumer);
}
} catch (Exception ex) {
log.error("Failed to initialize channel", ex);
result.setResponseMessage(ex.toString());
return result;
}

Expand All @@ -83,7 +82,7 @@ public SampleResult sample(Entry entry) {
delivery = consumer.nextDelivery(getReceiveTimeoutAsInt());

if(delivery == null){
log.warn("nextDelivery timed out");
result.setResponseMessage("timed out");
return result;
}

Expand Down Expand Up @@ -128,13 +127,6 @@ public SampleResult sample(Entry entry) {
result.setResponseMessage(e.getMessage());
} finally {
result.sampleEnd(); // End timimg
try {
if (delivery != null) {
channel.basicCancel(consumerTag);
}
} catch(IOException e) {
log.error("Couldn't safely cancel the sample " + consumerTag, e);
}
}

trace("AMQPConsumer.sample ended");
Expand Down Expand Up @@ -258,6 +250,7 @@ public boolean interrupt() {
*/
@Override
public void testEnded() {

if(purgeQueue()){
log.info("Purging queue " + getQueue());
try {
Expand All @@ -283,6 +276,20 @@ public void testStarted(String arg0) {

}

public void cleanup() {

try {
if (consumerTag != null) {
channel.basicCancel(consumerTag);
}
} catch(IOException e) {
log.error("Couldn't safely cancel the sample " + consumerTag, e);
}

super.cleanup();

}

/*
* Helper method
*/
Expand Down
35 changes: 14 additions & 21 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.rabbitmq.client.AMQP;
import java.io.IOException;

import com.rabbitmq.client.MessageProperties;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
Expand Down Expand Up @@ -35,12 +34,7 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible {
private final static String REPLY_TO_QUEUE = "AMQPPublisher.ReplyToQueue";
private final static String CORRELATION_ID = "AMQPPublisher.CorrelationId";

public static boolean DEFAULT_USE_PUBLISHER_CONFIRMS = false;
//public static String DEFAULT_USE_PUBLISHER_CONFIRMS_STRING = Boolean.toString(DEFAULT_USE_PUBLISHER_CONFIRMS);
private final static String USE_PUBLISHER_CONFIRMS = "AMQPConsumer.UsePublisherConfirms";

public static boolean DEFAULT_PERSISTENT = false;
//public static String DEFAULT_PERSISTENT_STRING = Boolean.toString(DEFAULT_PERSISTENT);
private final static String PERSISTENT = "AMQPConsumer.Persistent";

private transient Channel channel;
Expand All @@ -61,8 +55,9 @@ public SampleResult sample(Entry e) {

try {
initChannel();
} catch (IOException ex) {
log.error("Failed to initialize channel", ex);
} catch (Exception ex) {
log.error("Failed to initialize channel : ", ex);
result.setResponseMessage(ex.toString());
return result;
}

Expand All @@ -78,7 +73,17 @@ public SampleResult sample(Entry e) {
try {

for (int idx = 0; idx < loop; idx++) {
// try to force jms semantics.
// but this does not work since RabbitMQ does not sync to disk if consumers are connected as
// seen by iostat -cd 1. TPS value remains at 0.

//if (getPersistent()) {
// channel.txSelect();
//}
channel.basicPublish(getExchange(), getMessageRoutingKey(), getProperties(), getMessageBytes());
//if (getPersistent()) {
// channel.txCommit();
//}
}

/*
Expand All @@ -92,7 +97,7 @@ public SampleResult sample(Entry e) {
result.setResponseMessage("OK");
result.setSuccessful(true);
} catch (Exception ex) {
log.debug("", ex);
log.debug(ex.getMessage(), ex);
result.setResponseCode("000");
result.setResponseMessage(ex.toString());
}
Expand Down Expand Up @@ -172,15 +177,6 @@ public void setPersistent(Boolean persistent) {
setProperty(PERSISTENT, persistent);
}

public Boolean getUsePublisherConfirms() {
return getPropertyAsBoolean(USE_PUBLISHER_CONFIRMS, DEFAULT_USE_PUBLISHER_CONFIRMS);
}

public void setUsePublisherConfirms(Boolean usePublisherConfirms) {
setProperty(USE_PUBLISHER_CONFIRMS, usePublisherConfirms);
}


@Override
public boolean interrupt() {
cleanup();
Expand Down Expand Up @@ -213,9 +209,6 @@ protected AMQP.BasicProperties getProperties() {

protected boolean initChannel() throws IOException{
boolean ret = super.initChannel();
if (getUsePublisherConfirms()) {
channel.confirmSelect();
}
return ret;
}
}
143 changes: 105 additions & 38 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

public abstract class AMQPSampler extends AbstractSampler implements ThreadListener {

public static final boolean DEFAULT_EXCHANGE_DURABLE = true;
public static final boolean DEFAULT_EXCHANGE_REDECLARE = false;
public static final boolean DEFAULT_QUEUE_REDECLARE = false;

public static final int DEFAULT_PORT = 5672;
public static final String DEFAULT_PORT_STRING = Integer.toString(DEFAULT_PORT);

Expand All @@ -34,6 +38,7 @@ public abstract class AMQPSampler extends AbstractSampler implements ThreadListe
protected static final String EXCHANGE = "AMQPSampler.Exchange";
protected static final String EXCHANGE_TYPE = "AMQPSampler.ExchangeType";
protected static final String EXCHANGE_DURABLE = "AMQPSampler.ExchangeDurable";
protected static final String EXCHANGE_REDECLARE = "AMQPSampler.ExchangeRedeclare";
protected static final String QUEUE = "AMQPSampler.Queue";
protected static final String ROUTING_KEY = "AMQPSampler.RoutingKey";
protected static final String VIRUTAL_HOST = "AMQPSampler.VirtualHost";
Expand All @@ -45,6 +50,7 @@ public abstract class AMQPSampler extends AbstractSampler implements ThreadListe
private static final String ITERATIONS = "AMQPSampler.Iterations";
private static final String MESSAGE_TTL = "AMQPSampler.MessageTTL";
private static final String QUEUE_DURABLE = "AMQPSampler.QueueDurable";
private static final String QUEUE_REDECLARE = "AMQPSampler.Redeclare";
private static final String QUEUE_EXCLUSIVE = "AMQPSampler.QueueExclusive";
private static final String QUEUE_AUTO_DELETE = "AMQPSampler.QueueAutoDelete";
private static final int DEFAULT_HEARTBEAT = 1;
Expand All @@ -55,10 +61,6 @@ public abstract class AMQPSampler extends AbstractSampler implements ThreadListe
protected AMQPSampler(){
factory = new ConnectionFactory();
factory.setRequestedHeartbeat(DEFAULT_HEARTBEAT);

setExhangeDurable("true");

System.out.println("Created :"+ getTitle() + " -- " + this);
}

protected boolean initChannel() throws IOException {
Expand All @@ -71,39 +73,27 @@ protected boolean initChannel() throws IOException {
}

if(channel == null) {
log.info("Creating channel " + getVirtualHost()+":"+getPortAsInt());
factory.setConnectionTimeout(getTimeoutAsInt());
factory.setVirtualHost(getVirtualHost());
factory.setHost(getHost());
factory.setPort(getPortAsInt());
factory.setUsername(getUsername());
factory.setPassword(getPassword());

log.info("RabbitMQ ConnectionFactory using:"
+"\n\t virtual host: " + getVirtualHost()
+"\n\t host: " + getHost()
+"\n\t port: " + getPort()
+"\n\t username: " + getUsername()
+"\n\t password: " + getPassword()
+"\n\t timeout: " + getTimeout()
+"\n\t heartbeat: " + factory.getRequestedHeartbeat()
+"\nin " + this
);

connection = factory.newConnection();
channel = connection.createChannel();
channel.basicQos(0,0,false);
if(!channel.isOpen()){
log.fatalError("Failed to open channel: " + channel.getCloseReason().getLocalizedMessage());
}
channel = createChannel();
setChannel(channel);

//TODO: Break out queue binding
if(getQueue() != null && !getQueue().isEmpty()) {
channel.queueDeclare(getQueue(), queueDurable(), queueExclusive(), queueAutoDelete(), getQueueArguments());
boolean queueConfigured = (getQueue() != null && !getQueue().isEmpty());

if(queueConfigured) {
if (getQueueRedeclare()) {
deleteQueue();
}

if(!StringUtils.isBlank(getExchange())) { //Use a named exchange
channel.exchangeDeclare(getExchange(), getExchangeType(), true);
AMQP.Queue.DeclareOk declareQueueResp = channel.queueDeclare(getQueue(), queueDurable(), queueExclusive(), queueAutoDelete(), getQueueArguments());
}

if(!StringUtils.isBlank(getExchange())) { //Use a named exchange
if (getExchangeRedeclare()) {
deleteExchange();
}

AMQP.Exchange.DeclareOk declareExchangeResp = channel.exchangeDeclare(getExchange(), getExchangeType(), getExchangeDurable());
if (queueConfigured) {
channel.queueBind(getQueue(), getExchange(), getRoutingKey());
}
}
Expand Down Expand Up @@ -189,16 +179,20 @@ public void setExchangeType(String name) {
setProperty(EXCHANGE_TYPE, name);
}

public String getExchangeDurable() {
return getPropertyAsString(EXCHANGE_DURABLE);
public Boolean getExchangeDurable() {
return getPropertyAsBoolean(EXCHANGE_DURABLE);
}

public void setExhangeDurable(String content) {
public void setExchangeDurable(Boolean content) {
setProperty(EXCHANGE_DURABLE, content);
}

public boolean exchangeDurable() {
return getPropertyAsBoolean(EXCHANGE_DURABLE);
public Boolean getExchangeRedeclare() {
return getPropertyAsBoolean(EXCHANGE_REDECLARE);
}

public void setExchangeRedeclare(Boolean content) {
setProperty(EXCHANGE_REDECLARE, content);
}

public String getQueue() {
Expand Down Expand Up @@ -343,6 +337,15 @@ public boolean queueAutoDelete(){
return getPropertyAsBoolean(QUEUE_AUTO_DELETE);
}


public Boolean getQueueRedeclare() {
return getPropertyAsBoolean(QUEUE_REDECLARE);
}

public void setQueueRedeclare(Boolean content) {
setProperty(QUEUE_REDECLARE, content);
}

protected void cleanup() {
try {
//getChannel().close(); // closing the connection will close the channel if it's still open
Expand All @@ -364,4 +367,68 @@ public void threadStarted() {

}

protected Channel createChannel() throws IOException {
log.info("Creating channel " + getVirtualHost()+":"+getPortAsInt());
factory.setConnectionTimeout(getTimeoutAsInt());
factory.setVirtualHost(getVirtualHost());
factory.setHost(getHost());
factory.setPort(getPortAsInt());
factory.setUsername(getUsername());
factory.setPassword(getPassword());

log.info("RabbitMQ ConnectionFactory using:"
+"\n\t virtual host: " + getVirtualHost()
+"\n\t host: " + getHost()
+"\n\t port: " + getPort()
+"\n\t username: " + getUsername()
+"\n\t password: " + getPassword()
+"\n\t timeout: " + getTimeout()
+"\n\t heartbeat: " + factory.getRequestedHeartbeat()
+"\nin " + this
);

if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}

Channel channel = connection.createChannel();
if(!channel.isOpen()){
log.fatalError("Failed to open channel: " + channel.getCloseReason().getLocalizedMessage());
}
return channel;
}

protected void deleteQueue() throws IOException {
Channel channel = createChannel();
try {
log.info("Deleting queue " + getQueue());
channel.queueDelete(getQueue());
}
catch(Exception ex) {
ex.printStackTrace();
// ignore it.
}
finally {
if (channel.isOpen()) {
channel.close();
}
}
}

protected void deleteExchange() throws IOException {
Channel channel = createChannel();
try {
log.info("Deleting exchange " + getExchange());
channel.exchangeDelete(getExchange());
}
catch(Exception ex) {
ex.printStackTrace();
// ignore it.
}
finally {
if (channel.isOpen()) {
channel.close();
}
}
}
}
Loading

0 comments on commit 1357746

Please sign in to comment.