Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major refactoring & implement standalone mode #12

Merged
merged 19 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions cmd/remote-work-processor/cmd_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"crypto/sha256"
"encoding/hex"
"flag"
"io"
"log"
"os"
)

type Options struct {
DisplayVersion bool
StandaloneMode bool
InstanceId string
MaxConnRetries uint
}

const (
standaloneModeOpt = "standalone-mode"
instanceIdOpt = "instance-id"
connRetriesOpt = "conn-retries"
versionOpt = "version"
)

func (opts *Options) BindFlags(fs *flag.FlagSet) {
hostname := getHashedHostname()

fs.BoolVar(&opts.StandaloneMode, standaloneModeOpt, false,
"Whether to run the Remote Work Processor in Standalone mode")
fs.StringVar(&opts.InstanceId, instanceIdOpt, hostname,
"Instance Identifier for the Remote Work Processor (only applicable for Standalone mode)")
fs.UintVar(&opts.MaxConnRetries, connRetriesOpt, 3, "Number of retries for gRPC connection to AutoPi server")
fs.BoolVar(&opts.DisplayVersion, versionOpt, false, "Display binary version and exit")
}

func getHashedHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Printf("could not get hostname: %v\n", err)
} else {
hasher := sha256.New()
io.WriteString(hasher, hostname)
hostname = hex.EncodeToString(hasher.Sum(nil))
}
return hostname
}
171 changes: 120 additions & 51 deletions cmd/remote-work-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,80 +17,149 @@
package main

import (
// "flag"
// "os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

"context"
"flag"
"fmt"
"github.com/SAP/remote-work-processor/internal/grpc"
"github.com/SAP/remote-work-processor/internal/grpc/processors"
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
meta "github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"log"
"os"

"os/signal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"syscall"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// "sigs.k8s.io/controller-runtime/pkg/healthz"
// "sigs.k8s.io/controller-runtime/pkg/log/zap"
// "github.com/SAP/remote-work-processor/kubernetes/controllers"
"github.com/SAP/remote-work-processor/internal/grpc"
"github.com/SAP/remote-work-processor/internal/grpc/processors"
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
"github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
//+kubebuilder:scaffold:imports
)

var (
scheme = runtime.NewScheme()
// Version of the Remote Work Processor.
// Injected at linking time via ldflags.
Version string
// BuildDate of the Remote Work Processor.
// Injected at linking time via ldflags.
BuildDate string
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

func main() {
metadata.InitRemoteWorkProcessorMetadata()
config := getKubeConfig()
opts := setupFlagsAndLogger()

e := controller.CreateManagerEngine(scheme, config)
processors.InitProcessorFactory(e)
grpc.InitRemoteWorkProcessorGrpcClient()
if opts.DisplayVersion {
fmt.Printf("rwp-%s Built: %s\n", Version, BuildDate)
return
}

opc := grpc.Client.Receive()
rootCtx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

for {
op := <-opc
p, err := processors.Factory.CreateProcessor(op)
if err != nil {
log.Fatalf("Error occurred while creating operation processor: %v\n", err)
}
rwpMetadata := meta.LoadMetadata(opts.InstanceId, Version)
grpcClient := grpc.NewClient(rwpMetadata, opts.StandaloneMode)
var drainChan chan struct{}

res := <-p.Process()
if res.Err != nil {
log.Fatalf("Error occurred while processing operation: %v\n", err)
}
var factory processors.ProcessorFactory

if opts.StandaloneMode {
factory = processors.NewStandaloneProcessorFactory()
} else {
config := getKubeConfig()
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme

if res.Result != nil {
grpc.Client.Send(res.Result)
drainChan = make(chan struct{}, 1)
engine := controller.CreateManagerEngine(scheme, config, grpcClient)
factory = processors.NewKubernetesProcessorFactory(engine, drainChan)
}

connAttemptChan := make(chan struct{}, 1)
connAttemptChan <- struct{}{}
var connAttempts uint = 0

Loop:
for connAttempts < opts.MaxConnRetries {
select {
case <-rootCtx.Done():
break Loop
case <-connAttemptChan:
err := grpcClient.InitSession(rootCtx, rwpMetadata.SessionID())
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
}
default:
operation, err := grpcClient.ReceiveMsg()
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
continue
}
if operation == nil {
// this flow is only when the backend closes the gRPC connection
connAttemptChan <- struct{}{}
// do not increment the retries, as this isn't a failure
continue
}

processor, err := factory.CreateProcessor(operation)
if err != nil {
log.Printf("error creating operation processor: %v\n", err)
continue
}

msg, err := processor.Process(rootCtx)
//TODO: not every error needs session reestablishment; make a custom error struct and only
// recreation the session based on error type
if err != nil {
//TODO: check how the backed handles the case when the client doesn't send a "confirm" message
// ensure there are retries in case there isn't a confirmation
signalRetry(&connAttempts, connAttemptChan, fmt.Errorf("error processing operation: %v", err))
continue
}
if msg == nil {
continue
}

if err = grpcClient.Send(msg); err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
}
}
}

if !opts.StandaloneMode {
// wait for context cancellation to be propagated to the k8s manager
<-drainChan
}
}

func getKubeConfig() *rest.Config {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}
func setupFlagsAndLogger() *Options {

Check failure on line 139 in cmd/remote-work-processor/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: Options
opts := &Options{}

Check failure on line 140 in cmd/remote-work-processor/main.go

View workflow job for this annotation

GitHub Actions / test

undefined: Options
opts.BindFlags(flag.CommandLine)

kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
zapOpts := zap.Options{}
zapOpts.BindFlags(flag.CommandLine)

config, err := kubeConfig.ClientConfig()
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&zapOpts)))
return opts
}

func getKubeConfig() *rest.Config {
config, err := rest.InClusterConfig()
if err != nil {
os.Exit(1)
log.Fatalln("Could not create kubeconfig:", err)
}

return config
}

func signalRetry(attempts *uint, retryChan chan<- struct{}, err error) {
if err != nil {
log.Println(err)
}
retryChan <- struct{}{}
*attempts++
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/itchyny/gojq v0.12.12
github.com/pkg/errors v0.9.1
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
k8s.io/apimachinery v0.26.1
Expand All @@ -20,6 +19,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
Expand All @@ -39,11 +39,15 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -81,9 +83,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A=
github.com/go-logr/zapr v1.2.3/go.mod h1:eIauM6P8qSvTw5o2ez6UEAfGjQKrxQTl5EoK+Qa2oG4=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
Expand Down Expand Up @@ -260,6 +264,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand All @@ -271,9 +276,14 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -438,6 +448,7 @@ golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down Expand Up @@ -574,6 +585,7 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
5 changes: 1 addition & 4 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package cache

type Cache[K comparable, V any] interface {
type MapCache[K comparable, V any] interface {
Read(k K) V
Write(k K, v V) V
Remove(k K)
Size() int
}

type MapCache[K comparable, V any] interface {
Cache[K, V]
FromMap(m map[K]V) MapCache[K, V]
ToMap() map[K]V
}
17 changes: 0 additions & 17 deletions internal/cache/errors.go

This file was deleted.

8 changes: 0 additions & 8 deletions internal/executors/enum.go

This file was deleted.

Loading
Loading