Skip to content

Commit

Permalink
add tls
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Sep 12, 2024
1 parent dbe75c0 commit cefcf04
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 20 deletions.
8 changes: 8 additions & 0 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,11 @@ func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string)
pdControl.SetPDClientWithAddress(peerURL, pdClient)
return pdClient
}

// NewFakeEtcdClient creates a fake etcdClient that is set as the etcd client
func NewFakeEtcdClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *pdapi.FakeEtcdClient {
etcdClient := pdapi.NewFakeEtcdClient()
pdControl.SetEtcdClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), etcdClient)

return etcdClient
}
6 changes: 6 additions & 0 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ func (m *pdMemberManager) syncTidbClusterStatus(tc *v1alpha1.TidbCluster, set *a
return err
}

// check pd member's peer urls
upgradePdName := PdName(tcName, 0, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s)
if err := m.upgrader.(*pdUpgrader).SyncPDPeerUrls(tc, upgradePdName); err != nil {
return fmt.Errorf("syncTidbClusterStatus: failed to sync pd peer urls for cluster %s/%s, err: %s", ns, tcName, err)
}

cluster, err := pdClient.GetCluster()
if err != nil {
tc.Status.PD.Synced = false
Expand Down
20 changes: 10 additions & 10 deletions pkg/manager/member/pd_ms_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
}

curService := controller.PDMSTrimName(newSet.Name)
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
if tc.Status.PDMS[curService] == nil {
tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
Expand All @@ -62,9 +62,9 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
if oldTrimName != curService {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
}
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
klog.Infof("tidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
if tc.PDMSScaling(oldTrimName) {
klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
klog.Infof("tidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
ns, tcName, tc.Status.PDMS[curService].Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
Expand Down Expand Up @@ -141,7 +141,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
return err
}

klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
primary, upgradePDMSName, upgradePodName)
// If current pdms is primary, transfer primary to other pdms pod
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
Expand All @@ -152,15 +152,15 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
}

if targetName != "" {
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
if err != nil {
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
klog.Errorf("tidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
} else {
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
klog.Warningf("tidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
}
}
}
Expand All @@ -177,7 +177,7 @@ func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, n
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
ns := tc.GetNamespace()
tcName := tc.GetName()
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)

// set ordinal to max ordinal if ordinal isn't exist
Expand All @@ -202,7 +202,7 @@ func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.Stat
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
}

klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
klog.Infof("tidbCluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
return targetName
}

Expand Down
76 changes: 71 additions & 5 deletions pkg/manager/member/pd_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ package member

