Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd: support peer URL scheme change with TLS #5708

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,11 @@ func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string)
pdControl.SetPDClientWithAddress(peerURL, pdClient)
return pdClient
}

// NewFakeEtcdClient builds a fake etcd client, registers it with the given
// fake PD control under the cluster's namespace/name, and returns it so that
// tests can attach reactions to it.
func NewFakeEtcdClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *pdapi.FakeEtcdClient {
	client := pdapi.NewFakeEtcdClient()
	pdControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), client)
	return client
}
69 changes: 69 additions & 0 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package member
import (
"context"
"fmt"
"net/url"
"path"
"regexp"
"strconv"
"strings"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb-operator/pkg/apis/label"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
Expand All @@ -30,6 +32,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/manager/suspender"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/manager/volumes"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
"github.com/pingcap/tidb-operator/pkg/util"

Expand Down Expand Up @@ -387,6 +390,8 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a
memberID, memberHealth.ClientUrls, memberHealth, ns, tcName)
continue
}
// try to sync peer urls
m.syncPDPeerUrls(tc, name, pdClient)

status := v1alpha1.PDMember{
Name: name,
Expand Down Expand Up @@ -993,6 +998,70 @@ func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set *
return nil
}

// syncPDPeerUrls keeps the advertised peer URLs of the given PD member in
// sync with the cluster's TLS setting. When TLS is toggled on an existing
// cluster, the peer URLs recorded in PD's member list still carry the old
// scheme; this rewrites them (http <-> https) through the PD etcd API.
// It returns an error if the member cannot be found, a peer URL cannot be
// parsed, or the etcd update fails; it is a no-op when all schemes already
// match the TLS setting.
func (m *pdMemberManager) syncPDPeerUrls(tc *v1alpha1.TidbCluster, pdName string, pdCli pdapi.PDClient) error {
	ns := tc.GetNamespace()
	tcName := tc.GetName()
	tlsEnabled := tc.IsTLSClusterEnabled()

	members, err := pdCli.GetMembers()
	if err != nil {
		return err
	}

	// Locate the member we were asked to sync. Note: the loop variable must
	// not be named "m", which would shadow the method receiver.
	var member *pdpb.Member
	for _, memb := range members.Members {
		if memb.Name == pdName {
			member = memb
			break
		}
	}
	if member == nil {
		return fmt.Errorf("tidbcluster: [%s/%s] failed to find member %s in pd cluster", ns, tcName, pdName)
	}

	var (
		newPeers []string
		needSync bool
	)
	for _, peerUrl := range member.PeerUrls {
		u, err := url.Parse(peerUrl)
		if err != nil {
			return fmt.Errorf("tidbcluster: [%s/%s] failed to parse peer url %s, %v", ns, tcName, peerUrl, err)
		}
		// A URL needs rewriting exactly when its scheme disagrees with the
		// cluster TLS setting.
		if tlsEnabled != (u.Scheme == "https") {
			needSync = true
			if tlsEnabled {
				u.Scheme = "https"
			} else {
				u.Scheme = "http"
			}
			newPeers = append(newPeers, u.String())
		} else {
			newPeers = append(newPeers, peerUrl)
		}
	}

	if !needSync {
		return nil
	}

	pdEtcdClient, err := m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(ns), tcName,
		tlsEnabled, pdapi.ClusterRef(tc.Spec.ClusterDomain))
	if err != nil {
		return fmt.Errorf("tidbcluster: [%s/%s] failed to create pd etcd client, %v", ns, tcName, err)
	}
	defer pdEtcdClient.Close()
	if err := pdEtcdClient.UpdateMember(member.GetMemberId(), newPeers); err != nil {
		return fmt.Errorf("tidbcluster: [%s/%s] failed to update pd member: %s peer urls, %v", ns, tcName, pdName, err)
	}
	klog.Infof("tidbcluster: [%s/%s] pd upgrader: sync pd member: %s peer urls successfully, from %v to %v",
		ns, tcName, pdName, member.PeerUrls, newPeers)

	return nil
}

