diff --git a/cmd/remote-work-processor/main.go b/cmd/remote-work-processor/main.go index 47bec66..c6fae65 100644 --- a/cmd/remote-work-processor/main.go +++ b/cmd/remote-work-processor/main.go @@ -25,6 +25,7 @@ import ( "github.com/SAP/remote-work-processor/internal/kubernetes/controller" meta "github.com/SAP/remote-work-processor/internal/kubernetes/metadata" "github.com/SAP/remote-work-processor/internal/opt" + "github.com/SAP/remote-work-processor/internal/utils" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -34,8 +35,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" "syscall" - "time" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -83,10 +82,10 @@ func main() { connAttemptChan := make(chan struct{}, 1) connAttemptChan <- struct{}{} - var connAttempts uint = 0 + retryConfig := utils.CreateRetryConfig(opts.RetryInterval, opts.RetryStrategy.Unmarshall(), connAttemptChan) Loop: - for connAttempts < opts.MaxConnRetries { + for retryConfig.GetAttempts() < opts.MaxConnRetries { select { case <-rootCtx.Done(): log.Println("Received cancellation signal. Stopping Remote Work Processor...") @@ -94,12 +93,12 @@ Loop: case <-connAttemptChan: err := grpcClient.InitSession(rootCtx, rwpMetadata.SessionID()) if err != nil { - signalRetry(rootCtx, &connAttempts, connAttemptChan, err) + utils.Retry(rootCtx, retryConfig, err) } default: operation, err := grpcClient.ReceiveMsg() if err != nil { - signalRetry(rootCtx, &connAttempts, connAttemptChan, err) + utils.Retry(rootCtx, retryConfig, err) continue } if operation == nil { @@ -118,7 +117,7 @@ Loop: msg, err := processor.Process(rootCtx) if err != nil { - signalRetry(rootCtx, &connAttempts, connAttemptChan, fmt.Errorf("error processing operation: %v", err)) + utils.Retry(rootCtx, retryConfig, fmt.Errorf("error processing operation: %v", err)) continue } if msg == nil { @@ -126,7 +125,7 @@ Loop: } if err = grpcClient.Send(msg); err != nil { - signalRetry(rootCtx, &connAttempts, connAttemptChan, err) + utils.Retry(rootCtx, retryConfig, err) } } } @@ -156,21 +155,3 @@ func getKubeConfig() *rest.Config { } return config } - -// TODO: this can be made more "enterprise": -// -// make the retry interval configurable via cmd option -// add a new option to specify whether the retry strategy is -// - "fixed" (retry on regular interval) -// - "exponential" (each subsequent retry is after a longer period of time) -func signalRetry(ctx context.Context, attempts *uint, retryChan chan<- struct{}, err error) { - log.Println(err) - log.Println("retrying after 10 seconds...") - select { - case <-ctx.Done(): - return - case <-time.After(10 * time.Second): - } - retryChan <- struct{}{} - *attempts++ -} diff --git a/internal/grpc/processors/update_configuration_processor.go b/internal/grpc/processors/update_configuration_processor.go index ee4e734..c2d77c1 100644 --- a/internal/grpc/processors/update_configuration_processor.go +++ b/internal/grpc/processors/update_configuration_processor.go @@ -28,7 +28,8 @@ func NewUpdateWatchConfigurationProcessor(op *pb.ServerMessage_UpdateConfigReque func (p UpdateWatchConfigurationProcessor) Process(ctx context.Context) (*pb.ClientMessage, error) { if !p.isEnabled() { log.Println("Unable to process watch config: Remote Worker is disabled.") - // what to return? + // this corner case can't happen unless there is a proxy between the AutoPi and the RWP + // because the AutoPi won't send any messages to disabled Operators return nil, nil } @@ -39,7 +40,8 @@ func (p UpdateWatchConfigurationProcessor) Process(ctx context.Context) (*pb.Cli if p.engine == nil { log.Println("Unable to process watch config: Remote Worker is running in standalone mode.") - // what to return? + // this corner case can't happen unless there is a proxy between the AutoPi and the RWP + // because the AutoPi won't send UpdateConfig messages to Standalone Operators return nil, nil } diff --git a/internal/opt/cmd_options.go b/internal/opt/cmd_options.go index f9179e3..6187138 100644 --- a/internal/opt/cmd_options.go +++ b/internal/opt/cmd_options.go @@ -3,11 +3,14 @@ package opt import ( "crypto/sha256" "encoding/hex" + "errors" "flag" + "github.com/SAP/remote-work-processor/internal/utils" "github.com/google/uuid" "io" "log" "os" + "time" ) type Options struct { @@ -15,13 +18,19 @@ type Options struct { StandaloneMode bool InstanceId string MaxConnRetries uint + RetryInterval time.Duration + RetryStrategy StrategyOpt } +type StrategyOpt utils.RetryStrategy + const ( standaloneModeOpt = "standalone-mode" instanceIdOpt = "instance-id" connRetriesOpt = "conn-retries" versionOpt = "version" + retryIntervalOpt = "retry-interval" + retryStrategyOpt = "retry-strategy" ) func (opts *Options) BindFlags(fs *flag.FlagSet) { @@ -33,6 +42,34 @@ func (opts *Options) BindFlags(fs *flag.FlagSet) { "Instance Identifier for the Remote Work Processor (only applicable for Standalone mode)") fs.UintVar(&opts.MaxConnRetries, connRetriesOpt, 6, "Number of retries for gRPC connection to AutoPi server") fs.BoolVar(&opts.DisplayVersion, versionOpt, false, "Display binary version and exit") + fs.DurationVar(&opts.RetryInterval, retryIntervalOpt, 10*time.Second, "Retry interval") + fs.Var(&opts.RetryStrategy, retryStrategyOpt, "Retry strategy [fixed, incr]") +} + +func (opt *StrategyOpt) String() string { + if len(*opt) == 0 { + return string(utils.RetryStrategyFixed) + } + return string(*opt) +} + +func (opt *StrategyOpt) Get() any { + if len(*opt) == 0 { + return utils.RetryStrategyFixed + } + return utils.RetryStrategy(*opt) +} + +func (opt *StrategyOpt) Set(value string) error { + if value != string(utils.RetryStrategyFixed) && value != utils.RetryStrategyIncremental { + return errors.New("invalid value for retry-strategy: " + value) + } + *opt = StrategyOpt(value) + return nil +} + +func (opt *StrategyOpt) Unmarshall() utils.RetryStrategy { + return opt.Get().(utils.RetryStrategy) } func getHashedHostname() string { diff --git a/internal/utils/retrier.go b/internal/utils/retrier.go index ef89ce2..a2ec8f1 100644 --- a/internal/utils/retrier.go +++ b/internal/utils/retrier.go @@ -2,6 +2,7 @@ package utils import ( "context" + "log" "time" ) @@ -9,27 +10,25 @@ type RetryStrategy string const ( RetryStrategyFixed RetryStrategy = "fixed" - RetryStrategyExponential = "expnt" + RetryStrategyIncremental = "incr" ) type RetryConfig struct { - retryInterval time.Duration - retryStrategy RetryStrategy - retryMultiplicationFactor float32 - signalChan chan<- struct{} - attempts uint + retryInterval time.Duration + retryStrategy RetryStrategy + signalChan chan<- struct{} + attempts uint } func CreateDefaultRetryConfig(signalChan chan<- struct{}) *RetryConfig { - return CreateRetryConfig(10*time.Second, RetryStrategyFixed, 1, signalChan) + return CreateRetryConfig(10*time.Second, RetryStrategyFixed, signalChan) } -func CreateRetryConfig(interval time.Duration, strategy RetryStrategy, factor float32, signalChan chan<- struct{}) *RetryConfig { +func CreateRetryConfig(interval time.Duration, strategy RetryStrategy, signalChan chan<- struct{}) *RetryConfig { return &RetryConfig{ - retryInterval: interval, - retryStrategy: strategy, - retryMultiplicationFactor: factor, - signalChan: signalChan, + retryInterval: interval, + retryStrategy: strategy, + signalChan: signalChan, } } @@ -41,11 +40,22 @@ func (conf *RetryConfig) getNextRetryInterval() time.Duration { attempts := conf.attempts conf.attempts++ if conf.retryStrategy == RetryStrategyFixed { - return time.Duration(attempts+1) * conf.retryInterval + return conf.retryInterval } - return 0 + if conf.retryStrategy == RetryStrategyIncremental { + return time.Duration(float32(attempts+1)*1.75) * conf.retryInterval + } + return 0 //unreachable } func Retry(ctx context.Context, config *RetryConfig, err error) { - + log.Println(err) + nextRetryInterval := config.getNextRetryInterval() + log.Println("retrying after", nextRetryInterval) + select { + case <-ctx.Done(): + return + case <-time.After(nextRetryInterval): + } + config.signalChan <- struct{}{} }