Skip to content

Commit

Permalink
feat(nrplus): implement NR+ control messages
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Oct 30, 2023
1 parent aa59bde commit 60e257c
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cdk/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ new BackendApp({
lambdaSources: {
publishToWebsocketClients: await pack('publishToWebsocketClients'),
onConnect: await pack('onConnect'),
onMessage: await pack('onMessage', 'lambda/onMessage.handler'),
onMessage: await pack('onMessage'),
onDisconnect: await pack('onDisconnect'),
onCellGeoLocationResolved: await pack('onCellGeoLocationResolved'),
resolveCellLocation: await pack('resolveCellLocation'),
Expand Down
66 changes: 55 additions & 11 deletions cdk/resources/WebsocketAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export class WebsocketAPI extends Construct {
public readonly websocketAPIArn: string
public readonly websocketManagementAPIURL: string
public constructor(
parent: Stack,
parent: Construct,
{
lambdaSources,
baseLayer,
Expand Down Expand Up @@ -63,9 +63,15 @@ export class WebsocketAPI extends Construct {
apiId: api.ref,
})

this.websocketURI = `wss://${api.ref}.execute-api.${parent.region}.amazonaws.com/${stage.ref}`
this.websocketAPIArn = `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/POST/@connections/*`
this.websocketManagementAPIURL = `https://${api.ref}.execute-api.${parent.region}.amazonaws.com/${stage.stageName}`
this.websocketURI = `wss://${api.ref}.execute-api.${
Stack.of(parent).region
}.amazonaws.com/${stage.ref}`
this.websocketAPIArn = `arn:aws:execute-api:${Stack.of(parent).region}:${
Stack.of(parent).account
}:${api.ref}/${stage.stageName}/POST/@connections/*`
this.websocketManagementAPIURL = `https://${api.ref}.execute-api.${
Stack.of(parent).region
}.amazonaws.com/${stage.stageName}`

// Connect
const onConnect = new Lambda.Function(this, 'onConnect', {
Expand Down Expand Up @@ -93,7 +99,11 @@ export class WebsocketAPI extends Construct {
apiId: api.ref,
description: 'Connect integration',
integrationType: 'AWS_PROXY',
integrationUri: `arn:aws:apigateway:${parent.region}:lambda:path/2015-03-31/functions/${onConnect.functionArn}/invocations`,
integrationUri: `arn:aws:apigateway:${
Stack.of(parent).region
}:lambda:path/2015-03-31/functions/${
onConnect.functionArn
}/invocations`,
},
)
const connectRoute = new ApiGateway.CfnRoute(this, 'connectRoute', {
Expand All @@ -120,6 +130,24 @@ export class WebsocketAPI extends Construct {
},
layers: [baseLayer],
logRetention: Logs.RetentionDays.ONE_WEEK,
initialPolicy: [
new IAM.PolicyStatement({
actions: ['iot:Publish'],
resources: [
`arn:aws:iot:${Stack.of(parent).region}:${
Stack.of(parent).account
}:topic/*/nrplus-ctrl`,
],
}),
new IAM.PolicyStatement({
actions: ['iot:DescribeThing'],
resources: [
`arn:aws:iot:${Stack.of(parent).region}:${
Stack.of(parent).account
}:thing/nrplus-gw-*`,
],
}),
],
})
this.connectionsTable.grantReadWriteData(onMessage)

Expand All @@ -130,7 +158,11 @@ export class WebsocketAPI extends Construct {
apiId: api.ref,
description: 'Send messages integration',
integrationType: 'AWS_PROXY',
integrationUri: `arn:aws:apigateway:${parent.region}:lambda:path/2015-03-31/functions/${onMessage.functionArn}/invocations`,
integrationUri: `arn:aws:apigateway:${
Stack.of(parent).region
}:lambda:path/2015-03-31/functions/${
onMessage.functionArn
}/invocations`,
},
)
const sendMessageRoute = new ApiGateway.CfnRoute(this, 'sendMessageRoute', {
Expand Down Expand Up @@ -168,7 +200,11 @@ export class WebsocketAPI extends Construct {
apiId: api.ref,
description: 'Disconnect integration',
integrationType: 'AWS_PROXY',
integrationUri: `arn:aws:apigateway:${parent.region}:lambda:path/2015-03-31/functions/${onDisconnect.functionArn}/invocations`,
integrationUri: `arn:aws:apigateway:${
Stack.of(parent).region
}:lambda:path/2015-03-31/functions/${
onDisconnect.functionArn
}/invocations`,
},
)
const disconnectRoute = new ApiGateway.CfnRoute(this, 'disconnectRoute', {
Expand All @@ -184,19 +220,25 @@ export class WebsocketAPI extends Construct {
principal: new IAM.ServicePrincipal(
'apigateway.amazonaws.com',
) as IAM.IPrincipal,
sourceArn: `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/sendmessage`,
sourceArn: `arn:aws:execute-api:${Stack.of(parent).region}:${
Stack.of(parent).account
}:${api.ref}/${stage.stageName}/sendmessage`,
})
onConnect.addPermission('invokeByAPI', {
principal: new IAM.ServicePrincipal(
'apigateway.amazonaws.com',
) as IAM.IPrincipal,
sourceArn: `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/$connect`,
sourceArn: `arn:aws:execute-api:${Stack.of(parent).region}:${
Stack.of(parent).account
}:${api.ref}/${stage.stageName}/$connect`,
})
onDisconnect.addPermission('invokeByAPI', {
principal: new IAM.ServicePrincipal(
'apigateway.amazonaws.com',
) as IAM.IPrincipal,
sourceArn: `arn:aws:execute-api:${parent.region}:${parent.account}:${api.ref}/${stage.stageName}/$disconnect`,
sourceArn: `arn:aws:execute-api:${Stack.of(parent).region}:${
Stack.of(parent).account
}:${api.ref}/${stage.stageName}/$disconnect`,
})

// Publish events
Expand Down Expand Up @@ -248,7 +290,9 @@ export class WebsocketAPI extends Construct {
new IAM.PolicyStatement({
actions: ['iot:Publish'],
resources: [
`arn:aws:iot:${parent.region}:${parent.account}:topic/errors`,
`arn:aws:iot:${Stack.of(parent).region}:${
Stack.of(parent).account
}:topic/errors`,
],
}),
],
Expand Down
111 changes: 90 additions & 21 deletions lambda/onMessage.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
import { DynamoDBClient, UpdateItemCommand } from '@aws-sdk/client-dynamodb'
import { fromEnv } from '@nordicsemiconductor/from-env'
import { Type } from '@sinclair/typebox'
import type {
APIGatewayProxyStructuredResultV2,
APIGatewayProxyWebsocketEventV2,
} from 'aws-lambda'
import { validateWithTypeBox } from './validateWithTypeBox.js'
import {
IoTDataPlaneClient,
PublishCommand,
} from '@aws-sdk/client-iot-data-plane'
import { DescribeThingCommand, IoTClient } from '@aws-sdk/client-iot'

const db = new DynamoDBClient({})

const { TableName } = fromEnv({
TableName: 'CONNECTIONS_TABLE_NAME',
websocketManagementAPIURL: 'WEBSOCKET_MANAGEMENT_API_URL',
})(process.env)

const message = Type.Object({
message: Type.Literal('sendmessage'),
data: Type.Object({
deviceId: Type.String({ minLength: 1 }),
code: Type.String({ minLength: 1 }),
nrplusCtrl: Type.String({ minLength: 1 }),
}),
})
const validateMessage = validateWithTypeBox(message)

const iotData = new IoTDataPlaneClient({})
const iot = new IoTClient({})

export const handler = async (
event: APIGatewayProxyWebsocketEventV2,
): Promise<APIGatewayProxyStructuredResultV2> => {
Expand All @@ -21,28 +40,78 @@ export const handler = async (
}),
)

await db.send(
new UpdateItemCommand({
TableName,
Key: {
connectionId: {
S: event.requestContext.connectionId,
let message: Record<string, any> | undefined = undefined
try {
message = JSON.parse(event.body ?? '{}')
} catch (err) {
console.error(`Failed to parse message as JSON.`)
}

if (message === undefined) {
console.error(`No message provided.`)
return {
statusCode: 400,
}
}
if (message.data === 'PING') {
await db.send(
new UpdateItemCommand({
TableName,
Key: {
connectionId: {
S: event.requestContext.connectionId,
},
},
},
UpdateExpression: 'SET #lastSeen = :lastSeen',
ExpressionAttributeNames: {
'#lastSeen': 'lastSeen',
},
ExpressionAttributeValues: {
':lastSeen': {
S: new Date().toISOString(),
UpdateExpression: 'SET #lastSeen = :lastSeen',
ExpressionAttributeNames: {
'#lastSeen': 'lastSeen',
},
},
}),
)
ExpressionAttributeValues: {
':lastSeen': {
S: new Date().toISOString(),
},
},
}),
)

return {
statusCode: 200,
}
}

return {
statusCode: 200,
body: `Got your message, ${event.requestContext.connectionId}!`,
const maybeValidMessage = validateMessage(message)
if ('errors' in maybeValidMessage) {
console.error(
`Failed to validate message: ${JSON.stringify(maybeValidMessage.errors)}`,
)
return {
statusCode: 400,
}
} else {
const { deviceId, code, nrplusCtrl } = maybeValidMessage.value.data
const attributes = (
await iot.send(new DescribeThingCommand({ thingName: deviceId }))
).attributes
if (
attributes === undefined ||
!('code' in attributes) ||
attributes.code !== code
) {
return {
statusCode: 403,
body: `Code ${code} not valid for device ${deviceId}!`,
}
}
await iotData.send(
new PublishCommand({
topic: `${deviceId}/nrplus-ctrl`,
payload: Buffer.from(nrplusCtrl, 'hex'),
qos: 1,
}),
)
console.log(`>`, `${deviceId}/nrplus-ctrl`, nrplusCtrl)
return {
statusCode: 202,
}
}
}

0 comments on commit 60e257c

Please sign in to comment.