Skip to content

Commit

Permalink
Blocks multiple concurrent provider starts of the same public key + l…
Browse files Browse the repository at this point in the history
…ink name (#627)

* Blocks multiple concurrent provider starts of the same public key + link name

Signed-off-by: Kevin Hoffman <[email protected]>

* Formatting

Signed-off-by: Kevin Hoffman <[email protected]>

* Lawg

Signed-off-by: Kevin Hoffman <[email protected]>

---------

Signed-off-by: Kevin Hoffman <[email protected]>
  • Loading branch information
autodidaddict authored May 17, 2023
1 parent 608128a commit 2f23479
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 60 deletions.
5 changes: 5 additions & 0 deletions host_core/lib/host_core/control_interface/host_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ defmodule HostCore.ControlInterface.HostServer do
"Successfully started provider #{start_provider_command["provider_ref"]} (#{start_provider_command["link_name"]})"
)

:ignore ->
Logger.debug(
"Provider #{start_provider_command["provider_ref"]} (#{start_provider_command["link_name"]}) ignored - already running"
)

{:error, e} ->
Tracer.set_status(:error, inspect(e))

Expand Down
130 changes: 72 additions & 58 deletions host_core/lib/host_core/providers/provider_module.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,72 +146,86 @@ defmodule HostCore.Providers.ProviderModule do
contract_id: contract_id
)

Logger.info("Starting executable capability provider from '#{path}'",
provider_id: claims.public_key,
link_name: link_name,
contract_id: contract_id
)
if HostCore.Providers.ProviderSupervisor.is_running?(
claims.public_key,
link_name,
host_id
) do
Logger.warn("Provider already running on this host, not starting rejected duplicate",
public_key: claims.public_key,
link_name: link_name,
host_id: host_id
)

:ignore
else
Logger.info("Starting executable capability provider from '#{path}'",
provider_id: claims.public_key,
link_name: link_name,
contract_id: contract_id
)

instance_id = UUID.uuid4()
instance_id = UUID.uuid4()

host_info =
VirtualHost.generate_hostinfo_for_provider(
host_info =
VirtualHost.generate_hostinfo_for_provider(
host_id,
claims.public_key,
link_name,
instance_id,
config_json
)
|> Base.encode64()
|> to_charlist()

port = Port.open({:spawn, "#{path}"}, [:binary, {:env, extract_env_vars()}])
Port.monitor(port)
Port.command(port, "#{host_info}\n")

{:os_pid, pid} = Port.info(port, :os_pid)

# Worth pointing out here that this process doesn't need to subscribe to
# the provider's NATS topic. The provider subscribes to that directly
# when it starts.

HostCore.Claims.Manager.put_claims(host_id, lattice_prefix, claims)

publish_provider_started(
host_id,
claims.public_key,
lattice_prefix,
claims,
link_name,
contract_id,
instance_id,
config_json
oci,
annotations
)
|> Base.encode64()
|> to_charlist()

port = Port.open({:spawn, "#{path}"}, [:binary, {:env, extract_env_vars()}])
Port.monitor(port)
Port.command(port, "#{host_info}\n")

{:os_pid, pid} = Port.info(port, :os_pid)

# Worth pointing out here that this process doesn't need to subscribe to
# the provider's NATS topic. The provider subscribes to that directly
# when it starts.

HostCore.Claims.Manager.put_claims(host_id, lattice_prefix, claims)

publish_provider_started(
host_id,
lattice_prefix,
claims,
link_name,
contract_id,
instance_id,
oci,
annotations
)

if oci != nil && oci != "" do
publish_provider_oci_map(host_id, lattice_prefix, claims.public_key, link_name, oci)
end
if oci != nil && oci != "" do
publish_provider_oci_map(host_id, lattice_prefix, claims.public_key, link_name, oci)
end

Process.send_after(self(), :do_health, 5_000)
:timer.send_interval(@thirty_seconds, self(), :do_health)

{:ok,
%State{
os_port: port,
os_pid: pid,
public_key: claims.public_key,
link_name: link_name,
contract_id: contract_id,
instance_id: instance_id,
shutdown_delay: shutdown_delay,
lattice_prefix: lattice_prefix,
executable_path: path,
annotations: annotations,
host_id: host_id,
# until we prove otherwise
healthy: false,
ociref: oci
}, {:continue, :register_provider}}
Process.send_after(self(), :do_health, 5_000)
:timer.send_interval(@thirty_seconds, self(), :do_health)

{:ok,
%State{
os_port: port,
os_pid: pid,
public_key: claims.public_key,
link_name: link_name,
contract_id: contract_id,
instance_id: instance_id,
shutdown_delay: shutdown_delay,
lattice_prefix: lattice_prefix,
executable_path: path,
annotations: annotations,
host_id: host_id,
# until we prove otherwise
healthy: false,
ociref: oci
}, {:continue, :register_provider}}
end
end

@impl true
Expand Down
4 changes: 2 additions & 2 deletions host_core/lib/host_core/providers/provider_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ defmodule HostCore.Providers.ProviderSupervisor do
end
end

defp is_running?(key, _, _) when byte_size(key) == 0, do: false
def is_running?(key, _, _) when byte_size(key) == 0, do: false

defp is_running?(key, link_name, host_id) do
def is_running?(key, link_name, host_id) do
Enum.any?(
all_providers(host_id),
fn {_pid, public_key, ln, _contract_id, _instance_id} ->
Expand Down

0 comments on commit 2f23479

Please sign in to comment.