// TODO: Support check status http request in future.
func buildPDReadinessProbHandler(tc *v1alpha1.TidbCluster) corev1.ProbeHandler {
return corev1.ProbeHandler{
Expand Down
127 changes: 127 additions & 0 deletions pkg/manager/member/pd_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/google/go-cmp/cmp"
. "github.com/onsi/gomega"
"github.com/pingcap/kvproto/pkg/pdpb"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -2855,6 +2856,132 @@ func TestPDMemberManagerSyncPDStsWhenPdNotJoinCluster(t *testing.T) {
}
}

// TestPDMemberManagerSyncPDPeerUrlsSts exercises the PD member manager sync
// path when the cluster's TLS setting disagrees with the scheme of the PD
// members' peer URLs, covering both toggle directions (http -> https and
// https -> http).
func TestPDMemberManagerSyncPDPeerUrlsSts(t *testing.T) {
	g := NewGomegaWithT(t)
	type testcase struct {
		name         string
		modify       func(cluster *v1alpha1.TidbCluster) // mutates the spec after the first Sync to flip TLS
		pdHealth     *pdapi.HealthInfo                   // health info served by the fake PD client
		err          bool                                // whether the second sync is expected to fail
		membersInfo  *pdapi.MembersInfo                  // member list (with peer URLs) served by the fake PD client
		expectSchema string                              // NOTE(review): returned by the fake etcd reaction but never asserted — consider verifying the updated peer URLs; "Schema" likely means "Scheme"
	}

	testFn := func(test *testcase, t *testing.T) {
		tc := newTidbClusterForPD()
		ns := tc.Namespace
		tcName := tc.Name

		// Wire up a fake PD control carrying both a fake PD client and a
		// fake etcd client so the peer-URL sync path can run end to end.
		pmm, _, _ := newFakePDMemberManager()
		pmm.deps.PDControl = pdapi.NewFakePDControl(pmm.deps.KubeInformerFactory.Core().V1().Secrets().Lister(), true)
		fakePDControl := pmm.deps.PDControl.(*pdapi.FakePDControl)
		pdClient := controller.NewFakePDClient(fakePDControl, tc)
		etcdClient := controller.NewFakeEtcdClient(fakePDControl, tc)

		// Stub the PD API responses the sync loop consults.
		pdClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
			return test.pdHealth, nil
		})
		pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) {
			return &metapb.Cluster{Id: uint64(1)}, nil
		})
		pdClient.AddReaction(pdapi.GetMembersActionType, func(action *pdapi.Action) (interface{}, error) {
			return test.membersInfo, nil
		})
		etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
			return test.expectSchema, nil
		})

		// First sync bootstraps the services/statefulset; a requeue error is
		// the expected outcome while the cluster is still converging.
		err := pmm.Sync(tc)
		g.Expect(controller.IsRequeueError(err)).To(BeTrue())

		_, err = pmm.deps.ServiceLister.Services(ns).Get(controller.PDMemberName(tcName))
		g.Expect(err).NotTo(HaveOccurred())
		_, err = pmm.deps.ServiceLister.Services(ns).Get(controller.PDPeerMemberName(tcName))
		g.Expect(err).NotTo(HaveOccurred())
		_, err = pmm.deps.StatefulSetLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
		g.Expect(err).NotTo(HaveOccurred())

		// Flip the TLS setting, then re-sync the statefulset to drive the
		// peer URL scheme reconciliation.
		test.modify(tc)
		err = pmm.syncPDStatefulSetForTidbCluster(tc)
		if test.err {
			g.Expect(err).To(HaveOccurred())
		} else {
			g.Expect(err).NotTo(HaveOccurred())
		}
	}
	tests := []testcase{
		{
			name:         "upgrade from non-tls to tls",
			expectSchema: "https://",
			modify: func(tc *v1alpha1.TidbCluster) {
				tc.Spec = v1alpha1.TidbClusterSpec{
					PD: &v1alpha1.PDSpec{
						ComponentSpec: v1alpha1.ComponentSpec{
							Image: "pingcap/pd:v8.1.0",
						},
					},
					TiDB: &v1alpha1.TiDBSpec{
						TLSClient: &v1alpha1.TiDBTLSClient{
							Enabled: true,
						},
					},
					TiKV:       &v1alpha1.TiKVSpec{},
					TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
				}
			},
			// Member still advertises plain http while TLS is being enabled.
			pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
				{Name: "pd0", MemberID: 1, ClientUrls: []string{"http://pd0:2379"}, Health: true},
			}},
			membersInfo: &pdapi.MembersInfo{
				Members: []*pdpb.Member{
					{
						Name:     "pd0",
						MemberId: 1,
						PeerUrls: []string{"http://pd0:2380"},
					},
				},
			},
		},
		{
			name:         "upgrade from tls to non-tls",
			expectSchema: "http://",
			modify: func(tc *v1alpha1.TidbCluster) {
				tc.Spec = v1alpha1.TidbClusterSpec{
					PD: &v1alpha1.PDSpec{
						ComponentSpec: v1alpha1.ComponentSpec{
							Image: "pingcap/pd:v8.1.0",
						},
					},
					TiDB: &v1alpha1.TiDBSpec{
						TLSClient: &v1alpha1.TiDBTLSClient{
							Enabled: false,
						},
					},
					TiKV:       &v1alpha1.TiKVSpec{},
					TLSCluster: &v1alpha1.TLSCluster{Enabled: false},
				}
			},
			// Member still advertises https while TLS is being disabled.
			pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
				{Name: "pd0", MemberID: 1, ClientUrls: []string{"https://pd0:2379"}, Health: true},
			}},
			membersInfo: &pdapi.MembersInfo{
				Members: []*pdpb.Member{
					{
						Name:     "pd0",
						MemberId: 1,
						PeerUrls: []string{"https://pd0:2380"},
					},
				},
			},
		},
	}
	for i := range tests {
		t.Logf("begin: %s", tests[i].name)
		testFn(&tests[i], t)
		t.Logf("end: %s", tests[i].name)
	}
}

