Skip to content

Commit

Permalink
Make it possible to configure transactional amqp consumers from the gui.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklascarnegie committed Oct 24, 2016
1 parent f8dffce commit c2ababc
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
33 changes: 26 additions & 7 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.zeroclue.jmeter.protocol.amqp;

import java.io.IOException;
import java.security.*;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.testelement.TestStateListener;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStateListener {
private static final int DEFAULT_PREFETCH_COUNT = 0; // unlimited
Expand All @@ -32,6 +32,9 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat
private static final String AUTO_ACK = "AMQPConsumer.AutoAck";
private static final String RECEIVE_TIMEOUT = "AMQPConsumer.ReceiveTimeout";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";

private transient Channel channel;
private transient QueueingConsumer consumer;
private transient String consumerTag;
Expand Down Expand Up @@ -104,6 +107,11 @@ public SampleResult sample(Entry entry) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

// commit the sample.
if (getUseTx()) {
channel.txCommit();
}

result.setResponseData("OK", null);
result.setDataType(SampleResult.TEXT);

Expand Down Expand Up @@ -222,6 +230,14 @@ public int getPrefetchCountAsInt() {
return getPropertyAsInt(PREFETCH_COUNT);
}

public Boolean getUseTx() {
return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX);
}

public void setUseTx(Boolean tx) {
setProperty(USE_TX, tx);
}

/**
* set whether the sampler should read the response or not
*
Expand Down Expand Up @@ -315,6 +331,9 @@ private void trace(String s) {
protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException {
boolean ret = super.initChannel();
channel.basicQos(getPrefetchCountAsInt());
if (getUseTx()) {
channel.txSelect();
}
return ret;
}
}
16 changes: 8 additions & 8 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package com.zeroclue.jmeter.protocol.amqp;

import com.rabbitmq.client.AMQP;

import java.io.IOException;
import java.security.*;
import java.util.*;

import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.samplers.Entry;
Expand All @@ -15,7 +11,11 @@
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/**
* JMeter creates an instance of a sampler class for every occurrence of the
Expand Down Expand Up @@ -44,10 +44,10 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible {
private final static String HEADERS = "AMQPPublisher.Headers";

public static boolean DEFAULT_PERSISTENT = false;
private final static String PERSISTENT = "AMQPConsumer.Persistent";
private final static String PERSISTENT = "AMQPPublisher.Persistent";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";
private final static String USE_TX = "AMQPPublisher.UseTx";

private transient Channel channel;

Expand Down
21 changes: 11 additions & 10 deletions src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.zeroclue.jmeter.protocol.amqp.gui;

import javax.swing.JCheckBox;
import javax.swing.JPanel;

import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jorphan.gui.JLabeledTextField;

import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer;

import javax.swing.*;
import java.awt.*;


Expand All @@ -17,9 +14,11 @@ public class AMQPConsumerGui extends AMQPSamplerGui {

protected JLabeledTextField receiveTimeout = new JLabeledTextField("Receive Timeout");
protected JLabeledTextField prefetchCount = new JLabeledTextField("Prefetch Count");

private final JCheckBox purgeQueue = new JCheckBox("Purge Queue", false);
private final JCheckBox autoAck = new JCheckBox("Auto ACK", true);
private final JCheckBox readResponse = new JCheckBox("Read Response", AMQPConsumer.DEFAULT_READ_RESPONSE);
private final JCheckBox useTx = new JCheckBox("Use Transactions?", AMQPConsumer.DEFAULT_USE_TX);

private JPanel mainPanel;

Expand All @@ -32,15 +31,15 @@ public AMQPConsumerGui(){
*/
protected void init() {
super.init();

mainPanel.add(readResponse);

prefetchCount.setPreferredSize(new Dimension(100,25));
mainPanel.add(prefetchCount);
useTx.setPreferredSize(new Dimension(100,25));

mainPanel.add(receiveTimeout);
mainPanel.add(prefetchCount);
mainPanel.add(purgeQueue);
mainPanel.add(autoAck);
mainPanel.add(readResponse);
mainPanel.add(useTx);
}

@Override
Expand All @@ -62,6 +61,7 @@ public void configure(TestElement element) {
receiveTimeout.setText(sampler.getReceiveTimeout());
purgeQueue.setSelected(sampler.purgeQueue());
autoAck.setSelected(sampler.autoAck());
useTx.setSelected(sampler.getUseTx());
}

/**
Expand All @@ -75,6 +75,7 @@ public void clearGui() {
receiveTimeout.setText("");
purgeQueue.setSelected(false);
autoAck.setSelected(true);
useTx.setSelected(AMQPConsumer.DEFAULT_USE_TX);
}

/**
Expand Down Expand Up @@ -104,7 +105,7 @@ public void modifyTestElement(TestElement te) {
sampler.setReceiveTimeout(receiveTimeout.getText());
sampler.setPurgeQueue(purgeQueue.isSelected());
sampler.setAutoAck(autoAck.isSelected());

sampler.setUseTx(useTx.isSelected());
}

/**
Expand Down

0 comments on commit c2ababc

Please sign in to comment.