diff --git a/pkg/controller/pd_control.go b/pkg/controller/pd_control.go
index b42f88bb0a..8a8124fb99 100644
--- a/pkg/controller/pd_control.go
+++ b/pkg/controller/pd_control.go
@@ -133,3 +133,11 @@ func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string)
 	pdControl.SetPDClientWithAddress(peerURL, pdClient)
 	return pdClient
 }
+
+// NewFakeEtcdClient creates a FakeEtcdClient and registers it in the given FakePDControl for the TidbCluster
+func NewFakeEtcdClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *pdapi.FakeEtcdClient {
+	etcdClient := pdapi.NewFakeEtcdClient()
+	pdControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), etcdClient)
+
+	return etcdClient
+}
diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go
index f4d14d9fe6..41bd59c103 100644
--- a/pkg/manager/member/pd_member_manager.go
+++ b/pkg/manager/member/pd_member_manager.go
@@ -16,11 +16,13 @@ package member
 import (
 	"context"
 	"fmt"
+	"net/url"
 	"path"
 	"regexp"
 	"strconv"
 	"strings"
 
+	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/tidb-operator/pkg/apis/label"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/controller"
@@ -30,6 +32,7 @@ import (
 	"github.com/pingcap/tidb-operator/pkg/manager/suspender"
 	mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
 	"github.com/pingcap/tidb-operator/pkg/manager/volumes"
+	"github.com/pingcap/tidb-operator/pkg/pdapi"
 	"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
 	"github.com/pingcap/tidb-operator/pkg/util"
 
@@ -387,6 +390,8 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a
 				memberID, memberHealth.ClientUrls, memberHealth, ns, tcName)
 			continue
 		}
+		// try to sync peer urls
+		m.syncPDPeerUrls(tc, name, pdClient)
 
 		status := v1alpha1.PDMember{
 			Name:      name,
@@ -993,6 +998,70 @@ func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set *
 	return nil
 }
 
+func (m *pdMemberManager) syncPDPeerUrls(tc *v1alpha1.TidbCluster, pdName string, pdCli pdapi.PDClient) error {
+	ns := tc.GetNamespace()
+	tcName := tc.GetName()
+
+	var (
+		tlsEnabled = tc.IsTLSClusterEnabled()
+		member     *pdpb.Member
+	)
+	members, err := pdCli.GetMembers()
+	if err != nil {
+		return err
+	}
+	for _, m := range members.Members {
+		if m.Name == pdName {
+			member = m
+			break
+		}
+	}
+	if member == nil {
+		return fmt.Errorf("tidbcluster: [%s/%s] failed to find member %s in pd cluster", ns, tcName, pdName)
+	}
+
+	var (
+		newPeers []string
+		needSync bool
+	)
+	for _, peerUrl := range member.PeerUrls {
+		u, err := url.Parse(peerUrl)
+		if err != nil {
+			return fmt.Errorf("tidbcluster: [%s/%s] failed to parse peer url %s, %v", ns, tcName, peerUrl, err)
+		}
+		// check if the peer url scheme needs to be updated
+		if tlsEnabled != (u.Scheme == "https") {
+			needSync = true
+			if !tlsEnabled {
+				u.Scheme = "http"
+			} else {
+				u.Scheme = "https"
+			}
+			newPeers = append(newPeers, u.String())
+		} else {
+			newPeers = append(newPeers, peerUrl)
+		}
+	}
+
+	if needSync {
+		pdEtcdClient, err := m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
+			tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))
+		if err != nil {
+			return fmt.Errorf("tidbcluster: [%s/%s] failed to create pd etcd client, %v", ns, tcName, err)
+		}
+		defer pdEtcdClient.Close()
+		err = pdEtcdClient.UpdateMember(member.GetMemberId(), newPeers)
+		if err != nil {
+			return fmt.Errorf("tidbcluster: [%s/%s] failed to update pd member: %s peer urls, %v", ns, tcName, pdName, err)
+		}
+		klog.Infof("tidbcluster: [%s/%s] sync pd member: %s peer urls successfully, from %v to %v",
+			ns, tcName, pdName, member.PeerUrls, newPeers)
+	}
+
+	return nil
+}
+
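The scheme-flip loop above is self-contained enough to illustrate on its own. The following standalone sketch (illustrative only; `rewritePeerURLs` is a hypothetical helper, not part of this patch) mirrors the logic `syncPDPeerUrls` applies to `member.PeerUrls`:

```go
package main

import (
	"fmt"
	"net/url"
)

// rewritePeerURLs mirrors the loop in syncPDPeerUrls: it returns the peer URLs
// with their scheme aligned to tlsEnabled and reports whether any URL changed.
func rewritePeerURLs(peerURLs []string, tlsEnabled bool) ([]string, bool, error) {
	var (
		out     []string
		changed bool
	)
	for _, raw := range peerURLs {
		u, err := url.Parse(raw)
		if err != nil {
			return nil, false, fmt.Errorf("failed to parse peer url %s: %v", raw, err)
		}
		if tlsEnabled != (u.Scheme == "https") {
			changed = true
			if tlsEnabled {
				u.Scheme = "https"
			} else {
				u.Scheme = "http"
			}
			out = append(out, u.String())
			continue
		}
		out = append(out, raw)
	}
	return out, changed, nil
}

func main() {
	peers, changed, _ := rewritePeerURLs([]string{"http://pd0:2380"}, true)
	fmt.Println(peers, changed) // [https://pd0:2380] true
}
```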
 // TODO: Support check status http request in future.
 func buildPDReadinessProbHandler(tc *v1alpha1.TidbCluster) corev1.ProbeHandler {
 	return corev1.ProbeHandler{
diff --git a/pkg/manager/member/pd_member_manager_test.go b/pkg/manager/member/pd_member_manager_test.go
index 149b1b92d4..f6691e8c65 100644
--- a/pkg/manager/member/pd_member_manager_test.go
+++ b/pkg/manager/member/pd_member_manager_test.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/google/go-cmp/cmp"
 	. "github.com/onsi/gomega"
+	"github.com/pingcap/kvproto/pkg/pdpb"
 	apps "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
@@ -2855,6 +2856,132 @@ func TestPDMemberManagerSyncPDStsWhenPdNotJoinCluster(t *testing.T) {
 	}
 }
 
+func TestPDMemberManagerSyncPDPeerUrlsSts(t *testing.T) {
+	g := NewGomegaWithT(t)
+	type testcase struct {
+		name         string
+		modify       func(cluster *v1alpha1.TidbCluster)
+		pdHealth     *pdapi.HealthInfo
+		err          bool
+		membersInfo  *pdapi.MembersInfo
+		expectSchema string
+	}
+
+	testFn := func(test *testcase, t *testing.T) {
+		tc := newTidbClusterForPD()
+		ns := tc.Namespace
+		tcName := tc.Name
+
+		pmm, _, _ := newFakePDMemberManager()
+		pmm.deps.PDControl = pdapi.NewFakePDControl(pmm.deps.KubeInformerFactory.Core().V1().Secrets().Lister(), true)
+		fakePDControl := pmm.deps.PDControl.(*pdapi.FakePDControl)
+		pdClient := controller.NewFakePDClient(fakePDControl, tc)
+		etcdClient := controller.NewFakeEtcdClient(fakePDControl, tc)
+
+		pdClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
+			return test.pdHealth, nil
+		})
+		pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) {
+			return &metapb.Cluster{Id: uint64(1)}, nil
+		})
+		pdClient.AddReaction(pdapi.GetMembersActionType, func(action *pdapi.Action) (interface{}, error) {
+			return test.membersInfo, nil
+		})
+		etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
+			return test.expectSchema, nil
+		})
+
+		err := pmm.Sync(tc)
+		g.Expect(controller.IsRequeueError(err)).To(BeTrue())
+
+		_, err = pmm.deps.ServiceLister.Services(ns).Get(controller.PDMemberName(tcName))
+		g.Expect(err).NotTo(HaveOccurred())
+		_, err = pmm.deps.ServiceLister.Services(ns).Get(controller.PDPeerMemberName(tcName))
+		g.Expect(err).NotTo(HaveOccurred())
+		_, err = pmm.deps.StatefulSetLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
+		g.Expect(err).NotTo(HaveOccurred())
+
+		test.modify(tc)
+		err = pmm.syncPDStatefulSetForTidbCluster(tc)
+		if test.err {
+			g.Expect(err).To(HaveOccurred())
+		} else {
+			g.Expect(err).NotTo(HaveOccurred())
+		}
+	}
+	tests := []testcase{
+		{
+			name:         "upgrade from non-tls to tls",
+			expectSchema: "https://",
+			modify: func(tc *v1alpha1.TidbCluster) {
+				tc.Spec = v1alpha1.TidbClusterSpec{
+					PD: &v1alpha1.PDSpec{
+						ComponentSpec: v1alpha1.ComponentSpec{
+							Image: "pingcap/pd:v8.1.0",
+						},
+					},
+					TiDB: &v1alpha1.TiDBSpec{
+						TLSClient: &v1alpha1.TiDBTLSClient{
+							Enabled: true,
+						},
+					},
+					TiKV:       &v1alpha1.TiKVSpec{},
+					TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
+				}
+			},
+			pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
+				{Name: "pd0", MemberID: 1, ClientUrls: []string{"http://pd0:2379"}, Health: true},
+			}},
+			membersInfo: &pdapi.MembersInfo{
+				Members: []*pdpb.Member{
+					{
+						Name:     "pd0",
+						MemberId: 1,
+						PeerUrls: []string{"http://pd0:2380"},
+					},
+				},
+			},
+		},
+		{
+			name:         "upgrade from tls to non-tls",
+			expectSchema: "http://",
+			modify: func(tc *v1alpha1.TidbCluster) {
+				tc.Spec = v1alpha1.TidbClusterSpec{
+					PD: &v1alpha1.PDSpec{
+						ComponentSpec: v1alpha1.ComponentSpec{
+							Image: "pingcap/pd:v8.1.0",
+						},
+					},
+					TiDB: &v1alpha1.TiDBSpec{
+						TLSClient: &v1alpha1.TiDBTLSClient{
+							Enabled: false,
+						},
+					},
+					TiKV:       &v1alpha1.TiKVSpec{},
+					TLSCluster: &v1alpha1.TLSCluster{Enabled: false},
+				}
+			},
+			pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
+				{Name: "pd0", MemberID: 1, ClientUrls: []string{"https://pd0:2379"}, Health: true},
+			}},
+			membersInfo: &pdapi.MembersInfo{
+				Members: []*pdpb.Member{
+					{
+						Name:     "pd0",
+						MemberId: 1,
+						PeerUrls: []string{"https://pd0:2380"},
+					},
+				},
+			},
+		},
+	}
+	for i := range tests {
+		t.Logf("begin: %s", tests[i].name)
+		testFn(&tests[i], t)
+		t.Logf("end: %s", tests[i].name)
+	}
+}
+
 func TestPDShouldRecover(t *testing.T) {
 	pods := []*v1.Pod{
 		{
diff --git a/pkg/manager/member/pd_ms_upgrader.go b/pkg/manager/member/pd_ms_upgrader.go
index 3c99ad6499..28923be244 100644
--- a/pkg/manager/member/pd_ms_upgrader.go
+++ b/pkg/manager/member/pd_ms_upgrader.go
@@ -50,7 +50,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
 	}
 
 	curService := controller.PDMSTrimName(newSet.Name)
-	klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
+	klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
 	if tc.Status.PDMS[curService] == nil {
 		tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
 		return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
@@ -62,9 +62,9 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
 	if oldTrimName != curService {
 		return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
 	}
-	klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
+	klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
 	if tc.PDMSScaling(oldTrimName) {
-		klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
+		klog.Infof("tidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
 			ns, tcName, tc.Status.PDMS[curService].Phase)
 		_, podSpec, err := GetLastAppliedConfig(oldSet)
 		if err != nil {
@@ -141,7 +141,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
 		return err
 	}
 
-	klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
+	klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
 		primary, upgradePDMSName, upgradePodName)
 	// If current pdms is primary, transfer primary to other pdms pod
 	if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
@@ -152,15 +152,15 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
 		}
 
 		if targetName != "" {
-			klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
+			klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
 			err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
 			if err != nil {
-				klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
+				klog.Errorf("tidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
 				return err
 			}
-			klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
+			klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
 		} else {
-			klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
+			klog.Warningf("tidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
 		}
 	}
@@ -177,7 +177,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
 func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
 	ns := tc.GetNamespace()
 	tcName := tc.GetName()
-	klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
+	klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
 	ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)
 
 	// set ordinal to max ordinal if ordinal isn't exist
@@ -202,7 +202,7 @@ func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.Stat
 		targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
 	}
 
-	klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
+	klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
 	return targetName
 }
diff --git a/pkg/manager/member/pd_upgrader.go b/pkg/manager/member/pd_upgrader.go
index 5b82096de4..a29a92938f 100644
--- a/pkg/manager/member/pd_upgrader.go
+++ b/pkg/manager/member/pd_upgrader.go
@@ -23,7 +23,6 @@ import (
 	mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
 	"github.com/pingcap/tidb-operator/pkg/pdapi"
 	"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
-
 	apps "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -61,7 +60,7 @@ func (u *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Stat
 		return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed, can not to be upgraded", ns, tcName)
 	}
 	if tc.PDScaling() {
-		klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
+		klog.Infof("tidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
 			ns, tcName, tc.Status.PD.Phase)
 		_, podSpec, err := GetLastAppliedConfig(oldSet)
 		if err != nil {
@@ -166,13 +165,13 @@ func (u *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSe
 	if targetName != "" {
 		err := u.transferPDLeaderTo(tc, targetName)
 		if err != nil {
-			klog.Errorf("pd upgrader: failed to transfer pd leader to: %s, %v", targetName, err)
+			klog.Errorf("tidbcluster: [%s/%s] pd upgrader: failed to transfer pd leader to: %s, %v", ns, tcName, targetName, err)
 			return err
 		}
-		klog.Infof("pd upgrader: transfer pd leader to: %s successfully", targetName)
+		klog.Infof("tidbcluster: [%s/%s] pd upgrader: transfer pd leader to: %s successfully", ns, tcName, targetName)
 		return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
 	} else {
-		klog.Warningf("pd upgrader: skip to transfer pd leader, because can not find a suitable pd")
+		klog.Warningf("tidbcluster: [%s/%s] pd upgrader: skip to transfer pd leader, because can not find a suitable pd", ns, tcName)
 	}
 }
diff --git a/pkg/pdapi/fake_pdapi.go b/pkg/pdapi/fake_pdapi.go
index 80138fe4a5..59c40383f9 100644
--- a/pkg/pdapi/fake_pdapi.go
+++ b/pkg/pdapi/fake_pdapi.go
@@ -15,6 +15,7 @@ package pdapi
 
 import (
 	"fmt"
+	"strings"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -339,3 +340,81 @@ func (c *FakePDMSClient) TransferPrimary(newPrimary string) error {
 	_, err := c.fakeAPI(PDMSTransferPrimaryActionType, action)
 	return err
 }
+
+const (
+	EtcdGetActionType          ActionType = "Get"
+	EtcdPutActionType          ActionType = "Put"
+	EtcdDeleteActionType       ActionType = "Delete"
+	EtcdUpdateMemberActionType ActionType = "UpdateMember"
+	EtcdCloseActionType        ActionType = "Close"
+)
+
+// FakeEtcdClient implements a fake version of EtcdClient.
+type FakeEtcdClient struct {
+	reactions map[ActionType]Reaction
+}
+
+func NewFakeEtcdClient() *FakeEtcdClient {
+	return &FakeEtcdClient{reactions: map[ActionType]Reaction{}}
+}
+
+func (c *FakeEtcdClient) AddReaction(actionType ActionType, reaction Reaction) {
+	c.reactions[actionType] = reaction
+}
+
+// fakeAPI is a small helper for fake API calls
+func (c *FakeEtcdClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
+	if reaction, ok := c.reactions[actionType]; ok {
+		result, err := reaction(action)
+		if err != nil {
+			return nil, err
+		}
+		return result, nil
+	}
+	return nil, &NotFoundReaction{actionType}
+}
+
+func (c *FakeEtcdClient) Get(_ string, _ bool) (kvs []*KeyValue, err error) {
+	action := &Action{}
+	_, err = c.fakeAPI(EtcdGetActionType, action)
+	return
+}
+
+func (c *FakeEtcdClient) PutKey(_, _ string) error {
+	action := &Action{}
+	_, err := c.fakeAPI(EtcdPutActionType, action)
+	return err
+}
+
+func (c *FakeEtcdClient) PutTTLKey(_, _ string, _ int64) error {
+	action := &Action{}
+	_, err := c.fakeAPI(EtcdPutActionType, action)
+	return err
+}
+
+func (c *FakeEtcdClient) DeleteKey(_ string) error {
+	action := &Action{}
+	_, err := c.fakeAPI(EtcdDeleteActionType, action)
+	return err
+}
+
+// UpdateMember checks that every new peer URL contains the scheme prefix
+// returned by the registered reaction (e.g. "https://").
+func (c *FakeEtcdClient) UpdateMember(_ uint64, members []string) error {
+	action := &Action{}
+	res, err := c.fakeAPI(EtcdUpdateMemberActionType, action)
+	if err != nil {
+		return err
+	}
+	expectSchema := res.(string)
+	for _, member := range members {
+		if !strings.Contains(member, expectSchema) {
+			panic(fmt.Sprintf("peer url %s does not contain expected scheme %s", member, expectSchema))
+		}
+	}
+	return nil
+}
+
+func (c *FakeEtcdClient) Close() error {
+	action := &Action{}
+	_, err := c.fakeAPI(EtcdCloseActionType, action)
+	return err
+}
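For reference, this is how the fakes introduced in this patch are meant to be wired together in a unit test. `newFakePDControlWithEtcd` is a hypothetical helper, not part of the patch; the added `TestPDMemberManagerSyncPDPeerUrlsSts` does the equivalent inline:

```go
package member

import (
	corelisterv1 "k8s.io/client-go/listers/core/v1"

	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
	"github.com/pingcap/tidb-operator/pkg/pdapi"
)

// newFakePDControlWithEtcd is a hypothetical test helper: it registers a fake
// etcd client on the FakePDControl and installs a reaction so that
// FakeEtcdClient.UpdateMember asserts every rewritten peer URL carries the
// expected scheme prefix.
func newFakePDControlWithEtcd(secretLister corelisterv1.SecretLister, tc *v1alpha1.TidbCluster, expectScheme string) (*pdapi.FakePDControl, *pdapi.FakeEtcdClient) {
	// skipTLS so clients are looked up under the plain "http" key used by the setters.
	fakePDControl := pdapi.NewFakePDControl(secretLister, true)
	etcdClient := pdapi.NewFakeEtcdClient()
	fakePDControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), etcdClient)

	etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
		// FakeEtcdClient.UpdateMember treats this string as the prefix every new peer URL must contain.
		return expectScheme, nil
	})
	return fakePDControl, etcdClient
}
```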
diff --git a/pkg/pdapi/pd_control.go b/pkg/pdapi/pd_control.go
index 811f7d6c01..a2960e0ed3 100644
--- a/pkg/pdapi/pd_control.go
+++ b/pkg/pdapi/pd_control.go
@@ -333,12 +333,31 @@ func genEtcdClientUrl(namespace Namespace, clusterName, clusterDomain string, he
 
 // FakePDControl implements a fake version of PDControlInterface.
 type FakePDControl struct {
 	defaultPDControl
+	skipTLS bool
 }
 
-func NewFakePDControl(secretLister corelisterv1.SecretLister) *FakePDControl {
-	return &FakePDControl{
-		defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}, pdMSClients: map[string]PDMSClient{}},
+func NewFakePDControl(secretLister corelisterv1.SecretLister, skipTLS ...bool) *FakePDControl {
+	fake := &FakePDControl{
+		defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{},
+			pdMSClients: map[string]PDMSClient{}, pdEtcdClients: map[string]PDEtcdClient{}},
+		false,
 	}
+	if len(skipTLS) > 0 {
+		fake.skipTLS = skipTLS[0]
+	}
+	return fake
+}
+
+func (fpc *FakePDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool, opts ...Option) PDClient {
+	if fpc.skipTLS {
+		return fpc.defaultPDControl.pdClients[genClientKey("http", namespace, tcName, "")]
+	}
+	// fall back to the default GetPDClient
+	return fpc.defaultPDControl.GetPDClient(namespace, tcName, tlsEnabled, opts...)
+}
+
+func (fpc *FakePDControl) GetPDEtcdClient(namespace Namespace, tcName string, _ bool, _ ...Option) (PDEtcdClient, error) {
+	return fpc.defaultPDControl.pdEtcdClients[genClientKey("http", namespace, tcName, "")], nil
 }
 
 func (fpc *FakePDControl) SetPDClient(namespace Namespace, tcName string, pdclient PDClient) {
@@ -364,3 +383,7 @@ func (fpc *FakePDControl) SetPDMSClientWithClusterDomain(namespace Namespace, tc
 func (fpc *FakePDControl) SetPDMSClientWithAddress(peerURL string, pdmsclient PDMSClient) {
 	fpc.defaultPDControl.pdMSClients[peerURL] = pdmsclient
 }
+
+func (fpc *FakePDControl) SetEtcdClient(namespace Namespace, tcName string, etcdClient PDEtcdClient) {
+	fpc.defaultPDControl.pdEtcdClients[genClientKey("http", namespace, tcName, "")] = etcdClient
+}
diff --git a/pkg/pdapi/pdetcd.go b/pkg/pdapi/pdetcd.go
index 68866d3612..7cf6b5fd20 100644
--- a/pkg/pdapi/pdetcd.go
+++ b/pkg/pdapi/pdetcd.go
@@ -33,10 +33,12 @@ type PDEtcdClient interface {
 	Get(key string, prefix bool) (kvs []*KeyValue, err error)
 	// PutKey will put key to the target pd etcd cluster
 	PutKey(key, value string) error
-	// PutKey will put key with ttl to the target pd etcd cluster
+	// PutTTLKey will put key with ttl to the target pd etcd cluster
 	PutTTLKey(key, value string, ttl int64) error
 	// DeleteKey will delete key from the target pd etcd cluster
 	DeleteKey(key string) error
+	// UpdateMember will update the peer urls of the target member in the pd etcd cluster
+	UpdateMember(id uint64, peerAddrs []string) error
 	// Close will close the etcd connection
 	Close() error
 }
@@ -121,3 +123,15 @@ func (c *pdEtcdClient) DeleteKey(key string) error {
 	}
 	return nil
 }
+
+func (c *pdEtcdClient) UpdateMember(id uint64, peerAddrs []string) error {
+	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+	defer cancel()
+
+	_, err := c.etcdClient.MemberUpdate(ctx, id, peerAddrs)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
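`UpdateMember` above is a thin wrapper over the etcd clientv3 `MemberUpdate` call. A minimal standalone sketch of the same operation against a PD endpoint is shown below; it is illustrative only: the endpoint, member ID, and peer URL are placeholders, and the clientv3 import path may differ depending on the etcd client version the project vendors.

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3" // assumed etcd v3.5 module path; adjust to the vendored version
)

func main() {
	// Placeholder endpoint: PD embeds etcd and serves it on the client port.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://demo-pd:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Replace the peer URL list of member 1 with the https variant (placeholder values).
	if _, err := cli.MemberUpdate(ctx, 1, []string{"https://demo-pd-0.demo-pd-peer:2380"}); err != nil {
		panic(err)
	}
	fmt.Println("peer urls updated")
}
```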
ginkgo.By("Installing tidb server and client certificate") + err = InstallTiDBCertificates(ns, tcName) + framework.ExpectNoError(err, "failed to install tidb server and client certificate") + + ginkgo.By("Installing tidbInitializer client certificate") + err = installTiDBInitializerCertificates(ns, tcName) + framework.ExpectNoError(err, "failed to install tidbInitializer client certificate") + + ginkgo.By("Installing dashboard client certificate") + err = installPDDashboardCertificates(ns, tcName) + framework.ExpectNoError(err, "failed to install dashboard client certificate") + + ginkgo.By("Installing tidb components certificates") + err = InstallTiDBComponentsCertificates(ns, tcName) + framework.ExpectNoError(err, "failed to install tidb components certificates") + + if sc.skipCA { + ginkgo.By("Removing ca.crt in tidb-client-secret, dashboard and tidbInitializer client certificate") + err = removeCACertFromSecret(genericCli, ns, fmt.Sprintf("%s-tidb-client-secret", tcName)) + framework.ExpectNoError(err, "failed to update tidb-client-secret with ca.crt deleted") + err = removeCACertFromSecret(genericCli, ns, fmt.Sprintf("%s-dashboard-tls", tcName)) + framework.ExpectNoError(err, "failed to update dashboard-tls secret with ca.crt deleted") + err = removeCACertFromSecret(genericCli, ns, fmt.Sprintf("%s-initializer-tls", tcName)) + framework.ExpectNoError(err, "failed to update initializer-tls secret with ca.crt deleted") + } + + ginkgo.By("Creating tidb cluster without tls") + dashTLSName := fmt.Sprintf("%s-dashboard-tls", tcName) + tc := fixture.GetTidbCluster(ns, tcName, utilimage.TiDBLatest) + tc.Spec.PD.Replicas = 1 + tc.Spec.PD.TLSClientSecretName = &dashTLSName + tc.Spec.TiKV.Replicas = 1 + tc.Spec.TiDB.Replicas = 1 + tc.Spec.Pump = &v1alpha1.PumpSpec{ + Replicas: 1, + BaseImage: "pingcap/tidb-binlog", + ResourceRequirements: fixture.WithStorage(fixture.BurstableSmall, "1Gi"), + Config: tcconfig.New(map[string]interface{}{ + "addr": fmt.Sprintf("0.0.0.0:%d", v1alpha1.DefaultPumpPort), + "storage": map[string]interface{}{ + "stop-write-at-available-space": 0, + }, + }), + } + err = genericCli.Create(context.TODO(), tc) + framework.ExpectNoError(err, "failed to create TidbCluster: %q", tc.Name) + err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 5*time.Second) + framework.ExpectNoError(err, "wait for TidbCluster ready timeout: %q", tc.Name) + + ginkgo.By("Upgrade tidb cluster with TLS disabled") + tc.Spec.PD.TLSClientSecretName = &dashTLSName + tc.Spec.TiDB.TLSClient = &v1alpha1.TiDBTLSClient{Enabled: true} + tc.Spec.TiDB.TLSClient.SkipInternalClientCA = sc.skipCA + tc.Spec.TLSCluster = &v1alpha1.TLSCluster{Enabled: true} + + err = genericCli.Update(context.TODO(), tc) + framework.ExpectNoError(err, "failed to create TidbCluster: %q", tc.Name) + err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 5*time.Second) + framework.ExpectNoError(err, "wait for TidbCluster ready timeout: %q", tc.Name) + + ginkgo.By("Ensure configs of all components are not changed") + newTC, err := cli.PingcapV1alpha1().TidbClusters(tc.Namespace).Get(context.TODO(), tc.Name, metav1.GetOptions{}) + tc.Spec.TiDB.Config.Set("log.file.max-backups", int64(3)) // add default configs that ard added by operator + framework.ExpectNoError(err, "failed to get TidbCluster: %s", tc.Name) + framework.ExpectEqual(newTC.Spec.PD.Config, tc.Spec.PD.Config, "pd config isn't equal of TidbCluster: %s", tc.Name) + framework.ExpectEqual(newTC.Spec.TiKV.Config, tc.Spec.TiKV.Config, "tikv config isn't equal of TidbCluster: %s", 
+			framework.ExpectEqual(newTC.Spec.TiDB.Config, tc.Spec.TiDB.Config, "tidb config isn't equal of TidbCluster: %s", tc.Name)
+			framework.ExpectEqual(newTC.Spec.Pump.Config, tc.Spec.Pump.Config, "pump config isn't equal of TidbCluster: %s", tc.Name)
+			framework.ExpectEqual(newTC.Spec.TiCDC.Config, tc.Spec.TiCDC.Config, "ticdc config isn't equal of TidbCluster: %s", tc.Name)
+			framework.ExpectEqual(newTC.Spec.TiFlash.Config, tc.Spec.TiFlash.Config, "tiflash config isn't equal of TidbCluster: %s", tc.Name)
+
+			ginkgo.By("Ensure Dashboard use custom secret")
+			foundSecretName := false
+			pdSts, err := stsGetter.StatefulSets(ns).Get(context.TODO(), controller.PDMemberName(tcName), metav1.GetOptions{})
+			framework.ExpectNoError(err, "failed to get statefulsets for pd: %q", tc.Name)
+			for _, vol := range pdSts.Spec.Template.Spec.Volumes {
+				if vol.Name == "tidb-client-tls" {
+					foundSecretName = true
+					framework.ExpectEqual(vol.Secret.SecretName, dashTLSName)
+				}
+			}
+			framework.ExpectEqual(foundSecretName, true)
+
+			ginkgo.By("Creating tidb initializer")
+			passwd := "admin"
+			initName := fmt.Sprintf("%s-initializer", tcName)
+			initPassWDName := fmt.Sprintf("%s-initializer-passwd", tcName)
+			initTLSName := fmt.Sprintf("%s-initializer-tls", tcName)
+			initSecret := fixture.GetInitializerSecret(tc, initPassWDName, passwd)
+			_, err = c.CoreV1().Secrets(ns).Create(context.TODO(), initSecret, metav1.CreateOptions{})
+			framework.ExpectNoError(err, "failed to create secret for TidbInitializer: %v", initSecret)
+
+			ti := fixture.GetTidbInitializer(ns, tcName, initName, initPassWDName, initTLSName)
+			err = genericCli.Create(context.TODO(), ti)
+			framework.ExpectNoError(err, "failed to create TidbInitializer: %v", ti)
+
+			source := &tests.DrainerSourceConfig{
+				Namespace:      ns,
+				ClusterName:    tcName,
+				OperatorTag:    cfg.OperatorTag,
+				ClusterVersion: utilimage.TiDBLatest,
+			}
+			targetTcName := "tls-target"
+			targetTc := fixture.GetTidbCluster(ns, targetTcName, utilimage.TiDBLatest)
+			targetTc.Spec.PD.Replicas = 1
+			targetTc.Spec.TiKV.Replicas = 1
+			targetTc.Spec.TiDB.Replicas = 1
+			err = genericCli.Create(context.TODO(), targetTc)
+			framework.ExpectNoError(err, "failed to create TidbCluster: %v", targetTc)
+			err = oa.WaitForTidbClusterReady(targetTc, 30*time.Minute, 5*time.Second)
+			framework.ExpectNoError(err, "wait for TidbCluster timeout: %v", targetTc)
+
+			drainerConfig := &tests.DrainerConfig{
+				// Note: DrainerName must be tcName
+				// oa.DeployDrainer will use DrainerName as release name to run "helm install..."
+				// in InstallTiDBCertificates, we use `tidbComponentsCertificatesTmpl` to render the certs:
+				// ...
+				// dnsNames:
+				// - "*.{{ .ClusterName }}-{{ .ClusterName }}-drainer"
+				// - "*.{{ .ClusterName }}-{{ .ClusterName }}-drainer.{{ .Namespace }}"
+				// - "*.{{ .ClusterName }}-{{ .ClusterName }}-drainer.{{ .Namespace }}.svc"
+				// ...
+				// refer to the 'drainer' part in https://docs.pingcap.com/tidb-in-kubernetes/dev/enable-tls-between-components
+				DrainerName:       tcName,
+				OperatorTag:       cfg.OperatorTag,
+				SourceClusterName: tcName,
+				Namespace:         ns,
+				DbType:            tests.DbTypeTiDB,
+				Host:              fmt.Sprintf("%s-tidb.%s.svc.cluster.local", targetTcName, ns),
+				Port:              strconv.Itoa(int(targetTc.Spec.TiDB.GetServicePort())),
+				TLSCluster:        true,
+				User:              "root",
+				Password:          "",
+			}
+
+			ginkgo.By("Deploying tidb drainer")
+			err = oa.DeployDrainer(drainerConfig, source)
+			framework.ExpectNoError(err, "failed to deploy drainer: %v", drainerConfig)
+			err = oa.CheckDrainer(drainerConfig, source)
+			framework.ExpectNoError(err, "failed to check drainer: %v", drainerConfig)
+
+			ginkgo.By("Inserting data into source db")
+			err = wait.PollImmediate(time.Second*5, time.Minute*5, insertIntoDataToSourceDB(fw, c, ns, tcName, passwd, true))
+			framework.ExpectNoError(err, "insert data into source db timeout")
+
+			ginkgo.By("Checking tidb-binlog works as expected")
+			err = wait.PollImmediate(time.Second*5, time.Minute*5, dataInClusterIsCorrect(fw, c, ns, targetTcName, "", false))
+			framework.ExpectNoError(err, "check data correct timeout")
+
+			ginkgo.By("Connecting to tidb server to verify the connection is TLS enabled")
+			err = wait.PollImmediate(time.Second*5, time.Minute*5, tidbIsTLSEnabled(fw, c, ns, tcName, passwd))
+			framework.ExpectNoError(err, "connect to TLS tidb timeout")
+
+			ginkgo.By("Scaling out tidb cluster")
+			err = controller.GuaranteedUpdate(genericCli, tc, func() error {
+				tc.Spec.PD.Replicas = 5
+				tc.Spec.TiKV.Replicas = 5
+				tc.Spec.TiDB.Replicas = 3
+				tc.Spec.Pump.Replicas++
+				tc.Spec.TiFlash.Replicas = 3
+				tc.Spec.TiCDC.Replicas = 3
+				return nil
+			})
+			framework.ExpectNoError(err, "failed to update TidbCluster: %q", tc.Name)
+			err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 5*time.Second)
+			framework.ExpectNoError(err, "wait for TidbCluster ready timeout: %q", tc.Name)
+
+			ginkgo.By("Scaling in tidb cluster")
+			err = controller.GuaranteedUpdate(genericCli, tc, func() error {
+				tc.Spec.PD.Replicas = 3
+				tc.Spec.TiKV.Replicas = 3
+				tc.Spec.TiDB.Replicas = 2
+				tc.Spec.Pump.Replicas--
+				tc.Spec.TiFlash.Replicas = 1
+				tc.Spec.TiCDC.Replicas = 1
+				return nil
+			})
+			framework.ExpectNoError(err, "failed to update TidbCluster: %q", tc.Name)
+			err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 5*time.Second)
+			framework.ExpectNoError(err, "wait for TidbCluster ready timeout: %q", tc.Name)
+
+			ginkgo.By("Upgrading tidb cluster")
+			err = controller.GuaranteedUpdate(genericCli, tc, func() error {
+				tc.Spec.Version = utilimage.TiDBLatest
+				return nil
+			})
+			framework.ExpectNoError(err, "failed to update TidbCluster: %q", tc.Name)
+			err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 5*time.Second)
+			framework.ExpectNoError(err, "wait for TidbCluster ready timeout: %q", tc.Name)
+
+			ginkgo.By("Check custom labels and annotations for TidbInitializer")
+			checkInitializerCustomLabelAndAnn(ti, c)
+		})
 	}
 
 	ginkgo.It("should enable TLS for MySQL Client and between Heterogeneous TiDB components", func() {