Skip to content

Commit

Permalink
feat(wirepas): let syslog take care of log timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Feb 14, 2024
1 parent 04bfa88 commit df8e438
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 65 deletions.
29 changes: 8 additions & 21 deletions wirepas-5g-mesh-gateway/cloudToGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,11 @@ const connect = async ({
privateKey,
deviceId,
mqttEndpoint,
log,
}: {
clientCert: string
privateKey: string
deviceId: string
mqttEndpoint: string

log: {
debug: (...args: any[]) => void
error: (...args: any[]) => void
}
}) =>
new Promise<mqtt.MqttClientConnection>((resolve, reject) => {
const cfg = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
Expand All @@ -42,21 +36,21 @@ const connect = async ({
const client = new mqtt.MqttClient(clientBootstrap)
const connection = client.new_connection(cfg.build())
connection.on('error', (err) => {
log.error(JSON.stringify(err))
console.error(JSON.stringify(err))
reject(err)
})
connection.on('connect', () => {
log.debug(`${deviceId} connected`)
console.debug(`${deviceId} connected`)
resolve(connection)
})
connection.on('disconnect', () => {
log.debug(`${deviceId} disconnected`)
console.debug(`${deviceId} disconnected`)
})
connection.on('closed', () => {
log.debug(`${deviceId} closed`)
console.debug(`${deviceId} closed`)
})
connection.connect().catch(() => {
log.debug(`${deviceId} failed to connect.`)
console.debug(`${deviceId} failed to connect.`)
})
})

Expand All @@ -68,13 +62,7 @@ type Desired = {
}

export const cloudToGateway =
(
iotDataClient: IoTDataPlaneClient,
log: {
debug: (...args: any[]) => void
error: (...args: any[]) => void
},
) =>
(iotDataClient: IoTDataPlaneClient) =>
async (
deviceId: string,
onDesired: (desired: Desired) => Promise<void>,
Expand Down Expand Up @@ -104,7 +92,6 @@ export const cloudToGateway =
privateKey,
deviceId,
mqttEndpoint: 'iot.thingy.rocks',
log,
})
const shadow = new iotshadow.IotShadowClient(connection)

Expand All @@ -115,10 +102,10 @@ export const cloudToGateway =
mqtt.QoS.AtLeastOnce,
async (err, response) => {
if (err !== undefined) {
log.error(err)
console.error(err)
}
const desired = (response?.state ?? {}) as Desired
log.debug(JSON.stringify(desired))
console.debug(JSON.stringify(desired))
await onDesired(desired)
await iotDataClient.send(
new UpdateThingShadowCommand({
Expand Down
61 changes: 30 additions & 31 deletions wirepas-5g-mesh-gateway/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { fromEnv } from '@nordicsemiconductor/from-env'
import mqtt from 'mqtt'
import { mqtt as awsMqtt } from 'aws-iot-device-sdk-v2'
import { debug, error, log } from './log.js'
import { GenericMessage } from './protobuf/ts/generic_message.js'
import {
IoTDataPlaneClient,
Expand Down Expand Up @@ -78,24 +77,24 @@ const existingGws = (gateways ?? []).reduce(
{} as Record<string, ThingAttribute>,
)
Object.keys(existingGws).forEach((gwId) =>
debug(`Known gateway things: ${gwId}`),
console.debug(`Known gateway things: ${gwId}`),
)

const parsedEndpoint = new URL(gatewayEndpoint)
log(`Connecting to`, parsedEndpoint.hostname)
console.log(`Connecting to`, parsedEndpoint.hostname)

const client = mqtt.connect(gatewayEndpoint)

const topics = ['gw-event/#']

client.on('connect', () => {
log(`Connected.`)
console.log(`Connected.`)
for (const topic of topics) {
client.subscribe(topic, (err, grants) => {
if (err !== null) {
throw err
}
for (const { topic } of grants ?? []) log(`Subscribed to`, topic)
for (const { topic } of grants ?? []) console.log(`Subscribed to`, topic)
})
}
})
Expand Down Expand Up @@ -133,7 +132,7 @@ client.on('message', (_, message) => {

const rxTime = new Date(parseInt(BigInt(rxTimeMsEpoch).toString()))
if (existingGws[gwId] === undefined) {
error(
console.error(
`Unknown gateway: ${gwId}! Add a new IoT Thing with the name "${gwId}" and the thing type "${thingTypeName}".`,
)
return
Expand Down Expand Up @@ -162,12 +161,12 @@ client.on('message', (_, message) => {
payload: ((payload) => {
try {
return decodePayload(payload, (type, pos) => {
debug(`Unknown message type`, type)
debug(Buffer.from(payload).toString('hex'))
debug(' '.repeat(Math.max(0, pos - 1)) + ' ^')
console.debug(`Unknown message type`, type)
console.debug(Buffer.from(payload).toString('hex'))
console.debug(' '.repeat(Math.max(0, pos - 1)) + ' ^')
})
} catch {
debug(
console.debug(
`Failed to decode payload: ${Buffer.from(payload).toString('hex')}`,
)
}
Expand All @@ -185,7 +184,7 @@ setInterval(async () => {
await Promise.all(
Object.entries(nodes).map(async ([gwId, nodes]) => {
Object.entries(nodes).forEach(([nodeId, data]) => {
debug(gwId, nodeId, JSON.stringify(data))
console.debug(gwId, nodeId, JSON.stringify(data))
})

return iotDataClient.send(
Expand All @@ -204,13 +203,13 @@ setInterval(async () => {
)
nodes = {}
}, stateFlushInterval * 1000)
debug(`Flushing state every ${stateFlushInterval} seconds`)
console.debug(`Flushing state every ${stateFlushInterval} seconds`)

// Handle configuration changes
const C2G = chalk.blue.dim('C2G')
const sendToGateway = wirepasPublish({
client,
debug: (...args) => debug(C2G, ...args),
debug: (...args) => console.debug(C2G, ...args),
})
const gwThingConnections: Record<string, awsMqtt.MqttClientConnection> = {}

Expand All @@ -227,25 +226,25 @@ const updateColor = throttle(
)

for (const gwId of Object.keys(existingGws)) {
gwThingConnections[gwId] = await cloudToGateway(iotDataClient, {
debug: (...args) => debug(C2G, ...args),
error: (...args) => error(C2G, ...args),
})(gwId, async (desired) => {
for (const [nodeId, { payload }] of Object.entries(desired.nodes)) {
const node = parseInt(nodeId, 10)
if ('led' in payload && payload.led !== undefined) {
const { r, g, b } = payload.led
const updates = []
if (r !== undefined)
updates.push(updateColor(gwId, node, LED_COLOR.RED, toState(r)))
if (g !== undefined)
updates.push(updateColor(gwId, node, LED_COLOR.GREEN, toState(g)))
if (b !== undefined)
updates.push(updateColor(gwId, node, LED_COLOR.BLUE, toState(b)))
await Promise.all(updates)
gwThingConnections[gwId] = await cloudToGateway(iotDataClient)(
gwId,
async (desired) => {
for (const [nodeId, { payload }] of Object.entries(desired.nodes)) {
const node = parseInt(nodeId, 10)
if ('led' in payload && payload.led !== undefined) {
const { r, g, b } = payload.led
const updates = []
if (r !== undefined)
updates.push(updateColor(gwId, node, LED_COLOR.RED, toState(r)))
if (g !== undefined)
updates.push(updateColor(gwId, node, LED_COLOR.GREEN, toState(g)))
if (b !== undefined)
updates.push(updateColor(gwId, node, LED_COLOR.BLUE, toState(b)))
await Promise.all(updates)
}
}
}
})
},
)
}

const toState = (state: boolean) => (state ? LED_STATE.ON : LED_STATE.OFF)
13 changes: 0 additions & 13 deletions wirepas-5g-mesh-gateway/log.ts

This file was deleted.

0 comments on commit df8e438

Please sign in to comment.