diff --git a/dot/state/block.go b/dot/state/block.go index d57df0beb0..3d34be19bf 100644 --- a/dot/state/block.go +++ b/dot/state/block.go @@ -62,6 +62,10 @@ type BlockState struct { unfinalisedBlocks *hashToBlockMap tries *Tries + // State variables + pausedLock sync.RWMutex + pause chan struct{} + // block notifiers imported map[chan *types.Block]struct{} finalised map[chan *types.FinalisationInfo]struct{} @@ -85,6 +89,7 @@ func NewBlockState(db database.Database, trs *Tries, telemetry Telemetry) (*Bloc finalised: make(map[chan *types.FinalisationInfo]struct{}), runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version), telemetry: telemetry, + pause: make(chan struct{}), } gh, err := bs.db.Get(headerHashKey(0)) @@ -120,6 +125,7 @@ func NewBlockStateFromGenesis(db database.Database, trs *Tries, header *types.He genesisHash: header.Hash(), lastFinalised: header.Hash(), telemetry: telemetryMailer, + pause: make(chan struct{}), } if err := bs.setArrivalTime(header.Hash(), time.Now()); err != nil { @@ -153,6 +159,29 @@ func NewBlockStateFromGenesis(db database.Database, trs *Tries, header *types.He return bs, nil } +// Pause pauses the service ie. halts block production +func (bs *BlockState) Pause() error { + bs.pausedLock.Lock() + defer bs.pausedLock.Unlock() + + if bs.IsPaused() { + return nil + } + + close(bs.pause) + return nil +} + +// IsPaused returns if the service is paused or not (ie. producing blocks) +func (bs *BlockState) IsPaused() bool { + select { + case <-bs.pause: + return true + default: + return false + } +} + // encodeBlockNumber encodes a block number as big endian uint64 func encodeBlockNumber(number uint64) []byte { enc := make([]byte, 8) // encoding results in 8 bytes diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index 1d7297a052..dce3645678 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -127,6 +127,10 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er bs.lock.Lock() defer bs.lock.Unlock() + if bs.IsPaused() { + return errors.New("blockstate service is paused") + } + has, err := bs.HasHeader(hash) if err != nil { return fmt.Errorf("could not check header for hash %s: %w", hash, err) diff --git a/dot/state/service.go b/dot/state/service.go index 8afeb41151..cb38a371f2 100644 --- a/dot/state/service.go +++ b/dot/state/service.go @@ -44,6 +44,11 @@ type Service struct { BabeThresholdDenominator uint64 } +// Pause Pauses the state service +func (s *Service) Pause() error { + return s.Block.Pause() +} + // Config is the default configuration used by state service. type Config struct { Path string diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index 7de69c3bbc..6fcdc44a60 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -262,6 +262,10 @@ func (cs *chainSync) bootstrapSync() { cs.workerPool.useConnectedPeers() err = cs.requestMaxBlocksFrom(currentBlock, networkInitialSync) if err != nil { + if errors.Is(err, errBlockStatePaused) { + logger.Debugf("exiting bootstrap sync: %s", err) + return + } logger.Errorf("requesting max blocks from best block header: %s", err) } @@ -411,8 +415,11 @@ func (cs *chainSync) requestChainBlocks(announcedHeader, bestBlockHeader *types. } resultsQueue := make(chan *syncTaskResult) - cs.workerPool.submitRequest(request, &peerWhoAnnounced, resultsQueue) - err := cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, totalBlocks) + err := cs.submitRequest(request, &peerWhoAnnounced, resultsQueue) + if err != nil { + return err + } + err = cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, totalBlocks) if err != nil { return fmt.Errorf("while handling workers results: %w", err) } @@ -449,8 +456,10 @@ func (cs *chainSync) requestForkBlocks(bestBlockHeader, highestFinalizedHeader, gapLength, peerWhoAnnounced, announcedHeader.Number, announcedHash.Short()) resultsQueue := make(chan *syncTaskResult) - cs.workerPool.submitRequest(request, &peerWhoAnnounced, resultsQueue) - + err = cs.submitRequest(request, &peerWhoAnnounced, resultsQueue) + if err != nil { + return err + } err = cs.handleWorkersResults(resultsQueue, networkBroadcast, startAtBlock, gapLength) if err != nil { return fmt.Errorf("while handling workers results: %w", err) @@ -499,8 +508,10 @@ func (cs *chainSync) requestPendingBlocks(highestFinalizedHeader *types.Header) // the `requests` in the tip sync are not related necessarily // this is why we need to treat them separately resultsQueue := make(chan *syncTaskResult) - cs.workerPool.submitRequest(descendingGapRequest, nil, resultsQueue) - + err = cs.submitRequest(descendingGapRequest, nil, resultsQueue) + if err != nil { + return err + } // TODO: we should handle the requests concurrently // a way of achieve that is by constructing a new `handleWorkersResults` for // handling only tip sync requests @@ -536,8 +547,11 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin } } - resultsQueue := cs.workerPool.submitRequests(requests) - err := cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks) + resultsQueue, err := cs.submitRequests(requests) + if err != nil { + return err + } + err = cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks) if err != nil { return fmt.Errorf("while handling workers results: %w", err) } @@ -545,6 +559,26 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin return nil } +func (cs *chainSync) submitRequest( + request *network.BlockRequestMessage, + who *peer.ID, + resultCh chan<- *syncTaskResult, +) error { + if !cs.blockState.IsPaused() { + cs.workerPool.submitRequest(request, who, resultCh) + return nil + } + return fmt.Errorf("submitting request: %w", errBlockStatePaused) +} + +func (cs *chainSync) submitRequests(requests []*network.BlockRequestMessage) ( + resultCh chan *syncTaskResult, err error) { + if !cs.blockState.IsPaused() { + return cs.workerPool.submitRequests(requests), nil + } + return nil, fmt.Errorf("submitting requests: %w", errBlockStatePaused) +} + func (cs *chainSync) showSyncStats(syncBegin time.Time, syncedBlocks int) { finalisedHeader, err := cs.blockState.GetHighestFinalisedHeader() if err != nil { @@ -581,7 +615,6 @@ func (cs *chainSync) showSyncStats(syncBegin time.Time, syncedBlocks int) { func (cs *chainSync) handleWorkersResults( workersResults chan *syncTaskResult, origin blockOrigin, startAtBlock uint, expectedSyncedBlocks uint32) error { startTime := time.Now() - syncingChain := make([]*types.BlockData, expectedSyncedBlocks) // the total numbers of blocks is missing in the syncing chain waitingBlocks := expectedSyncedBlocks @@ -627,7 +660,10 @@ taskResultLoop: } // TODO: avoid the same peer to get the same task - cs.workerPool.submitRequest(request, nil, workersResults) + err := cs.submitRequest(request, nil, workersResults) + if err != nil { + return err + } continue } @@ -648,14 +684,20 @@ taskResultLoop: }, who) } - cs.workerPool.submitRequest(taskResult.request, nil, workersResults) + err = cs.submitRequest(taskResult.request, nil, workersResults) + if err != nil { + return err + } continue taskResultLoop } isChain := isResponseAChain(response.BlockData) if !isChain { logger.Criticalf("response from %s is not a chain", who) - cs.workerPool.submitRequest(taskResult.request, nil, workersResults) + err = cs.submitRequest(taskResult.request, nil, workersResults) + if err != nil { + return err + } continue taskResultLoop } @@ -663,7 +705,10 @@ taskResultLoop: startAtBlock, expectedSyncedBlocks) if !grows { logger.Criticalf("response from %s does not grows the ongoing chain", who) - cs.workerPool.submitRequest(taskResult.request, nil, workersResults) + err = cs.submitRequest(taskResult.request, nil, workersResults) + if err != nil { + return err + } continue taskResultLoop } @@ -678,7 +723,10 @@ taskResultLoop: }, who) cs.workerPool.ignorePeerAsWorker(taskResult.who) - cs.workerPool.submitRequest(taskResult.request, nil, workersResults) + err = cs.submitRequest(taskResult.request, nil, workersResults) + if err != nil { + return err + } continue taskResultLoop } @@ -708,7 +756,10 @@ taskResultLoop: Direction: network.Ascending, Max: &difference, } - cs.workerPool.submitRequest(taskResult.request, nil, workersResults) + err = cs.submitRequest(taskResult.request, nil, workersResults) + if err != nil { + return err + } continue taskResultLoop } } diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go index c964b6ab5a..2b163a283d 100644 --- a/dot/sync/chain_sync_test.go +++ b/dot/sync/chain_sync_test.go @@ -147,6 +147,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) { blockStateMock.EXPECT(). HasHeader(block2AnnounceHeader.Hash()). Return(false, nil) + blockStateMock.EXPECT().IsPaused().Return(false) blockStateMock.EXPECT(). BestBlockHeader(). @@ -272,6 +273,8 @@ func Test_chainSync_onBlockAnnounceHandshake_tipModeNeedToCatchup(t *testing.T) Return(block1AnnounceHeader, nil). Times(3) + blockStateMock.EXPECT().IsPaused().Return(false).Times(2) + expectedRequest := network.NewAscendingBlockRequests( block1AnnounceHeader.Number+1, block2AnnounceHeader.Number, network.BootstrapRequestData) @@ -532,6 +535,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorker(t *testing.T) { mockedBlockState := NewMockBlockState(ctrl) mockedBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockedBlockState.EXPECT().IsPaused().Return(false) mockBabeVerifier := NewMockBabeVerifier(ctrl) mockStorageState := NewMockStorageState(ctrl) @@ -588,6 +592,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithTwoWorkers(t *testing.T) { mockTelemetry := NewMockTelemetry(ctrl) mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(types.NewEmptyHeader(), nil).Times(1) + mockBlockState.EXPECT().IsPaused().Return(false) mockNetwork.EXPECT().Peers().Return([]common.PeerInfo{}).Times(1) // this test expects two workers responding each request with 128 blocks which means @@ -664,6 +669,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing. ctrl := gomock.NewController(t) mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockBlockState.EXPECT().IsPaused().Return(false).Times(2) mockedGenesisHeader := types.NewHeader(common.NewHash([]byte{0}), trie.EmptyHash, trie.EmptyHash, 0, types.NewDigest()) @@ -676,6 +682,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing. mockTelemetry := NewMockTelemetry(ctrl) mockBlockState.EXPECT().GetHighestFinalisedHeader().Return(types.NewEmptyHeader(), nil).Times(1) + mockNetwork.EXPECT().Peers().Return([]common.PeerInfo{}).Times(1) // this test expects two workers responding each request with 128 blocks which means @@ -760,6 +767,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithProtocolNotSupported(t *test ctrl := gomock.NewController(t) mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockBlockState.EXPECT().IsPaused().Return(false).Times(2) mockBlockState.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil). @@ -864,6 +872,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithNilHeaderInResponse(t *testi ctrl := gomock.NewController(t) mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockBlockState.EXPECT().IsPaused().Return(false).Times(2) mockBlockState.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil). @@ -970,6 +979,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithResponseIsNotAChain(t *testi ctrl := gomock.NewController(t) mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockBlockState.EXPECT().IsPaused().Return(false).Times(2) mockBlockState.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil). @@ -1073,6 +1083,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithReceivedBadBlock(t *testing. ctrl := gomock.NewController(t) mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockBlockState.EXPECT().IsPaused().Return(false).Times(2) mockBlockState.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil). @@ -1197,6 +1208,7 @@ func TestChainSync_BootstrapSync_SucessfulSync_ReceivedPartialBlockData(t *testi ctrl := gomock.NewController(t) mockBlockState := NewMockBlockState(ctrl) mockBlockState.EXPECT().GetFinalisedNotifierChannel().Return(make(chan *types.FinalisationInfo)) + mockBlockState.EXPECT().IsPaused().Return(false).Times(2) mockBlockState.EXPECT(). GetHighestFinalisedHeader(). Return(types.NewEmptyHeader(), nil). diff --git a/dot/sync/errors.go b/dot/sync/errors.go index 08f89cacba..ab1e8a854f 100644 --- a/dot/sync/errors.go +++ b/dot/sync/errors.go @@ -8,11 +8,7 @@ import ( ) var ( - // ErrServiceStopped is returned when the service has been stopped - ErrServiceStopped = errors.New("service has been stopped") - - // ErrInvalidBlock is returned when a block cannot be verified - ErrInvalidBlock = errors.New("could not verify block") + errBlockStatePaused = errors.New("blockstate service has been paused") // ErrInvalidBlockRequest is returned when an invalid block request is received ErrInvalidBlockRequest = errors.New("invalid block request") diff --git a/dot/sync/interfaces.go b/dot/sync/interfaces.go index 89336bf46b..03820704a5 100644 --- a/dot/sync/interfaces.go +++ b/dot/sync/interfaces.go @@ -38,6 +38,9 @@ type BlockState interface { GetHeaderByNumber(num uint) (*types.Header, error) GetAllBlocksAtNumber(num uint) ([]common.Hash, error) IsDescendantOf(parent, child common.Hash) (bool, error) + + IsPaused() bool + Pause() error } // StorageState is the interface for the storage state diff --git a/dot/sync/mocks_test.go b/dot/sync/mocks_test.go index 5e1b70bb8f..bb57e94a7d 100644 --- a/dot/sync/mocks_test.go +++ b/dot/sync/mocks_test.go @@ -297,6 +297,34 @@ func (mr *MockBlockStateMockRecorder) IsDescendantOf(arg0, arg1 any) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDescendantOf", reflect.TypeOf((*MockBlockState)(nil).IsDescendantOf), arg0, arg1) } +// IsPaused mocks base method. +func (m *MockBlockState) IsPaused() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsPaused") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsPaused indicates an expected call of IsPaused. +func (mr *MockBlockStateMockRecorder) IsPaused() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPaused", reflect.TypeOf((*MockBlockState)(nil).IsPaused)) +} + +// Pause mocks base method. +func (m *MockBlockState) Pause() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Pause") + ret0, _ := ret[0].(error) + return ret0 +} + +// Pause indicates an expected call of Pause. +func (mr *MockBlockStateMockRecorder) Pause() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pause", reflect.TypeOf((*MockBlockState)(nil).Pause)) +} + // Range mocks base method. func (m *MockBlockState) Range(arg0, arg1 common.Hash) ([]common.Hash, error) { m.ctrl.T.Helper() diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index bec64dfc7e..c2bebb339a 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -26,6 +26,11 @@ type Service struct { network Network } +// Pause Pauses the sync service +func (s *Service) Pause() error { + return s.blockState.Pause() +} + // Config is the configuration for the sync Service. type Config struct { LogLvl log.Level @@ -101,6 +106,10 @@ func (s *Service) HandleBlockAnnounce(from peer.ID, msg *network.BlockAnnounceMe logger.Debugf("received block announce from: %s, #%d (%s)", from, blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short()) + if s.blockState.IsPaused() { + return errors.New("blockstate service is paused") + } + // if the peer reports a lower or equal best block number than us, // check if they are on a fork or not bestBlockHeader, err := s.blockState.BestBlockHeader() diff --git a/dot/sync/syncer_test.go b/dot/sync/syncer_test.go index 2f192ab6a3..6da3a07b3a 100644 --- a/dot/sync/syncer_test.go +++ b/dot/sync/syncer_test.go @@ -82,6 +82,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "best_block_header_error": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) blockState.EXPECT().BestBlockHeader().Return(nil, errTest) return &Service{ blockState: blockState, @@ -95,6 +96,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "number_smaller_than_best_block_number_get_hash_by_number_error": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 2} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(1)).Return(common.Hash{}, errTest) @@ -111,6 +113,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "number_smaller_than_best_block_number_and_same_hash": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 2} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(1)).Return(block1AnnounceHeader.Hash(), nil) @@ -124,6 +127,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "number_smaller_than_best_block_number_get_highest_finalised_header_error": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 2} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(1)).Return(common.Hash{2}, nil) @@ -140,7 +144,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "number_smaller_than_best_block_announced_number_equaks_finalised_number": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) - + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 2} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(1)). @@ -165,6 +169,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "number_smaller_than_best_block_number_and_finalised_number_bigger_than_number": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 2} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(1)). @@ -191,6 +196,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "has_header_error": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 3} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(2)). @@ -212,6 +218,7 @@ func TestService_HandleBlockAnnounce(t *testing.T) { "has_the_hash": { serviceBuilder: func(ctrl *gomock.Controller) *Service { blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 3} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) blockState.EXPECT().GetHashByNumber(uint(2)). @@ -228,8 +235,8 @@ func TestService_HandleBlockAnnounce(t *testing.T) { }, "number_bigger_than_best_block_number_added_in_disjoint_set_with_success": { serviceBuilder: func(ctrl *gomock.Controller) *Service { - blockState := NewMockBlockState(ctrl) + blockState.EXPECT().IsPaused().Return(false) bestBlockHeader := &types.Header{Number: 1} blockState.EXPECT().BestBlockHeader().Return(bestBlockHeader, nil) chainSyncMock := NewMockChainSync(ctrl) diff --git a/lib/services/services.go b/lib/services/services.go index a26fd4c628..275f253374 100644 --- a/lib/services/services.go +++ b/lib/services/services.go @@ -13,6 +13,11 @@ type Service interface { Stop() error } +// Pausable must be implemented by services that must be paused before shutdown +type Pausable interface { + Pause() error +} + // ServiceRegistry is a structure to manage core system services type ServiceRegistry struct { services map[reflect.Type]Service // map of types to service instances @@ -52,9 +57,30 @@ func (s *ServiceRegistry) StartAll() { s.logger.Debug("All services started.") } +// PauseServices pauses key services before shutdown to allow a graceful shutdown +func (s *ServiceRegistry) PauseServices() { + s.logger.Infof("Pausing key services") + for _, typ := range s.serviceTypes { + pausable, ok := s.services[typ].(Pausable) + if ok && pausable != nil { + err := pausable.Pause() + if err != nil { + s.logger.Errorf("Error pausing %s service: %s", typ, err) + } + } else if ok { + s.logger.Errorf("Error pausing required services") + } + } + + s.logger.Infof("Paused key services") +} + // StopAll calls `Service.Stop()` for all registered services func (s *ServiceRegistry) StopAll() { s.logger.Infof("Stopping services: %v", s.serviceTypes) + + s.PauseServices() + for _, typ := range s.serviceTypes { s.logger.Debugf("Stopping service %s", typ) err := s.services[typ].Stop()