Skip to content

Commit

Permalink
Merge branch 'master' into FixTiFlashStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
KanShiori authored Jul 25, 2022
2 parents b990129 + a641d3d commit bdb8aad
Show file tree
Hide file tree
Showing 11 changed files with 1,031 additions and 19 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ const (
AnnSysctlInit = "tidb.pingcap.com/sysctl-init"
// AnnEvictLeaderBeginTime is pod annotation key to indicate the begin time for evicting region leader
AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime"
// AnnTiCDCGracefulShutdownBeginTime is the pod annotation key indicating the begin time of the TiCDC graceful shutdown
AnnTiCDCGracefulShutdownBeginTime = "tidb.pingcap.com/ticdc-graceful-shutdown-begin-time"
// AnnStsLastSyncTimestamp is the sts annotation key indicating the last time the operator synced the sts
AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp"

Expand Down
37 changes: 29 additions & 8 deletions pkg/apis/pingcap/v1alpha1/tidbcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,14 +569,6 @@ func (tc *TidbCluster) TiFlashStsDesiredReplicas() int32 {
return tc.Spec.TiFlash.Replicas + int32(len(tc.Status.TiFlash.FailureStores))
}

// TiCDCDeployDesiredReplicas returns the replica count requested in the TiCDC
// spec, or 0 when the cluster has no TiCDC spec.
func (tc *TidbCluster) TiCDCDeployDesiredReplicas() int32 {
	if spec := tc.Spec.TiCDC; spec != nil {
		return spec.Replicas
	}
	return 0
}

func (tc *TidbCluster) TiFlashStsActualReplicas() int32 {
stsStatus := tc.Status.TiFlash.StatefulSet
if stsStatus == nil {
Expand All @@ -596,6 +588,35 @@ func (tc *TidbCluster) TiFlashStsDesiredOrdinals(excludeFailover bool) sets.Int3
return GetPodOrdinalsFromReplicasAndDeleteSlots(replicas, tc.getDeleteSlots(label.TiFlashLabelVal))
}

// TiCDCAllCapturesReady reports whether all captures of TiCDC are ready.
//
// It returns false when TiCDC isn't specified, when the number of captures
// recorded in the status differs from the desired replica count, or when any
// recorded capture is not ready.
func (tc *TidbCluster) TiCDCAllCapturesReady() bool {
	if tc.Spec.TiCDC == nil {
		return false
	}

	captures := tc.Status.TiCDC.Captures
	if len(captures) != int(tc.TiCDCDeployDesiredReplicas()) {
		return false
	}

	for _, capture := range captures {
		if !capture.Ready {
			return false
		}
	}
	return true
}

// TiCDCDeployDesiredReplicas returns the number of TiCDC replicas requested in
// the spec. When TiCDC isn't specified it returns 0.
func (tc *TidbCluster) TiCDCDeployDesiredReplicas() int32 {
	ticdc := tc.Spec.TiCDC
	if ticdc != nil {
		return ticdc.Replicas
	}
	return 0
}

// TiDBAllPodsStarted return whether all pods of TiDB are started.
//
// If TiDB isn't specified, return false.
Expand Down
219 changes: 217 additions & 2 deletions pkg/controller/ticdc_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
package controller

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
httputil "github.com/pingcap/tidb-operator/pkg/util/http"
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
)

type CaptureStatus struct {
Expand All @@ -27,10 +33,35 @@ type CaptureStatus struct {
IsOwner bool `json:"is_owner"`
}

// captureInfo is one element of the capture list decoded from the JSON body
// returned by the TiCDC `/api/v1/captures` endpoint.
type captureInfo struct {
	// ID identifies the capture; it is sent back as `capture_id` when draining.
	ID string `json:"id"`

	// IsOwner is true for the capture that currently holds ownership.
	IsOwner bool `json:"is_owner"`

	// AdvertiseAddr is the capture's advertise address (JSON field `address`);
	// it is matched against the pod address prefix to find a pod's capture.
	AdvertiseAddr string `json:"address"`
}

// drainCaptureRequest is the JSON request body for a manual `DrainCapture`
// call (sent with PUT to `/api/v1/captures/drain`).
type drainCaptureRequest struct {
	// CaptureID is the ID of the capture to drain.
	CaptureID string `json:"capture_id"`
}

// drainCaptureResp is the JSON response body of a manual `DrainCapture` call.
type drainCaptureResp struct {
	// CurrentTableCount is returned to the caller of DrainCapture as the
	// number of tables in the capture.
	CurrentTableCount int `json:"current_table_count"`
}

// TiCDCControlInterface is the interface that knows how to manage TiCDC captures.
type TiCDCControlInterface interface {
	// GetStatus returns the status of the TiCDC capture at the given ordinal.
	GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (*CaptureStatus, error)
	// DrainCapture removes capture ownership and moves the capture's tables to
	// other captures.
	// It returns the number of tables in the capture. retry reports that the
	// caller should call DrainCapture again (the default implementation sets
	// it when TiCDC answers 503 Service Unavailable).
	// If there is only one capture, it always returns 0.
	DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error)
	// ResignOwner tries to resign ownership from the current capture.
	// It returns true if the capture has already resigned ownership,
	// otherwise the caller should retry resigning the owner.
	// If there is only one capture, it always returns true.
	ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error)
}

// defaultTiCDCControl is default implementation of TiCDCControlInterface.
Expand Down Expand Up @@ -63,17 +94,193 @@ func (c *defaultTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32)
return &status, err
}

// DrainCapture asks TiCDC to drain the capture running on the pod with the
// given ordinal and returns (tableCount, retry, err).
//
//   - tableCount is the `current_table_count` reported by TiCDC; it is 0 when
//     the request is skipped (single-capture cluster, 404 from the drain API,
//     or an undecodable response body).
//   - retry is true when TiCDC answers 503 Service Unavailable, either for the
//     capture list or for the drain request itself.
//   - err is non-nil when the HTTP client cannot be built, a request fails,
//     or this pod's capture or the owner is missing from the capture list.
func (c *defaultTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (int, bool, error) {
	client, err := c.getHTTPClient(tc)
	if err != nil {
		klog.Warningf("ticdc control: drain capture is failed, error: %v", err)
		return 0, false, err
	}

	base := c.getBaseURL(tc, ordinal)

	captures, needRetry, err := getCaptures(client, base)
	switch {
	case err != nil:
		klog.Warningf("ticdc control: drain capture is failed, error: %v", err)
		return 0, false, err
	case needRetry:
		// The capture list is not available yet; let the caller retry.
		return 0, true, nil
	case len(captures) == 1:
		// A single-capture cluster has nowhere to move tables to, ignore.
		return 0, false, nil
	}

	target, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures)
	if target == nil {
		return 0, false, fmt.Errorf("capture not found, address: %s, captures: %+v",
			getCaptureAdvertiseAddressPrefix(tc, ordinal), captures)
	}
	if owner == nil {
		return 0, false, fmt.Errorf("owner not found, captures: %+v", captures)
	}

	reqBody, err := json.Marshal(drainCaptureRequest{CaptureID: target.ID})
	if err != nil {
		return 0, false, fmt.Errorf("ticdc drain capture failed, marshal request error: %v", err)
	}
	req, err := http.NewRequest("PUT", base+"/api/v1/captures/drain", bytes.NewReader(reqBody))
	if err != nil {
		return 0, false, fmt.Errorf("ticdc drain capture failed, new request error: %v", err)
	}
	res, err := client.Do(req)
	if err != nil {
		return 0, false, fmt.Errorf("ticdc drain capture failed, request error: %v", err)
	}
	defer httputil.DeferClose(res.Body)

	switch res.StatusCode {
	case http.StatusNotFound:
		// It is likely the TiCDC does not support the API, ignore.
		klog.Infof("ticdc control: %s does not support drain capture, skip", target.AdvertiseAddr)
		return 0, false, nil
	case http.StatusServiceUnavailable:
		// TiCDC is not ready, retry.
		klog.Infof("ticdc control: %s service unavailable drain capture, retry", target.AdvertiseAddr)
		return 0, true, nil
	}

	raw, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return 0, false, fmt.Errorf("ticdc drain capture failed, read response error: %v", err)
	}

	var drained drainCaptureResp
	if err := json.Unmarshal(raw, &drained); err != nil {
		// It is likely the TiCDC does not support the API, ignore.
		return 0, false, nil
	}
	return drained.CurrentTableCount, false, nil
}

// ResignOwner tries to make the capture running on the pod with the given
// ordinal give up TiCDC ownership.
//
// It returns true when there is nothing (left) to do:
//   - the cluster has a single capture,
//   - the owner or this pod's capture is missing from the capture list,
//   - ownership is already held by another capture, or
//   - the resign API answers 404 (likely unsupported by this TiCDC version).
//
// It returns false with a nil error when the caller should call again: the
// capture list or the resign API answered 503, or a resign request was just
// sent and the transfer has not been observed yet.
//
// A non-nil error is returned when the HTTP client cannot be built or a
// request fails.
func (c *defaultTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (bool, error) {
	httpClient, err := c.getHTTPClient(tc)
	if err != nil {
		klog.Warningf("ticdc control: resign owner failed, error: %v", err)
		return false, err
	}

	baseURL := c.getBaseURL(tc, ordinal)
	captures, retry, err := getCaptures(httpClient, baseURL)
	if err != nil {
		klog.Warningf("ticdc control: resign owner failed, error: %v", err)
		return false, err
	}
	if retry {
		// Let caller retry resign owner.
		return false, nil
	}
	if len(captures) == 1 {
		// No way to resign owner in a single node TiCDC cluster, ignore.
		return true, nil
	}

	this, owner := getOrdinalAndOwnerCaptureInfo(tc, ordinal, captures)
	if owner == nil || this == nil {
		// Owner or this capture not found, resigning ownership from the
		// capture is meaningless, ignore.
		return true, nil
	}
	if owner.ID != this.ID {
		// Ownership has been transferred to another capture.
		return true, nil
	}

	res, err := httpClient.Post(baseURL+"/api/v1/owner/resign", "", nil)
	if err != nil {
		return false, fmt.Errorf("ticdc resign owner failed, request error: %v", err)
	}
	// Close the body on return, consistent with the other TiCDC API calls in
	// this file.
	defer httputil.DeferClose(res.Body)

	if res.StatusCode == http.StatusNotFound {
		// It is likely the TiCDC does not support the API, ignore.
		klog.Infof("ticdc control: %s does not support resign owner, skip", this.AdvertiseAddr)
		return true, nil
	}
	if res.StatusCode == http.StatusServiceUnavailable {
		// Let caller retry resign owner.
		klog.Infof("ticdc control: %s service unavailable resign owner, retry", this.AdvertiseAddr)
		return false, nil
	}

	// The resign request was sent; report "not yet resigned" so the caller
	// calls again and confirms the transfer through the capture list.
	return false, nil
}

// getBaseURL returns the base URL of the TiCDC HTTP API for the pod with the
// given ordinal: "<scheme>://<advertise address prefix>:8301".
// When c.testURL is set, it is returned as-is instead.
func (c *defaultTiCDCControl) getBaseURL(tc *v1alpha1.TidbCluster, ordinal int32) string {
	if override := c.testURL; override != "" {
		return override
	}

	return fmt.Sprintf("%s://%s:8301", tc.Scheme(), getCaptureAdvertiseAddressPrefix(tc, ordinal))
}

// getCaptureAdvertiseAddressPrefix returns the prefix of the TiCDC advertise
// address for the pod with the given ordinal.
//
// The advertise address is composed as
// ${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}.svc.${ClusterDomain}:8301;
// this function returns "${POD_NAME}.${HEADLESS_SERVICE_NAME}.${NAMESPACE}".
//
// The result deliberately carries neither scheme nor port: getBaseURL adds
// both itself, and getOrdinalAndOwnerCaptureInfo matches the prefix as a
// substring of each capture's advertise address.
func getCaptureAdvertiseAddressPrefix(tc *v1alpha1.TidbCluster, ordinal int32) string {
	tcName := tc.GetName()
	ns := tc.GetNamespace()
	hostName := fmt.Sprintf("%s-%d", TiCDCMemberName(tcName), ordinal)

	return fmt.Sprintf("%s.%s.%s", hostName, TiCDCPeerMemberName(tcName), ns)
}

// getCaptures fetches the capture list from baseURL + "/api/v1/captures" and
// returns (captures, retry, err).
//
//   - A failed request or an unreadable body yields a non-nil err.
//   - 404 Not Found and an undecodable body are treated as "API likely not
//     supported": nil captures, no retry, no error.
//   - 503 Service Unavailable yields retry == true.
func getCaptures(httpClient *http.Client, baseURL string) ([]captureInfo, bool, error) {
	res, err := httpClient.Get(baseURL + "/api/v1/captures")
	if err != nil {
		return nil, false, fmt.Errorf("ticdc get captures failed, request error: %v", err)
	}
	defer httputil.DeferClose(res.Body)

	switch res.StatusCode {
	case http.StatusNotFound:
		// It is likely the TiCDC does not support the API, ignore.
		return nil, false, nil
	case http.StatusServiceUnavailable:
		// TiCDC is not ready, retry.
		return nil, true, nil
	}

	raw, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, false, fmt.Errorf("ticdc get captures failed, read response error: %v", err)
	}

	var captures []captureInfo
	if err := json.Unmarshal(raw, &captures); err != nil {
		// It is likely the TiCDC does not support the API, ignore.
		return nil, false, nil
	}
	return captures, false, nil
}

