Skip to content

Commit

Permalink
release 1.10/ack improvement highcpu consumption (#426)
Browse files Browse the repository at this point in the history
* removed timer which marked messages as InProgress

* goimports

* implemented update subscription (#427)

* implemented update subscription

* do not call addstream if it is existing, to prevent error propagation

* added comments

* added reconciler test

* added reconciler tests

* removed unused types

* added check for err

* implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription

* removed timer which marked messages as InProgress

* goimports

* implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription

* changed nil to noRetries

* removed timer which marked messages as InProgress

* goimports

* implemented update subscription (#427)

* implemented update subscription

* do not call addstream if it is existing, to prevent error propagation

* added comments

* added reconciler test

* added reconciler tests

* removed unused types

* added check for err

* implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription

* changed nil to noRetries

* updated nats client lib, fixed backoff logic

* update codeget and removed commented code

* implemented exponential backoff right way on JetStream side

* removed timer which marked messages as InProgress

* goimports

* implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription

* updated nats client lib, fixed backoff logic

* update codeget and removed commented code

* implemented exponential backoff right way on JetStream side

* removed timer which marked messages as InProgress

* goimports

* implemented retries based on JetStream; consumer ackWait and maxDeliver are not come from Subscription

* updated nats client lib, fixed backoff logic

* update codeget and removed commented code

* implemented exponential backoff right way on JetStream side

* rebased

* go mod tidy/vendor

* updatecodegen

* added calcuation of request deadline

* removed logging err back

* updated tests to cover missing lines

* using retryconfig.timeout as fallback, when there is no consumerTemplate.ackWait

* fix possible nil pointer error

* re-implemented message_dispatcher for jetstream side retries

* go-imports message_dispatcher.go

* tests: message_dispatcher.go

* goimports: message_dispatcher.go
  • Loading branch information
astelmashenko authored Jan 4, 2024
1 parent fcd6b58 commit 711fe4f
Show file tree
Hide file tree
Showing 117 changed files with 36,173 additions and 10,939 deletions.
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/kelseyhightower/envconfig v1.4.0
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.22.1
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nats.go v1.30.2
github.com/nats-io/nkeys v0.4.5
github.com/nats-io/stan.go v0.9.0
github.com/pkg/errors v0.9.1
go.opencensus.io v0.23.0
Expand All @@ -29,7 +29,10 @@ require (
knative.dev/reconciler-test v0.0.0-20231023114053-616ce2cecb19
)

require github.com/stretchr/testify v1.8.0
require (
github.com/rickb777/date v1.13.0
github.com/stretchr/testify v1.8.0
)

require (
cloud.google.com/go v0.98.0 // indirect
Expand Down Expand Up @@ -75,7 +78,7 @@ require (
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
Expand All @@ -91,7 +94,6 @@ require (
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rickb777/date v1.13.0 // indirect
github.com/rickb777/plural v1.2.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
Expand Down
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down Expand Up @@ -431,14 +431,15 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE=
github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac=
Expand Down
76 changes: 11 additions & 65 deletions pkg/channel/jetstream/dispatcher/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package dispatcher
import (
"context"
"errors"
"net/http"
"sync"
"time"

cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/nats-io/nats.go"
"go.opencensus.io/trace"
"go.uber.org/zap"
Expand All @@ -46,11 +43,12 @@ var (

type Consumer struct {
sub Subscription
dispatcher eventingchannels.MessageDispatcher
dispatcher NatsMessageDispatcher
reporter eventingchannels.StatsReporter
channelNamespace string

jsSub *nats.Subscription
jsSub *nats.Subscription
natsConsumerInfo *nats.ConsumerInfo

logger *zap.SugaredLogger
ctx context.Context
Expand All @@ -73,66 +71,19 @@ func (c *Consumer) Close() error {
}

func (c *Consumer) MsgHandler(msg *nats.Msg) {
logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr)))
ctx := logging.WithLogger(c.ctx, logger)
tickerCtx, tickerCancel := context.WithCancel(c.ctx)

tickerDone := make(chan struct{})

go func() {
defer close(tickerDone)

// TODO(dan-j): this should be a fraction of the Consumer's AckWait
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-tickerCtx.Done():
return
case <-ticker.C:
if err := msg.InProgress(nats.Context(tickerCtx)); err != nil && !errors.Is(err, context.Canceled) {
logging.FromContext(ctx).Errorw("failed to mark message as in progress", zap.Error(err))
}
}
}
}()
logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr)))
ctx := logging.WithLogger(c.ctx, logger)

go func() {
var result protocol.Result

// wrap the handler in a local function so that the tickerCtx is cancelled even if a panic occurs.
func() {
defer tickerCancel()
result = c.doHandle(ctx, msg)
}()

// wait for the ticker to stop to prevent attempts to mark the message as in progress after it has been acked
// or nacked
<-tickerDone

switch {
case protocol.IsACK(result):
if err := msg.Ack(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Ack message after successful delivery to subscriber", zap.Error(err))
}
case protocol.IsNACK(result):
if err := msg.Nak(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Nack message after failed delivery to subscriber", zap.Error(err))
}
default:
if err := msg.Term(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Term message after failed delivery to subscriber", zap.Error(err))
}
}
c.doHandle(ctx, msg)
}()
}

// doHandle forwards the received event to the subscriber, the return has three outcomes:
// - Ack (includes `nil`): the event was successfully delivered to the subscriber
// - Nack: the event was not delivered to the subscriber, but it can be retried
// - any other error: the event should be terminated and not retried
func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result {
func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) error {
logger := logging.FromContext(ctx)

if logger.Desugar().Core().Enabled(zap.DebugLevel) {
Expand Down Expand Up @@ -165,14 +116,16 @@ func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result

te := kncloudevents.TypeExtractorTransformer("")

dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries(
dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithNatsRetries(
ctx,
message,
additionalHeaders,
c.sub.Subscriber,
c.sub.Reply,
c.sub.DeadLetter,
c.sub.RetryConfig,
c.natsConsumerInfo.Config.AckWait,
msg,
&te,
)

Expand All @@ -187,18 +140,11 @@ func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result
zap.Error(err),
zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode))

code := dispatchExecutionInfo.ResponseCode
if code/100 == 5 || code == http.StatusTooManyRequests || code == http.StatusRequestTimeout {
// tell JSM to redeliver the message later
return protocol.NewReceipt(false, "%w", err)
}

// let knative decide what to do with the message, if it wraps an Ack/Nack then that is what will happen,
// otherwise we will Terminate the message
return err
}

logger.Debug("message forwarded to downstream subscriber")

return protocol.ResultACK
return nil
}
21 changes: 18 additions & 3 deletions pkg/channel/jetstream/dispatcher/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package dispatcher

