Skip to content

Commit 6a4dfd0

Browse files
committed
Bugfix: honor the recoverForever option for stateless processors
1 parent e157b9c commit 6a4dfd0

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

processor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,12 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
328328
return nil
329329
}
330330

331+
// never enter the rebalance loop, just stop here until the context is closed
332+
if g.opts.recoverForever {
333+
<-ctx.Done()
334+
return nil
335+
}
336+
331337
// run the main rebalance-consume-loop
332338
errg.Go(func() error {
333339
return g.rebalanceLoop(ctx)

systemtest/processor_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,74 @@ func TestRebalance(t *testing.T) {
497497
require.NoError(t, errg.Wait().ErrorOrNil())
498498
}
499499

500+
func TestRecoverForeverStateless(t *testing.T) {
501+
brokers := initSystemTest(t)
502+
var (
503+
group = goka.Group(fmt.Sprintf("goka-systemtest-recoverforever-%d", time.Now().Unix()))
504+
inputStream = fmt.Sprintf("%s-input", group)
505+
)
506+
507+
tmc := goka.NewTopicManagerConfig()
508+
tmc.Table.Replication = 1
509+
tmc.Stream.Replication = 1
510+
cfg := goka.DefaultConfig()
511+
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
512+
goka.ReplaceGlobalConfig(cfg)
513+
514+
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
515+
require.NoError(t, err)
516+
517+
err = tm.EnsureStreamExists(inputStream, 1)
518+
require.NoError(t, err)
519+
520+
// emit an input-message
521+
inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
522+
require.NoError(t, err)
523+
require.NoError(t, inputEmitter.EmitSync("key1", "input-value"))
524+
require.NoError(t, inputEmitter.Finish())
525+
526+
var (
527+
processed atomic.Int64
528+
)
529+
530+
proc, err := goka.NewProcessor(brokers,
531+
goka.DefineGroup(
532+
group,
533+
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
534+
processed.Add(1)
535+
}),
536+
),
537+
goka.WithRecoverForever(),
538+
)
539+
require.NoError(t, err)
540+
541+
ctx, cancel := context.WithCancel(context.Background())
542+
defer cancel()
543+
errg, ctx := multierr.NewErrGroup(ctx)
544+
545+
errg.Go(func() error {
546+
return proc.Run(ctx)
547+
})
548+
549+
// wait until it's starting
550+
select {
551+
case <-proc.StateReader().WaitForState(goka.ProcStateStarting):
552+
case <-time.After(10 * time.Second):
553+
}
554+
555+
// wait some more, at least rebalance-timeout
556+
<-time.After(5 * time.Second)
557+
558+
// it should never recover
559+
require.False(t, proc.Recovered())
560+
// nor process any message
561+
require.EqualValues(t, 0, processed.Load())
562+
563+
// stop everything and wait until it's shut down
564+
cancel()
565+
require.NoError(t, errg.Wait().ErrorOrNil())
566+
}
567+
500568
// TestRebalanceSharePartitions runs two processors one after each other
501569
// and asserts that they rebalance partitions appropriately
502570
func TestRebalanceSharePartitions(t *testing.T) {

0 commit comments

Comments
 (0)