diff --git a/README.md b/README.md index 5b4f676..d999477 100644 --- a/README.md +++ b/README.md @@ -38,19 +38,10 @@ aws ssm put-parameter --name thingy-rocks-backend-Wirepas5GMeshGatewayEndpoint - npx cdk deploy ``` -## Support for managing Wirepas 5G Mesh nodes - -For interacting with these nodes, - -1. Create a thing type `mesh-node` (they cannot be created using - CloudFormation). -1. Assign the thing type `mesh-node` to the thing which should act as a 5G Mesh - Node. - -The thing type is check when an state update is received from the UI. - ### Running the Wirepas 5G Mesh Gateway +Create a thing type `wirepas-5g-mesh-gateway`. + Configure the gateway settings using the `.envrc` (see [the example](./envrc.example)). diff --git a/cdk/resources/Wirepas5GMeshGateway.ts b/cdk/resources/Wirepas5GMeshGateway.ts index e666294..9a22635 100644 --- a/cdk/resources/Wirepas5GMeshGateway.ts +++ b/cdk/resources/Wirepas5GMeshGateway.ts @@ -1,27 +1,23 @@ import type { Stack } from 'aws-cdk-lib' import { aws_iam as IAM } from 'aws-cdk-lib' import { Construct } from 'constructs' -import type { WebsocketAPI } from './WebsocketAPI' export class Wirepas5GMeshGateway extends Construct { public readonly accessKey - constructor(parent: Stack, { websocketAPI }: { websocketAPI: WebsocketAPI }) { + constructor(parent: Stack) { super(parent, 'Wirepas5GMeshGateway') const gatewayUser = new IAM.User(this, 'gatewayUser') gatewayUser.addToPolicy( new IAM.PolicyStatement({ - actions: ['iot:DescribeEndpoint'], + actions: [ + 'iot:DescribeEndpoint', + 'iot:UpdateThingShadow', + 'iot:ListThings', + ], resources: [`*`], }), ) - gatewayUser.addToPolicy( - new IAM.PolicyStatement({ - actions: ['execute-api:ManageConnections'], - resources: [websocketAPI.websocketAPIArn], - }), - ) - websocketAPI.connectionsTable.grantFullAccess(gatewayUser) this.accessKey = new IAM.CfnAccessKey(this, 'accessKey', { userName: gatewayUser.userName, diff --git a/cdk/stacks/BackendStack.ts b/cdk/stacks/BackendStack.ts index 581127c..bc21f05 100644 --- a/cdk/stacks/BackendStack.ts +++ b/cdk/stacks/BackendStack.ts @@ -103,7 +103,7 @@ export class BackendStack extends Stack { baseLayer, }) - const wirepasGateway = new Wirepas5GMeshGateway(this, { websocketAPI: api }) + const wirepasGateway = new Wirepas5GMeshGateway(this) // Outputs new CfnOutput(this, 'WebSocketURI', { diff --git a/lambda/notifyClients.ts b/lambda/notifyClients.ts index 09b37a4..9e89292 100644 --- a/lambda/notifyClients.ts +++ b/lambda/notifyClients.ts @@ -38,6 +38,8 @@ export type DeviceEvent = { deviceAlias?: string // The fixed geo-location of the device, deviceLocation?: string // e.g.: 63.42115901688979,10.437200141182338 + // The thingy type + deviceType?: string } & ( | { reported: Record diff --git a/lambda/withDeviceAlias.ts b/lambda/withDeviceAlias.ts index a7cdd0f..41a5769 100644 --- a/lambda/withDeviceAlias.ts +++ b/lambda/withDeviceAlias.ts @@ -13,21 +13,26 @@ export const withDeviceAlias = >( return (notifier: N) => async (event: Parameters[0]): Promise => { if (!('deviceId' in event)) return notifier(event) - const { alias: deviceAlias, location } = await info(event.deviceId) - if (deviceAlias === undefined) return notifier(event) + const { alias: deviceAlias, location, type } = await info(event.deviceId) return notifier({ ...event, deviceAlias, deviceLocation: location, + deviceType: type, }) } } -const deviceInfo: Record = {} +const deviceInfo: Record< + string, + { alias?: string; location?: string; type?: string } +> = {} export const getDeviceInfo = (iot: IoTClient) => - async (deviceId: string): Promise<{ alias?: string; location?: string }> => { + async ( + deviceId: string, + ): Promise<{ alias?: string; location?: string; type?: string }> => { const info = deviceInfo[deviceId] ?? (await getDeviceAttributes(iot)(deviceId)) if (!(deviceId in deviceInfo)) deviceInfo[deviceId] = info @@ -36,11 +41,13 @@ export const getDeviceInfo = } const getDeviceAttributes = (iot: IoTClient) => async (deviceId: string) => { - const { name, location } = - (await iot.send(new DescribeThingCommand({ thingName: deviceId }))) - ?.attributes ?? {} + const { attributes, thingTypeName } = await iot.send( + new DescribeThingCommand({ thingName: deviceId }), + ) + const { name, location } = attributes ?? {} return { alias: name, location, + type: thingTypeName, } } diff --git a/package-lock.json b/package-lock.json index f1ff0e0..9b7666f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@sinclair/typebox": "0.32.13", "ajv": "8.12.0", "jsonata": "2.0.3", + "lodash-es": "4.17.21", "mqtt": "5.3.5", "protobufjs": "7.2.6" }, @@ -35,6 +36,7 @@ "@swc/core": "1.4.0", "@types/aws-lambda": "8.10.133", "@types/glob": "8.1.0", + "@types/lodash-es": "4.17.12", "@types/node": "20.11.16", "@types/yazl": "2.4.5", "@typescript-eslint/eslint-plugin": "6.20.0", @@ -6532,6 +6534,21 @@ "@types/node": "*" } }, + "node_modules/@types/lodash": { + "version": "4.14.202", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.202.tgz", + "integrity": "sha512-OvlIYQK9tNneDlS0VN54LLd5uiPCBOp7gS5Z0f1mjoJYBrtStzgmJBxONW3U6OZqdtNzZPmn9BS/7WI7BFFcFQ==", + "dev": true + }, + "node_modules/@types/lodash-es": { + "version": "4.17.12", + "resolved": "https://registry.npmjs.org/@types/lodash-es/-/lodash-es-4.17.12.tgz", + "integrity": "sha512-0NgftHUcV4v34VhXm8QBSftKVXtbkBG3ViCjs6+eJ5a6y6Mi/jiFGPc1sC7QK+9BFhWrURE3EOggmWaSxL9OzQ==", + "dev": true, + "dependencies": { + "@types/lodash": "*" + } + }, "node_modules/@types/minimatch": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/@types/minimatch/-/minimatch-5.1.2.tgz", @@ -11459,6 +11476,11 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, + "node_modules/lodash-es": { + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash-es/-/lodash-es-4.17.21.tgz", + "integrity": "sha512-mKnC+QJ9pWVzv+C4/U3rRsHapFfHvQFoFB92e52xeyGMcX6/OlIl78je1u8vePzYZSkkogMPJ2yjxxsb89cxyw==" + }, "node_modules/lodash.camelcase": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz", diff --git a/package.json b/package.json index 3b1927d..0c9c6f9 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "@swc/core": "1.4.0", "@types/aws-lambda": "8.10.133", "@types/glob": "8.1.0", + "@types/lodash-es": "4.17.12", "@types/node": "20.11.16", "@types/yazl": "2.4.5", "@typescript-eslint/eslint-plugin": "6.20.0", @@ -95,6 +96,7 @@ "@sinclair/typebox": "0.32.13", "ajv": "8.12.0", "jsonata": "2.0.3", + "lodash-es": "4.17.21", "mqtt": "5.3.5", "protobufjs": "7.2.6" } diff --git a/wirepas-5g-mesh-gateway/ScannableArray.ts b/wirepas-5g-mesh-gateway/ScannableArray.ts index f30cb84..72740d7 100644 --- a/wirepas-5g-mesh-gateway/ScannableArray.ts +++ b/wirepas-5g-mesh-gateway/ScannableArray.ts @@ -6,17 +6,13 @@ export class ScannableArray { this.array = array } - getChar(): number | undefined { - return this.array.at(this.index) - } - - getCharNext(): number { + getChar(): number { const next = this.array.at(this.index++) if (next === undefined) throw new Error(`Out of bounds!`) return next } - next(): void { - this.index++ + hasNext(): boolean { + return this.array.at(this.index + 1) !== undefined } } diff --git a/wirepas-5g-mesh-gateway/decodePayload.spec.ts b/wirepas-5g-mesh-gateway/decodePayload.spec.ts new file mode 100644 index 0000000..0c13253 --- /dev/null +++ b/wirepas-5g-mesh-gateway/decodePayload.spec.ts @@ -0,0 +1,95 @@ +import { describe, it } from 'node:test' +import { decodePayload } from './decodePayload.js' +import assert from 'node:assert/strict' + +void describe('decodePayload()', () => { + void it('should decode the payload', () => { + /* + + For this payload (92 bytes), it is on TLV format: you have the information ID (1 byte), the data length (1 byte) and the data (n bytes). + + For example, data 01 corresponds to the counter (4 bytes long, data is 42 c2 00 00 or 49730). + + The relevant data starts with 0F for the temperature (here, it is this part: 0f 04 0a d7 c3 41, which gives a temperature of 24.48°C (it is a float32)). + + Also, we send data starting with 01 but with a different length (3 bytes) which corresponds to a key press. + Example: 01 00 02 (because there is only one button). + + You may see payloads starting with 03 (3 bytes): it is when LED status/color changes. + In this case, color is the following: + + Byte 1: ID (0x03) + Byte 2: Color. 0x00: red, 0x01: blue, 0x02: green + Byte 3: State. 0x00: off, 0x01: on. + We send this payload when requested (response to get LED status (starts with 0x82)) or when setting LED (message 0x81), as an acknowledgement (to confirm color/status has changed). + + */ + const payload = Buffer.from( + [ + // [0x01: COUNTER] [0x04] [size_t counter] + '01 04 42 c2 00 00', + // [0x02: TIMESTAMP] [0x08] [int64_t timestamp] + '02 08 48 db f5 60 9b e4 00 00', + // [0x03: IAQ] [0x02] [uint16_t iaq] + '03 02 00 00', + // [0x04: IAQ_ACC] [0x01] [uint8_t iaq_acc] + '04 01 00', + // [0x05: SIAQ] [0x02] [uint16_t siaq] + '05 02 00 00', + // [0x06: SIAQ_ACC] [0x01] [uint8_t siaq_acc] + '06 01 00', + // [0x07: SENSOR_STATUS] [0x01] [uint8_t sensor_status] + '07 01 00', + // [0x08: SENSOR_STABILITY][0x01] [uint8_t sensor_stable] + '08 01 00', + // [0x09: GAS] [0x01] [uint8_t gas] + '09 01 00', + // [0x0A: GAS_ACC] [0x01] [uint8_t gas_acc] + '0a 01 00', + // [0x0B: VOC] [0x02] [uint16_t voc] + '0b 02 00 00', + // [0x0C: VOC_ACC] [0x01] [uint8_t voc_acc] + '0c 01 00', + // [0x0D: CO2] [0x02] [uint16_t co2] + '0d 02 00 00', + // [0x0E: CO2_ACC] [0x01] [uint8_t co2_acc] + '0e 01 00', + // [0x0F: TEMPERATURE] [0x04] [float temperature] + '0f 04 0a d7 c3 41', + // [0x10: HUMIDITY] [0x04] [float humidity] + '10 04 68 91 8d 41', + // [0x12: HUM_RAW] [0x04] [float raw_humidity] + '12 04 00 40 8a 46', + // [0x11: TEMP_RAW] [0x04] [float raw_temperature] + '11 04 00 00 19 45', + // [0x13: PRESS_RAW] [0x04] [float raw_pressure] + '13 04 80 f2 c3 47', + // [0x14: GAS_RAW] [0x04] [float raw_gas] + '14 04 00 ca 9e 47', + ] + .join('') + .replaceAll(' ', ''), + 'hex', + ) + + const decoded = decodePayload(payload) + + assert.deepEqual(decoded, [ + { counter: 49730 }, + { + // eslint-disable-next-line @typescript-eslint/no-loss-of-precision + temperature: 24.479999542236328, + }, + { + // eslint-disable-next-line @typescript-eslint/no-loss-of-precision + humidity: 17.695999145507812, + }, + { + raw_pressure: 100325, + }, + { + raw_gas: 81300, + }, + ]) + }) +}) diff --git a/wirepas-5g-mesh-gateway/decodePayload.ts b/wirepas-5g-mesh-gateway/decodePayload.ts index 841f327..6e1c690 100644 --- a/wirepas-5g-mesh-gateway/decodePayload.ts +++ b/wirepas-5g-mesh-gateway/decodePayload.ts @@ -1,58 +1,123 @@ import { ScannableArray } from './ScannableArray.js' export type Wirepas5GMeshNodePayload = | { counter: number } + | { timestamp: number } + | { temperature: number } | { button: number } - | { led: Record } - -export type Wirepas5GMeshNodeEvent = { - meshNodeEvent: { - meta: { - node: number - gateway: string - rxTime: Date - travelTimeMs: number - hops?: number - } - message: Wirepas5GMeshNodePayload - } + | { humidity: number } + | { raw_pressure: number } + | { raw_gas: number } + +enum MessageType { + COUNTER = 0x01, // [0x04] [size_t counter] + TIMESTAMP = 0x02, // [0x08] [int64_t timestamp] + IAQ = 0x03, // [0x02] [uint16_t iaq] + IAQ_ACC = 0x04, // [0x01] [uint8_t iaq_acc] + SIAQ = 0x05, // [0x02] [uint16_t siaq] + SIAQ_ACC = 0x06, // [0x01] [uint8_t siaq_acc] + SENSOR_STATUS = 0x07, // [0x01] [uint8_t sensor_status] + SENSOR_STABILITY = 0x08, // [0x01] [uint8_t sensor_stable] + GAS = 0x09, // [0x01] [uint8_t gas] + GAS_ACC = 0x0a, // [0x01] [uint8_t gas_acc] + VOC = 0x0b, // [0x02] [uint16_t voc] + VOC_ACC = 0x0c, // [0x01] [uint8_t voc_acc] + CO2 = 0x0d, // [0x02] [uint16_t co2] + CO2_ACC = 0x0e, // [0x01] [uint8_t co2_acc] + TEMPERATURE = 0x0f, // [0x04] [float temperature] + HUMIDITY = 0x10, // [0x04] [float humidity] + TEMP_RAW = 0x11, // [0x04] [float raw_temperature] + HUM_RAW = 0x12, // [0x04] [float raw_humidity] + PRESS_RAW = 0x13, // [0x04] [float raw_pressure] + GAS_RAW = 0x14, // [0x04] [float raw_gas] } -/** - * @see https://github.com/wirepas/wm-sdk/tree/v1.4.0/source/example_apps/evaluation_app#button-pressed-notification-message - */ + export const decodePayload = ( payload: Uint8Array, -): Wirepas5GMeshNodePayload | null => { - const scannableMessage = new ScannableArray(payload) - switch (scannableMessage.getChar()) { - // Periodic message with a counter value - case 0: - scannableMessage.next() - return counterMessage(scannableMessage) - // Button pressed - case 1: - scannableMessage.next() - return { button: scannableMessage.getChar() as number } - // LED state - case 3: - case 129: - scannableMessage.next() - return { - led: { - [scannableMessage.getCharNext()]: scannableMessage.getCharNext(), - }, - } - default: - console.error(`Unknown message type`, scannableMessage.getChar()) - return null +): Wirepas5GMeshNodePayload[] => { + const messages: Wirepas5GMeshNodePayload[] = [] + const msg = new ScannableArray(payload) + + while (msg.hasNext()) { + const type = msg.getChar() + const len = msg.getChar() + const skip = () => { + for (let i = 0; i < len; i++) msg.getChar() + } + switch (type) { + // Periodic message with a counter value + case MessageType.COUNTER: + messages.push({ counter: readUint(msg, len) }) + continue + case MessageType.TIMESTAMP: + // messages.push({ timestamp: readUint(msg, len) }) + skip() + continue + // Skip + case MessageType.IAQ: + case MessageType.IAQ_ACC: + case MessageType.SIAQ: + case MessageType.SIAQ_ACC: + case MessageType.SENSOR_STATUS: + case MessageType.SENSOR_STABILITY: + case MessageType.GAS: + case MessageType.GAS_ACC: + case MessageType.VOC: + case MessageType.VOC_ACC: + case MessageType.CO2: + case MessageType.CO2_ACC: + case MessageType.TEMP_RAW: + case MessageType.HUM_RAW: + skip() + continue + case MessageType.TEMPERATURE: + messages.push({ temperature: readFloat(msg, len) }) + continue + case MessageType.HUMIDITY: + messages.push({ humidity: readFloat(msg, len) }) + continue + case MessageType.PRESS_RAW: + messages.push({ raw_pressure: readFloat(msg, len) }) + continue + case MessageType.GAS_RAW: + messages.push({ raw_gas: readFloat(msg, len) }) + continue + default: + console.error(`Unknown message type`, type) + skip() + break + } + } + + return messages +} + +const readUint = (message: ScannableArray, numBytes: number): number => { + const bytes = Buffer.alloc(numBytes) + for (let i = 0; i < numBytes; i++) { + bytes.writeUInt8(message.getChar(), i) + } + return bytesToNumber(bytes) +} + +const bytesToNumber = (byteArray: Buffer): number => { + let result = 0 + for (let i = byteArray.byteLength - 1; i >= 0; i--) { + result = result * 256 + (byteArray[i] ?? 0) + } + + return result +} + +const readFloat = (message: ScannableArray, numBytes: number): number => { + const bytes = Buffer.alloc(numBytes) + for (let i = 0; i < numBytes; i++) { + bytes.writeUInt8(message.getChar(), i) } + return bytesToFloat(bytes) } -const counterMessage = (message: ScannableArray): { counter: number } => { - const counterBytes = Buffer.alloc(4) - counterBytes.writeUInt8(message.getCharNext(), 0) - counterBytes.writeUInt8(message.getCharNext(), 1) - counterBytes.writeUInt8(message.getCharNext(), 2) - counterBytes.writeUInt8(message.getCharNext(), 3) - const counterValue = new Uint32Array(counterBytes)[0] - return { counter: counterValue as number } +const bytesToFloat = (byteArray: Buffer): number => { + const dataView = new DataView(byteArray.buffer) + const floatValue = dataView.getFloat32(0, true) + return floatValue } diff --git a/wirepas-5g-mesh-gateway/gateway.ts b/wirepas-5g-mesh-gateway/gateway.ts index 72aa035..7b63f94 100644 --- a/wirepas-5g-mesh-gateway/gateway.ts +++ b/wirepas-5g-mesh-gateway/gateway.ts @@ -1,10 +1,18 @@ import { fromEnv } from '@nordicsemiconductor/from-env' import mqtt from 'mqtt' -import { decodePayload, type Wirepas5GMeshNodeEvent } from './decodePayload.js' -import { log } from './log.js' +import { debug, error, log } from './log.js' import { GenericMessage } from './protobuf/ts/generic_message.js' -import { IoTDataPlaneClient } from '@aws-sdk/client-iot-data-plane' -import { DescribeEndpointCommand, IoTClient } from '@aws-sdk/client-iot' +import { + IoTDataPlaneClient, + UpdateThingShadowCommand, +} from '@aws-sdk/client-iot-data-plane' +import { + DescribeEndpointCommand, + IoTClient, + ListThingsCommand, + type ThingAttribute, +} from '@aws-sdk/client-iot' +import { merge } from 'lodash-es' const { region, accessKeyId, secretAccessKey, gatewayEndpoint } = fromEnv({ region: 'GATEWAY_REGION', @@ -13,6 +21,11 @@ const { region, accessKeyId, secretAccessKey, gatewayEndpoint } = fromEnv({ secretAccessKey: 'GATEWAY_AWS_SECRET_ACCESS_KEY', })(process.env) +const stateFlushInterval = parseInt( + process.env.STATE_FLUSH_INTERVAL_SECONDS ?? '60', + 10, +) + const auth = { region, credentials: { @@ -20,19 +33,36 @@ const auth = { secretAccessKey, }, } -const iotClient = async () => +const iotClient = new IoTClient(auth) +const iotDataClient = await (async () => new IoTDataPlaneClient({ ...auth, - endpoint: ( - await new IoTClient(auth).send( - new DescribeEndpointCommand({ - endpointType: 'iot:Data-ATS', - }), - ) - ).endpointAddress, - }) -// TODO: write updates to shadow -void iotClient + endpoint: `https://${ + ( + await new IoTClient(auth).send( + new DescribeEndpointCommand({ + endpointType: 'iot:Data-ATS', + }), + ) + ).endpointAddress + }`, + }))() + +// Find thing for gateway +const thingTypeName = 'wirepas-5g-mesh-gateway' +const { things: gateways } = await iotClient.send( + new ListThingsCommand({ + thingTypeName, + }), +) + +const existingGws = (gateways ?? []).reduce( + (list, gw) => ({ ...list, [gw.thingName as string]: gw }), + {} as Record, +) +Object.keys(existingGws).forEach((gwId) => + debug(`Known gateway things: ${gwId}`), +) const parsedEndpoint = new URL(gatewayEndpoint) log(`Connecting to`, parsedEndpoint.hostname) @@ -53,6 +83,8 @@ client.on('connect', () => { } }) +let nodes: Record> = {} + client.on('message', (_, message) => { const packetReceivedEvent = GenericMessage.fromBinary(message)?.wirepas?.packetReceivedEvent @@ -60,37 +92,56 @@ client.on('message', (_, message) => { const { sourceAddress, rxTimeMsEpoch, - sourceEndpoint, - destinationEndpoint, payload, travelTimeMs, header: { gwId }, + qos, hopCount, } = packetReceivedEvent - // Only handle messages on the 1/1 endpoint - if (sourceEndpoint !== 1 || destinationEndpoint !== 1) return - // Only handle messages with payload if (payload === undefined) return const rxTime = new Date(parseInt(BigInt(rxTimeMsEpoch).toString())) - const decodedPayload = decodePayload(payload) - if (decodedPayload !== null) { - const event: Wirepas5GMeshNodeEvent = { - meshNodeEvent: { - meta: { - node: sourceAddress, - gateway: gwId, - rxTime, - travelTimeMs, - ...(hopCount !== undefined ? { hops: hopCount } : {}), - }, - message: decodedPayload, - }, - } - console.log(JSON.stringify({ event })) - // TODO: write to shadow + if (existingGws[gwId] === undefined) { + error( + `Unknown gateway: ${gwId}! Add a new IoT Thing with the name "${gwId}" and the thing type "${thingTypeName}".`, + ) + return } + + nodes[gwId] = merge( + { + [sourceAddress]: { + travelTimeMs, + ...(hopCount !== undefined ? { hops: hopCount } : {}), + rxTime, + qos, + }, + }, + nodes[gwId], + ) } }) + +// Regularly send buffered updates +setInterval(async () => { + await Promise.all( + Object.entries(nodes).map(async ([gwId, nodes]) => + iotDataClient.send( + new UpdateThingShadowCommand({ + thingName: gwId, + payload: JSON.stringify({ + state: { + reported: { + nodes, + }, + }, + }), + }), + ), + ), + ) + nodes = {} +}, stateFlushInterval * 1000) +debug(`Flushing state every ${stateFlushInterval} seconds`) diff --git a/wirepas-5g-mesh-gateway/log.ts b/wirepas-5g-mesh-gateway/log.ts index 473b15e..0934fff 100644 --- a/wirepas-5g-mesh-gateway/log.ts +++ b/wirepas-5g-mesh-gateway/log.ts @@ -1,4 +1,6 @@ -const ts = () => `[${new Date().toISOString()}]` +import chalk from 'chalk' + +const ts = () => chalk.gray(`[${new Date().toISOString()}]`) export const log = (...args: any[]): void => { console.log(ts(), ...args)