From d3ae8e9232c25d6d52a661f28b9b2a32d0a8d98e Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Wed, 11 Dec 2024 15:48:42 +0800 Subject: [PATCH] fix: delay the wait-for-other-coords logic in query coord until after query coord changes into standby state (#38259) issue: https://github.com/milvus-io/milvus/issues/37764 - After removing the rpc layer from mixcoord, the querycoord in standby mode will be blocked forever during a rolling deployment --------- Signed-off-by: chyezh --- internal/distributed/querycoord/service.go | 16 ------------- .../distributed/querycoord/service_test.go | 8 ------- internal/querycoordv2/server.go | 17 +++++++++++++ internal/querycoordv2/server_test.go | 24 +++++++++++++++++++ 4 files changed, 41 insertions(+), 24 deletions(-) diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 3273e6de281a3..34ee41869b23b 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -37,7 +37,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" qc "github.com/milvus-io/milvus/internal/querycoordv2" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" @@ -172,33 +171,18 @@ func (s *Server) init() error { } // wait for master init or healthy - log.Info("QueryCoord try to wait for RootCoord ready") - err = componentutil.WaitForComponentHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200) - if err != nil { - log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err)) - panic(err) - } - if err := s.SetRootCoord(s.rootCoord); err != nil { panic(err) } - log.Info("QueryCoord report RootCoord ready") // --- Data service client --- if s.dataCoord == nil { s.dataCoord = coordclient.GetDataCoordClient(s.loopCtx) } - log.Info("QueryCoord try to 
wait for DataCoord ready") - err = componentutil.WaitForComponentHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200) - if err != nil { - log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err)) - panic(err) - } if err := s.SetDataCoord(s.dataCoord); err != nil { panic(err) } - log.Info("QueryCoord report DataCoord ready") if err := s.queryCoord.Init(); err != nil { return err diff --git a/internal/distributed/querycoord/service_test.go b/internal/distributed/querycoord/service_test.go index 1e80f2bad3916..79301bdd563fb 100644 --- a/internal/distributed/querycoord/service_test.go +++ b/internal/distributed/querycoord/service_test.go @@ -59,16 +59,8 @@ func Test_NewServer(t *testing.T) { assert.NotNil(t, server) mdc := mocks.NewMockDataCoordClient(t) - mdc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - }, nil) mrc := mocks.NewMockRootCoordClient(t) - mrc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - }, nil) mqc := getQueryCoord() successStatus := merr.Success() diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index af9b679ebbe9c..9922274d6b316 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -54,6 +54,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ 
-276,6 +277,22 @@ func (s *Server) Init() error { } func (s *Server) initQueryCoord() error { + // wait for master init or healthy + log.Info("QueryCoord try to wait for RootCoord ready") + if err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200); err != nil { + log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err)) + return errors.Wrap(err, "RootCoord not ready") + } + log.Info("QueryCoord report RootCoord ready") + + // wait for master init or healthy + log.Info("QueryCoord try to wait for DataCoord ready") + if err := componentutil.WaitForComponentHealthy(s.ctx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200); err != nil { + log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err)) + return errors.Wrap(err, "DataCoord not ready") + } + log.Info("QueryCoord report DataCoord ready") + s.UpdateStateCode(commonpb.StateCode_Initializing) log.Info("start init querycoord", zap.Any("State", commonpb.StateCode_Initializing)) // Init KV and ID allocator diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 948d0d7a9277b..0a82c2000bb09 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -321,7 +321,19 @@ func (suite *ServerSuite) TestEnableActiveStandby() { suite.server, err = suite.newQueryCoord() suite.NoError(err) mockRootCoord := coordMocks.NewMockRootCoordClient(suite.T()) + mockRootCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + StateCode: commonpb.StateCode_Healthy, + }, + Status: merr.Success(), + }, nil).Maybe() mockDataCoord := coordMocks.NewMockDataCoordClient(suite.T()) + mockDataCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + StateCode: commonpb.StateCode_Healthy, + }, + Status: merr.Success(), + }, nil).Maybe() 
mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ Status: merr.Success(), @@ -612,7 +624,19 @@ func (suite *ServerSuite) hackServer() { func (suite *ServerSuite) hackBroker(server *Server) { mockRootCoord := coordMocks.NewMockRootCoordClient(suite.T()) + mockRootCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + StateCode: commonpb.StateCode_Healthy, + }, + Status: merr.Success(), + }, nil).Maybe() mockDataCoord := coordMocks.NewMockDataCoordClient(suite.T()) + mockDataCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + StateCode: commonpb.StateCode_Healthy, + }, + Status: merr.Success(), + }, nil).Maybe() for _, collection := range suite.collections { mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{