Skip to content

Commit

Permalink
ScaleActor fix, added count to StartActor (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksmtownsend authored Jan 18, 2022
1 parent 1c3bfdf commit 1d06e0e
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 105 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ npm-debug.log

host_config.json
.vscode/
.DS_STORE
64 changes: 34 additions & 30 deletions host_core/lib/host_core/actors/actor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ defmodule HostCore.Actors.ActorSupervisor do
DynamicSupervisor.init(strategy: :one_for_one)
end

@spec start_actor(binary) ::
:ignore | {:error, any} | {:ok, pid} | {:stop, any} | {:ok, pid, any}
def start_actor(bytes, oci \\ "") when is_binary(bytes) do
@spec start_actor(bytes :: binary(), oci :: String.t(), count :: Integer.t()) ::
{:error, any} | {:ok, [pid()]}
def start_actor(bytes, oci \\ "", count \\ 1) when is_binary(bytes) do
Logger.debug("Start actor request received")

case HostCore.WasmCloud.Native.extract_claims(bytes) do
Expand All @@ -29,10 +29,29 @@ defmodule HostCore.Actors.ActorSupervisor do
{:error,
"Cannot start new instance of #{claims.public_key} from OCI '#{oci}', it is already running with different OCI reference. To upgrade an actor, use live update."}
else
DynamicSupervisor.start_child(
__MODULE__,
{HostCore.Actors.ActorModule, {claims, bytes, oci}}
)
# Start `count` instances of this actor
case 1..count
|> Enum.reduce_while([], fn _count, pids ->
case DynamicSupervisor.start_child(
__MODULE__,
{HostCore.Actors.ActorModule, {claims, bytes, oci}}
) do
{:error, err} ->
{:halt, {:error, "Error: #{err}"}}

{:ok, pid} ->
{:cont, [pid | pids]}

{:ok, pid, _info} ->
{:cont, [pid | pids]}

:ignore ->
{:cont, pids}
end
end) do
{:error, err} -> {:error, err}
pids -> {:ok, pids}
end
end
end
end
Expand All @@ -45,7 +64,7 @@ defmodule HostCore.Actors.ActorSupervisor do
|> length() > 0
end

def start_actor_from_oci(oci) do
def start_actor_from_oci(oci, count \\ 1) do
case HostCore.WasmCloud.Native.get_oci_bytes(
oci,
HostCore.Oci.allow_latest(),
Expand All @@ -56,18 +75,18 @@ defmodule HostCore.Actors.ActorSupervisor do
{:error, err}

{:ok, bytes} ->
start_actor(bytes |> IO.iodata_to_binary(), oci)
start_actor(bytes |> IO.iodata_to_binary(), oci, count)
end
end

def start_actor_from_bindle(bindle_id) do
def start_actor_from_bindle(bindle_id, count \\ 1) do
case HostCore.WasmCloud.Native.get_actor_bindle(String.trim_leading(bindle_id, "bindle://")) do
{:error, err} ->
Logger.error("Failed to download bytes from bindle server for #{bindle_id}")
{:error, err}

{:ok, bytes} ->
start_actor(bytes |> IO.iodata_to_binary(), bindle_id)
start_actor(bytes |> IO.iodata_to_binary(), bindle_id, count)
end
end

Expand Down Expand Up @@ -190,25 +209,10 @@ defmodule HostCore.Actors.ActorSupervisor do

# Current count is less than desired count, start more instances
diff < 0 && ociref != "" ->
case 1..abs(diff)
|> Enum.reduce_while("", fn _, _ ->
res =
if String.starts_with?(ociref, "bindle://") do
start_actor_from_bindle(ociref)
else
start_actor_from_oci(ociref)
end

case res do
{:error, err} ->
{:halt, "Error: #{err}"}

_any ->
{:cont, ""}
end
end) do
"" -> :ok
err -> {:error, err}
if String.starts_with?(ociref, "bindle://") do
start_actor_from_bindle(ociref, abs(diff))
else
start_actor_from_oci(ociref, abs(diff))
end

diff < 0 ->
Expand Down
34 changes: 20 additions & 14 deletions host_core/lib/host_core/control_interface/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule HostCore.ControlInterface.Server do
alias HostCore.CloudEvent

import HostCore.Actors.ActorSupervisor,
only: [start_actor_from_bindle: 1, start_actor_from_oci: 1]
only: [start_actor_from_bindle: 2, start_actor_from_oci: 2]

import HostCore.Providers.ProviderSupervisor,
only: [start_provider_from_bindle: 3, start_provider_from_oci: 3]
Expand Down Expand Up @@ -137,18 +137,20 @@ defmodule HostCore.ControlInterface.Server do
# a queue group

# Launch Actor
# %{"actor_ref" => "wasmcloud.azurecr.io/echo:0.12.0", "host_id" => "Nxxxx"}
# %{"actor_ref" => "bindle://example.com/echo/0.12.0", "host_id" => "Nxxxx"}
# %{"actor_ref" => "wasmcloud.azurecr.io/echo:0.12.0", "host_id" => "Nxxxx", "count" => 3}
# %{"actor_ref" => "bindle://example.com/echo/0.12.0", "host_id" => "Nxxxx", "count" => 4}
defp handle_request({"cmd", _host_id, "la"}, body, _reply_to) do
with {:ok, start_actor_command} <- Jason.decode(body),
true <-
Map.has_key?(start_actor_command, "actor_ref") do
Task.start(fn ->
count = Map.get(start_actor_command, "count", 1)

res =
if String.starts_with?(start_actor_command["actor_ref"], "bindle://") do
start_actor_from_bindle(start_actor_command["actor_ref"])
start_actor_from_bindle(start_actor_command["actor_ref"], count)
else
start_actor_from_oci(start_actor_command["actor_ref"])
start_actor_from_oci(start_actor_command["actor_ref"], count)
end

case res do
Expand Down Expand Up @@ -190,24 +192,28 @@ defmodule HostCore.ControlInterface.Server do
end

# Scale Actor
# input: #{"actor_id" => "...", "actor_ref" => "...", "replicas" => "..."}
# input: #{"actor_id" => "...", "actor_ref" => "...", "count" => 5}
defp handle_request({"cmd", host_id, "scale"}, body, _reply_to) do
with {:ok, scale_request} <- Jason.decode(body),
true <-
["actor_id", "actor_ref", "replicas"]
["actor_id", "actor_ref", "count"]
|> Enum.all?(&Map.has_key?(scale_request, &1)) do
if host_id == HostCore.Host.host_key() do
actor_id = scale_request["actor_id"]
actor_ref = scale_request["actor_ref"]
replicas = String.to_integer(scale_request["replicas"])
count = scale_request["count"]

case HostCore.Actors.ActorSupervisor.scale_actor(actor_id, replicas, actor_ref) do
:ok ->
{:reply, success_ack()}
Task.start(fn ->
case HostCore.Actors.ActorSupervisor.scale_actor(actor_id, count, actor_ref) do
{:error, err} ->
Logger.error("Error scaling actor: #{err}")

{:error, err} ->
{:reply, failure_ack("Error scaling actor: #{err}")}
end
_ ->
:ok
end
end)

{:reply, success_ack()}
else
{:reply, failure_ack("Command received by incorrect host and could not be processed")}
end
Expand Down
2 changes: 1 addition & 1 deletion host_core/mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule HostCore.MixProject do
use Mix.Project

@app_vsn "0.52.0"
@app_vsn "0.52.1"

def project do
[
Expand Down
20 changes: 6 additions & 14 deletions host_core/test/host_core/actors_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ defmodule HostCore.ActorsTest do
test "can load actors", %{:evt_watcher => evt_watcher} do
on_exit(fn -> HostCore.Host.purge() end)
{:ok, bytes} = File.read(@kvcounter_path)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pids} = HostCore.Actors.ActorSupervisor.start_actor(bytes, "", 5)

:ok =
HostCoreTest.EventWatcher.wait_for_event(
Expand Down Expand Up @@ -182,8 +178,8 @@ defmodule HostCore.ActorsTest do
on_exit(fn -> HostCore.Host.purge() end)
:ets.delete(:refmap_table, @echo_oci_reference)
:ets.delete(:refmap_table, @echo_old_oci_reference)
{:ok, pid} = HostCore.Actors.ActorSupervisor.start_actor_from_oci(@echo_oci_reference)
assert Process.alive?(pid)
{:ok, pid} = HostCore.Actors.ActorSupervisor.start_actor_from_oci(@echo_oci_reference, 1)
assert Process.alive?(pid |> List.first())

actor_count =
Map.get(HostCore.Actors.ActorSupervisor.all_actors(), @echo_key)
Expand Down Expand Up @@ -301,8 +297,8 @@ defmodule HostCore.ActorsTest do
:ets.delete(:refmap_table, @echo_oci_reference)
:ets.delete(:refmap_table, @echo_old_oci_reference)

{:ok, pid} = HostCore.Actors.ActorSupervisor.start_actor_from_oci(@echo_oci_reference)
assert Process.alive?(pid)
{:ok, pid} = HostCore.Actors.ActorSupervisor.start_actor_from_oci(@echo_oci_reference, 1)
assert Process.alive?(pid |> List.first())

res = HostCore.Actors.ActorSupervisor.start_actor_from_oci("wasmcloud.azurecr.io/echo:0.3.0")

Expand All @@ -314,11 +310,7 @@ defmodule HostCore.ActorsTest do
test "stop with zero count terminates all", %{:evt_watcher => evt_watcher} do
on_exit(fn -> HostCore.Host.purge() end)
{:ok, bytes} = File.read(@kvcounter_path)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pid} = HostCore.Actors.ActorSupervisor.start_actor(bytes)
{:ok, _pids} = HostCore.Actors.ActorSupervisor.start_actor(bytes, "", 5)

:ok =
HostCoreTest.EventWatcher.wait_for_event(
Expand Down
4 changes: 2 additions & 2 deletions wasmcloud_host/chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ icon: https://github.com/wasmCloud/wasmcloud.com-dev/raw/main/static/images/wasm

type: application

version: 0.3.0
version: 0.3.1

appVersion: "0.52.0"
appVersion: "0.52.1"
15 changes: 3 additions & 12 deletions wasmcloud_host/lib/wasmcloud_host/actor_watcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,9 @@ defmodule WasmcloudHost.ActorWatcher do
end

def start_actor(bytes, replicas) do
case 1..replicas
|> Enum.reduce_while("", fn _, _ ->
case HostCore.Actors.ActorSupervisor.start_actor(bytes) do
{:stop, err} ->
{:halt, "Error: #{err}"}

_any ->
{:cont, ""}
end
end) do
"" -> :ok
msg -> {:error, msg}
case HostCore.Actors.ActorSupervisor.start_actor(bytes, "", replicas) do
{:ok, _pids} -> :ok
{:error, e} -> {:error, e}
end
end
end
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
defmodule WasmcloudHost.Lattice.ControlInterface do
@wasmbus_prefix "wasmbus.ctl."

def scale_actor(actor_id, actor_ref, desired_replicas, host_id) do
def scale_actor(actor_id, actor_ref, desired_count, host_id) do
topic = "#{@wasmbus_prefix}#{HostCore.Host.lattice_prefix()}.cmd.#{host_id}.scale"

payload =
Jason.encode!(%{
"actor_id" => actor_id,
"actor_ref" => actor_ref,
"replicas" => desired_replicas
"count" => desired_count
})

case ctl_request(topic, payload, 2_000) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule ActorRowComponent do
<button id="scale_actor_button_<%= @actor %>_<%= @host_id %>" class="btn btn-sm btn-warning" data-toggle="tooltip"
data-placement="top" title data-original-title="Scale Actor" phx-click="show_modal"
phx-value-title='Scale "<%= @name %>"' phx-value-component="ScaleActorComponent" phx-value-id="scale_actor_modal"
phx-value-actor="<%= @actor %>" phx-value-host="<%= @host_id %>" phx-value-replicas="<%= @count %>"
phx-value-actor="<%= @actor %>" phx-value-host="<%= @host_id %>" phx-value-count="<%= @count %>"
phx-value-oci="<%= @oci_ref %>">
<svg class="c-icon" style="color: white">
<use xlink:href="/coreui/free.svg#cil-equalizer"></use>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule ScaleActorComponent do
def handle_event(
"scale_actor",
%{
"desired_replicas" => replicas,
"desired_count" => count,
"actor_id" => actor_id,
"actor_ociref" => actor_ref,
"host_id" => host_id
Expand All @@ -24,7 +24,7 @@ defmodule ScaleActorComponent do
case WasmcloudHost.Lattice.ControlInterface.scale_actor(
actor_id,
actor_ref,
replicas,
String.to_integer(count),
host_id
) do
:ok ->
Expand All @@ -46,8 +46,8 @@ defmodule ScaleActorComponent do
<div class="form-group row">
<label class="col-md-3 col-form-label" for="text-input">Replicas</label>
<div class="col-md-9">
<input class="form-control" id="number-input" type="number" name="desired_replicas"
value='<%= Map.get(@modal, "replicas") %>' min="0">
<input class="form-control" id="number-input" type="number" name="desired_count"
value='<%= Map.get(@modal, "count") %>' min="0">
<span class="help-block">Enter how many instances of this actor you want</span>
</div>
</div>
Expand Down
Loading

0 comments on commit 1d06e0e

Please sign in to comment.