Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Figgy Hydrator Polling #97

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ config :logger, :console,
# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason

config :dpul_collections, :figgy_hydrator, poll_interval: 60000

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{config_env()}.exs"
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ config :dpul_collections, :solr, %{
username: System.get_env("SOLR_USERNAME"),
password: System.get_env("SOLR_PASSWORD")
}

# Set this poll interval really small so it triggers in test.
config :dpul_collections, :figgy_hydrator, poll_interval: 50
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
last_queried_marker: Figgy.ResourceMarker.t(),
pulled_records: [Figgy.ResourceMarker.t()],
acked_records: [Figgy.ResourceMarker.t()],
cache_version: Integer
cache_version: Integer,
stored_demand: Integer
}
def init(cache_version) do
last_queried_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version)
Expand All @@ -26,7 +27,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
last_queried_marker: last_queried_marker |> Figgy.ResourceMarker.from(),
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:producer, initial_state}
Expand All @@ -38,11 +40,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
state = %{
last_queried_marker: last_queried_marker,
pulled_records: pulled_records,
acked_records: acked_records
acked_records: acked_records,
stored_demand: stored_demand
}
)
when demand > 0 do
records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, demand)
) do
total_demand = stored_demand + demand

records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand)

new_state =
state
Expand All @@ -55,10 +59,40 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
Enum.concat(pulled_records, Enum.map(records, &Figgy.ResourceMarker.from/1))
)
|> Map.put(:acked_records, acked_records)
|> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
Process.send_after(
self(),
:check_for_updates,
Application.get_env(:dpul_collections, :figgy_hydrator)[:poll_interval]
)
end

{:noreply, Enum.map(records, &wrap_record/1), new_state}
end

defp calculate_stored_demand(total_demand, fulfilled_demand)
when total_demand == fulfilled_demand do
0
end

defp calculate_stored_demand(total_demand, fulfilled_demand)
when total_demand > fulfilled_demand do
total_demand - fulfilled_demand
end

def handle_info(:check_for_updates, state = %{stored_demand: demand})
when demand > 0 do
new_demand = 0
handle_demand(new_demand, state)
end

def handle_info(:check_for_updates, state) do
{:noreply, [], state}
end

@impl GenStage
def handle_info({:ack, :figgy_producer_ack, pending_markers}, state) do
messages = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -39,7 +40,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -56,7 +58,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -79,7 +82,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
fabricated_marker
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -95,7 +99,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -114,7 +119,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: fabricated_marker,
pulled_records: [],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -126,7 +132,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: fabricated_marker,
pulled_records: [],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 1
}

assert new_state == expected_state
Expand All @@ -144,7 +151,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

acked_markers =
Expand All @@ -163,7 +171,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker3
],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -187,7 +196,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand Down Expand Up @@ -217,7 +227,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -236,7 +247,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -257,7 +269,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -270,7 +283,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -296,7 +310,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -314,7 +329,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -341,7 +357,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

first_ack =
Expand All @@ -355,7 +372,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -373,7 +391,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker2,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -386,5 +405,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do

assert processor_marker == marker2
end

test ".handle_info(:check_for_updates) with no stored demand" do
assert Figgy.HydrationProducer.handle_info(:check_for_updates, %{stored_demand: 0}) ==
{:noreply, [], %{stored_demand: 0}}
end
end
end
Loading