Skip to content

Commit d5b855e

Browse files
committed
bugfix recoverforever option for stateless processors
1 parent 83aa884 commit d5b855e

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

processor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
333333
return nil
334334
}
335335

336+
// never enter the rebalance loop, just stop here until the context is closed
337+
if g.opts.recoverForever {
338+
<-ctx.Done()
339+
return nil
340+
}
341+
336342
// run the main rebalance-consume-loop
337343
errg.Go(func() error {
338344
return g.rebalanceLoop(ctx)

systemtest/processor_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,74 @@ func TestRebalance(t *testing.T) {
502502
require.NoError(t, errg.Wait().ErrorOrNil())
503503
}
504504

505+
func TestRecoverForeverStateless(t *testing.T) {
506+
brokers := initSystemTest(t)
507+
var (
508+
group = goka.Group(fmt.Sprintf("goka-systemtest-recoverforever-%d", time.Now().Unix()))
509+
inputStream = fmt.Sprintf("%s-input", group)
510+
)
511+
512+
tmc := goka.NewTopicManagerConfig()
513+
tmc.Table.Replication = 1
514+
tmc.Stream.Replication = 1
515+
cfg := goka.DefaultConfig()
516+
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
517+
goka.ReplaceGlobalConfig(cfg)
518+
519+
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
520+
require.NoError(t, err)
521+
522+
err = tm.EnsureStreamExists(inputStream, 1)
523+
require.NoError(t, err)
524+
525+
// emit an input-message
526+
inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
527+
require.NoError(t, err)
528+
require.NoError(t, inputEmitter.EmitSync("key1", "input-value"))
529+
require.NoError(t, inputEmitter.Finish())
530+
531+
var (
532+
processed atomic.Int64
533+
)
534+
535+
proc, err := goka.NewProcessor(brokers,
536+
goka.DefineGroup(
537+
group,
538+
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
539+
processed.Add(1)
540+
}),
541+
),
542+
goka.WithRecoverForever(),
543+
)
544+
require.NoError(t, err)
545+
546+
ctx, cancel := context.WithCancel(context.Background())
547+
defer cancel()
548+
errg, ctx := multierr.NewErrGroup(ctx)
549+
550+
errg.Go(func() error {
551+
return proc.Run(ctx)
552+
})
553+
554+
// wait until it's starting
555+
select {
556+
case <-proc.StateReader().WaitForState(goka.ProcStateStarting):
557+
case <-time.After(10 * time.Second):
558+
}
559+
560+
// wait some more, at least rebalance-timeout
561+
<-time.After(5 * time.Second)
562+
563+
// it should never recover
564+
require.False(t, proc.Recovered())
565+
// nor process any message
566+
require.EqualValues(t, 0, processed.Load())
567+
568+
// stop everything and wait until it's shut down
569+
cancel()
570+
require.NoError(t, errg.Wait().ErrorOrNil())
571+
}
572+
505573
// TestRebalanceSharePartitions runs two processors one after each other
506574
// and asserts that they rebalance partitions appropriately
507575
func TestRebalanceSharePartitions(t *testing.T) {

0 commit comments

Comments
 (0)