
Commit

Merge pull request #9 from mon-territoire/feature-allow_pass_pit_keep_alive_for_batch_of_searches

Feature allow pass pit keep alive for batch of searches
inkstak authored Aug 11, 2023
2 parents 5abe2c5 + 12011c3 commit 7a2d7c1
Showing 3 changed files with 119 additions and 33 deletions.
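
In short, `search_after` can now batch over an externally supplied point in time (PIT) instead of always opening and closing its own. A minimal usage sketch (not part of the commit), assuming a `SampleSearch` class like the one stubbed in the specs below and assuming `open_point_in_time` / `close_point_in_time` are callable by the caller:

    search = SampleSearch.new

    # Open (or otherwise obtain) a PIT id yourself, then hand it to search_after.
    pit = search.open_point_in_time(keep_alive: "10m")

    # Pass keep_alive along with the PIT so it is extended on every request;
    # omitting it emits a warning and falls back to the default of "1m".
    search.search_after(pit: pit, keep_alive: "10m", batch_size: 500) do |hits|
      hits.each { |hit| puts hit["_id"] } # hypothetical consumer of the raw hits
    end

    # search_after does not close a PIT it did not open, so the caller does.
    search.close_point_in_time(pit)

Without the `pit:` argument the behaviour is unchanged: a PIT is opened with the given `keep_alive` (default "1m") and closed when the batch loop finishes.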
6 changes: 3 additions & 3 deletions .rubocop_todo.yml
@@ -1,15 +1,15 @@
 # This configuration was generated by
 # `rubocop --auto-gen-config`
-# on 2022-12-19 07:05:59 UTC using RuboCop version 1.38.0.
+# on 2023-08-11 14:47:38 UTC using RuboCop version 1.52.1.
 # The point is for the user to remove these configuration records
 # one by one as the offenses are removed from the code base.
 # Note that changes in the inspected code, or installation of new
 # versions of RuboCop, may require this file to be generated again.

-# Offense count: 27
+# Offense count: 35
 # Configuration parameters: CountAsOne.
 RSpec/ExampleLength:
-  Max: 18
+  Max: 20

 # Offense count: 1
 # Configuration parameters: AllowSubject.
38 changes: 29 additions & 9 deletions lib/caoutsearch/search/batch/search_after.rb
@@ -4,14 +4,24 @@ module Caoutsearch
   module Search
     module Batch
       module SearchAfter
-        def search_after(keep_alive: "1m", batch_size: 1000, &block)
-          pit_id = open_point_in_time(keep_alive: keep_alive)
+        def search_after(pit: nil, keep_alive: nil, batch_size: 1000, &block)
+          if pit
+            external_pit = true
+
+            warn(<<~MESSAGE) if keep_alive.nil?
+              A `pit` was passed to batch records without a `keep_alive` argument.
+              You may need it to extend the PIT on each request.
+            MESSAGE
+          end
+
+          keep_alive ||= "1m"
+          pit ||= open_point_in_time(keep_alive: keep_alive)
           search = per(batch_size).track_total_hits

           request_payload = {
             body: search.build.to_h.merge(
               pit: {
-                id: pit_id,
+                id: pit,
                 keep_alive: keep_alive
               }
             )
@@ -26,7 +36,7 @@ def search_after(keep_alive: "1m", batch_size: 1000, &block)
           loop do
             requested_at = Time.current

-            results = instrument(:search_after, pit: pit_id) do |event_payload|
+            results = instrument(:search_after, pit: pit) do |event_payload|
               response = client.search(request_payload)
               last_response_time = Time.current
@@ -40,27 +50,37 @@ def search_after(keep_alive: "1m", batch_size: 1000, &block)

               response
             rescue Elastic::Transport::Transport::Errors::NotFound => e
-              raise_enhance_message_when_pit_failed(e, keep_alive, requested_at, last_response_time)
+              if external_pit && progress.zero?
+                raise_enhance_message_on_missing_pit(e)
+              else
+                raise_enhance_message_on_pit_failure(e, keep_alive, requested_at, last_response_time)
+              end
             end

             hits = results["hits"]["hits"]
-            pit_id = results["pit_id"]
+            pit = results["pit_id"]
             break if hits.empty?

             yield hits
             break if progress >= total

             request_payload[:body].tap do |body|
-              body[:pit][:id] = pit_id
+              body[:pit][:id] = pit
               body[:search_after] = hits.last["sort"]
               body.delete(:track_total_hits)
             end
           end
         ensure
-          close_point_in_time(pit_id) if pit_id
+          close_point_in_time(pit) if pit && !external_pit
         end

         private

+        def raise_enhance_message_on_missing_pit(error)
+          raise error.exception "PIT was not found. #{error.message}"
+        end
+
-        def raise_enhance_message_when_pit_failed(error, keep_alive, requested_at, last_response_time)
+        def raise_enhance_message_on_pit_failure(error, keep_alive, requested_at, last_response_time)
           elapsed = (requested_at - last_response_time).round(1).seconds

           raise error.exception("PIT registered for #{keep_alive}, #{elapsed.inspect} elapsed between. #{error.message}")
108 changes: 87 additions & 21 deletions spec/caoutsearch/search/batch/search_after_spec.rb
@@ -6,26 +6,32 @@
   let(:search_class) { stub_search_class("SampleSearch") }
   let(:search) { search_class.new }

+  let!(:pit) { generate_random_pit }
+
   let(:hits) do
     Array.new(12) do |i|
       {"_id" => i, "sort" => [i]}
     end
   end

+  def generate_random_pit
+    SecureRandom.base36(664)
+  end
+
   it "opens a PIT, performs search requests and closes the PIT" do
     stubbed_open_pit = stub_elasticsearch_request(:post, "samples/_pit?keep_alive=1m")
-      .to_return_json(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .to_return_json(body: {id: pit})

     stubbed_first_search = stub_elasticsearch_request(:post, "_search")
-      .with(body: {track_total_hits: true, size: 10, pit: {id: "94GY/RZnrjmaRD1vx6qM7w", keep_alive: "1m"}, sort: ["_shard_doc"]})
-      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .with(body: {track_total_hits: true, size: 10, pit: {id: pit, keep_alive: "1m"}, sort: ["_shard_doc"]})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: pit})

     stubbed_second_search = stub_elasticsearch_request(:post, "_search")
-      .with(body: {size: 10, pit: {id: "94GY/RZnrjmaRD1vx6qM7w", keep_alive: "1m"}, sort: ["_shard_doc"], search_after: [9]})
-      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[10..]}, pit_id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .with(body: {size: 10, pit: {id: pit, keep_alive: "1m"}, sort: ["_shard_doc"], search_after: [9]})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[10..]}, pit_id: pit})

     stubbed_close_pit = stub_elasticsearch_request(:delete, "_pit")
-      .with(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .with(body: {id: pit})
       .to_return_json(body: {succeed: true})

     search.search_after(batch_size: 10) { |_batch| }
@@ -38,20 +44,24 @@
     end
   end

-  it "updates the PIT ID for each requests when it changes" do
+  it "updates the PIT after it changes on each request" do
     stubbed_open_pit = stub_elasticsearch_request(:post, "samples/_pit?keep_alive=1m")
-      .to_return_json(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .to_return_json(body: {id: pit})

+    another_pit = generate_random_pit
+
     stubbed_first_search = stub_elasticsearch_request(:post, "_search")
-      .with(body: {track_total_hits: true, size: 10, pit: {id: "94GY/RZnrjmaRD1vx6qM7w", keep_alive: "1m"}, sort: ["_shard_doc"]})
-      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: "ikYT/9bwfqz+vvCIHVfkkg"})
+      .with(body: {track_total_hits: true, size: 10, pit: {id: pit, keep_alive: "1m"}, sort: ["_shard_doc"]})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: another_pit})

+    yet_another_pit = generate_random_pit
+
     stubbed_second_search = stub_elasticsearch_request(:post, "_search")
-      .with(body: {size: 10, pit: {id: "ikYT/9bwfqz+vvCIHVfkkg", keep_alive: "1m"}, sort: ["_shard_doc"], search_after: [9]})
-      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[10..]}, pit_id: "TlNcKTPu+EwTBIjBwE7TQQ"})
+      .with(body: {size: 10, pit: {id: another_pit, keep_alive: "1m"}, sort: ["_shard_doc"], search_after: [9]})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[10..]}, pit_id: yet_another_pit})

     stubbed_close_pit = stub_elasticsearch_request(:delete, "_pit")
-      .with(body: {id: "TlNcKTPu+EwTBIjBwE7TQQ"})
+      .with(body: {id: yet_another_pit})
       .to_return_json(body: {succeed: true})

     search.search_after(batch_size: 10) { |_batch| }
@@ -64,30 +74,32 @@
     end
   end

-  it "raises an enhanced error message when scroll is expired" do
+  it "raises an enhanced error message when PIT has expired" do
     stub_elasticsearch_request(:post, "samples/_pit?keep_alive=1m")
-      .to_return_json(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .to_return_json(body: {id: pit})

     stub_elasticsearch_request(:post, "_search")
       .to_return_json(status: 404, body: {error: {type: "search_phase_execution_exception"}})

     stub_elasticsearch_request(:delete, "_pit")
       .to_return_json(body: {succeed: true})

-    expect { search.search_after { |_batch| } }
+    expect {
+      search.search_after { |_batch| }
+    }
       .to raise_error(Elastic::Transport::Transport::Errors::NotFound)
       .with_message(/PIT registered for 1m, .* seconds elapsed between. \[404\] {"error"/)
   end

   it "closes the PIT when an exception happens" do
     stub_elasticsearch_request(:post, "samples/_pit?keep_alive=1m")
-      .to_return_json(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .to_return_json(body: {id: pit})

     stub_elasticsearch_request(:post, "_search")
       .to_return_json(status: 404, body: {error: {type: "search_phase_execution_exception"}})

     stubbed_close_pit = stub_elasticsearch_request(:delete, "_pit")
-      .with(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .with(body: {id: pit})
       .to_return_json(body: {succeed: true})

     begin
@@ -98,13 +110,67 @@
     expect(stubbed_close_pit).to have_been_requested.once
   end

+  it "uses the given PIT and does not close it" do
+    stubbed_search = stub_elasticsearch_request(:post, "_search")
+      .with(body: {track_total_hits: true, size: 10, pit: {id: pit, keep_alive: "10m"}, sort: ["_shard_doc"]})
+      .to_return_json(body: {hits: {total: {value: 5}, hits: hits[0..4]}, pit_id: pit})
+
+    search.search_after(batch_size: 10, pit: pit, keep_alive: "10m") { |_batch| }
+
+    aggregate_failures do
+      expect(stubbed_search).to have_been_requested.once
+
+      expect(WebMock).not_to have_requested(:post, "samples/_pit")
+      expect(WebMock).not_to have_requested(:delete, "_pit")
+    end
+  end
+
+  it "raises an explicit error when PIT argument is not found by Elasticsearch" do
+    stub_elasticsearch_request(:post, "_search")
+      .to_return_json(status: 404, body: {error: {type: "search_context_missing_exception"}})
+
+    expect {
+      search.search_after(pit: pit) { |_batch| }
+    }
+      .to raise_error(Elastic::Transport::Transport::Errors::NotFound)
+      .with_message(/PIT was not found. \[404\]/)
+  end
+
+  it "raises an enhanced error message when the given PIT expired between two batches" do
+    stub_elasticsearch_request(:post, "_search")
+      .with(body: {track_total_hits: true, size: 10, pit: {id: pit, keep_alive: "1m"}, sort: ["_shard_doc"]})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: pit})
+
+    stub_elasticsearch_request(:post, "_search")
+      .with(body: {size: 10, pit: {id: pit, keep_alive: "1m"}, sort: ["_shard_doc"], search_after: [9]})
+      .to_return_json(status: 404, body: {error: {type: "search_phase_execution_exception"}})
+
+    expect {
+      search.search_after(batch_size: 10, pit: pit) { |_batch| }
+    }
+      .to raise_error(Elastic::Transport::Transport::Errors::NotFound)
+      .with_message(/PIT registered for 1m, .* seconds elapsed between. \[404\] {"error"/)
+  end
+
+  it "warns about a missing keep_alive argument when a PIT argument is given" do
+    stub_elasticsearch_request(:post, "_search")
+      .with(body: {track_total_hits: true, size: 10, pit: {id: pit, keep_alive: "1m"}, sort: ["_shard_doc"]})
+      .to_return_json(body: {hits: {total: {value: 5}, hits: hits[0..4]}, pit_id: pit})
+
+    expect {
+      search.search_after(batch_size: 10, pit: pit) { |_batch| }
+    }.to output(
+      "A `pit` was passed to batch records without a `keep_alive` argument.\nYou may need it to extend the PIT on each request.\n"
+    ).to_stderr
+  end
+
   it "yields batches of hits" do
     stub_elasticsearch_request(:post, "samples/_pit?keep_alive=1m")
-      .to_return_json(body: {id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .to_return_json(body: {id: pit})

     stub_elasticsearch_request(:post, "_search")
-      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: "94GY/RZnrjmaRD1vx6qM7w"})
-      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[10..]}, pit_id: "94GY/RZnrjmaRD1vx6qM7w"})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[0..9]}, pit_id: pit})
+      .to_return_json(body: {hits: {total: {value: 12}, hits: hits[10..]}, pit_id: pit})

     stub_elasticsearch_request(:delete, "_pit")
       .to_return_json(body: {succeed: true})
