Skip to content

Mqtt on demand

cflurin edited this page Jan 12, 2019 · 2 revisions

A mqtt on demand dsm.

  1. Set your broker options in the dsm configuration > data.options.
  2. Send a message (hello mqtt) to a mqtt-out node (Topic: test/ondemand, QoS: 2, Retain: true)
  3. Click the inject node with the Topic: test/ondemand

mqtt

Configuration

{
    "triggerInput": "trigger",
    "stateOutput": "current",
    "currentState": "disconnected",
    "states": {
        "disconnected": {
            "connect": "connected"
        },
        "connected": {
            "get_message": "disconnected",
            "disconnect": "disconnected"
        }
    },
    "data": {
        "options": {
            "host": "",
            "port": 1883,
            "username": "",
            "password": ""
        }
    },
    "methods": {
        "init": [
            "sm.mqtt = require('mqtt');",
            "sta.fill = 'yellow';",
            "sta.shape = 'ring';"
        ],
        "onBeforeTransition": [
            "if (!msg.trigger) msg.trigger = 'connect';",
            "if (msg.payload.options) sm.data.options = msg.payload.options;"
        ],
        "connect": [
            "var options = sm.data.options;",
            "var topic = msg.topic;",
            "timeout.id = setTimeout(function () {",
            "   if (!sm.client.connected) {",
            "       node.error('connect error');",
            "       resume('disconnect', msg);",
            "   };",
            "}, 2000);",
            "try {",
            "   sm.client = sm.mqtt.connect(options);",
            "   sm.client.on('connect', function () {",
            "       clearTimeout(timeout.id);",
            "       sm.client.subscribe(topic, function (error) {",
            "           if (!error) {",
            "               resume('get_message', msg);",
            "           } else {",
            "               node.error(error);",
            "               resume('disconnect', msg);",
            "           }",
            "       });",
            "       sm.fill = 'green';",
            "       sm.shape = 'dot';",
            "   });",
            "   sm.client.on('close', function () {",
            "       resume('disconnect', msg);",
            "   });",
            "   sm.client.on('error', function (error) {",
            "       node.error(error);",
            "       resume('disconnect', msg);",
            "   });",
            "} catch (err){",
            "   node.error(err);",
            "   resume('disconnect', msg);",
            "}",
            "output = false;"
        ],
        "get_message": [
            "var topic = msg.topic;",
            "sm.client.on('message', function (topic, buffer) {",
            "   msg.payload = buffer.toString();",
            "   node.send(msg);",
            "   resume('disconnect', msg);",
            "});",
            "output = false;"
        ],
        "disconnect" : [
            "sm.client.end();",
            "sm.fill = 'yellow';",
            "sm.shape = 'ring';",
            "output = false;"
        ],
        "status": {
            "fill": {
                "get": "sm.fill;"
            },
            "shape": {
                "get": "sm.shape;"
            },
            "text": {
                "get": "sm.currentState;"
            }
        }
    }
}

Flow

[{"id":"15e599ec.c22a26","type":"inject","z":"ac4aa9f6.c24288","name":"","topic":"","payload":"hello mqtt","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":120,"y":1480,"wires":[["2cf21620.69b9aa"]]},{"id":"2cf21620.69b9aa","type":"mqtt out","z":"ac4aa9f6.c24288","name":"","topic":"test/ondemand","qos":"2","retain":"true","broker":"325f5be5.555734","x":320,"y":1480,"wires":[]},{"id":"4f8bb985.cfee38","type":"debug","z":"ac4aa9f6.c24288","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":450,"y":1580,"wires":[]},{"id":"d05821f7.f287","type":"inject","z":"ac4aa9f6.c24288","name":"","topic":"test/ondemand","payload":"","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":140,"y":1580,"wires":[["8683d17e.11d9a"]]},{"id":"745ab32.25b384c","type":"comment","z":"ac4aa9f6.c24288","name":"set your mqtt-brocker options","info":"","x":360,"y":1540,"wires":[]},{"id":"8683d17e.11d9a","type":"dsm","z":"ac4aa9f6.c24288","name":"mqtt","sm_config":"{\n    \"triggerInput\": \"trigger\",\n    \"stateOutput\": \"current\",\n    \"currentState\": \"disconnected\",\n    \"states\": {\n        \"disconnected\": {\n            \"connect\": \"connected\"\n        },\n        \"connected\": {\n            \"get_message\": \"disconnected\",\n            \"disconnect\": \"disconnected\"\n        }\n    },\n    \"data\": {\n        \"options\": {\n            \"host\": \"\",\n            \"port\": 1883,\n            \"username\": \"\",\n            \"password\": \"\"\n        }\n    },\n    \"methods\": {\n        \"init\": [\n            \"sm.mqtt = require('mqtt');\",\n            \"sta.fill = 'yellow';\",\n            \"sta.shape = 'ring';\"\n        ],\n        \"onBeforeTransition\": [\n            \"if (!msg.trigger) msg.trigger = 'connect';\",\n            \"if (msg.payload.options) sm.data.options = msg.payload.options;\"\n        ],\n        \"connect\": [\n            \"var options = sm.data.options;\",\n            \"var topic = msg.topic;\",\n            \"timeout.id = setTimeout(function () {\",\n            \"   if (!sm.client.connected) {\",\n            \"       node.error('connect error');\",\n            \"       resume('disconnect', msg);\",\n            \"   };\",\n            \"}, 2000);\",\n            \"try {\",\n            \"   sm.client = sm.mqtt.connect(options);\",\n            \"   sm.client.on('connect', function () {\",\n            \"       clearTimeout(timeout.id);\",\n            \"       sm.client.subscribe(topic, function (error) {\",\n            \"           if (!error) {\",\n            \"               resume('get_message', msg);\",\n            \"           } else {\",\n            \"               node.error(error);\",\n            \"               resume('disconnect', msg);\",\n            \"           }\",\n            \"       });\",\n            \"       sm.fill = 'green';\",\n            \"       sm.shape = 'dot';\",\n            \"   });\",\n            \"   sm.client.on('close', function () {\",\n            \"       resume('disconnect', msg);\",\n            \"   });\",\n            \"   sm.client.on('error', function (error) {\",\n            \"       node.error(error);\",\n            \"       resume('disconnect', msg);\",\n            \"   });\",\n            \"} catch (err){\",\n            \"   node.error(err);\",\n            \"   resume('disconnect', msg);\",\n            \"}\",\n            \"output = false;\"\n        ],\n        \"get_message\": [\n            \"var topic = msg.topic;\",\n            \"sm.client.on('message', function (topic, buffer) {\",\n            \"   msg.payload = buffer.toString();\",\n            \"   node.send(msg);\",\n            \"   resume('disconnect', msg);\",\n            \"});\",\n            \"output = false;\"\n        ],\n        \"disconnect\" : [\n            \"sm.client.end();\",\n            \"sm.fill = 'yellow';\",\n            \"sm.shape = 'ring';\",\n            \"output = false;\"\n        ],\n        \"status\": {\n            \"fill\": {\n                \"get\": \"sm.fill;\"\n            },\n            \"shape\": {\n                \"get\": \"sm.shape;\"\n            },\n            \"text\": {\n                \"get\": \"sm.currentState;\"\n            }\n        }\n    }\n}\n","x":300,"y":1580,"wires":[["4f8bb985.cfee38"]]},{"id":"325f5be5.555734","type":"mqtt-broker","z":"","name":"","broker":"192.168.0.35","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]
Clone this wiki locally