Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consumer): use distributed process supervisors #15

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

noaccOS
Copy link
Contributor

@noaccOS noaccOS commented Aug 21, 2024

Distributed supervisors

Set up distributed registries and supervisors using horde.

Horde was chosen compared to Swarm for its clear distinction between Supervisor and Registry, which is reflected in the code (for the most part it was just a matter of aliasing to Horde's {DynamicSupervisor,Registry}). It is also more actively maintained.

For the clustering, we're still using libcluster just like we would for swarm

Still at this time, no topology was specified in the configuration

Clustering manual test

A manual test was performed to make sure all the components were working correctly.

The cluster_topologies function was rewritten like so

defp cluster_topologies do
  [
    gossip: [
      strategy: Cluster.Strategy.Gossip,
      debug: true,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_addr: "255.255.255.255",
        broadcast_only: true
      ]
    ]
  ]
end

And started the application in two machines

iex --sname n1 -S mix
Node.set_cookie(:my_cookie)

consumer_options = [
  mississippi_config: [
    queues: [events_exchange_name: "", prefix: "pfx_", range_start: 5, range_end: 9, total_count: 10]
  ],
  amqp_consumer_options: [host: "localhost"]
]

Mississippi.Consumer.start_link(consumer_options)
iex --sname n2 -S mix
Node.set_cookie(:my_cookie)

consumer_options = [
  mississippi_config: [
    queues: [events_exchange_name: "", prefix: "pfx_", range_start: 0, range_end: 4, total_count: 10]
  ],
  amqp_consumer_options: [host: "192.168.1.201"]
]

Mississippi.Consumer.start_link(consumer_options)

Then by spawning processes, it was assessed by looking at the pids and the logs that they evenly distributed between the two machines

iex(n1@mayoi)9> Consumer.DataUpdater.get_data_updater_process("p1")
{:ok, #PID<24858.484.0>}
iex(n1@mayoi)10> Consumer.DataUpdater.get_data_updater_process("p2")
Handling data with sharding_key "p2"
{:ok, #PID<0.882.0>}
iex(n1@mayoi)11> Consumer.DataUpdater.get_data_updater_process("p3")
{:ok, #PID<24858.492.0>}
 

put registry names scoped to parent module instead of under the global
`Registry` name

this way
1. registry ids are scoped to their parent module
2. it is consistent with the rest of the code, like `MessageTracker.Supervisor`

Signed-off-by: Francesco Noacco <[email protected]>
Replace Kernel's Registry and DynamicSupervisor modules
with Horde's distributed versions.

We still need to set up clustering with libcluster next.

A fork of horde is used instead of the version from hex because
of a bug where `extra_arguments` is not passed as a parameter to the
processes in a DynamicSupervisor

Signed-off-by: Francesco Noacco <[email protected]>
set up clustering using libcluster

the topologies will be defined in a second iteration

Signed-off-by: Francesco Noacco <[email protected]>
@eddbbt
Copy link

eddbbt commented Aug 26, 2024

looks good, about your fix consider to open a pr to original Horde repo, so we can keep it up to date

@noaccOS
Copy link
Contributor Author

noaccOS commented Aug 26, 2024

looks good, about your fix consider to open a pr to original Horde repo, so we can keep it up to date

it's already been done, although I failed to mention it: derekkraan/horde#274

{Registry, [keys: :unique, name: AMQPDataConsumer.Registry, members: :auto]},
{DataUpdater.Supervisor, message_handler: message_handler},
{DynamicSupervisor, strategy: :one_for_one, name: MessageTracker.Supervisor, members: :auto},
{AMQPDataConsumer.Supervisor, queues_config: queues_config}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AMQPDataConsumer.Supervisor is a plain Elixir Supervisor and so it contains no way of managing horde membership. It should be distributed, though, even if its children are statically determined.
A quick and dirty way of doing that would be using Horde.DynamicSupervisor for the AMQPDataConsumer supervisor too

Annopaolo added a commit to Annopaolo/mississippi that referenced this pull request Sep 20, 2024
Some TODOs. Based on secomind#15.

Signed-off-by: Arnaldo Cesco <[email protected]>
Annopaolo added a commit to Annopaolo/mississippi that referenced this pull request Sep 20, 2024
Some TODOs. Based on secomind#15.

Signed-off-by: Arnaldo Cesco <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants