Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put babysitter configuration in ConfigMap. #82

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions cmd/weaver-kube/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"

"github.com/ServiceWeaver/weaver-kube/internal/impl"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/ServiceWeaver/weaver/runtime/tool"
"google.golang.org/protobuf/encoding/prototext"
)

var babysitterFlags = flag.NewFlagSet("babysitter", flag.ContinueOnError)
Expand All @@ -30,15 +37,63 @@ var babysitterCmd = tool.Command{
Flags: babysitterFlags,
Description: "The weaver kubernetes babysitter",
Help: `Usage:
weaver kube babysitter
weaver kube babysitter <weaver config file> <babysitter config file> <component>...

Flags:
-h, --help Print this help message.`,
Fn: func(ctx context.Context, args []string) error {
if len(args) != 0 {
return fmt.Errorf("usage: weaver kube babysitter")
// Parse command line arguments.
if len(args) < 3 {
return fmt.Errorf("want >= 3 arguments, got %d", len(args))
}
return impl.RunBabysitter(ctx)
app, err := parseWeaverConfig(args[0])
if err != nil {
return err
}
config, err := parseBabysitterConfig(args[1])
if err != nil {
return err
}
components := args[2:]
mwhittaker marked this conversation as resolved.
Show resolved Hide resolved

// Create the babysitter.
b, err := impl.NewBabysitter(ctx, app, config, components)
if err != nil {
return err
}

// Run the babysitter.
return b.Serve()
},
Hidden: true,
}

// parseWeaverConfig parses a weaver.toml config file.
func parseWeaverConfig(filename string) (*protos.AppConfig, error) {
contents, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("read config file %q: %w", filename, err)
}
app, err := runtime.ParseConfig(filename, string(contents), codegen.ComponentConfigValidator)
if err != nil {
return nil, fmt.Errorf("parse config file %q: %w", filename, err)
}
// Rewrite the app config to point to the binary in the container.
app.Binary = fmt.Sprintf("/weaver/%s", filepath.Base(app.Binary))
if _, err := os.Stat(app.Binary); errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("binary %q doesn't exist", app.Binary)
}
return app, nil
}

// parseBabysitterConfig parses a config.textpb config file containing a
// BabysitterConfig.
func parseBabysitterConfig(filename string) (*impl.BabysitterConfig, error) {
contents, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("read config file %q: %w", filename, err)
}
var config impl.BabysitterConfig
err = prototext.Unmarshal(contents, &config)
return &config, err
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.19.0
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
google.golang.org/protobuf v1.31.0
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
Expand Down Expand Up @@ -56,7 +57,6 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
Expand Down
171 changes: 71 additions & 100 deletions internal/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"net/http"
"os"
"path/filepath"
"slices"
"sync"

