diff --git a/server/coordinator/eventdispatch/dispatch.go b/server/coordinator/eventdispatch/dispatch.go index 43f26fc5..9d35e07c 100644 --- a/server/coordinator/eventdispatch/dispatch.go +++ b/server/coordinator/eventdispatch/dispatch.go @@ -18,8 +18,8 @@ package eventdispatch import ( "context" - "github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb" + "github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb" "github.com/CeresDB/ceresmeta/server/cluster/metadata" ) diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index 55dca617..2fe3bad4 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -18,7 +18,6 @@ package coordinator import ( "context" - "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transfertable" "github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb" "github.com/CeresDB/ceresmeta/server/cluster/metadata" @@ -30,6 +29,7 @@ import ( "github.com/CeresDB/ceresmeta/server/coordinator/procedure/ddl/droptable" "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/split" "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader" + "github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transfertable" "github.com/CeresDB/ceresmeta/server/id" "github.com/CeresDB/ceresmeta/server/storage" "github.com/pkg/errors" diff --git a/server/coordinator/procedure/operation/transfertable/transfer_table.go b/server/coordinator/procedure/operation/transfertable/transfer_table.go index c98c01ee..dd73cc88 100644 --- a/server/coordinator/procedure/operation/transfertable/transfer_table.go +++ b/server/coordinator/procedure/operation/transfertable/transfer_table.go @@ -1,3 +1,19 @@ +/* + * Copyright 2022 The CeresDB Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package transfertable import ( @@ -48,8 +64,8 @@ type Procedure struct { tableInfo metadata.TableInfo hasSrcShardNode bool - srcShardNodeWithVersion metadata.ShardNodeWithVersion - relatedVersionInfo procedure.RelatedVersionInfo + srcShardNodeWithVersion *metadata.ShardNodeWithVersion + relatedVersionInfo *procedure.RelatedVersionInfo // Protect the state. // FIXME: the procedure should be executed sequentially, so any need to use a lock to protect it? @@ -90,10 +106,10 @@ func NewProcedure(params ProcedureParams) (procedure.Procedure, error) { shardWithVersion := map[storage.ShardID]uint64{} hasSrcShardNode := false - srcShardNodeWithVersion := metadata.ShardNodeWithVersion{} + var srcShardNodeWithVersion *metadata.ShardNodeWithVersion if len(entry.NodeShards) != 0 { hasSrcShardNode = true - srcShardNodeWithVersion = entry.NodeShards[0] + srcShardNodeWithVersion = &entry.NodeShards[0] shardWithVersion[srcShardNodeWithVersion.ShardID()] = srcShardNodeWithVersion.ShardInfo.Version } @@ -102,7 +118,7 @@ func NewProcedure(params ProcedureParams) (procedure.Procedure, error) { return nil, metadata.ErrTransferTable.WithCausef("table not found") } - relatedVersionInfo := procedure.RelatedVersionInfo{ + relatedVersionInfo := &procedure.RelatedVersionInfo{ ClusterID: clusterMetadata.GetClusterID(), ShardWithVersion: shardWithVersion, ClusterVersion: clusterMetadata.GetClusterViewVersion(), @@ -200,7 +216,7 @@ func (p *Procedure) Start(ctx context.Context) error { } func (p *Procedure) RelatedVersionInfo() procedure.RelatedVersionInfo { - return p.relatedVersionInfo + return *p.relatedVersionInfo } func (p *Procedure) persist(_ctx context.Context) error { diff --git a/server/coordinator/procedure/test/common.go b/server/coordinator/procedure/test/common.go index 8f271f59..bf233691 100644 --- a/server/coordinator/procedure/test/common.go +++ b/server/coordinator/procedure/test/common.go @@ -69,12 +69,12 @@ func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispa return 0, nil } -func (m MockDispatch) OpenTableOnShard(_ context.Context, _ string, _ eventdispatch.OpenTableOnShardRequest) error { - return nil +func (m MockDispatch) OpenTableOnShard(_ context.Context, _ string, _ eventdispatch.OpenTableOnShardRequest) (eventdispatch.OpenTableOnShardResponse, error) { + return eventdispatch.OpenTableOnShardResponse{Version: 0}, nil } -func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) error { - return nil +func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) (eventdispatch.CloseTableOnShardResponse, error) { + return eventdispatch.CloseTableOnShardResponse{Version: 0}, nil } type MockStorage struct{} diff --git a/server/service/http/api.go b/server/service/http/api.go index 58df0b76..e818c097 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go @@ -220,7 +220,7 @@ func (a *API) transferTable(req *http.Request) apiFuncResult { } select { - case _ = <-resultCh: + case <-resultCh: log.Info("transfer leader succeed", zap.String("request", fmt.Sprintf("%+v", transferTableRequest)), zap.Int64("costTime", time.Since(start).Milliseconds())) return okResult(statusSuccess) case err = <-errorCh: