[Bugs] [ Refactor ] - Redis Cluster Scaling (#539)
* Set up CI with Azure Pipelines

[skip ci]

* Update pipeline.yaml

* revert azure pipeline

Signed-off-by: Shubham Gupta <[email protected]>

* update the pipeline

Signed-off-by: Shubham Gupta <[email protected]>

* follow up for the cluster scaling

Signed-off-by: Shubham Gupta <[email protected]>

---------

Signed-off-by: Shubham Gupta <[email protected]>
shubham-cmyk authored Jul 3, 2023
1 parent a570e6e commit 0d17bf9
Showing 3 changed files with 50 additions and 29 deletions.
8 changes: 3 additions & 5 deletions controllers/rediscluster_controller.go
```diff
@@ -70,20 +70,18 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		return ctrl.Result{RequeueAfter: time.Second * 60}, err
 	}
 
-	// last Leader in redis cluster
-	lastLeaderPod := instance.Name + "-leader-" + strconv.Itoa(int(k8sutils.CheckRedisNodeCount(instance, "leader"))-1)
 	// Check if the cluster is downscaled
 	if leaderReplicas < k8sutils.CheckRedisNodeCount(instance, "leader") {
 
 		// Imp if the last index of leader sts is not leader make it then
 		// check whether the redis is leader or not ?
 		// if not true then make it leader pod
 
-		if !(k8sutils.VerifyLeaderPod(instance, lastLeaderPod)) {
+		if !(k8sutils.VerifyLeaderPod(instance)) {
 			// lastLeaderPod is slaving right now Make it the master Pod
 			// We have to bring a manual failover here to make it a leaderPod
 			// clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
-			k8sutils.ClusterFailover(instance, lastLeaderPod)
+			k8sutils.ClusterFailover(instance)
 		}
 
 		// Step 1 Rehard the Cluster
```
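The `lastLeaderPod` name is derived purely from the StatefulSet ordinal convention (`<cluster>-leader-<n-1>`), which is why both callees can now recompute it internally instead of taking it as a parameter. A minimal standalone sketch of that derivation (the cluster name and count below are illustrative, not from the diff):

```go
package main

import (
	"fmt"
	"strconv"
)

// lastLeaderPodName mirrors the operator's naming convention: leader pods are
// <cluster>-leader-0 .. <cluster>-leader-(n-1), so the highest ordinal is the
// pod that a StatefulSet downscale removes first.
func lastLeaderPodName(clusterName string, leaderCount int) string {
	return clusterName + "-leader-" + strconv.Itoa(leaderCount-1)
}

func main() {
	// With 4 leaders running and a spec asking for 3, redis-leader-3 must be
	// verified (and failed over if it is currently a replica) before resharding.
	fmt.Println(lastLeaderPodName("redis", 4)) // Output: redis-leader-3
}
```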
```diff
@@ -166,7 +164,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
 			}
 		}
 	} else {
-		if followerReplicas > 0 {
+		if followerReplicas > 0 && redisFollowerInfo.Status.ReadyReplicas == followerReplicas {
 			reqLogger.Info("All leader are part of the cluster, adding follower/replicas", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
 			k8sutils.ExecuteRedisReplicationCommand(instance)
 		} else {
```
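The widened guard only triggers replication setup once every follower pod reports Ready, so a pod that is still starting is not joined to the cluster prematurely. A hedged client-go sketch of the same readiness test (the namespace, StatefulSet name, and replica count are assumptions for illustration):

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// followersReady mirrors the condition above: replication commands should only
// run when followers are desired (> 0) AND all of them are Ready, i.e. the
// StatefulSet's Status.ReadyReplicas matches the desired count.
func followersReady(ctx context.Context, cs kubernetes.Interface, ns, sts string, want int32) (bool, error) {
	s, err := cs.AppsV1().StatefulSets(ns).Get(ctx, sts, metav1.GetOptions{})
	if err != nil {
		return false, err
	}
	return want > 0 && s.Status.ReadyReplicas == want, nil
}

func main() {
	cfg, err := rest.InClusterConfig() // assumes this runs inside the cluster
	if err != nil {
		panic(err)
	}
	ok, err := followersReady(context.Background(), kubernetes.NewForConfigOrDie(cfg), "default", "redis-follower", 3)
	fmt.Println(ok, err)
}
```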
22 changes: 15 additions & 7 deletions k8sutils/cluster-scaling.go
```diff
@@ -112,25 +112,31 @@ func getRedisClusterSlots(cr *redisv1beta1.RedisCluster, nodeID string) string {
 
 // getRedisNodeID would return nodeID of a redis node by passing pod
 func getRedisNodeID(cr *redisv1beta1.RedisCluster, pod RedisDetails) string {
 
 	var client *redis.Client
 	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
 	client = configureRedisClient(cr, pod.PodName)
 	defer client.Close()
 
+	pong, err := client.Ping().Result()
+	if err != nil || pong != "PONG" {
+		logger.Error(err, "Failed to ping Redis server")
+		return ""
+	}
+
 	cmd := redis.NewStringCmd("cluster", "myid")
-	err := client.Process(cmd)
+	err = client.Process(cmd)
 	if err != nil {
 		logger.Error(err, "Redis command failed with this error")
 		return ""
 	}
 
 	output, err := cmd.Result()
 	if err != nil {
 		logger.Error(err, "Redis command failed with this error")
 		return ""
 	}
 	logger.Info("Redis node ID ", "is", output)
 
 	return output
 
 }
 
 // Rebalance the Redis CLuster using the Empty Master Nodes
```
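The added ping short-circuits `getRedisNodeID` when a pod is not yet reachable, so callers receive an empty ID instead of a command error. A standalone sketch of the same pattern against a plain address, assuming the ctx-less go-redis v7 API that matches the calls in this diff:

```go
package main

import (
	"fmt"

	"github.com/go-redis/redis/v7"
)

// nodeID pings first and only then issues CLUSTER MYID, matching the guard
// added to getRedisNodeID: an unreachable node yields "" rather than an
// error from a command that never had a live connection.
func nodeID(addr string) string {
	client := redis.NewClient(&redis.Options{Addr: addr})
	defer client.Close()

	if pong, err := client.Ping().Result(); err != nil || pong != "PONG" {
		return "" // node not reachable yet; caller treats "" as "unknown"
	}

	cmd := redis.NewStringCmd("cluster", "myid")
	if err := client.Process(cmd); err != nil {
		return ""
	}
	id, err := cmd.Result()
	if err != nil {
		return ""
	}
	return id
}

func main() {
	fmt.Println(nodeID("127.0.0.1:6379"))
}
```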
```diff
@@ -179,7 +185,7 @@ func CheckIfEmptyMasters(cr *redisv1beta1.RedisCluster) {
 		podNodeID := getRedisNodeID(cr, pod)
 		podSlots := getRedisClusterSlots(cr, podNodeID)
 
-		if podSlots == "0" {
+		if podSlots == "0" || podSlots == "" {
 			logger.Info("Found Empty Redis Leader Node", "pod", pod)
 			RebalanceRedisClusterEmptyMasters(cr)
 			break
```
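The widened condition matters because the slot lookup goes through the node ID, and `getRedisNodeID` now returns `""` on a failed ping: a leader whose slot count cannot be read is treated like an empty one, so the rebalance still fires. The predicate in isolation:

```go
package main

import "fmt"

// isEmptyMaster mirrors the new check in CheckIfEmptyMasters: a leader is
// "empty" when it owns zero slots, or when its slot count is unknown because
// the node ID (and hence the slot lookup) could not be resolved.
func isEmptyMaster(podSlots string) bool {
	return podSlots == "0" || podSlots == ""
}

func main() {
	fmt.Println(isEmptyMaster("0"))    // true: joined but owns no slots
	fmt.Println(isEmptyMaster(""))     // true: slot count unresolved
	fmt.Println(isEmptyMaster("5461")) // false: serving slots
}
```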
```diff
@@ -379,8 +385,9 @@ func RemoveRedisNodeFromCluster(cr *redisv1beta1.RedisCluster) {
 }
 
 // verifyLeaderPod return true if the pod is leader/master
-func VerifyLeaderPod(cr *redisv1beta1.RedisCluster, podName string) bool {
+func VerifyLeaderPod(cr *redisv1beta1.RedisCluster) bool {
 	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
+	podName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(cr, "leader"))-1)
 
 	redisClient := configureRedisClient(cr, podName)
 	defer redisClient.Close()
```
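`VerifyLeaderPod`'s body is collapsed here. As a sketch only, one common way to test a node's role is to read `INFO replication` and look for `role:master` (again assuming the ctx-less go-redis v7 API; the operator's actual implementation may differ):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/go-redis/redis/v7"
)

// isMaster reads INFO replication and reports whether the node currently
// holds the master role, which is the property VerifyLeaderPod asserts for
// the last leader pod before a downscale proceeds.
func isMaster(addr string) (bool, error) {
	client := redis.NewClient(&redis.Options{Addr: addr})
	defer client.Close()

	info, err := client.Info("replication").Result()
	if err != nil {
		return false, err
	}
	return strings.Contains(info, "role:master"), nil
}

func main() {
	fmt.Println(isMaster("127.0.0.1:6379"))
}
```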
```diff
@@ -401,8 +408,9 @@ func VerifyLeaderPod(cr *redisv1beta1.RedisCluster, podName string) bool {
 	return false
 }
 
-func ClusterFailover(cr *redisv1beta1.RedisCluster, slavePodName string) {
+func ClusterFailover(cr *redisv1beta1.RedisCluster) {
 	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
+	slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(cr, "leader"))-1)
 	// cmd = redis-cli cluster failover -a <pass>
 
 	var cmd []string
```
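Per the inline comment, the failover is driven by running `redis-cli cluster failover` against the demoted pod. A sketch of how such an argv could be assembled (the flag set is inferred from the comment; the operator's exact command construction is collapsed below this hunk):

```go
package main

import "fmt"

// buildFailoverCmd assembles a "redis-cli cluster failover" invocation for a
// given node, appending -a only when the cluster is password-protected, as
// the comment above ("redis-cli cluster failover -a <pass>") suggests.
func buildFailoverCmd(host string, port int, password string) []string {
	cmd := []string{"redis-cli", "-h", host, "-p", fmt.Sprint(port), "cluster", "failover"}
	if password != "" {
		cmd = append(cmd, "-a", password)
	}
	return cmd
}

func main() {
	fmt.Println(buildFailoverCmd("10.0.0.7", 6379, "secret"))
	// [redis-cli -h 10.0.0.7 -p 6379 cluster failover -a secret]
}
```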
49 changes: 32 additions & 17 deletions k8sutils/redis.go
```diff
@@ -160,23 +160,39 @@ func ExecuteRedisReplicationCommand(cr *redisv1beta1.RedisCluster) {
 	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
 	followerCounts := cr.Spec.GetReplicaCounts("follower")
 	leaderCounts := cr.Spec.GetReplicaCounts("leader")
+	followerPerLeader := followerCounts / leaderCounts
 
 	nodes := checkRedisCluster(cr)
-	for followerIdx := 0; followerIdx <= int(followerCounts)-1; followerIdx++ {
-		followerPod := RedisDetails{
-			PodName:   cr.ObjectMeta.Name + "-follower-" + strconv.Itoa(followerIdx),
-			Namespace: cr.Namespace,
-		}
-		leaderPod := RedisDetails{
-			PodName:   cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(followerIdx)%int(leaderCounts)),
-			Namespace: cr.Namespace,
-		}
-		podIP = getRedisServerIP(followerPod)
-		if !checkRedisNodePresence(cr, nodes, podIP) {
-			logger.Info("Adding node to cluster.", "Node.IP", podIP, "Follower.Pod", followerPod)
-			cmd := createRedisReplicationCommand(cr, leaderPod, followerPod)
-			executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
-		} else {
-			logger.Info("Skipping Adding node to cluster, already present.", "Follower.Pod", followerPod)
+	for followerIdx := 0; followerIdx <= int(followerCounts)-1; {
+		for i := 0; i < int(followerPerLeader) && followerIdx <= int(followerCounts)-1; i++ {
+			followerPod := RedisDetails{
+				PodName:   cr.ObjectMeta.Name + "-follower-" + strconv.Itoa(followerIdx),
+				Namespace: cr.Namespace,
+			}
+			leaderPod := RedisDetails{
+				PodName:   cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(followerIdx)%int(leaderCounts)),
+				Namespace: cr.Namespace,
+			}
+			podIP = getRedisServerIP(followerPod)
+			if !checkRedisNodePresence(cr, nodes, podIP) {
+				logger.Info("Adding node to cluster.", "Node.IP", podIP, "Follower.Pod", followerPod)
+				cmd := createRedisReplicationCommand(cr, leaderPod, followerPod)
+				redisClient := configureRedisClient(cr, followerPod.PodName)
+				pong, err := redisClient.Ping().Result()
+				if err != nil {
+					logger.Error(err, "Failed to ping Redis server", "Follower.Pod", followerPod)
+					continue
+				}
+				if pong == "PONG" {
+					executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
+				} else {
+					logger.Info("Skipping execution of command due to failed Redis ping", "Follower.Pod", followerPod)
+				}
+			} else {
+				logger.Info("Skipping Adding node to cluster, already present.", "Follower.Pod", followerPod)
+			}
+
+			followerIdx++
 		}
 	}
 }
```
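The rewritten loop pins each follower to leader `followerIdx % leaderCounts` and advances `followerPerLeader` followers per outer pass; the pairing is the same modulo mapping as before, but the nested loop makes the per-leader batching explicit, and the added ping skips followers that are not answering yet. A standalone reproduction of the pairing (the names and counts are illustrative):

```go
package main

import (
	"fmt"
	"strconv"
)

// pairFollowers reproduces the follower-to-leader mapping used in
// ExecuteRedisReplicationCommand: follower i replicates leader i % leaders,
// spreading followers evenly across leaders.
func pairFollowers(cluster string, followers, leaders int) {
	for i := 0; i < followers; i++ {
		fmt.Println(
			cluster+"-follower-"+strconv.Itoa(i),
			"replicates",
			cluster+"-leader-"+strconv.Itoa(i%leaders),
		)
	}
}

func main() {
	// 6 followers over 3 leaders: each leader receives exactly 2 followers.
	pairFollowers("redis", 6, 3)
}
```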
```diff
@@ -491,7 +507,6 @@ func checkRedisServerRole(cr *redisv1beta1.RedisReplication, podName string) string {
 	}
 
 	return role
-
 }
 
 // checkAttachedSlave would return redis pod name which has slave
```
