diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java index 7dd6b32..2653388 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java @@ -1,8 +1,9 @@ package com.zeroclue.jmeter.protocol.amqp; -import java.io.IOException; -import java.security.*; - +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConsumerCancelledException; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.jmeter.samplers.Entry; import org.apache.jmeter.samplers.Interruptible; import org.apache.jmeter.samplers.SampleResult; @@ -10,10 +11,9 @@ import org.apache.jorphan.logging.LoggingManager; import org.apache.log.Logger; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.ConsumerCancelledException; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.ShutdownSignalException; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStateListener { private static final int DEFAULT_PREFETCH_COUNT = 0; // unlimited @@ -32,6 +32,9 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat private static final String AUTO_ACK = "AMQPConsumer.AutoAck"; private static final String RECEIVE_TIMEOUT = "AMQPConsumer.ReceiveTimeout"; + public static boolean DEFAULT_USE_TX = false; + private final static String USE_TX = "AMQPConsumer.UseTx"; + private transient Channel channel; private transient QueueingConsumer consumer; private transient String consumerTag; @@ -104,6 +107,11 @@ public SampleResult sample(Entry entry) { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } + // commit the sample. + if (getUseTx()) { + channel.txCommit(); + } + result.setResponseData("OK", null); result.setDataType(SampleResult.TEXT); @@ -222,6 +230,14 @@ public int getPrefetchCountAsInt() { return getPropertyAsInt(PREFETCH_COUNT); } + public Boolean getUseTx() { + return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX); + } + + public void setUseTx(Boolean tx) { + setProperty(USE_TX, tx); + } + /** * set whether the sampler should read the response or not * @@ -315,6 +331,9 @@ private void trace(String s) { protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException { boolean ret = super.initChannel(); channel.basicQos(getPrefetchCountAsInt()); + if (getUseTx()) { + channel.txSelect(); + } return ret; } } diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java index 242de14..a1a670e 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java @@ -1,11 +1,7 @@ package com.zeroclue.jmeter.protocol.amqp; import com.rabbitmq.client.AMQP; - -import java.io.IOException; -import java.security.*; -import java.util.*; - +import com.rabbitmq.client.Channel; import org.apache.commons.lang3.StringUtils; import org.apache.jmeter.config.Arguments; import org.apache.jmeter.samplers.Entry; @@ -15,7 +11,11 @@ import org.apache.jorphan.logging.LoggingManager; import org.apache.log.Logger; -import com.rabbitmq.client.Channel; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; /** * JMeter creates an instance of a sampler class for every occurrence of the @@ -44,10 +44,10 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible { private final static String HEADERS = "AMQPPublisher.Headers"; public static boolean DEFAULT_PERSISTENT = false; - private final static String PERSISTENT = "AMQPConsumer.Persistent"; + private final static String PERSISTENT = "AMQPPublisher.Persistent"; public static boolean DEFAULT_USE_TX = false; - private final static String USE_TX = "AMQPConsumer.UseTx"; + private final static String USE_TX = "AMQPPublisher.UseTx"; private transient Channel channel; diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java b/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java index a9056dc..2353241 100644 --- a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java +++ b/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java @@ -1,13 +1,10 @@ package com.zeroclue.jmeter.protocol.amqp.gui; -import javax.swing.JCheckBox; -import javax.swing.JPanel; - +import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer; import org.apache.jmeter.testelement.TestElement; import org.apache.jorphan.gui.JLabeledTextField; -import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer; - +import javax.swing.*; import java.awt.*; @@ -17,9 +14,11 @@ public class AMQPConsumerGui extends AMQPSamplerGui { protected JLabeledTextField receiveTimeout = new JLabeledTextField("Receive Timeout"); protected JLabeledTextField prefetchCount = new JLabeledTextField("Prefetch Count"); + private final JCheckBox purgeQueue = new JCheckBox("Purge Queue", false); private final JCheckBox autoAck = new JCheckBox("Auto ACK", true); private final JCheckBox readResponse = new JCheckBox("Read Response", AMQPConsumer.DEFAULT_READ_RESPONSE); + private final JCheckBox useTx = new JCheckBox("Use Transactions?", AMQPConsumer.DEFAULT_USE_TX); private JPanel mainPanel; @@ -32,15 +31,15 @@ public AMQPConsumerGui(){ */ protected void init() { super.init(); - - mainPanel.add(readResponse); - prefetchCount.setPreferredSize(new Dimension(100,25)); - mainPanel.add(prefetchCount); + useTx.setPreferredSize(new Dimension(100,25)); mainPanel.add(receiveTimeout); + mainPanel.add(prefetchCount); mainPanel.add(purgeQueue); mainPanel.add(autoAck); + mainPanel.add(readResponse); + mainPanel.add(useTx); } @Override @@ -62,6 +61,7 @@ public void configure(TestElement element) { receiveTimeout.setText(sampler.getReceiveTimeout()); purgeQueue.setSelected(sampler.purgeQueue()); autoAck.setSelected(sampler.autoAck()); + useTx.setSelected(sampler.getUseTx()); } /** @@ -75,6 +75,7 @@ public void clearGui() { receiveTimeout.setText(""); purgeQueue.setSelected(false); autoAck.setSelected(true); + useTx.setSelected(AMQPConsumer.DEFAULT_USE_TX); } /** @@ -104,7 +105,7 @@ public void modifyTestElement(TestElement te) { sampler.setReceiveTimeout(receiveTimeout.getText()); sampler.setPurgeQueue(purgeQueue.isSelected()); sampler.setAutoAck(autoAck.isSelected()); - + sampler.setUseTx(useTx.isSelected()); } /**