Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mjhuff committed Mar 20, 2024
1 parent c25f0ce commit c93702a
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 129 deletions.
33 changes: 12 additions & 21 deletions app-shell/src/notifications/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ export function cleanUpUnreachableRobots(
healthyRobots: RobotData[]
): Promise<void> {
return new Promise(() => {
const healthyRobotIPs = healthyRobots.map(({ ip }) => ip)
const healthyRobotIPsSet = new Set(healthyRobotIPs)
const healthyRobotNames = healthyRobots.map(({ robotName }) => robotName)
const healthyRobotNamesSet = new Set(healthyRobotNames)
const unreachableRobots = connectionStore
.getReachableHosts()
.filter(hostname => {
return !healthyRobotIPsSet.has(hostname)
.filter(robotName => {
return !healthyRobotNamesSet.has(robotName)
})
void closeConnectionsForcefullyFor(unreachableRobots)
})
Expand All @@ -67,31 +67,20 @@ export function establishConnections(
): Promise<void> {
return new Promise(() => {
const newConnections = healthyRobots.filter(({ ip, robotName }) => {
if (connectionStore.isConnectedToBroker(ip)) {
if (connectionStore.isConnectedToBroker(robotName)) {
return false
} else if (connectionStore.isAssociatedWithExistingHostData(robotName)) {
if (!connectionStore.isAssociatedBrokerErrored(robotName)) {
// If not connected, wait for another poll of discovery-client to resolve the current connection.
if (connectionStore.isAssociatedBrokerConnected(robotName)) {
void connectionStore.associateIPWithExistingHostData(ip, robotName)
}
} else {
connectionStore.associateIPWithRobotName(ip, robotName)
if (!connectionStore.isConnectionTerminated(robotName)) {
return false
} else {
// Mark this IP as a new potential broker connection to check if the broker is reachable on this IP.
if (!connectionStore.isKnownPortBlockedIP(ip)) {
void connectionStore.deleteAllAssociatedIPsGivenRobotName(robotName)
return true
} else {
return false
}
return !connectionStore.isKnownPortBlockedIP(ip)
}
} else {
return !connectionStore.isKnownPortBlockedIP(ip)
}
})
newConnections.forEach(({ ip, robotName }) => {
void connectionStore
.setPendingConnection(ip, robotName)
.setPendingConnection(robotName)
.then(() => {
connectAsync(`mqtt://${ip}`)
.then(client => {
Expand Down Expand Up @@ -188,6 +177,8 @@ function establishListeners(

client.on('end', () => {
notifyLog.debug(`Closed connection to ${robotName} on ${ip}`)
// Marking the connection as failed with a generic error status lets the connection re-establish in the future
// and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing).
void connectionStore.setErrorStatus(ip, FAILURE_STATUSES.ECONNFAILED)
})

Expand Down
1 change: 1 addition & 0 deletions app-shell/src/notifications/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type { Action, Dispatch } from '../types'
// dispatch a subscribe action before a subscription request is made to the broker. Unsubscribe requests only occur if
// the broker sends an "unsubscribe" flag. Pending subs and unsubs are used to prevent unnecessary network and broker load.

//TOME: 1) Do other refactors mentioned in feedback. 2) Test on robot. 3) Write tests.

Check failure on line 23 in app-shell/src/notifications/index.ts

View workflow job for this annotation

GitHub Actions / js checks

Expected space or tab after '//' in comment

Check failure on line 23 in app-shell/src/notifications/index.ts

View workflow job for this annotation

GitHub Actions / js checks

Expected space or tab after '//' in comment
export function registerNotify(
dispatch: Dispatch,
mainWindow: BrowserWindow
Expand Down
175 changes: 72 additions & 103 deletions app-shell/src/notifications/store.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* eslint-disable @typescript-eslint/no-dynamic-delete */
import type mqtt from 'mqtt'
import head from 'lodash/head'

import { FAILURE_STATUSES } from '../constants'

Expand All @@ -22,7 +21,9 @@ interface HostData {
* Manages the internal state of MQTT connections to various robot hosts.
*/
class ConnectionStore {
private hosts: Record<string, HostData> = {}
private hostsByRobotName: Record<string, HostData> = {}

private robotNamesByIP: Record<string, string> = {}

private browserWindow: BrowserWindow | null = null

Expand All @@ -33,12 +34,13 @@ class ConnectionStore {
}

public getReachableHosts(): string[] {
return Object.keys(this.hosts)
return Object.keys(this.hostsByRobotName)
}

public getClient(ip: string): mqtt.MqttClient | null {
if (ip in this.hosts) {
return this.hosts[ip].client
const hostData = this.getHostDataByIP(ip)
if (hostData != null) {
return hostData.client
} else {
return null
}
Expand All @@ -50,37 +52,31 @@ class ConnectionStore {
* for analytics reasons. Afterward, a generic "ECONNFAILED" is returned.
*/
public getFailedConnectionStatus(ip: string): FailedConnStatus | null {
if (ip in this.hosts) {
const failureStatus = this.hosts[ip].unreachableStatus
const robotName = this.getRobotNameByIP(ip)
if (robotName != null) {
const failureStatus = this.hostsByRobotName[robotName].unreachableStatus
if (failureStatus === FAILURE_STATUSES.ECONNREFUSED) {
this.hosts[ip].unreachableStatus = FAILURE_STATUSES.ECONNFAILED
this.hostsByRobotName[robotName].unreachableStatus =
FAILURE_STATUSES.ECONNFAILED
}
return failureStatus
} else {
return null
}
}

public getAssociatedIPsFromRobotName(robotName: string): string[] {
return Object.keys(this.hosts).filter(
ip => this.hosts[ip].robotName === robotName
)
}

public getRobotNameFromIP(ip: string): string | null {
if (ip in this.hosts) {
return this.hosts[ip].robotName
} else return null
public getRobotNameByIP(ip: string): string | null {
return this.robotNamesByIP[ip] ?? null
}

public setBrowserWindow(window: BrowserWindow): void {
this.browserWindow = window
}

public setPendingConnection(ip: string, robotName: string): Promise<void> {
public setPendingConnection(robotName: string): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.isAssociatedBrokerConnecting(robotName)) {
this.hosts[ip] = {
if (!this.isConnectingToBroker(robotName)) {
this.hostsByRobotName[robotName] = {
robotName,
client: null,
subscriptions: new Set(),
Expand All @@ -92,21 +88,24 @@ class ConnectionStore {
} else {
reject(
new Error(
'Cannot create a new connection while connecting on an associated IP.'
'Cannot create a new connection while currently connecting.'
)
)
}
})
}

public setConnected(ip: string, client: mqtt.MqttClient): Promise<void> {
public setConnected(
robotName: string,
client: mqtt.MqttClient
): Promise<void> {
return new Promise((resolve, reject) => {
if (ip in this.hosts) {
if (this.hosts[ip].client == null) {
this.hosts[ip].client = client
if (robotName in this.hostsByRobotName) {
if (this.hostsByRobotName[robotName].client == null) {
this.hostsByRobotName[robotName].client = client
resolve()
} else {
reject(new Error(`Connection already exists for ${ip}`))
reject(new Error(`Connection already exists for ${robotName}`))
}
} else {
reject(new Error('IP is not associated with a connection'))
Expand All @@ -121,20 +120,21 @@ class ConnectionStore {
*/
public setErrorStatus(ip: string, errorMessage: string): Promise<void> {
return new Promise((resolve, reject) => {
if (ip in this.hosts) {
if (this.hosts[ip].unreachableStatus == null) {
const robotName = this.getRobotNameByIP(ip)
if (robotName != null && robotName in this.hostsByRobotName) {
if (this.hostsByRobotName[robotName].unreachableStatus == null) {
const errorStatus = errorMessage?.includes(
FAILURE_STATUSES.ECONNREFUSED
)
? FAILURE_STATUSES.ECONNREFUSED
: FAILURE_STATUSES.ECONNFAILED

this.hosts[ip].unreachableStatus = errorStatus
this.hostsByRobotName[ip].unreachableStatus = errorStatus
if (errorStatus === FAILURE_STATUSES.ECONNREFUSED) {
this.knownPortBlockedIPs.add(ip)
}
resolve()
}
resolve()
} else {
reject(new Error(`${ip} is not associated with a connection`))
}
Expand All @@ -147,8 +147,9 @@ class ConnectionStore {
status: 'pending' | 'subscribed'
): Promise<void> {
return new Promise((resolve, reject) => {
if (ip in this.hosts) {
const { pendingSubs, subscriptions } = this.hosts[ip]
const robotName = this.getRobotNameByIP(ip)
if (robotName != null && robotName in this.hostsByRobotName) {
const { pendingSubs, subscriptions } = this.hostsByRobotName[robotName]
if (status === 'pending') {
pendingSubs.add(topic)
} else {
Expand All @@ -162,14 +163,17 @@ class ConnectionStore {
})
}

public setUnubStatus(
public setUnsubStatus(
ip: string,
topic: NotifyTopic,
status: 'pending' | 'unsubscribed'
): Promise<void> {
return new Promise((resolve, reject) => {
if (ip in this.hosts) {
const { pendingUnsubs, subscriptions } = this.hosts[ip]
const robotName = this.getRobotNameByIP(ip)
if (robotName != null && robotName in this.hostsByRobotName) {
const { pendingUnsubs, subscriptions } = this.hostsByRobotName[
robotName
]
if (subscriptions.has(topic)) {
if (status === 'pending') {
pendingUnsubs.add(topic)
Expand All @@ -185,90 +189,50 @@ class ConnectionStore {
})
}

/**
*
* @description Creates a new hosts entry for a given IP with HostData that is a reference to an existing
* IP's HostData. This occurs when two IPs reported by discovery-client actually reference the same broker.
*/
public associateIPWithExistingHostData(
ip: string,
robotName: string
): Promise<void> {
return new Promise((resolve, reject) => {
const associatedHost = Object.values(this.hosts).find(
hostData => hostData.robotName === robotName
)
if (associatedHost != null) {
this.hosts[ip] = associatedHost
resolve()
} else {
reject(new Error('No associated IP found.'))
}
})
}

public deleteAllAssociatedIPsGivenRobotName(
robotName: string
): Promise<void> {
return new Promise((resolve, reject) => {
const associatedHosts = this.getAssociatedIPsFromRobotName(robotName)
associatedHosts.forEach(hostname => {
delete this.hosts[hostname]
})
resolve()
})
}

public isAssociatedWithExistingHostData(robotName: string): boolean {
return this.getAssociatedIPsFromRobotName(robotName).length > 0
}

public isAssociatedBrokerErrored(robotName: string): boolean {
const associatedRobots = this.getAssociatedIPsFromRobotName(robotName)
return this.isBrokerErrored(head(associatedRobots) as string)
}

public isAssociatedBrokerConnected(robotName: string): boolean {
const associatedIPs = this.getAssociatedIPsFromRobotName(robotName)
return this.isConnectedToBroker(head(associatedIPs) as string)
}

public isAssociatedBrokerConnecting(robotName: string): boolean {
const associatedIPs = this.getAssociatedIPsFromRobotName(robotName)
return this.isConnectingToBroker(head(associatedIPs) as string)
public associateIPWithRobotName(ip: string, robotName: string): void {
const robotNameInStore = this.robotNamesByIP[ip]
if (robotNameInStore !== robotName) {
this.robotNamesByIP[ip] = robotName
}
}

public isConnectedToBroker(ip: string): boolean {
return this.hosts[ip]?.client?.connected ?? false
public isConnectedToBroker(robotName: string): boolean {
return robotName != null
? this.hostsByRobotName[robotName]?.client?.connected ?? false
: false
}

public isConnectingToBroker(ip: string): boolean {
public isConnectingToBroker(robotName: string): boolean {
return (
(this.hosts[ip]?.client == null ?? false) && !this.isBrokerErrored(ip)
(this.hostsByRobotName[robotName]?.client == null ?? false) &&
!this.isConnectionTerminated(robotName)
)
}

public isPendingSub(ip: string, topic: NotifyTopic): boolean {
if (ip in this.hosts) {
const { pendingSubs } = this.hosts[ip]
const robotName = this.getRobotNameByIP(ip)
if (robotName != null && robotName in this.hostsByRobotName) {
const { pendingSubs } = this.hostsByRobotName[robotName]
return pendingSubs.has(topic)
} else {
return false
}
}

public isActiveSub(ip: string, topic: NotifyTopic): boolean {
if (ip in this.hosts) {
const { subscriptions } = this.hosts[ip]
const robotName = this.getRobotNameByIP(ip)
if (robotName != null && robotName in this.hostsByRobotName) {
const { subscriptions } = this.hostsByRobotName[robotName]
return subscriptions.has(topic)
} else {
return false
}
}

public isPendingUnsub(ip: string, topic: NotifyTopic): boolean {
if (ip in this.hosts) {
const { pendingUnsubs } = this.hosts[ip]
const robotName = this.getRobotNameByIP(ip)
if (robotName != null && robotName in this.hostsByRobotName) {
const { pendingUnsubs } = this.hostsByRobotName[robotName]
return pendingUnsubs.has(topic)
} else {
return false
Expand All @@ -277,11 +241,11 @@ class ConnectionStore {

/**
*
* @description Reachable refers to whether the broker connection has returned an error.
* @description A broker connection is terminated if it is errored or not present in the store.
*/
public isBrokerErrored(ip: string): boolean {
if (ip in this.hosts) {
return this.hosts[ip].unreachableStatus != null
public isConnectionTerminated(robotName: string): boolean {
if (robotName in this.hostsByRobotName) {
return this.hostsByRobotName[robotName].unreachableStatus != null
} else {
return true
}
Expand All @@ -291,8 +255,13 @@ class ConnectionStore {
return this.knownPortBlockedIPs.has(ip)
}

public isIPInStore(ip: string): boolean {
return ip in this.hosts
private getHostDataByIP(ip: string): HostData | null {
if (ip in this.robotNamesByIP) {
const robotName = this.robotNamesByIP[ip]
return this.hostsByRobotName[robotName] ?? null
} else {
return null
}
}
}

Expand Down
Loading

0 comments on commit c93702a

Please sign in to comment.