Skip to content

Commit

Permalink
refactor(app-shell-odd): Utilize robot-server unsubscribe flags (#14724)
Browse files Browse the repository at this point in the history
Closes EXEC-319

This is the app-shell-odd equivalent of the app-shell refactor, #14648. It's similar to the app-shell logic, but significantly simpler, since we don't have to manage multiple robots, worry about localhost port blocking, and multiple IPs per robot. The real change lies in the initial connect and final disconnect on app shutdown. Otherwise, the changes are primarily in the ConnectionStore. Because the app no longer utilizes unsubscribe actions in any capacity, we can safely remove those references.
  • Loading branch information
mjhuff authored Mar 25, 2024
1 parent ede85ef commit 97e6f0d
Show file tree
Hide file tree
Showing 17 changed files with 554 additions and 543 deletions.
14 changes: 0 additions & 14 deletions app-shell-odd/src/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ import {
VALUE_UPDATED,
VIEW_PROTOCOL_SOURCE_FOLDER,
NOTIFY_SUBSCRIBE,
NOTIFY_UNSUBSCRIBE,
ROBOT_MASS_STORAGE_DEVICE_ADDED,
ROBOT_MASS_STORAGE_DEVICE_ENUMERATED,
ROBOT_MASS_STORAGE_DEVICE_REMOVED,
Expand Down Expand Up @@ -105,7 +104,6 @@ import type {
AppRestartAction,
NotifySubscribeAction,
NotifyTopic,
NotifyUnsubscribeAction,
ReloadUiAction,
RobotMassStorageDeviceAdded,
RobotMassStorageDeviceEnumerated,
Expand Down Expand Up @@ -428,18 +426,6 @@ export const notifySubscribeAction = (
meta: { shell: true },
})

export const notifyUnsubscribeAction = (
hostname: string,
topic: NotifyTopic
): NotifyUnsubscribeAction => ({
type: NOTIFY_UNSUBSCRIBE,
payload: {
hostname,
topic,
},
meta: { shell: true },
})

export function startDiscovery(
timeout: number | null = null
): StartDiscoveryAction {
Expand Down
7 changes: 5 additions & 2 deletions app-shell-odd/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV
'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED'
export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' =
'shell:NOTIFY_SUBSCRIBE'
export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' =
'shell:NOTIFY_UNSUBSCRIBE'

// copy
// TODO(mc, 2020-05-11): i18n
Expand All @@ -252,3 +250,8 @@ export const HTTP_API_VERSION: 3 = 3

export const SEND_READY_STATUS: 'shell:SEND_READY_STATUS' =
'shell:SEND_READY_STATUS'

export const FAILURE_STATUSES = {
ECONNREFUSED: 'ECONNREFUSED',
ECONNFAILED: 'ECONNFAILED',
} as const
10 changes: 7 additions & 3 deletions app-shell-odd/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import {
} from './config'
import systemd from './systemd'
import { watchForMassStorage } from './usb'
import { registerNotify, closeAllNotifyConnections } from './notify'
import {
registerNotify,
establishBrokerConnection,
closeBrokerConnection,
} from './notifications'

import type { BrowserWindow } from 'electron'
import type { Dispatch, Logger } from './types'
Expand Down Expand Up @@ -58,7 +62,7 @@ if (config.devtools) app.once('ready', installDevtools)

app.once('window-all-closed', () => {
log.debug('all windows closed, quitting the app')
closeAllNotifyConnections()
closeBrokerConnection()
.then(() => {
app.quit()
})
Expand Down Expand Up @@ -96,7 +100,7 @@ function startUp(): void {

mainWindow = createUi(dispatch)
rendererLogger = createRendererLogger()

void establishBrokerConnection()
mainWindow.once('closed', () => (mainWindow = null))

log.info('Fetching latest software version')
Expand Down
121 changes: 121 additions & 0 deletions app-shell-odd/src/notifications/connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import mqtt from 'mqtt'

import { connectionStore } from './store'
import {
sendDeserialized,
sendDeserializedGenericError,
deserializeExpectedMessages,
} from './deserialize'
import { unsubscribe } from './unsubscribe'
import { notifyLog } from './notifyLog'

import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types'

// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied.
const CLIENT_ID = 'odd-' + Math.random().toString(16).slice(2, 8) // Derived from mqttjs
const connectOptions: mqtt.IClientOptions = {
clientId: CLIENT_ID,
port: 1883,
keepalive: 60,
protocolVersion: 5,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true,
}

export function connectAsync(brokerURL: string): Promise<mqtt.Client> {
const client = mqtt.connect(brokerURL, connectOptions)

return new Promise((resolve, reject) => {
// Listeners added to client to trigger promise resolution
const promiseListeners: {
[key: string]: (...args: any[]) => void
} = {
connect: () => {
removePromiseListeners()
return resolve(client)
},
// A connection error event will close the connection without a retry.
error: (error: Error | string) => {
removePromiseListeners()
const clientEndPromise = new Promise((resolve, reject) =>
client.end(true, {}, () => resolve(error))
)
return clientEndPromise.then(() => reject(error))
},
end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`),
}

function removePromiseListeners(): void {
Object.keys(promiseListeners).forEach(eventName => {
client.removeListener(eventName, promiseListeners[eventName])
})
}

Object.keys(promiseListeners).forEach(eventName => {
client.on(eventName, promiseListeners[eventName])
})
})
}

export function establishListeners(): void {
const client = connectionStore.client as mqtt.MqttClient
const { ip, robotName } = connectionStore

client.on(
'message',
(topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => {
deserializeExpectedMessages(message.toString())
.then(deserializedMessage => {
const messageContainsUnsubFlag = 'unsubscribe' in deserializedMessage
if (messageContainsUnsubFlag) {
void unsubscribe(topic).catch((error: Error) =>
notifyLog.debug(error.message)
)
}

notifyLog.debug('Received notification data from main via IPC', {
ip,
topic,
})

sendDeserialized(topic, deserializedMessage)
})
.catch(error => notifyLog.debug(`${error.message}`))
}
)

client.on('reconnect', () => {
notifyLog.debug(`Attempting to reconnect to ${robotName} on ${ip}`)
})
// handles transport layer errors only
client.on('error', error => {
notifyLog.warn(`Error - ${error.name}: ${error.message}`)
sendDeserializedGenericError('ALL_TOPICS')
client.end()
})

client.on('end', () => {
notifyLog.debug(`Closed connection to ${robotName} on ${ip}`)
// Marking the connection as failed with a generic error status lets the connection re-establish in the future
// and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing).
void connectionStore.setErrorStatus()
})

client.on('disconnect', packet => {
notifyLog.warn(
`Disconnected from ${robotName} on ${ip} with code ${
packet.reasonCode ?? 'undefined'
}`
)
sendDeserializedGenericError('ALL_TOPICS')
})
}

export function closeConnectionForcefully(): Promise<void> {
const { client } = connectionStore
return new Promise<void>((resolve, reject) =>
client?.end(true, {}, () => resolve())
)
}
62 changes: 62 additions & 0 deletions app-shell-odd/src/notifications/deserialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import isEqual from 'lodash/isEqual'

import { connectionStore } from './store'

import type {
NotifyBrokerResponses,
NotifyRefetchData,
NotifyResponseData,
NotifyTopic,
NotifyUnsubscribeData,
} from '@opentrons/app/src/redux/shell/types'
import { FAILURE_STATUSES } from '../constants'

const VALID_NOTIFY_RESPONSES: [NotifyRefetchData, NotifyUnsubscribeData] = [
{ refetchUsingHTTP: true },
{ unsubscribe: true },
]

export function sendDeserialized(
topic: NotifyTopic,
message: NotifyResponseData
): void {
try {
const browserWindow = connectionStore.getBrowserWindow()
browserWindow?.webContents.send(
'notify',
connectionStore.ip,
topic,
message
)
} catch {} // Prevents shell erroring during app shutdown event.
}

export function sendDeserializedGenericError(topic: NotifyTopic): void {
sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED)
}

export function deserializeExpectedMessages(
message: string
): Promise<NotifyBrokerResponses> {
return new Promise((resolve, reject) => {
let deserializedMessage: NotifyResponseData | Record<string, unknown>
const error = new Error(
`Unexpected data received from notify broker: ${message}`
)

try {
deserializedMessage = JSON.parse(message)
} catch {
reject(error)
}

const isValidNotifyResponse = VALID_NOTIFY_RESPONSES.some(model =>
isEqual(model, deserializedMessage)
)
if (!isValidNotifyResponse) {
reject(error)
} else {
resolve(JSON.parse(message))
}
})
}
65 changes: 65 additions & 0 deletions app-shell-odd/src/notifications/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { connectionStore } from './store'
import {
connectAsync,
establishListeners,
closeConnectionForcefully,
} from './connect'
import { subscribe } from './subscribe'
import { notifyLog } from './notifyLog'

import type { BrowserWindow } from 'electron'
import type { Action, Dispatch } from '../types'

// Manages the MQTT broker connection through a connection store. Subscriptions are handled "lazily" - a component must
// dispatch a subscribe action before a subscription request is made to the broker. Unsubscribe requests only occur if
// the broker sends an "unsubscribe" flag. Pending subs and unsubs are used to prevent unnecessary network and broker load.

export function registerNotify(
dispatch: Dispatch,
mainWindow: BrowserWindow
): (action: Action) => unknown {
// Because of the ODD's start sequence, the browser window will always be defined before relevant actions occur.
if (connectionStore.getBrowserWindow() == null) {
connectionStore.setBrowserWindow(mainWindow)
}

return function handleAction(action: Action) {
switch (action.type) {
case 'shell:NOTIFY_SUBSCRIBE':
return subscribe(action.payload.topic)
}
}
}

export function establishBrokerConnection(): Promise<void> {
const { ip, robotName } = connectionStore

return connectAsync(`mqtt://${connectionStore.ip}`)
.then(client => {
notifyLog.debug(`Successfully connected to ${robotName} on ${ip}`)
void connectionStore
.setConnected(client)
.then(() => establishListeners())
.catch((error: Error) => notifyLog.debug(error.message))
})
.catch((error: Error) => {
notifyLog.warn(
`Failed to connect to ${robotName} on ${ip} - ${error.name}: ${error.message} `
)
void connectionStore.setErrorStatus()
})
}

export function closeBrokerConnection(): Promise<void> {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(Error('Failed to close the connection within the time limit.'))
}, 2000)

notifyLog.debug(
`Stopping notify service connection for host ${connectionStore.robotName}`
)
const closeConnection = closeConnectionForcefully()
closeConnection.then(resolve).catch(reject)
})
}
3 changes: 3 additions & 0 deletions app-shell-odd/src/notifications/notifyLog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { createLogger } from '../log'

export const notifyLog = createLogger('notify')
Loading

0 comments on commit 97e6f0d

Please sign in to comment.