
release 1.10/ack improvement: high CPU consumption #426

Conversation

astelmashenko
Member

@astelmashenko astelmashenko commented Sep 5, 2023

Fixes #

High CPU consumption, introduced by 41b2d76 to solve JetStream redelivery. Redelivery should instead be controlled by the AckWait consumer configuration.

cc @dan-j

Proposed Changes

Removed the code that created a timer for each incoming message to mark it InProgress and prevent redelivery from the JetStream stream.
Big change: implemented retries based on the JetStream MaxDeliver feature instead of the in-memory Go retry module (see the sketch below).

  • 🐛 Fix bug
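
As a rough sketch (not the PR's actual code) of what pushing retries down to JetStream can look like with nats.go; the stream/consumer names and values are only illustrative:

```go
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Redelivery is handled by JetStream itself: AckWait bounds each attempt,
	// MaxDeliver bounds the number of attempts, so no in-memory timers are needed.
	_, err = js.AddConsumer("example-stream", &nats.ConsumerConfig{
		Durable:    "example-consumer",
		AckPolicy:  nats.AckExplicitPolicy,
		AckWait:    35 * time.Second,
		MaxDeliver: 4,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```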

Release Note

Fixes high CPU consumption of the dispatcher

@knative-prow knative-prow bot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Sep 5, 2023
@knative-prow knative-prow bot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label Sep 5, 2023
@codecov

codecov bot commented Sep 5, 2023

Codecov Report

Attention: 70 lines in your changes are missing coverage. Please review.

Comparison is base (38d85ad) 45.57% compared to head (e8ae33d) 51.06%.
Report is 1 commit behind head on release-1.10.

Files Patch % Lines
...channel/jetstream/dispatcher/message_dispatcher.go 77.37% 43 Missing and 19 partials ⚠️
pkg/channel/jetstream/dispatcher/consumer.go 0.00% 8 Missing ⚠️
Additional details and impacted files
@@               Coverage Diff                @@
##           release-1.10     #426      +/-   ##
================================================
+ Coverage         45.57%   51.06%   +5.49%     
================================================
  Files                29       30       +1     
  Lines              1953     2197     +244     
================================================
+ Hits                890     1122     +232     
+ Misses             1008     1001       -7     
- Partials             55       74      +19     


@dan-j
Contributor

dan-j commented Sep 5, 2023

So I added this because we ran into an issue where the time to spin up a Knative Service from 0 replicas was longer than the default AckWait. The dispatcher doesn't know when the *nats.Msg has expired, and the request is held by the Knative activator until the service is ready. By the time the request is actually handled, the AckWait has passed and the Ack fails. However, JetStream now thinks the message hasn't been delivered, so it attempts a redelivery.

In an ideal scenario, the dispatcher would know about the AckWait and dispatch the actual HTTP request using a context with timeout AckWait - jitter (some jitter to make sure there's time for the dispatcher to do its processing before/after the request). It doesn't look like you can access the AckWait/deadline on the *nats.Msg directly, but you could get the ConsumerInfo from the *nats.Subscription when creating the consumer during Dispatcher#subscribe(). The ConsumerInfo has a Config.AckWait field, but I'd double-check this is properly set if defaults are used when it's created.
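
A minimal sketch of that idea, assuming nats.go's JetStream API; dispatch and the jitter value are placeholders, not code from this repo:

```go
package dispatcher

import (
	"context"
	"time"

	"github.com/nats-io/nats.go"
)

// dispatch is a placeholder for the actual HTTP forwarding done by the dispatcher.
func dispatch(ctx context.Context, msg *nats.Msg) error { return nil }

// dispatchWithAckWaitDeadline bounds the request by the consumer's AckWait,
// leaving some jitter for the dispatcher's own processing.
func dispatchWithAckWaitDeadline(ctx context.Context, sub *nats.Subscription, msg *nats.Msg) error {
	info, err := sub.ConsumerInfo() // exposes Config.AckWait for JetStream subscriptions
	if err != nil {
		return err
	}
	jitter := 2 * time.Second // illustrative margin
	reqCtx, cancel := context.WithTimeout(ctx, info.Config.AckWait-jitter)
	defer cancel()
	return dispatch(reqCtx, msg)
}
```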

I wouldn't want to merge this PR as-is until we have a solution for this 0-replica cold-start issue, but I'm happy to hear any alternative solutions.

@dan-j
Contributor

dan-j commented Sep 5, 2023

Ah, another point...

I'm not 100% sure, but I feel like nats.QueueSubscribe() receives messages in batches and then calls the MsgHandler serially. I observed this when working on 41b2d76, which is why forwarding the actual event is done in another goroutine.

If this PR changes to my suggestion of setting a timeout on the request context, the jitter also needs to take into account that, from an AckWait perspective, the message is considered "delivered" the moment it's received by the client. I'm not sure there's a way to know what this duration is; I imagine it will typically be sub-millisecond but could spike higher under high load.

@astelmashenko
Member Author

astelmashenko commented Sep 6, 2023

So I added this because we ran into an issue where the time to spin up a Knative Service from 0 replicas was longer than the default AckWait. The dispatcher doesn't know when the *nats.Msg has expired, and the request is held by the Knative activator until the service is ready. By the time the request is actually handled, the AckWait has passed and the Ack fails. However, JetStream now thinks the message hasn't been delivered, so it attempts a redelivery.

Regarding this part, you can set a bigger AckWait and set a request timeout. E.g. if it takes 20s to spin up a service and 30s to process a request, then you set delivery.timeout to 35s and the channel AckWait to 35 * (number of retries) + delta, e.g. 120s.
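
Spelled out with the example numbers above (a sketch only; the 3 retries and the 15s delta are assumptions chosen so the arithmetic lands on 120s):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	deliveryTimeout := 35 * time.Second // per-attempt budget (delivery.timeout)
	retries := 3                        // assumed retry count for this example
	delta := 15 * time.Second           // slack for dispatcher overhead
	ackWait := time.Duration(retries)*deliveryTimeout + delta
	fmt.Println(ackWait) // 2m0s
}
```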

@astelmashenko
Member Author

astelmashenko commented Sep 6, 2023

@dan-j, overall, as I understand it, you need to calculate AckWait based on the delivery.retry and delivery.timeout you set on your channel's subscription. E.g. just set it big enough, say 300s, and control slow consumers with MaxAckPending.

With that timer you basically set AckWait to unlimited and add extra CPU load. In my case I have a broker and around 30 triggers, which leads to 30 consumers; with MaxAckPending=1000 that becomes 30,000 timers in the worst case, which creates a very big CPU load.

@astelmashenko
Member Author

I'm not 100% sure, but I feel like nats.QueueSubscribe() receives messages in batches and then calls the MsgHandler serially.

I'm not sure about that; it is a push-based consumer, and if I scale the dispatcher it round-robins messages one by one in turn.

@dan-j
Contributor

dan-j commented Sep 6, 2023

Regarding this part, you can set a bigger AckWait and set a request timeout. E.g. if it takes 20s to spin up a service and 30s to process a request, then you set delivery.timeout to 35s and the channel AckWait to 35 * (number of retries) + delta, e.g. 120s.

Yeah, this is what I'm getting at. We could do that and merge this PR, but it would still introduce bugs in our environment, because the request dispatcher doesn't set any timeout/deadline on the request's context.Context, which is what I mean about not wanting to merge this as-is.

I'm happy to remove the ticker; your issue is a real one which I agree needs fixing, we just need a solution which works for both of us.

One concern I've always had with the operator is how we handle retries. At the moment it's really confusing because we retry in multiple places: 1) at the JS layer via a consumer's maxDeliver and 2) at the in-memory layer via the DispatchMessageWithRetries() function, which is configured by the Subscription.

It would be nice to solve both issues here.

The simplest option is to remove redelivery from the JS layer and reuse the DispatchMessageWithRetries() functionality. However, this would result in message loss in the event of a pod failure.

The more robust solution is to remove NatsJetStreamChannel.spec.consumerConfigTemplate.ackWait and NatsJetStreamChannel.spec.consumerConfigTemplate.maxDeliver from the CRDs, and calculate the proper JS consumer options based on the Subscription.spec.delivery configuration. Then let JS do all the retrying and use DispatchMessage instead of DispatchMessageWithRetries, setting the context.Context properly to abort requests which time out. This might be a bit of a headache for exponential backoffs, because ackWait would need to be the maximum possible and the context's timeout would need to be calculated on each retry (which is possible because you can get the redelivery counter from meta, _ := msg.Metadata(); meta.NumDelivered).
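
A rough sketch of that last point, assuming nats.go; the exponential formula and base value are only illustrative, not this repository's code:

```go
package dispatcher

import (
	"time"

	"github.com/nats-io/nats.go"
)

// perAttemptTimeout derives the request deadline for the current delivery
// attempt from the JetStream redelivery counter. The consumer's ackWait has
// to be at least the sum of all attempts for this to stay safe.
func perAttemptTimeout(msg *nats.Msg, base time.Duration) (time.Duration, error) {
	meta, err := msg.Metadata() // only available on JetStream messages
	if err != nil {
		return 0, err
	}
	factor := time.Duration(1) << (meta.NumDelivered - 1) // 1, 2, 4, ... per redelivery
	return base * factor, nil
}
```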

* implemented update subscription

* do not call addstream if it is existing, to prevent error propagation

* added comments

* added reconciler test

* added reconciler tests

* removed unused types

* added check for err
@knative-prow-robot knative-prow-robot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Oct 9, 2023
@knative-prow knative-prow bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Oct 9, 2023
@knative-prow-robot knative-prow-robot removed the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Oct 9, 2023
@astelmashenko
Member Author

Hey @dan-j, I've done the initial implementation (have not tested it though); your review is welcome.

@astelmashenko astelmashenko marked this pull request as ready for review November 2, 2023 14:06
@knative-prow knative-prow bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Nov 2, 2023
@astelmashenko
Member Author

@dan-j, here are the results of the load test. The scenario is pretty simple:
http workers -> broker (channel) -> trigger (sub) -> functionQ
functionQ processes a request with a random sleep.
The trigger has a request timeout, so functionQ fails to process in time 25% of the time.
Please pay attention to the blue line; it is the HTTP latency of the dispatcher.
Retries in-memory:
[image]
Retries based on JetStream:
[image]

@dan-j
Contributor

dan-j commented Dec 17, 2023

Awesome stuff! Thanks for the graphs too 👍

It's a shame we've had to reimplement the whole message_dispatcher logic, but from our earlier conversations this was going to be the only way.

I'm happy to merge this, but I need to fix my permissions on the GitHub org, so I can't do it just yet.

@astelmashenko
Member Author

@dan-j, it is green now and ready to merge.

@dan-j
Contributor

dan-j commented Dec 18, 2023

@pierDipi, could we have this merged? I will try to sort out my permissions on knative/org (or wherever it is) this week.

@astelmashenko
Member Author

/assign @pierDipi

@creydr
Contributor

creydr commented Dec 21, 2023

Hi @astelmashenko,
thanks for your PR!
Is there a reason why you're targeting the 1.10 branch directly instead of main and then backporting? That way it will only be fixed in 1.10 and not in any future releases :/

@astelmashenko
Member Author

astelmashenko commented Dec 21, 2023

@creydr, it is because we are using 1.10 in production. I'll backport it to later versions, e.g. with a cherry-pick, or manually merge it into main.

@dan-j
Contributor

dan-j commented Jan 2, 2024

@astelmashenko, can we update the PR to go into main and then cherry-pick afterwards?

@astelmashenko
Member Author

@dan-j, I would not do that; main was updated and, AFAIK, is not directly compatible with this PR.

@dan-j
Contributor

dan-j commented Jan 2, 2024

Ah, fair enough. Let's try and get this all wrapped up this week. I've created knative/community#1479 to add us both as approvers.

@zhaojizhuang, at the moment you're the only approver of this repo, or there's probably an eventing admin who can look.

Once this is in, I'll get the changes onto latest too

@dan-j
Contributor

dan-j commented Jan 4, 2024

/lgtm
/approve

@knative-prow knative-prow bot added the lgtm Indicates that a PR is ready to be merged. label Jan 4, 2024
@dan-j
Contributor

dan-j commented Jan 4, 2024

@astelmashenko and I are now approvers, but since this is being merged into knative-extensions:release-1.10 the OWNERS_ALIASES file isn't updated.

Can someone from @knative-extensions/eventing-writers please approve this PR?

@pierDipi
Member

pierDipi commented Jan 4, 2024

@dan-j we can backport the approvers update, feel free to open a PR

@pierDipi
Member

pierDipi commented Jan 4, 2024

/approve


knative-prow bot commented Jan 4, 2024

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: astelmashenko, dan-j, pierDipi

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jan 4, 2024
@knative-prow knative-prow bot merged commit 711fe4f into knative-extensions:release-1.10 Jan 4, 2024
21 checks passed
@astelmashenko
Member Author

/cherrypick main

@knative-prow-robot
Contributor

@astelmashenko: #426 failed to apply on top of branch "main":

Applying: removed timer which marked messages as InProgress
Applying: goimports
Applying: implemented update subscription (#427)
Using index info to reconstruct a base tree...
M	pkg/channel/jetstream/dispatcher/dispatcher.go
M	pkg/channel/jetstream/dispatcher/dispatcher_test.go
M	pkg/channel/jetstream/dispatcher/natsjetstreamchannel_test.go
M	pkg/channel/jetstream/dispatcher/reconciler.go
Falling back to patching base and 3-way merge...
Auto-merging pkg/channel/jetstream/dispatcher/natsjetstreamchannel_test.go
Auto-merging pkg/channel/jetstream/dispatcher/dispatcher.go
No changes -- Patch already applied.
Applying: implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription
Using index info to reconstruct a base tree...
M	pkg/channel/jetstream/dispatcher/consumer.go
M	pkg/channel/jetstream/dispatcher/dispatcher.go
Falling back to patching base and 3-way merge...
Auto-merging pkg/channel/jetstream/dispatcher/dispatcher.go
Auto-merging pkg/channel/jetstream/dispatcher/consumer.go
CONFLICT (content): Merge conflict in pkg/channel/jetstream/dispatcher/consumer.go
error: Failed to merge in the changes.
hint: Use 'git am --show-current-patch=diff' to see the failed patch
Patch failed at 0004 implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription
When you have resolved this problem, run "git am --continue".
If you prefer to skip this patch, run "git am --skip" instead.
To restore the original branch and stop patching, run "git am --abort".

In response to this:

/cherrypick main

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. lgtm Indicates that a PR is ready to be merged. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.