"github.com/ServiceWeaver/weaver-kube/internal/proto"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/envelope"
"github.com/ServiceWeaver/weaver/runtime/logging"
Expand All @@ -36,6 +36,7 @@ import (
"github.com/google/uuid"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -53,62 +54,43 @@ var logDir = filepath.Join(runtime.LogsDir(), "kube")

// babysitter starts and manages a weavelet inside the Pod.
type babysitter struct {
ctx context.Context
cfg *ReplicaSetConfig
envelope *envelope.Envelope
exportTraces func(spans *protos.TraceSpans) error
clientset *kubernetes.Clientset

logger *slog.Logger

// printer pretty prints log entries.
printer *logging.PrettyPrinter
ctx context.Context
cfg *BabysitterConfig
app *protos.AppConfig
envelope *envelope.Envelope
logger *slog.Logger
traceExporter *jaeger.Exporter
clientset *kubernetes.Clientset
printer *logging.PrettyPrinter

mu sync.Mutex
watching map[string]struct{} // components being watched
}

func RunBabysitter(ctx context.Context) error {
// Retrieve the deployment information.
val, ok := os.LookupEnv(kubeConfigEnvKey)
if !ok {
return fmt.Errorf("environment variable %q not set", kubeConfigEnvKey)
}
if val == "" {
return fmt.Errorf("empty value for environment variable %q", kubeConfigEnvKey)
}
cfg := &ReplicaSetConfig{}
if err := proto.FromEnv(val, cfg); err != nil {
return err
}
host, err := os.Hostname()
if err != nil {
return fmt.Errorf("error getting local hostname: %w", err)
}

func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *BabysitterConfig, components []string) (*babysitter, error) {
// Create the envelope.
wlet := &protos.EnvelopeInfo{
App: cfg.App.Name,
DeploymentId: cfg.DepId,
App: app.Name,
DeploymentId: config.DeploymentId,
Id: uuid.New().String(),
Sections: cfg.App.Sections,
RunMain: cfg.Name == runtime.Main,
InternalAddress: fmt.Sprintf("%s:%d", host, internalPort),
Sections: app.Sections,
RunMain: slices.Contains(components, runtime.Main),
InternalAddress: fmt.Sprintf(":%d", internalPort),
}
e, err := envelope.NewEnvelope(ctx, wlet, cfg.App)
e, err := envelope.NewEnvelope(ctx, wlet, app)
if err != nil {
return err
return nil, fmt.Errorf("NewBabysitter: create envelope: %w", err)
}

// Create the logger.
fs, err := logging.NewFileStore(logDir)
if err != nil {
return fmt.Errorf("cannot create log storage: %w", err)
return nil, fmt.Errorf("NewBabysitter: create logger: %w", err)
}
logSaver := fs.Add
logger := slog.New(&logging.LogHandler{
Opts: logging.Options{
App: cfg.App.Name,
App: app.Name,
Component: "deployer",
Weavelet: uuid.NewString(),
Attrs: []string{"serviceweaver/system", ""},
Expand All @@ -117,67 +99,53 @@ func RunBabysitter(ctx context.Context) error {
})

// Create the trace exporter.
shouldExportTraces := cfg.TraceServiceUrl != ""
var traceExporter *jaeger.Exporter
if shouldExportTraces {
// Export traces iff there is a tracing service running that is able to receive
// these traces.
traceExporter, err =
jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(cfg.TraceServiceUrl)))
if config.TraceServiceUrl != "" {
// Export traces if there is a tracing service running that is able to
// receive these traces.
endpoint := jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(config.TraceServiceUrl))
traceExporter, err = jaeger.New(endpoint)
if err != nil {
logger.Error("Unable to create trace exporter", "err", err)
return err
}
}
defer traceExporter.Shutdown(ctx) //nolint:errcheck // response write error

exportTraces := func(spans *protos.TraceSpans) error {
if !shouldExportTraces {
return nil
}

var spansToExport []trace.ReadOnlySpan
for _, span := range spans.Span {
spansToExport = append(spansToExport, &traces.ReadSpan{Span: span})
return nil, fmt.Errorf("NewBabysitter: create trace exporter: %w", err)
}
return traceExporter.ExportSpans(ctx, spansToExport)
}

// Create a Kubernetes config.
config, err := rest.InClusterConfig()
kubeConfig, err := rest.InClusterConfig()
if err != nil {
return err
return nil, fmt.Errorf("NewBabysitter: get kube config: %w", err)
}
clientset, err := kubernetes.NewForConfig(config)
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return err
return nil, fmt.Errorf("NewBabysitter: get kube client set: %w", err)
}

// Create the babysitter.
b := &babysitter{
ctx: ctx,
cfg: cfg,
envelope: e,
exportTraces: exportTraces,
clientset: clientset,
logger: logger,
printer: logging.NewPrettyPrinter(false /*colors disabled*/),
watching: map[string]struct{}{},
ctx: ctx,
cfg: config,
app: app,
envelope: e,
logger: logger,
traceExporter: traceExporter,
clientset: clientset,
printer: logging.NewPrettyPrinter(false /*colors disabled*/),
watching: map[string]struct{}{},
}

// Inform the weavelet of the components it should host.
components := make([]string, len(cfg.Components))
for i, c := range cfg.Components {
components[i] = c.Name
}
if err := b.envelope.UpdateComponents(components); err != nil {
return err
return nil, fmt.Errorf("NewBabysitter: update components: %w", err)
}

// Run a http server that exports the metrics.
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, defaultMetricsPort))
return b, nil
}

