Add support for pagination #139

Open
wants to merge 3 commits into main

Conversation

tonitert

This pull request adds support for polling a paginated API based on a page number in the URL's query parameters. As the amount of data being fetched can be very large in my use case, the PR has support for using multiple threads and saving the current page to a file.

An example config can be seen here:

input {
  http_poller {
    urls => {
        example => {
            method => get
            url => "http://localhost:8000/example"
            pagination => {
              start_page => 1
              end_page => 30
              page_parameter => "page"
              concurrent_requests => 4
              last_run_metadata_path => "./last_run_metadata"
            }
            failure_mode => "retry"
            retry_delay => 3
            success_status_codes => [200, 201]
        }
    }
    schedule => { in => "0s"}
    codec => "json"
    keepalive => true
    metadata_target => "http_poller_metadata"
  }
}

This will send requests to
http://localhost:8000/example?page=1
http://localhost:8000/example?page=2
and so on.

Pages can be fetched concurrently on multiple threads. The page in progress can be saved to a file so that progress can be restored if Logstash is stopped or crashes. A file path can be specified; otherwise the file is created in the Logstash data directory. It is also possible to choose whether the file is deleted when the job finishes.

If requests start failing partway through querying the pages, they can be retried; the other available failure modes are stopping the input and continuing on error. Success status codes can be specified so that only responses with those status codes are counted as successes.

fix tests by using java sortedset instead of immutable-ruby

on-behalf-of: @insta-advance [email protected]
@axeh

axeh commented Oct 12, 2023

@roaksoax any progress on this?

@roaksoax requested a review from yaauie January 9, 2024 17:25
Contributor

@yaauie left a comment


I appreciate the effort that went into this. The changes add a lot of functionality, from pagination and failure handling to state tracking across pipeline runs. It's a lot!

I've found a few issues, including a deadlock-inducing race condition, and think that some improvements could be made to the StateHandler abstraction to simplify how the plugin interacts with it. There may be value in giving each top-level request its own request-scoped StateHandler instance, and moving all requests to use Manticore's Client#background, so that the run_once method can run all requests (including paginated requests) to completion in parallel instead of paginated requests needing different logic sprinkled throughout the code.
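
For illustration, a rough sketch of that shape (not code from this PR): each top-level request gets its own StateHandler, every page request is submitted up front, and run_once simply waits for all of them. It assumes Manticore's Client#background returns futures that block on #get; pages_for and handle_response are hypothetical helpers, and StateHandler.new(name) stands in for a request-scoped constructor.

def run_once(queue)
  futures = @requests.flat_map do |name, request|
    state = StateHandler.new(name)                   # request-scoped, not shared across requests
    pages_for(request, state).map do |page_request|  # a single-element list for unpaginated requests
      [name, page_request, client.background.send(*page_request)]
    end
  end

  # every request, paginated or not, completes in parallel before run_once returns
  futures.each do |name, page_request, future|
    handle_response(queue, name, page_request, future.get)
  end
end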

While paginating over a known quantity of numerically-indexed pages can be useful, the modern practice is for paginated resources to provide a link or token that provides access to the "next" page. Doing so allows the subsequently-requested pages to be stateful continuations of that first page, which prevents elements from being skipped or repeated as the underlying index changes. While actually supporting this type of crawling is beyond the scope of this PR, I'd at least like to ensure that what is delivered here does not prevent future efforts on that front.

Again, thank you for your effort.

request_async(queue, name, request)
  opts = request[2]
  if !opts.nil? && opts.key?(:pagination)
    handle_pagination(queue, name, request.clone)

this has a surprising side-effect: a paginated request blocks any subsequently-defined requests from being fired until it has completed. Under normal operation, this plugin creates a single batch of requests and then executes them all.

As implemented, only one paginated request can be running at any given time: the StateHandler does not scope its records of outstanding pages to requests by name, is assumed to be operating on one request at a time, and requires its caller to initialize it (StateHandler#start_paginated_request) and tear it down (StateHandler#stop_paginated_request) without providing mutual exclusion.
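
For illustration, a minimal sketch (not the PR's code, and PageTracker is a hypothetical name) of scoping the outstanding pages by request name under a single mutex, so that several paginated requests could run at once:

require "set"

class PageTracker
  def initialize
    @mutex = Mutex.new
    @in_progress = Hash.new { |hash, name| hash[name] = Set.new }
  end

  def add_page(name, page)
    @mutex.synchronize { @in_progress[name] << page }
  end

  def delete_page(name, page)
    @mutex.synchronize { @in_progress[name].delete(page) }
  end

  def in_progress_pages(name)
    @mutex.synchronize { @in_progress[name].to_a }
  end
end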

if @stop_writer
  break
end
atomic_state_write(file_path, current_page)

Would it make sense to avoid maintaining a separate state-writing thread and simply persist the state each time it changes (e.g., StateHandler#add_page and StateHandler#delete_page)?
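
As a sketch of what that could look like (names mirror the PR where possible; @state_path and @current_page are assumed instance state rather than the PR's actual fields):

def add_page(name, page)
  @pages_mutex.synchronize do
    @in_progress_pages << page
    atomic_state_write(@state_path, @current_page)   # persist immediately, no writer thread needed
  end
end

def delete_page(name, page)
  @pages_mutex.synchronize do
    @in_progress_pages.delete(page)
    atomic_state_write(@state_path, @current_page)
  end
end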

Comment on lines +176 to +182
err_msg = "Pagination had a invalid value for concurrent_threads, start_page, end_page, page_parameter, last_run_metadata_path or delete_last_run_metadata!"
raise LogStash::ConfigurationError, err_msg if (!spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer)) ||
(!spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer)) ||
(!spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String)) ||
(spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer)) ||
(spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String)) ||
(spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"]))

When a number of error conditions are all bunched together, it makes it difficult for a user to determine what exactly is wrong with their configuration.

Suggested change
err_msg = "Pagination had a invalid value for concurrent_threads, start_page, end_page, page_parameter, last_run_metadata_path or delete_last_run_metadata!"
raise LogStash::ConfigurationError, err_msg if (!spec[:pagination]["start_page"] || !spec[:pagination]["start_page"].is_a?(Integer)) ||
(!spec[:pagination]["end_page"] || !spec[:pagination]["end_page"].is_a?(Integer)) ||
(!spec[:pagination]["page_parameter"] || !spec[:pagination]["page_parameter"].is_a?(String)) ||
(spec[:pagination]["concurrent_threads"] && !spec[:pagination]["concurrent_threads"].is_a?(Integer)) ||
(spec[:pagination]["last_run_metadata_path"] && !spec[:pagination]["last_run_metadata_path"].is_a?(String)) ||
(spec[:pagination]["delete_last_run_metadata"] && !["true", "false"].include?(spec[:pagination]["delete_last_run_metadata"]))
pagination = spec[:pagination]
raise LogStash::ConfigurationError, "pagination must be a key/value map" unless pagination.kind_of?(Hash)
raise LogStash::ConfigurationError, "pagination[start_page] must be an integer" unless pagination["start_page"]&.kind_of?(Integer)
raise LogStash::ConfigurationError, "pagination[end_page] must be an integer" unless pagination["end_page"]&.kind_of?(Integer)
raise LogStash::ConfigurationError, "pagination[page_parameter] must be a string" unless pagination["page_parameter"]&.kind_of?(String)
raise LogStash::ConfigurationError, "pagination[concurrent_threads] must be a positive integer when provided" if pagination["concurrent_threads"]&.then { |ct| !(ct.kind_of?(Integer) && ct > 0) }
raise LogStash::ConfigurationError, "pagination[last_run_metadata_path] must be a string when provided" if pagination["last_run_metadata_path"]&.then { |lrmp| !lrmp.kind_of?(String) }
raise LogStash::ConfigurationError, "pagination[delete_last_run_metadata] must be either `true` or `false` when provided" if pagination["delete_last_run_metadata"]&.then { |dlrm| !%w(true false).include?(dlrm) }

Comment on lines +109 to +111
while @in_progress_pages.size > max_value && !plugin.stop? do
  @pages_signal.wait(@pages_mutex)
end

race condition (deadlock):

1: this thread observes that @in_progress_pages.size > max_value is true
2: another thread completes a page and sends ConditionVariable#broadcast
3: this thread calls ConditionVariable#wait, but never receives the signal because it has already been sent; when concurrent_requests is 1, this is always a deadlock.
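
One possible mitigation, sketched here rather than taken from the PR: re-check the predicate under the mutex with a timed wait, and have the completing thread signal while holding the same mutex, so a missed broadcast costs at most one second instead of deadlocking.

@pages_mutex.synchronize do
  while @in_progress_pages.size > max_value && !plugin.stop?
    @pages_signal.wait(@pages_mutex, 1)   # wakes up at least once per second to re-check
  end
end

# ...and wherever a page completes:
@pages_mutex.synchronize do
  @in_progress_pages.delete(page)
  @pages_signal.broadcast
end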

when "retry"
@logger.warn? && @logger.warn("Encountered request failure with url '%s', trying again after %d seconds.." % [name, req_opts[:retry_delay]])
begin
sleep req_opts[:retry_delay]

This will need to be a stoppable sleep, in order to allow the pipeline to shut down while waiting to retry events.

Suggested change
sleep req_opts[:retry_delay]
Stud.stoppable_sleep(req_opts[:retry_delay]) { stop? }

@@ -219,19 +305,44 @@ def to_nanoseconds(time_diff)

private
def handle_success(queue, name, request, response, execution_time)
  failed = check_failure_state(queue, name, request, nil, response)

This check_failure_state call also modifies the request's options to remove transient state about retries.

Comment on lines +411 to +412
if req_opts[:pagination]
  request_bg(queue, name, request)

this feels like side-channel control; because the request's options include :pagination, we know that the caller is blocking until all work is complete, and therefore want to issue the retry as a background request so that the thread handling the failure doesn't block.

class StateHandler
  def initialize(logger, requests)
    @logger = logger
    @state_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "http_poller", "state")

this should be scoped to the plugin's id so that two plugins (possibly in different pipelines) that share a request name do not stomp on each other.
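
A sketch of that scoping, assuming the plugin's id is passed into the StateHandler (named plugin_id here for illustration):

@state_path = ::File.join(LogStash::SETTINGS.get_value("path.data"),
                          "plugins", "inputs", "http_poller", plugin_id, "state")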


private
def atomic_state_write(file_path, current_page)
::File.write(file_path + ".tmp", YAML.dump([@in_progress_pages.to_a, current_page.get]), mode: "w")

This YAML file will end up looking something like the following, which isn't very readable:

- - 7
  - 9
  - 11
- 13

I would prefer a file whose contents are self-descriptive:

---
in_progress_pages:
- 7
- 9
- 11
current_page: 12
Suggested change
::File.write(file_path + ".tmp", YAML.dump([@in_progress_pages.to_a, current_page.get]), mode: "w")
::File.write(file_path + ".tmp", YAML.dump({"in_progress_pages" => @in_progress_pages.to_a, "current_page" => current_page.get}), mode: "w")

Comment on lines +231 to +234
current_page.getAndIncrement
@state_handler.add_page(name, current_page)
create_paginated_request(queue, name, request, current_page.get, pagination)
if @state_handler.in_progress_pages.size >= concurrent_requests

Since our StateHandler is only capable of handling one paginated-request at a time, keeping its "current page" an internal detail would simplify things a bit.

This section also has a race condition with regard to state persistence. Because we increment the current_page before we create the page (which adds it to the StateHandler's in_progress_pages), there is a window in which the state file can be written between the two. Resuming from this state file would skip the current page as it would not be listed as in progress.
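
A sketch of one way to close that window (claim_next_page is hypothetical): advance the counter and record the page as in-progress in a single synchronized step, so a state file written at any moment lists every claimed page.

page = @state_handler.claim_next_page(name)   # increments current_page and adds it to in_progress_pages atomically
create_paginated_request(queue, name, request, page, pagination)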

@robbavey
Contributor

robbavey commented Nov 7, 2024

@tonitert Are you interested in continuing this PR?

@tonitert
Author

@tonitert Are you interested in continuing this PR?

Not at the moment, as development for this is paused.
