diff --git a/comm/sensorhub-comm-mqtt/build.gradle b/comm/sensorhub-comm-mqtt/build.gradle index fc0cbfd3a..a36f0e8c4 100644 --- a/comm/sensorhub-comm-mqtt/build.gradle +++ b/comm/sensorhub-comm-mqtt/build.gradle @@ -1,6 +1,45 @@ -description = 'MQTT Service API' -ext.details = 'Common API for MQTT service implementations' +description = 'MQTT API & Comm Provider' +ext.details = 'Common API for MQTT service implementations and comm provider to publish/subscribe on MQTT topics' +version = '1.0.1' dependencies { - implementation 'org.sensorhub:sensorhub-core:' + oshCoreVersion + implementation 'org.sensorhub:sensorhub-core:' + oshCoreVersion + embeddedImpl 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' + + testImplementation('junit:junit:4.13.1') + +} + +test { + useJUnit() } + +osgi { + manifest { + attributes ('Bundle-Vendor': 'Botts Innovative Research, Inc.') + attributes ('Bundle-Activator': 'com.botts.impl.comm.mqtt.Activator') + } +} + +ext.pom >>= { + developers { + developer { + id 'alexrobin' + name 'Alex Robin' + organization 'Sensia Software LLC' + organizationUrl 'http://www.sensiasoftware.com' + } + developer { + id 'earocorn' + name 'Alex Almanza' + organization 'Botts Innovative Research, Inc' + organizationUrl 'https://www.botts-inc.com' + } + developer { + id 'kalynstricklin' + name 'Kalyn Stricklin' + organization 'Botts Innovative Research, Inc' + organizationUrl 'https://www.botts-inc.com' + } + } +} \ No newline at end of file diff --git a/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/Activator.java b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/Activator.java new file mode 100644 index 000000000..d22a20f9a --- /dev/null +++ b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/Activator.java @@ -0,0 +1,6 @@ +package com.botts.impl.comm.mqtt; + +import org.sensorhub.utils.OshBundleActivator; + +public class Activator extends OshBundleActivator { +} diff --git a/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueue.java b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueue.java new file mode 100644 index 000000000..80608ca00 --- /dev/null +++ b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueue.java @@ -0,0 +1,210 @@ +package com.botts.impl.comm.mqtt; + + +import org.eclipse.paho.client.mqttv3.*; +import org.sensorhub.api.comm.IMessageQueuePush; +import org.sensorhub.api.common.SensorHubException; +import org.sensorhub.impl.module.AbstractSubModule; +import org.vast.util.Asserts; + +import javax.net.ssl.SSLSocketFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + + +public class MqttMessageQueue extends AbstractSubModule implements IMessageQueuePush { + private final Set listeners = new CopyOnWriteArraySet<>(); + + MqttClient mqttClient; + + public final static String QUALITY_OF_SERVICE = "qos"; + public final static String TOPIC_NAME = "topic"; + public final static String RETAINED = "retained"; + + /** + * @param config + * @throws SensorHubException + */ + @Override + public void init(MqttMessageQueueConfig config) throws SensorHubException { + super.init(config); + + Asserts.checkNotNull(config.brokerAddress, "Must specify broker address in config"); + Asserts.checkNotNull(config.protocol, "Must specify protocol in config"); + Asserts.checkNotNull(config.topicName, "Must specify topic name in config"); + Asserts.checkNotNull(config.clientId, "Must specify client id in config"); + } + + /** + * @param config + * @param protocol + * @return + */ + private static MqttConnectOptions getConnectOptions(MqttMessageQueueConfig config, String protocol) { + + MqttConnectOptions connectOptions = new MqttConnectOptions(); + connectOptions.setCleanSession(config.cleanSession); + connectOptions.setKeepAliveInterval(config.keepAlive); + connectOptions.setConnectionTimeout(config.connectionTimeout); + connectOptions.setAutomaticReconnect(config.isAutoReconnect); + + // auth + if (config.username != null && config.username.trim().length() != 0) + connectOptions.setUserName(config.username); + + if (config.password != null && config.password.trim().length() != 0) + connectOptions.setPassword(config.password.toCharArray()); + + if (protocol.equals("wss") || protocol.equals("ssl")) + connectOptions.setSocketFactory(SSLSocketFactory.getDefault()); + + return connectOptions; + } + + /** + * + */ + @Override + public void start() throws SensorHubException{ + + String protocol = config.protocol.getName(); + String brokerAddress = config.brokerAddress; + String clientId = config.clientId; + try { + mqttClient = new MqttClient(protocol + "://" + brokerAddress, clientId); + + MqttConnectOptions connectOptions = getConnectOptions(config, protocol); + mqttClient.connect(connectOptions); + + } catch (MqttException e) { + throw new RuntimeException("MQTT client failed to connect", e); + } + + mqttClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable throwable) { + if (!mqttClient.isConnected()) { + getLogger().debug("MQTT client connection lost"); + try { + getLogger().debug("MQTT client reconnecting"); + mqttClient.reconnect(); + } catch (MqttException e) { + throw new RuntimeException("MQTT Client connection interrupted: ", e); + } + } + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + Map attributes = new HashMap<>(); + attributes.put(TOPIC_NAME, topic); + attributes.put(QUALITY_OF_SERVICE, String.valueOf(mqttMessage.getQos())); + attributes.put(RETAINED, String.valueOf(mqttMessage.isRetained())); + + for (MessageListener listener : listeners) { + listener.receive(attributes, mqttMessage.getPayload()); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + getLogger().debug("managed to deliver: ", iMqttDeliveryToken.isComplete()); + } + }); + + + if (config.enableSubscribe) { + try { + getLogger().info("Subscribed to topic: {}", config.topicName); + mqttClient.subscribe(config.topicName, config.qos.getValue()); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + } + + + /** + * + * @throws SensorHubException + */ + @Override + public void stop() throws SensorHubException{ + + if (mqttClient == null) + return; + + try { + mqttClient.disconnect(); + mqttClient.close(); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + /** + * + * @param payload + */ + @Override + public void publish(byte[] payload) { + publish(null, payload); + } + + /** + * + * @param attrs + * @param payload + */ + @Override + public void publish(Map attrs, byte[] payload) { + + if (!config.enablePublish) + return; + + MqttMessage mqttMessage = new MqttMessage(payload); + + if (attrs != null) { + var qosAttr = attrs.get(QUALITY_OF_SERVICE); + + if (qosAttr != null) { + int qos = Integer.parseInt(qosAttr); + if (qos >= 0 && qos < MqttMessageQueueConfig.QoS.values().length) { + mqttMessage.setQos(qos); + } else { + mqttMessage.setQos(config.qos.getValue()); + } + } + + var retainedAttr = attrs.get(RETAINED); + mqttMessage.setRetained(retainedAttr != null ? Boolean.parseBoolean(qosAttr) : config.retain); + } + + try { + mqttClient.publish(config.topicName, mqttMessage); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + /** + * + * @param listener + */ + @Override + public void registerListener(MessageListener listener) { + listeners.add(listener); + } + + /** + * + * @param listener + */ + @Override + public void unregisterListener(MessageListener listener) { + listeners.remove(listener); + } + +} diff --git a/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueueConfig.java b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueueConfig.java new file mode 100644 index 000000000..45af57b49 --- /dev/null +++ b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueueConfig.java @@ -0,0 +1,67 @@ +package com.botts.impl.comm.mqtt; + +import org.sensorhub.api.comm.MessageQueueConfig; +import org.sensorhub.api.config.DisplayInfo; + +public class MqttMessageQueueConfig extends MessageQueueConfig { + + @DisplayInfo.Required + @DisplayInfo(label="Client", desc="A unique identifier for client, used by the broker to track connections") + public String clientId; + + @DisplayInfo.Required + @DisplayInfo(label="Protocol", desc="") + public Protocol protocol = Protocol.TCP; + + @DisplayInfo.Required + @DisplayInfo(label="Broker Address", desc="The hostname or IP address and port of the MQTT Broker (e.g. localhost:8282/sensorhub/mqtt") + public String brokerAddress; + + @DisplayInfo.Required + @DisplayInfo(label="Quality of Service", desc="Determines the reliability of the message delivery (0,1,2)") + public QoS qos; + + @DisplayInfo(label="Username", desc="An optional username if needed for connecting to MQTT Broker") + public String username; + + @DisplayInfo(label="Password", desc="An optional password if needed for connecting to MQTT Broker") + public String password; + + @DisplayInfo(label="Retain", desc="Check to allow MQTT broker to store the last message sent on the specific topic") + public boolean retain; + + @DisplayInfo(label="Clean Session", desc="Check ") + public boolean cleanSession; + + @DisplayInfo(label="Keep Alive Interval", desc="") + public int keepAlive = 60; + + @DisplayInfo(label="Connection Timeout", desc="") + public int connectionTimeout = 10; + + @DisplayInfo(label="Automatic Reconnect", desc="") + public boolean isAutoReconnect = true; + + public enum QoS { + AT_MOST_ONCE(0), + AT_LEAST_ONCE(1), + EXACTLY_ONCE(2); + + final int value; + QoS(int value){ this.value = value; } + public int getValue(){ return value; } + } + + public enum Protocol { + WS("ws"), + WSS("wss"), + TCP("tcp"), + SSL("ssl"); + + final String protocol; + Protocol(String protocol) { this.protocol = protocol; } + public String getName() { return protocol; } + } + +} + diff --git a/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueueDescriptor.java b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueueDescriptor.java new file mode 100644 index 000000000..33c43dc03 --- /dev/null +++ b/comm/sensorhub-comm-mqtt/src/main/java/com/botts/impl/comm/mqtt/MqttMessageQueueDescriptor.java @@ -0,0 +1,34 @@ +package com.botts.impl.comm.mqtt; + +import org.sensorhub.api.module.*; +import org.sensorhub.impl.module.JarModuleProvider; + +public class MqttMessageQueueDescriptor extends JarModuleProvider implements IModuleProvider +{ + @Override + public String getModuleName() + { + return "Mqtt Message Queue"; + } + + + @Override + public String getModuleDescription() + { + return "Communication protocol that can publish/subscribe to MQTT topics"; + } + + + @Override + public Class> getModuleClass() + { + return MqttMessageQueue.class; + } + + + @Override + public Class getModuleConfigClass() + { + return MqttMessageQueueConfig.class; + } +} diff --git a/comm/sensorhub-comm-mqtt/src/test/java/MqttTest.java b/comm/sensorhub-comm-mqtt/src/test/java/MqttTest.java new file mode 100644 index 000000000..3a9debeba --- /dev/null +++ b/comm/sensorhub-comm-mqtt/src/test/java/MqttTest.java @@ -0,0 +1,69 @@ +import com.botts.impl.comm.mqtt.MqttMessageQueue; +import com.botts.impl.comm.mqtt.MqttMessageQueueConfig; +import org.eclipse.paho.client.mqttv3.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.sensorhub.api.comm.IMessageQueuePush; +import org.sensorhub.api.common.SensorHubException; + +import javax.net.ssl.SSLSocketFactory; +import javax.validation.constraints.AssertTrue; +import java.util.HashMap; +import java.util.Map; + +import static com.botts.impl.comm.mqtt.MqttMessageQueue.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MqttTest { + + MqttMessageQueue messageQueue = new MqttMessageQueue(); + + + public MqttMessageQueueConfig getMqttMessageQueueConfig() { + MqttMessageQueueConfig config = new MqttMessageQueueConfig(); + config.protocol = MqttMessageQueueConfig.Protocol.TCP; + config.brokerAddress = "mqtt.meshtastic.org:1883"; + config.clientId = "osh-test"; + config.cleanSession = true; + config.keepAlive = 60; + config.connectionTimeout = 1000; + config.isAutoReconnect = true; + config.username = "meshdev"; + config.password = "large4cats"; + config.topicName = "msh/US/2/json/"; + + return config; + } + @Test + public void testPublishToMessageQueue() throws SensorHubException { + + var config = getMqttMessageQueueConfig(); + + messageQueue.init(config); + messageQueue.start(); + + + var payload = new byte[0]; + messageQueue.publish(payload); + + } + + @Test + public void testPublish2() throws SensorHubException { + Map attributes = new HashMap<>(); + attributes.put("qos", "0"); + attributes.put("retained", "true"); + + var config = getMqttMessageQueueConfig(); + + messageQueue.init(config); + messageQueue.start(); + var payload = new byte[0]; + + messageQueue.publish(attributes, payload); + } + + +} \ No newline at end of file