Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions comm/sensorhub-comm-mqtt/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,45 @@
description = 'MQTT Service API'
ext.details = 'Common API for MQTT service implementations'
description = 'MQTT API & Comm Provider'
ext.details = 'Common API for MQTT service implementations and comm provider to publish/subscribe on MQTT topics'
version = '1.0.1'

dependencies {
implementation 'org.sensorhub:sensorhub-core:' + oshCoreVersion
implementation 'org.sensorhub:sensorhub-core:' + oshCoreVersion
embeddedImpl 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'

testImplementation('junit:junit:4.13.1')

}

test {
useJUnit()
}

osgi {
manifest {
attributes ('Bundle-Vendor': 'Botts Innovative Research, Inc.')
attributes ('Bundle-Activator': 'com.botts.impl.comm.mqtt.Activator')
}
}

ext.pom >>= {
developers {
developer {
id 'alexrobin'
name 'Alex Robin'
organization 'Sensia Software LLC'
organizationUrl 'http://www.sensiasoftware.com'
}
developer {
id 'earocorn'
name 'Alex Almanza'
organization 'Botts Innovative Research, Inc'
organizationUrl 'https://www.botts-inc.com'
}
developer {
id 'kalynstricklin'
name 'Kalyn Stricklin'
organization 'Botts Innovative Research, Inc'
organizationUrl 'https://www.botts-inc.com'
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.botts.impl.comm.mqtt;

import org.sensorhub.utils.OshBundleActivator;

public class Activator extends OshBundleActivator {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package com.botts.impl.comm.mqtt;


import org.eclipse.paho.client.mqttv3.*;
import org.sensorhub.api.comm.IMessageQueuePush;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.impl.module.AbstractSubModule;
import org.vast.util.Asserts;

import javax.net.ssl.SSLSocketFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;


public class MqttMessageQueue extends AbstractSubModule<MqttMessageQueueConfig> implements IMessageQueuePush<MqttMessageQueueConfig> {
private final Set<MessageListener> listeners = new CopyOnWriteArraySet<>();

MqttClient mqttClient;

public final static String QUALITY_OF_SERVICE = "qos";
public final static String TOPIC_NAME = "topic";
public final static String RETAINED = "retained";

/**
* @param config
* @throws SensorHubException
*/
@Override
public void init(MqttMessageQueueConfig config) throws SensorHubException {
super.init(config);

Asserts.checkNotNull(config.brokerAddress, "Must specify broker address in config");
Asserts.checkNotNull(config.protocol, "Must specify protocol in config");
Asserts.checkNotNull(config.topicName, "Must specify topic name in config");
Asserts.checkNotNull(config.clientId, "Must specify client id in config");
}

/**
* @param config
* @param protocol
* @return
*/
private static MqttConnectOptions getConnectOptions(MqttMessageQueueConfig config, String protocol) {

MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(config.cleanSession);
connectOptions.setKeepAliveInterval(config.keepAlive);
connectOptions.setConnectionTimeout(config.connectionTimeout);
connectOptions.setAutomaticReconnect(config.isAutoReconnect);

// auth
if (config.username != null && config.username.trim().length() != 0)
connectOptions.setUserName(config.username);

if (config.password != null && config.password.trim().length() != 0)
connectOptions.setPassword(config.password.toCharArray());

if (protocol.equals("wss") || protocol.equals("ssl"))
connectOptions.setSocketFactory(SSLSocketFactory.getDefault());

return connectOptions;
}

/**
*
*/
@Override
public void start() throws SensorHubException{

String protocol = config.protocol.getName();
String brokerAddress = config.brokerAddress;
String clientId = config.clientId;
try {
mqttClient = new MqttClient(protocol + "://" + brokerAddress, clientId);

MqttConnectOptions connectOptions = getConnectOptions(config, protocol);
mqttClient.connect(connectOptions);

} catch (MqttException e) {
throw new RuntimeException("MQTT client failed to connect", e);
}

mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
if (!mqttClient.isConnected()) {
getLogger().debug("MQTT client connection lost");
try {
getLogger().debug("MQTT client reconnecting");
mqttClient.reconnect();
} catch (MqttException e) {
throw new RuntimeException("MQTT Client connection interrupted: ", e);
}
}
}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Map<String, String> attributes = new HashMap<>();
attributes.put(TOPIC_NAME, topic);
attributes.put(QUALITY_OF_SERVICE, String.valueOf(mqttMessage.getQos()));
attributes.put(RETAINED, String.valueOf(mqttMessage.isRetained()));

for (MessageListener listener : listeners) {
listener.receive(attributes, mqttMessage.getPayload());
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
getLogger().debug("managed to deliver: ", iMqttDeliveryToken.isComplete());
}
});


if (config.enableSubscribe) {
try {
getLogger().info("Subscribed to topic: {}", config.topicName);
mqttClient.subscribe(config.topicName, config.qos.getValue());
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}


/**
*
* @throws SensorHubException
*/
@Override
public void stop() throws SensorHubException{

if (mqttClient == null)
return;

try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}

/**
*
* @param payload
*/
@Override
public void publish(byte[] payload) {
publish(null, payload);
}

/**
*
* @param attrs
* @param payload
*/
@Override
public void publish(Map<String, String> attrs, byte[] payload) {

if (!config.enablePublish)
return;

MqttMessage mqttMessage = new MqttMessage(payload);

if (attrs != null) {
var qosAttr = attrs.get(QUALITY_OF_SERVICE);

if (qosAttr != null) {
int qos = Integer.parseInt(qosAttr);
if (qos >= 0 && qos < MqttMessageQueueConfig.QoS.values().length) {
mqttMessage.setQos(qos);
} else {
mqttMessage.setQos(config.qos.getValue());
}
}

var retainedAttr = attrs.get(RETAINED);
mqttMessage.setRetained(retainedAttr != null ? Boolean.parseBoolean(qosAttr) : config.retain);
}

try {
mqttClient.publish(config.topicName, mqttMessage);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}

/**
*
* @param listener
*/
@Override
public void registerListener(MessageListener listener) {
listeners.add(listener);
}

/**
*
* @param listener
*/
@Override
public void unregisterListener(MessageListener listener) {
listeners.remove(listener);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.botts.impl.comm.mqtt;

import org.sensorhub.api.comm.MessageQueueConfig;
import org.sensorhub.api.config.DisplayInfo;

public class MqttMessageQueueConfig extends MessageQueueConfig {

@DisplayInfo.Required
@DisplayInfo(label="Client", desc="A unique identifier for client, used by the broker to track connections")
public String clientId;

@DisplayInfo.Required
@DisplayInfo(label="Protocol", desc="")
public Protocol protocol = Protocol.TCP;

@DisplayInfo.Required
@DisplayInfo(label="Broker Address", desc="The hostname or IP address and port of the MQTT Broker (e.g. localhost:8282/sensorhub/mqtt")
public String brokerAddress;

@DisplayInfo.Required
@DisplayInfo(label="Quality of Service", desc="Determines the reliability of the message delivery (0,1,2)")
public QoS qos;

@DisplayInfo(label="Username", desc="An optional username if needed for connecting to MQTT Broker")
public String username;

@DisplayInfo(label="Password", desc="An optional password if needed for connecting to MQTT Broker")
public String password;

@DisplayInfo(label="Retain", desc="Check to allow MQTT broker to store the last message sent on the specific topic")
public boolean retain;

@DisplayInfo(label="Clean Session", desc="Check ")
public boolean cleanSession;

@DisplayInfo(label="Keep Alive Interval", desc="")
public int keepAlive = 60;

@DisplayInfo(label="Connection Timeout", desc="")
public int connectionTimeout = 10;

@DisplayInfo(label="Automatic Reconnect", desc="")
public boolean isAutoReconnect = true;

public enum QoS {
AT_MOST_ONCE(0),
AT_LEAST_ONCE(1),
EXACTLY_ONCE(2);

final int value;
QoS(int value){ this.value = value; }
public int getValue(){ return value; }
}

public enum Protocol {
WS("ws"),
WSS("wss"),
TCP("tcp"),
SSL("ssl");

final String protocol;
Protocol(String protocol) { this.protocol = protocol; }
public String getName() { return protocol; }
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.botts.impl.comm.mqtt;

import org.sensorhub.api.module.*;
import org.sensorhub.impl.module.JarModuleProvider;

public class MqttMessageQueueDescriptor extends JarModuleProvider implements IModuleProvider
{
@Override
public String getModuleName()
{
return "Mqtt Message Queue";
}


@Override
public String getModuleDescription()
{
return "Communication protocol that can publish/subscribe to MQTT topics";
}


@Override
public Class<? extends IModuleBase<?>> getModuleClass()
{
return MqttMessageQueue.class;
}


@Override
public Class<? extends ModuleConfigBase> getModuleConfigClass()
{
return MqttMessageQueueConfig.class;
}
}
Loading