diff --git a/app-shell-odd/src/actions.ts b/app-shell-odd/src/actions.ts index 92bef0b73f4..d1427d8468d 100644 --- a/app-shell-odd/src/actions.ts +++ b/app-shell-odd/src/actions.ts @@ -76,7 +76,6 @@ import { VALUE_UPDATED, VIEW_PROTOCOL_SOURCE_FOLDER, NOTIFY_SUBSCRIBE, - NOTIFY_UNSUBSCRIBE, ROBOT_MASS_STORAGE_DEVICE_ADDED, ROBOT_MASS_STORAGE_DEVICE_ENUMERATED, ROBOT_MASS_STORAGE_DEVICE_REMOVED, @@ -105,7 +104,6 @@ import type { AppRestartAction, NotifySubscribeAction, NotifyTopic, - NotifyUnsubscribeAction, ReloadUiAction, RobotMassStorageDeviceAdded, RobotMassStorageDeviceEnumerated, @@ -428,18 +426,6 @@ export const notifySubscribeAction = ( meta: { shell: true }, }) -export const notifyUnsubscribeAction = ( - hostname: string, - topic: NotifyTopic -): NotifyUnsubscribeAction => ({ - type: NOTIFY_UNSUBSCRIBE, - payload: { - hostname, - topic, - }, - meta: { shell: true }, -}) - export function startDiscovery( timeout: number | null = null ): StartDiscoveryAction { diff --git a/app-shell-odd/src/constants.ts b/app-shell-odd/src/constants.ts index c76f302c130..788fdf70cd7 100644 --- a/app-shell-odd/src/constants.ts +++ b/app-shell-odd/src/constants.ts @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV 'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED' export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' = 'shell:NOTIFY_SUBSCRIBE' -export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' = - 'shell:NOTIFY_UNSUBSCRIBE' // copy // TODO(mc, 2020-05-11): i18n @@ -252,3 +250,8 @@ export const HTTP_API_VERSION: 3 = 3 export const SEND_READY_STATUS: 'shell:SEND_READY_STATUS' = 'shell:SEND_READY_STATUS' + +export const FAILURE_STATUSES = { + ECONNREFUSED: 'ECONNREFUSED', + ECONNFAILED: 'ECONNFAILED', +} as const diff --git a/app-shell-odd/src/main.ts b/app-shell-odd/src/main.ts index f536f56f96c..eaea1768078 100644 --- a/app-shell-odd/src/main.ts +++ b/app-shell-odd/src/main.ts @@ -23,7 +23,11 @@ import { } from './config' import systemd from './systemd' import { watchForMassStorage } from './usb' -import { registerNotify, closeAllNotifyConnections } from './notify' +import { + registerNotify, + establishBrokerConnection, + closeBrokerConnection, +} from './notifications' import type { BrowserWindow } from 'electron' import type { Dispatch, Logger } from './types' @@ -58,7 +62,7 @@ if (config.devtools) app.once('ready', installDevtools) app.once('window-all-closed', () => { log.debug('all windows closed, quitting the app') - closeAllNotifyConnections() + closeBrokerConnection() .then(() => { app.quit() }) @@ -96,7 +100,7 @@ function startUp(): void { mainWindow = createUi(dispatch) rendererLogger = createRendererLogger() - + void establishBrokerConnection() mainWindow.once('closed', () => (mainWindow = null)) log.info('Fetching latest software version') diff --git a/app-shell-odd/src/notifications/connect.ts b/app-shell-odd/src/notifications/connect.ts new file mode 100644 index 00000000000..67df09de466 --- /dev/null +++ b/app-shell-odd/src/notifications/connect.ts @@ -0,0 +1,121 @@ +import mqtt from 'mqtt' + +import { connectionStore } from './store' +import { + sendDeserialized, + sendDeserializedGenericError, + deserializeExpectedMessages, +} from './deserialize' +import { unsubscribe } from './unsubscribe' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. +const CLIENT_ID = 'odd-' + Math.random().toString(16).slice(2, 8) // Derived from mqttjs +const connectOptions: mqtt.IClientOptions = { + clientId: CLIENT_ID, + port: 1883, + keepalive: 60, + protocolVersion: 5, + reconnectPeriod: 1000, + connectTimeout: 30 * 1000, + clean: true, + resubscribe: true, +} + +export function connectAsync(brokerURL: string): Promise { + const client = mqtt.connect(brokerURL, connectOptions) + + return new Promise((resolve, reject) => { + // Listeners added to client to trigger promise resolution + const promiseListeners: { + [key: string]: (...args: any[]) => void + } = { + connect: () => { + removePromiseListeners() + return resolve(client) + }, + // A connection error event will close the connection without a retry. + error: (error: Error | string) => { + removePromiseListeners() + const clientEndPromise = new Promise((resolve, reject) => + client.end(true, {}, () => resolve(error)) + ) + return clientEndPromise.then(() => reject(error)) + }, + end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), + } + + function removePromiseListeners(): void { + Object.keys(promiseListeners).forEach(eventName => { + client.removeListener(eventName, promiseListeners[eventName]) + }) + } + + Object.keys(promiseListeners).forEach(eventName => { + client.on(eventName, promiseListeners[eventName]) + }) + }) +} + +export function establishListeners(): void { + const client = connectionStore.client as mqtt.MqttClient + const { ip, robotName } = connectionStore + + client.on( + 'message', + (topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { + deserializeExpectedMessages(message.toString()) + .then(deserializedMessage => { + const messageContainsUnsubFlag = 'unsubscribe' in deserializedMessage + if (messageContainsUnsubFlag) { + void unsubscribe(topic).catch((error: Error) => + notifyLog.debug(error.message) + ) + } + + notifyLog.debug('Received notification data from main via IPC', { + ip, + topic, + }) + + sendDeserialized(topic, deserializedMessage) + }) + .catch(error => notifyLog.debug(`${error.message}`)) + } + ) + + client.on('reconnect', () => { + notifyLog.debug(`Attempting to reconnect to ${robotName} on ${ip}`) + }) + // handles transport layer errors only + client.on('error', error => { + notifyLog.warn(`Error - ${error.name}: ${error.message}`) + sendDeserializedGenericError('ALL_TOPICS') + client.end() + }) + + client.on('end', () => { + notifyLog.debug(`Closed connection to ${robotName} on ${ip}`) + // Marking the connection as failed with a generic error status lets the connection re-establish in the future + // and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing). + void connectionStore.setErrorStatus() + }) + + client.on('disconnect', packet => { + notifyLog.warn( + `Disconnected from ${robotName} on ${ip} with code ${ + packet.reasonCode ?? 'undefined' + }` + ) + sendDeserializedGenericError('ALL_TOPICS') + }) +} + +export function closeConnectionForcefully(): Promise { + const { client } = connectionStore + return new Promise((resolve, reject) => + client?.end(true, {}, () => resolve()) + ) +} diff --git a/app-shell-odd/src/notifications/deserialize.ts b/app-shell-odd/src/notifications/deserialize.ts new file mode 100644 index 00000000000..4539bc97faa --- /dev/null +++ b/app-shell-odd/src/notifications/deserialize.ts @@ -0,0 +1,62 @@ +import isEqual from 'lodash/isEqual' + +import { connectionStore } from './store' + +import type { + NotifyBrokerResponses, + NotifyRefetchData, + NotifyResponseData, + NotifyTopic, + NotifyUnsubscribeData, +} from '@opentrons/app/src/redux/shell/types' +import { FAILURE_STATUSES } from '../constants' + +const VALID_NOTIFY_RESPONSES: [NotifyRefetchData, NotifyUnsubscribeData] = [ + { refetchUsingHTTP: true }, + { unsubscribe: true }, +] + +export function sendDeserialized( + topic: NotifyTopic, + message: NotifyResponseData +): void { + try { + const browserWindow = connectionStore.getBrowserWindow() + browserWindow?.webContents.send( + 'notify', + connectionStore.ip, + topic, + message + ) + } catch {} // Prevents shell erroring during app shutdown event. +} + +export function sendDeserializedGenericError(topic: NotifyTopic): void { + sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED) +} + +export function deserializeExpectedMessages( + message: string +): Promise { + return new Promise((resolve, reject) => { + let deserializedMessage: NotifyResponseData | Record + const error = new Error( + `Unexpected data received from notify broker: ${message}` + ) + + try { + deserializedMessage = JSON.parse(message) + } catch { + reject(error) + } + + const isValidNotifyResponse = VALID_NOTIFY_RESPONSES.some(model => + isEqual(model, deserializedMessage) + ) + if (!isValidNotifyResponse) { + reject(error) + } else { + resolve(JSON.parse(message)) + } + }) +} diff --git a/app-shell-odd/src/notifications/index.ts b/app-shell-odd/src/notifications/index.ts new file mode 100644 index 00000000000..cce5758de72 --- /dev/null +++ b/app-shell-odd/src/notifications/index.ts @@ -0,0 +1,65 @@ +import { connectionStore } from './store' +import { + connectAsync, + establishListeners, + closeConnectionForcefully, +} from './connect' +import { subscribe } from './subscribe' +import { notifyLog } from './notifyLog' + +import type { BrowserWindow } from 'electron' +import type { Action, Dispatch } from '../types' + +// Manages the MQTT broker connection through a connection store. Subscriptions are handled "lazily" - a component must +// dispatch a subscribe action before a subscription request is made to the broker. Unsubscribe requests only occur if +// the broker sends an "unsubscribe" flag. Pending subs and unsubs are used to prevent unnecessary network and broker load. + +export function registerNotify( + dispatch: Dispatch, + mainWindow: BrowserWindow +): (action: Action) => unknown { + // Because of the ODD's start sequence, the browser window will always be defined before relevant actions occur. + if (connectionStore.getBrowserWindow() == null) { + connectionStore.setBrowserWindow(mainWindow) + } + + return function handleAction(action: Action) { + switch (action.type) { + case 'shell:NOTIFY_SUBSCRIBE': + return subscribe(action.payload.topic) + } + } +} + +export function establishBrokerConnection(): Promise { + const { ip, robotName } = connectionStore + + return connectAsync(`mqtt://${connectionStore.ip}`) + .then(client => { + notifyLog.debug(`Successfully connected to ${robotName} on ${ip}`) + void connectionStore + .setConnected(client) + .then(() => establishListeners()) + .catch((error: Error) => notifyLog.debug(error.message)) + }) + .catch((error: Error) => { + notifyLog.warn( + `Failed to connect to ${robotName} on ${ip} - ${error.name}: ${error.message} ` + ) + void connectionStore.setErrorStatus() + }) +} + +export function closeBrokerConnection(): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + reject(Error('Failed to close the connection within the time limit.')) + }, 2000) + + notifyLog.debug( + `Stopping notify service connection for host ${connectionStore.robotName}` + ) + const closeConnection = closeConnectionForcefully() + closeConnection.then(resolve).catch(reject) + }) +} diff --git a/app-shell-odd/src/notifications/notifyLog.ts b/app-shell-odd/src/notifications/notifyLog.ts new file mode 100644 index 00000000000..35507fa2c2a --- /dev/null +++ b/app-shell-odd/src/notifications/notifyLog.ts @@ -0,0 +1,3 @@ +import { createLogger } from '../log' + +export const notifyLog = createLogger('notify') diff --git a/app-shell-odd/src/notifications/store.ts b/app-shell-odd/src/notifications/store.ts new file mode 100644 index 00000000000..9553fba3af4 --- /dev/null +++ b/app-shell-odd/src/notifications/store.ts @@ -0,0 +1,128 @@ +import type mqtt from 'mqtt' + +import { FAILURE_STATUSES } from '../constants' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' +import type { BrowserWindow } from 'electron' + +type FailedConnStatus = typeof FAILURE_STATUSES[keyof typeof FAILURE_STATUSES] + +/** + * @description Manages the internal state of MQTT connections to various robot hosts. + */ +class ConnectionStore { + public readonly ip = '127.0.0.1' + + public readonly robotName = 'LOCALHOST' + + public client: mqtt.MqttClient | null = null + + private readonly subscriptions: Set = new Set() + + private readonly pendingSubs: Set = new Set() + + private readonly pendingUnsubs: Set = new Set() + + private unreachableStatus: FailedConnStatus | null = null + + private browserWindow: BrowserWindow | null = null + + public getBrowserWindow(): BrowserWindow | null { + return this.browserWindow + } + + /** + * @returns {FailedConnStatus} "ECONNREFUSED" is a proxy for a port block error and is only returned once + * for analytics reasons. Afterward, a generic "ECONNFAILED" is returned. + */ + public getFailedConnectionStatus(): FailedConnStatus | null { + const failureStatus = this.unreachableStatus + if (failureStatus === FAILURE_STATUSES.ECONNREFUSED) { + this.unreachableStatus = FAILURE_STATUSES.ECONNFAILED + } + return failureStatus + } + + public setBrowserWindow(window: BrowserWindow): void { + this.browserWindow = window + } + + public setConnected(client: mqtt.MqttClient): Promise { + return new Promise((resolve, reject) => { + if (this.client == null) { + this.client = client + resolve() + } else { + reject(new Error(`Connection already exists for ${this.robotName}`)) + } + }) + } + + /** + * @description Marks the host as unreachable. Don't report ECONNREFUSED, since while this is a good enough proxy + * for port block events, it's not perfect, and a port block event can never actually occur on the ODD. + */ + public setErrorStatus(): Promise { + return new Promise((resolve, reject) => { + this.unreachableStatus = FAILURE_STATUSES.ECONNFAILED + resolve() + }) + } + + public setSubStatus( + topic: NotifyTopic, + status: 'pending' | 'subscribed' + ): Promise { + return new Promise((resolve, reject) => { + if (status === 'pending') { + this.pendingSubs.add(topic) + } else { + this.subscriptions.add(topic) + this.pendingSubs.delete(topic) + } + resolve() + }) + } + + public setUnsubStatus( + topic: NotifyTopic, + status: 'pending' | 'unsubscribed' + ): Promise { + return new Promise((resolve, reject) => { + if (this.subscriptions.has(topic)) { + if (status === 'pending') { + this.pendingUnsubs.add(topic) + } else { + this.pendingUnsubs.delete(topic) + this.subscriptions.delete(topic) + } + } + resolve() + }) + } + + public isConnectedToBroker(): boolean { + return this.client?.connected ?? false + } + + public isPendingSub(topic: NotifyTopic): boolean { + return this.pendingSubs.has(topic) + } + + public isActiveSub(topic: NotifyTopic): boolean { + return this.subscriptions.has(topic) + } + + public isPendingUnsub(topic: NotifyTopic): boolean { + return this.pendingUnsubs.has(topic) + } + + /** + * @description A broker connection is terminated if it is errored or not present in the store. + */ + public isConnectionTerminated(): boolean { + return this.unreachableStatus != null + } +} + +export const connectionStore = new ConnectionStore() diff --git a/app-shell-odd/src/notifications/subscribe.ts b/app-shell-odd/src/notifications/subscribe.ts new file mode 100644 index 00000000000..6e334cb89c9 --- /dev/null +++ b/app-shell-odd/src/notifications/subscribe.ts @@ -0,0 +1,120 @@ +import mqtt from 'mqtt' + +import { connectionStore } from './store' +import { sendDeserialized, sendDeserializedGenericError } from './deserialize' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +/** + * @property {number} qos: "Quality of Service", "at least once". Because we use React Query, which does not trigger + a render update event if duplicate data is received, we can avoid the additional overhead of guaranteeing "exactly once" delivery. + */ +const subscribeOptions: mqtt.IClientSubscribeOptions = { + qos: 1, +} + +const CHECK_CONNECTION_INTERVAL = 500 + +export function subscribe(topic: NotifyTopic): Promise { + if (connectionStore.isConnectionTerminated()) { + const errorMessage = connectionStore.getFailedConnectionStatus() + if (errorMessage != null) { + sendDeserialized(topic, errorMessage) + } + return Promise.resolve() + } else { + return waitUntilActiveOrErrored('client') + .then(() => { + const { client } = connectionStore + if (client == null) { + return Promise.reject(new Error('Expected hostData, received null.')) + } + + if ( + !connectionStore.isActiveSub(topic) && + !connectionStore.isPendingSub(topic) + ) { + connectionStore + .setSubStatus(topic, 'pending') + .then( + () => + new Promise(() => { + client.subscribe(topic, subscribeOptions, subscribeCb) + }) + ) + .catch((error: Error) => notifyLog.debug(error.message)) + } else { + void waitUntilActiveOrErrored('subscription', topic).catch( + (error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(topic) + } + ) + } + }) + .catch((error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(topic) + }) + } + + function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void { + const { robotName, ip } = connectionStore + + if (error != null) { + sendDeserializedGenericError(topic) + notifyLog.debug( + `Failed to subscribe to ${robotName} on ${ip} to topic: ${topic}` + ) + } else { + notifyLog.debug( + `Successfully subscribed to ${robotName} on ${ip} to topic: ${topic}` + ) + connectionStore + .setSubStatus(topic, 'subscribed') + .catch((error: Error) => notifyLog.debug(error.message)) + } + } +} + +// Check every 500ms for 2 seconds before failing. +function waitUntilActiveOrErrored( + connection: 'client' | 'subscription', + topic?: NotifyTopic +): Promise { + return new Promise((resolve, reject) => { + if (connection === 'subscription') { + if (topic == null) { + reject( + new Error( + 'Must specify a topic when connection is type "subscription".' + ) + ) + } + } + + const MAX_RETRIES = 4 + let counter = 0 + const intervalId = setInterval(() => { + const hasReceivedAck = + connection === 'client' + ? connectionStore.isConnectedToBroker() + : connectionStore.isActiveSub(topic as NotifyTopic) + if (hasReceivedAck) { + clearInterval(intervalId) + resolve() + } + + counter++ + if (counter === MAX_RETRIES) { + clearInterval(intervalId) + reject( + new Error( + `Maximum number of retries exceeded for ${connectionStore.robotName} on ${connectionStore.ip}.` + ) + ) + } + }, CHECK_CONNECTION_INTERVAL) + }) +} diff --git a/app-shell-odd/src/notifications/unsubscribe.ts b/app-shell-odd/src/notifications/unsubscribe.ts new file mode 100644 index 00000000000..da9d0935ed2 --- /dev/null +++ b/app-shell-odd/src/notifications/unsubscribe.ts @@ -0,0 +1,36 @@ +import { connectionStore } from './store' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +export function unsubscribe(topic: NotifyTopic): Promise { + return new Promise((resolve, reject) => { + if (!connectionStore.isPendingUnsub(topic)) { + connectionStore + .setUnsubStatus(topic, 'pending') + .then(() => { + const { client } = connectionStore + if (client == null) { + return reject(new Error('Expected hostData, received null.')) + } + + client.unsubscribe(topic, {}, (error, result) => { + const { robotName, ip } = connectionStore + if (error != null) { + notifyLog.debug( + `Failed to unsubscribe to ${robotName} on ${ip} from topic: ${topic}` + ) + } else { + notifyLog.debug( + `Successfully unsubscribed to ${robotName} on ${ip} from topic: ${topic}` + ) + connectionStore + .setUnsubStatus(topic, 'unsubscribed') + .catch((error: Error) => notifyLog.debug(error.message)) + } + }) + }) + .catch((error: Error) => notifyLog.debug(error.message)) + } + }) +} diff --git a/app-shell-odd/src/notify.ts b/app-shell-odd/src/notify.ts deleted file mode 100644 index f88280369a0..00000000000 --- a/app-shell-odd/src/notify.ts +++ /dev/null @@ -1,455 +0,0 @@ -/* eslint-disable @typescript-eslint/no-dynamic-delete */ -import mqtt from 'mqtt' -import isEqual from 'lodash/isEqual' - -import { createLogger } from './log' - -import type { BrowserWindow } from 'electron' -import type { - NotifyTopic, - NotifyResponseData, - NotifyRefetchData, - NotifyUnsubscribeData, - NotifyNetworkError, -} from '@opentrons/app/src/redux/shell/types' -import type { Action, Dispatch } from './types' - -// TODO(jh, 2024-03-01): after refactoring notify connectivity and subscription logic, uncomment logs. - -// Manages MQTT broker connections via a connection store, establishing a connection to the broker only if a connection does not -// already exist, and disconnects from the broker when the app is not subscribed to any topics for the given broker. -// A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid. -// However, redundant subscriptions are permitted and result in the broker sending the retained message for that topic. -// To mitigate redundant connections, the connection manager eagerly adds the host, removing the host if the connection fails. - -const FAILURE_STATUSES = { - ECONNREFUSED: 'ECONNREFUSED', - ECONNFAILED: 'ECONNFAILED', -} as const - -interface ConnectionStore { - [hostname: string]: { - client: mqtt.MqttClient | null - subscriptions: Record - pendingSubs: Set - } -} - -const connectionStore: ConnectionStore = {} -const unreachableHosts = new Set() -const log = createLogger('notify') -// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. -// This clientId is derived from the mqttjs library. -const CLIENT_ID = 'odd-' + Math.random().toString(16).slice(2, 8) - -const connectOptions: mqtt.IClientOptions = { - clientId: CLIENT_ID, - port: 1883, - keepalive: 60, - protocolVersion: 5, - reconnectPeriod: 1000, - connectTimeout: 30 * 1000, - clean: true, - resubscribe: true, -} - -/** - * @property {number} qos: "Quality of Service", "at least once". Because we use React Query, which does not trigger - a render update event if duplicate data is received, we can avoid the additional overhead - to guarantee "exactly once" delivery. - */ -const subscribeOptions: mqtt.IClientSubscribeOptions = { - qos: 1, -} - -export function registerNotify( - dispatch: Dispatch, - mainWindow: BrowserWindow -): (action: Action) => unknown { - return function handleAction(action: Action) { - switch (action.type) { - case 'shell:NOTIFY_SUBSCRIBE': - return subscribe({ - ...action.payload, - browserWindow: mainWindow, - hostname: '127.0.0.1', - }) - - case 'shell:NOTIFY_UNSUBSCRIBE': - return unsubscribe({ - ...action.payload, - browserWindow: mainWindow, - hostname: '127.0.0.1', - }) - } - } -} - -const CHECK_CONNECTION_INTERVAL = 500 -let hasReportedAPortBlockEvent = false - -interface NotifyParams { - browserWindow: BrowserWindow - hostname: string - topic: NotifyTopic -} - -function subscribe(notifyParams: NotifyParams): Promise { - const { hostname, topic, browserWindow } = notifyParams - if (unreachableHosts.has(hostname)) { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - return Promise.resolve() - } - // true if no subscription (and therefore connection) to host exists - else if (connectionStore[hostname] == null) { - connectionStore[hostname] = { - client: null, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - subscriptions: { [topic]: 1 } as Record, - pendingSubs: new Set(), - } - return connectAsync(`mqtt://${hostname}`) - .then(client => { - const { pendingSubs } = connectionStore[hostname] - log.info(`Successfully connected to ${hostname}`) - connectionStore[hostname].client = client - pendingSubs.add(topic) - establishListeners({ ...notifyParams, client }) - return new Promise(() => { - client.subscribe(topic, subscribeOptions, subscribeCb) - pendingSubs.delete(topic) - }) - }) - .catch((error: Error) => { - log.warn( - `Failed to connect to ${hostname} - ${error.name}: ${error.message} ` - ) - let failureMessage: NotifyNetworkError = FAILURE_STATUSES.ECONNFAILED - if (connectionStore[hostname]?.client == null) { - unreachableHosts.add(hostname) - if ( - error.message.includes(FAILURE_STATUSES.ECONNREFUSED) && - !hasReportedAPortBlockEvent - ) { - failureMessage = FAILURE_STATUSES.ECONNREFUSED - hasReportedAPortBlockEvent = true - } - } - - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: failureMessage, - }) - if (hostname in connectionStore) delete connectionStore[hostname] - }) - } - // true if the connection store has an entry for the hostname. - else { - return waitUntilActiveOrErrored('client') - .then(() => { - const { client, subscriptions, pendingSubs } = connectionStore[hostname] - const activeClient = client as mqtt.Client - const isNotActiveSubscription = (subscriptions[topic] ?? 0) <= 0 - if (!pendingSubs.has(topic) && isNotActiveSubscription) { - pendingSubs.add(topic) - return new Promise(() => { - activeClient.subscribe(topic, subscribeOptions, subscribeCb) - pendingSubs.delete(topic) - }) - } else { - void waitUntilActiveOrErrored('subscription') - .then(() => { - subscriptions[topic] += 1 - }) - .catch(() => { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) - } - }) - .catch(() => { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) - } - function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void { - const { subscriptions } = connectionStore[hostname] - if (error != null) { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - setTimeout(() => { - if (Object.keys(connectionStore[hostname].subscriptions).length <= 0) { - connectionStore[hostname].client?.end() - } - }, RENDER_TIMEOUT) - } else { - if (subscriptions[topic] > 0) { - subscriptions[topic] += 1 - } else { - subscriptions[topic] = 1 - } - } - } - - // Check every 500ms for 2 seconds before failing. - function waitUntilActiveOrErrored( - connection: 'client' | 'subscription' - ): Promise { - return new Promise((resolve, reject) => { - const MAX_RETRIES = 4 - let counter = 0 - const intervalId = setInterval(() => { - const host = connectionStore[hostname] - const hasReceivedAck = - connection === 'client' - ? host?.client != null - : host?.subscriptions[topic] > 0 - if (hasReceivedAck) { - clearInterval(intervalId) - resolve() - } - - counter++ - if (counter === MAX_RETRIES) { - clearInterval(intervalId) - reject(new Error('Maximum subscription retries exceeded.')) - } - }, CHECK_CONNECTION_INTERVAL) - }) - } -} - -// Because subscription logic is directly tied to the component lifecycle, it is possible -// for a component to trigger an unsubscribe event on dismount while a new component mounts and -// triggers a subscribe event. For the connection store and MQTT to reflect correct topic subscriptions, -// do not unsubscribe and close connections before newly mounted components have had time to update the connection store. -const RENDER_TIMEOUT = 10000 // 10 seconds - -function unsubscribe(notifyParams: NotifyParams): Promise { - const { hostname, topic } = notifyParams - return new Promise(() => { - setTimeout(() => { - if (hostname in connectionStore) { - const { client } = connectionStore[hostname] - const subscriptions = connectionStore[hostname]?.subscriptions - const isLastSubscription = subscriptions[topic] <= 1 - - if (isLastSubscription) { - client?.unsubscribe(topic, {}, (error, result) => { - if (error == null) { - handleDecrementSubscriptionCount(hostname, topic) - } else { - log.warn(`Failed to subscribe on ${hostname} to topic: ${topic}`) - } - }) - } else { - subscriptions[topic] -= 1 - } - } - }, RENDER_TIMEOUT) - }) -} - -function connectAsync(brokerURL: string): Promise { - const client = mqtt.connect(brokerURL, connectOptions) - - return new Promise((resolve, reject) => { - // Listeners added to client to trigger promise resolution - const promiseListeners: { - [key: string]: (...args: any[]) => void - } = { - connect: () => { - removePromiseListeners() - return resolve(client) - }, - // A connection error event will close the connection without a retry. - error: (error: Error | string) => { - removePromiseListeners() - const clientEndPromise = new Promise((resolve, reject) => - client.end(true, {}, () => resolve(error)) - ) - return clientEndPromise.then(() => reject(error)) - }, - end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), - } - - function removePromiseListeners(): void { - Object.keys(promiseListeners).forEach(eventName => { - client.removeListener(eventName, promiseListeners[eventName]) - }) - } - - Object.keys(promiseListeners).forEach(eventName => { - client.on(eventName, promiseListeners[eventName]) - }) - }) -} - -function handleDecrementSubscriptionCount( - hostname: string, - topic: NotifyTopic -): void { - const host = connectionStore[hostname] - if (host) { - const { client, subscriptions } = host - if (topic in subscriptions) { - subscriptions[topic] -= 1 - if (subscriptions[topic] <= 0) { - delete subscriptions[topic] - } - } - - if (Object.keys(subscriptions).length <= 0) { - client?.end() - } - } -} - -interface ListenerParams { - client: mqtt.MqttClient - browserWindow: BrowserWindow - hostname: string -} - -function establishListeners({ - client, - browserWindow, - hostname, -}: ListenerParams): void { - client.on( - 'message', - (topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { - deserialize(message.toString()) - .then(deserializedMessage => { - log.debug('Received notification data from main via IPC', { - hostname, - topic, - }) - - browserWindow.webContents.send( - 'notify', - hostname, - topic, - deserializedMessage - ) - }) - .catch(error => log.debug(`${error.message}`)) - } - ) - - client.on('reconnect', () => { - log.info(`Attempting to reconnect to ${hostname}`) - }) - // handles transport layer errors only - client.on('error', error => { - log.warn(`Error - ${error.name}: ${error.message}`) - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic: 'ALL_TOPICS', - message: FAILURE_STATUSES.ECONNFAILED, - }) - client.end() - }) - - client.on('end', () => { - log.info(`Closed connection to ${hostname}`) - if (hostname in connectionStore) delete connectionStore[hostname] - }) - - client.on('disconnect', packet => { - log.warn( - `Disconnected from ${hostname} with code ${ - packet.reasonCode ?? 'undefined' - }` - ) - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic: 'ALL_TOPICS', - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) -} - -export function closeAllNotifyConnections(): Promise { - return new Promise((resolve, reject) => { - setTimeout(() => { - reject(Error('Failed to close all connections within the time limit.')) - }, 2000) - - log.debug('Stopping notify service connections') - const closeConnections = Object.values(connectionStore).map( - ({ client }) => { - return new Promise((resolve, reject) => { - client?.end(true, {}, () => resolve(null)) - }) - } - ) - Promise.all(closeConnections).then(resolve).catch(reject) - }) -} - -interface SendToBrowserParams { - browserWindow: BrowserWindow - hostname: string - topic: NotifyTopic - message: NotifyResponseData -} - -function sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message, -}: SendToBrowserParams): void { - browserWindow.webContents.send('notify', hostname, topic, message) -} - -const VALID_MODELS: [NotifyRefetchData, NotifyUnsubscribeData] = [ - { refetchUsingHTTP: true }, - { unsubscribe: true }, -] - -function deserialize(message: string): Promise { - return new Promise((resolve, reject) => { - let deserializedMessage: NotifyResponseData | Record - const error = new Error( - `Unexpected data received from notify broker: ${message}` - ) - - try { - deserializedMessage = JSON.parse(message) - } catch { - reject(error) - } - - const isValidNotifyResponse = VALID_MODELS.some(model => - isEqual(model, deserializedMessage) - ) - if (!isValidNotifyResponse) { - reject(error) - } else { - resolve(JSON.parse(message)) - } - }) -} diff --git a/app-shell/src/notifications/store.ts b/app-shell/src/notifications/store.ts index 63195d62d23..9968080258e 100644 --- a/app-shell/src/notifications/store.ts +++ b/app-shell/src/notifications/store.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-dynamic-delete */ import type mqtt from 'mqtt' import { FAILURE_STATUSES } from '../constants' diff --git a/app/src/redux/shell/__tests__/actions.test.ts b/app/src/redux/shell/__tests__/actions.test.ts index 7edf16f1b64..127e64503e0 100644 --- a/app/src/redux/shell/__tests__/actions.test.ts +++ b/app/src/redux/shell/__tests__/actions.test.ts @@ -1,10 +1,6 @@ import { describe, it, expect } from 'vitest' -import { - uiInitialized, - notifySubscribeAction, - notifyUnsubscribeAction, -} from '../actions' +import { uiInitialized, notifySubscribeAction } from '../actions' import type { NotifyTopic } from '../types' @@ -28,14 +24,4 @@ describe('shell actions', () => { meta: { shell: true }, }) }) - it('should be able to create an UNSUBSCRIBE action', () => { - expect(notifyUnsubscribeAction(MOCK_HOSTNAME, MOCK_TOPIC)).toEqual({ - type: 'shell:NOTIFY_UNSUBSCRIBE', - payload: { - hostname: MOCK_HOSTNAME, - topic: MOCK_TOPIC, - }, - meta: { shell: true }, - }) - }) }) diff --git a/app/src/redux/shell/actions.ts b/app/src/redux/shell/actions.ts index cf910b5c67e..7922eebef4c 100644 --- a/app/src/redux/shell/actions.ts +++ b/app/src/redux/shell/actions.ts @@ -9,7 +9,6 @@ import type { RobotMassStorageDeviceEnumerated, RobotMassStorageDeviceRemoved, NotifySubscribeAction, - NotifyUnsubscribeAction, NotifyTopic, } from './types' @@ -31,8 +30,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV 'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED' export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' = 'shell:NOTIFY_SUBSCRIBE' -export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' = - 'shell:NOTIFY_UNSUBSCRIBE' export const uiInitialized = (): UiInitializedAction => ({ type: UI_INITIALIZED, @@ -124,15 +121,3 @@ export const notifySubscribeAction = ( }, meta: { shell: true }, }) - -export const notifyUnsubscribeAction = ( - hostname: string, - topic: NotifyTopic -): NotifyUnsubscribeAction => ({ - type: NOTIFY_UNSUBSCRIBE, - payload: { - hostname, - topic, - }, - meta: { shell: true }, -}) diff --git a/app/src/redux/shell/types.ts b/app/src/redux/shell/types.ts index df36124e7c1..1a4cb343d64 100644 --- a/app/src/redux/shell/types.ts +++ b/app/src/redux/shell/types.ts @@ -142,8 +142,6 @@ export type NotifyTopic = | 'robot-server/runs' | `robot-server/runs/${string}` -export type NotifyAction = 'subscribe' | 'unsubscribe' - export interface NotifySubscribeAction { type: 'shell:NOTIFY_SUBSCRIBE' payload: { @@ -153,15 +151,6 @@ export interface NotifySubscribeAction { meta: { shell: true } } -export interface NotifyUnsubscribeAction { - type: 'shell:NOTIFY_UNSUBSCRIBE' - payload: { - hostname: string - topic: NotifyTopic - } - meta: { shell: true } -} - export type ShellAction = | UiInitializedAction | ShellUpdateAction @@ -175,4 +164,3 @@ export type ShellAction = | RobotMassStorageDeviceEnumerated | RobotMassStorageDeviceRemoved | NotifySubscribeAction - | NotifyUnsubscribeAction diff --git a/app/src/resources/__tests__/useNotifyService.test.ts b/app/src/resources/__tests__/useNotifyService.test.ts index ad8628e3e87..1e2ba78c744 100644 --- a/app/src/resources/__tests__/useNotifyService.test.ts +++ b/app/src/resources/__tests__/useNotifyService.test.ts @@ -7,10 +7,7 @@ import { useHost } from '@opentrons/react-api-client' import { useNotifyService } from '../useNotifyService' import { appShellListener } from '../../redux/shell/remote' import { useTrackEvent } from '../../redux/analytics' -import { - notifySubscribeAction, - notifyUnsubscribeAction, -} from '../../redux/shell' +import { notifySubscribeAction } from '../../redux/shell' import { useIsFlex } from '../../organisms/Devices/hooks/useIsFlex' import type { Mock } from 'vitest' @@ -44,6 +41,7 @@ describe('useNotifyService', () => { vi.mocked(useDispatch).mockReturnValue(mockDispatch) vi.mocked(useHost).mockReturnValue(MOCK_HOST_CONFIG) vi.mocked(useIsFlex).mockReturnValue(true) + vi.mocked(appShellListener).mockClear() }) afterEach(() => { @@ -63,26 +61,9 @@ describe('useNotifyService', () => { expect(mockDispatch).toHaveBeenCalledWith( notifySubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC) ) - expect(mockDispatch).not.toHaveBeenCalledWith( - notifyUnsubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC) - ) expect(appShellListener).toHaveBeenCalled() }) - it('should trigger an unsubscribe action on dismount', () => { - const { unmount } = renderHook(() => - useNotifyService({ - topic: MOCK_TOPIC, - setRefetchUsingHTTP: mockHTTPRefetch, - options: MOCK_OPTIONS, - } as any) - ) - unmount() - expect(mockDispatch).toHaveBeenCalledWith( - notifyUnsubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC) - ) - }) - it('should not subscribe to notifications if forceHttpPolling is true', () => { renderHook(() => useNotifyService({ diff --git a/app/src/resources/useNotifyService.ts b/app/src/resources/useNotifyService.ts index 63c887fb9b5..f6cfaefa2b8 100644 --- a/app/src/resources/useNotifyService.ts +++ b/app/src/resources/useNotifyService.ts @@ -5,7 +5,7 @@ import { useDispatch } from 'react-redux' import { useHost } from '@opentrons/react-api-client' import { appShellListener } from '../redux/shell/remote' -import { notifySubscribeAction, notifyUnsubscribeAction } from '../redux/shell' +import { notifySubscribeAction } from '../redux/shell' import { useTrackEvent, ANALYTICS_NOTIFICATION_PORT_BLOCK_ERROR, @@ -62,13 +62,12 @@ export function useNotifyService({ }) dispatch(notifySubscribeAction(hostname, topic)) hasUsedNotifyService.current = true - } else setRefetchUsingHTTP('always') + } else { + setRefetchUsingHTTP('always') + } return () => { if (hasUsedNotifyService.current) { - if (hostname != null) { - dispatch(notifyUnsubscribeAction(hostname, topic)) - } appShellListener({ hostname: hostname as string, topic,