// getOrdinalAndOwnerCaptureInfo scans captures and returns pointers to two
// entries of the slice:
//
//   - this: the capture whose advertise address contains the address prefix
//     of the pod with the given ordinal;
//   - owner: the capture flagged as owner.
//
// Either result is nil when no entry matches; when several entries match, the
// last one wins. The returned pointers alias elements of captures.
func getOrdinalAndOwnerCaptureInfo(
	tc *v1alpha1.TidbCluster, ordinal int32, captures []captureInfo,
) (this, owner *captureInfo) {
	prefix := getCaptureAdvertiseAddressPrefix(tc, ordinal)
	for idx := range captures {
		if strings.Contains(captures[idx].AdvertiseAddr, prefix) {
			this = &captures[idx]
		}
		if captures[idx].IsOwner {
			owner = &captures[idx]
		}
	}
	return this, owner
}

// FakeTiCDCControl is a fake implementation of TiCDCControlInterface.
Expand All @@ -97,3 +304,11 @@ func (c *FakeTiCDCControl) GetStatus(tc *v1alpha1.TidbCluster, ordinal int32) (*
}
return c.getStatus(tc, ordinal)
}

// DrainCapture is the fake implementation of DrainCapture: it ignores its
// arguments and always reports zero tables, no retry and no error.
func (c *FakeTiCDCControl) DrainCapture(tc *v1alpha1.TidbCluster, ordinal int32) (tableCount int, retry bool, err error) {
	return 0, false, nil
}

// ResignOwner is the fake implementation of ResignOwner: it ignores its
// arguments and always reports that ownership has been resigned, with no
// error.
func (c *FakeTiCDCControl) ResignOwner(tc *v1alpha1.TidbCluster, ordinal int32) (ok bool, err error) {
	return true, nil
}
Loading

0 comments on commit bdb8aad

Please sign in to comment.