From 396fd037bd7a9d6dba542e60f5edba3f7f3bfbe9 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Tue, 24 Sep 2024 15:04:07 -0700 Subject: [PATCH 1/2] Add stored_demand to HydratorProducer --- ...tion_producer.ex => hydration_producer.ex} | 6 +- .../figgy/hydration_producer_test.exs | 57 ++++++++++++------- 2 files changed, 42 insertions(+), 21 deletions(-) rename lib/dpul_collections/indexing_pipeline/figgy/{hyrdation_producer.ex => hydration_producer.ex} (97%) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hyrdation_producer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex similarity index 97% rename from lib/dpul_collections/indexing_pipeline/figgy/hyrdation_producer.ex rename to lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex index 9567e3c8..93f65e52 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hyrdation_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex @@ -17,7 +17,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do last_queried_marker: Figgy.ResourceMarker.t(), pulled_records: [Figgy.ResourceMarker.t()], acked_records: [Figgy.ResourceMarker.t()], - cache_version: Integer + cache_version: Integer, + stored_demand: Integer } def init(cache_version) do last_queried_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version) @@ -26,7 +27,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do last_queried_marker: last_queried_marker |> Figgy.ResourceMarker.from(), pulled_records: [], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:producer, initial_state} diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs index 5863142b..137fb2bb 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs @@ -22,7 +22,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker2 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -39,7 +40,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker2 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state) @@ -56,7 +58,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker3 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -79,7 +82,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do fabricated_marker ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state) @@ -95,7 +99,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker3 ], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -114,7 +119,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do last_queried_marker: fabricated_marker, pulled_records: [], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } {:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state) @@ -126,7 +132,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do last_queried_marker: fabricated_marker, pulled_records: [], acked_records: [], - cache_version: 0 + cache_version: 0, + stored_demand: 0 } assert new_state == expected_state @@ -144,7 +151,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker3 ], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } acked_markers = @@ -163,7 +171,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do acked_records: [ marker3 ], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:noreply, [], new_state} = @@ -187,7 +196,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: cache_version + cache_version: cache_version, + stored_demand: 0 } {:noreply, [], new_state} = @@ -217,7 +227,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker3 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_markers = @@ -236,7 +247,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -257,7 +269,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_markers = @@ -270,7 +283,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do last_queried_marker: marker3, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -296,7 +310,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } acked_markers = @@ -314,7 +329,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do acked_records: [ marker2 ], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -341,7 +357,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker2 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } first_ack = @@ -355,7 +372,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do marker2 ], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = @@ -373,7 +391,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do last_queried_marker: marker2, pulled_records: [], acked_records: [], - cache_version: 1 + cache_version: 1, + stored_demand: 0 } {:noreply, [], new_state} = From fb8031ec258bed8c9af5b4130c586e1e25bd6c6b Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Tue, 24 Sep 2024 15:17:12 -0700 Subject: [PATCH 2/2] Add configurable poll interval for Hydrator. --- config/config.exs | 2 + config/test.exs | 3 ++ .../figgy/hydration_producer.ex | 40 +++++++++++++++++-- .../figgy/hydration_producer_test.exs | 7 +++- 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/config/config.exs b/config/config.exs index 45b9e3d2..c7a22718 100644 --- a/config/config.exs +++ b/config/config.exs @@ -61,6 +61,8 @@ config :logger, :console, # Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason +config :dpul_collections, :figgy_hydrator, poll_interval: 60000 + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/config/test.exs b/config/test.exs index e91088b3..e81df83f 100644 --- a/config/test.exs +++ b/config/test.exs @@ -55,3 +55,6 @@ config :dpul_collections, :solr, %{ username: System.get_env("SOLR_USERNAME"), password: System.get_env("SOLR_PASSWORD") } + +# Set this poll interval really small so it triggers in test. +config :dpul_collections, :figgy_hydrator, poll_interval: 50 diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex index 93f65e52..96c0ceb8 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex @@ -40,11 +40,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do state = %{ last_queried_marker: last_queried_marker, pulled_records: pulled_records, - acked_records: acked_records + acked_records: acked_records, + stored_demand: stored_demand } - ) - when demand > 0 do - records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, demand) + ) do + total_demand = stored_demand + demand + + records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand) new_state = state @@ -57,10 +59,40 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do Enum.concat(pulled_records, Enum.map(records, &Figgy.ResourceMarker.from/1)) ) |> Map.put(:acked_records, acked_records) + |> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records))) + + # Set a timer to try fulfilling demand again later + if new_state.stored_demand > 0 do + Process.send_after( + self(), + :check_for_updates, + Application.get_env(:dpul_collections, :figgy_hydrator)[:poll_interval] + ) + end {:noreply, Enum.map(records, &wrap_record/1), new_state} end + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand == fulfilled_demand do + 0 + end + + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand > fulfilled_demand do + total_demand - fulfilled_demand + end + + def handle_info(:check_for_updates, state = %{stored_demand: demand}) + when demand > 0 do + new_demand = 0 + handle_demand(new_demand, state) + end + + def handle_info(:check_for_updates, state) do + {:noreply, [], state} + end + @impl GenStage def handle_info({:ack, :figgy_producer_ack, pending_markers}, state) do messages = [] diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs index 137fb2bb..446f3f2f 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs @@ -133,7 +133,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do pulled_records: [], acked_records: [], cache_version: 0, - stored_demand: 0 + stored_demand: 1 } assert new_state == expected_state @@ -405,5 +405,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do assert processor_marker == marker2 end + + test ".handle_info(:check_for_updates) with no stored demand" do + assert Figgy.HydrationProducer.handle_info(:check_for_updates, %{stored_demand: 0}) == + {:noreply, [], %{stored_demand: 0}} + end end end