diff --git a/pkg/csp/chan.go b/pkg/csp/chan.go index 821dbef7..2409a2e6 100644 --- a/pkg/csp/chan.go +++ b/pkg/csp/chan.go @@ -208,7 +208,7 @@ func (s Sender) Send(ctx context.Context, v Value) (casm.Future, capnp.ReleaseFu // for a given sender. func (s Sender) NewStream(ctx context.Context) SendStream { sender := channel.Sender(s) - sender.SetFlowLimiter(flowcontrol.NewFixedLimiter(1024)) // TODO: use BBR once scheduler bug is fixed + sender.SetFlowLimiter(flowcontrol.NewFixedLimiter(1e6)) // TODO: use BBR once scheduler bug is fixed return SendStream{ ctx: ctx, diff --git a/pkg/pubsub/topic.go b/pkg/pubsub/topic.go index 20acc4ba..0ac1ccb0 100644 --- a/pkg/pubsub/topic.go +++ b/pkg/pubsub/topic.go @@ -57,7 +57,7 @@ func (t Topic) Publish(ctx context.Context, b []byte) error { // FlowLimiter. func (t Topic) NewStream(ctx context.Context) Stream { // TODO: use BBR once scheduler bug is fixed - api.Topic(t).SetFlowLimiter(flowcontrol.NewFixedLimiter(1024)) + api.Topic(t).SetFlowLimiter(flowcontrol.NewFixedLimiter(1e6)) cherr := make(chan error, 1) done := make(chan struct{}) @@ -235,7 +235,7 @@ func (t topicServer) Subscribe(ctx context.Context, call MethodSubscribe) error defer sub.Cancel() sender := call.Args().Chan() - sender.SetFlowLimiter(flowcontrol.NewFixedLimiter(1024)) // TODO: use BBR once scheduler bug is fixed + sender.SetFlowLimiter(flowcontrol.NewFixedLimiter(1e6)) // TODO: use BBR once scheduler bug is fixed t.log.Debug("registered subscription handler") defer t.log.Debug("unregistered subscription handler")