Skip to content

Commit

Permalink
Add more configurable retrier
Browse files Browse the repository at this point in the history
  • Loading branch information
radito3 committed May 11, 2024
1 parent d34ce92 commit 2b0a525
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 43 deletions.
33 changes: 7 additions & 26 deletions cmd/remote-work-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
meta "github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
"github.com/SAP/remote-work-processor/internal/opt"
"github.com/SAP/remote-work-processor/internal/utils"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -34,8 +35,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"syscall"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -83,23 +82,23 @@ func main() {

connAttemptChan := make(chan struct{}, 1)
connAttemptChan <- struct{}{}
var connAttempts uint = 0
retryConfig := utils.CreateRetryConfig(opts.RetryInterval, opts.RetryStrategy.Unmarshall(), connAttemptChan)

Loop:
for connAttempts < opts.MaxConnRetries {
for retryConfig.GetAttempts() < opts.MaxConnRetries {
select {
case <-rootCtx.Done():
log.Println("Received cancellation signal. Stopping Remote Work Processor...")
break Loop
case <-connAttemptChan:
err := grpcClient.InitSession(rootCtx, rwpMetadata.SessionID())
if err != nil {
signalRetry(rootCtx, &connAttempts, connAttemptChan, err)
utils.Retry(rootCtx, retryConfig, err)
}
default:
operation, err := grpcClient.ReceiveMsg()
if err != nil {
signalRetry(rootCtx, &connAttempts, connAttemptChan, err)
utils.Retry(rootCtx, retryConfig, err)
continue
}
if operation == nil {
Expand All @@ -118,15 +117,15 @@ Loop:

msg, err := processor.Process(rootCtx)
if err != nil {
signalRetry(rootCtx, &connAttempts, connAttemptChan, fmt.Errorf("error processing operation: %v", err))
utils.Retry(rootCtx, retryConfig, fmt.Errorf("error processing operation: %v", err))
continue
}
if msg == nil {
continue
}

if err = grpcClient.Send(msg); err != nil {
signalRetry(rootCtx, &connAttempts, connAttemptChan, err)
utils.Retry(rootCtx, retryConfig, err)
}
}
}
Expand Down Expand Up @@ -156,21 +155,3 @@ func getKubeConfig() *rest.Config {
}
return config
}

// TODO: this can be made more "enterprise":
//
// make the retry interval configurable via cmd option
// add a new option to specify whether the retry strategy is
// - "fixed" (retry on regular interval)
// - "exponential" (each subsequent retry is after a longer period of time)
func signalRetry(ctx context.Context, attempts *uint, retryChan chan<- struct{}, err error) {
log.Println(err)
log.Println("retrying after 10 seconds...")
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
}
retryChan <- struct{}{}
*attempts++
}
6 changes: 4 additions & 2 deletions internal/grpc/processors/update_configuration_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewUpdateWatchConfigurationProcessor(op *pb.ServerMessage_UpdateConfigReque
func (p UpdateWatchConfigurationProcessor) Process(ctx context.Context) (*pb.ClientMessage, error) {
if !p.isEnabled() {
log.Println("Unable to process watch config: Remote Worker is disabled.")
// what to return?
// this corner case can't happen unless there is a proxy between the AutoPi and the RWP
// because the AutoPi won't send any messages to disabled Operators
return nil, nil
}

Expand All @@ -39,7 +40,8 @@ func (p UpdateWatchConfigurationProcessor) Process(ctx context.Context) (*pb.Cli

if p.engine == nil {
log.Println("Unable to process watch config: Remote Worker is running in standalone mode.")
// what to return?
// this corner case can't happen unless there is a proxy between the AutoPi and the RWP
// because the AutoPi won't send UpdateConfig messages to Standalone Operators
return nil, nil
}

Expand Down
37 changes: 37 additions & 0 deletions internal/opt/cmd_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,34 @@ package opt
import (
"crypto/sha256"
"encoding/hex"
"errors"
"flag"
"github.com/SAP/remote-work-processor/internal/utils"
"github.com/google/uuid"
"io"
"log"
"os"
"time"
)

type Options struct {
DisplayVersion bool
StandaloneMode bool
InstanceId string
MaxConnRetries uint
RetryInterval time.Duration
RetryStrategy StrategyOpt
}

type StrategyOpt utils.RetryStrategy

const (
standaloneModeOpt = "standalone-mode"
instanceIdOpt = "instance-id"
connRetriesOpt = "conn-retries"
versionOpt = "version"
retryIntervalOpt = "retry-interval"
retryStrategyOpt = "retry-strategy"
)

func (opts *Options) BindFlags(fs *flag.FlagSet) {
Expand All @@ -33,6 +42,34 @@ func (opts *Options) BindFlags(fs *flag.FlagSet) {
"Instance Identifier for the Remote Work Processor (only applicable for Standalone mode)")
fs.UintVar(&opts.MaxConnRetries, connRetriesOpt, 6, "Number of retries for gRPC connection to AutoPi server")
fs.BoolVar(&opts.DisplayVersion, versionOpt, false, "Display binary version and exit")
fs.DurationVar(&opts.RetryInterval, retryIntervalOpt, 10*time.Second, "Retry interval")
fs.Var(&opts.RetryStrategy, retryStrategyOpt, "Retry strategy [fixed, incr]")
}

func (opt *StrategyOpt) String() string {
if len(*opt) == 0 {
return string(utils.RetryStrategyFixed)
}
return string(*opt)
}

func (opt *StrategyOpt) Get() any {
if len(*opt) == 0 {
return utils.RetryStrategyFixed
}
return utils.RetryStrategy(*opt)
}

func (opt *StrategyOpt) Set(value string) error {
if value != string(utils.RetryStrategyFixed) && value != utils.RetryStrategyIncremental {
return errors.New("invalid value for retry-strategy: " + value)
}
*opt = StrategyOpt(value)
return nil
}

func (opt *StrategyOpt) Unmarshall() utils.RetryStrategy {
return opt.Get().(utils.RetryStrategy)
}

func getHashedHostname() string {
Expand Down
40 changes: 25 additions & 15 deletions internal/utils/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,33 @@ package utils

import (
"context"
"log"
"time"
)

type RetryStrategy string

const (
RetryStrategyFixed RetryStrategy = "fixed"
RetryStrategyExponential = "expnt"
RetryStrategyIncremental = "incr"
)

type RetryConfig struct {
retryInterval time.Duration
retryStrategy RetryStrategy
retryMultiplicationFactor float32
signalChan chan<- struct{}
attempts uint
retryInterval time.Duration
retryStrategy RetryStrategy
signalChan chan<- struct{}
attempts uint
}

func CreateDefaultRetryConfig(signalChan chan<- struct{}) *RetryConfig {
return CreateRetryConfig(10*time.Second, RetryStrategyFixed, 1, signalChan)
return CreateRetryConfig(10*time.Second, RetryStrategyFixed, signalChan)
}

func CreateRetryConfig(interval time.Duration, strategy RetryStrategy, factor float32, signalChan chan<- struct{}) *RetryConfig {
func CreateRetryConfig(interval time.Duration, strategy RetryStrategy, signalChan chan<- struct{}) *RetryConfig {
return &RetryConfig{
retryInterval: interval,
retryStrategy: strategy,
retryMultiplicationFactor: factor,
signalChan: signalChan,
retryInterval: interval,
retryStrategy: strategy,
signalChan: signalChan,
}
}

Expand All @@ -41,11 +40,22 @@ func (conf *RetryConfig) getNextRetryInterval() time.Duration {
attempts := conf.attempts
conf.attempts++
if conf.retryStrategy == RetryStrategyFixed {
return time.Duration(attempts+1) * conf.retryInterval
return conf.retryInterval
}
return 0
if conf.retryStrategy == RetryStrategyIncremental {
return time.Duration(float32(attempts+1)*1.75) * conf.retryInterval
}
return 0 //unreachable
}

func Retry(ctx context.Context, config *RetryConfig, err error) {

log.Println(err)
nextRetryInterval := config.getNextRetryInterval()
log.Println("retrying after", nextRetryInterval)
select {
case <-ctx.Done():
return
case <-time.After(nextRetryInterval):
}
config.signalChan <- struct{}{}
}

0 comments on commit 2b0a525

Please sign in to comment.