Commit 16e4355: add tls

Signed-off-by: husharp <[email protected]>
HuSharp committed Sep 12, 2024 (1 parent: dbe75c0)

Showing 9 changed files with 537 additions and 17 deletions.
8 changes: 8 additions & 0 deletions pkg/controller/pd_control.go
@@ -133,3 +133,11 @@ func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string)
pdControl.SetPDClientWithAddress(peerURL, pdClient)
return pdClient
}

// NewFakeEtcdClient creates a fake etcd client and registers it with the given FakePDControl
func NewFakeEtcdClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *pdapi.FakeEtcdClient {
etcdClient := pdapi.NewFakeEtcdClient()
pdControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), etcdClient)

return etcdClient
}
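
A minimal usage sketch, assuming the fake-reaction API exercised in the test file below: a test registers the fake with FakePDControl and stubs the etcd member update so no real etcd is needed.

// Sketch only; pmm and tc come from the test helpers (newFakePDMemberManager, newTidbClusterForPD).
fakePDControl := pmm.deps.PDControl.(*pdapi.FakePDControl)
etcdClient := controller.NewFakeEtcdClient(fakePDControl, tc)
etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
	return nil, nil // pretend the peer-url update succeeded
})
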
69 changes: 69 additions & 0 deletions pkg/manager/member/pd_member_manager.go
@@ -16,11 +16,13 @@ package member
import (
"context"
"fmt"
"net/url"
"path"
"regexp"
"strconv"
"strings"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb-operator/pkg/apis/label"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
@@ -30,6 +32,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/manager/suspender"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/manager/volumes"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
"github.com/pingcap/tidb-operator/pkg/util"

@@ -387,6 +390,8 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a
memberID, memberHealth.ClientUrls, memberHealth, ns, tcName)
continue
}
// try to sync the member's peer URLs (best-effort: the returned error is not propagated)
m.syncPDPeerUrls(tc, name, pdClient)

status := v1alpha1.PDMember{
Name: name,
@@ -993,6 +998,70 @@ func (m *pdMemberManager) collectUnjoinedMembers(tc *v1alpha1.TidbCluster, set *
return nil
}

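// syncPDPeerUrls keeps the scheme (http/https) of a PD member's advertised
// peer URLs in sync with the cluster's TLS setting, rewriting them through
// the PD etcd API when they disagree.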
func (m *pdMemberManager) syncPDPeerUrls(tc *v1alpha1.TidbCluster, pdName string, pdCli pdapi.PDClient) error {
ns := tc.GetNamespace()
tcName := tc.GetName()

var (
tlsEnabled = tc.IsTLSClusterEnabled()
member *pdpb.Member
)
members, err := pdCli.GetMembers()
if err != nil {
return err
}
for _, m := range members.Members {
if m.Name == pdName {
member = m
break
}
}
if member == nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to find member %s in pd cluster", ns, tcName, pdName)
}

var (
newPeers []string
needSync bool
)
for _, peerUrl := range member.PeerUrls {
u, err := url.Parse(peerUrl)
if err != nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to parse peer url %s, %v", ns, tcName, peerUrl, err)
}
// rewrite the scheme if it does not match the cluster's TLS setting
if tlsEnabled != (u.Scheme == "https") {
needSync = true
if !tlsEnabled {
u.Scheme = "http"
} else {
u.Scheme = "https"
}
newPeers = append(newPeers, u.String())
} else {
newPeers = append(newPeers, peerUrl)
}
}

if needSync {
pdEtcdClient, err := m.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))

if err != nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to create pd etcd client, %v", ns, tcName, err)
}
defer pdEtcdClient.Close()
err = pdEtcdClient.UpdateMember(member.GetMemberId(), newPeers)
if err != nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to update pd member: %s peer urls, %v", ns, tcName, pdName, err)
}
klog.Infof("tidbcluster: [%s/%s] pd upgrader: sync pd member: %s peer urls successfully, from %v to %v",
ns, tcName, pdName, member.PeerUrls, newPeers)
}

return nil
}
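
The core of the function is the scheme flip; the following is a standalone sketch of that logic, a hypothetical helper that is not part of this commit.

// rewritePeerURLScheme is a hypothetical extraction of the loop above
// (requires "net/url", already imported by this file). It returns the
// rewritten URLs and whether any of them changed.
func rewritePeerURLScheme(peerURLs []string, tlsEnabled bool) ([]string, bool, error) {
	out := make([]string, 0, len(peerURLs))
	changed := false
	for _, raw := range peerURLs {
		u, err := url.Parse(raw)
		if err != nil {
			return nil, false, err
		}
		if tlsEnabled != (u.Scheme == "https") {
			changed = true
			if tlsEnabled {
				u.Scheme = "https"
			} else {
				u.Scheme = "http"
			}
		}
		out = append(out, u.String())
	}
	return out, changed, nil
}

With tlsEnabled set to true, []string{"http://pd0:2380"} yields []string{"https://pd0:2380"} and changed == true; URLs already using the right scheme pass through untouched.
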

// TODO: support checking status via an HTTP request in the future.
func buildPDReadinessProbHandler(tc *v1alpha1.TidbCluster) corev1.ProbeHandler {
return corev1.ProbeHandler{
138 changes: 138 additions & 0 deletions pkg/manager/member/pd_member_manager_test.go
@@ -21,6 +21,7 @@ import (

"github.com/google/go-cmp/cmp"
. "github.com/onsi/gomega"
"github.com/pingcap/kvproto/pkg/pdpb"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
@@ -2855,6 +2856,143 @@ func TestPDMemberManagerSyncPDStsWhenPdNotJoinCluster(t *testing.T) {
}
}

func TestPDMemberManagerSyncPDPeerUrlsSts(t *testing.T) {
g := NewGomegaWithT(t)
type testcase struct {
name string
modify func(cluster *v1alpha1.TidbCluster)
pdHealth *pdapi.HealthInfo
err bool
membersInfo *pdapi.MembersInfo
expectSchema string
}

testFn := func(test *testcase, t *testing.T) {
tc := newTidbClusterForPD()
ns := tc.Namespace
tcName := tc.Name

pmm, _, _ := newFakePDMemberManager()
fakePDControl := pmm.deps.PDControl.(*pdapi.FakePDControl)
pdClient := controller.NewFakePDClient(fakePDControl, tc)
etcdClient := controller.NewFakeEtcdClient(fakePDControl, tc)

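// Stub the PD responses (health, cluster id, members) and the etcd
// member-update call used by the peer-url sync path.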
pdClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
return test.pdHealth, nil
})
pdClient.AddReaction(pdapi.GetClusterActionType, func(action *pdapi.Action) (interface{}, error) {
return &metapb.Cluster{Id: uint64(1)}, nil
})
pdClient.AddReaction(pdapi.GetMembersActionType, func(action *pdapi.Action) (interface{}, error) {
return test.membersInfo, nil
})
etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
return test.expectSchema, nil
})

err := pmm.Sync(tc)
g.Expect(controller.IsRequeueError(err)).To(BeTrue())

_, err = pmm.deps.ServiceLister.Services(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())
_, err = pmm.deps.ServiceLister.Services(ns).Get(controller.PDPeerMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())
_, err = pmm.deps.StatefulSetLister.StatefulSets(ns).Get(controller.PDMemberName(tcName))
g.Expect(err).NotTo(HaveOccurred())

test.modify(tc)
err = pmm.syncPDStatefulSetForTidbCluster(tc)
if test.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
}
tests := []testcase{
{
name: "upgrade from non-tls to tls",
expectSchema: "https://",
modify: func(tc *v1alpha1.TidbCluster) {
tc.Spec = v1alpha1.TidbClusterSpec{
PD: &v1alpha1.PDSpec{
ComponentSpec: v1alpha1.ComponentSpec{
Image: "pingcap/pd:v8.1.0",
},
},
TiDB: &v1alpha1.TiDBSpec{
TLSClient: &v1alpha1.TiDBTLSClient{
Enabled: true,
},
},
TiKV: &v1alpha1.TiKVSpec{},
TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
}
},
pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
{Name: "pd0", MemberID: uint64(1), ClientUrls: []string{"http://pd0:2379"}, Health: true},
{Name: "pd1", MemberID: uint64(2), ClientUrls: []string{"http://pd1:2379"}, Health: true},
}},
membersInfo: &pdapi.MembersInfo{
Members: []*pdpb.Member{
{
Name: "pd0",
MemberId: 111,
PeerUrls: []string{"http://pd0:2380"},
},
{
Name: "pd1",
MemberId: 222,
PeerUrls: []string{"http://pd1:2380"},
},
},
},
},
{
name: "upgrade from tls to non-tls",
expectSchema: "http://",
modify: func(tc *v1alpha1.TidbCluster) {
tc.Spec = v1alpha1.TidbClusterSpec{
PD: &v1alpha1.PDSpec{
ComponentSpec: v1alpha1.ComponentSpec{
Image: "pingcap/pd:v8.1.0",
},
},
TiDB: &v1alpha1.TiDBSpec{
TLSClient: &v1alpha1.TiDBTLSClient{
Enabled: false,
},
},
TiKV: &v1alpha1.TiKVSpec{},
TLSCluster: &v1alpha1.TLSCluster{Enabled: false},
}
},
pdHealth: &pdapi.HealthInfo{Healths: []pdapi.MemberHealth{
{Name: "pd0", MemberID: uint64(1), ClientUrls: []string{"https://pd0:2379"}, Health: true},
{Name: "pd1", MemberID: uint64(2), ClientUrls: []string{"https://pd1:2379"}, Health: true},
}},
membersInfo: &pdapi.MembersInfo{
Members: []*pdpb.Member{
{
Name: "pd0",
MemberId: 111,
PeerUrls: []string{"https://pd0:2380"},
},
{
Name: "pd1",
MemberId: 222,
PeerUrls: []string{"https://pd1:2380"},
},
},
},
},
}
for i := range tests {
t.Logf("begin: %s", tests[i].name)
testFn(&tests[i], t)
t.Logf("end: %s", tests[i].name)
}
}
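
Assuming a standard Go toolchain at the repository root, the new cases can be run in isolation with:

go test ./pkg/manager/member/ -run TestPDMemberManagerSyncPDPeerUrlsSts
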

func TestPDShouldRecover(t *testing.T) {
pods := []*v1.Pod{
{
20 changes: 10 additions & 10 deletions pkg/manager/member/pd_ms_upgrader.go
@@ -50,7 +50,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
}

curService := controller.PDMSTrimName(newSet.Name)
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
if tc.Status.PDMS[curService] == nil {
tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
@@ -62,9 +62,9 @@
if oldTrimName != curService {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
}
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
if tc.PDMSScaling(oldTrimName) {
klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
klog.Infof("tidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
ns, tcName, tc.Status.PDMS[curService].Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
@@ -141,7 +141,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
return err
}

klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
primary, upgradePDMSName, upgradePodName)
// If current pdms is primary, transfer primary to other pdms pod
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
@@ -152,15 +152,15 @@
}

if targetName != "" {
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
if err != nil {
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
klog.Errorf("tidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
} else {
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
klog.Warningf("tidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
}
}
}
Expand All @@ -177,7 +177,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
ns := tc.GetNamespace()
tcName := tc.GetName()
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)

// set ordinal to max ordinal if ordinal isn't exist
@@ -202,7 +202,7 @@
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
}

klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
return targetName
}

9 changes: 4 additions & 5 deletions pkg/manager/member/pd_upgrader.go
@@ -22,7 +22,6 @@ import (
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"

apps "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
)
@@ -54,7 +53,7 @@ func (u *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Stat
return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed, can not to be upgraded", ns, tcName)
}
if tc.PDScaling() {
klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
klog.Infof("tidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
ns, tcName, tc.Status.PD.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
@@ -143,13 +142,13 @@ func (u *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSe
if targetName != "" {
err := u.transferPDLeaderTo(tc, targetName)
if err != nil {
klog.Errorf("pd upgrader: failed to transfer pd leader to: %s, %v", targetName, err)
klog.Errorf("tidbcluster: [%s/%s] pd upgrader: failed to transfer pd leader to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("pd upgrader: transfer pd leader to: %s successfully", targetName)
klog.Infof("tidbcluster: [%s/%s] pd upgrader: transfer pd leader to: %s successfully", ns, tcName, targetName)
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
} else {
klog.Warningf("pd upgrader: skip to transfer pd leader, because can not find a suitable pd")
klog.Warningf("tidbcluster: [%s/%s] pd upgrader: skip to transfer pd leader, because can not find a suitable pd", ns, tcName)
}
}

(4 more changed files not shown)