Skip to content

Commit

Permalink
## v0.2.21
Browse files Browse the repository at this point in the history
- amqplib ^0.7.1 => ^0.8.0
- node 12 => 16.17.1
  • Loading branch information
sponzillo committed Oct 18, 2022
1 parent 1b9bb41 commit 40cfc6c
Show file tree
Hide file tree
Showing 9 changed files with 1,382 additions and 1,264 deletions.
35 changes: 31 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@

**npm @chat21/chat21-server@0.2.20**

available on:
▶️ https://www.npmjs.com/package/@chat21/chat21-server

## v0.2.21 - online
- amqplib ^0.7.1 => ^0.8.0
- node 12 => 16.17.1

## v0.2.20
- added process.exit(0) on "[AMQP] channel error". It lets the server to silently restart on blocking AMQP errors.

## v0.2.19
- ack in sendMessageToGroupMembers() sent immediately.
- added group_id as memebr of inlineGroup.

## v0.2.18
- Added inlineGroup management. You can create on the fly group just sending a message with the "group.members" attribute.

## v0.2.17
- "ack" management improvements

Expand Down Expand Up @@ -53,10 +73,17 @@ to only enable "messages" queue.
## v0.1.13 npm online

- bugfix:
this: if (inbox_of === outgoing_message.sender) {
became: if (inbox_of === group.uid) { // choosing one member, the group ("volatile" member), for the "status=SENT", used by the "message-sent" webhook
this:
if (inbox_of === outgoing_message.sender) {
became:
if (inbox_of === group.uid) {
logger.debug("inbox_of === outgoing_message.sender. status=SENT system YES?", inbox_of);
outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT;
}
// choosing one member, the group ("volatile" member), for the "status=SENT", used by the "message-sent" webhook
// achived changing the delivered status to SENT "on-the-fly" when I deliver the message to the group id. This
will trigger the webhookSentOrDelivered to "Sent" only

If "system" sends info messages and he is not member of the group, webhooks are never called.
The "message-sent" webhook is called only once: when, iterating all the members, the selected one is the same as the group.
This because the "message-sent" must be called only once per message. The "sender" can't be used, because the "sender" not always
is a group's member (ex. info messages by system while system is not always a member of the group).
This because the "message-sent" must be called only once per message. The "sender" can't be used, because the "sender" not always is a group's member (ex. info messages by system while system is not always a member of the group).
61 changes: 40 additions & 21 deletions benchmarks/performance_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,33 @@ const { Console } = require('console');

// const APPID = 'tilechat';

// CONSOLE.NATIVE
let config = {
EXPECTED_AVG_DIRECT_MESSAGE_DELAY: 160,
EXPECTED_AVG_GROUP_MESSAGE_DELAY: 160,
REQS_PER_SECOND: 300,
REQS_PER_SECOND: 100,
MAX_SECONDS: 10,
CONCURRENCY: 1, // 2
API_SERVER_HOST: 'localhost',
API_SERVER_PORT: 8004,
MQTT_ENDPOINT: 'ws://localhost:15675/ws',
API_ENDPOINT: 'http://localhost:8004/api',
MQTT_ENDPOINT: 'wss://console.native.tiledesk.com:15675/ws',
API_ENDPOINT: 'https://console.native.tiledesk.com/chatapi/api',
APPID: 'tilechat'
}

// let config = {
// EXPECTED_AVG_DIRECT_MESSAGE_DELAY: 160,
// EXPECTED_AVG_GROUP_MESSAGE_DELAY: 160,
// REQS_PER_SECOND: 100,
// MAX_SECONDS: 10,
// CONCURRENCY: 1, // 2
// //API_SERVER_HOST: 'localhost',
// API_SERVER_PORT: 8004,
// MQTT_ENDPOINT: 'ws://localhost:15675/ws',
// API_ENDPOINT: 'http://localhost:8004/api',
// APPID: 'tilechat'
// }

const user1 = {
userid: 'USER1',
fullname: 'User 1',
Expand All @@ -54,7 +68,8 @@ let chatClient1 = new Chat21Client(
{
appId: config.APPID,
MQTTendpoint: config.MQTT_ENDPOINT,
APIendpoint: config.API_ENDPOINT
APIendpoint: config.API_ENDPOINT,
log: true
});

let chatClient2 = new Chat21Client(
Expand Down Expand Up @@ -86,18 +101,22 @@ let group_name; // got in before()
describe("Performance Test", function() {
before(function(done) {
chatClient1.connect(user1.userid, user1.token, () => {
// console.log("chatClient1 Connected...");
console.log("chatClient1 Connected...");
chatClient2.connect(user2.userid, user2.token, async () => {
// console.log("chatClient2 Connected...");
console.log("chatClient2 Connected...");
group_id = "group-" + uuidv4().replace("-", "");
group_name = "benchmarks group " + group_id;
const group_members = {}
group_members[user2.userid] = 1;
let total_ = 0
const start_ = Date.now();
chatClient1.groupCreate(
group_name,
group_id,
group_members,
(err, result) => {
total_ = Date.now() - start_
console.log("TOTAL GROUP CREATION TIME", total_)
assert(err == null);
assert(result != null);
assert(result.success == true);
Expand Down Expand Up @@ -145,13 +164,13 @@ describe("Performance Test", function() {
let total_iterations = config.REQS_PER_SECOND * config.MAX_SECONDS;
let test_start_time = Date.now();
let current = 0;
console.log("Direct - message average latency is expected to be <", config.EXPECTED_AVG_DIRECT_MESSAGE_DELAY + "ms");
console.log("Direct - MESSAGES/SEC =", config.REQS_PER_SECOND * config.CONCURRENCY);
console.log("Direct - MESSAGES/SEC/VU =", config.REQS_PER_SECOND);
console.log("Direct - TEST DURATION (s) =", config.MAX_SECONDS);
console.log("Direct - CONCURRENCY (#VUs) =", config.CONCURRENCY);
console.log("Direct - DELAY BETWEEN MESSAGES (ms) =", delay);
console.log("Direct - TOTAL ITERATIONS =", total_iterations);
console.log("Direct - Expected message average latency to be <", config.EXPECTED_AVG_DIRECT_MESSAGE_DELAY + "ms");
console.log("Direct - Expected MESSAGES/SEC =", config.REQS_PER_SECOND * config.CONCURRENCY);
console.log("Direct - Expected MESSAGES/SEC/VU =", config.REQS_PER_SECOND);
console.log("Direct - Expected TEST DURATION (s) =", config.MAX_SECONDS);
console.log("Direct - Expected CONCURRENCY (#VUs) =", config.CONCURRENCY);
console.log("Direct - Expected DELAY BETWEEN MESSAGES (ms) =", delay);
console.log("Direct - Expected TOTAL ITERATIONS =", total_iterations);
console.log("Direct - Running benchmark...");

for (let i = 0; i < total_iterations; i++) {
Expand All @@ -175,7 +194,7 @@ describe("Performance Test", function() {

function endCallback(latency) {
console.log("\n\n********* Direct - Benchmark results *********");
console.log("Direct - Final latency:", latency.meanLatencyMs);
console.log("Direct - Message mean latency:", latency.meanLatencyMs);
let test_duration = Math.round(current / 1000)
console.log("Direct - Test duration:", test_duration + " seconds" + " (" + current + ") ms");
let mesg_sec = Math.round(latency.totalMessages / test_duration)
Expand Down Expand Up @@ -205,13 +224,13 @@ describe("Performance Test", function() {
let total_iterations = config.REQS_PER_SECOND * config.MAX_SECONDS;
let test_start_time = Date.now();
let current = 0;
console.log("Group - message average latency is expected to be <", config.EXPECTED_AVG_DIRECT_MESSAGE_DELAY + "ms");
console.log("Group - CONCURRENCY (#VIRTUAL USERs aka VUs) =", config.CONCURRENCY);
console.log("Group - MESSAGES/SEC =", config.REQS_PER_SECOND * config.CONCURRENCY);
console.log("Group - MESSAGES/SEC/VU =", config.REQS_PER_SECOND);
console.log("Group - TEST DURATION (s) =", config.MAX_SECONDS);
console.log("Group - DELAY BETWEEN MESSAGES (ms) =", delay);
console.log("Group - TOTAL ITERATIONS =", total_iterations);
console.log("Group - Expected message average latency to be <", config.EXPECTED_AVG_DIRECT_MESSAGE_DELAY + "ms");
console.log("Group - Expected CONCURRENCY (#VIRTUAL USERs aka VUs) =", config.CONCURRENCY);
console.log("Group - Expected MESSAGES/SEC =", config.REQS_PER_SECOND * config.CONCURRENCY);
console.log("Group - Expected MESSAGES/SEC/VU =", config.REQS_PER_SECOND);
console.log("Group - Expected TEST DURATION (s) =", config.MAX_SECONDS);
console.log("Group - Expected DELAY BETWEEN MESSAGES (ms) =", delay);
console.log("Group - Expected TOTAL ITERATIONS =", total_iterations);
console.log("Group - Running benchmark...");
for (let i = 0; i < total_iterations; i++) {
// console.log("GROUP i:", i)
Expand Down
6 changes: 4 additions & 2 deletions mqttclient/chat21client.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class Chat21Client {
// callback - function (err)
// console.log("recipient_id:", recipient_id)
let dest_topic = `apps/${this.appid}/outgoing/users/${this.user_id}/messages/${recipient_id}/outgoing`
// console.log("dest_topic:", dest_topic)
console.log("dest_topic:", dest_topic)
// let outgoing_message = {
// text: text,
// type: type,
Expand Down Expand Up @@ -537,7 +537,9 @@ class Chat21Client {
this.subscribeToMyConversations()
// no more then one "on_message" handler, thanks.
this.on_message_handler = this.client.on('message', (topic, message) => {
// console.log("topic:" + topic + "\nmessage payload:" + message)
if (this.log) {
console.log("topic:" + topic + "\nmessage payload:" + message)
}
const _topic = this.parseTopic(topic)
if (!_topic) {
if (this.log) {
Expand Down
123 changes: 70 additions & 53 deletions observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ function startPublisher() {
if (closeOnErr(err)) return;
ch.on("error", function (err) {
logger.error("[AMQP] channel error", err);
process.exit(0);
});
ch.on("close", function () {
logger.debug("[AMQP] channel closed");
Expand Down Expand Up @@ -216,6 +217,7 @@ function startWorker() {
if (closeOnErr(err)) return;
ch.on("error", function (err) {
logger.error("[AMQP] channel error", err);
process.exit(0);
});
ch.on("close", function () {
logger.debug("[AMQP] channel closed");
Expand Down Expand Up @@ -407,11 +409,21 @@ function process_outgoing(topic, message_string, callback) {
});
}
else {
logger.debug("Group message.");
const group_id = recipient_id
if (outgoing_message.group) {
logger.debug("Inline Group message.", outgoing_message);
let inline_group = outgoing_message.group;
inline_group.uid = group_id;
inline_group.members[group_id] = 1
inline_group.members[sender_id] = 1
console.log("...inline_group:", inline_group);
sendMessageToGroupMembers(outgoing_message, inline_group, app_id, (ack) => {
callback(ack);
});
return;
}
// chatdb.getGroup(group_id, function(err, group) { // REDIS?
getGroup(group_id, function(err, group) {
// logger.debug("group found!", group)
if (!group) { // created only to temporary store group-messages in group-timeline
// TODO: 1. create group (on-the-fly), 2. remove this code, 3. continue as if the group exists.
logger.debug("group doesn't exist! Sending anyway to group timeline...");
Expand All @@ -421,65 +433,67 @@ function process_outgoing(topic, message_string, callback) {
members: {
}
}
group.members[me] = 1
group.members[sender_id] = 1
}
// if (!group.members[me]) {
// logger.debug(me + " can't write to this group")
// callback(true)
// return
// }

// Adding the group as a "volatile" member, so we easily get a copy of
// all the group messages in timelineOf: group.uid
group.members[group.uid] = 1
// logger.debug("Writing to group:", group)
let count = 0;
logger.debug("group members", group.members);
let max = Object.keys(group.members).length;
let error_encoutered = false;
for (let [member_id, value] of Object.entries(group.members)) {
const inbox_of = member_id;
const convers_with = recipient_id;
logger.debug("inbox_of: "+ inbox_of);
logger.debug("convers_with: " + convers_with);
logger.debug("sending group outgoing message to member", member_id);
// if (inbox_of === outgoing_message.sender) {
if (inbox_of === group.uid) { // choosing one member, the group ("volatile" member), for the "status=SENT", used by the "message-sent" webhook
logger.debug("inbox_of === outgoing_message.sender. status=SENT system YES?", inbox_of);
outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT;
}
else if (outgoing_message.attributes && outgoing_message.attributes.hiddenFor && outgoing_message.attributes.hiddenFor === inbox_of) {
logger.debug('sendGroupMessageToMembersTimeline skip message for ' + outgoing_message.attributes.hiddenFor);
break;
sendMessageToGroupMembers(outgoing_message, group, app_id, (ack) => {
callback(ack);
});
})
}
}

function sendMessageToGroupMembers(outgoing_message, group, app_id, callback) {
// logger.debug("Writing to group:", group)
let count = 0;
logger.debug("group members", group.members);
let max = Object.keys(group.members).length;
let error_encoutered = false;
for (let [member_id, value] of Object.entries(group.members)) {
const inbox_of = member_id;
const convers_with = group.uid;
logger.debug("inbox_of: "+ inbox_of);
logger.debug("convers_with: " + convers_with);
logger.debug("sending group outgoing message to member", member_id);
// if (inbox_of === outgoing_message.sender) {
if (inbox_of === group.uid) { // choosing one member, the group ("volatile" member), for the "status=SENT", used by the "message-sent" webhook
logger.debug("inbox_of === outgoing_message.sender. status=SENT system YES?", inbox_of);
outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT;
}
else if (outgoing_message.attributes && outgoing_message.attributes.hiddenFor && outgoing_message.attributes.hiddenFor === inbox_of) {
logger.debug('sendGroupMessageToMembersTimeline skip message for ' + outgoing_message.attributes.hiddenFor);
break;
}
else {
logger.debug("inbox_of != outgoing_message.sender. status=DELIVERED no system, is:", inbox_of);
outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED;
}
console.log("delivering group message with status...", outgoing_message.status, " to:", inbox_of);
deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) {
logger.debug("GROUP MESSAGE DELIVERED?", ok)
count++;
logger.debug("Sent Counting:", count);
logger.debug("Max:", max);
if (!ok) {
logger.debug("Error sending message to group " + group.uid);
error_encoutered = true
}
if (count == max) {
if (error_encoutered) {
logger.error("ERROR SENDING MESSAGE TO GROUP!");
//callback(false)
}
else {
logger.debug("inbox_of != outgoing_message.sender. status=DELIVERED no system, is:", inbox_of);
outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED;
logger.log("ALL OK! MESSAGE SENT TO GROUP! ACK!");
//callback(true);
}
logger.debug("delivering group message with status...", outgoing_message.status, " to:", inbox_of);
deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) {
logger.debug("GROUP MESSAGE DELIVERED?", ok)
count++;
logger.debug("Sent Counting:", count);
logger.debug("Max:", max);
if (!ok) {
logger.debug("Error sending message to group " + group.uid);
error_encoutered = true
}
if (count == max) {
if (error_encoutered) {
logger.error("ERROR SENDING MESSAGE TO GROUP!");
callback(false)
}
else {
logger.log("ALL OK! MESSAGE SENT TO GROUP! ACK!");
callback(true);
}
}
})
} // end for
}) // end getGroup
}
}
})
} // end for
callback(true);
}

let groups = {};
Expand Down Expand Up @@ -755,6 +769,7 @@ function process_update(topic, message_string, callback) {
// }

// TODO: MOVE TO A PERSIST_UPDATED TOPIC/QUEUE...
// TODO, BETTER: USE _WEBHOOK WITH MESSAGE-STATUS-UPDATED TO SAVE THE MESSAGE
logger.debug(">>> ON DISK... WITH A STATUS ON MY MESSAGE-UPDATE TOPIC", topic, "WITH PATCH", my_message_patch)
chatdb.saveOrUpdateMessage(my_message_patch, function(err, msg) {
// logger.debug(">>> MESSAGE ON TOPIC", topic, "UPDATED!")
Expand Down Expand Up @@ -795,6 +810,7 @@ function process_update(topic, message_string, callback) {
patch.conversWith = convers_with
logger.debug(">>> ON DISK... CONVERSATION TOPIC " + topic + " WITH PATCH " + patch)
logger.debug("Updating conversation 2.")
// BETTER: ACK, THEN WEBHOOK CONVERSATION-SAVE
chatdb.saveOrUpdateConversation(patch, function(err, doc) {
logger.debug(">>> CONVERSATION ON TOPIC:", topic, "UPDATED?")
if (err) {
Expand Down Expand Up @@ -845,6 +861,7 @@ function process_archive(topic, payload, callback) {
}
logger.debug("NOTIFY VIA WEBHOOK ON SAVE TOPIC "+ topic)
if (webhook_enabled) {
// BETTER: WEBHOOK CONVERSATION-SAVE WITH archived=true
webhooks.WHnotifyConversationArchived(conversation_archive_patch, topic, (err) => {
if (err) {
logger.error("Webhook notified with err:"+ err)
Expand Down
Loading

0 comments on commit 40cfc6c

Please sign in to comment.