import (
"fmt"
"net/url"

"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"

apps "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ func (u *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Stat
return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed, can not to be upgraded", ns, tcName)
}
if tc.PDScaling() {
klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
klog.Infof("tidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
ns, tcName, tc.Status.PD.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
Expand Down Expand Up @@ -143,17 +144,82 @@ func (u *pdUpgrader) upgradePDPod(tc *v1alpha1.TidbCluster, ordinal int32, newSe
if targetName != "" {
err := u.transferPDLeaderTo(tc, targetName)
if err != nil {
klog.Errorf("pd upgrader: failed to transfer pd leader to: %s, %v", targetName, err)
klog.Errorf("tidbcluster: [%s/%s] pd upgrader: failed to transfer pd leader to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("pd upgrader: transfer pd leader to: %s successfully", targetName)
klog.Infof("tidbcluster: [%s/%s] pd upgrader: transfer pd leader to: %s successfully", ns, tcName, targetName)
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pd member: [%s] is transferring leader to pd member: [%s]", ns, tcName, upgradePdName, targetName)
} else {
klog.Warningf("pd upgrader: skip to transfer pd leader, because can not find a suitable pd")
klog.Warningf("tidbcluster: [%s/%s] pd upgrader: skip to transfer pd leader, because can not find a suitable pd", ns, tcName)
}
}

mngerutils.SetUpgradePartition(newSet, ordinal)
return u.SyncPDPeerUrls(tc, upgradePdName)
}

func (u *pdUpgrader) SyncPDPeerUrls(tc *v1alpha1.TidbCluster, pdName string) error {
ns := tc.GetNamespace()
tcName := tc.GetName()

var (
tlsEnabled = tc.IsTLSClusterEnabled()
pdCli = controller.GetPDClient(u.deps.PDControl, tc)
member *pdpb.Member
)
members, err := pdCli.GetMembers()
if err != nil {
return err
}
for _, m := range members.Members {
if m.Name == pdName {
member = m
break
}
}
if member == nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to find member %s in pd cluster", ns, tcName, pdName)
}

var (
newPeers []string
needSync bool
)
for _, peerUrl := range member.PeerUrls {
u, err := url.Parse(peerUrl)
if err != nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to parse peer url %s, %v", ns, tcName, peerUrl, err)
}
// check if peer url need to be updated
if tlsEnabled != (u.Scheme == "https") {
needSync = true
if !tlsEnabled {
u.Scheme = "http"
} else {
u.Scheme = "https"
}
newPeers = append(newPeers, u.String())
} else {
newPeers = append(newPeers, peerUrl)
}
}

if needSync {
pdEtcdClient, err := u.deps.PDControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name,
tc.IsTLSClusterEnabled(), pdapi.ClusterRef(tc.Spec.ClusterDomain))

if err != nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to create pd etcd client, %v", ns, tcName, err)
}
defer pdEtcdClient.Close()
err = pdEtcdClient.UpdateMember(member.GetMemberId(), newPeers)
if err != nil {
return fmt.Errorf("tidbcluster: [%s/%s] failed to update pd member: %s peer urls, %v", ns, tcName, pdName, err)
}
klog.Infof("tidbcluster: [%s/%s] pd upgrader: sync pd member: %s peer urls successfully, from %v to %v",
ns, tcName, pdName, member.PeerUrls, newPeers)
}

return nil
}

Expand Down
67 changes: 64 additions & 3 deletions pkg/manager/member/pd_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"fmt"
"testing"

. "github.com/onsi/gomega"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb-operator/pkg/apis/label"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/pdapi"

. "github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -51,6 +51,7 @@ func TestPDUpgraderUpgrade(t *testing.T) {
upgrader, pdControl, _, podInformer := newPDUpgrader()
tc := newTidbClusterForPDUpgrader()
pdClient := controller.NewFakePDClient(pdControl, tc)
etcdClient := controller.NewFakeEtcdClient(pdControl, tc)

if test.changeFn != nil {
test.changeFn(tc)
Expand Down Expand Up @@ -95,6 +96,27 @@ func TestPDUpgraderUpgrade(t *testing.T) {
return healthInfo, nil
})

pdClient.AddReaction(pdapi.GetMembersActionType, func(action *pdapi.Action) (interface{}, error) {
membersInfo := &pdapi.MembersInfo{
Members: []*pdpb.Member{
{
Name: PdPodName(upgradeTcName, 0),
MemberId: 111,
PeerUrls: []string{"http://upgrader-pd-0:2380"},
},
{
Name: PdPodName(upgradeTcName, 1),
MemberId: 222,
PeerUrls: []string{"http://upgrader-pd-1:2380"},
},
},
}
return membersInfo, nil
})
etcdClient.AddReaction(pdapi.EtcdUpdateMemberActionType, func(action *pdapi.Action) (interface{}, error) {
return nil, nil
})

