66 "log"
77 "os"
88 "strings"
9+ "sync/atomic"
910 "testing"
1011 "time"
1112
@@ -241,9 +242,7 @@ func TestRecoverAhead(t *testing.T) {
 		return proc2.Run(ctx)
 	})
 
-	pollTimed(t, "procs 1&2 recovered", func() bool {
-		return true
-	}, proc1.Recovered, proc2.Recovered)
+	pollTimed(t, "procs 1&2 recovered", proc1.Recovered, proc2.Recovered)
 
 	// check the storages that were initalized by the processors:
 	// both have each 2 storages, because all tables only have 1 partition
@@ -259,9 +258,6 @@ func TestRecoverAhead(t *testing.T) {
 	// wait until the keys are present
 	pollTimed(t, "key-values are present",
 
-		func() bool {
-			return true
-		},
 		func() bool {
 			has, _ := tableStorage1.Has("key1")
 			return has
@@ -297,6 +293,149 @@ func TestRecoverAhead(t *testing.T) {
 	require.NoError(t, errg.Wait().ErrorOrNil())
 }
 
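+// TestRecoverForever runs a processor with the recover-forever option enabled and
+// verifies that it recovers its table without ever processing input; a second run
+// without the option then processes input without recovering the table again.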
+func TestRecoverForever(t *testing.T) {
+	brokers := initSystemTest(t)
+	var (
+		group       = goka.Group(fmt.Sprintf("goka-systemtest-recoverforever-%d", time.Now().Unix()))
+		inputStream = fmt.Sprintf("%s-input", group)
+		table       = string(goka.GroupTable(group))
+	)
+
+	tmc := goka.NewTopicManagerConfig()
+	tmc.Table.Replication = 1
+	tmc.Stream.Replication = 1
+	cfg := goka.DefaultConfig()
+	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
+	goka.ReplaceGlobalConfig(cfg)
+
+	tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
+	require.NoError(t, err)
+
+	err = tm.EnsureStreamExists(inputStream, 1)
+	require.NoError(t, err)
+	err = tm.EnsureTableExists(string(table), 1)
+	require.NoError(t, err)
+
+	// emit something into the state table (like simulating a processor ctx.SetValue()).
+	// The test processor should recover this value into its group table
+	tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String))
+	require.NoError(t, err)
+	require.NoError(t, tableEmitter.EmitSync("key1", "tableval1"))
+	require.NoError(t, tableEmitter.Finish())
+
+	// emit an input-message
+	inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
+	require.NoError(t, err)
+	require.NoError(t, inputEmitter.EmitSync("key1", "input-value"))
+	require.NoError(t, inputEmitter.Finish())
+
+	storageTracker := newStorageTracker()
+
+	var (
+		processed      atomic.Int64
+		itemsRecovered atomic.Int64
+	)
+
+	createProc := func(recoverForever bool) *goka.Processor {
+		opts := []goka.ProcessorOption{
+
+			goka.WithUpdateCallback(func(ctx goka.UpdateContext, s storage.Storage, key string, value []byte) error {
+				itemsRecovered.Add(1)
+				return goka.DefaultUpdate(ctx, s, key, value)
+			}),
+			goka.WithStorageBuilder(storageTracker.Build),
+		}
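+		// with recover-forever enabled, the processor is expected to keep catching up
+		// on its table indefinitely instead of switching over to processing input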
+		if recoverForever {
+			opts = append(opts, goka.WithRecoverForever())
+		}
+		proc, err := goka.NewProcessor(brokers,
+			goka.DefineGroup(
+				group,
+				goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
+					processed.Add(1)
+				}),
+				goka.Persist(new(codec.String)),
+			),
+			opts...,
+		)
+		require.NoError(t, err)
+		return proc
+	}
+
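+	// first run: start the processor with recover-forever enabled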
+	proc1 := createProc(true)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	errg, ctx := multierr.NewErrGroup(ctx)
+
+	errg.Go(func() error {
+		return proc1.Run(ctx)
+	})
+
+	pollTimed(t, "procs 1 storage initialized", func() bool {
+		return len(storageTracker.storages) == 1
+	})
+
+	// get the corresponding storage for the group-table partition
+	tableStorage1 := storageTracker.storages[storageTracker.key(string(table), 0)]
+
+	// wait until the keys are present
+	pollTimed(t, "key-values are present",
+		func() bool {
+			has, _ := tableStorage1.Has("key1")
+			return has
+		},
+	)
+
+	// check the table-values
+	val1, _ := tableStorage1.Get("key1")
+	require.Equal(t, "tableval1", string(val1))
+
+	// stop everything and wait until it's shut down
+	cancel()
+	require.NoError(t, errg.Wait().ErrorOrNil())
+
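+	// the recover-forever run must have restored the one table entry via the update
+	// callback, but must not have processed the input message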
+	require.EqualValues(t, 0, processed.Load())
+	require.EqualValues(t, 1, itemsRecovered.Load())
+
+	// run processor a second time, without recover forever
+	itemsRecovered.Store(0)
+	proc2 := createProc(false)
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+	errg, ctx = multierr.NewErrGroup(ctx)
+	errg.Go(func() error {
+		return proc2.Run(ctx)
+	})
+
+	pollTimed(t, "procs 2 storage initialized", func() bool {
+		return len(storageTracker.storages) == 1
+	})
+
+	pollTimed(t, "procs recovered", proc2.Recovered)
+
+	// wait until the input is actually processed
+	pollTimed(t, "input processed", func() bool {
+		return processed.Load() == 1
+	})
+
+	// At this point we know the processor started, recovered, and consumed the one message in the input stream.
+	// Now make sure the processor did not recover again (because it already did in the first run)
+	require.EqualValues(t, 0, itemsRecovered.Load())
+
+	// stop everything and wait until it's shut down
+	cancel()
+	require.NoError(t, errg.Wait().ErrorOrNil())
+}
+
 // TestRebalance runs some processors to test rebalance. It's merely a
 // runs-without-errors test, not a real functional test.
 func TestRebalance(t *testing.T) {