Skip to content

Commit

Permalink
Add configuration options to Kube Cache service (#1304)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Oct 31, 2024
1 parent ac81b7d commit bce5cf4
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 27 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ linters:
- errorlint
- cyclop
- errname
- exportloopref
- gocritic
- goimports
- gosimple
Expand Down
69 changes: 52 additions & 17 deletions cmd/k8s-cache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,60 @@ package main

import (
"context"
"flag"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/grafana/beyla/pkg/buildinfo"
"github.com/grafana/beyla/pkg/kubecache"
"github.com/grafana/beyla/pkg/kubecache/meta"
"github.com/grafana/beyla/pkg/kubecache/service"
)

const defaultPort = 50055

// main code of te Kubernetes K8s informer's metadata cache service, when it runs as a separate service and not
// as a library inside Beyla

func main() {
// TODO: use buildinfo to print version
// TODO: let configure logger
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: true, Level: slog.LevelDebug})))
lvl := slog.LevelVar{}
lvl.Set(slog.LevelInfo)
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: &lvl,
})))

slog.Info("Beyla's Kubernetes Metadata cache service", "Version", buildinfo.Version, "Revision", buildinfo.Revision)

ic := service.InformersCache{
Port: defaultPort,
configPath := flag.String("config", "", "path to the configuration file")
flag.Parse()
if cfg := os.Getenv("BEYLA_K8S_CACHE_CONFIG_PATH"); cfg != "" {
configPath = &cfg
}
portStr := os.Getenv("BEYLA_K8S_CACHE_PORT")
if portStr != "" {
var err error
if ic.Port, err = strconv.Atoi(portStr); err != nil {
slog.Error("invalid BEYLA_K8S_CACHE_PORT, using default port", "error", err)
ic.Port = defaultPort
}
config := loadFromFile(configPath)
if err := lvl.UnmarshalText([]byte(config.LogLevel)); err != nil {
slog.Error("unknown log level specified, choices are [DEBUG, INFO, WARN, ERROR]", "error", err)
os.Exit(-1)
}

if config.ProfilePort != 0 {
go func() {
slog.Info("starting PProf HTTP listener", "port", config.ProfilePort)
err := http.ListenAndServe(fmt.Sprintf(":%d", config.ProfilePort), nil)
slog.Error("PProf HTTP listener stopped working", "error", err)
}()
}

ic := service.InformersCache{Config: config}

// Adding shutdown hook for graceful stop.
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

if err := ic.Run(ctx,
// TODO: make it configurable
meta.WithResyncPeriod(30*time.Minute)); err != nil {
meta.WithResyncPeriod(config.InformerResyncPeriod)); err != nil {
slog.Error("starting informers' cache service", "error", err)
os.Exit(-1)
}
Expand All @@ -51,3 +66,23 @@ func main() {
time.Sleep(time.Second)
}
}

func loadFromFile(configPath *string) *kubecache.Config {
var configReader io.ReadCloser
if configPath != nil && *configPath != "" {
var err error
if configReader, err = os.Open(*configPath); err != nil {
slog.Error("can't open "+*configPath, "error", err)
os.Exit(-1)
}
defer configReader.Close()
}
config, err := kubecache.LoadConfig(configReader)
if err != nil {
slog.Error("wrong configuration", "error", err)
// nolint:gocritic
os.Exit(-1)
}

return config
}
53 changes: 53 additions & 0 deletions pkg/kubecache/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kubecache

import (
"fmt"
"io"
"time"

"github.com/caarlos0/env/v9"
"gopkg.in/yaml.v3"
)

// Config options of the Kubernetes Cache service. Check the "DefaultConfig" variable for a view of the default values.
type Config struct {
// LogLevel can be one of: debug, info, warn, error
LogLevel string `yaml:"log_level" env:"BEYLA_K8S_CACHE_LOG_LEVEL"`
// Port where the service is going to listen to
Port int `yaml:"port" env:"BEYLA_K8S_CACHE_PORT"`
// MaxConnection is the maximum number of concurrent clients that the service can handle at the same time
MaxConnections int `yaml:"max_connections" env:"BEYLA_K8S_CACHE_MAX_CONNECTIONS"`
// ProfilePort is the port where the pprof server is going to listen to. 0 (default) means disabled
ProfilePort int `yaml:"profile_port" env:"BEYLA_K8S_CACHE_PROFILE_PORT"`
// InformerResyncPeriod is the time interval between complete resyncs of the informers
InformerResyncPeriod time.Duration `yaml:"informer_resync_period" env:"BEYLA_K8S_CACHE_INFORMER_RESYNC_PERIOD"`
}

var DefaultConfig = Config{
LogLevel: "info",
Port: 50055,
MaxConnections: 100,
InformerResyncPeriod: 30 * time.Minute,
ProfilePort: 0,
}

// LoadConfig overrides configuration in the following order (from less to most priority)
// 1 - Default configuration (DefaultConfig variable)
// 2 - Contents of the provided file reader (nillable)
// 3 - Environment variables
func LoadConfig(file io.Reader) (*Config, error) {
cfg := DefaultConfig
if file != nil {
cfgBuf, err := io.ReadAll(file)
if err != nil {
return nil, fmt.Errorf("reading YAML configuration: %w", err)
}
if err := yaml.Unmarshal(cfgBuf, &cfg); err != nil {
return nil, fmt.Errorf("parsing YAML configuration: %w", err)
}
}
if err := env.Parse(&cfg); err != nil {
return nil, fmt.Errorf("reading env vars: %w", err)
}
return &cfg, nil
}
18 changes: 13 additions & 5 deletions pkg/kubecache/envtest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"

"github.com/grafana/beyla/pkg/kubecache"
"github.com/grafana/beyla/pkg/kubecache/informer"
"github.com/grafana/beyla/pkg/kubecache/meta"
"github.com/grafana/beyla/pkg/kubecache/service"
Expand All @@ -34,6 +35,8 @@ var (

const timeout = 10 * time.Second

var freePort int

func TestMain(m *testing.M) {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: true, Level: slog.LevelDebug})))
var cancel context.CancelFunc
Expand Down Expand Up @@ -64,6 +67,11 @@ func TestMain(m *testing.M) {
slog.Error("creating K8s manager client", "error", err)
os.Exit(1)
}
freePort, err = test.FreeTCPPort()
if err != nil {
slog.Error("getting a free TCP port", "error", err)
os.Exit(1)
}
go func() {
if err := k8sManager.Start(ctx); err != nil {
slog.Error("starting manager", "error", err)
Expand All @@ -78,12 +86,12 @@ func TestMain(m *testing.M) {
}()

// Create and start informers client cache
svc := service.InformersCache{
Port: 50055, // TODO: get a free port automatically to not collide with other tests
}
iConfig := kubecache.DefaultConfig
iConfig.Port = freePort
svc := service.InformersCache{Config: &iConfig}
go func() {
if err := svc.Run(ctx,
meta.WithResyncPeriod(30*time.Minute),
meta.WithResyncPeriod(iConfig.InformerResyncPeriod),
meta.WithKubeClient(theClient),
); err != nil {
slog.Error("running service", "error", err)
Expand All @@ -95,7 +103,7 @@ func TestMain(m *testing.M) {
}

func TestAPIs(t *testing.T) {
svcClient := serviceClient{ID: "first-pod", Address: "127.0.0.1:50055"}
svcClient := serviceClient{ID: "first-pod", Address: fmt.Sprintf("127.0.0.1:%d", freePort)}
// client
require.Eventually(t, func() bool {
return svcClient.Start(ctx) == nil
Expand Down
12 changes: 8 additions & 4 deletions pkg/kubecache/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/grafana/beyla/pkg/kubecache"
"github.com/grafana/beyla/pkg/kubecache/informer"
"github.com/grafana/beyla/pkg/kubecache/meta"
)
Expand All @@ -19,7 +20,7 @@ import (
type InformersCache struct {
informer.UnimplementedEventStreamServiceServer

Port int
Config *kubecache.Config

started atomic.Bool
informers *meta.Informers
Expand All @@ -32,7 +33,7 @@ func (ic *InformersCache) Run(ctx context.Context, opts ...meta.InformerOption)
}
ic.log = slog.With("component", "server.InformersCache")

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", ic.Port))
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", ic.Config.Port))
if err != nil {
return fmt.Errorf("starting TCP connection: %w", err)
}
Expand All @@ -42,10 +43,13 @@ func (ic *InformersCache) Run(ctx context.Context, opts ...meta.InformerOption)
return fmt.Errorf("initializing informers: %w", err)
}

s := grpc.NewServer()
s := grpc.NewServer(
// TODO: configure other aspects (e.g. secure connections)
grpc.MaxConcurrentStreams(uint32(ic.Config.MaxConnections)),
)
informer.RegisterEventStreamServiceServer(s, ic)

ic.log.Info("server listening", "port", ic.Port)
ic.log.Info("server listening", "port", ic.Config.Port)

errs := make(chan error, 1)
go func() {
Expand Down

0 comments on commit bce5cf4

Please sign in to comment.