diff --git a/relayer/processor/event_processor.go b/relayer/processor/event_processor.go index 37dba66fc..5bbaa5205 100644 --- a/relayer/processor/event_processor.go +++ b/relayer/processor/event_processor.go @@ -2,8 +2,7 @@ package processor import ( "context" - - "golang.org/x/sync/errgroup" + "sync" ) // EventProcessorBuilder is a configuration type with .With functions used for building an EventProcessor. @@ -91,25 +90,37 @@ func (ep EventProcessorBuilder) Build() EventProcessor { // It will return once all PathProcessors and ChainProcessors have stopped running due to context cancellation, // or if a critical error has occurred within one of the ChainProcessors. func (ep EventProcessor) Run(ctx context.Context) error { - var eg errgroup.Group + var wg sync.WaitGroup + runCtx, runCtxCancel := context.WithCancel(ctx) for _, pathProcessor := range ep.pathProcessors { pathProcessor := pathProcessor - eg.Go(func() error { + + wg.Add(1) + go func() { pathProcessor.Run(runCtx, runCtxCancel) - return nil - }) + wg.Done() + }() } + for _, chainProcessor := range ep.chainProcessors { chainProcessor := chainProcessor - eg.Go(func() error { - err := chainProcessor.Run(runCtx, ep.initialBlockHistory, ep.stuckPacket) - // Signal the other chain processors to exit. - runCtxCancel() - return err - }) + + wg.Add(1) + go func() { + err := func() error { + err := chainProcessor.Run(runCtx, ep.initialBlockHistory, ep.stuckPacket) + return err + }() + if err != nil { + // TODO: do we need to log errors here or have they already been logged by ChainProcessor? + } + wg.Done() + }() } - err := eg.Wait() + + wg.Wait() runCtxCancel() - return err + + return nil } diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index b8b9d4da2..10677b9e2 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -399,7 +399,11 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { return } - for len(pp.pathEnd1.incomingCacheData) > 0 || len(pp.pathEnd2.incomingCacheData) > 0 || len(pp.retryProcess) > 0 || len(pp.pathEnd1.finishedProcessing) > 0 || len(pp.pathEnd2.finishedProcessing) > 0 { + for len(pp.pathEnd1.incomingCacheData) > 0 || + len(pp.pathEnd2.incomingCacheData) > 0 || + len(pp.retryProcess) > 0 || + len(pp.pathEnd1.finishedProcessing) > 0 || + len(pp.pathEnd2.finishedProcessing) > 0 { // signals are available, so this will not need to block. if pp.processAvailableSignals(ctx, cancel) { return