Skip to content

Commit

Permalink
added transaction support.
Browse files Browse the repository at this point in the history
  • Loading branch information
sgunapu authored and jlavallee committed May 9, 2013
1 parent 1357746 commit 0ac0c86
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public SampleResult sample(Entry entry) {
/*
* Perform the sampling
*/

// aggregate samples.
int loop = getIterationsAsInt();
result.sampleStart(); // Start timing
QueueingConsumer.Delivery delivery = null;
Expand Down
36 changes: 28 additions & 8 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.rabbitmq.client.AMQP;
import java.io.IOException;

import com.rabbitmq.client.MessageProperties;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
Expand Down Expand Up @@ -37,6 +38,9 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible {
public static boolean DEFAULT_PERSISTENT = false;
private final static String PERSISTENT = "AMQPConsumer.Persistent";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";

private transient Channel channel;

public AMQPPublisher() {
Expand Down Expand Up @@ -68,22 +72,25 @@ public SampleResult sample(Entry e) {
* Perform the sampling
*/

// aggregate samples.
int loop = getIterationsAsInt();
result.sampleStart(); // Start timing
try {
AMQP.BasicProperties messageProperties = getProperties();
byte[] messageBytes = getMessageBytes();

for (int idx = 0; idx < loop; idx++) {
// try to force jms semantics.
// but this does not work since RabbitMQ does not sync to disk if consumers are connected as
// seen by iostat -cd 1. TPS value remains at 0.

//if (getPersistent()) {
// channel.txSelect();
//}
channel.basicPublish(getExchange(), getMessageRoutingKey(), getProperties(), getMessageBytes());
//if (getPersistent()) {
// channel.txCommit();
//}
channel.basicPublish(getExchange(), getMessageRoutingKey(), messageProperties, messageBytes);

}

// commit the sample.
if (getUseTx()) {
channel.txCommit();
}

/*
Expand Down Expand Up @@ -177,6 +184,14 @@ public void setPersistent(Boolean persistent) {
setProperty(PERSISTENT, persistent);
}

public Boolean getUseTx() {
return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX);
}

public void setUseTx(Boolean tx) {
setProperty(USE_TX, tx);
}

@Override
public boolean interrupt() {
cleanup();
Expand All @@ -197,9 +212,11 @@ protected void setChannel(Channel channel) {
protected AMQP.BasicProperties getProperties() {
AMQP.BasicProperties parentProps = super.getProperties();

int deliveryMode = getPersistent() ? 2 : 1;

AMQP.BasicProperties publishProperties =
new AMQP.BasicProperties(parentProps.getContentType(), parentProps.getContentEncoding(),
parentProps.getHeaders(), getPersistent() ? 2 : parentProps.getDeliveryMode(), parentProps.getPriority(),
parentProps.getHeaders(), deliveryMode, parentProps.getPriority(),
getCorrelationId(), getReplyToQueue(), parentProps.getExpiration(),
parentProps.getMessageId(), parentProps.getTimestamp(), getMessageType(),
parentProps.getUserId(), parentProps.getAppId(), parentProps.getClusterId());
Expand All @@ -209,6 +226,9 @@ protected AMQP.BasicProperties getProperties() {

protected boolean initChannel() throws IOException{
boolean ret = super.initChannel();
if (getUseTx()) {
channel.txSelect();
}
return ret;
}
}
43 changes: 23 additions & 20 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,26 +369,27 @@ public void threadStarted() {

protected Channel createChannel() throws IOException {
log.info("Creating channel " + getVirtualHost()+":"+getPortAsInt());
factory.setConnectionTimeout(getTimeoutAsInt());
factory.setVirtualHost(getVirtualHost());
factory.setHost(getHost());
factory.setPort(getPortAsInt());
factory.setUsername(getUsername());
factory.setPassword(getPassword());

log.info("RabbitMQ ConnectionFactory using:"
+"\n\t virtual host: " + getVirtualHost()
+"\n\t host: " + getHost()
+"\n\t port: " + getPort()
+"\n\t username: " + getUsername()
+"\n\t password: " + getPassword()
+"\n\t timeout: " + getTimeout()
+"\n\t heartbeat: " + factory.getRequestedHeartbeat()
+"\nin " + this
);

if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
factory.setConnectionTimeout(getTimeoutAsInt());
factory.setVirtualHost(getVirtualHost());
factory.setHost(getHost());
factory.setPort(getPortAsInt());
factory.setUsername(getUsername());
factory.setPassword(getPassword());

log.info("RabbitMQ ConnectionFactory using:"
+"\n\t virtual host: " + getVirtualHost()
+"\n\t host: " + getHost()
+"\n\t port: " + getPort()
+"\n\t username: " + getUsername()
+"\n\t password: " + getPassword()
+"\n\t timeout: " + getTimeout()
+"\n\t heartbeat: " + factory.getRequestedHeartbeat()
+"\nin " + this
);

connection = factory.newConnection();
}

Channel channel = connection.createChannel();
Expand All @@ -399,13 +400,14 @@ protected Channel createChannel() throws IOException {
}

protected void deleteQueue() throws IOException {
// use a different channel since channel closes on exception.
Channel channel = createChannel();
try {
log.info("Deleting queue " + getQueue());
channel.queueDelete(getQueue());
}
catch(Exception ex) {
ex.printStackTrace();
log.debug(ex.toString(), ex);
// ignore it.
}
finally {
Expand All @@ -416,13 +418,14 @@ protected void deleteQueue() throws IOException {
}

protected void deleteExchange() throws IOException {
// use a different channel since channel closes on exception.
Channel channel = createChannel();
try {
log.info("Deleting exchange " + getExchange());
channel.exchangeDelete(getExchange());
}
catch(Exception ex) {
ex.printStackTrace();
log.debug(ex.toString(), ex);
// ignore it.
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class AMQPPublisherGui extends AMQPSamplerGui {
private JLabeledTextField correlationId = new JLabeledTextField("Correlation Id");

private JCheckBox persistent = new JCheckBox("Persistent?", AMQPPublisher.DEFAULT_PERSISTENT);
private JCheckBox useTx = new JCheckBox("Use Transactions?", AMQPPublisher.DEFAULT_USE_TX);

public AMQPPublisherGui(){
init();
Expand Down Expand Up @@ -66,6 +67,7 @@ public void configure(TestElement element) {
AMQPPublisher sampler = (AMQPPublisher) element;

persistent.setSelected(sampler.getPersistent());
useTx.setSelected(sampler.getUseTx());

messageRoutingKey.setText(sampler.getMessageRoutingKey());
messageType.setText(sampler.getMessageType());
Expand Down Expand Up @@ -96,6 +98,7 @@ public void modifyTestElement(TestElement te) {
super.modifyTestElement(sampler);

sampler.setPersistent(persistent.isSelected());
sampler.setUseTx(useTx.isSelected());

sampler.setMessageRoutingKey(messageRoutingKey.getText());
sampler.setMessage(message.getText());
Expand All @@ -116,13 +119,15 @@ protected void setMainPanel(JPanel panel){
protected final void init() {
super.init();
persistent.setPreferredSize(new Dimension(100, 25));
useTx.setPreferredSize(new Dimension(100, 25));
messageRoutingKey.setPreferredSize(new Dimension(100, 25));
messageType.setPreferredSize(new Dimension(100, 25));
replyToQueue.setPreferredSize(new Dimension(100, 25));
correlationId.setPreferredSize(new Dimension(100, 25));
message.setPreferredSize(new Dimension(400, 150));

mainPanel.add(persistent);
mainPanel.add(useTx);
mainPanel.add(messageRoutingKey);
mainPanel.add(messageType);
mainPanel.add(replyToQueue);
Expand All @@ -137,6 +142,7 @@ protected final void init() {
public void clearGui() {
super.clearGui();
persistent.setSelected(AMQPPublisher.DEFAULT_PERSISTENT);
useTx.setSelected(AMQPPublisher.DEFAULT_USE_TX);
messageRoutingKey.setText("");
messageType.setText("");
replyToQueue.setText("");
Expand Down

0 comments on commit 0ac0c86

Please sign in to comment.