Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major refactoring & implement standalone mode #12

Merged
merged 19 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions cmd/remote-work-processor/cmd_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"crypto/sha256"
"encoding/hex"
"flag"
"io"
"log"
"os"
)

type Options struct {
DisplayVersion bool
StandaloneMode bool
InstanceId string
MaxConnRetries uint
}

const (
standaloneModeOpt = "standalone-mode"
instanceIdOpt = "instance-id"
connRetriesOpt = "conn-retries"
versionOpt = "version"
)

func (opts *Options) BindFlags(fs *flag.FlagSet) {
hostname := getHashedHostname()

fs.BoolVar(&opts.StandaloneMode, standaloneModeOpt, false,
"Whether to run the Remote Work Processor in Standalone mode")
fs.StringVar(&opts.InstanceId, instanceIdOpt, hostname,
"Instance Identifier for the Remote Work Processor (only applicable for Standalone mode)")
fs.UintVar(&opts.MaxConnRetries, connRetriesOpt, 3, "Number of retries for gRPC connection to AutoPi server")
fs.BoolVar(&opts.DisplayVersion, versionOpt, false, "Display binary version and exit")
}

func getHashedHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Printf("could not get hostname: %v\n", err)
} else {
hasher := sha256.New()
io.WriteString(hasher, hostname)
hostname = hex.EncodeToString(hasher.Sum(nil))
}
return hostname
}
169 changes: 118 additions & 51 deletions cmd/remote-work-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,80 +17,147 @@
package main

import (
// "flag"
// "os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

"context"
"flag"
"fmt"
"github.com/SAP/remote-work-processor/internal/grpc"
"github.com/SAP/remote-work-processor/internal/grpc/processors"
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
meta "github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"log"
"os"

"os/signal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"syscall"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// "sigs.k8s.io/controller-runtime/pkg/healthz"
// "sigs.k8s.io/controller-runtime/pkg/log/zap"
// "github.com/SAP/remote-work-processor/kubernetes/controllers"
"github.com/SAP/remote-work-processor/internal/grpc"
"github.com/SAP/remote-work-processor/internal/grpc/processors"
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
"github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
//+kubebuilder:scaffold:imports
)

var (
scheme = runtime.NewScheme()
// Version of the Remote Work Processor.
// Injected at linking time via ldflags.
Version string
// BuildDate of the Remote Work Processor.
// Injected at linking time via ldflags.
BuildDate string
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

func main() {
metadata.InitRemoteWorkProcessorMetadata()
config := getKubeConfig()
opts := setupFlagsAndLogger()

e := controller.CreateManagerEngine(scheme, config)
processors.InitProcessorFactory(e)
grpc.InitRemoteWorkProcessorGrpcClient()
if opts.DisplayVersion {
fmt.Printf("rwp-%s Built: %s\n", Version, BuildDate)
return
}

opc := grpc.Client.Receive()
rootCtx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

for {
op := <-opc
p, err := processors.Factory.CreateProcessor(op)
if err != nil {
log.Fatalf("Error occurred while creating operation processor: %v\n", err)
}
rwpMetadata := meta.LoadMetadata(opts.InstanceId, Version)
grpcClient := grpc.NewClient(rwpMetadata, opts.StandaloneMode)
var drainChan chan struct{}

res := <-p.Process()
if res.Err != nil {
log.Fatalf("Error occurred while processing operation: %v\n", err)
}
var factory processors.ProcessorFactory

if opts.StandaloneMode {
factory = processors.NewStandaloneProcessorFactory()
} else {
config := getKubeConfig()
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme

if res.Result != nil {
grpc.Client.Send(res.Result)
drainChan = make(chan struct{}, 1)
engine := controller.CreateManagerEngine(scheme, config, grpcClient)
factory = processors.NewKubernetesProcessorFactory(engine, drainChan)
}

connAttemptChan := make(chan struct{}, 1)
connAttemptChan <- struct{}{}
var connAttempts uint = 0

Loop:
for connAttempts < opts.MaxConnRetries {
select {
case <-rootCtx.Done():
log.Println("Received cancellation signal. Stopping Remote Work Processor...")
break Loop
case <-connAttemptChan:
err := grpcClient.InitSession(rootCtx, rwpMetadata.SessionID())
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
}
default:
operation, err := grpcClient.ReceiveMsg()
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
continue
}
if operation == nil {
// this flow is when the gRPC connection is closed (either by the server or the context has been cancelled)
connAttemptChan <- struct{}{}
// do not increment the retries, as this isn't a failure
continue
}

log.Printf("Creating processor for operation: %T\n", operation.Body)
processor, err := factory.CreateProcessor(operation)
if err != nil {
log.Printf("error creating operation processor: %v\n", err)
continue
}

msg, err := processor.Process(rootCtx)
if err != nil {
signalRetry(&connAttempts, connAttemptChan, fmt.Errorf("error processing operation: %v", err))
continue
}
if msg == nil {
continue
}

if err = grpcClient.Send(msg); err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
}
}
}

if !opts.StandaloneMode {
// wait for context cancellation to be propagated to the k8s manager
<-drainChan
}
}

func getKubeConfig() *rest.Config {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}
func setupFlagsAndLogger() *Options {

Check failure on line 137 in cmd/remote-work-processor/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: Options
opts := &Options{}

Check failure on line 138 in cmd/remote-work-processor/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: Options
opts.BindFlags(flag.CommandLine)

kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
zapOpts := zap.Options{}
zapOpts.BindFlags(flag.CommandLine)

config, err := kubeConfig.ClientConfig()
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&zapOpts)))
return opts
}

func getKubeConfig() *rest.Config {
config, err := rest.InClusterConfig()
if err != nil {
os.Exit(1)
log.Fatalln("Could not create kubeconfig:", err)
}

return config
}

func signalRetry(attempts *uint, retryChan chan<- struct{}, err error) {
if err != nil {
log.Println(err)
}
retryChan <- struct{}{}
*attempts++
}
22 changes: 13 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ go 1.20

require (
github.com/itchyny/gojq v0.12.12
github.com/pkg/errors v0.9.1
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
sigs.k8s.io/controller-runtime v0.14.6
Expand All @@ -20,6 +19,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
Expand All @@ -39,20 +39,24 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading
Loading