Add support for pagination #139
base: main
Conversation
Commits:
- on-behalf-of: @insta-advance [email protected]
- fix tests by using java sortedset instead of immutable-ruby (on-behalf-of: @insta-advance [email protected])
@roaksoax any progress on this?
I appreciate the effort that went into this. The changes add a lot of functionality, from pagination and failure handling to state tracking across pipeline runs. It's a lot!
I've found a few issues, including a deadlock-inducing race condition, and I think some improvements could be made to the `StateHandler` abstraction to simplify how the plugin interacts with it. There may be value in giving each top-level request its own request-scoped `StateHandler` instance, and in moving all requests to use Manticore's `Client#background`, so that the `run_once` method can run all requests (including paginated requests) to completion in parallel instead of paginated requests needing different logic sprinkled throughout the code.
While paginating over a known quantity of numerically-indexed pages can be useful, the modern practice is for paginated resources to provide a link or token that provides access to the "next" page. Doing so allows the subsequently-requested pages to be stateful continuations of that first page, which prevents elements from being skipped or repeated as the underlying index changes. While actually supporting this type of crawling is beyond the scope of this PR, I'd at least like to ensure that what is delivered here does not prevent future efforts on that front.
Again, thank you for your effort.
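A rough sketch of the shape this suggests (the helper names and the exact Manticore future semantics here are assumptions, not code from this PR):

```ruby
# Hypothetical sketch: one request-scoped state handler per top-level request,
# and every page submitted through Manticore's background executor so that
# run_once simply waits for all outstanding futures. `pages_for` and the
# StateHandler constructor are illustrative; `client.background` is assumed to
# return a future whose #get blocks until the response has been handled.
def run_once(queue)
  futures = @requests.flat_map do |name, request|
    state = StateHandler.new(@logger, name) # scoped to this request only
    pages_for(request, state).map do |method, url, opts|
      client.background.send(method, url, opts)
    end
  end
  futures.each(&:get) # wait for every page, paginated or not, in parallel
end
```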
```ruby
request_async(queue, name, request)
opts = request[2]
if !opts.nil? && opts.key?(:pagination)
  handle_pagination(queue, name, request.clone)
```
this has a surprising side-effect of making a paginated request block any subsequently-defined requests from being fired until the paginated request has completed. Under normal operation, this plugin creates a single batch of requests, then executes them all.

As implemented, only one paginated request can be running at any given time, since the `StateHandler` does not scope its records of outstanding pages to requests by name, is assumed to be operating on one request at a time, and requires its caller to initialize (`StateHandler#start_paginated_request`) and tear down (`StateHandler#stop_paginated_request`) it without providing mutual exclusion.
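To illustrate the kind of scoping meant here (class and method names are illustrative, not the PR's API), outstanding pages could be keyed by request name, with the lock owned by the handler itself:

```ruby
require "set"

# Hypothetical: pages are tracked per request name, so several paginated
# requests can be in flight at once without stepping on each other.
class PageTracker
  def initialize
    @mutex = Mutex.new
    @pages_by_request = Hash.new { |h, name| h[name] = Set.new }
  end

  def add_page(request_name, page)
    @mutex.synchronize { @pages_by_request[request_name] << page }
  end

  def delete_page(request_name, page)
    @mutex.synchronize { @pages_by_request[request_name].delete(page) }
  end

  def in_progress(request_name)
    @mutex.synchronize { @pages_by_request[request_name].to_a }
  end
end
```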
```ruby
if @stop_writer
  break
end
atomic_state_write(file_path, current_page)
```
Would it make sense to avoid maintaining a separate state-writing thread and simply persist the state each time it changes (e.g., in `StateHandler#add_page` and `StateHandler#delete_page`)?
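A minimal sketch of that idea, with illustrative names and assuming a YAML state file (not this PR's exact API):

```ruby
require "yaml"

# Hypothetical: every mutation persists immediately, so no writer thread is needed.
class PersistentPageState
  def initialize(path)
    @path  = path
    @mutex = Mutex.new
    @pages = []
  end

  def add_page(page)
    @mutex.synchronize { @pages << page; persist }
  end

  def delete_page(page)
    @mutex.synchronize { @pages.delete(page); persist }
  end

  private

  # write to a temp file, then rename, so readers never see a partial file
  def persist
    tmp = "#{@path}.tmp"
    ::File.write(tmp, YAML.dump("in_progress_pages" => @pages))
    ::File.rename(tmp, @path)
  end
end
```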
err_msg = "Pagination had a invalid value for concurrent_threads, start_page, end_page, page_parameter, last_run_metadata_path or delete_last_run_metadata!" | ||
raise LogStash::ConfigurationError, err_msg if (!spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer)) || | ||
(!spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer)) || | ||
(!spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String)) || | ||
(spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer)) || | ||
(spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String)) || | ||
(spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"])) |
When a number of error conditions are all bunched together, it makes it difficult for a user to determine what exactly is wrong with their configuration.
err_msg = "Pagination had a invalid value for concurrent_threads, start_page, end_page, page_parameter, last_run_metadata_path or delete_last_run_metadata!" | |
raise LogStash::ConfigurationError, err_msg if (!spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer)) || | |
(!spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer)) || | |
(!spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String)) || | |
(spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer)) || | |
(spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String)) || | |
(spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"])) | |
pagination = spec[:pagination] | |
raise LogStash::ConfigurationError, "pagination must be a key/value map" unless pagination.kind_of?(Hash) | |
raise LogStash::ConfigurationError, "pagination[end_page] must be an integer" unless pagination["end_page"]&.kind_of?(Integer) | |
raise LogStash::ConfigurationError, "pagination[page_parameter] must be a string" unless pagination["page_parameter"]&.kind_of?(String) | |
raise LogStash::ConfigurationError, "pagination[concurrent_threads] must be a positive integer when provided" if pagination["concurrent_threads"]&.then { |ct| !(ct.kind_of?(Integer) && ct > 0) } | |
raise LogStash::ConfigurationError, "pagination[last_run_metadata_path] must be a string when provided" if pagination["last_run_metadata_path"]&.then { |lrmp| !lrmp.kind_of?(String) } | |
raise LogStash::ConfigurationError, "pagination[delete_last_run_metadata] must be either `true` or `false` when provided" if pagination["delete_last_run_metadata"]&.then { |dlrm| !%w(true false).include?(dlrm) } |
```ruby
while @in_progress_pages.size > max_value && !plugin.stop? do
  @pages_signal.wait(@pages_mutex)
end
```
race condition (deadlock):
1. this thread observes that `@in_progress_pages.size > max_value` is true
2. another thread completes a page and sends `ConditionVariable#broadcast`
3. this thread sends `ConditionVariable#wait`, but does not get the signal because it had already been sent; when `concurrent_requests` is 1, this is always a deadlock.
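For illustration, the usual way to avoid the missed signal is to perform both the predicate check and the wait while holding the same mutex that the completing thread takes around its broadcast; a timed wait also gives the loop a chance to re-check shutdown. A standalone sketch (not the plugin's code):

```ruby
mutex  = Mutex.new
signal = ConditionVariable.new
in_progress_pages = []
max_value = 1

# waiting side: check and wait under the mutex, in a loop
mutex.synchronize do
  while in_progress_pages.size > max_value
    signal.wait(mutex, 1) # timed wait lets the caller also re-check stop?
  end
end

# completing side: mutate and broadcast under the same mutex,
# so the broadcast cannot slip in between the check and the wait
mutex.synchronize do
  in_progress_pages.pop
  signal.broadcast
end
```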
when "retry" | ||
@logger.warn? && @logger.warn("Encountered request failure with url '%s', trying again after %d seconds.." % [name, req_opts[:retry_delay]]) | ||
begin | ||
sleep req_opts[:retry_delay] |
This will need to be a stoppable sleep, in order to allow the pipeline to shut down while waiting to retry events.
Suggested change:

```ruby
Stud.stoppable_sleep(req_opts[:retry_delay]) { stop? }
```
```ruby
private
def handle_success(queue, name, request, response, execution_time)
  failed = check_failure_state(queue, name, request, nil, response)
```
This `check_failure_state` also modifies the `request`'s options to remove transient state about retries.
```ruby
if req_opts[:pagination]
  request_bg(queue, name, request)
```
this feels like side-channel control; since the request has `:pagination` in its options, we know that the thing calling it is blocking until all work is complete, and therefore we want to make the retry a background request to ensure that the thread handling the failure doesn't block.
```ruby
class StateHandler
  def initialize(logger, requests)
    @logger = logger
    @state_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "http_poller", "state")
```
this should be scoped to the plugin's `id` so that two plugins (possibly in different pipelines) that share a request name do not stomp on each other.
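A hedged sketch of what that could look like (the `plugin_id` parameter is illustrative and would be supplied by the plugin, e.g. from its `id` setting):

```ruby
# Hypothetical: thread the owning plugin's id into the state path so two
# http_poller inputs that share a request name keep separate state files.
def initialize(logger, requests, plugin_id)
  @logger = logger
  @state_path = ::File.join(
    LogStash::SETTINGS.get_value("path.data"),
    "plugins", "inputs", "http_poller",
    plugin_id,
    "state"
  )
end
```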
```ruby
private
def atomic_state_write(file_path, current_page)
  ::File.write(file_path + ".tmp", YAML.dump([@in_progress_pages.to_a, current_page.get]), mode: "w")
```
This yaml file will end up looking something like the following, which isn't very readable:

```yaml
---
- - 7
  - 9
  - 11
- 13
```

I would prefer a file whose contents are self-descriptive:

```yaml
---
in_progress_pages:
- 7
- 9
- 11
current_page: 12
```

Suggested change:

```ruby
::File.write(file_path + ".tmp", YAML.dump({"in_progress_pages" => @in_progress_pages.to_a, "current_page" => current_page.get}), mode: "w")
```
```ruby
current_page.getAndIncrement
@state_handler.add_page(name, current_page)
create_paginated_request(queue, name, request, current_page.get, pagination)
if @state_handler.in_progress_pages.size >= concurrent_requests
```
Since our `StateHandler` is only capable of handling one paginated request at a time, keeping its "current page" as an internal detail would simplify things a bit.

This section also has a race condition with regard to state persistence. Because we increment the `current_page` before we create the page (which adds it to the `StateHandler`'s `in_progress_pages`), there is a window in which the state file can be written between the two. Resuming from this state file would skip the current page, as it would not be listed as in progress.
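As an illustration only (the method name and the `@current_page` counter are made up, not this PR's API), claiming the next page and recording it as in-progress under one lock closes that window:

```ruby
# Hypothetical sketch: the state handler owns the counter and hands out pages
# atomically, so any state write sees the claimed page in in_progress_pages.
def claim_next_page
  @pages_mutex.synchronize do
    page = @current_page
    @current_page += 1
    @in_progress_pages << page
    page
  end
end
```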
@tonitert Are you interested in continuing this PR?
Not at the moment, as development for this is paused.
This pull request adds support for polling a paginated API based on a page number in the URL's query parameters. As the amount of data being fetched can be very large in my use case, the PR has support for using multiple threads and saving the current page to a file.
An example config can be seen here:
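(The original snippet is not reproduced in this thread; a hypothetical configuration using the pagination options referenced elsewhere in this PR, with illustrative values, might look like the following.)

```
input {
  http_poller {
    urls => {
      example => {
        url => "http://localhost:8000/example"
        pagination => {
          page_parameter => "page"
          start_page => 1
          end_page => 100
          concurrent_threads => 4
          last_run_metadata_path => "/tmp/http_poller_pagination_state"
          delete_last_run_metadata => "true"
        }
      }
    }
    schedule => { cron => "* * * * * UTC" }
    codec => "json"
  }
}
```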
This will send requests to
http://localhost:8000/example?page=1
http://localhost:8000/example?page=2
and so on.
Pages can be fetched concurrently on multiple threads. The page in progress can be saved to a file so that progress is restored if Logstash is stopped or crashes. A file path can be specified; otherwise the file is created in the Logstash data directory. It is possible to choose whether the file is deleted when the job finishes.
If requests start failing at some point while querying the pages, they can be retried. The other available failure-handling modes are stopping the input and continuing on error. Success status codes can be specified, so that only responses with those status codes are counted as successes.