
Support for paginated response from API #88

Open
AkshatJindal1 opened this issue Jan 4, 2021 · 6 comments · May be fixed by #90

Comments

@AkshatJindal1
Contributor

Is your feature request related to a problem? Please describe.
Recently I have been working on a project that requires the HTTP source connector. However, the API returns only a fixed number of records per response, as an array, and provides a URL at the end to access the next set of responses. If we could support calling the next-page URL until there are no new records, it would solve the problem.

Describe the solution you'd like
I would like to work on this by adding a way to extract the next-page URL for as long as it exists (or until no new records are present) and keep pushing each page's records to Kafka.

Describe alternatives you've considered
I have also tried collecting all the messages and pushing them at once when no next-page URL is present, but that leads to serious memory issues if the number of pages is very large.

@castorm
Owner

castorm commented Jan 4, 2021

Hi @AkshatJindal1,

Thanks for submitting your request.

Do you intend to work on it yourself? If so, do you need any sort of direction? Is there any way I can help? It would probably be beneficial to both of us to discuss a possible solution before jumping into implementation.

Best regards.

@castorm added the awaiting feedback and enhancement (New feature or request) labels and removed the prioritization label on Jan 4, 2021
@AkshatJindal1
Contributor Author

AkshatJindal1 commented Jan 4, 2021

Hi @castorm, it would be best if we could discuss ways to implement it. For my use case, the response I am getting is as follows:

{
    "d": {
        "results": [
            { "response data ......." }
         ],
        "__next": "complete next page url"
     }
}

And for another set of URLs, the response is:

{
    "d": {
        "results": [
            { "response data ......." }
        ],
        "next": "query parameter for next page URL"
    }
}

For this case the base URL is the same; we only need to modify the query parameter with the value we get in "next".

My understanding is that I fetch the records in ascending order of the DateTime field, and keep making calls until I get the latest record or the next-page URL is null or missing. Please let me know your views. If you can provide some guidance, I can work on this use case.

@castorm
Owner

castorm commented Jan 4, 2021

Hi @AkshatJindal1,

I've been meaning for a while to refactor an area that might help with this, and I just took the opportunity to do it.

The way I see this fitting into the existing design is by extending the record's offset with some sort of per-page offset.

As you know, the offset is what the connector uses to track progress (allowing it, for instance, to continue where it left off after a failure or a restart), and up until now it pointed to the last record read from the source.

The conceptual shift required to support pagination is essentially moving the granularity for representing that progress one level up, to the level of a page (defined as a response containing a subset of records).

This could be achieved by reading per-page offset here:
https://github.com/castorm/kafka-connect-http/blob/master/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java#L73

Similar to the way we do it for the per-record offset here:
https://github.com/castorm/kafka-connect-http/blob/master/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java#L87

And the code as-is is already prepared to merge the two:
https://github.com/castorm/kafka-connect-http/blob/master/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java#L80
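
To make the idea concrete, here is a minimal sketch of what extracting a page-level offset and merging it with the per-record offset could look like, using Jackson's JSON pointers. Class, method and key names are illustrative assumptions, not the actual code in the classes linked above.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only (not the connector's actual parser classes).
    public class PageOffsetSketch {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Reads a page-level offset value (e.g. the next-page token at /d/next) from the response body.
        static Map<String, Object> responseOffset(String body, String pointer, String offsetKey) throws Exception {
            JsonNode node = MAPPER.readTree(body).at(pointer);
            Map<String, Object> offset = new HashMap<>();
            if (!node.isMissingNode() && !node.isNull()) {
                offset.put(offsetKey, node.asText());
            }
            return offset;
        }

        // Merges the page-level offset into a record's offset; the page value wins on key collisions.
        static Map<String, Object> merge(Map<String, Object> recordOffset, Map<String, Object> pageOffset) {
            Map<String, Object> merged = new HashMap<>(recordOffset);
            merged.putAll(pageOffset);
            return merged;
        }
    }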

In essence, we add a new property to take some offset from the page in addition to the offset we take from the records:

    "http.response.response.offset.pointer": "next=/d/next",

We define our request based on that offset:

    "http.request.url": "http://server/rest/api",
    "http.request.params": "next=${offset.next}",

We make sure the first request has a value for that parameter:

    "http.offset.initial": "next=whatever",

Challenges

  • This, unfortunately, forces "at-most-once" delivery semantics: in case of a fault while processing the current page, if at least one record gets published with an offset pointing to the next page, then after restarting the connector will go straight to the next page even though there were unpublished/missing records from the previous one.
  • This source connector was designed to work with infinite (incrementally growing) sources of data. If there is a chance we'll reach a final page where no next is provided, the best the connector can do is fail. Would that be good enough?
  • This is more of a warning: even though progress will be managed via the per-page offset instead of the per-record offset, the connector still uses the record's key/timestamp internally for de-duplication purposes.

Let me know your thoughts on this. Also, it would help to have some more context: is the dataset you are trying to process static or incrementally growing? How is that growth managed? Will new pages appear?

Best regards.

@AkshatJindal1
Contributor Author

AkshatJindal1 commented Jan 5, 2021

I believe this would solve our issue if the per-page offset were fixed for a given set of records. What I have noticed, though, is that it contains a start pointer and a skip count as query parameters (either directly or in encoded form).

E.g.: https://server/rest.api?$skiptoken=eyJzdGFydFJvdyI6MTAwMCwiZW5kUm93IjoyMDAwfQ==

Here the query parameter $skiptoken is base64-encoded and decodes to: {"startRow":1000,"endRow":2000}
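
For reference, the token is plain base64, so it can be inspected directly; a quick check in Java:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class SkiptokenDecode {
        public static void main(String[] args) {
            String token = "eyJzdGFydFJvdyI6MTAwMCwiZW5kUm93IjoyMDAwfQ==";
            // Prints {"startRow":1000,"endRow":2000}
            System.out.println(new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8));
        }
    }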

So if the connector is reading some records and only a few new records were inserted, maybe 10 or 15, it might read the same records twice.
In this case, duplication can be avoided by using the per-record timestamp, but the problem I see is that we might be making unnecessary calls just to discard those records.

My idea is as follows:

  1. We can have a new connector property for the key that holds the next-page URL.
  2. For one particular call, we push whatever records we get to Kafka, and if the next-page URL is present and not null, we make the call for the next page by changing the query parameter and moving the timestamp offset to the latest record of that page.
  3. Once we get to the end, we remove that particular query parameter and update the offset to the offset of the latest page.

Please let me know your thoughts. I have created some base code for this implementation; a rough sketch of the flow is below, and I can share more if anything is unclear.
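
A rough sketch of the flow (type and method names are illustrative placeholders, not the connector's or the PR's actual code):

    import java.util.List;

    // Sketch of the per-page polling flow: publish each page's records as soon as the
    // page is fetched, then follow the next-page URL until none is provided.
    public class PaginatedPollSketch {

        record PageRecord(String key, String value, String timestamp) {}
        record Page(List<PageRecord> records, String nextPageUrl) {}

        interface HttpClient { Page fetch(String url); }
        interface Sink { void publish(PageRecord rec); }

        // Returns the latest timestamp seen, which would become the offset for the next poll cycle.
        static String pollAllPages(HttpClient client, Sink sink, String firstUrl) {
            String url = firstUrl;
            String latestTimestamp = null;
            while (url != null && !url.isEmpty()) {
                Page page = client.fetch(url);
                for (PageRecord rec : page.records()) {
                    sink.publish(rec);
                    latestTimestamp = rec.timestamp();
                }
                url = page.nextPageUrl();   // null or empty when there is no next page
            }
            return latestTimestamp;
        }
    }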

This is what I am suggesting: #90.

AkshatJindal1 added a commit to AkshatJindal1/kafka-connect-http that referenced this issue Jan 5, 2021
AkshatJindal1 linked a pull request Jan 5, 2021 that will close this issue
AkshatJindal1 added a commit to AkshatJindal1/kafka-connect-http that referenced this issue Jan 5, 2021
@castorm
Owner

castorm commented Jan 5, 2021

Hi @AkshatJindal1,

I'm afraid this type of pagination is not really aligned with the spirit of this connector. It puts a big burden on the data system supporting it, as it would cause performance issues the deeper you paginate. Because of this, I wouldn't like to encourage its use, especially in a connector that is specifically designed to run indefinitely, making deep-pagination issues a matter of when, not if. (There are plenty of resources out there explaining this in detail, e.g.: https://kwojcicki.github.io/blog/ES-PAGINATION).

Having said that, I understand more people might face this sort of pagination, so I'm not opposed to supporting it in this project, as long as its caveats are well documented. But we would need to extend the code in a way that doesn't alter its fundamental core, meaning by adding things rather than significantly modifying what's already there (open/closed principle). I say this because I foresee fundamental differences with the current implementation that we'll need to reconcile.

I haven't seen your proposal yet, so please read my comments with this in mind.

I don't really want to waste your time or assume anything, so if you already have an implementation that works for you in your fork and you are happy with it, maybe that's enough for you. This, obviously, is only relevant if you want to contribute that work.

Regarding your message:

So if the connector is reading some records and only a few new records were inserted, maybe 10 or 15, it might read the same records twice.

My thinking was actually the opposite: if those 10-15 records already carried the next page in their offset, the remaining ones would be skipped after recovery. That's why I talked about at-most-once semantics.
I assume you say this because your intention is not to consider the next page until all records have been processed, so only the last record would contain a pointer to the next page, and all previous records would contain a pointer to their own page.

I guess the only problem with this would be when acks for published records arrive out of order (which can happen with partitioned topics). That would result in duplicates, which can be handled, but it might also result in lost records. E.g., assume at least 2 pages, where page 1 contains two records: A(next=1), B(next=2). If B is acked before A and the system then faults, the next request would be for page 2, with no acknowledgement of A (potentially lost). It's an edge case, maybe not significant, but worth considering, as it would need to be documented.

In this case, duplication can be avoided by using the per-record timestamp, but the problem I see is that we might be making unnecessary calls just to discard those records.

I would argue unnecessary calls in edge cases are a cheap price to pay if they help simplify things.

My idea is as follows:
1- We can have a new connector property for the key that holds the next-page URL.

That makes sense; it's also in line with what I mentioned in the previous message.

2- For one particular call, we push whatever records we get to Kafka, and if the next-page URL is present and not null, we make the call for the next page by changing the query parameter and moving the timestamp offset to the latest record of that page.

The fundamental difference here is that, in order to change from at-most-once to at-least-once semantics, you suggest marking only the last record with a pointer to the next page, instead of all records in the page.

3- Once we get to the end, we remove that particular query parameter and update the offset to the offset of the latest page.

I guess here is where most of the challenges would come. Once you reach the end, the last page won't point any further.

  • If the page was incomplete (record count < page size) -> there is no next -> the next request should be for the same page.
  • If the page was complete (record count = page size) -> there might not be a next until there is at least one record on the next page (at which point this page is no longer the latest) -> the next request should be for the same page until a next is provided. The problem is, you might have already published the last record pointing to the same page, and the de-duplication mechanism will prevent you from publishing it again. I found this to be the trickiest scenario.

I hope this makes sense.

Thanks,
Best regards.

castorm linked a pull request Jan 5, 2021 that will close this issue
@AkshatJindal1
Contributor Author

I guess here is where most of the challenges would come. Once you reach the end, the last page won't point any further.

If the page was incomplete (record count < page size) -> there is no next -> the next request should be for the same page.
If the page was complete (record count = page size) -> there might not be a next until there is at least one record on the next page (at which point this page is no longer the latest) -> the next request should be for the same page until a next is provided. The problem is, you might have already published the last record pointing to the same page, and the de-duplication mechanism will prevent you from publishing it again. I found this to be the trickiest scenario.

For this particular case, I wanted to handle it in the following way:

  1. We take the timestamp pointer as well as the next page pointer from the user.
  2. Once we reach the end, we update our query parameter with the timestamp for the latest record and remove the query parameter for the next page URL.
  3. From then on, it makes API calls that fetch only the records after the latest timestamp.

My assumption here is that the records we get will be in ascending order of timestamp, or that we will have a mechanism to sort them in ascending order as part of the API call.
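
A minimal sketch of the offset transition described above (illustrative names only, not the connector's actual classes):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: builds the offset to commit after processing a page.
    // While a next-page token exists we keep it; once pagination ends we drop it
    // and fall back to the timestamp of the latest record seen.
    public class PaginationOffsetSketch {

        static Map<String, Object> nextOffset(String latestTimestamp, String nextPageToken) {
            Map<String, Object> offset = new HashMap<>();
            offset.put("timestamp", latestTimestamp);
            if (nextPageToken != null && !nextPageToken.isEmpty()) {
                // Still paginating: the next request should use this token.
                offset.put("next", nextPageToken);
            }
            // If "next" is absent, the next request falls back to timestamp-based filtering.
            return offset;
        }
    }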
