From ca40d3a0586a145b92a419a5718051c4b5b86dd6 Mon Sep 17 00:00:00 2001 From: Lars Wikman Date: Wed, 7 Aug 2024 21:20:53 +0200 Subject: [PATCH] Add type information for inbound messages on the DeviceChannel --- lib/nerves_hub/deployments.ex | 2 +- lib/nerves_hub/devices/device.ex | 5 +- lib/nerves_hub_web/channels/device_channel.ex | 30 ++-- .../channels/device_channel/messages.ex | 135 ++++++++++++++++++ 4 files changed, 158 insertions(+), 14 deletions(-) create mode 100644 lib/nerves_hub_web/channels/device_channel/messages.ex diff --git a/lib/nerves_hub/deployments.ex b/lib/nerves_hub/deployments.ex index cc53ad0f9..f40520ec1 100644 --- a/lib/nerves_hub/deployments.ex +++ b/lib/nerves_hub/deployments.ex @@ -190,7 +190,7 @@ defmodule NervesHub.Deployments do archive_id: deployment.archive_id } - broadcast(deployment, "archives/updated", payload) + _ = broadcast(deployment, "archives/updated", payload) description = "deployment #{deployment.name} has a new archive" AuditLogs.audit!(deployment, deployment, description) diff --git a/lib/nerves_hub/devices/device.ex b/lib/nerves_hub/devices/device.ex index 0093cbb21..ac11fa4ee 100644 --- a/lib/nerves_hub/devices/device.ex +++ b/lib/nerves_hub/devices/device.ex @@ -28,6 +28,7 @@ defmodule NervesHub.Devices.Device do :connection_types, :connection_metadata ] + @connection_types [:cellular, :ethernet, :wifi] @required_params [:org_id, :product_id, :identifier] schema "devices" do @@ -53,7 +54,7 @@ defmodule NervesHub.Devices.Device do field(:connection_established_at, :utc_datetime) field(:connection_disconnected_at, :utc_datetime) field(:connection_last_seen_at, :utc_datetime) - field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi]) + field(:connection_types, {:array, Ecto.Enum}, values: @connection_types) field(:connecting_code, :string) field(:connection_metadata, :map, default: %{}) @@ -68,4 +69,6 @@ defmodule NervesHub.Devices.Device do |> validate_length(:tags, min: 1) |> unique_constraint(:identifier) end + + def connection_types, do: @connection_types end diff --git a/lib/nerves_hub_web/channels/device_channel.ex b/lib/nerves_hub_web/channels/device_channel.ex index 3152034f7..2fcb77d1b 100644 --- a/lib/nerves_hub_web/channels/device_channel.ex +++ b/lib/nerves_hub_web/channels/device_channel.ex @@ -18,6 +18,7 @@ defmodule NervesHubWeb.DeviceChannel do alias NervesHub.Repo alias NervesHub.Tracker alias Phoenix.Socket.Broadcast + alias NervesHubWeb.DeviceChannel.Messages def join("device", params, %{assigns: %{device: device}} = socket) do with {:ok, device} <- update_metadata(device, params), @@ -411,7 +412,12 @@ defmodule NervesHubWeb.DeviceChannel do {:noreply, socket} end - def handle_in("fwup_progress", %{"value" => percent}, %{assigns: %{device: device}} = socket) do + def handle_in(event, params, socket) when is_binary(event) do + {event_atom, parsed_params} = Messages.parse(event, params) + handle_in(event_atom, parsed_params, socket) + end + + def handle_in(:fwup_progress, %{percent: percent}, %{assigns: %{device: device}} = socket) do NervesHubWeb.DeviceEndpoint.broadcast_from!( self(), "device:#{device.identifier}:internal", @@ -446,7 +452,7 @@ defmodule NervesHubWeb.DeviceChannel do end end - def handle_in("location:update", location, %{assigns: %{device: device}} = socket) do + def handle_in(:location_update, location, %{assigns: %{device: device}} = socket) do metadata = Map.put(device.connection_metadata, "location", location) {:ok, device} = Devices.update_device(device, %{connection_metadata: metadata}) @@ -461,17 +467,17 @@ defmodule NervesHubWeb.DeviceChannel do {:reply, :ok, assign(socket, :device, device)} end - def handle_in("connection_types", %{"values" => types}, %{assigns: %{device: device}} = socket) do + def handle_in(:connection_types, %{types: types}, %{assigns: %{device: device}} = socket) do {:ok, device} = Devices.update_device(device, %{"connection_types" => types}) {:noreply, assign(socket, :device, device)} end - def handle_in("status_update", %{"status" => _status}, socket) do + def handle_in(:status_update, %{}, socket) do # TODO store in tracker or the database? {:noreply, socket} end - def handle_in("check_update_available", _params, socket) do + def handle_in(:check_update_available, _params, socket) do device = socket.assigns.device |> Devices.verify_deployment() @@ -485,13 +491,13 @@ defmodule NervesHubWeb.DeviceChannel do {:reply, {:ok, update_payload}, socket} end - def handle_in("rebooting", _, socket) do + def handle_in(:rebooting, %{}, socket) do {:noreply, socket} end - def handle_in("scripts/run", params, socket) do - if pid = socket.assigns.script_refs[params["ref"]] do - output = Enum.join([params["output"], params["return"]], "\n") + def handle_in(:scripts_run, %{ref: ref, output: output, return: return}, socket) do + if pid = socket.assigns.script_refs[ref] do + output = Enum.join([output, return], "\n") output = String.trim(output) send(pid, {:output, output}) end @@ -499,15 +505,15 @@ defmodule NervesHubWeb.DeviceChannel do {:noreply, socket} end - def handle_in("health_check_report", %{"value" => device_status}, socket) do + def handle_in(:health_check_report, device_status, socket) do device_meta = for {key, val} <- Map.from_struct(socket.assigns.device.firmware_metadata), into: %{}, - do: {to_string(key), to_string(val)} + do: {key, to_string(val)} full_report = device_status - |> Map.put("metadata", Map.merge(device_status["metadata"], device_meta)) + |> Map.put(:metadata, Map.merge(device_status.metadata, device_meta)) device_health = %{"device_id" => socket.assigns.device.id, "data" => full_report} diff --git a/lib/nerves_hub_web/channels/device_channel/messages.ex b/lib/nerves_hub_web/channels/device_channel/messages.ex new file mode 100644 index 000000000..6a7a02a3f --- /dev/null +++ b/lib/nerves_hub_web/channels/device_channel/messages.ex @@ -0,0 +1,135 @@ +defmodule NervesHubWeb.DeviceChannel.Messages do + @moduledoc false + alias NervesHub.Devices.Device + + require Logger + @type alarm_id() :: String.t() + @type alarm_description() :: String.t() + + @type health_check_report() :: %{ + timestamp: DateTime.t(), + metadata: %{String.t() => String.t()}, + alarms: %{alarm_id() => alarm_description()}, + metrics: %{String.t() => number()}, + checks: %{String.t() => %{pass: boolean(), note: String.t()}} + } + + @type scripts_run() :: %{ + ref: String.t(), + output: String.t(), + return: String.t() + } + + @type fwup_progress() :: %{ + percent: integer() + } + + @type location() :: term() + + @type connection_types() :: %{types: list(atom())} + + @type status_update() :: map() + + @type check_update_available() :: map() + + # We parse out messages explicitly to let the compiler help with types and + # to keep track of what we have coming in and out of the system + # They are not structs to reduce the proliferation of modules for what is mostly + # an inbetween layer + # If the role of these definitions grows to much it may make sense to turn them into + # structs. + @spec parse(event :: String.t(), params :: map()) :: + {:fwup_progress, fwup_progress()} + | {:location_update, location()} + | {:connection_types, connection_types()} + | {:status_update, status_update()} + | {:check_update_available, check_update_available()} + | {:health_check_report, health_check_report()} + | {:scripts_run, scripts_run()} + | {:rebooting, map()} + | {:unknown, map()} + def parse(event, params) + + def parse("fwup_progress", %{"value" => percent}) do + {:fwup_progress, %{percent: percent}} + end + + def parse("location:update", location) do + {:location_update, location} + end + + @valid_types Device.connection_types() + def parse("connection_types", %{"values" => types}) do + types = + types + |> Enum.map(fn type -> + try do + String.to_existing_atom(type) + rescue + _ -> nil + end + end) + |> Enum.filter(fn type -> + if type in @valid_types do + true + else + Logger.warning("Received invalid type for connection_types: #{inspect(type)}") + false + end + end) + + {:connection_types, %{types: types}} + end + + def parse("status_update", %{"status" => _status}) do + {:status_update, %{}} + end + + def parse("check_update_available", _params) do + {:check_update_available, %{}} + end + + def parse("health_check_report", %{ + "value" => %{ + "timestamp" => iso_ts, + "metadata" => metadata, + "alarms" => alarms, + "metrics" => metrics, + "checks" => checks + } + }) do + {:ok, ts, _} = DateTime.from_iso8601(iso_ts) + + status = %{ + timestamp: ts, + metadata: metadata, + alarms: alarms, + metrics: metrics, + checks: to_checks(checks) + } + + {:health_check_report, status} + end + + def parse("scripts/run", %{"ref" => ref, "output" => output, "return" => return}) do + {:scripts_run, %{ref: ref, output: output, return: return}} + end + + def parse("rebooting", _) do + {:rebooting, %{}} + end + + def parse(event, params) do + Logger.warning( + "Unmatched incoming event in device channel messages '#{event}' with #{inspect(params)}" + ) + + {:unknown, params} + end + + defp to_checks(checks) do + for {key, %{"pass" => pass, "note" => note}} <- checks, into: %{} do + {key, %{pass: pass, note: note}} + end + end +end