diff --git a/chatdb/index.js b/chatdb/index.js index 9f4e8ee..f797ba3 100644 --- a/chatdb/index.js +++ b/chatdb/index.js @@ -3,7 +3,8 @@ Andrea Sponziello - (c) Tiledesk.com */ -const winston = require("../winston"); +// const winston = require("../winston"); +const logger = require('../tiledesk-logger').logger; /** * This is the class that manages DB persistence @@ -38,11 +39,11 @@ class ChatDB { } saveOrUpdateMessage(message, callback) { - winston.debug("saving message...", message) + logger.debug("saving message...", message) delete message['_id'] // if present (message is coming from a mongodb query?) it is illegal. It produces: MongoError: E11000 duplicate key error collection: tiledesk-dialogflow-proxy.messages index: _id_ dup key: { : "5ef72c2494e08ffec88a033a" } this.db.collection(this.messages_collection).updateOne({timelineOf: message.timelineOf, message_id: message.message_id}, { $set: message }, { upsert: true }, function(err, doc) { if (err) { - winston.error("db error...", err) + logger.error("db error...", err) if (callback) { callback(err, null) } @@ -56,17 +57,17 @@ class ChatDB { } saveOrUpdateConversation(conversation, callback) { - // winston.debug("saving conversation...", conversation) + // logger.debug("saving conversation...", conversation) this.db.collection(this.conversations_collection).updateOne({timelineOf: conversation.timelineOf, conversWith: conversation.conversWith}, { $set: conversation}, { upsert: true }, function(err, doc) { if (err) { - console.error("error saveOrUpdateConversation", err) + logger.error("error saveOrUpdateConversation", err) if (callback) { callback(err, null) } } else { if (callback) { - console.debug("Conversation saved.") + logger.debug("Conversation saved.") callback(null, doc) } } @@ -74,7 +75,7 @@ class ChatDB { } saveOrUpdateGroup(group, callback) { - winston.debug("saving group...", group) + logger.debug("saving group...", group) this.db.collection(this.groups_collection).updateOne( { uid: group.uid }, { $set: group }, { upsert: true }, function(err, doc) { if (callback) { callback(err, null) @@ -103,7 +104,7 @@ class ChatDB { } lastConversations(appid, userid, archived, callback) { - winston.debug("DB. app:", appid, "user:", userid, "archived:", archived) + logger.debug("DB. app:", appid, "user:", userid, "archived:", archived) this.db.collection(this.conversations_collection).find( { timelineOf: userid, app_id: appid, archived: archived } ).limit(200).sort( { timestamp: -1 } ).toArray(function(err, docs) { if (err) { if (callback) { @@ -134,7 +135,7 @@ class ChatDB { } lastMessages(appid, userid, convid, sort, limit, callback) { - winston.debug("DB. app:", appid, "user:", userid, "convid", convid) + logger.debug("DB. app:", appid, "user:", userid, "convid", convid) this.db.collection(this.messages_collection).find( { timelineOf: userid, app_id: appid, conversWith: convid } ).limit(limit).sort( { timestamp: sort } ).toArray(function(err, docs) { if (err) { if (callback) { diff --git a/chatservermq.js b/chatservermq.js index 2e3ec28..5729a4a 100644 --- a/chatservermq.js +++ b/chatservermq.js @@ -6,7 +6,8 @@ require('dotenv').config(); -const winston = require("./winston"); +// const winston = require("./winston"); +const logger = require('./tiledesk-logger').logger; var observer = require('./observer'); var startServer = observer.startServer; @@ -20,24 +21,24 @@ if (webhook_enabled == undefined || webhook_enabled === "true" || webhook_enable }else { webhook_enabled = false; } -winston.info("webhook_enabled: " + webhook_enabled); +logger.info("webhook_enabled: " + webhook_enabled); var webhook_endpoint = process.env.WEBHOOK_ENDPOINT; -winston.info("webhook_endpoint: " + webhook_endpoint); +logger.info("webhook_endpoint: " + webhook_endpoint); let webhook_events_array = null; if (process.env.WEBHOOK_EVENTS) { - console.log(typeof process.env.WEBHOOK_EVENTS); + // logger.log(typeof process.env.WEBHOOK_EVENTS); const webhook_events = process.env.WEBHOOK_EVENTS; webhook_events_array = webhook_events.split(","); } -winston.info("webhook_events_array: " , webhook_events_array); +logger.info("webhook_events_array: " , webhook_events_array); -winston.info("Starting observer") +logger.info("Starting observer") async function start() { observer.setWebHookEnabled(webhook_enabled); diff --git a/observer.js b/observer.js index f643026..5f808c8 100644 --- a/observer.js +++ b/observer.js @@ -1,4 +1,4 @@ -const winston = require("./winston"); +// const winston = require("./winston"); var amqp = require('amqplib/callback_api'); const { ChatDB } = require('./chatdb/index.js'); // const { Webhooks } = require('./webhooks/index.js'); @@ -13,18 +13,19 @@ const { Webhooks } = require("./webhooks"); const { Console } = require("console"); const app = express(); app.use(bodyParser.json()); +const logger = require('./tiledesk-logger').logger; /* var webhook_endpoint = process.env.WEBHOOK_ENDPOINT || "http://localhost:3000/chat21/requests"; -winston.info("webhook_endpoint: " + webhook_endpoint); +logger.info("webhook_endpoint: " + webhook_endpoint); let webhook_events_array = null; if (process.env.WEBHOOK_EVENTS) { - console.log(typeof process.env.WEBHOOK_EVENTS); + logger.log(typeof process.env.WEBHOOK_EVENTS); const webhook_events = process.env.WEBHOOK_EVENTS; webhook_events_array = webhook_events.split(","); } -winston.info("webhook_events_array: " , webhook_events_array); +logger.info("webhook_events_array: " , webhook_events_array); var webhook_enabled = process.env.WEBHOOK_ENABLED; if (webhook_enabled == undefined || webhook_enabled === "true" || webhook_enabled === true ) { @@ -32,13 +33,13 @@ if (webhook_enabled == undefined || webhook_enabled === "true" || webhook_enable }else { webhook_enabled = false; } -winston.info("webhook_enabled: " + webhook_enabled); +logger.info("webhook_enabled: " + webhook_enabled); */ /* var app_id = process.env.APP_ID || "tilechat"; -winston.info("app_id: " + app_id); +logger.info("app_id: " + app_id); */ @@ -84,7 +85,7 @@ if (webhook_enabled == undefined || webhook_enabled === "true" || webhook_enable }else { webhook_enabled = false; } -winston.info("webhook_enabled: " + webhook_enabled); +logger.info("webhook_enabled: " + webhook_enabled); let webhook_endpoint; let webhook_events_array; @@ -135,12 +136,12 @@ function startMQ(resolve, reject) { } else { autoRestart=false; } - winston.debug("Connecting to RabbitMQ...") + logger.debug("Connecting to RabbitMQ...") amqp.connect(rabbitmq_uri, (err, conn) => { if (err) { - winston.error("[AMQP]", err); + logger.error("[AMQP]", err); if (autoRestart) { - console.error("[AMQP] reconnecting"); + logger.error("[AMQP] reconnecting"); return setTimeout(() => { startMQ(resolve, reject) }, 1000); } else { process.exit(1); @@ -148,14 +149,14 @@ function startMQ(resolve, reject) { } conn.on("error", (err) => { if (err.message !== "Connection closing") { - winston.error("[AMQP] conn error", err); + logger.error("[AMQP] conn error", err); return reject(err); } }); conn.on("close", () => { - console.error("[AMQP] close"); + logger.error("[AMQP] close"); if (autoRestart) { - console.error("[AMQP] reconnecting"); + logger.error("[AMQP] reconnecting"); return setTimeout(() => { startMQ(resolve, reject) }, 1000); } else { process.exit(1); @@ -163,7 +164,7 @@ function startMQ(resolve, reject) { }); amqpConn = conn; whenConnected().then(function(ch) { - winston.debug("whenConnected() returned") + logger.debug("whenConnected() returned") return resolve({conn: conn, ch: ch}); }); }); @@ -183,10 +184,10 @@ function startPublisher() { amqpConn.createConfirmChannel( (err, ch) => { if (closeOnErr(err)) return; ch.on("error", function (err) { - winston.error("[AMQP] channel error", err); + logger.error("[AMQP] channel error", err); }); ch.on("close", function () { - winston.debug("[AMQP] channel closed"); + logger.debug("[AMQP] channel closed"); }); pubChannel = ch; // if (offlinePubQueue.length > 0) { @@ -205,18 +206,18 @@ function publish(exchange, routingKey, content, callback) { pubChannel.publish(exchange, routingKey, content, { persistent: true }, function (err, ok) { if (err) { - winston.error("[AMQP] publish error:", err); + logger.error("[AMQP] publish error:", err); offlinePubQueue.push([exchange, routingKey, content]); pubChannel.connection.close(); callback(err) } else { - // winston.debug("published to", routingKey, "result", ok) + // logger.debug("published to", routingKey, "result", ok) callback(null) } }); } catch (e) { - console.error("[AMQP] publish", e.message); + logger.error("[AMQP] publish", e.message); offlinePubQueue.push([exchange, routingKey, content]); callback(e) } @@ -228,10 +229,10 @@ function startWorker() { channel = ch; if (closeOnErr(err)) return; ch.on("error", function (err) { - winston.error("[AMQP] channel error", err); + logger.error("[AMQP] channel error", err); }); ch.on("close", function () { - winston.debug("[AMQP] channel closed"); + logger.debug("[AMQP] channel closed"); }); ch.prefetch(10); ch.assertExchange(exchange, 'topic', { @@ -255,32 +256,32 @@ function startWorker() { function subscribeTo(topic, channel, queue) { channel.bindQueue(queue, exchange, topic, {}, function (err, oka) { if (err) { - winston.error("Error:", err, " binding on queue:", queue, "topic:", topic) + logger.error("Error:", err, " binding on queue:", queue, "topic:", topic) } else { - winston.info("bind: '" + queue + "' on topic: " + topic); + logger.info("bind: '" + queue + "' on topic: " + topic); } }); } function processMsg(msg) { - // console.debug("processMsgw. New msg:", msg); + // logger.debug("processMsgw. New msg:", msg); if (msg == null) { - console.error("Error. Msg is null. Stop job") + logger.error("Error. Msg is null. Stop job") return; } work(msg, function (ok) { try { if (ok) { - console.debug("channel.ack(msg)"); + logger.debug("channel.ack(msg)"); channel.ack(msg); } else { - console.debug("channel.reject(msg, true)"); + logger.debug("channel.reject(msg, true)"); channel.reject(msg, true); } } catch (e) { - winston.debug("processMsgwork error ", e) + logger.debug("processMsgwork error ", e) closeOnErr(e); } }); @@ -288,11 +289,11 @@ function processMsg(msg) { function work(msg, callback) { if (!msg) { - console.error("Error. Work Message is empty. Removing this job with ack=ok.", msg); + logger.error("Error. Work Message is empty. Removing this job with ack=ok.", msg); callback(true); return; } - console.debug("work NEW TOPIC: " + msg.fields.routingKey) //, " message:", msg.content.toString()); + logger.debug("work NEW TOPIC: " + msg.fields.routingKey) //, " message:", msg.content.toString()); const topic = msg.fields.routingKey //.replace(/[.]/g, '/'); const message_string = msg.content.toString(); if (topic.endsWith('.outgoing')) { @@ -320,7 +321,7 @@ function work(msg, callback) { process_update(topic, message_string, callback); } else { - winston.error("unhandled topic:", topic) + logger.error("unhandled topic:", topic) callback(true) } } @@ -328,12 +329,12 @@ function work(msg, callback) { // ***** TOPIC HANDLERS ******/ function process_presence(topic, message_string, callback) { - winston.debug("got PRESENCE testament", message_string, " on topic", topic) + logger.debug("got PRESENCE testament", message_string, " on topic", topic) callback(true) } function process_outgoing(topic, message_string, callback) { - winston.debug("process outgoing topic:" + topic) + logger.debug("process outgoing topic:" + topic) var topic_parts = topic.split(".") // /apps/tilechat/users/(ME)SENDER_ID/messages/RECIPIENT_ID/outgoing const app_id = topic_parts[1] @@ -355,18 +356,18 @@ function process_outgoing(topic, message_string, callback) { let convers_with; if (!isMessageGroup(outgoing_message)) { - console.debug("Direct message."); + logger.debug("Direct message."); inbox_of = sender_id; convers_with = recipient_id; outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT // =100. DELIVERED (=150) it's better, but the JS client actually wants 100 to show the sent-checkbox deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) { - console.debug("delivered to sender. OK?", ok); + logger.debug("delivered to sender. OK?", ok); if (ok) { outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED // =150 inbox_of = recipient_id; convers_with = sender_id; deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) { - console.debug("delivered to recipient. OK?", ok); + logger.debug("delivered to recipient. OK?", ok); if (ok) { callback(true); } @@ -376,52 +377,52 @@ function process_outgoing(topic, message_string, callback) { }); } else { - winston.debug("Error delivering: ", outgoing_message) + logger.debug("Error delivering: ", outgoing_message) callback(false); } }); - // winston.debug("!isGroup") + // logger.debug("!isGroup") // let inbox_of = recipient_id // let convers_with = sender_id // deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) { // // WEBHOOK DELIVERED STATUS // outgoing_message.status - // winston.debug("outgoing_message1 OK?", ok) + // logger.debug("outgoing_message1 OK?", ok) // if (ok) { // if (recipient_id !== sender_id) { // inbox_of = sender_id // convers_with = recipient_id // outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT // =100. DELIVERED it's better, but the JS client actually wants 100 to show the sent-checkbox // deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) { - // winston.debug("outgoing_message2 OK?", ok) + // logger.debug("outgoing_message2 OK?", ok) // if (ok) { // callback(true) // } // else { - // winston.debug("Error delivering: ", outgoing_message) + // logger.debug("Error delivering: ", outgoing_message) // callback(false) // } // }) // } // else { - // winston.debug("message sent to myself. not delivering") + // logger.debug("message sent to myself. not delivering") // callback(true) // } // } // else { - // winston.debug("!ok") + // logger.debug("!ok") // callback(false) // } // }) } else { - console.log("message group."); + logger.log("message group."); const group_id = recipient_id chatdb.getGroup(group_id, function(err, group) { // REDIS? - // winston.debug("group found!", group) + // logger.debug("group found!", group) if (!group) { // created only to temporary store group-messages in group-timeline // TODO: 1. create group (on-the-fly), 2. remove this code, 3. continue as ifthe group exists. - winston.debug("group doesn't exist! Sending anyway to group timeline...") + logger.debug("group doesn't exist! Sending anyway to group timeline...") group = { uid: group_id, transient: true, @@ -431,7 +432,7 @@ function process_outgoing(topic, message_string, callback) { group.members[me] = 1 } // if (!group.members[me]) { - // winston.debug(me + " can't write to this group") + // logger.debug(me + " can't write to this group") // callback(true) // return // } @@ -439,42 +440,42 @@ function process_outgoing(topic, message_string, callback) { // all the group messages in timelineOf: group.uid group.members[group.uid] = 1 - // winston.debug("Writing to group:", group) + // logger.debug("Writing to group:", group) let count = 0; - console.log("group members", group.members); + logger.log("group members", group.members); let max = Object.keys(group.members).length; let error_encoutered = false; for (let [member_id, value] of Object.entries(group.members)) { const inbox_of = member_id; const convers_with = recipient_id; - winston.debug("inbox_of: "+ inbox_of); - winston.debug("convers_with: " + convers_with); - console.log("sending group outgoing message to member", member_id); + logger.debug("inbox_of: "+ inbox_of); + logger.debug("convers_with: " + convers_with); + logger.log("sending group outgoing message to member", member_id); if (inbox_of === outgoing_message.sender) { - console.log("inbox_of === outgoing_message.sender. status=SENT system YES?", inbox_of); + logger.log("inbox_of === outgoing_message.sender. status=SENT system YES?", inbox_of); outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT; } else { - console.log("inbox_of != outgoing_message.sender. status=DELIVERED no system, is:", inbox_of); + logger.log("inbox_of != outgoing_message.sender. status=DELIVERED no system, is:", inbox_of); outgoing_message.status = MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED; } - console.log("delivering group message with status...", outgoing_message.status, " to:", inbox_of); + logger.log("delivering group message with status...", outgoing_message.status, " to:", inbox_of); deliverMessage(outgoing_message, app_id, inbox_of, convers_with, function(ok) { - winston.debug("GROUP MESSAGE DELIVERED?", ok) + logger.debug("GROUP MESSAGE DELIVERED?", ok) count++; - console.log("Sent Counting:", count); - console.log("Max:", max); + logger.log("Sent Counting:", count); + logger.log("Max:", max); if (!ok) { - winston.debug("Error sending message to group " + group.uid); + logger.debug("Error sending message to group " + group.uid); error_encoutered = true } if (count == max) { if (error_encoutered) { - console.error("ERROR SENDING MESSAGE TO GROUP!"); + logger.error("ERROR SENDING MESSAGE TO GROUP!"); callback(false) } else { - console.log("ALL OK! MESSAGE SENT TO GRUP! ACK!"); + logger.log("ALL OK! MESSAGE SENT TO GRUP! ACK!"); callback(true); } } @@ -485,12 +486,12 @@ function process_outgoing(topic, message_string, callback) { } function isMessageGroup(message) { - // console.debug("checking is group", message); + // logger.debug("checking is group", message); if (message.channel_type === 'group') { - // console.log("is group!") + // logger.log("is group!") return true } - // console.log("not a group") + // logger.log("not a group") return false } @@ -503,93 +504,93 @@ function isMessageGroup(message) { // Places te message in the inbox of the recipient function deliverMessage(message, app_id, inbox_of, convers_with_id, callback) { - console.log("DELIVERINGMESSAGE:",message) - winston.debug("DELIVERING:", message, "inbox_of:", inbox_of, "convers_with:", convers_with_id) + logger.log("DELIVERINGMESSAGE:",message) + logger.debug("DELIVERING:", message, "inbox_of:", inbox_of, "convers_with:", convers_with_id) // internal flow const persist_topic = `apps.observer.${app_id}.users.${inbox_of}.messages.${convers_with_id}.persist` // mqtt (client) flow const added_topic = `apps.${app_id}.users.${inbox_of}.messages.${convers_with_id}.clientadded` - winston.debug("persist_topic: " + persist_topic) - winston.debug("added_topic: " + added_topic) + logger.debug("persist_topic: " + persist_topic) + logger.debug("added_topic: " + added_topic) const mstatus = message.status; - console.log("mstatus:", mstatus) + logger.log("mstatus:", mstatus) const message_payload = JSON.stringify(message) // notifies to the client (on MQTT client topic) publish(exchange, added_topic, Buffer.from(message_payload), function(err, msg) { // .clientadded if (err) { - console.error("Error on topic: ", added_topic, " Err:", err); + logger.error("Error on topic: ", added_topic, " Err:", err); callback(false); return; } - console.debug("NOTIFY VIA WHnotifyMessageStatusDelivered, topic: " + added_topic); + logger.debug("NOTIFY VIA WHnotifyMessageStatusDelivered, topic: " + added_topic); if (webhooks && webhook_enabled) { - console.log("webhooks && webhook_enabled ON, processing webhooks, message:", message); + logger.log("webhooks && webhook_enabled ON, processing webhooks, message:", message); webhooks.WHnotifyMessageStatusSentOrDelivered(message_payload, added_topic, (err) => { if (err) { - console.error("WHnotifyMessageStatusSentOrDelivered with err:"+ err); + logger.error("WHnotifyMessageStatusSentOrDelivered with err:"+ err); callback(false); } else { - console.debug("WHnotifyMessageStatusSentOrDelivered ok"); - console.debug("ADDED. NOW PUBLISH TO 'persist' TOPIC: " + persist_topic); + logger.debug("WHnotifyMessageStatusSentOrDelivered ok"); + logger.debug("ADDED. NOW PUBLISH TO 'persist' TOPIC: " + persist_topic); publish(exchange, persist_topic, Buffer.from(message_payload), function(err, msg) { // .persist if (err) { - console.error("Error PUBLISH TO 'persist' TOPIC:", err); + logger.error("Error PUBLISH TO 'persist' TOPIC:", err); callback(false); return; } - console.debug("... ALL GOOD ON:", persist_topic); + logger.debug("... ALL GOOD ON:", persist_topic); callback(true); }) } }); } else { - console.debug("ADDED. NOW PUBLISH TO 'persist' TOPIC: " + persist_topic); + logger.debug("ADDED. NOW PUBLISH TO 'persist' TOPIC: " + persist_topic); publish(exchange, persist_topic, Buffer.from(message_payload), function(err, msg) { // .persist if (err) { - console.error("Error PUBLISH TO 'persist' TOPIC:", err); + logger.error("Error PUBLISH TO 'persist' TOPIC:", err); callback(false); return; } - console.debug("... ALL GOOD ON:", persist_topic); + logger.debug("... ALL GOOD ON:", persist_topic); callback(true); }) } // if (webhook_enabled) { - // console.log("webhook_enabled!!!!!", webhook_enabled, message.status) + // logger.log("webhook_enabled!!!!!", webhook_enabled, message.status) // if (message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED) { - // winston.debug("WHnotifyMessageStatusDelivered before message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED"); + // logger.debug("WHnotifyMessageStatusDelivered before message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED"); // webhooks.WHnotifyMessageStatusDelivered(message, (err) => { // if (err) { - // winston.error("WHnotifyMessageStatusDelivered with err:"+ err) + // logger.error("WHnotifyMessageStatusDelivered with err:"+ err) // } else { - // winston.debug("WHnotifyMessageStatusDelivered ok") + // logger.debug("WHnotifyMessageStatusDelivered ok") // } // }) // } // else if (message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT) { - // winston.debug("WHnotifyMessageStatusDelivered before message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT"); + // logger.debug("WHnotifyMessageStatusDelivered before message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT"); // webhooks.WHnotifyMessageStatusSent(message, (err) => { // if (err) { - // winston.error("Webhook notified with err:"+ err) + // logger.error("Webhook notified with err:"+ err) // } else { - // winston.debug("Webhook notified WHnotifyMessageReceived ok") + // logger.debug("Webhook notified WHnotifyMessageReceived ok") // } // }) // }else { - // winston.debug("WHnotifyMessageStatusDelivered before else other???"); + // logger.debug("WHnotifyMessageStatusDelivered before else other???"); // } // } // saves on db and creates conversation - // winston.debug("ADDED. NOW PUBLISH TO 'persist' TOPIC: " + persist_topic) + // logger.debug("ADDED. NOW PUBLISH TO 'persist' TOPIC: " + persist_topic) // publish(exchange, persist_topic, Buffer.from(message_payload), function(err, msg) { // .persist // if (err) { - // console.error("Error PUBLISH TO 'persist' TOPIC:", err) + // logger.error("Error PUBLISH TO 'persist' TOPIC:", err) // callback(false) // return // } - // winston.debug("... ALL GOOD ON:", persist_topic) + // logger.debug("... ALL GOOD ON:", persist_topic) // callback(true) // // publish convs .clientadded // }) @@ -598,7 +599,7 @@ function deliverMessage(message, app_id, inbox_of, convers_with_id, callback) { // delivers messages to inboxes with rabbitmq queues function process_delivered(topic, message_string, callback) { - console.debug(">>>>> DELIVERING:", topic, "MESSAGE PAYLOAD:",message_string) + logger.debug(">>>>> DELIVERING:", topic, "MESSAGE PAYLOAD:",message_string) var topic_parts = topic.split(".") // delivers the message payload in INBOX_OF -> CONVERS_WITH timeline // /apps/observer/tilechat/users/INBOX_OF/messages/CONVERS_WITH/delivered @@ -606,16 +607,16 @@ function process_delivered(topic, message_string, callback) { const inbox_of = topic_parts[4] const convers_with = topic_parts[6] const message = JSON.parse(message_string) - console.debug("____DELIVER MESSAGE:", message.message_id) + logger.debug("____DELIVER MESSAGE:", message.message_id) deliverMessage(message, app_id, inbox_of, convers_with, function(ok) { - console.debug("MESSAGE DELIVERED?: "+ ok) + logger.debug("MESSAGE DELIVERED?: "+ ok) if (!ok) { - console.error("____Error delivering message. NOACKED:", message) - console.log("____DELIVER MESSAGE:", message.message_id, " NOACKED!") + logger.error("____Error delivering message. NOACKED:", message) + logger.log("____DELIVER MESSAGE:", message.message_id, " NOACKED!") callback(false) } else { - console.log("____DELIVER MESSAGE ", message.message_id, " ACKED") + logger.log("____DELIVER MESSAGE ", message.message_id, " ACKED") callback(true) } }) @@ -624,7 +625,7 @@ function process_delivered(topic, message_string, callback) { // This handler only persists messages and persists/updates conversations. // Original messages were already delivered with *.messages.*.clientadded function process_persist(topic, message_string, callback) { - winston.debug(">>>>> TOPIC persist: " + topic + " MESSAGE PAYLOAD: " +message_string) + logger.debug(">>>>> TOPIC persist: " + topic + " MESSAGE PAYLOAD: " +message_string) var topic_parts = topic.split(".") // /apps/observer/tilechat/users/ME/messages/CONVERS_WITH/persist -> WITH "SERVER" THIS MESSAGES WILL NOT BE DELIVERED TO CLIENTS const app_id = topic_parts[2] @@ -641,10 +642,10 @@ function process_persist(topic, message_string, callback) { if (savedMessage.attributes && savedMessage.attributes.updateconversation == false) { update_conversation = false } - winston.debug("updateconversation = " + update_conversation) + logger.debug("updateconversation = " + update_conversation) chatdb.saveOrUpdateMessage(savedMessage, function(err, msg) { - winston.debug("Message saved", savedMessage) - winston.debug("Updating conversation? updateconversation is: " + update_conversation) + logger.debug("Message saved", savedMessage) + logger.debug("Updating conversation? updateconversation is: " + update_conversation) if (update_conversation) { const my_conversation_topic = 'apps.tilechat.users.' + me + '.conversations.' + convers_with + ".clientadded" let conversation = persist_message @@ -654,10 +655,10 @@ function process_persist(topic, message_string, callback) { conversation.archived = false conversation.last_message_text = conversation.text // retro comp const conversation_payload = JSON.stringify(conversation) - winston.debug("Updating conversation...") + logger.debug("Updating conversation...") chatdb.saveOrUpdateConversation(conversation, (err, doc) => { if (err) { - console.error("(chatdb.saveOrUpdateConversation callback) ERROR: ", err) + logger.error("(chatdb.saveOrUpdateConversation callback) ERROR: ", err) callback(false) } else { @@ -666,7 +667,7 @@ function process_persist(topic, message_string, callback) { }) } else { - winston.debug("Skip updating conversation. (update_conversation = false)") + logger.debug("Skip updating conversation. (update_conversation = false)") callback(true) } }) @@ -674,15 +675,15 @@ function process_persist(topic, message_string, callback) { function process_update(topic, message_string, callback) { var topic_parts = topic.split(".") - winston.debug("UPDATE. TOPIC PARTS:", topic_parts) - winston.debug("payload:" + message_string) + logger.debug("UPDATE. TOPIC PARTS:", topic_parts) + logger.debug("payload:" + message_string) if (topic_parts.length < 5) { - winston.debug("process_update topic error.") + logger.debug("process_update topic error.") callback(false) return } if (topic_parts[4] === "messages") { - winston.debug(" MESSAGE UPDATE.") + logger.debug(" MESSAGE UPDATE.") // 'apps.tilechat.users.*.messages.*.*.update' // 'apps/tilechat/users/USER_ID/messages/CONVERS_WITH/MESSAGE_ID/update' // message update, only status update actually supported @@ -690,7 +691,7 @@ function process_update(topic, message_string, callback) { const user_id = topic_parts[3] const convers_with = topic_parts[5] const message_id = topic_parts[6] - console.debug("updating message:", message_id, "on convers_with", convers_with, "for user", user_id, "patch", message_string) + logger.debug("updating message:", message_id, "on convers_with", convers_with, "for user", user_id, "patch", message_string) const patch = JSON.parse(message_string) if (!patch.status || patch.status != 200) { @@ -718,45 +719,45 @@ function process_update(topic, message_string, callback) { const dest_message_patch_payload = JSON.stringify(dest_message_patch) // PUBLISH DEST_PATCH: RETURN_RECEIPT const recipient_message_update_topic = 'apps.tilechat.users.' + convers_with + '.messages.' + me + '.' + message_id + '.clientupdated' - winston.debug(">>> NOW PUBLISHING... DEST_PATCH: RETURN_RECEIPT. TOPIC: " + recipient_message_update_topic + ", PATCH ", dest_message_patch) + logger.debug(">>> NOW PUBLISHING... DEST_PATCH: RETURN_RECEIPT. TOPIC: " + recipient_message_update_topic + ", PATCH ", dest_message_patch) publish(exchange, recipient_message_update_topic, Buffer.from(dest_message_patch_payload), function(err) { - winston.debug(">>> PUBLISHED!!!! RECIPIENT MESSAGE TOPIC UPDATE" + recipient_message_update_topic + " WITH PATCH " , dest_message_patch) + logger.debug(">>> PUBLISHED!!!! RECIPIENT MESSAGE TOPIC UPDATE" + recipient_message_update_topic + " WITH PATCH " , dest_message_patch) if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) } else { - console.log("webhook_enabled?????", webhook_enabled); + logger.log("webhook_enabled?????", webhook_enabled); if (webhook_enabled) { webhooks.WHnotifyMessageStatusReturnReceipt(dest_message_patch, (err) => { if (err) { - winston.error("WHnotifyMessageStatusReturnReceipt with err:" + err) + logger.error("WHnotifyMessageStatusReturnReceipt with err:" + err) } else { - winston.debug("WHnotifyMessageStatusReturnReceipt ok") + logger.debug("WHnotifyMessageStatusReturnReceipt ok") } }) } // DISABLED BECAUSE NOT REALLY NECESSARY (FOR PERF) TO NOTIFY STATUS MODIFICATION TO THE ONE WHO COMMITED THE SAME MOD // PUBLISH MY_PATCH: RECEIVED // const my_message_update_topic = 'apps.tilechat.users.' + me + '.messages.' + convers_with + '.' + message_id + '.clientupdate' - // winston.debug(">>> NOW PUBLISHING... MY MESSAGE TOPIC UPDATE", my_message_update_topic, "WITH PATCH", my_message_patch) + // logger.debug(">>> NOW PUBLISHING... MY MESSAGE TOPIC UPDATE", my_message_update_topic, "WITH PATCH", my_message_patch) // publish(exchange, my_message_update_topic, Buffer.from(my_message_patch_payload), function(err) { - // winston.debug(">>> PUBLISHED!!!! MY MESSAGE TOPIC UPDATE", my_message_update_topic, "WITH PATCH", my_message_patch) + // logger.debug(">>> PUBLISHED!!!! MY MESSAGE TOPIC UPDATE", my_message_update_topic, "WITH PATCH", my_message_patch) // if (err) { // callback(false) // return // } // TODO: MOVE TO A PERSIST_UPDATED TOPIC/QUEUE... - winston.debug(">>> ON DISK... WITH A STATUS ON MY MESSAGE-UPDATE TOPIC", topic, "WITH PATCH", my_message_patch) + logger.debug(">>> ON DISK... WITH A STATUS ON MY MESSAGE-UPDATE TOPIC", topic, "WITH PATCH", my_message_patch) chatdb.saveOrUpdateMessage(my_message_patch, function(err, msg) { - winston.debug(">>> MESSAGE ON TOPIC", topic, "UPDATED!") + logger.debug(">>> MESSAGE ON TOPIC", topic, "UPDATED!") if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) return } - winston.debug(">>> ON DISK... RECIPIENT MESSAGE ON DB WITH", dest_message_patch) + logger.debug(">>> ON DISK... RECIPIENT MESSAGE ON DB WITH", dest_message_patch) chatdb.saveOrUpdateMessage(dest_message_patch, function(err, msg) { callback(true) }); @@ -767,11 +768,11 @@ function process_update(topic, message_string, callback) { else if (topic_parts[4] === "conversations") { // conversation update, only is_new update actually supported // 'apps/tilechat/users/USER_ID/conversations/CONVERS_WITH/update' - winston.debug(" CONVERSATION UPDATE.") + logger.debug(" CONVERSATION UPDATE.") const app_id = topic_parts[1] const user_id = topic_parts[3] const convers_with = topic_parts[5] - winston.debug("updating conversation:" + convers_with + " for user " + user_id + " patch " + message_string) + logger.debug("updating conversation:" + convers_with + " for user " + user_id + " patch " + message_string) const patch = JSON.parse(message_string) // 1. Patch my conversation: convers_with @@ -780,22 +781,22 @@ function process_update(topic, message_string, callback) { const me = user_id patch.timelineOf = me patch.conversWith = convers_with - winston.debug(">>> ON DISK... CONVERSATION TOPIC " + topic + " WITH PATCH " + patch) - winston.debug("Updating conversation 2.") + logger.debug(">>> ON DISK... CONVERSATION TOPIC " + topic + " WITH PATCH " + patch) + logger.debug("Updating conversation 2.") chatdb.saveOrUpdateConversation(patch, function(err, doc) { - winston.debug(">>> CONVERSATION ON TOPIC" + topic + " UPDATED!") + logger.debug(">>> CONVERSATION ON TOPIC" + topic + " UPDATED!") if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) return } const patch_payload = JSON.stringify(patch) const my_conversation_update_topic = 'apps.tilechat.users.' + me + '.conversations.' + convers_with + '.clientupdated' - winston.debug(">>> NOW PUBLISHING... MY CONVERSATION UPDATE " + my_conversation_update_topic + " WITH PATCH " + patch_payload) + logger.debug(">>> NOW PUBLISHING... MY CONVERSATION UPDATE " + my_conversation_update_topic + " WITH PATCH " + patch_payload) publish(exchange, my_conversation_update_topic, Buffer.from(patch_payload), function(err) { - winston.debug(">>> PUBLISHED!!!! MY CONVERSATION UPDATE TOPIC " + my_conversation_update_topic + " WITH PATCH " + patch_payload) + logger.debug(">>> PUBLISHED!!!! MY CONVERSATION UPDATE TOPIC " + my_conversation_update_topic + " WITH PATCH " + patch_payload) if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) return } @@ -810,65 +811,65 @@ function process_update(topic, message_string, callback) { function process_archive(topic, payload, callback) { // Ex. apps/tilechat/users/USER_ID/conversations/CONVERS_WITH/archive const topic_parts = topic.split(".") - winston.debug("ARCHIVE. TOPIC PARTS:" + topic_parts + "payload (ignored): " + payload) + logger.debug("ARCHIVE. TOPIC PARTS:" + topic_parts + "payload (ignored): " + payload) if (topic_parts.length < 7) { - winston.debug("process_archive topic error. topic_parts.length < 7:" + topic) + logger.debug("process_archive topic error. topic_parts.length < 7:" + topic) callback(true) return } if (topic_parts[4] === "conversations") { - winston.debug("CONVERSATION ARCHIVE.") + logger.debug("CONVERSATION ARCHIVE.") // 'apps.tilechat.users.*.messages.*.*.update' // 'apps/tilechat/users/USER_ID/messages/CONVERS_WITH/MESSAGE_ID/update' // message update, only status update actually supported const app_id = topic_parts[1] const user_id = topic_parts[3] const convers_with = topic_parts[5] - winston.debug("archiving conversation:" + convers_with + " for user " + user_id + " payload: "+ payload) + logger.debug("archiving conversation:" + convers_with + " for user " + user_id + " payload: "+ payload) const me = user_id conversation_archive_patch = { "timelineOf": me, "conversWith": convers_with, "archived": true } - winston.debug("NOTIFY VIA WEBHOOK ON SAVE TOPIC "+ topic) + logger.debug("NOTIFY VIA WEBHOOK ON SAVE TOPIC "+ topic) if (webhook_enabled) { webhooks.WHnotifyConversationArchived(conversation_archive_patch, topic, (err) => { if (err) { - winston.error("Webhook notified with err:"+ err) + logger.error("Webhook notified with err:"+ err) } else { - winston.debug("Webhook notified WHnotifyConversationArchived ok") + logger.debug("Webhook notified WHnotifyConversationArchived ok") } }); } - winston.debug(">>> ON DISK... ARCHIVE CONVERSATION ON TOPIC: " + topic) - winston.debug("Updating conversation 3.") + logger.debug(">>> ON DISK... ARCHIVE CONVERSATION ON TOPIC: " + topic) + logger.debug("Updating conversation 3.") chatdb.saveOrUpdateConversation(conversation_archive_patch, function(err, msg) { - winston.debug(">>> CONVERSATION ON TOPIC: " + topic + " ARCHIVED!") + logger.debug(">>> CONVERSATION ON TOPIC: " + topic + " ARCHIVED!") if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) return } const conversation_deleted_topic = 'apps.tilechat.users.' + user_id + '.conversations.' + convers_with + '.clientdeleted' - winston.debug(">>> NOW PUBLISHING... CONVERSATION ARCHIVED (DELETED) TOPIC " + conversation_deleted_topic) + logger.debug(">>> NOW PUBLISHING... CONVERSATION ARCHIVED (DELETED) TOPIC " + conversation_deleted_topic) const payload = JSON.stringify(conversation_archive_patch) publish(exchange, conversation_deleted_topic, Buffer.from(payload), function(err) { - winston.debug(">>> PUBLISHED!!!! CONVERSATION ON TOPIC: " + conversation_deleted_topic + " ARCHIVED (DELETED). Payload: " + payload + " buffered:" + Buffer.from(payload)) + logger.debug(">>> PUBLISHED!!!! CONVERSATION ON TOPIC: " + conversation_deleted_topic + " ARCHIVED (DELETED). Payload: " + payload + " buffered:" + Buffer.from(payload)) if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) } else { // now publish new archived conversation added const archived_conversation_added_topic = 'apps.tilechat.users.' + user_id + '.archived_conversations.' + convers_with + '.clientadded' - winston.debug(">>> NOW PUBLISHING... CONVERSATION ARCHIVED (ADDED) TOPIC: "+ archived_conversation_added_topic) + logger.debug(">>> NOW PUBLISHING... CONVERSATION ARCHIVED (ADDED) TOPIC: "+ archived_conversation_added_topic) // const success_payload = JSON.stringify({"success": true}) publish(exchange, archived_conversation_added_topic, Buffer.from(payload), function(err) { - winston.debug(">>> PUBLISHED!!!! ARCHIVED (DELETED) CONVERSATION ON TOPIC: " + conversation_deleted_topic) + logger.debug(">>> PUBLISHED!!!! ARCHIVED (DELETED) CONVERSATION ON TOPIC: " + conversation_deleted_topic) if (err) { - winston.error("error",err); + logger.error("error",err); callback(false) } else { @@ -883,14 +884,14 @@ function process_archive(topic, payload, callback) { // function process_create_group(topic, payload, callback) { // var topic_parts = topic.split(".") -// console.debug("process_create_group. TOPIC PARTS:" + topic_parts + " payload:" + payload) +// logger.debug("process_create_group. TOPIC PARTS:" + topic_parts + " payload:" + payload) // // `apps.observer.${app_id}.groups.create` // const app_id = topic_parts[2] -// console.debug("app_id:" + app_id) -// console.debug("payload:"+ payload) +// logger.debug("app_id:" + app_id) +// logger.debug("payload:"+ payload) // const group = JSON.parse(payload) // if (!group.uid || !group.name || !group.members || !group.owner) { -// console.error("Group error during creation. Metadata missed."); +// logger.error("Group error during creation. Metadata missed."); // callback(true); // dequeue // return // } @@ -906,9 +907,9 @@ function process_archive(topic, payload, callback) { // } // else { // for (let [member_id, value] of Object.entries(group.members)) { -// console.debug(">>>>> JOINING MEMBER: "+member_id) +// logger.debug(">>>>> JOINING MEMBER: "+member_id) // joinGroup(member_id, group, function(reply) { -// console.debug("member: " + member_id + " invited on group " + group + " result " + reply) +// logger.debug("member: " + member_id + " invited on group " + group + " result " + reply) // }) // } // callback(true) @@ -930,10 +931,10 @@ function process_archive(topic, payload, callback) { * @param {*} callback */ function joinGroup(joined_member_id, group, callback) { - winston.debug("SENDING 'ADDED TO GROUP' TO EACH MEMBER INCLUDING THE JOINED ONE...", group) + logger.debug("SENDING 'ADDED TO GROUP' TO EACH MEMBER INCLUDING THE JOINED ONE...", group) const appid = group.appId for (let [member_id, value] of Object.entries(group.members)) { - winston.debug("to member:" + member_id) + logger.debug("to member:" + member_id) const now = Date.now() const message = { message_id: uuid(), @@ -958,17 +959,17 @@ function joinGroup(joined_member_id, group, callback) { } } } - winston.debug("Member joined group message:", message) + logger.debug("Member joined group message:", message) let inbox_of = member_id let convers_with = group.uid deliverMessage(message, appid, inbox_of, convers_with, (ok) => { if (!ok) { - winston.error("error delivering message to joined member", inbox_of) + logger.error("error delivering message to joined member", inbox_of) callback(ok) return } else { - winston.debug("DELIVERED MESSAGE TO: " + inbox_of + " CONVERS_WITH " + convers_with) + logger.debug("DELIVERED MESSAGE TO: " + inbox_of + " CONVERS_WITH " + convers_with) } }) } @@ -977,26 +978,26 @@ function joinGroup(joined_member_id, group, callback) { const convid = group.uid chatdb.lastMessages(appid, userid, convid, 1, 200, (err, messages) => { if (err) { - winston.error("Error", err) + logger.error("Error", err) callback(err) } else if (!messages) { - winston.debug("No messages in group: " + group.uid) + logger.debug("No messages in group: " + group.uid) callback(null) } else { - winston.debug("delivering past group messages to:" + joined_member_id + " messages: ", messages) + logger.debug("delivering past group messages to:" + joined_member_id + " messages: ", messages) const inbox_of = joined_member_id const convers_with = group.uid messages.forEach(message => { // TODO: CHECK IN MESSAGE WAS ALREADY DELIVERED. (CLIENT? SERVER?) - winston.debug("Message:", message.text) + logger.debug("Message:", message.text) deliverMessage(message, appid, inbox_of, convers_with, (err) => { if (err) { - winston.error("error delivering past message to joined member: " + inbox_of, err) + logger.error("error delivering past message to joined member: " + inbox_of, err) } else { - winston.debug("DELIVERED PAST MESSAGE TO: " + inbox_of + " CONVERS_WITH : " + convers_with) + logger.debug("DELIVERED PAST MESSAGE TO: " + inbox_of + " CONVERS_WITH : " + convers_with) } }) }); @@ -1007,19 +1008,19 @@ function joinGroup(joined_member_id, group, callback) { function process_update_group(topic, payload, callback) { var topic_parts = topic.split(".") - winston.debug("process_update_group. TOPIC PARTS:" + topic_parts + "payload:" + payload) + logger.debug("process_update_group. TOPIC PARTS:" + topic_parts + "payload:" + payload) // `apps.observer.${app_id}.groups.update` const app_id = topic_parts[2] - winston.debug("app_id:" + app_id) - winston.debug("payload:" + payload) + logger.debug("app_id:" + app_id) + logger.debug("payload:" + payload) const data = JSON.parse(payload) - winston.debug("process_update_group DATA ", data) + logger.debug("process_update_group DATA ", data) const group = data.group - winston.debug("process_update_group DATA.group ", data.group) + logger.debug("process_update_group DATA.group ", data.group) const notify_to = data.notify_to - winston.debug("process_update_group DATA.notify_to ", data.notify_to); + logger.debug("process_update_group DATA.notify_to ", data.notify_to); if (!group || !group.uid) { - winston.error("Group not found!"); + logger.error("Group not found!"); callback(true) return } @@ -1032,7 +1033,7 @@ function process_update_group(topic, payload, callback) { // function saveOrUpdateGroup(group, callback) { // chatdb.saveOrUpdateGroup(group, function(err, doc) { // if (err) { -// winston.error("Error saving group:", err) +// logger.error("Error saving group:", err) // callback(false) // return // } @@ -1047,11 +1048,11 @@ function process_update_group(topic, payload, callback) { // for (let [key, value] of Object.entries(group.members)) { // const member_id = key // const added_group_topic = `apps.${app_id}.users.${member_id}.groups.${group.uid}.clientadded` -// console.debug("added_group_topic:", added_group_topic) +// logger.debug("added_group_topic:", added_group_topic) // const payload = JSON.stringify(group) // publish(exchange, added_group_topic, Buffer.from(payload), function(err, msg) { // if (err) { -// console.error("error publish deliverGroupAdded",err); +// logger.error("error publish deliverGroupAdded",err); // // callback(false) // // return // } @@ -1065,11 +1066,11 @@ function deliverGroupUpdated(group, notify_to, callback) { for (let [key, value] of Object.entries(notify_to)) { const member_id = key const updated_group_topic = `apps.${app_id}.users.${member_id}.groups.${group.uid}.clientupdated` - winston.debug("updated_group_topic:", updated_group_topic) + logger.debug("updated_group_topic:", updated_group_topic) const payload = JSON.stringify(group) publish(exchange, updated_group_topic, Buffer.from(payload), function(err, msg) { if (err) { - winston.error("error publish deliverGroupUpdated",err); + logger.error("error publish deliverGroupUpdated",err); // callback(false) // return } @@ -1107,13 +1108,13 @@ function deliverGroupUpdated(group, notify_to, callback) { // } // const user_id = member_id // const convers_with = group.uid -// console.debug("group_created_message: ", group_created_message) -// console.debug("user_id: " + user_id) -// console.debug("convers_with: " + convers_with) +// logger.debug("group_created_message: ", group_created_message) +// logger.debug("user_id: " + user_id) +// logger.debug("convers_with: " + convers_with) // deliverMessage(group_created_message, app_id, user_id, convers_with, function(ok) { -// winston.debug("MESSAGE DELIVERED?", ok) +// logger.debug("MESSAGE DELIVERED?", ok) // if (!ok) { -// winston.debug("Error sending group creation message.", group_created_message) +// logger.debug("Error sending group creation message.", group_created_message) // callback(false) // return // } @@ -1124,7 +1125,7 @@ function deliverGroupUpdated(group, notify_to, callback) { function closeOnErr(err) { if (!err) return false; - console.error("[AMQP] error", err); + logger.error("[AMQP] error", err); amqpConn.close(); return true; } @@ -1136,21 +1137,21 @@ function closeOnErr(err) { // // Create a database variable outside of the // // database connection callback to reuse the connection pool in the app. // var db; -// winston.debug("connecting to mongodb...") +// logger.debug("connecting to mongodb...") // mongodb.MongoClient.connect(mongouri, { useNewUrlParser: true, useUnifiedTopology: true }, function (err, client) { // if (err) { -// winston.debug(err); +// logger.debug(err); // process.exit(1); // } else { -// winston.debug("MongoDB successfully connected.") +// logger.debug("MongoDB successfully connected.") // } // db = client.db(); // // var port = process.env.PORT || 3000; // // app.listen(port, () => { -// // winston.debug('Web server started.'); +// // logger.debug('Web server started.'); // // }) // chatdb = new ChatDB({database: db}) -// winston.debug('Starting observer.') +// logger.debug('Starting observer.') // startMQ(); // }); @@ -1180,18 +1181,18 @@ async function startServer(config) { mongo_uri = config.mongo_uri || "mongodb://localhost:27017/chatdb"; var db; - winston.debug("connecting to mongodb..."); + logger.debug("connecting to mongodb..."); var client = await mongodb.MongoClient.connect(mongo_uri, { useNewUrlParser: true, useUnifiedTopology: true }) - winston.debug("mongodb connected...", db); + logger.debug("mongodb connected...", db); db = client.db(); chatdb = new ChatDB({database: db}) - winston.info("Starting webhooks..."); + logger.info("Starting webhooks..."); webhooks = new Webhooks({appId: app_id, RABBITMQ_URI: rabbitmq_uri, exchange: exchange, webhook_endpoint: webhook_endpoint, webhook_events: webhook_events_array, queue_name: 'webhooks'}); await webhooks.start(); webhooks.enabled = webhook_enabled; - winston.debug('Starting observer.') + logger.debug('Starting observer.') var amqpConnection = await start(); - winston.debug("[Observer.AMQP] connected."); + logger.debug("[Observer.AMQP] connected."); } @@ -1200,22 +1201,22 @@ async function startServer(config) { // ************ WEBHOOKS *********** // // function WHnotifyMessageReceived(message, callback) { -// winston.debug("NOTIFY MESSAGE:", message); +// logger.debug("NOTIFY MESSAGE:", message); // if (webhook_enabled===false) { -// winston.debug("WHnotifyMessageReceived Discarding notification. webhook_enabled is false."); +// logger.debug("WHnotifyMessageReceived Discarding notification. webhook_enabled is false."); // // callback({err: "WHnotifyMessageReceived Discarding notification. webhook_enabled is false."}); // callback(null) // return // } // const notify_topic = `observer.webhook.apps.${app_id}.message_received` -// winston.debug("notifying webhook notifyMessageReceived topic:" + notify_topic) +// logger.debug("notifying webhook notifyMessageReceived topic:" + notify_topic) // const message_payload = JSON.stringify(message) -// winston.debug("MESSAGE_PAYLOAD: " + message_payload) +// logger.debug("MESSAGE_PAYLOAD: " + message_payload) // publish(exchange, notify_topic, Buffer.from(message_payload), (err) => { // if (err) { -// winston.error("Err", err) +// logger.error("Err", err) // callback(err) // } // else { @@ -1225,10 +1226,10 @@ async function startServer(config) { // } // function WHnotifyMessageSaved(message, callback) { -// winston.debug("NOTIFY MESSAGE:", message) +// logger.debug("NOTIFY MESSAGE:", message) // if (webhook_enabled===false) { -// winston.debug("WHnotifyMessageSaved Discarding notification. webhook_enabled is false."); +// logger.debug("WHnotifyMessageSaved Discarding notification. webhook_enabled is false."); // // callback({err: "WHnotifyMessageSaved Discarding notification. webhook_enabled is false."}); // callback(null) // return @@ -1236,12 +1237,12 @@ async function startServer(config) { // // callback(null) // const notify_topic = `observer.webhook.apps.${app_id}.message_saved` -// winston.debug("notifying webhook notifyMessageSaved topic: " + notify_topic) +// logger.debug("notifying webhook notifyMessageSaved topic: " + notify_topic) // const message_payload = JSON.stringify(message) -// winston.debug("MESSAGE_PAYLOAD: " + message_payload) +// logger.debug("MESSAGE_PAYLOAD: " + message_payload) // publish(exchange, notify_topic, Buffer.from(message_payload), (err) => { // if (err) { -// winston.error("Err", err) +// logger.error("Err", err) // callback(err) // } // else { @@ -1251,10 +1252,10 @@ async function startServer(config) { // } // function WHnotifyConversationSaved(conversation, callback) { -// winston.debug("NOTIFY CONVERSATION:", conversation) +// logger.debug("NOTIFY CONVERSATION:", conversation) // if (webhook_enabled===false) { -// winston.debug("WHnotifyConversationSaved Discarding notification. webhook_enabled is false."); +// logger.debug("WHnotifyConversationSaved Discarding notification. webhook_enabled is false."); // // callback({err: "WHnotifyConversationSaved Discarding notification. webhook_enabled is false."}); // callback(null) // return @@ -1262,17 +1263,17 @@ async function startServer(config) { // // callback(null) // const notify_topic = `observer.webhook.apps.${app_id}.conversation_saved` -// winston.debug("notifying webhook notifyConversationSaved topic: "+ notify_topic) +// logger.debug("notifying webhook notifyConversationSaved topic: "+ notify_topic) // const conversation_payload = JSON.stringify(conversation) -// winston.debug("CONVERSATION_PAYLOAD:"+ conversation_payload) +// logger.debug("CONVERSATION_PAYLOAD:"+ conversation_payload) // publish(exchange, notify_topic, Buffer.from(conversation_payload), (err) => { // if (err) { -// winston.error("Err", err) +// logger.error("Err", err) // callback(err) // //ATTENTO // } // else { -// // winston.debug("ok",callback) +// // logger.debug("ok",callback) // callback(null) // //ATTENTO // } @@ -1280,22 +1281,22 @@ async function startServer(config) { // } // function WHnotifyConversationArchived(conversation, callback) { -// winston.debug("NOTIFY CONVERSATION ARCHIVED:", conversation) +// logger.debug("NOTIFY CONVERSATION ARCHIVED:", conversation) // if (webhook_enabled===false) { -// winston.debug("WHnotifyConversationArchived Discarding notification. webhook_enabled is false."); +// logger.debug("WHnotifyConversationArchived Discarding notification. webhook_enabled is false."); // // callback({err: "WHnotifyConversationArchived Discarding notification. webhook_enabled is false."}); // callback(null) // return // } // const notify_topic = `observer.webhook.apps.${app_id}.conversation_archived` -// winston.debug("notifying webhook notifyConversationArchived topic: " + notify_topic) +// logger.debug("notifying webhook notifyConversationArchived topic: " + notify_topic) // const payload = JSON.stringify(conversation) -// winston.debug("PAYLOAD:", payload) +// logger.debug("PAYLOAD:", payload) // publish(exchange, notify_topic, Buffer.from(payload), (err) => { // if (err) { -// winston.error("Err", err) +// logger.error("Err", err) // callback(err) // } // else { @@ -1305,34 +1306,34 @@ async function startServer(config) { // } // function WHprocess_webhook_message_received(topic, message_string, callback) { -// winston.debug("process webhook_message_received: " + message_string + " on topic: " + topic) +// logger.debug("process webhook_message_received: " + message_string + " on topic: " + topic) // var message = JSON.parse(message_string) -// winston.debug("timelineOf...:" + message.timelineOf) +// logger.debug("timelineOf...:" + message.timelineOf) // if (callback) { // callback(true) // } // if (webhook_enabled===false) { -// winston.debug("WHprocess_webhook_message_received Discarding notification. webhook_enabled is false."); +// logger.debug("WHprocess_webhook_message_received Discarding notification. webhook_enabled is false."); // // callback(true); // return // } // if (!WHisMessageOnGroupTimeline(message)) { -// winston.debug("WHprocess_webhook_message_received Discarding notification. Not to group."); +// logger.debug("WHprocess_webhook_message_received Discarding notification. Not to group."); // // callback(true); // return // } if (!webhook_endpoint) { -// winston.debug("WHprocess_webhook_message_received Discarding notification. webhook_endpoint is undefined.") +// logger.debug("WHprocess_webhook_message_received Discarding notification. webhook_endpoint is undefined.") // // callback(true); // return // } // if (webhook_methods_array.indexOf("new-message")==-1) { -// winston.debug("WHprocess_webhook_message_received Discarding notification. new-message not enabled."); +// logger.debug("WHprocess_webhook_message_received Discarding notification. new-message not enabled."); // // callback(true); // return // } -// winston.verbose("Sending notification to webhook (webhook_message_received) on webhook_endpoint:", webhook_endpoint) +// logger.debug("Sending notification to webhook (webhook_message_received) on webhook_endpoint:", webhook_endpoint) // const message_id = message.message_id; // const recipient_id = message.recipient; // const app_id = message.app_id; @@ -1344,12 +1345,12 @@ async function startServer(config) { // message_id: message_id, // data: message // }; -// winston.debug("WHprocess_webhook_message_received Sending JSON webhook:", json) +// logger.debug("WHprocess_webhook_message_received Sending JSON webhook:", json) // WHsendData(json, function(err, data) { // if (err) { -// winston.error("Err WHsendData callback", err); +// logger.error("Err WHsendData callback", err); // } else { -// winston.debug("WHsendData sendata end with data:" + data); +// logger.debug("WHsendData sendata end with data:" + data); // } // }) // } @@ -1358,34 +1359,34 @@ async function startServer(config) { // function WHprocess_webhook_message_saved(topic, message_string, callback) { -// winston.debug("process webhook_message_saved: " + message_string + " on topic: " + topic) +// logger.debug("process webhook_message_saved: " + message_string + " on topic: " + topic) // var message = JSON.parse(message_string) -// winston.debug("timelineOf...: " + message.timelineOf) +// logger.debug("timelineOf...: " + message.timelineOf) // if (callback) { // callback(true) // } // if (webhook_enabled===false) { -// winston.debug("WHprocess_webhook_message_saved Discarding notification. webhook_enabled is false."); +// logger.debug("WHprocess_webhook_message_saved Discarding notification. webhook_enabled is false."); // // callback(true); // return // } // if (!WHisMessageOnGroupTimeline(message)) { -// winston.debug("WHprocess_webhook_message_saved Discarding notification. Not to group.") +// logger.debug("WHprocess_webhook_message_saved Discarding notification. Not to group.") // return // } else if (!webhook_endpoint) { -// winston.debug("WHprocess_webhook_message_saved Discarding notification. webhook_endpoint is undefined.") +// logger.debug("WHprocess_webhook_message_saved Discarding notification. webhook_endpoint is undefined.") // return // } // if (webhook_methods_array.indexOf("new-message-saved")==-1) { -// winston.debug("WHprocess_webhook_message_saved Discarding notification. new-message-saved not enabled."); +// logger.debug("WHprocess_webhook_message_saved Discarding notification. new-message-saved not enabled."); // // callback(true); // return // } -// winston.verbose("Sending notification to webhook (webhook_message_saved) on webhook_endpoint:", webhook_endpoint) +// logger.debug("Sending notification to webhook (webhook_message_saved) on webhook_endpoint:", webhook_endpoint) // const message_id = message.message_id; // const recipient_id = message.recipient; // const app_id = message.app_id; @@ -1397,19 +1398,19 @@ async function startServer(config) { // message_id: message_id, // data: message // }; -// winston.debug("WHprocess_webhook_message_saved Sending JSON webhook:", json) +// logger.debug("WHprocess_webhook_message_saved Sending JSON webhook:", json) // WHsendData(json, function(err, data) { // if (err) { -// winston.error("Err WHsendData callback", err); +// logger.error("Err WHsendData callback", err); // } else { -// winston.debug("WHsendData sendata end with data:" + data); +// logger.debug("WHsendData sendata end with data:" + data); // } // }) // } // function WHprocess_webhook_conversation_saved(topic, conversation_string, callback) { -// winston.debug("process webhook_conversation_saved:" + conversation_string + "on topic" + topic) +// logger.debug("process webhook_conversation_saved:" + conversation_string + "on topic" + topic) // var conversation = JSON.parse(conversation_string) // if (callback) { @@ -1417,23 +1418,23 @@ async function startServer(config) { // } // if (webhook_enabled===false) { -// winston.debug("Discarding notification. webhook_enabled is false."); +// logger.debug("Discarding notification. webhook_enabled is false."); // // callback(true); // return // } // if (!webhook_endpoint) { -// winston.debug("Discarding notification. webhook_endpoint is undefined.") +// logger.debug("Discarding notification. webhook_endpoint is undefined.") // return // } // if (webhook_methods_array.indexOf("conversation-saved")==-1) { -// winston.debug("Discarding notification. conversation-saved not enabled."); +// logger.debug("Discarding notification. conversation-saved not enabled."); // // callback(true); // return // } -// winston.verbose("Sending notification to webhook (webhook_conversation_saved) on webhook_endpoint:"+ webhook_endpoint + " coonversation: " + conversation_string) +// logger.debug("Sending notification to webhook (webhook_conversation_saved) on webhook_endpoint:"+ webhook_endpoint + " coonversation: " + conversation_string) // // const message_id = message.message_id; // // const recipient_id = message.recipient; // const app_id = conversation.app_id; @@ -1445,46 +1446,46 @@ async function startServer(config) { // // message_id: message_id, // data: conversation // }; -// winston.debug("Sending JSON webhook:", json) +// logger.debug("Sending JSON webhook:", json) // WHsendData(json, function(err, data) { // if (err) { -// winston.error("Err WHsendData callback", err); +// logger.error("Err WHsendData callback", err); // } else { -// winston.debug("WHsendData sendata end with data:" + data); +// logger.debug("WHsendData sendata end with data:" + data); // } // }) // } // function WHprocess_webhook_conversation_archived(topic, message_string, callback) { -// winston.debug("process webhook_conversation_archived:", message_string, "on topic", topic) +// logger.debug("process webhook_conversation_archived:", message_string, "on topic", topic) // var conversation = JSON.parse(message_string) // if (callback) { // callback(true) // } // if (webhook_enabled===false) { -// winston.debug("Discarding notification. webhook_enabled is false."); +// logger.debug("Discarding notification. webhook_enabled is false."); // // callback(true); // return // } // // if (!WHisMessageOnGroupTimeline(message)) { -// // winston.debug("Discarding notification. Not to group.") +// // logger.debug("Discarding notification. Not to group.") // // return // // } // if (!webhook_endpoint) { -// winston.debug("WHprocess_webhook_conversation_archived: Discarding notification. webhook_endpoint is undefined.") +// logger.debug("WHprocess_webhook_conversation_archived: Discarding notification. webhook_endpoint is undefined.") // return // } // if (webhook_methods_array.indexOf("deleted-conversation")==-1) { -// winston.debug("Discarding notification. deleted-conversation not enabled."); +// logger.debug("Discarding notification. deleted-conversation not enabled."); // // callback(true); // return // } -// winston.verbose("Sending notification to webhook (webhook_conversation_archived):", webhook_endpoint) +// logger.debug("Sending notification to webhook (webhook_conversation_archived):", webhook_endpoint) // const conversWith = conversation.conversWith; // const timelineOf = "system"; // conversation.timelineOf; temporary patch for Tiledesk @@ -1497,16 +1498,16 @@ async function startServer(config) { // recipient_id: conversWith, // data: conversation // }; -// winston.debug("Sending JSON webhook:", json) +// logger.debug("Sending JSON webhook:", json) // WHsendData(json, function(err, data) { // if (err) { -// winston.error("Err WHsendData callback", err); +// logger.error("Err WHsendData callback", err); // } else { -// winston.debug("WHsendData sendata end with data:" + data); +// logger.debug("WHsendData sendata end with data:" + data); // } // }) // // var q = url.parse(webhook_endpoint, true); -// // winston.debug("ENV WEBHOOK URL PARSED:", q) +// // logger.debug("ENV WEBHOOK URL PARSED:", q) // // var protocol = (q.protocol == "http:") ? require('http') : require('https'); // // let options = { // // path: q.pathname, @@ -1524,14 +1525,14 @@ async function startServer(config) { // // respdata += chunk; // // }); // // response.on('end', function () { -// // winston.debug("WEBHOOK RESPONSE:", respdata); +// // logger.debug("WEBHOOK RESPONSE:", respdata); // // }); // // }); // // req.write(JSON.stringify(json)); // // req.end(); // // } // // catch(err) { -// // winston.debug("an error occurred:", err) +// // logger.debug("an error occurred:", err) // // } // }) // } @@ -1550,7 +1551,7 @@ async function startServer(config) { // } // function WHsendData(json, callback) { // var q = url.parse(webhook_endpoint, true); -// winston.debug("ENV WEBHOOK URL PARSED:", q) +// logger.debug("ENV WEBHOOK URL PARSED:", q) // var protocol = (q.protocol == "http:") ? require('http') : require('https'); // let options = { // path: q.pathname, @@ -1563,31 +1564,31 @@ async function startServer(config) { // }; // try { // const req = protocol.request(options, (response) => { -// winston.debug("statusCode: "+ response.statusCode + " for webhook_endpoint: " + webhook_endpoint); +// logger.debug("statusCode: "+ response.statusCode + " for webhook_endpoint: " + webhook_endpoint); // if (response.statusCode < 200 || response.statusCode > 299) { // (I don"t know if the 3xx responses come here, if so you"ll want to handle them appropriately -// winston.debug("http statusCode error "+ response.statusCode + " for webhook_endpoint: " + webhook_endpoint); +// logger.debug("http statusCode error "+ response.statusCode + " for webhook_endpoint: " + webhook_endpoint); // return callback({statusCode:response.statusCode}, null) // } // var respdata = '' // response.on('data', function (chunk) { -// // winston.debug("chunk"+chunk) +// // logger.debug("chunk"+chunk) // respdata += chunk; // }); // response.on('end', function () { -// winston.info("WEBHOOK RESPONSE:"+ respdata + " for webhook_endpoint: " + webhook_endpoint); +// logger.info("WEBHOOK RESPONSE:"+ respdata + " for webhook_endpoint: " + webhook_endpoint); // return callback(null, respdata) //TODO SE IL WEBHOOK NN RITORNA SEMBRA CHE SI BLOCCI // }); // }); // req.on('error', function(err) { -// winston.error("WEBHOOK RESPONSE Error:", err); +// logger.error("WEBHOOK RESPONSE Error:", err); // return callback(err, null) // }); // req.write(JSON.stringify(json)); // req.end(); -// // winston.debug("end") +// // logger.debug("end") // } // catch(err) { -// winston.error("an error occurred while posting this json " + JSON.stringify(json), err) +// logger.error("an error occurred while posting this json " + JSON.stringify(json), err) // return callback(err, null) // } // } diff --git a/tiledesk-logger/index.js b/tiledesk-logger/index.js new file mode 100644 index 0000000..e2c38ea --- /dev/null +++ b/tiledesk-logger/index.js @@ -0,0 +1,58 @@ +/* + ver 0.1 + Andrea Sponziello - (c) Tiledesk.com +*/ + +require('dotenv').config(); + +const LEVEL_DEBUG = 0 +const LEVEL_INFO = 1 +const LEVEL_ERROR = 2 +const LEVEL_LOG = LEVEL_DEBUG + +/** + * Tiledesk logger + */ +class TiledeskLogger { + /** + * Constructor + * + * @example + * const logger = require('tiledesk-logger'); + */ + + constructor() { + this.levels = {'DEBUG': LEVEL_DEBUG, 'ERROR': LEVEL_ERROR, 'INFO': LEVEL_INFO, 'LOG': LEVEL_LOG}; + this.logLevel = this.levels[process.env.LOG_LEVEL] || LEVEL_DEBUG + // console.log("actual logLevel", this.logLevel) + } + + debug(...args) { + if (this.logLevel == LEVEL_DEBUG) { + console.debug.apply(console,args) + } + } + + log(...args) { + if (this.logLevel == LEVEL_DEBUG) { + console.log.apply(console,args) + } + } + + info(...args) { + if (this.logLevel <= LEVEL_INFO) { + console.info.apply(console,args) + } + } + + error(...args) { + // if (this.logLevel <= LEVEL_ERROR) { + console.error.apply(console,args) + // } + } + +} + + + +module.exports.logger = new TiledeskLogger(); \ No newline at end of file diff --git a/webhooks/index.js b/webhooks/index.js index 7cd333d..acabf9b 100644 --- a/webhooks/index.js +++ b/webhooks/index.js @@ -4,7 +4,8 @@ */ const amqp = require('amqplib/callback_api'); -const winston = require("../winston"); +// const winston = require("../winston"); +const logger = require('../tiledesk-logger').logger; var url = require('url'); const MessageConstants = require("../models/messageConstants"); // const messageConstants = require('../models/messageConstants'); @@ -90,7 +91,7 @@ class Webhooks { this.webhook_events_array = options.webhook_events || DEFAULT_WEBHOOK_EVENTS; - winston.debug("webhooks inizialized: this.exchange:", this.exchange, "this.offlinePubQueue:", this.offlinePubQueue) + logger.debug("webhooks inizialized: this.exchange:", this.exchange, "this.offlinePubQueue:", this.offlinePubQueue) } @@ -119,13 +120,13 @@ class Webhooks { // WHnotifyMessageReceived // notifyMessageReceived(message) { - // winston.debug("NOTIFY MESSAGE:", message) + // logger.debug("NOTIFY MESSAGE:", message) // const notify_topic = `observer.webhook.apps.${app_id}.message_received` - // winston.debug("notifying webhook notifyMessageReceived topic:", notify_topic) + // logger.debug("notifying webhook notifyMessageReceived topic:", notify_topic) // const message_payload = JSON.stringify(message) // this.publish(this.exchange, notify_topic, Buffer.from(message_payload), (err) => { // if (err) { - // winston.debug("Err", err) + // logger.debug("Err", err) // } // }) // } @@ -133,15 +134,15 @@ class Webhooks { // WHprocess_webhook_message_received // process_webhook_message_received(topic, message_string, callback) { - // winston.debug("process_webhook_message_received.from.incoming:", message_string, "on topic", topic) + // logger.debug("process_webhook_message_received.from.incoming:", message_string, "on topic", topic) // var message = JSON.parse(message_string) - // winston.debug("timelineOf...:", message.timelineOf) + // logger.debug("timelineOf...:", message.timelineOf) // if (callback) { // callback(true) // } // if (this.isMessageOnGroupTimeline(message)) { - // winston.debug("Sending this message for group timeline:", message) + // logger.debug("Sending this message for group timeline:", message) // } // const message_id = message.message_id; // const recipient_id = message.recipient_id; @@ -157,7 +158,7 @@ class Webhooks { // }; // var q = url.parse(process.env.WEBHOOK_ENDPOINT, true); - // winston.debug("ENV WEBHOOK URL PARSED:", q) + // logger.debug("ENV WEBHOOK URL PARSED:", q) // var protocol = (q.protocol == "http") ? require('http') : require('https'); // let options = { // path: q.pathname, @@ -176,7 +177,7 @@ class Webhooks { // }); // response.on('end', function () { - // winston.debug("WEBHOOK RESPONSE:", respdata); + // logger.debug("WEBHOOK RESPONSE:", respdata); // }); // } @@ -201,11 +202,11 @@ class Webhooks { // ************ WEBHOOKS *********** // WHnotifyMessageStatusSentOrDelivered(message_payload, topic, callback) { - console.log("WHnotifyMessageStatusSentOrDelivered()", message_payload) + logger.log("WHnotifyMessageStatusSentOrDelivered()", message_payload) let message = JSON.parse(message_payload); message['temp_field_chat_topic'] = topic; if (message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.SENT) { - console.log("SENT...") + logger.log("SENT...") this.WHnotifyMessageStatusSent(message, (err) => { if (callback) { callback(err); @@ -216,7 +217,7 @@ class Webhooks { }) } else if (message.status == MessageConstants.CHAT_MESSAGE_STATUS_CODE.DELIVERED) { - console.log("DELIVERED...") + logger.log("DELIVERED...") this.WHnotifyMessageStatusDelivered(message, (err) => { if (callback) { callback(err); @@ -227,19 +228,19 @@ class Webhooks { }) } else { - console.log("FUCK THIS DELIVERED...") + logger.log("FUCK THIS DELIVERED...") callback(null); } } WHnotifyMessageStatusSent(message, callback) { - console.log("WH Sent method."); + logger.log("WH Sent method."); if (this.webhook_events_array.indexOf(MessageConstants.WEBHOOK_EVENTS.MESSAGE_SENT) == -1) { - winston.debug("WH MESSAGE_SENT disabled."); + logger.debug("WH MESSAGE_SENT disabled."); callback(null); } else { - console.log("WH MESSAGE_SENT enabled"); - winston.debug("WH MESSAGE_DELIVERED enabled."); + logger.log("WH MESSAGE_SENT enabled"); + logger.debug("WH MESSAGE_DELIVERED enabled."); this.WHnotifyMessageDeliver(message, (err) => { callback(err); }); @@ -248,10 +249,10 @@ class Webhooks { WHnotifyMessageStatusDelivered(message, callback) { if (this.webhook_events_array.indexOf(MessageConstants.WEBHOOK_EVENTS.MESSAGE_DELIVERED) == -1) { - winston.debug("WH MESSAGE_DELIVERED disabled."); + logger.debug("WH MESSAGE_DELIVERED disabled."); callback(null); } else { - winston.debug("WH MESSAGE_DELIVERED enabled."); + logger.debug("WH MESSAGE_DELIVERED enabled."); this.WHnotifyMessageDeliver(message, (err) => { callback(err); }); @@ -260,7 +261,7 @@ class Webhooks { WHnotifyMessageStatusReturnReceipt(message, callback) { if (this.webhook_events_array.indexOf(MessageConstants.WEBHOOK_EVENTS.MESSAGE_RETURN_RECEIPT) == -1) { - winston.debug("WH MESSAGE_RETURN_RECEIPT disabled."); + logger.debug("WH MESSAGE_RETURN_RECEIPT disabled."); callback(null); } else { this.WHnotifyMessageUpdate(message, (err) => { @@ -270,19 +271,19 @@ class Webhooks { } WHnotifyMessageDeliver(message, callback) { - winston.debug("WH NOTIFY MESSAGE:", message); + logger.debug("WH NOTIFY MESSAGE:", message); if (this.enabled===false) { - winston.debug("webhooks disabled"); + logger.debug("webhooks disabled"); callback(null) return } const notify_topic = `observer.webhook.apps.${this.appId}.message_deliver` - winston.debug("notifying webhook MessageSent topic:" + notify_topic) + logger.debug("notifying webhook MessageSent topic:" + notify_topic) const message_payload = JSON.stringify(message) - winston.debug("MESSAGE_PAYLOAD: " + message_payload) + logger.debug("MESSAGE_PAYLOAD: " + message_payload) this.publish(this.exchange, notify_topic, Buffer.from(message_payload), (err) => { if (err) { - winston.error("Err", err) + logger.error("Err", err) callback(err) } else { @@ -292,19 +293,19 @@ class Webhooks { } WHnotifyMessageUpdate(message, callback) { - winston.debug("NOTIFY MESSAGE UPDATE:", message); + logger.debug("NOTIFY MESSAGE UPDATE:", message); if (this.enabled===false) { - winston.debug("webhooks disabled"); + logger.debug("webhooks disabled"); callback(null) return } const notify_topic = `observer.webhook.apps.${this.appId}.message_update` - winston.debug("notifying webhook message_update topic:" + notify_topic) + logger.debug("notifying webhook message_update topic:" + notify_topic) const message_payload = JSON.stringify(message) - winston.debug("MESSAGE_PAYLOAD: " + message_payload) + logger.debug("MESSAGE_PAYLOAD: " + message_payload) this.publish(this.exchange, notify_topic, Buffer.from(message_payload), (err) => { if (err) { - winston.error("Err", err) + logger.error("Err", err) callback(err) } else { @@ -314,10 +315,10 @@ class Webhooks { } WHnotifyConversationArchived(conversation, topic, callback) { - winston.debug("NOTIFY CONVERSATION ARCHIVED:", conversation) + logger.debug("NOTIFY CONVERSATION ARCHIVED:", conversation) if (this.enabled===false) { - winston.debug("WHnotifyConversationArchived Discarding notification. webhook_enabled is false."); + logger.debug("WHnotifyConversationArchived Discarding notification. webhook_enabled is false."); // callback({err: "WHnotifyConversationArchived Discarding notification. webhook_enabled is false."}); callback(null) return @@ -326,12 +327,12 @@ class Webhooks { conversation['temp_field_chat_topic'] = topic; const notify_topic = `observer.webhook.apps.${this.appId}.conversation_archived` - winston.debug("notifying webhook notifyConversationArchived topic: " + notify_topic) + logger.debug("notifying webhook notifyConversationArchived topic: " + notify_topic) const payload = JSON.stringify(conversation) - winston.debug("PAYLOAD:", payload) + logger.debug("PAYLOAD:", payload) this.publish(this.exchange, notify_topic, Buffer.from(payload), (err) => { if (err) { - winston.error("Err", err) + logger.error("Err", err) callback(err) } else { @@ -341,33 +342,33 @@ class Webhooks { } WHprocess_webhook_message_deliver(topic, message_string, callback) { - winston.debug("process WHprocess_webhook_message_deliver: " + message_string + " on topic: " + topic) + logger.debug("process WHprocess_webhook_message_deliver: " + message_string + " on topic: " + topic) var message = JSON.parse(message_string) if (callback) { callback(true) } // if (this.enabled===false) { - // winston.debug("WHprocess_webhook_message_deliver Discarding notification. webhook_enabled is false."); + // logger.debug("WHprocess_webhook_message_deliver Discarding notification. webhook_enabled is false."); // return // } // if (!this.WHisMessageOnGroupTimeline(message)) { - // winston.debug("WHprocess_webhook_message_deliver Discarding notification. Not to group."); + // logger.debug("WHprocess_webhook_message_deliver Discarding notification. Not to group."); // // callback(true); // return // } if (!this.webhook_endpoint) { - winston.debug("WHprocess_webhook_message_deliver Discarding notification. webhook_endpoint is undefined.") + logger.debug("WHprocess_webhook_message_deliver Discarding notification. webhook_endpoint is undefined.") // callback(true); return } // if (this.webhook_methods_array.indexOf("new-message")==-1) { - // winston.debug("WHprocess_webhook_message_deliver Discarding notification. new-message not enabled."); + // logger.debug("WHprocess_webhook_message_deliver Discarding notification. new-message not enabled."); // // callback(true); // return // } - winston.verbose("Sending notification to webhook (message_deliver) on webhook_endpoint:" + this.webhook_endpoint); + logger.debug("Sending notification to webhook (message_deliver) on webhook_endpoint:" + this.webhook_endpoint); const message_id = message.message_id; const recipient_id = message.recipient; const app_id = message.app_id; @@ -388,28 +389,28 @@ class Webhooks { extras: {topic: message['temp_field_chat_topic']} }; delete message['temp_field_chat_topic']; - winston.debug("WHprocess_webhook_message_received Sending JSON webhook:", json) + logger.debug("WHprocess_webhook_message_received Sending JSON webhook:", json) this.WHsendData(json, function(err, data) { if (err) { - winston.error("Err WHsendData callback", err); + logger.error("Err WHsendData callback", err); } else { - winston.debug("WHsendData sendata end with data:" + data); + logger.debug("WHsendData sendata end with data:" + data); } }) } WHprocess_webhook_message_update(topic, message_string, callback) { - winston.debug("process WHprocess_webhook_message_update: " + message_string + " on topic: " + topic) + logger.debug("process WHprocess_webhook_message_update: " + message_string + " on topic: " + topic) var message = JSON.parse(message_string) - winston.debug("timelineOf:" + message.timelineOf) + logger.debug("timelineOf:" + message.timelineOf) if (callback) { callback(true) } if (!this.webhook_endpoint) { - winston.debug("WHprocess_webhook_message_update Discarding notification. webhook_endpoint is undefined.") + logger.debug("WHprocess_webhook_message_update Discarding notification. webhook_endpoint is undefined.") return } - winston.verbose("Sending notification to webhook (message_deliver) on webhook_endpoint:" + this.webhook_endpoint); + logger.debug("Sending notification to webhook (message_deliver) on webhook_endpoint:" + this.webhook_endpoint); const message_id = message.message_id; const recipient_id = message.recipient; const app_id = message.app_id; @@ -429,45 +430,45 @@ class Webhooks { data: message, extras: {topic: topic} }; - winston.debug("WHprocess_webhook_message_received Sending JSON webhook:", json) + logger.debug("WHprocess_webhook_message_received Sending JSON webhook:", json) this.WHsendData(json, function(err, data) { if (err) { - winston.error("Err WHsendData callback", err); + logger.error("Err WHsendData callback", err); } else { - winston.debug("WHsendData sendata end with data:" + data); + logger.debug("WHsendData sendata end with data:" + data); } }) } WHprocess_webhook_conversation_archived(topic, payload, callback) { - winston.debug("process webhook_conversation_archived on topic" + topic) - winston.debug("process webhook_conversation_archived on payload" + payload) + logger.debug("process webhook_conversation_archived on topic" + topic) + logger.debug("process webhook_conversation_archived on payload" + payload) var conversation = JSON.parse(payload) - console.debug("conversation['temp_field_chat_topic']", conversation['temp_field_chat_topic']); + logger.debug("conversation['temp_field_chat_topic']", conversation['temp_field_chat_topic']); if (callback) { callback(true) } if (this.enabled===false) { - winston.debug("Discarding notification. webhook_enabled is false."); + logger.debug("Discarding notification. webhook_enabled is false."); // callback(true); return } if (!this.webhook_endpoint) { - winston.debug("WHprocess_webhook_conversation_archived: Discarding notification. webhook_endpoint is undefined.") + logger.debug("WHprocess_webhook_conversation_archived: Discarding notification. webhook_endpoint is undefined.") return } - winston.verbose("Sending notification to webhook (webhook_conversation_archived):", this.webhook_endpoint) + logger.debug("Sending notification to webhook (webhook_conversation_archived):", this.webhook_endpoint) if (!conversation['temp_field_chat_topic']) { - winston.debug("WHprocess_webhook_conversation_archived NO 'temp_field_chat_topic' error.") + logger.debug("WHprocess_webhook_conversation_archived NO 'temp_field_chat_topic' error.") } var topic_parts = conversation['temp_field_chat_topic'].split(".") - winston.debug("ARCHIVE. TOPIC PARTS:", topic_parts) + logger.debug("ARCHIVE. TOPIC PARTS:", topic_parts) if (topic_parts.length < 7) { - winston.debug("process_archive topic error. topic_parts.length < 7:" + topic) + logger.debug("process_archive topic error. topic_parts.length < 7:" + topic) return } const app_id = topic_parts[1]; @@ -486,12 +487,12 @@ class Webhooks { extras: {topic: conversation['temp_field_chat_topic']} }; delete conversation['temp_field_chat_topic']; - winston.debug("Sending JSON webhook:", json) + logger.debug("Sending JSON webhook:", json) this.WHsendData(json, function(err, data) { if (err) { - winston.error("Err WHsendData callback", err); + logger.error("Err WHsendData callback", err); } else { - winston.debug("WHsendData sendata end with data:" + data); + logger.debug("WHsendData sendata end with data:" + data); } }) } @@ -507,7 +508,7 @@ class Webhooks { WHsendData(json, callback) { var q = url.parse(this.webhook_endpoint, true); - winston.debug("ENV WEBHOOK URL PARSED:", q) + logger.debug("ENV WEBHOOK URL PARSED:", q) var protocol = (q.protocol == "http:") ? require('http') : require('https'); let options = { path: q.pathname, @@ -520,40 +521,40 @@ class Webhooks { }; try { const req = protocol.request(options, (response) => { - winston.debug("statusCode: "+ response.statusCode + " for webhook_endpoint: " + this.webhook_endpoint); + logger.debug("statusCode: "+ response.statusCode + " for webhook_endpoint: " + this.webhook_endpoint); if (response.statusCode < 200 || response.statusCode > 299) { // (I don"t know if the 3xx responses come here, if so you"ll want to handle them appropriately - winston.debug("http statusCode error "+ response.statusCode + " for webhook_endpoint: " + this.webhook_endpoint); + logger.debug("http statusCode error "+ response.statusCode + " for webhook_endpoint: " + this.webhook_endpoint); return callback({statusCode:response.statusCode}, null) } var respdata = '' response.on('data', (chunk) => { - // winston.debug("chunk"+chunk) + // logger.debug("chunk"+chunk) respdata += chunk; }); response.on('end', () => { - winston.info("WEBHOOK RESPONSE: "+ respdata + " for webhook_endpoint: " + this.webhook_endpoint); + logger.info("WEBHOOK RESPONSE: "+ respdata + " for webhook_endpoint: " + this.webhook_endpoint); return callback(null, respdata) //TODO SE IL WEBHOOK NN RITORNA SEMBRA CHE SI BLOCCI }); }); req.on('error', (err) => { - winston.error("WEBHOOK RESPONSE Error: ", err); + logger.error("WEBHOOK RESPONSE Error: ", err); return callback(err, null) }); req.write(JSON.stringify(json)); req.end(); - // winston.debug("end") + // logger.debug("end") } catch(err) { - winston.error("an error occurred while posting this json " + JSON.stringify(json), err) + logger.error("an error occurred while posting this json " + JSON.stringify(json), err) return callback(err, null) } } async whenConnected() { const resolve = await this.startPublisher(); - winston.info("webhook publisher started."); + logger.info("webhook publisher started."); this.startWorker(); - winston.info("webhook worker started."); + logger.info("webhook worker started."); return resolve; } @@ -563,10 +564,10 @@ class Webhooks { that.amqpConn.createConfirmChannel( (err, ch) => { if (that.closeOnErr(err)) return; ch.on("error", function (err) { - winston.error("[Webooks.AMQP] channel error", err); + logger.error("[Webooks.AMQP] channel error", err); }); ch.on("close", function () { - winston.debug("[Webooks.AMQP] channel closed"); + logger.debug("[Webooks.AMQP] channel closed"); }); that.pubChannel = ch; // if (that.offlinePubQueue.length > 0) { @@ -588,21 +589,21 @@ class Webhooks { closeOnErr(err) { if (!err) return false; - console.error("[Webooks.AMQP] error", err); + logger.error("[Webooks.AMQP] error", err); this.amqpConn.close(); return true; } startWorker() { - winston.debug("starting webhook worker."); + logger.debug("starting webhook worker."); this.amqpConn.createChannel((err, ch) => { this.channel = ch; if (this.closeOnErr(err)) return; ch.on("error", function (err) { - winston.error("[Webooks.AMQP] channel error", err); + logger.error("[Webooks.AMQP] channel error", err); }); ch.on("close", function () { - winston.debug("[Webooks.AMQP] channel closed"); + logger.debug("[Webooks.AMQP] channel closed"); }); ch.prefetch(10); ch.assertExchange(this.exchange, 'topic', { @@ -610,7 +611,7 @@ class Webhooks { }); ch.assertQueue(this.queue, { durable: true }, (err, _ok) => { if (this.closeOnErr(err)) return; - winston.debug("subscribed to _ok.queue: " + _ok.queue); + logger.debug("subscribed to _ok.queue: " + _ok.queue); this.subscribeTo(this.topic_webhook_message_deliver, ch, _ok.queue) this.subscribeTo(this.topic_webhook_message_update, ch, _ok.queue) this.subscribeTo(this.topic_webhook_conversation_archived, ch, _ok.queue) @@ -622,37 +623,37 @@ class Webhooks { subscribeTo(topic, channel, queue) { channel.bindQueue(queue, this.exchange, topic, {}, function (err, oka) { if (err) { - winston.error("Webooks.Error:", err, " binding on queue:", queue, "topic:", topic) + logger.error("Webooks.Error:", err, " binding on queue:", queue, "topic:", topic) } else { - winston.info("Webooks.bind: '" + queue + "' on topic: " + topic); + logger.info("Webooks.bind: '" + queue + "' on topic: " + topic); } }); } processMsg(msg) { - winston.debug("Webhooks.subscribeTo:" + this); + logger.debug("Webhooks.subscribeTo:" + this); this.work(msg, (ok) => { - winston.debug("Webhooks.worked."); + logger.debug("Webhooks.worked."); try { if (ok) this.channel.ack(msg); else this.channel.reject(msg, true); } catch (e) { - winston.debug("gin2:", e) + logger.debug("gin2:", e) this.closeOnErr(e); } }); } work(msg, callback) { - winston.debug("Webhooks.NEW TOPIC..." + msg.fields.routingKey) //, " message:", msg.content.toString()); + logger.debug("Webhooks.NEW TOPIC..." + msg.fields.routingKey) //, " message:", msg.content.toString()); const topic = msg.fields.routingKey //.replace(/[.]/g, '/'); const message_string = msg.content.toString(); if (topic.startsWith('observer.webhook.') && topic.endsWith('.message_deliver')) { // if (this.enabled === false) { - // winston.debug("work observer.webhook....message_received notification. webhook_enabled is false."); + // logger.debug("work observer.webhook....message_received notification. webhook_enabled is false."); // callback(true); // } else { this.WHprocess_webhook_message_deliver(topic, message_string, callback); @@ -660,7 +661,7 @@ class Webhooks { } else if (topic.startsWith('observer.webhook.') && topic.endsWith('.message_update')) { // if (this.enabled === false) { - // winston.debug("work observer.webhook....message_update notification. webhook_enabled is false."); + // logger.debug("work observer.webhook....message_update notification. webhook_enabled is false."); // callback(true); // } else { this.WHprocess_webhook_message_update(topic, message_string, callback); @@ -668,7 +669,7 @@ class Webhooks { } // else if (topic.startsWith('observer.webhook.') && topic.endsWith('.message_received')) { // if (this.enabled === false) { - // winston.debug("work observer.webhook....message_received notification. webhook_enabled is false."); + // logger.debug("work observer.webhook....message_received notification. webhook_enabled is false."); // callback(true); // } else { // this.WHprocess_webhook_message_received(topic, message_string, callback); @@ -676,21 +677,21 @@ class Webhooks { // } else if (topic.startsWith('observer.webhook.') && topic.endsWith('.conversation_archived')) { // if (this.enabled === false) { - // winston.debug("work observer.webhook....conversation_archived notification. webhook_enabled is false."); + // logger.debug("work observer.webhook....conversation_archived notification. webhook_enabled is false."); // callback(true); // } else { this.WHprocess_webhook_conversation_archived(topic, message_string, callback); // } } else { - winston.error("Webooks.unhandled topic:", topic) + logger.error("Webooks.unhandled topic:", topic) callback(true) } } start() { const that = this; - winston.info("Webhook config: ", this); + logger.info("Webhook config: ", this); return new Promise(function (resolve, reject) { return that.startMQ(resolve, reject); }); @@ -699,26 +700,26 @@ class Webhooks { startMQ(resolve, reject) { const that = this; - winston.debug("Webooks. Connecting to RabbitMQ...") + logger.debug("Webooks. Connecting to RabbitMQ...") amqp.connect(that.RABBITMQ_URI, (err, conn) => { if (err) { - winston.error("[Webooks.AMQP]", err); + logger.error("[Webooks.AMQP]", err); return setTimeout(() => { that.startMQ(resolve, reject) }, 1000); } conn.on("error", (err) => { if (err.message !== "Connection closing") { - winston.error("[Webooks.AMQP] conn error", err); + logger.error("[Webooks.AMQP] conn error", err); return reject(err); } }); conn.on("close", () => { - console.error("[Webooks.AMQP] reconnecting"); + logger.error("[Webooks.AMQP] reconnecting"); return setTimeout(() => { that.startMQ(resolve, reject) }, 1000); }); - winston.info("Webooks. AMQP connected.") + logger.info("Webooks. AMQP connected.") that.amqpConn = conn; that.whenConnected().then(function(ch) { - winston.debug("Webooks. whenConnected() returned") + logger.debug("Webooks. whenConnected() returned") resolve({conn: conn, ch: ch}); }); }); @@ -727,10 +728,10 @@ class Webhooks { publish(exchange, routingKey, content, callback) { try { - winston.debug("Webooks.TRYING TO PUB...") + logger.debug("Webooks.TRYING TO PUB...") this.pubChannel.publish(exchange, routingKey, content, { persistent: true }, (err, ok) => { if (err) { - console.error("[Webooks.AMQP] publish ERROR:", err); + logger.error("[Webooks.AMQP] publish ERROR:", err); this.offlinePubQueue.push([exchange, routingKey, content]); this.pubChannel.connection.close(); callback(err) @@ -740,7 +741,7 @@ class Webhooks { } }); } catch (e) { - console.error("[Webooks.AMQP] publish CATCHED ERROR:", e); + logger.error("[Webooks.AMQP] publish CATCHED ERROR:", e); this.offlinePubQueue.push([exchange, routingKey, content]); callback(e) }