import (
"time"

"github.com/nats-io/nats.go"
"knative.dev/eventing-natss/pkg/apis/messaging/v1alpha1"
"knative.dev/eventing-natss/pkg/channel/jetstream/utils"
"knative.dev/eventing/pkg/kncloudevents"
)

func buildStreamConfig(streamName, subject string, config *v1alpha1.StreamConfig) *nats.StreamConfig {
Expand Down Expand Up @@ -55,19 +58,31 @@ func buildStreamConfig(streamName, subject string, config *v1alpha1.StreamConfig

}

func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1.ConsumerConfigTemplate) *nats.ConsumerConfig {
func buildConsumerConfig(consumerName, deliverSubject string, template *v1alpha1.ConsumerConfigTemplate, retryConfig *kncloudevents.RetryConfig) *nats.ConsumerConfig {
const jitter = time.Millisecond * 500
consumerConfig := nats.ConsumerConfig{
Durable: consumerName,
DeliverGroup: consumerName,
DeliverSubject: deliverSubject,
AckPolicy: nats.AckExplicitPolicy,
}

if template != nil {
consumerConfig.AckWait = template.AckWait.Duration
}

if retryConfig != nil {
if retryConfig.RequestTimeout > 0 {
consumerConfig.AckWait = retryConfig.RequestTimeout + jitter
}

consumerConfig.MaxDeliver = retryConfig.RetryMax + 1
}

if template != nil {
consumerConfig.DeliverPolicy = utils.ConvertDeliverPolicy(template.DeliverPolicy, nats.DeliverAllPolicy)
consumerConfig.OptStartSeq = template.OptStartSeq
consumerConfig.AckWait = template.AckWait.Duration
consumerConfig.MaxDeliver = template.MaxDeliver
// ignoring template.AckWait and template.MaxDeliver
consumerConfig.FilterSubject = template.FilterSubject
consumerConfig.ReplayPolicy = utils.ConvertReplayPolicy(template.ReplayPolicy, nats.ReplayInstantPolicy)
consumerConfig.RateLimit = template.RateLimitBPS
Expand Down
9 changes: 5 additions & 4 deletions pkg/channel/jetstream/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
// - Consumer per .spec.subscribers[] of a channel, forwarding events to the specified subscriber address.
type Dispatcher struct {
receiver *eventingchannels.MessageReceiver
dispatcher *eventingchannels.MessageDispatcherImpl
dispatcher *NatsMessageDispatcherImpl
reporter eventingchannels.StatsReporter

js nats.JetStreamContext
Expand All @@ -71,7 +71,7 @@ func NewDispatcher(ctx context.Context, args NatsDispatcherArgs) (*Dispatcher, e
reporter := eventingchannels.NewStatsReporter(args.ContainerName, kmeta.ChildName(args.PodName, uuid.New().String()))

d := &Dispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logger.Desugar()),
dispatcher: NewNatsMessageDispatcher(logger.Desugar()),
reporter: reporter,

js: args.JetStream,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (d *Dispatcher) updateSubscription(ctx context.Context, config ChannelConfi

if isLeader {
deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID))
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate)
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate, sub.RetryConfig)

_, err := d.js.UpdateConsumer(config.StreamName, consumerConfig)
if err != nil {
Expand Down Expand Up @@ -251,6 +251,7 @@ func (d *Dispatcher) subscribe(ctx context.Context, config ChannelConfig, sub Su
channelNamespace: config.Namespace,
logger: logger,
ctx: ctx,
natsConsumerInfo: info,
}

consumer.jsSub, err = d.js.QueueSubscribe(info.Config.DeliverSubject, info.Config.DeliverGroup, consumer.MsgHandler,
Expand Down Expand Up @@ -300,7 +301,7 @@ func (d *Dispatcher) getOrEnsureConsumer(ctx context.Context, config ChannelConf

if isLeader {
deliverSubject := d.consumerSubjectFunc(config.Namespace, config.Name, string(sub.UID))
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate)
consumerConfig := buildConsumerConfig(consumerName, deliverSubject, config.ConsumerConfigTemplate, sub.RetryConfig)

// AddConsumer is idempotent so this will either create the consumer, update to match expected config, or no-op
info, err := d.js.AddConsumer(config.StreamName, consumerConfig)
Expand Down
13 changes: 12 additions & 1 deletion pkg/channel/jetstream/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package dispatcher
import (
"context"
"testing"
"time"

"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -67,8 +71,15 @@ func TestDispatcher_ReconcileConsumers(t *testing.T) {
ctx = controller.WithEventRecorder(ctx, eventRecorder)

nc := reconciletesting.NewNatsJetStreamChannel(testNS, ncName, reconciletesting.WithNatsJetStreamChannelSubscribers(subscribers))
sub := fanout.Subscription{
RetryConfig: &kncloudevents.RetryConfig{
RequestTimeout: time.Second,
RetryMax: 1,
},
}
config := createChannelConfig(nc, Subscription{
UID: subscriber1UID,
UID: subscriber1UID,
Subscription: sub,
})

d, err := NewDispatcher(ctx, NatsDispatcherArgs{
Expand Down
Loading

0 comments on commit 711fe4f

Please sign in to comment.