diff --git a/pkg/owner/BUILD.bazel b/pkg/owner/BUILD.bazel index ad74fc9e74c1d..a21a5b9ce5ecc 100644 --- a/pkg/owner/BUILD.bazel +++ b/pkg/owner/BUILD.bazel @@ -37,7 +37,7 @@ go_test( ], embed = [":owner"], flaky = True, - shard_count = 11, + shard_count = 12, deps = [ "//pkg/parser/terror", "//pkg/testkit/testfailpoint", diff --git a/pkg/owner/fail_test.go b/pkg/owner/fail_test.go index b78b393db2280..11045a71b7b1a 100644 --- a/pkg/owner/fail_test.go +++ b/pkg/owner/fail_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/integration" "google.golang.org/grpc" ) @@ -110,3 +111,26 @@ func TestFailNewSession(t *testing.T) { require.Truef(t, isContextDone, "err %v", err) }() } + +func TestOwnerManagerClose(t *testing.T) { + integration.BeforeTestExternal(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + client := cluster.Client(0) + ownerMgr := NewOwnerManager(context.Background(), client, "ddl", "1", "/owner/key") + err := ownerMgr.CampaignOwner() + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/waitCancelCtx", `return(true)`)) + defer func() { + // require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/waitCancelCtx")) + failpoint.Disable("github.com/pingcap/tidb/pkg/util/waitCancelCtx") + }() + // Close etcd session causes campaign loop to refreshSession() + // When the grpc is not available, the campaign should not hang. + ownerMgr.etcdSes.Close() + + // This should not hang. + ownerMgr.Close() +} diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index c11ef66cf9bd1..30b48a9e1d864 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -144,7 +144,7 @@ type ownerManager struct { } // NewOwnerManager creates a new Manager. -func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) Manager { +func NewOwnerManager(ctx context.Context, etcdCli *clientv3.Client, prompt, id, key string) *ownerManager { return &ownerManager{ etcdCli: etcdCli, id: id, @@ -176,9 +176,9 @@ func (m *ownerManager) SetListener(listener Listener) { m.listener = listener } -func (m *ownerManager) ForceToBeOwner(context.Context) error { +func (m *ownerManager) ForceToBeOwner(ctx context.Context) error { m.logger.Info("force to be owner") - if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ManagerSessionTTL); err != nil { + if err := m.refreshSession(ctx, util2.NewSessionDefaultRetryCnt, ManagerSessionTTL); err != nil { return errors.Trace(err) } @@ -287,7 +287,7 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { } if m.etcdSes == nil { m.logger.Info("start campaign owner") - if err := m.refreshSession(util2.NewSessionDefaultRetryCnt, ttl); err != nil { + if err := m.refreshSession(m.ctx, util2.NewSessionDefaultRetryCnt, ttl); err != nil { return errors.Trace(err) } } else { @@ -364,13 +364,13 @@ func (m *ownerManager) campaignLoop(campaignContext context.Context) { select { case <-m.etcdSes.Done(): m.logger.Info("etcd session done, refresh it") - if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { + if err2 := m.refreshSession(campaignContext, util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { m.logger.Info("break campaign loop, refresh session failed", zap.Error(err2)) return } case <-leaseNotFoundCh: m.logger.Info("meet lease not found error, refresh session") - if err2 := m.refreshSession(util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { + if err2 := m.refreshSession(campaignContext, util2.NewSessionRetryUnlimited, ManagerSessionTTL); err2 != nil { m.logger.Info("break campaign loop, refresh session failed", zap.Error(err2)) return } @@ -434,7 +434,7 @@ func (m *ownerManager) closeSession() { } } -func (m *ownerManager) refreshSession(retryCnt, ttl int) error { +func (m *ownerManager) refreshSession(ctx context.Context, retryCnt, ttl int) error { m.closeSession() // Note: we must use manager's context to create session. If we use campaign // context and the context is cancelled, the created session cannot be closed @@ -443,7 +443,7 @@ func (m *ownerManager) refreshSession(retryCnt, ttl int) error { // loop is refreshing the session, it might wait for a long time to return, it // should be fine as long as network is ok, and acceptable to wait when not. logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", m.prompt, m.key, m.id) - sess, err2 := util2.NewSession(m.ctx, logPrefix, m.etcdCli, retryCnt, ttl) + sess, err2 := util2.NewSession(ctx, logPrefix, m.etcdCli, retryCnt, ttl) if err2 != nil { return errors.Trace(err2) } diff --git a/pkg/util/etcd.go b/pkg/util/etcd.go index 480ee66829288..3c13725ce3050 100644 --- a/pkg/util/etcd.go +++ b/pkg/util/etcd.go @@ -68,6 +68,13 @@ func NewSession(ctx context.Context, logPrefix string, etcdCli *clientv3.Client, } }) + failpoint.Inject("waitCancelCtx", func(val failpoint.Value) { + if val.(bool) { + <-ctx.Done() + ctx = context.Background() + } + }) + startTime := time.Now() etcdSession, err = concurrency.NewSession(etcdCli, concurrency.WithTTL(ttl), concurrency.WithContext(ctx))