forked from wac81/IM_mqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqttserver_cluster.js
155 lines (135 loc) · 4.41 KB
/
mqttserver_cluster.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/**
* Created by wuanc on 2015/10/1.
*/
var cluster = require('cluster');
var http=require('http');
var cpu_num = require('os').cpus().length;
if (cluster.isMaster) {
console.log("==start main process==");
console.log("==cpu_num=="+cpu_num);
for (var i = 0; i < 4; i++) {
cluster.fork();
//根据cpu的数量主进程 fork 了相同数量的子进程出来
}
cluster.on('listening',function(worker,address){
console.log('listening: worker ' + worker.process.pid);
});
cluster.on('exit', function(worker, code, signal) {
console.log('exit worker ' + worker.process.pid + ' died');
});
} else {
/**
* Created by wac on 9/29/15.
*/
var mosca = require('mosca');
var mongoUrl = 'mongodb://localhost:27017/mqtt';
//mongodb
var ascoltatore = {
//using ascoltatore
type: 'mongo',
url: mongoUrl,
pubsubCollection: 'ascoltatori',
mongo: {}
};
//zmq
//var ascoltatore = {
// type: 'zmq',
// json: false,
// zmq: require("zmq"),
// port: "tcp://127.0.0.1:33333",
// controlPort: "tcp://127.0.0.1:33334",
// delay: 5
//};
var settings = {
http: {
port: 5000,
bundle: true,
static: './'
},
persistence: {
factory: mosca.persistence.Mongo,
url: mongoUrl,
},
maxInflightMessages:300,
publishClientDisconnect:true,
backend: ascoltatore
};
// mqtt protocol
//var MqttServer = new mosca.Server({
// port: 8000
//});
//for websocket protocol
//var MqttServer = new mosca.Server({
// http: {
// port: 5000,
// bundle: true,
// static: './'
// }
//});
var MqttServer = new mosca.Server(settings);
MqttServer.on('clientConnected', function (client) {
console.log('client connected', client.id);
});
/**
* Listen MQTT topic messages
**/
MqttServer.on('published', function (packet, client) {
var topic = packet.topic;
switch (topic) {
case 'pubMsg':
console.log('message-publish', packet.payload.toString());
//MQTT转发主题消息
MqttServer.publish({topic: 'other', payload: 'sssss'});
//发送消息NODEJS
console.log('HD: ' + YHSocketMap.get('1000'));
//发送socket.io消息
//io.sockets.socket(YHSocketMap.get('1000')).emit('subState', packet);
break;
case 'other':
console.log('message-123', packet.payload.toString());
break;
}
console.log("Published :=", packet.payload);
});
MqttServer.on("error", function (err) {
console.log(err);
});
MqttServer.on('subscribed', function (topic, client) {
console.log("Subscribed :=", client.id);
});
MqttServer.on('clientDisconnected', function (client) {
console.log('Client Disconnected := ', client.id);
});
MqttServer.on('ready', function () {
//MqttServer.authenticate = authenticate;
//MqttServer.authorizePublish = authorizePublish;
//MqttServer.authorizeSubscribe = authorizeSubscribe;
console.log('mqtt is running...');
});
/**
* authenticate example
* https://github.com/mcollina/mosca/wiki/Authentication-&-Authorization
**/
var authenticate = function (client, username, password, callback) {
//var authorized = (username.toString() === '18FE34F48379-DC' && password.toString() === '666666');
var authorized = (username === 'alice' && password.toString() == 'secret' );
//if (authorized)
//client.user= username;
callback(null, authorized);
}
// the username from the topic and verifing it is the same of the authorized user
var authorizePublish = function (client, topic, payload, callback) {
//var ok = client.user == topic.split('/')[2];
// we can alter the message here
//if (ok) callback(null, payload);
//else callback(null, false);
callback(null, true);
}
// In this case the client authorized as alice can subscribe to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
var authorizeSubscribe = function (client, topic, callback) {
//var ok = client.user === topic.split('/')[2];
//callback(null, ok);
callback(null, true);
}
}