Skip to content

Commit 9efde41

Browse files
authored
fix: run maxNumberOfPollers poller routines instead of initialNumberOfPollers (#2105)
* Update internal_worker_base.go
* scalableTaskPoller concurrency scaling tests
1 parent a451bef commit 9efde41

File tree

2 files changed

+210
-1
lines changed

2 files changed

+210
-1
lines changed

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,7 @@ func newScalableTaskPoller(
885885
}
886886
switch p := pollerBehavior.(type) {
887887
case *pollerBehaviorAutoscaling:
888-
tw.pollerCount = p.initialNumberOfPollers
888+
tw.pollerCount = p.maximumNumberOfPollers
889889
tw.pollerSemaphore = newPollerSemaphore(p.initialNumberOfPollers)
890890
tw.pollerAutoscalerReportHandle = newPollScalerReportHandle(pollScalerReportHandleOptions{
891891
initialPollerCount: p.initialNumberOfPollers,

internal/internal_worker_base_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package internal
22

33
import (
4+
"context"
5+
"sync/atomic"
46
"testing"
7+
"time"
58

69
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
711
"github.com/stretchr/testify/suite"
812
enumspb "go.temporal.io/api/enums/v1"
913
"go.temporal.io/api/serviceerror"
14+
"go.temporal.io/sdk/internal/common/metrics"
15+
ilog "go.temporal.io/sdk/internal/log"
1016
)
1117

1218
type (
@@ -19,6 +25,16 @@ func TestPollScalerReportHandleSuite(t *testing.T) {
1925
suite.Run(t, new(PollScalerReportHandleSuite))
2026
}
2127

28+
type (
29+
ScalableTaskPollerSuite struct {
30+
suite.Suite
31+
}
32+
)
33+
34+
func TestScalableTaskPollerSuite(t *testing.T) {
35+
suite.Run(t, new(ScalableTaskPollerSuite))
36+
}
37+
2238
type testTask struct {
2339
psd pollerScaleDecision
2440
}
@@ -115,3 +131,196 @@ func (s *PollScalerReportHandleSuite) TestScaleUpOnDelay() {
115131
assert.Equal(s.T(), 10, targetSuggestion)
116132

117133
}
134+
135+
func (s *ScalableTaskPollerSuite) TestAutoscalingConcurrencyScalesUpToMaximum() {
136+
behavior := &pollerBehaviorAutoscaling{
137+
initialNumberOfPollers: 2,
138+
maximumNumberOfPollers: 3,
139+
minimumNumberOfPollers: 1,
140+
}
141+
142+
blockingPoller := newSemaphoreProbeTaskPoller()
143+
poller := newScalableTaskPoller(blockingPoller, ilog.NewNopLogger(), behavior)
144+
poller.taskPollerType = "test"
145+
146+
bw := newBaseWorker(baseWorkerOptions{
147+
slotSupplier: &testSlotSupplier{},
148+
maxTaskPerSecond: 1000,
149+
taskPollers: []scalableTaskPoller{poller},
150+
taskProcessor: noopTaskProcessor{},
151+
workerType: "AutoscalingTest",
152+
logger: ilog.NewNopLogger(),
153+
stopTimeout: time.Second,
154+
metricsHandler: metrics.NopHandler,
155+
})
156+
157+
bw.Start()
158+
defer func() {
159+
allowBlockedPollers(blockingPoller, poller.pollerSemaphore)
160+
blockingPoller.Close()
161+
bw.Stop()
162+
}()
163+
164+
eventuallySemaphoreState(s.T(), blockingPoller, poller.pollerSemaphore, 2, 2, "expected initial poller to start")
165+
166+
require.Never(s.T(), func() bool {
167+
allowBlockedPollers(blockingPoller, poller.pollerSemaphore)
168+
permits, _ := readSemaphoreState(poller.pollerSemaphore)
169+
return permits > 2
170+
}, 200*time.Millisecond, 10*time.Millisecond, "should not exceed initial concurrency")
171+
172+
poller.pollerAutoscalerReportHandle.updateTarget(func(int64) int64 { return 3 })
173+
174+
eventuallySemaphoreState(s.T(), blockingPoller, poller.pollerSemaphore, 3, 3, "expected concurrency to scale up to maximum")
175+
176+
require.Never(s.T(), func() bool {
177+
allowBlockedPollers(blockingPoller, poller.pollerSemaphore)
178+
permits, _ := readSemaphoreState(poller.pollerSemaphore)
179+
return permits > 3
180+
}, 200*time.Millisecond, 10*time.Millisecond, "should not exceed maximum concurrency")
181+
}
182+
183+
func (s *ScalableTaskPollerSuite) TestAutoscalingScalesDownToMinimum() {
184+
behavior := &pollerBehaviorAutoscaling{
185+
initialNumberOfPollers: 2,
186+
maximumNumberOfPollers: 3,
187+
minimumNumberOfPollers: 1,
188+
}
189+
190+
blockingPoller := newSemaphoreProbeTaskPoller()
191+
poller := newScalableTaskPoller(blockingPoller, ilog.NewNopLogger(), behavior)
192+
poller.taskPollerType = "test"
193+
194+
bw := newBaseWorker(baseWorkerOptions{
195+
slotSupplier: &testSlotSupplier{},
196+
maxTaskPerSecond: 1000,
197+
taskPollers: []scalableTaskPoller{poller},
198+
taskProcessor: noopTaskProcessor{},
199+
workerType: "AutoscalingTest",
200+
logger: ilog.NewNopLogger(),
201+
stopTimeout: time.Second,
202+
metricsHandler: metrics.NopHandler,
203+
})
204+
205+
bw.Start()
206+
defer func() {
207+
allowBlockedPollers(blockingPoller, poller.pollerSemaphore)
208+
blockingPoller.Close()
209+
bw.Stop()
210+
}()
211+
212+
eventuallySemaphoreState(s.T(), blockingPoller, poller.pollerSemaphore, 2, 2, "expected initial concurrency")
213+
214+
poller.pollerAutoscalerReportHandle.updateTarget(func(target int64) int64 { return 1 })
215+
216+
eventuallySemaphoreState(s.T(), blockingPoller, poller.pollerSemaphore, 1, 1, "expected concurrency to reduce to minimum")
217+
218+
require.Never(s.T(), func() bool {
219+
allowBlockedPollers(blockingPoller, poller.pollerSemaphore)
220+
permits, _ := readSemaphoreState(poller.pollerSemaphore)
221+
return permits == 0
222+
}, 200*time.Millisecond, 10*time.Millisecond, "should not scale below minimum")
223+
}
224+
225+
type semaphoreProbeTaskPoller struct {
226+
signals chan struct{}
227+
closed atomic.Bool
228+
}
229+
230+
func newSemaphoreProbeTaskPoller() *semaphoreProbeTaskPoller {
231+
return &semaphoreProbeTaskPoller{
232+
signals: make(chan struct{}, 32),
233+
}
234+
}
235+
236+
// PollTask implements taskPoller and blocks until a signal is provided so the semaphore permits stay acquired.
237+
func (p *semaphoreProbeTaskPoller) PollTask() (taskForWorker, error) {
238+
_, ok := <-p.signals
239+
if !ok {
240+
return nil, nil
241+
}
242+
return nil, nil
243+
}
244+
245+
// Cleanup implements taskPoller.
246+
func (p *semaphoreProbeTaskPoller) Cleanup() error {
247+
p.Close()
248+
return nil
249+
}
250+
251+
func (p *semaphoreProbeTaskPoller) Allow(n int) {
252+
for range n {
253+
for {
254+
if p.closed.Load() {
255+
return
256+
}
257+
select {
258+
case p.signals <- struct{}{}:
259+
goto next
260+
default:
261+
time.Sleep(1 * time.Millisecond)
262+
}
263+
}
264+
next:
265+
}
266+
}
267+
268+
func (p *semaphoreProbeTaskPoller) Close() {
269+
if p.closed.CompareAndSwap(false, true) {
270+
close(p.signals)
271+
}
272+
}
273+
274+
func allowBlockedPollers(p *semaphoreProbeTaskPoller, sem *pollerSemaphore) {
275+
if p == nil || sem == nil {
276+
return
277+
}
278+
permits, _ := readSemaphoreState(sem)
279+
if permits > 0 {
280+
p.Allow(permits)
281+
}
282+
}
283+
284+
func eventuallySemaphoreState(t *testing.T, blockingPoller *semaphoreProbeTaskPoller, sem *pollerSemaphore, expectedPermits, expectedMax int, msg string) {
285+
require.Eventually(t, func() bool {
286+
allowBlockedPollers(blockingPoller, sem)
287+
permits, max := readSemaphoreState(sem)
288+
return permits == expectedPermits && max == expectedMax
289+
}, time.Second, 10*time.Millisecond, msg)
290+
}
291+
292+
func readSemaphoreState(ps *pollerSemaphore) (permits int, max int) {
293+
if ps == nil {
294+
return 0, 0
295+
}
296+
barrier := <-ps.bs
297+
permits = ps.permits
298+
max = ps.maxPermits
299+
ps.bs <- barrier
300+
return
301+
}
302+
303+
type testSlotSupplier struct{}
304+
305+
func (s *testSlotSupplier) ReserveSlot(ctx context.Context, info SlotReservationInfo) (*SlotPermit, error) {
306+
select {
307+
case <-ctx.Done():
308+
return nil, ctx.Err()
309+
default:
310+
}
311+
return &SlotPermit{}, nil
312+
}
313+
314+
func (s *testSlotSupplier) TryReserveSlot(SlotReservationInfo) *SlotPermit {
315+
return &SlotPermit{}
316+
}
317+
318+
func (s *testSlotSupplier) MarkSlotUsed(SlotMarkUsedInfo) {}
319+
320+
func (s *testSlotSupplier) ReleaseSlot(SlotReleaseInfo) {}
321+
322+
func (s *testSlotSupplier) MaxSlots() int { return 0 }
323+
324+
type noopTaskProcessor struct{}
325+
326+
func (noopTaskProcessor) ProcessTask(any) error { return nil }

0 commit comments

Comments (0)