From 8a461b898bf53ad8d0994120eddfcd65974ba9a4 Mon Sep 17 00:00:00 2001 From: Jeff Grunewald Date: Mon, 4 Oct 2021 21:02:01 -0400 Subject: [PATCH] fix regression to allow multiple topics per producer supervisor (#98) * fix regression to allow multiple topics per producer supervisor * fix CI setup beam vm action --- .github/workflows/main.yml | 6 ++-- .github/workflows/master.yml | 2 +- .github/workflows/release.yml | 2 +- lib/elsa/supervisor.ex | 23 +++++++++------ mix.exs | 12 ++++---- mix.lock | 22 ++++++++------- test/integration/elsa/producer_test.exs | 28 ++++++++++++++----- .../elsa/dynamic_process_manager_test.exs | 4 +-- 8 files changed, 61 insertions(+), 38 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e5bbd61..a8265ca 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions/setup-elixir@v1.2.0 + - uses: erlef/setup-beam@v1 with: otp-version: 21.3 elixir-version: 1.8.2 @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions/setup-elixir@v1.2.0 + - uses: erlef/setup-beam@v1 with: otp-version: 21.3 elixir-version: 1.8.2 @@ -43,7 +43,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions/setup-elixir@v1.2.0 + - uses: erlef/setup-beam@v1 with: otp-version: 21.3 elixir-version: 1.8.2 diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 4699fbf..4d787ee 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions/setup-elixir@v1.2.0 + - uses: erlef/setup-beam@v1 with: otp-version: 21.3 elixir-version: 1.8.2 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f519b87..7cae6e2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions/setup-elixir@v1.2.0 + - uses: erlef/setup-beam@v1 with: otp-version: 21.3 elixir-version: 1.8.2 diff --git a/lib/elsa/supervisor.ex b/lib/elsa/supervisor.ex index fc4f3a5..ffbc690 100644 --- a/lib/elsa/supervisor.ex +++ b/lib/elsa/supervisor.ex @@ -155,7 +155,7 @@ defmodule Elsa.Supervisor do {Elsa.Registry, name: registry}, {DynamicSupervisor, strategy: :one_for_one, name: dynamic_supervisor(registry)}, start_client(args), - producer_spec(registry, Keyword.get(args, :producer, [])), + producer_spec(registry, Keyword.get(args, :producer)), start_group_consumer(connection, registry, Keyword.get(args, :group_consumer)), start_consumer(connection, registry, Keyword.get(args, :consumer)) ] @@ -208,19 +208,26 @@ defmodule Elsa.Supervisor do initializer: {Elsa.Consumer.Worker.Initializer, :init, [consumer_args]}} end - defp producer_spec(registry, args) do - initializer = - case Keyword.take(args, [:topic, :config]) do - [] -> nil - init_args -> {Elsa.Producer.Initializer, :init, [registry, init_args]} - end + defp producer_spec(registry, nil) do + [ + { + Elsa.DynamicProcessManager, + id: :producer_process_manager, + dynamic_supervisor: dynamic_supervisor(registry), + initializer: nil, + poll: false, + name: via_name(registry, :producer_process_manager) + } + ] + end + defp producer_spec(registry, args) do [ { Elsa.DynamicProcessManager, id: :producer_process_manager, dynamic_supervisor: dynamic_supervisor(registry), - initializer: initializer, + initializer: {Elsa.Producer.Initializer, :init, [registry, args]}, poll: Keyword.get(args, :poll, false), name: via_name(registry, :producer_process_manager) } diff --git a/mix.exs b/mix.exs index 0a1a126..156e4f7 100644 --- a/mix.exs +++ b/mix.exs @@ -30,14 +30,14 @@ defmodule Elsa.MixProject do defp deps do [ - {:brod, "~> 3.14"}, + {:brod, "~> 3.14.0"}, {:patiently, "~> 0.2", only: [:dev, :test, :integration]}, - {:divo, "~> 1.1", only: [:dev, :test, :integration], override: true}, - {:divo_kafka, "~> 0.1.0", only: [:dev, :test, :integration]}, - {:placebo, "~> 2.0.0-rc.2", only: [:dev, :test]}, + {:divo, "~> 1.3", only: [:dev, :test, :integration], override: true}, + {:divo_kafka, "~> 0.1.7", only: [:dev, :test, :integration]}, + {:placebo, "~> 2.0", only: [:dev, :test]}, {:checkov, "~> 1.0", only: [:test, :integration]}, - {:ex_doc, "~> 0.22.1", only: [:dev]}, - {:dialyxir, "~> 1.0.0", only: [:dev], runtime: false} + {:ex_doc, "~> 0.25.3", only: [:dev]}, + {:dialyxir, "~> 1.1.0", only: [:dev], runtime: false} ] end diff --git a/mix.lock b/mix.lock index 83f2643..6ba0615 100644 --- a/mix.lock +++ b/mix.lock @@ -2,20 +2,22 @@ "brod": {:hex, :brod, "3.14.0", "f959408e88acd0feca22f6a43ca26e70201c6e5c57dc74b87f08ef65a5e7fe18", [:rebar3], [{:kafka_protocol, "2.3.6", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "a0153437835b810d93e79c7000f55b0c4bd62c281a3224a2615f6aa72b52d484"}, "checkov": {:hex, :checkov, "1.0.0", "cecf1be22ea506b2fbd6741d7c00f4876bb2be76ea1b95493c25b51028f24410", [:mix], [], "hexpm", "9fa85e6fdf1bcec2dd0d996d0c1e5a83e336dafb97c931232af1cb1e7ef4420a"}, "crc32cer": {:hex, :crc32cer, "0.1.4", "a656dff19474d1a1fc5bb0081610ab6b0695b23affc47fa90abeb079a8ef9752", [:rebar3], [], "hexpm", "964735a5422cf65bbc5354860a560fff546f0026f83f8860525bd58ab5bade5d"}, - "dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"}, - "divo": {:hex, :divo, "1.1.9", "6f91b0a02bd97800eb9a99abd771b4c9b67d282b67abc223eb2832b93f557b7e", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b0edcd689089d723802c2d582bab54a77725f673445aa474eea259448910c252"}, - "divo_kafka": {:hex, :divo_kafka, "0.1.6", "dffaa5d419d75e6607b581187347e6fef18b9d06d517a0f7a49772b52f60115f", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "cbc408a8b6593784524b5fee09aae0e9cc58328a174fc6d3f337d9fb34b4bc62"}, + "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, + "divo": {:hex, :divo, "1.3.1", "a7cdb05d4525a9703e11dbcf40567d426b546f8e816b9c9465232c10bc6a257b", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "b3edef7baf068bbf864c01c8f3ed06fcf81c08e8d4adcc1cfc4b7d6eb69c6a18"}, + "divo_kafka": {:hex, :divo_kafka, "0.1.7", "e8253bb735e001c41f35645ac0429740b6b6350ceb0ae268609f769f0b3883c5", [:mix], [{:divo, "~> 1.1", [hex: :divo, repo: "hexpm", optional: false]}], "hexpm", "25f9b89a1f59f6801b8b1e044eaa8cdce4e0756b4a8512458ea31f9c99ec338f"}, "earmark": {:hex, :earmark, "1.4.5", "62ffd3bd7722fb7a7b1ecd2419ea0b458c356e7168c1f5d65caf09b4fbdd13c8", [:mix], [], "hexpm", "b7d0e6263d83dc27141a523467799a685965bf8b13b6743413f19a7079843f4f"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.22.1", "9bb6d51508778193a4ea90fa16eac47f8b67934f33f8271d5e1edec2dc0eee4c", [:mix], [{:earmark, "~> 1.4.0", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "d957de1b75cb9f78d3ee17820733dc4460114d8b1e11f7ee4fd6546e69b1db60"}, - "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"}, + "ex_doc": {:hex, :ex_doc, "0.25.3", "3edf6a0d70a39d2eafde030b8895501b1c93692effcbd21347296c18e47618ce", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "9ebebc2169ec732a38e9e779fd0418c9189b3ca93f4a676c961be6c1527913f5"}, + "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, "kafka_protocol": {:hex, :kafka_protocol, "2.3.6", "df076a8ef49fffae3535c805cb00f3a057ce1895e63398bf8a10569eeeac02f8", [:rebar, :rebar3], [{:crc32cer, "0.1.4", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.5", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "7cb061fe46babc7fd269d2c0e5b4dba5d1efc4f7dacce85b17a9cca973106b23"}, - "makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"}, - "meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"}, + "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, + "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"}, - "placebo": {:hex, :placebo, "2.0.0-rc.2", "e148f8b313e75978cdf7ebc762124ec64322d7a092b5d9fdf907d3d6a7b6e0b8", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "46ebd45c7786e92807cd8652e522a236088e71cb43c90a8587edb2a70b8acd5a"}, + "placebo": {:hex, :placebo, "2.0.0", "c0e773dec77e941bcbcc14d10b759f2d66775aff9b75051f3e41939b64300e81", [:mix], [{:meck, "~> 0.9", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "e0872cec8848d7e59ba96396f45ee1ad34662c689c86ba6190694d38b4289844"}, "snappyer": {:hex, :snappyer, "1.2.5", "9154b9ac84031f0a799f72a4aa87df23ab2193b5631475fa2cdc304382d2df77", [:rebar3], [], "hexpm", "d2adc26a81efd5f138397a38a0bb545188d302972721f8be0de37fa452c8aed7"}, "supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"}, } diff --git a/test/integration/elsa/producer_test.exs b/test/integration/elsa/producer_test.exs index 1e99826..343e475 100644 --- a/test/integration/elsa/producer_test.exs +++ b/test/integration/elsa/producer_test.exs @@ -13,12 +13,18 @@ defmodule Elsa.ProducerTest do describe "producer managers" do setup do topic = "producer-manager-test" + topic2 = "producer-test-secondary" connection = :elsa_producer_test2 Elsa.create_topic(@brokers, topic) + Elsa.create_topic(@brokers, topic2) {:ok, supervisor} = - Elsa.Supervisor.start_link(endpoints: @brokers, connection: connection, producer: [topic: topic]) + Elsa.Supervisor.start_link( + endpoints: @brokers, + connection: connection, + producer: [[topic: topic], [topic: topic2]] + ) Elsa.Producer.ready?(connection) @@ -26,11 +32,16 @@ defmodule Elsa.ProducerTest do assert_down(supervisor) end) - [connection: connection, topic: topic, registry: Elsa.Supervisor.registry(connection)] + [connection: connection, topics: [topic, topic2], registry: Elsa.Supervisor.registry(connection)] end - test "restarts producers when the client is dropped", %{connection: connection, topic: topic, registry: registry} do + test "restarts producers when the client is dropped", %{ + connection: connection, + topics: [topic, topic2], + registry: registry + } do message = "everything's fine here" + message2 = "also over here" client_pid = Elsa.Registry.whereis_name({registry, :brod_client}) Process.exit(client_pid, :kill) @@ -43,13 +54,16 @@ defmodule Elsa.ProducerTest do ) Producer.produce(connection, topic, message) + Producer.produce(connection, topic2, message2) Patiently.wait_for!( fn -> - case Elsa.fetch(@brokers, topic) do - {:ok, 1, [%Elsa.Message{value: result}]} -> - message == result - + with {:ok, 1, [%Elsa.Message{value: result}]} <- Elsa.fetch(@brokers, topic), + true <- message == result, + {:ok, 1, [%Elsa.Message{value: result2}]} <- Elsa.fetch(@brokers, topic2), + true <- message2 == result2 do + true + else _ -> false end diff --git a/test/unit/elsa/dynamic_process_manager_test.exs b/test/unit/elsa/dynamic_process_manager_test.exs index ef1047b..4550e5d 100644 --- a/test/unit/elsa/dynamic_process_manager_test.exs +++ b/test/unit/elsa/dynamic_process_manager_test.exs @@ -17,7 +17,7 @@ defmodule Elsa.DynamicProcessManagerTest do Process.sleep(1_000) assert 0 == Agent.get(:agent1, fn s -> s end) - assert {:ok, test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer) + assert {:ok, _test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer) assert "hello" == TestServer.echo(TestServer, "hello") Process.whereis(:dyn_sup) @@ -44,7 +44,7 @@ defmodule Elsa.DynamicProcessManagerTest do Process.sleep(1_000) assert 0 == Agent.get(:agent1, fn s -> s end) - assert {:ok, test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer) + assert {:ok, _test_server} = Elsa.DynamicProcessManager.start_child(:pm, TestServer) assert "hello" == TestServer.echo(TestServer, "hello") Process.whereis(:dyn_sup)