diff --git a/cdk/BackendLambdas.d.ts b/cdk/BackendLambdas.d.ts index 3a000cf..f23e3b0 100644 --- a/cdk/BackendLambdas.d.ts +++ b/cdk/BackendLambdas.d.ts @@ -11,4 +11,5 @@ type BackendLambdas = { onNewNetworkSurvey: PackedLambda onNetworkSurveyLocated: PackedLambda parseSinkMessages: PackedLambda + nrplusGatewayScan: PackedLambda } diff --git a/cdk/backend.ts b/cdk/backend.ts index 947d45e..34604ac 100644 --- a/cdk/backend.ts +++ b/cdk/backend.ts @@ -46,6 +46,7 @@ new BackendApp({ onNewNetworkSurvey: await pack('onNewNetworkSurvey'), onNetworkSurveyLocated: await pack('onNetworkSurveyLocated'), parseSinkMessages: await pack('parseSinkMessages'), + nrplusGatewayScan: await pack('nrplusGatewayScan'), }, layer: await packLayer({ id: 'baseLayer', diff --git a/cdk/resources/NRPlusGateway.ts b/cdk/resources/NRPlusGateway.ts index 9394ad2..5340d53 100644 --- a/cdk/resources/NRPlusGateway.ts +++ b/cdk/resources/NRPlusGateway.ts @@ -1,5 +1,7 @@ import { Duration, Stack } from 'aws-cdk-lib' import IAM from 'aws-cdk-lib/aws-iam' +import Events from 'aws-cdk-lib/aws-events' +import EventsTargets from 'aws-cdk-lib/aws-events-targets' import Iot from 'aws-cdk-lib/aws-iot' import Kinesis, { StreamMode } from 'aws-cdk-lib/aws-kinesis' import Lambda, { StartingPosition } from 'aws-cdk-lib/aws-lambda' @@ -16,6 +18,7 @@ export class NRPlusGateway extends Construct { }: { lambdaSources: { parseSinkMessages: PackedLambda + nrplusGatewayScan: PackedLambda } }, ) { @@ -70,30 +73,34 @@ export class NRPlusGateway extends Construct { }, }) - const parseSinkMessages = new Lambda.Function(this, 'lambda', { - handler: lambdaSources.parseSinkMessages.handler, - architecture: Lambda.Architecture.ARM_64, - runtime: Lambda.Runtime.NODEJS_18_X, - timeout: Duration.minutes(15), - memorySize: 1792, - code: Lambda.Code.fromAsset( - lambdaSources.parseSinkMessages.lambdaZipFile, - ), - description: 'Parse sink messages', - environment: { - VERSION: this.node.tryGetContext('version'), + const parseSinkMessagesFn = new Lambda.Function( + this, + 'parseSinkMessagesFn', + { + handler: lambdaSources.parseSinkMessages.handler, + architecture: Lambda.Architecture.ARM_64, + runtime: Lambda.Runtime.NODEJS_18_X, + timeout: Duration.minutes(15), + memorySize: 1792, + code: Lambda.Code.fromAsset( + lambdaSources.parseSinkMessages.lambdaZipFile, + ), + description: 'Parse sink messages', + environment: { + VERSION: this.node.tryGetContext('version'), + }, + initialPolicy: [ + new IAM.PolicyStatement({ + actions: ['iot:UpdateThingShadow'], + resources: ['*'], + }), + ], + logRetention: Logs.RetentionDays.ONE_WEEK, + reservedConcurrentExecutions: 1, }, - initialPolicy: [ - new IAM.PolicyStatement({ - actions: ['iot:UpdateThingShadow'], - resources: ['*'], - }), - ], - logRetention: Logs.RetentionDays.ONE_WEEK, - reservedConcurrentExecutions: 1, - }) + ) - parseSinkMessages.addEventSource( + parseSinkMessagesFn.addEventSource( new KinesisEventSource(stream, { startingPosition: StartingPosition.TRIM_HORIZON, batchSize: 100, @@ -101,5 +108,45 @@ export class NRPlusGateway extends Construct { parallelizationFactor: 1, }), ) + + // Trigger scan message periodically + const nrplusGatewayScanFn = new Lambda.Function( + this, + 'nrplusGatewayScanFn', + { + handler: lambdaSources.nrplusGatewayScan.handler, + architecture: Lambda.Architecture.ARM_64, + runtime: Lambda.Runtime.NODEJS_18_X, + timeout: Duration.seconds(60), + memorySize: 1792, + code: Lambda.Code.fromAsset( + lambdaSources.nrplusGatewayScan.lambdaZipFile, + ), + description: + 'Periodically trigger scan in sink to sync with relay, required to communicate reliably with relay and relay-connected clients', + environment: { + VERSION: this.node.tryGetContext('version'), + }, + initialPolicy: [ + new IAM.PolicyStatement({ + actions: ['iot:ListThings', 'iot:Publish'], + resources: ['*'], + }), + ], + logRetention: Logs.RetentionDays.ONE_DAY, + }, + ) + + const rule = new Events.Rule(this, 'Rule', { + schedule: Events.Schedule.expression('rate(1 minute)'), + description: `Invoke the summary lambda`, + enabled: true, + targets: [new EventsTargets.LambdaFunction(nrplusGatewayScanFn)], + }) + + nrplusGatewayScanFn.addPermission('InvokeByEvents', { + principal: new IAM.ServicePrincipal('events.amazonaws.com'), + sourceArn: rule.ruleArn, + }) } } diff --git a/lambda/nrplusGatewayScan.ts b/lambda/nrplusGatewayScan.ts new file mode 100644 index 0000000..14e3c33 --- /dev/null +++ b/lambda/nrplusGatewayScan.ts @@ -0,0 +1,35 @@ +import { IoTClient, ListThingsCommand } from '@aws-sdk/client-iot' +import { + IoTDataPlaneClient, + PublishCommand, +} from '@aws-sdk/client-iot-data-plane' + +const iot = new IoTClient({}) +const iotData = new IoTDataPlaneClient({}) + +/** + * Periodically trigger scan in sink to sync with relay, + * required to communicate reliably with relay and relay-connected clients + */ +export const handler = async (): Promise => { + const { things: gateways } = await iot.send( + new ListThingsCommand({ + thingTypeName: 'nrplus-gateway', + }), + ) + + await Promise.all( + (gateways ?? []).map(async ({ thingName }) => { + const topic = `${thingName}/nrplus-ctrl` + const payload = `dect beacon_scan -c 1667 -f -t 2` + console.log(`>`, topic, JSON.stringify(payload)) + return iotData.send( + new PublishCommand({ + topic, + payload: Buffer.from(payload, 'hex'), + qos: 1, + }), + ) + }), + ) +}