-
Notifications
You must be signed in to change notification settings - Fork 179
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(app-shell-odd): Utilize robot-server unsubscribe flags (#14724)
Closes EXEC-319 This is the app-shell-odd equivalent of the app-shell refactor, #14648. It's similar to the app-shell logic, but significantly simpler, since we don't have to manage multiple robots, worry about localhost port blocking, and multiple IPs per robot. The real change lies in the initial connect and final disconnect on app shutdown. Otherwise, the changes are primarily in the ConnectionStore. Because the app no longer utilizes unsubscribe actions in any capacity, we can safely remove those references.
- Loading branch information
1 parent
aa7f7ce
commit 1ba6166
Showing
17 changed files
with
554 additions
and
540 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import mqtt from 'mqtt' | ||
|
||
import { connectionStore } from './store' | ||
import { | ||
sendDeserialized, | ||
sendDeserializedGenericError, | ||
deserializeExpectedMessages, | ||
} from './deserialize' | ||
import { unsubscribe } from './unsubscribe' | ||
import { notifyLog } from './notifyLog' | ||
|
||
import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' | ||
|
||
// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. | ||
const CLIENT_ID = 'odd-' + Math.random().toString(16).slice(2, 8) // Derived from mqttjs | ||
const connectOptions: mqtt.IClientOptions = { | ||
clientId: CLIENT_ID, | ||
port: 1883, | ||
keepalive: 60, | ||
protocolVersion: 5, | ||
reconnectPeriod: 1000, | ||
connectTimeout: 30 * 1000, | ||
clean: true, | ||
resubscribe: true, | ||
} | ||
|
||
export function connectAsync(brokerURL: string): Promise<mqtt.Client> { | ||
const client = mqtt.connect(brokerURL, connectOptions) | ||
|
||
return new Promise((resolve, reject) => { | ||
// Listeners added to client to trigger promise resolution | ||
const promiseListeners: { | ||
[key: string]: (...args: any[]) => void | ||
} = { | ||
connect: () => { | ||
removePromiseListeners() | ||
return resolve(client) | ||
}, | ||
// A connection error event will close the connection without a retry. | ||
error: (error: Error | string) => { | ||
removePromiseListeners() | ||
const clientEndPromise = new Promise((resolve, reject) => | ||
client.end(true, {}, () => resolve(error)) | ||
) | ||
return clientEndPromise.then(() => reject(error)) | ||
}, | ||
end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), | ||
} | ||
|
||
function removePromiseListeners(): void { | ||
Object.keys(promiseListeners).forEach(eventName => { | ||
client.removeListener(eventName, promiseListeners[eventName]) | ||
}) | ||
} | ||
|
||
Object.keys(promiseListeners).forEach(eventName => { | ||
client.on(eventName, promiseListeners[eventName]) | ||
}) | ||
}) | ||
} | ||
|
||
export function establishListeners(): void { | ||
const client = connectionStore.client as mqtt.MqttClient | ||
const { ip, robotName } = connectionStore | ||
|
||
client.on( | ||
'message', | ||
(topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { | ||
deserializeExpectedMessages(message.toString()) | ||
.then(deserializedMessage => { | ||
const messageContainsUnsubFlag = 'unsubscribe' in deserializedMessage | ||
if (messageContainsUnsubFlag) { | ||
void unsubscribe(topic).catch((error: Error) => | ||
notifyLog.debug(error.message) | ||
) | ||
} | ||
|
||
notifyLog.debug('Received notification data from main via IPC', { | ||
ip, | ||
topic, | ||
}) | ||
|
||
sendDeserialized(topic, deserializedMessage) | ||
}) | ||
.catch(error => notifyLog.debug(`${error.message}`)) | ||
} | ||
) | ||
|
||
client.on('reconnect', () => { | ||
notifyLog.debug(`Attempting to reconnect to ${robotName} on ${ip}`) | ||
}) | ||
// handles transport layer errors only | ||
client.on('error', error => { | ||
notifyLog.warn(`Error - ${error.name}: ${error.message}`) | ||
sendDeserializedGenericError('ALL_TOPICS') | ||
client.end() | ||
}) | ||
|
||
client.on('end', () => { | ||
notifyLog.debug(`Closed connection to ${robotName} on ${ip}`) | ||
// Marking the connection as failed with a generic error status lets the connection re-establish in the future | ||
// and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing). | ||
void connectionStore.setErrorStatus() | ||
}) | ||
|
||
client.on('disconnect', packet => { | ||
notifyLog.warn( | ||
`Disconnected from ${robotName} on ${ip} with code ${ | ||
packet.reasonCode ?? 'undefined' | ||
}` | ||
) | ||
sendDeserializedGenericError('ALL_TOPICS') | ||
}) | ||
} | ||
|
||
export function closeConnectionForcefully(): Promise<void> { | ||
const { client } = connectionStore | ||
return new Promise<void>((resolve, reject) => | ||
client?.end(true, {}, () => resolve()) | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import isEqual from 'lodash/isEqual' | ||
|
||
import { connectionStore } from './store' | ||
|
||
import type { | ||
NotifyBrokerResponses, | ||
NotifyRefetchData, | ||
NotifyResponseData, | ||
NotifyTopic, | ||
NotifyUnsubscribeData, | ||
} from '@opentrons/app/src/redux/shell/types' | ||
import { FAILURE_STATUSES } from '../constants' | ||
|
||
const VALID_NOTIFY_RESPONSES: [NotifyRefetchData, NotifyUnsubscribeData] = [ | ||
{ refetchUsingHTTP: true }, | ||
{ unsubscribe: true }, | ||
] | ||
|
||
export function sendDeserialized( | ||
topic: NotifyTopic, | ||
message: NotifyResponseData | ||
): void { | ||
try { | ||
const browserWindow = connectionStore.getBrowserWindow() | ||
browserWindow?.webContents.send( | ||
'notify', | ||
connectionStore.ip, | ||
topic, | ||
message | ||
) | ||
} catch {} // Prevents shell erroring during app shutdown event. | ||
} | ||
|
||
export function sendDeserializedGenericError(topic: NotifyTopic): void { | ||
sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED) | ||
} | ||
|
||
export function deserializeExpectedMessages( | ||
message: string | ||
): Promise<NotifyBrokerResponses> { | ||
return new Promise((resolve, reject) => { | ||
let deserializedMessage: NotifyResponseData | Record<string, unknown> | ||
const error = new Error( | ||
`Unexpected data received from notify broker: ${message}` | ||
) | ||
|
||
try { | ||
deserializedMessage = JSON.parse(message) | ||
} catch { | ||
reject(error) | ||
} | ||
|
||
const isValidNotifyResponse = VALID_NOTIFY_RESPONSES.some(model => | ||
isEqual(model, deserializedMessage) | ||
) | ||
if (!isValidNotifyResponse) { | ||
reject(error) | ||
} else { | ||
resolve(JSON.parse(message)) | ||
} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import { connectionStore } from './store' | ||
import { | ||
connectAsync, | ||
establishListeners, | ||
closeConnectionForcefully, | ||
} from './connect' | ||
import { subscribe } from './subscribe' | ||
import { notifyLog } from './notifyLog' | ||
|
||
import type { BrowserWindow } from 'electron' | ||
import type { Action, Dispatch } from '../types' | ||
|
||
// Manages the MQTT broker connection through a connection store. Subscriptions are handled "lazily" - a component must | ||
// dispatch a subscribe action before a subscription request is made to the broker. Unsubscribe requests only occur if | ||
// the broker sends an "unsubscribe" flag. Pending subs and unsubs are used to prevent unnecessary network and broker load. | ||
|
||
export function registerNotify( | ||
dispatch: Dispatch, | ||
mainWindow: BrowserWindow | ||
): (action: Action) => unknown { | ||
// Because of the ODD's start sequence, the browser window will always be defined before relevant actions occur. | ||
if (connectionStore.getBrowserWindow() == null) { | ||
connectionStore.setBrowserWindow(mainWindow) | ||
} | ||
|
||
return function handleAction(action: Action) { | ||
switch (action.type) { | ||
case 'shell:NOTIFY_SUBSCRIBE': | ||
return subscribe(action.payload.topic) | ||
} | ||
} | ||
} | ||
|
||
export function establishBrokerConnection(): Promise<void> { | ||
const { ip, robotName } = connectionStore | ||
|
||
return connectAsync(`mqtt://${connectionStore.ip}`) | ||
.then(client => { | ||
notifyLog.debug(`Successfully connected to ${robotName} on ${ip}`) | ||
void connectionStore | ||
.setConnected(client) | ||
.then(() => establishListeners()) | ||
.catch((error: Error) => notifyLog.debug(error.message)) | ||
}) | ||
.catch((error: Error) => { | ||
notifyLog.warn( | ||
`Failed to connect to ${robotName} on ${ip} - ${error.name}: ${error.message} ` | ||
) | ||
void connectionStore.setErrorStatus() | ||
}) | ||
} | ||
|
||
export function closeBrokerConnection(): Promise<void> { | ||
return new Promise((resolve, reject) => { | ||
setTimeout(() => { | ||
reject(Error('Failed to close the connection within the time limit.')) | ||
}, 2000) | ||
|
||
notifyLog.debug( | ||
`Stopping notify service connection for host ${connectionStore.robotName}` | ||
) | ||
const closeConnection = closeConnectionForcefully() | ||
closeConnection.then(resolve).catch(reject) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import { createLogger } from '../log' | ||
|
||
export const notifyLog = createLogger('notify') |
Oops, something went wrong.