Skip to content

Commit

Permalink
feat(nrplus): trigger scan message periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Oct 30, 2023
1 parent 60e257c commit eadeae7
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 22 deletions.
1 change: 1 addition & 0 deletions cdk/BackendLambdas.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type BackendLambdas = {
onNewNetworkSurvey: PackedLambda
onNetworkSurveyLocated: PackedLambda
parseSinkMessages: PackedLambda
nrplusGatewayScan: PackedLambda
}
1 change: 1 addition & 0 deletions cdk/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ new BackendApp({
onNewNetworkSurvey: await pack('onNewNetworkSurvey'),
onNetworkSurveyLocated: await pack('onNetworkSurveyLocated'),
parseSinkMessages: await pack('parseSinkMessages'),
nrplusGatewayScan: await pack('nrplusGatewayScan'),
},
layer: await packLayer({
id: 'baseLayer',
Expand Down
91 changes: 69 additions & 22 deletions cdk/resources/NRPlusGateway.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Duration, Stack } from 'aws-cdk-lib'
import IAM from 'aws-cdk-lib/aws-iam'
import Events from 'aws-cdk-lib/aws-events'
import EventsTargets from 'aws-cdk-lib/aws-events-targets'
import Iot from 'aws-cdk-lib/aws-iot'
import Kinesis, { StreamMode } from 'aws-cdk-lib/aws-kinesis'
import Lambda, { StartingPosition } from 'aws-cdk-lib/aws-lambda'
Expand All @@ -16,6 +18,7 @@ export class NRPlusGateway extends Construct {
}: {
lambdaSources: {
parseSinkMessages: PackedLambda
nrplusGatewayScan: PackedLambda
}
},
) {
Expand Down Expand Up @@ -70,36 +73,80 @@ export class NRPlusGateway extends Construct {
},
})

const parseSinkMessages = new Lambda.Function(this, 'lambda', {
handler: lambdaSources.parseSinkMessages.handler,
architecture: Lambda.Architecture.ARM_64,
runtime: Lambda.Runtime.NODEJS_18_X,
timeout: Duration.minutes(15),
memorySize: 1792,
code: Lambda.Code.fromAsset(
lambdaSources.parseSinkMessages.lambdaZipFile,
),
description: 'Parse sink messages',
environment: {
VERSION: this.node.tryGetContext('version'),
const parseSinkMessagesFn = new Lambda.Function(
this,
'parseSinkMessagesFn',
{
handler: lambdaSources.parseSinkMessages.handler,
architecture: Lambda.Architecture.ARM_64,
runtime: Lambda.Runtime.NODEJS_18_X,
timeout: Duration.minutes(15),
memorySize: 1792,
code: Lambda.Code.fromAsset(
lambdaSources.parseSinkMessages.lambdaZipFile,
),
description: 'Parse sink messages',
environment: {
VERSION: this.node.tryGetContext('version'),
},
initialPolicy: [
new IAM.PolicyStatement({
actions: ['iot:UpdateThingShadow'],
resources: ['*'],
}),
],
logRetention: Logs.RetentionDays.ONE_WEEK,
reservedConcurrentExecutions: 1,
},
initialPolicy: [
new IAM.PolicyStatement({
actions: ['iot:UpdateThingShadow'],
resources: ['*'],
}),
],
logRetention: Logs.RetentionDays.ONE_WEEK,
reservedConcurrentExecutions: 1,
})
)

parseSinkMessages.addEventSource(
parseSinkMessagesFn.addEventSource(
new KinesisEventSource(stream, {
startingPosition: StartingPosition.TRIM_HORIZON,
batchSize: 100,
maxBatchingWindow: Duration.seconds(1),
parallelizationFactor: 1,
}),
)

// Trigger scan message periodically
const nrplusGatewayScanFn = new Lambda.Function(
this,
'nrplusGatewayScanFn',
{
handler: lambdaSources.nrplusGatewayScan.handler,
architecture: Lambda.Architecture.ARM_64,
runtime: Lambda.Runtime.NODEJS_18_X,
timeout: Duration.seconds(60),
memorySize: 1792,
code: Lambda.Code.fromAsset(
lambdaSources.nrplusGatewayScan.lambdaZipFile,
),
description:
'Periodically trigger scan in sink to sync with relay, required to communicate reliably with relay and relay-connected clients',
environment: {
VERSION: this.node.tryGetContext('version'),
},
initialPolicy: [
new IAM.PolicyStatement({
actions: ['iot:ListThings', 'iot:Publish'],
resources: ['*'],
}),
],
logRetention: Logs.RetentionDays.ONE_DAY,
},
)

const rule = new Events.Rule(this, 'Rule', {
schedule: Events.Schedule.expression('rate(1 minute)'),
description: `Invoke the summary lambda`,
enabled: true,
targets: [new EventsTargets.LambdaFunction(nrplusGatewayScanFn)],
})

nrplusGatewayScanFn.addPermission('InvokeByEvents', {
principal: new IAM.ServicePrincipal('events.amazonaws.com'),
sourceArn: rule.ruleArn,
})
}
}
35 changes: 35 additions & 0 deletions lambda/nrplusGatewayScan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { IoTClient, ListThingsCommand } from '@aws-sdk/client-iot'
import {
IoTDataPlaneClient,
PublishCommand,
} from '@aws-sdk/client-iot-data-plane'

const iot = new IoTClient({})
const iotData = new IoTDataPlaneClient({})

/**
* Periodically trigger scan in sink to sync with relay,
* required to communicate reliably with relay and relay-connected clients
*/
export const handler = async (): Promise<void> => {
const { things: gateways } = await iot.send(
new ListThingsCommand({
thingTypeName: 'nrplus-gateway',
}),
)

await Promise.all(
(gateways ?? []).map(async ({ thingName }) => {
const topic = `${thingName}/nrplus-ctrl`
const payload = `dect beacon_scan -c 1667 -f -t 2`
console.log(`>`, topic, JSON.stringify(payload))
return iotData.send(
new PublishCommand({
topic,
payload: Buffer.from(payload, 'hex'),
qos: 1,
}),
)
}),
)
}

0 comments on commit eadeae7

Please sign in to comment.