Skip to content

Commit

Permalink
Merge pull request #15 from cirrus-dev/upgrade-rabbit-library
Browse files Browse the repository at this point in the history
Upgading version of jmeter and rabbit amqp library.
  • Loading branch information
jlavallee committed Aug 29, 2014
2 parents ecb3591 + cfcf1b1 commit 073ee2b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ lib/
/build/
/dist/
/ivy/
*.iml
.idea
6 changes: 3 additions & 3 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<conf name="runtime" extends="build" description="Libraries that need to be included with project jar" />
</configurations>
<dependencies>
<dependency org="commons-lang" name="commons-lang" rev="2.4" conf="build->default"/>
<dependency org="org.apache.commons" name="commons-lang3" rev="3.0" conf="build->default"/>
<dependency org="commons-io" name="commons-io" rev="1.4" conf="build->default"/>
<dependency org="commons-jexl" name="commons-jexl" rev="1.1" conf="build->default"/>
<dependency org="commons-codec" name="commons-codec" rev="1.4" conf="build->default"/>
Expand All @@ -15,7 +15,7 @@
<dependency org="commons-net" name="commons-net" rev="1.4.1" conf="build->default"/>
<dependency org="org.apache.jmeter" name="jorphan" rev="2.6" conf="build->default"/>
<dependency org="avalon-logkit" name="avalon-logkit" rev="2.0" conf="build->default"/>
<dependency org="com.rabbitmq" name="amqp-client" rev="3.0.4" conf="build->default"/>
<dependency org="org.apache.jmeter" name="ApacheJMeter_core" rev="2.9" conf="build->default"/>
<dependency org="com.rabbitmq" name="amqp-client" rev="3.3.4" conf="build->default"/>
<dependency org="org.apache.jmeter" name="ApacheJMeter_core" rev="2.11" conf="build->default"/>
</dependencies>
</ivy-module>
16 changes: 14 additions & 2 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ public SampleResult sample(Entry entry) {

// only do this once per thread. Otherwise it slows down the consumption by appx 50%
if (consumer == null) {
log.info("Creating consumer");
consumer = new QueueingConsumer(channel);
}
if (consumerTag == null) {
log.info("Starting basic consumer");
consumerTag = channel.basicConsume(getQueue(), autoAck(), consumer);
}
} catch (Exception ex) {
Expand Down Expand Up @@ -90,7 +92,9 @@ public SampleResult sample(Entry entry) {
* Set up the sample result details
*/
if (getReadResponseAsBoolean()) {
result.setSamplerData(new String(delivery.getBody()));
String response = new String(delivery.getBody());
result.setSamplerData(response);
result.setResponseMessage(response);
}
else {
result.setSamplerData("Read response is false.");
Expand All @@ -104,24 +108,32 @@ public SampleResult sample(Entry entry) {
result.setDataType(SampleResult.TEXT);

result.setResponseCodeOK();
result.setResponseMessage("OK");

result.setSuccessful(true);

} catch (ShutdownSignalException e) {
consumer = null;
consumerTag = null;
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("400");
result.setResponseMessage(e.getMessage());
interrupt();
} catch (ConsumerCancelledException e) {
consumer = null;
consumerTag = null;
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("300");
result.setResponseMessage(e.getMessage());
interrupt();
} catch (InterruptedException e) {
consumer = null;
consumerTag = null;
log.info("interuppted while attempting to consume");
result.setResponseCode("200");
result.setResponseMessage(e.getMessage());
} catch (IOException e) {
consumer = null;
consumerTag = null;
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("100");
result.setResponseMessage(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public SampleResult sample(Entry e) {
* Set up the sample result details
*/
result.setSamplerData(data);
result.setResponseData("OK", null);
result.setResponseData(new String(messageBytes), null);
result.setDataType(SampleResult.TEXT);

result.setResponseCodeOK();
Expand Down
20 changes: 10 additions & 10 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@
import java.util.*;
import java.security.*;

import com.rabbitmq.client.*;
import org.apache.jmeter.samplers.AbstractSampler;
import org.apache.jmeter.testelement.ThreadListener;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

public abstract class AMQPSampler extends AbstractSampler implements ThreadListener {

Expand Down Expand Up @@ -73,7 +69,7 @@ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, Ke
+ " closed unexpectedly: ", channel.getCloseReason());
channel = null; // so we re-open it below
}

if(channel == null) {
channel = createChannel();
setChannel(channel);
Expand Down Expand Up @@ -386,8 +382,6 @@ protected Channel createChannel() throws IOException, NoSuchAlgorithmException,
if (connection == null || !connection.isOpen()) {
factory.setConnectionTimeout(getTimeoutAsInt());
factory.setVirtualHost(getVirtualHost());
factory.setHost(getHost());
factory.setPort(getPortAsInt());
factory.setUsername(getUsername());
factory.setPassword(getPassword());
if (connectionSSL()) {
Expand All @@ -405,7 +399,13 @@ protected Channel createChannel() throws IOException, NoSuchAlgorithmException,
+"\nin " + this
);

connection = factory.newConnection();
String[] hosts = getHost().split(",");
Address[] addresses = new Address[hosts.length];
for (int i = 0; i < hosts.length; i++) {
addresses[i] = new Address(hosts[i], getPortAsInt());
}
log.info("Using hosts: " + Arrays.toString(hosts) + " addresses: " + Arrays.toString(addresses));
connection = factory.newConnection(addresses);
}

Channel channel = connection.createChannel();
Expand Down

0 comments on commit 073ee2b

Please sign in to comment.