err := upgrader.Upgrade(tc, oldSet, newSet)
test.errExpectFn(g, err)
test.expectFn(g, tc, newSet)
Expand Down Expand Up @@ -314,12 +336,41 @@ func TestPDUpgraderUpgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "upgrade from non-tls to tls",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Synced = true
tc.Spec = v1alpha1.TidbClusterSpec{
PD: &v1alpha1.PDSpec{
ComponentSpec: v1alpha1.ComponentSpec{
Image: "pingcap/pd:v3.1.0",
},
},
TiDB: &v1alpha1.TiDBSpec{
TLSClient: &v1alpha1.TiDBTLSClient{
Enabled: true,
},
},
TiKV: &v1alpha1.TiKVSpec{},
TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
}
},
changePods: nil,
changeOldSet: nil,
transferLeaderErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.UpgradePhase))
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
}

for i := range tests {
testFn(&tests[i])
}

}

func TestChoosePDToTransferFromMembers(t *testing.T) {
Expand Down Expand Up @@ -447,6 +498,16 @@ func TestChoosePDToTransferFromMembers(t *testing.T) {

func newPDUpgrader() (Upgrader, *pdapi.FakePDControl, *controller.FakePodControl, podinformers.PodInformer) {
fakeDeps := controller.NewFakeDependencies()

informer := fakeDeps.KubeInformerFactory
informer.Core().V1().Secrets().Informer().GetIndexer().Add(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "upgrader-cluster-client-secret",
Namespace: corev1.NamespaceDefault,
},
})
fakeDeps.PDControl = pdapi.NewFakePDControl(informer.Core().V1().Secrets().Lister())

pdUpgrader := &pdUpgrader{deps: fakeDeps}
pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
podControl := fakeDeps.PodControl.(*controller.FakePodControl)
Expand Down
72 changes: 72 additions & 0 deletions pkg/pdapi/fake_pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,75 @@ func (c *FakePDMSClient) TransferPrimary(newPrimary string) error {
_, err := c.fakeAPI(PDMSTransferPrimaryActionType, action)
return err
}

const (
EtcdGetActionType ActionType = "Get"
EtcdPutActionType ActionType = "Put"
EtcdDeleteActionType ActionType = "Delete"
EtcdUpdateMemberActionType ActionType = "UpdateMember"
EtcdCloseActionType ActionType = "Close"
)

// FakeEtcdClient implements a fake version of EtcdClient.
type FakeEtcdClient struct {
reactions map[ActionType]Reaction
}

func NewFakeEtcdClient() *FakeEtcdClient {
return &FakeEtcdClient{reactions: map[ActionType]Reaction{}}
}

func (c *FakeEtcdClient) AddReaction(actionType ActionType, reaction Reaction) {
c.reactions[actionType] = reaction
}

// fakeAPI is a small helper for fake API calls
func (c *FakeEtcdClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
if reaction, ok := c.reactions[actionType]; ok {
result, err := reaction(action)
if err != nil {
return nil, err
}
return result, nil
}
return nil, &NotFoundReaction{actionType}
}

func (c *FakeEtcdClient) Get(_ string, _ bool) (kvs []*KeyValue, err error) {
action := &Action{}
nil, err := c.fakeAPI(EtcdGetActionType, action)
if err != nil {
return
}
return
}

func (c *FakeEtcdClient) PutKey(_, _ string) error {
action := &Action{}
_, err := c.fakeAPI(EtcdPutActionType, action)
return err
}

func (c *FakeEtcdClient) PutTTLKey(_, _ string, _ int64) error {
action := &Action{}
_, err := c.fakeAPI(EtcdPutActionType, action)
return err
}

func (c *FakeEtcdClient) DeleteKey(_ string) error {
action := &Action{}
_, err := c.fakeAPI(EtcdDeleteActionType, action)
return err
}

func (c *FakeEtcdClient) UpdateMember(_ uint64, _ []string) error {
action := &Action{}
_, err := c.fakeAPI(EtcdUpdateMemberActionType, action)
return err
}

func (c *FakeEtcdClient) Close() error {
action := &Action{}
_, err := c.fakeAPI(EtcdCloseActionType, action)
return err
}
Loading

0 comments on commit cefcf04

Please sign in to comment.