func (b *babysitter) Serve() error {
// Run an HTTP server that exports metrics.
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", defaultMetricsPort))
if err != nil {
return fmt.Errorf("unable to listen on port %d: %w", defaultMetricsPort, err)
return fmt.Errorf("Babysitter.Serve: listen on port %d: %w", defaultMetricsPort, err)
}
mux := http.NewServeMux()
mux.HandleFunc(prometheusEndpoint, func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -187,14 +155,19 @@ func RunBabysitter(ctx context.Context) error {
prometheus.TranslateMetricsToPrometheusTextFormat(&b, metrics, r.Host, prometheusEndpoint)
w.Write(b.Bytes()) //nolint:errcheck // response write error
})
go func() {
if err := serveHTTP(ctx, lis, mux); err != nil {
fmt.Fprintf(os.Stderr, "Unable to start HTTP server: %v\n", err)
}
}()
var group errgroup.Group
group.Go(func() error {
return serveHTTP(b.ctx, lis, mux)
})

// Run the envelope and handle messages from the weavelet.
return e.Serve(b)
group.Go(func() error {
return b.envelope.Serve(b)
})

err = group.Wait()
b.traceExporter.Shutdown(b.ctx) //nolint:errcheck // response write error
return err
}

// ActivateComponent implements the envelope.EnvelopeHandler interface.
Expand All @@ -221,8 +194,8 @@ func (b *babysitter) watchPods(ctx context.Context, component string) error {
b.mu.Unlock()

// Watch the pods running the requested component.
rs := replicaSetName(component, b.cfg.App)
name := deploymentName(b.cfg.App.Name, rs, b.cfg.DepId)
rs := replicaSetName(component, b.app)
name := deploymentName(b.app.Name, rs, b.cfg.DeploymentId)
opts := metav1.ListOptions{LabelSelector: fmt.Sprintf("serviceweaver/name=%s", name)}
watcher, err := b.clientset.CoreV1().Pods(b.cfg.Namespace).Watch(ctx, opts)
if err != nil {
Expand Down Expand Up @@ -281,17 +254,11 @@ func (b *babysitter) watchPods(ctx context.Context, component string) error {

// GetListenerAddress implements the envelope.EnvelopeHandler interface.
func (b *babysitter) GetListenerAddress(_ context.Context, request *protos.GetListenerAddressRequest) (*protos.GetListenerAddressReply, error) {
// The external listeners are prestarted, hence we return the address of
// the Kubernetes Service.
for _, components := range b.cfg.Components {
for _, lis := range components.Listeners {
if lis.Name == request.Name {
addr := fmt.Sprintf(":%d", lis.ExternalPort)
return &protos.GetListenerAddressReply{Address: addr}, nil
}
}
port, ok := b.cfg.Listeners[request.Name]
if !ok {
return nil, fmt.Errorf("listener %q not found", request.Name)
}
return &protos.GetListenerAddressReply{}, nil
return &protos.GetListenerAddressReply{Address: fmt.Sprintf(":%d", port)}, nil
}

// ExportListener implements the envelope.EnvelopeHandler interface.
Expand All @@ -306,11 +273,15 @@ func (b *babysitter) HandleLogEntry(_ context.Context, entry *protos.LogEntry) e
}

// HandleTraceSpans implements the envelope.EnvelopeHandler interface.
func (b *babysitter) HandleTraceSpans(_ context.Context, spans *protos.TraceSpans) error {
if b.exportTraces == nil {
func (b *babysitter) HandleTraceSpans(ctx context.Context, spans *protos.TraceSpans) error {
if b.traceExporter == nil {
return nil
}
return b.exportTraces(spans)
var spansToExport []trace.ReadOnlySpan
for _, span := range spans.Span {
spansToExport = append(spansToExport, &traces.ReadSpan{Span: span})
}
return b.traceExporter.ExportSpans(ctx, spansToExport)
}

// GetSelfCertificate implements the envelope.EnvelopeHandler interface.
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func Deploy(ctx context.Context, configFilename string) error {
}

// Generate the kube deployment information.
return generateYAMLs(image, app, depId, config)
return generateYAMLs(configFilename, app, config, depId, image)
}

// checkVersionCompatibility checks that the `weaver kube` binary is compatible
Expand Down
Loading
Loading