Skip to content

Commit

Permalink
feat(websocket): send LwM2M shadows on request
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Nov 5, 2023
1 parent fa347af commit 05f6362
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 40 deletions.
9 changes: 9 additions & 0 deletions cdk/resources/WebsocketAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export class WebsocketAPI extends Construct {
environment: {
VERSION: this.node.tryGetContext('version'),
CONNECTIONS_TABLE_NAME: this.connectionsTable.tableName,
WEBSOCKET_MANAGEMENT_API_URL: this.websocketManagementAPIURL,
},
layers: [baseLayer],
logRetention: Logs.RetentionDays.ONE_WEEK,
Expand All @@ -147,6 +148,14 @@ export class WebsocketAPI extends Construct {
}:thing/nrplus-gw-*`,
],
}),
new IAM.PolicyStatement({
actions: ['execute-api:ManageConnections'],
resources: [this.websocketAPIArn],
}),
new IAM.PolicyStatement({
actions: ['iot:GetThingShadow', 'iot:ListThings'],
resources: ['*'],
}),
],
})
this.connectionsTable.grantReadWriteData(onMessage)
Expand Down
77 changes: 44 additions & 33 deletions lambda/notifyClients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ export type CellGeoLocationEvent = {

export type Event = DeviceEvent | CellGeoLocationEvent

export const notifyClients =
(
{
db,
connectionsTableName,
apiGwManagementClient,
}: {
db: DynamoDBClient
connectionsTableName: string
apiGwManagementClient: ApiGatewayManagementApiClient
},
dropMessage = false,
) =>
async (event: Event): Promise<void> => {
export const notifyClients = (
{
db,
connectionsTableName,
apiGwManagementClient,
}: {
db: DynamoDBClient
connectionsTableName: string
apiGwManagementClient: ApiGatewayManagementApiClient
},
dropMessage = false,
): ((event: Event) => Promise<void>) => {
const send = sendEvent(apiGwManagementClient)
return async (event: Event): Promise<void> => {
console.log(
JSON.stringify({
event,
Expand All @@ -95,19 +95,7 @@ export const notifyClients =
const context = getEventContext(event)
if (context === null)
throw new Error(`Unknown event: ${JSON.stringify(event)}`)
console.log(`Notifying client`, connectionId)

await apiGwManagementClient.send(
new PostToConnectionCommand({
ConnectionId: connectionId,
Data: Buffer.from(
JSON.stringify({
'@context': context,
...event,
}),
),
}),
)
await send(connectionId, event, context)
} catch (err) {
if ((err as Error).name === 'GoneException') {
console.log(`Client is gone`, connectionId)
Expand All @@ -127,14 +115,37 @@ export const notifyClients =
}
}
}
}

export const sendEvent =
(client: ApiGatewayManagementApiClient) =>
async (
connectionId: string,
event: Record<string, unknown>,
context: URL,
): Promise<void> => {
console.log(`Notifying client`, connectionId)
await client.send(
new PostToConnectionCommand({
ConnectionId: connectionId,
Data: Buffer.from(
JSON.stringify({
'@context': context.toString(),
...event,
}),
),
}),
)
}

const getEventContext = (event: Event): string | null => {
if ('reported' in event) return 'https://thingy.rocks/device-shadow'
if ('message' in event) return 'https://thingy.rocks/device-message'
if ('location' in event) return 'https://thingy.rocks/device-location'
if ('history' in event) return 'https://thingy.rocks/device-history'
const getEventContext = (event: Event): URL | null => {
if ('reported' in event) return new URL('https://thingy.rocks/device-shadow')
if ('message' in event) return new URL('https://thingy.rocks/device-message')
if ('location' in event)
return new URL('https://thingy.rocks/device-location')
if ('history' in event) return new URL('https://thingy.rocks/device-history')
if ('cellGeoLocation' in event)
return 'https://thingy.rocks/cell-geo-location'
return new URL('https://thingy.rocks/cell-geo-location')
return null
}

Expand Down
7 changes: 4 additions & 3 deletions lambda/onConnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import type {
APIGatewayProxyStructuredResultV2,
APIGatewayProxyWebsocketEventV2,
} from 'aws-lambda'
const { TableName } = fromEnv({ TableName: 'CONNECTIONS_TABLE_NAME' })(
process.env,
)

const { TableName } = fromEnv({
TableName: 'CONNECTIONS_TABLE_NAME',
})(process.env)

const db = new DynamoDBClient({})

Expand Down
61 changes: 57 additions & 4 deletions lambda/onMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@ import type {
} from 'aws-lambda'
import { validateWithTypeBox } from './validateWithTypeBox.js'
import {
GetThingShadowCommand,
IoTDataPlaneClient,
PublishCommand,
} from '@aws-sdk/client-iot-data-plane'
import { DescribeThingCommand, IoTClient } from '@aws-sdk/client-iot'

const db = new DynamoDBClient({})
import {
DescribeThingCommand,
IoTClient,
ListThingsCommand,
} from '@aws-sdk/client-iot'
import { ApiGatewayManagementApi } from '@aws-sdk/client-apigatewaymanagementapi'
import { sendEvent } from './notifyClients.js'
import type { LwM2MObject } from '@hello.nrfcloud.com/proto-lwm2m'
import { shadowToObjects } from '../lwm2m/shadowToObjects.js'

const { TableName } = fromEnv({
const { TableName, websocketManagementAPIURL } = fromEnv({
TableName: 'CONNECTIONS_TABLE_NAME',
websocketManagementAPIURL: 'WEBSOCKET_MANAGEMENT_API_URL',
})(process.env)

const message = Type.Object({
Expand All @@ -30,6 +38,14 @@ const validateMessage = validateWithTypeBox(message)

const iotData = new IoTDataPlaneClient({})
const iot = new IoTClient({})
const db = new DynamoDBClient({})

const apiGwManagementClient = new ApiGatewayManagementApi({
endpoint: websocketManagementAPIURL,
})

const send = sendEvent(apiGwManagementClient)
const decoder = new TextDecoder()

export const handler = async (
event: APIGatewayProxyWebsocketEventV2,
Expand Down Expand Up @@ -79,6 +95,43 @@ export const handler = async (
}
}

if (message.data === 'LWM2M-shadows') {
// Publish LwM2M shadows
const { things } = await iot.send(
new ListThingsCommand({
maxResults: 250,
thingTypeName: '',
}),
)
const shadows = (
await Promise.all<{
deviceId: string
shadow: LwM2MObject[]
}>(
(things ?? []).map(async ({ thingName }) =>
iotData
.send(new GetThingShadowCommand({ thingName, shadowName: 'lwm2m' }))
.then(async ({ payload }) => ({
deviceId: thingName as string,
shadow: shadowToObjects(
JSON.parse(decoder.decode(payload)).state.reported,
),
}))
.catch(() => ({
deviceId: thingName as string,
shadow: [],
})),
),
)
).filter(({ shadow }) => shadow.length > 0)

await send(
event.requestContext.connectionId,
{ shadows },
new URL('https://thingy.rocks/lwm2m-shadows'),
).catch(console.error)
}

const maybeValidMessage = validateMessage(message)
if ('errors' in maybeValidMessage) {
console.error(
Expand Down

0 comments on commit 05f6362

Please sign in to comment.