Skip to content

Commit

Permalink
Add type information for inbound messages on the DeviceChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
lawik committed Aug 7, 2024
1 parent 6d3b853 commit ca40d3a
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 14 deletions.
2 changes: 1 addition & 1 deletion lib/nerves_hub/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ defmodule NervesHub.Deployments do
archive_id: deployment.archive_id
}

broadcast(deployment, "archives/updated", payload)
_ = broadcast(deployment, "archives/updated", payload)

description = "deployment #{deployment.name} has a new archive"
AuditLogs.audit!(deployment, deployment, description)
Expand Down
5 changes: 4 additions & 1 deletion lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule NervesHub.Devices.Device do
:connection_types,
:connection_metadata
]
@connection_types [:cellular, :ethernet, :wifi]
@required_params [:org_id, :product_id, :identifier]

schema "devices" do
Expand All @@ -53,7 +54,7 @@ defmodule NervesHub.Devices.Device do
field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi])
field(:connection_types, {:array, Ecto.Enum}, values: @connection_types)
field(:connecting_code, :string)
field(:connection_metadata, :map, default: %{})

Expand All @@ -68,4 +69,6 @@ defmodule NervesHub.Devices.Device do
|> validate_length(:tags, min: 1)
|> unique_constraint(:identifier)
end

def connection_types, do: @connection_types
end
30 changes: 18 additions & 12 deletions lib/nerves_hub_web/channels/device_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule NervesHubWeb.DeviceChannel do
alias NervesHub.Repo
alias NervesHub.Tracker
alias Phoenix.Socket.Broadcast
alias NervesHubWeb.DeviceChannel.Messages

def join("device", params, %{assigns: %{device: device}} = socket) do
with {:ok, device} <- update_metadata(device, params),
Expand Down Expand Up @@ -411,7 +412,12 @@ defmodule NervesHubWeb.DeviceChannel do
{:noreply, socket}
end

def handle_in("fwup_progress", %{"value" => percent}, %{assigns: %{device: device}} = socket) do
def handle_in(event, params, socket) when is_binary(event) do
{event_atom, parsed_params} = Messages.parse(event, params)
handle_in(event_atom, parsed_params, socket)
end