func TestPDShouldRecover(t *testing.T) {
pods := []*v1.Pod{
{
Expand Down
20 changes: 10 additions & 10 deletions pkg/manager/member/pd_ms_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
}

curService := controller.PDMSTrimName(newSet.Name)
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
if tc.Status.PDMS[curService] == nil {
tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
Expand All @@ -62,9 +62,9 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
if oldTrimName != curService {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
}
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
if tc.PDMSScaling(oldTrimName) {
klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
klog.Infof("tidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
ns, tcName, tc.Status.PDMS[curService].Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
Expand Down Expand Up @@ -141,7 +141,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
return err
}

klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
primary, upgradePDMSName, upgradePodName)
// If current pdms is primary, transfer primary to other pdms pod
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
Expand All @@ -152,15 +152,15 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
}

if targetName != "" {
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
if err != nil {
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
klog.Errorf("tidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
} else {
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
klog.Warningf("tidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
}
}
}
Expand All @@ -177,7 +177,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
ns := tc.GetNamespace()
tcName := tc.GetName()
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)

// set ordinal to max ordinal if ordinal isn't exist
Expand All @@ -202,7 +202,7 @@ func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.Stat
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
}

klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
return targetName
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/manager/member/pd_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"

apps "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -54,7 +53,7 @@ func (u *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Stat
return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed, can not to be upgraded", ns, tcName)
}
if tc.PDScaling() {
klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
klog.Infof("tidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
ns, tcName, tc.Status.PD.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
Expand Down Expand Up @@ -143,13 +142,13 @@ func (u *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSe
if targetName != "" {
err := u.transferPDLeaderTo(tc, targetName)
if err != nil {
klog.Errorf("pd upgrader: failed to transfer pd leader to: %s, %v", targetName, err)
klog.Errorf("tidbcluster: [%s/%s] pd upgrader: failed to transfer pd leader to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("pd upgrader: transfer pd leader to: %s successfully", targetName)
klog.Infof("tidbcluster: [%s/%s] pd upgrader: transfer pd leader to: %s successfully", ns, tcName, targetName)
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
} else {
klog.Warningf("pd upgrader: skip to transfer pd leader, because can not find a suitable pd")
klog.Warningf("tidbcluster: [%s/%s] pd upgrader: skip to transfer pd leader, because can not find a suitable pd", ns, tcName)
}
}

Expand Down
Loading
Loading