pdms: Add the name field to the startup parameters (#5698)
Signed-off-by: husharp <[email protected]>
Co-authored-by: Xuecheng Zhang <[email protected]>
HuSharp and csuzhangxc authored Aug 6, 2024
1 parent 5d83d89 commit c329624
Showing 10 changed files with 178 additions and 48 deletions.
15 changes: 8 additions & 7 deletions examples/basic/pd-micro-service-cluster.yaml
@@ -16,8 +16,9 @@ spec:
helper:
image: alpine:3.16.0
pd:
baseImage: pingcap/pd
version: v8.1.0
# TODO: replaced v8.3.0 after v8.3.0 released
baseImage: hub.pingcap.net/devbuild/pd
version: v8.3.0-5427
maxFailoverCount: 0
replicas: 1
# if storageClassName is not set, the default Storage Class of the Kubernetes cluster will be used
@@ -28,13 +29,13 @@ spec:
mode: "ms"
pdms:
- name: "tso"
baseImage: pingcap/pd
version: v8.1.0
baseImage: hub.pingcap.net/devbuild/pd
version: v8.3.0-5427
replicas: 2
- name: "scheduling"
baseImage: pingcap/pd
version: v8.1.0
replicas: 1
baseImage: hub.pingcap.net/devbuild/pd
version: v8.3.0-5427
replicas: 2
tikv:
baseImage: pingcap/tikv
version: v8.1.0
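
For readers who work against the Go API rather than the YAML manifest, the example cluster above corresponds roughly to the sketch below. The field names (`Mode`, `Replicas`, the embedded `ComponentSpec`) are assumptions inferred from the YAML keys and from pd_start_script_test.go later in this diff, not verified against the CRD definitions.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
)

func main() {
	// PD runs in microservice ("ms") mode, with separate tso and scheduling groups.
	tc := &v1alpha1.TidbCluster{
		Spec: v1alpha1.TidbClusterSpec{
			PD: &v1alpha1.PDSpec{
				ComponentSpec: v1alpha1.ComponentSpec{Image: "pingcap/pd:v8.3.0"},
				Replicas:      1,
				Mode:          "ms",
			},
			PDMS: []*v1alpha1.PDMSSpec{
				{Name: "tso", ComponentSpec: v1alpha1.ComponentSpec{Image: "pingcap/pd:v8.3.0"}, Replicas: 2},
				{Name: "scheduling", ComponentSpec: v1alpha1.ComponentSpec{Image: "pingcap/pd:v8.3.0"}, Replicas: 2},
			},
		},
	}
	fmt.Printf("pd mode=%s, pdms groups=%d\n", tc.Spec.PD.Mode, len(tc.Spec.PDMS))
}
```
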
15 changes: 9 additions & 6 deletions pkg/controller/pd_control.go
@@ -74,26 +74,29 @@ func GetPDClient(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster) p
}

// GetPDMSClient tries to return an available PDMSClient
func GetPDMSClient(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster, serviceName string) error {
func GetPDMSClient(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster, serviceName string) pdapi.PDMSClient {
pdMSClient := getPDMSClientFromService(pdControl, tc, serviceName)

err := pdMSClient.GetHealth()
if err == nil {
return nil
return pdMSClient
}

for _, service := range tc.Status.PDMS {
if service.Name != serviceName {
continue
}
for _, pdMember := range service.Members {
pdPeerClient := pdControl.GetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), serviceName,
pdMSPeerClient := pdControl.GetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), serviceName,
tc.IsTLSClusterEnabled(), pdapi.SpecifyClient(pdMember, pdMember))
err = pdPeerClient.GetHealth()
err = pdMSPeerClient.GetHealth()
if err == nil {
return nil
return pdMSPeerClient
}
}
}

return err
return nil
}

// NewFakePDClient creates a fake pdclient that is set as the pd client
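
With the new signature, callers receive the healthy client itself (or nil) rather than just an error, as the tikv_member_manager.go hunk further down shows. Below is a minimal sketch of the new calling convention; the wrapper name is invented and the pdapi import path is assumed from the package name used above.

```go
package example // hypothetical wrapper, for illustration only

import (
	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
	"github.com/pingcap/tidb-operator/pkg/controller"
	"github.com/pingcap/tidb-operator/pkg/pdapi" // import path assumed
)

// waitForPDMS requeues until the named PD microservice has at least one healthy
// member; on success the caller receives a usable pdapi.PDMSClient.
func waitForPDMS(pdControl pdapi.PDControlInterface, tc *v1alpha1.TidbCluster, serviceName string) (pdapi.PDMSClient, error) {
	cli := controller.GetPDMSClient(pdControl, tc, serviceName)
	if cli == nil {
		// nil now means "no member of this service passed GetHealth()".
		return nil, controller.RequeueErrorf("PDMS component %s for TidbCluster: [%s/%s], waiting for PD micro service cluster running",
			serviceName, tc.GetNamespace(), tc.GetName())
	}
	return cli, nil
}
```
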
10 changes: 0 additions & 10 deletions pkg/manager/member/pd_ms_member_manager.go
@@ -18,7 +18,6 @@ import (
"path"
"strings"

"github.com/Masterminds/semver"
"github.com/pingcap/tidb-operator/pkg/apis/label"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
@@ -775,15 +774,6 @@ func (m *pdMSMemberManager) getPDMSConfigMap(tc *v1alpha1.TidbCluster, curSpec *
return cm, nil
}

// PDMSSupportMicroServices returns true if the given version of PDMS supports microservices.
func PDMSSupportMicroServices(version string) (bool, error) {
v, err := semver.NewVersion(version)
if err != nil {
return true, err
}
return v.Major() >= 7 && v.Minor() >= 2 && v.Patch() >= 0, nil
}

type FakePDMSMemberManager struct {
err error
}
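
The semver-based helper removed here required `Minor() >= 2` regardless of the major version, so as written it would report versions such as v8.1.0 as unsupported. The version gate introduced by this PR (for the `--name` parameter) instead uses a cmpver constraint, as the pd_start_script.go hunk below shows. A minimal sketch of that style of check, built only from identifiers that appear in this diff:

```go
package example

import "github.com/pingcap/tidb-operator/pkg/util/cmpver"

// supportsNameParam mirrors pdMSSupportMicroServicesWithName from
// pd_start_script.go: any PDMS version >= v8.3.0 accepts --name at startup.
var supportsNameParam, _ = cmpver.NewConstraint(cmpver.GreaterOrEqual, "v8.3.0")

// hasNameParam reports whether the given PDMS version supports the --name
// startup parameter, treating constraint errors as "not supported".
func hasNameParam(version string) bool {
	ok, err := supportsNameParam.Check(version)
	return err == nil && ok
}
```
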
33 changes: 16 additions & 17 deletions pkg/manager/member/pd_ms_upgrader.go
@@ -48,23 +48,23 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS status is nil, can not to be upgraded", ns, tcName)
}

componentName := controller.PDMSTrimName(newSet.Name)
klog.Infof("gracefulUpgrade pdMS trim name, componentName: %s", componentName)
if tc.Status.PDMS[componentName] == nil {
tc.Status.PDMS[componentName] = &v1alpha1.PDMSStatus{Name: componentName}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, componentName)
curService := controller.PDMSTrimName(newSet.Name)
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, componentName: %s", ns, tcName, curService)
if tc.Status.PDMS[curService] == nil {
tc.Status.PDMS[curService] = &v1alpha1.PDMSStatus{Name: curService}
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS component is nil, can not to be upgraded, component: %s", ns, tcName, curService)
}
if !tc.Status.PDMS[componentName].Synced {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS status sync failed, can not to be upgraded, component: %s", ns, tcName, componentName)
if !tc.Status.PDMS[curService].Synced {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS status sync failed, can not to be upgraded, component: %s", ns, tcName, curService)
}
oldTrimName := controller.PDMSTrimName(oldSet.Name)
if oldTrimName != componentName {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, componentName)
if oldTrimName != curService {
return fmt.Errorf("tidbcluster: [%s/%s]'s pdMS oldTrimName is %s, not equal to componentName: %s", ns, tcName, oldTrimName, curService)
}
klog.Infof("gracefulUpgrade pdMS trim name, oldTrimName: %s", oldTrimName)
klog.Infof("TidbCluster: [%s/%s]' gracefulUpgrade pdMS trim name, oldTrimName: %s", ns, tcName, oldTrimName)
if tc.PDMSScaling(oldTrimName) {
klog.Infof("TidbCluster: [%s/%s]'s pdMS status is %v, can not upgrade pdMS",
ns, tcName, tc.Status.PDMS[componentName].Phase)
ns, tcName, tc.Status.PDMS[curService].Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
@@ -73,7 +73,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
return nil
}

tc.Status.PDMS[componentName].Phase = v1alpha1.UpgradePhase
tc.Status.PDMS[curService].Phase = v1alpha1.UpgradePhase
if !templateEqual(newSet, oldSet) {
return nil
}
@@ -84,7 +84,7 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
// If we encounter this situation, we will let the native statefulset controller do the upgrade completely, which may be unsafe for upgrading pdMS.
// Therefore, in the production environment, we should try to avoid modifying the pd statefulset update strategy directly.
newSet.Spec.UpdateStrategy = oldSet.Spec.UpdateStrategy
klog.Warningf("tidbcluster: [%s/%s] pdMS statefulset %s UpdateStrategy has been modified manually, componentName: %s", ns, tcName, oldSet.GetName(), componentName)
klog.Warningf("Tidbcluster: [%s/%s] pdMS statefulset %s UpdateStrategy has been modified manually, componentName: %s", ns, tcName, oldSet.GetName(), curService)
return nil
}

@@ -103,27 +103,26 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pdMS pod: [%s] has no label: %s", ns, tcName, podName, apps.ControllerRevisionHashLabelKey)
}

if revision == tc.Status.PDMS[componentName].StatefulSet.UpdateRevision {
if revision == tc.Status.PDMS[curService].StatefulSet.UpdateRevision {
if !k8s.IsPodReady(pod) {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s upgraded pdMS pod: [%s] is not ready", ns, tcName, podName)
}

var exist bool
for _, member := range tc.Status.PDMS[componentName].Members {
for _, member := range tc.Status.PDMS[curService].Members {
if strings.Contains(member, podName) {
exist = true
}
}
if !exist {
return controller.RequeueErrorf("tidbcluster: [%s/%s]'s pdMS upgraded pod: [%s] is not exist, all members: %v",
ns, tcName, podName, tc.Status.PDMS[componentName].Members)
ns, tcName, podName, tc.Status.PDMS[curService].Members)
}
continue
}
mngerutils.SetUpgradePartition(newSet, i)
return nil
}

return nil
}

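
The loop above only lowers the StatefulSet update partition after the already-upgraded pods pass the revision, readiness, and membership checks. mngerutils.SetUpgradePartition itself is not part of this diff; the sketch below illustrates the Kubernetes partition mechanism it is expected to rely on, not the helper's actual implementation.

```go
package example

import (
	apps "k8s.io/api/apps/v1"
	"k8s.io/utils/pointer"
)

// setUpgradePartition illustrates the StatefulSet mechanism behind the
// upgrader: with a RollingUpdate partition of `ordinal`, the StatefulSet
// controller only recreates pods whose ordinal is >= that value, so lowering
// the partition one step at a time upgrades one pdMS pod per reconcile.
func setUpgradePartition(set *apps.StatefulSet, ordinal int32) {
	set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
		Type: apps.RollingUpdateStatefulSetStrategyType,
		RollingUpdate: &apps.RollingUpdateStatefulSetStrategy{
			Partition: pointer.Int32Ptr(ordinal),
		},
	}
}
```
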
1 change: 0 additions & 1 deletion pkg/manager/member/pd_ms_upgrader_test.go
@@ -216,7 +216,6 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {
for i := range tests {
testFn(&tests[i])
}

}

func newPDMSUpgrader() (Upgrader, podinformers.PodInformer) {
28 changes: 24 additions & 4 deletions pkg/manager/member/startscript/v2/pd_start_script.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/manager/member/constants"
"github.com/pingcap/tidb-operator/pkg/util/cmpver"
)

// PDStartScriptModel contain fields for rendering PD start script
@@ -47,6 +48,7 @@ type PDMSStartScriptModel struct {
PDInitWaitTime int
PDAddresses string

PDMSName string
PDMSDomain string
ListenAddr string
AdvertiseListenAddr string
@@ -100,7 +102,7 @@ func RenderPDStartScript(tc *v1alpha1.TidbCluster) (string, error) {
mode := ""
if tc.Spec.PD.Mode == "ms" && tc.Spec.PDMS != nil {
mode = "api"
// default enbled the dns detection
// default enabled the dns detection
waitForDnsNameIpMatchOnStartup = true
}
pdStartScriptTpl := template.Must(
@@ -136,6 +138,14 @@ func renderPDMSStartScript(tc *v1alpha1.TidbCluster, name string) (string, error
m.PDMSDomain = m.PDMSDomain + "." + tc.Spec.ClusterDomain
}

if check, err := pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion(name)); check && err == nil {
m.PDMSName = "${PDMS_POD_NAME}"
if tc.Spec.ClusterDomain != "" {
m.PDMSName = m.PDMSDomain
}
name = fmt.Sprintf("%s --name=%s", name, m.PDMSName)
}

m.PDStartTimeout = tc.PDStartTimeout()

m.PDInitWaitTime = tc.PDInitWaitTime()
@@ -315,10 +325,20 @@ func replacePdStartScriptDnsAwaitPart(withLocalIpMatch bool, startScript string)
}
}

func enableMicroServiceModeDynamic(ms string, startScript string) string {
if ms != "" {
return strings.ReplaceAll(startScript, pdEnableMicroService, fmt.Sprintf(" %s %s ", pdEnableMicroServiceSubScript, ms))
// startParams has different values for different PD related service:
// - for original `PD`, startParams should be empty.
// - for `PD API` service, startParams should be `api`
// - for `TSO` and `Scheduling`, startParams should be `tso` and `scheduling` respectively.
// NOTICE: in `8.3.0` we have supported `name` start parameter, so we will pass `tso name=${PDMS_POD_NAME}` to startParams.
func enableMicroServiceModeDynamic(startParams string, startScript string) string {
if startParams != "" {
return strings.ReplaceAll(startScript, pdEnableMicroService, fmt.Sprintf(" %s %s ", pdEnableMicroServiceSubScript, startParams))
} else {
// for original `PD`, should be empty.
return strings.ReplaceAll(startScript, pdEnableMicroService, "")
}
}

// PDMSSupportMicroServicesWithName returns true if the given version of PDMS supports microservices with name.
// related https://github.com/tikv/pd/pull/8461.
var pdMSSupportMicroServicesWithName, _ = cmpver.NewConstraint(cmpver.GreaterOrEqual, "v8.3.0")
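
Putting the two pieces above together: renderPDMSStartScript appends `--name=...` to the service parameters when the version constraint passes, and enableMicroServiceModeDynamic splices those parameters into the start-script template. The toy program below reproduces that splice; only the function body is taken from the hunk above, while the placeholder constants are stand-ins (their real values live elsewhere in this package).

```go
package main

import (
	"fmt"
	"strings"
)

// The real placeholder token and sub-command prefix are defined elsewhere in
// the package; the values below are assumptions chosen for illustration only.
const (
	pdEnableMicroService          = "${PD_MICRO_SERVICE}"
	pdEnableMicroServiceSubScript = "services"
)

// enableMicroServiceModeDynamic is copied from the hunk above: it splices the
// per-service start parameters into the PD start-script template.
func enableMicroServiceModeDynamic(startParams string, startScript string) string {
	if startParams != "" {
		return strings.ReplaceAll(startScript, pdEnableMicroService, fmt.Sprintf(" %s %s ", pdEnableMicroServiceSubScript, startParams))
	}
	// for the original PD, the placeholder is simply dropped
	return strings.ReplaceAll(startScript, pdEnableMicroService, "")
}

func main() {
	tpl := `ARGS="${PD_MICRO_SERVICE} --listen-addr=http://0.0.0.0:2379"`

	// TSO member on PD >= v8.3.0: the rendered parameters now carry --name.
	fmt.Println(enableMicroServiceModeDynamic("tso --name=${PDMS_POD_NAME}", tpl))

	// Original (non-microservice) PD: the placeholder is removed entirely.
	fmt.Println(enableMicroServiceModeDynamic("", tpl))
}
```
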
116 changes: 116 additions & 0 deletions pkg/manager/member/startscript/v2/pd_start_script_test.go
@@ -19,6 +19,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/onsi/gomega"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/stretchr/testify/require"
"k8s.io/utils/pointer"
)

func TestRenderPDStartScript(t *testing.T) {
@@ -793,6 +795,83 @@ ARGS=" services tso --listen-addr=http://0.0.0.0:2379 \
--config=/etc/pd/pd.toml \
"
echo "starting pd-server ..."
sleep $((RANDOM % 10))
echo "/pd-server ${ARGS}"
exec /pd-server ${ARGS}
exit 0
`,
},
{
name: "pdms with name(>= 8.3.0)",
modifyTC: func(tc *v1alpha1.TidbCluster) {
tc.Spec.ClusterDomain = "cluster-1.com"
tc.Spec.PDMS = []*v1alpha1.PDMSSpec{
{
ComponentSpec: v1alpha1.ComponentSpec{
Image: "pingcap/pd:v8.3.0",
},
Name: "tso",
},
}
},
expectScript: `#!/bin/sh
set -uo pipefail
ANNOTATIONS="/etc/podinfo/annotations"
if [[ ! -f "${ANNOTATIONS}" ]]
then
echo "${ANNOTATIONS} does't exist, exiting."
exit 1
fi
source ${ANNOTATIONS} 2>/dev/null
runmode=${runmode:-normal}
if [[ X${runmode} == Xdebug ]]
then
echo "entering debug mode."
tail -f /dev/null
fi
PDMS_POD_NAME=${POD_NAME:-$HOSTNAME}
PD_DOMAIN=${PDMS_POD_NAME}.start-script-test-tso-peer.start-script-test-ns.svc.cluster-1.com
elapseTime=0
period=1
threshold=30
while true; do
sleep ${period}
elapseTime=$(( elapseTime+period ))
if [[ ${elapseTime} -ge ${threshold} ]]; then
echo "waiting for pd cluster ready timeout" >&2
exit 1
fi
digRes=$(dig ${PD_DOMAIN} A ${PD_DOMAIN} AAAA +search +short)
if [ $? -ne 0 ]; then
echo "domain resolve ${PD_DOMAIN} failed"
echo "$digRes"
continue
fi
if [ -z "${digRes}" ]
then
echo "domain resolve ${PD_DOMAIN} no record return"
else
echo "domain resolve ${PD_DOMAIN} success"
echo "$digRes"
break
fi
done
ARGS=" services tso --name=${PDMS_POD_NAME}.start-script-test-tso-peer.start-script-test-ns.svc.cluster-1.com --listen-addr=http://0.0.0.0:2379 \
--advertise-listen-addr=http://${PD_DOMAIN}:2379 \
--backend-endpoints=http://start-script-test-pd:2379 \
--config=/etc/pd/pd.toml \
"
echo "starting pd-server ..."
sleep $((RANDOM % 10))
echo "/pd-server ${ARGS}"
@@ -822,3 +901,40 @@ exit 0
g.Expect(validateScript(script)).Should(gomega.Succeed())
}
}

func TestPDMSWithName(t *testing.T) {
re := require.New(t)
tc := &v1alpha1.TidbCluster{
Spec: v1alpha1.TidbClusterSpec{
PDMS: []*v1alpha1.PDMSSpec{
{
Name: "tso",
ComponentSpec: v1alpha1.ComponentSpec{
Image: "pd-test-image",
},
Replicas: 3,
StorageClassName: pointer.StringPtr("my-storage-class"),
},
},
},
}

for _, spec := range tc.Spec.PDMS {
spec.Image = "pingcap/pd:v8.2.0"
}
check, err := pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion("tso"))
re.Nil(err)
re.False(check)
for _, spec := range tc.Spec.PDMS {
spec.Image = "pingcap/pd:v8.3.0"
}
check, err = pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion("tso"))
re.Nil(err)
re.True(check)
for _, spec := range tc.Spec.PDMS {
spec.Image = "pingcap/pd:v9.1.0"
}
check, err = pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion("tso"))
re.Nil(err)
re.True(check)
}
4 changes: 2 additions & 2 deletions pkg/manager/member/tikv_member_manager.go
@@ -115,9 +115,9 @@ func (m *tikvMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
// Check if all PD Micro Services are available
if tc.Spec.PDMS != nil && (tc.Spec.PD != nil && tc.Spec.PD.Mode == "ms") {
for _, pdms := range tc.Spec.PDMS {
if err = controller.GetPDMSClient(m.deps.PDControl, tc, pdms.Name); err != nil {
if cli := controller.GetPDMSClient(m.deps.PDControl, tc, pdms.Name); cli == nil {
return controller.RequeueErrorf("PDMS component %s for TidbCluster: [%s/%s], "+
"waiting for PD micro service cluster running, error: %v", pdms.Name, ns, tcName, err)
"waiting for PD micro service cluster running", pdms.Name, ns, tcName)
}
}
}