def handle_in(:fwup_progress, %{percent: percent}, %{assigns: %{device: device}} = socket) do
NervesHubWeb.DeviceEndpoint.broadcast_from!(
self(),
"device:#{device.identifier}:internal",
Expand Down Expand Up @@ -446,7 +452,7 @@ defmodule NervesHubWeb.DeviceChannel do
end
end

def handle_in("location:update", location, %{assigns: %{device: device}} = socket) do
def handle_in(:location_update, location, %{assigns: %{device: device}} = socket) do
metadata = Map.put(device.connection_metadata, "location", location)

{:ok, device} = Devices.update_device(device, %{connection_metadata: metadata})
Expand All @@ -461,17 +467,17 @@ defmodule NervesHubWeb.DeviceChannel do
{:reply, :ok, assign(socket, :device, device)}
end

def handle_in("connection_types", %{"values" => types}, %{assigns: %{device: device}} = socket) do
def handle_in(:connection_types, %{types: types}, %{assigns: %{device: device}} = socket) do
{:ok, device} = Devices.update_device(device, %{"connection_types" => types})
{:noreply, assign(socket, :device, device)}
end

def handle_in("status_update", %{"status" => _status}, socket) do
def handle_in(:status_update, %{}, socket) do
# TODO store in tracker or the database?
{:noreply, socket}
end

def handle_in("check_update_available", _params, socket) do
def handle_in(:check_update_available, _params, socket) do
device =
socket.assigns.device
|> Devices.verify_deployment()
Expand All @@ -485,29 +491,29 @@ defmodule NervesHubWeb.DeviceChannel do
{:reply, {:ok, update_payload}, socket}
end

def handle_in("rebooting", _, socket) do
def handle_in(:rebooting, %{}, socket) do
{:noreply, socket}
end

def handle_in("scripts/run", params, socket) do
if pid = socket.assigns.script_refs[params["ref"]] do
output = Enum.join([params["output"], params["return"]], "\n")
def handle_in(:scripts_run, %{ref: ref, output: output, return: return}, socket) do
if pid = socket.assigns.script_refs[ref] do
output = Enum.join([output, return], "\n")
output = String.trim(output)
send(pid, {:output, output})
end

{:noreply, socket}
end

def handle_in("health_check_report", %{"value" => device_status}, socket) do
def handle_in(:health_check_report, device_status, socket) do
device_meta =
for {key, val} <- Map.from_struct(socket.assigns.device.firmware_metadata),
into: %{},
do: {to_string(key), to_string(val)}
do: {key, to_string(val)}

full_report =
device_status
|> Map.put("metadata", Map.merge(device_status["metadata"], device_meta))
|> Map.put(:metadata, Map.merge(device_status.metadata, device_meta))

device_health = %{"device_id" => socket.assigns.device.id, "data" => full_report}

Expand Down
135 changes: 135 additions & 0 deletions lib/nerves_hub_web/channels/device_channel/messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
defmodule NervesHubWeb.DeviceChannel.Messages do
@moduledoc false
alias NervesHub.Devices.Device

require Logger
@type alarm_id() :: String.t()
@type alarm_description() :: String.t()

@type health_check_report() :: %{
timestamp: DateTime.t(),
metadata: %{String.t() => String.t()},
alarms: %{alarm_id() => alarm_description()},
metrics: %{String.t() => number()},
checks: %{String.t() => %{pass: boolean(), note: String.t()}}
}

@type scripts_run() :: %{
ref: String.t(),
output: String.t(),
return: String.t()
}

@type fwup_progress() :: %{
percent: integer()
}

@type location() :: term()

@type connection_types() :: %{types: list(atom())}

@type status_update() :: map()

@type check_update_available() :: map()

# We parse out messages explicitly to let the compiler help with types and
# to keep track of what we have coming in and out of the system
# They are not structs to reduce the proliferation of modules for what is mostly
# an inbetween layer
# If the role of these definitions grows to much it may make sense to turn them into
# structs.
@spec parse(event :: String.t(), params :: map()) ::
{:fwup_progress, fwup_progress()}
| {:location_update, location()}
| {:connection_types, connection_types()}
| {:status_update, status_update()}
| {:check_update_available, check_update_available()}
| {:health_check_report, health_check_report()}
| {:scripts_run, scripts_run()}
| {:rebooting, map()}
| {:unknown, map()}
def parse(event, params)

def parse("fwup_progress", %{"value" => percent}) do
{:fwup_progress, %{percent: percent}}
end

def parse("location:update", location) do
{:location_update, location}
end

@valid_types Device.connection_types()
def parse("connection_types", %{"values" => types}) do
types =
types
|> Enum.map(fn type ->
try do
String.to_existing_atom(type)
rescue
_ -> nil
end
end)
|> Enum.filter(fn type ->
if type in @valid_types do
true
else
Logger.warning("Received invalid type for connection_types: #{inspect(type)}")
false
end
end)

{:connection_types, %{types: types}}
end

def parse("status_update", %{"status" => _status}) do
{:status_update, %{}}
end

def parse("check_update_available", _params) do
{:check_update_available, %{}}
end

def parse("health_check_report", %{
"value" => %{
"timestamp" => iso_ts,
"metadata" => metadata,
"alarms" => alarms,
"metrics" => metrics,
"checks" => checks
}
}) do
{:ok, ts, _} = DateTime.from_iso8601(iso_ts)

status = %{
timestamp: ts,
metadata: metadata,
alarms: alarms,
metrics: metrics,
checks: to_checks(checks)
}

{:health_check_report, status}
end

def parse("scripts/run", %{"ref" => ref, "output" => output, "return" => return}) do
{:scripts_run, %{ref: ref, output: output, return: return}}
end

def parse("rebooting", _) do
{:rebooting, %{}}
end

def parse(event, params) do
Logger.warning(
"Unmatched incoming event in device channel messages '#{event}' with #{inspect(params)}"
)

{:unknown, params}
end

defp to_checks(checks) do
for {key, %{"pass" => pass, "note" => note}} <- checks, into: %{} do
{key, %{pass: pass, note: note}}
end
end
end

0 comments on commit ca40d3a

Please sign in to comment.