diff --git a/cmd/weaver-kube/main.go b/cmd/weaver-kube/main.go index a15b1c2..a08fe1a 100644 --- a/cmd/weaver-kube/main.go +++ b/cmd/weaver-kube/main.go @@ -14,14 +14,12 @@ package main -import "github.com/ServiceWeaver/weaver/runtime/tool" +import ( + "github.com/ServiceWeaver/weaver-kube/internal/impl" + "github.com/ServiceWeaver/weaver-kube/internal/tool" + swtool "github.com/ServiceWeaver/weaver/runtime/tool" +) func main() { - tool.Run("weaver kube", map[string]*tool.Command{ - "version": &versionCmd, - "deploy": &deployCmd, - - // Hidden commands. - "babysitter": &babysitterCmd, - }) + swtool.Run("weaver kube", tool.Commands(impl.BabysitterOptions{})) } diff --git a/go.mod b/go.mod index 060ada0..fd2e2f8 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ServiceWeaver/weaver v0.22.0 github.com/google/uuid v1.3.1 go.opentelemetry.io/otel v1.19.0 + go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/trace v1.19.0 golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 golang.org/x/sync v0.3.0 @@ -52,7 +53,6 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect - go.opentelemetry.io/otel/sdk v1.16.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect diff --git a/internal/impl/babysitter.go b/internal/impl/babysitter.go index 6a1ddf6..8a881eb 100644 --- a/internal/impl/babysitter.go +++ b/internal/impl/babysitter.go @@ -18,13 +18,12 @@ import ( "bytes" "context" "fmt" - "log/slog" "net" "net/http" "os" - "path/filepath" "slices" "sync" + "time" "github.com/ServiceWeaver/weaver/runtime" "github.com/ServiceWeaver/weaver/runtime/envelope" @@ -32,7 +31,9 @@ import ( "github.com/ServiceWeaver/weaver/runtime/metrics" "github.com/ServiceWeaver/weaver/runtime/prometheus" "github.com/ServiceWeaver/weaver/runtime/protos" + "github.com/ServiceWeaver/weaver/runtime/traces" "github.com/google/uuid" + "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/sync/errgroup" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,16 +47,20 @@ import ( // [1] https://prometheus.io const prometheusEndpoint = "/metrics" -// The directory where "weaver kube" stores data. -var logDir = filepath.Join(runtime.LogsDir(), "kube") +// BabysitterOptions configure a babysitter. See tool.Plugins for details. +type BabysitterOptions struct { + HandleLogEntry func(context.Context, *protos.LogEntry) error + HandleTraceSpans func(context.Context, []trace.ReadOnlySpan) error + HandleMetrics func(context.Context, []*metrics.MetricSnapshot) error +} // babysitter starts and manages a weavelet inside the Pod. type babysitter struct { ctx context.Context cfg *BabysitterConfig + opts BabysitterOptions app *protos.AppConfig envelope *envelope.Envelope - logger *slog.Logger clientset *kubernetes.Clientset printer *logging.PrettyPrinter @@ -63,7 +68,7 @@ type babysitter struct { watching map[string]struct{} // components being watched } -func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *BabysitterConfig, components []string) (*babysitter, error) { +func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *BabysitterConfig, components []string, opts BabysitterOptions) (*babysitter, error) { // Create the envelope. wlet := &protos.EnvelopeInfo{ App: app.Name, @@ -78,23 +83,7 @@ func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *Babysitte return nil, fmt.Errorf("NewBabysitter: create envelope: %w", err) } - // Create the logger. - fs, err := logging.NewFileStore(logDir) - if err != nil { - return nil, fmt.Errorf("NewBabysitter: create logger: %w", err) - } - logSaver := fs.Add - logger := slog.New(&logging.LogHandler{ - Opts: logging.Options{ - App: app.Name, - Component: "deployer", - Weavelet: uuid.NewString(), - Attrs: []string{"serviceweaver/system", ""}, - }, - Write: logSaver, - }) - - // Create a Kubernetes config. + // Create a Kubernetes client set. kubeConfig, err := rest.InClusterConfig() if err != nil { return nil, fmt.Errorf("NewBabysitter: get kube config: %w", err) @@ -108,14 +97,18 @@ func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *Babysitte b := &babysitter{ ctx: ctx, cfg: config, + opts: opts, app: app, envelope: e, - logger: logger, clientset: clientset, - printer: logging.NewPrettyPrinter(false /*colors disabled*/), watching: map[string]struct{}{}, } + // Create the pretty printer for logging, if there is no log handler. + if opts.HandleLogEntry == nil { + b.printer = logging.NewPrettyPrinter(false /*colors disabled*/) + } + // Inform the weavelet of the components it should host. if err := b.envelope.UpdateComponents(components); err != nil { return nil, fmt.Errorf("NewBabysitter: update components: %w", err) @@ -125,23 +118,42 @@ func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *Babysitte } func (b *babysitter) Serve() error { - // Run an HTTP server that exports metrics. - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) - if err != nil { - return fmt.Errorf("Babysitter.Serve: listen on port %d: %w", prometheusPort, err) + group, ctx := errgroup.WithContext(b.ctx) + + if b.opts.HandleMetrics == nil { + // Run an HTTP server that exports metrics. + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return fmt.Errorf("babysitter.Serve: listen on port %d: %w", prometheusPort, err) + } + mux := http.NewServeMux() + mux.HandleFunc(prometheusEndpoint, func(w http.ResponseWriter, r *http.Request) { + // Read the metrics. + metrics := b.readMetrics() + var b bytes.Buffer + prometheus.TranslateMetricsToPrometheusTextFormat(&b, metrics, r.Host, prometheusEndpoint) + w.Write(b.Bytes()) //nolint:errcheck // response write error + }) + group.Go(func() error { + return serveHTTP(ctx, lis, mux) + }) + } else { + // Periodically call b.opts.HandleMetrics with the set of metrics. + group.Go(func() error { + ticker := time.NewTimer(time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := b.opts.HandleMetrics(ctx, b.readMetrics()); err != nil { + return err + } + } + } + }) } - mux := http.NewServeMux() - mux.HandleFunc(prometheusEndpoint, func(w http.ResponseWriter, r *http.Request) { - // Read the metrics. - metrics := b.readMetrics() - var b bytes.Buffer - prometheus.TranslateMetricsToPrometheusTextFormat(&b, metrics, r.Host, prometheusEndpoint) - w.Write(b.Bytes()) //nolint:errcheck // response write error - }) - var group errgroup.Group - group.Go(func() error { - return serveHTTP(b.ctx, lis, mux) - }) // Run the envelope and handle messages from the weavelet. group.Go(func() error { @@ -248,15 +260,25 @@ func (b *babysitter) ExportListener(context.Context, *protos.ExportListenerReque } // HandleLogEntry implements the envelope.EnvelopeHandler interface. -func (b *babysitter) HandleLogEntry(_ context.Context, entry *protos.LogEntry) error { +func (b *babysitter) HandleLogEntry(ctx context.Context, entry *protos.LogEntry) error { + if b.opts.HandleLogEntry != nil { + return b.opts.HandleLogEntry(ctx, entry) + } fmt.Println(b.printer.Format(entry)) return nil } // HandleTraceSpans implements the envelope.EnvelopeHandler interface. func (b *babysitter) HandleTraceSpans(ctx context.Context, spans *protos.TraceSpans) error { - // TODO(mwhittaker): Implement with plugins. - return nil + if b.opts.HandleTraceSpans == nil { + return nil + } + + var spansToExport []trace.ReadOnlySpan + for _, span := range spans.Span { + spansToExport = append(spansToExport, &traces.ReadSpan{Span: span}) + } + return b.opts.HandleTraceSpans(ctx, spansToExport) } // GetSelfCertificate implements the envelope.EnvelopeHandler interface. diff --git a/cmd/weaver-kube/babysitter.go b/internal/tool/babysitter.go similarity index 72% rename from cmd/weaver-kube/babysitter.go rename to internal/tool/babysitter.go index e85d963..f0429e4 100644 --- a/cmd/weaver-kube/babysitter.go +++ b/internal/tool/babysitter.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package tool import ( "context" @@ -30,42 +30,42 @@ import ( "google.golang.org/protobuf/encoding/prototext" ) -var babysitterFlags = flag.NewFlagSet("babysitter", flag.ContinueOnError) - -var babysitterCmd = tool.Command{ - Name: "babysitter", - Flags: babysitterFlags, - Description: "The weaver kubernetes babysitter", - Help: `Usage: +func babysitterCmd(opts impl.BabysitterOptions) *tool.Command { + return &tool.Command{ + Name: "babysitter", + Flags: flag.NewFlagSet("babysitter", flag.ContinueOnError), + Description: "The weaver kubernetes babysitter", + Help: `Usage: weaver kube babysitter ... Flags: -h, --help Print this help message.`, - Fn: func(ctx context.Context, args []string) error { - // Parse command line arguments. - if len(args) < 3 { - return fmt.Errorf("want >= 3 arguments, got %d", len(args)) - } - app, err := parseWeaverConfig(args[0]) - if err != nil { - return err - } - config, err := parseBabysitterConfig(args[1]) - if err != nil { - return err - } - components := args[2:] + Fn: func(ctx context.Context, args []string) error { + // Parse command line arguments. + if len(args) < 3 { + return fmt.Errorf("want >= 3 arguments, got %d", len(args)) + } + app, err := parseWeaverConfig(args[0]) + if err != nil { + return err + } + config, err := parseBabysitterConfig(args[1]) + if err != nil { + return err + } + components := args[2:] - // Create the babysitter. - b, err := impl.NewBabysitter(ctx, app, config, components) - if err != nil { - return err - } + // Create the babysitter. + b, err := impl.NewBabysitter(ctx, app, config, components, opts) + if err != nil { + return err + } - // Run the babysitter. - return b.Serve() - }, - Hidden: true, + // Run the babysitter. + return b.Serve() + }, + Hidden: true, + } } // parseWeaverConfig parses a weaver.toml config file. diff --git a/cmd/weaver-kube/deploy.go b/internal/tool/deploy.go similarity index 98% rename from cmd/weaver-kube/deploy.go rename to internal/tool/deploy.go index 877ceae..85a89a0 100644 --- a/cmd/weaver-kube/deploy.go +++ b/internal/tool/deploy.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package tool import ( "context" @@ -94,7 +94,7 @@ Container Image Names: [1] https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ - e) Configure probes [1]. The kube deployer allows you to configure readiness + d) Configure probes [1]. The kube deployer allows you to configure readiness and liveness probes. For each probe, you can configure: - how often to perform the probe "period_secs" - how long to wait for a probe to respond before declaring a timeout "timeout_secs" diff --git a/internal/tool/tool.go b/internal/tool/tool.go new file mode 100644 index 0000000..0b1a7bf --- /dev/null +++ b/internal/tool/tool.go @@ -0,0 +1,30 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tool + +import ( + "github.com/ServiceWeaver/weaver-kube/internal/impl" + "github.com/ServiceWeaver/weaver/runtime/tool" +) + +func Commands(opts impl.BabysitterOptions) map[string]*tool.Command { + return map[string]*tool.Command{ + "version": &versionCmd, + "deploy": &deployCmd, + + // Hidden commands. + "babysitter": babysitterCmd(opts), + } +} diff --git a/cmd/weaver-kube/version.go b/internal/tool/version.go similarity index 98% rename from cmd/weaver-kube/version.go rename to internal/tool/version.go index a2874ad..8333dae 100644 --- a/cmd/weaver-kube/version.go +++ b/internal/tool/version.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package tool import ( "context" diff --git a/tool/tool.go b/tool/tool.go new file mode 100644 index 0000000..b779a31 --- /dev/null +++ b/tool/tool.go @@ -0,0 +1,47 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tool + +import ( + "context" + + "github.com/ServiceWeaver/weaver-kube/internal/impl" + "github.com/ServiceWeaver/weaver-kube/internal/tool" + "github.com/ServiceWeaver/weaver/runtime/metrics" + "github.com/ServiceWeaver/weaver/runtime/protos" + swtool "github.com/ServiceWeaver/weaver/runtime/tool" + "go.opentelemetry.io/otel/sdk/trace" +) + +// Plugins configure a Service Weaver application deployed on Kubernetes. +// +// Note that the handlers inside a Plugins struct are invoked on every replica +// of a Service Weaver application binary on locally produced logs, metrics, +// and traces. Handlers may be called concurrently from multiple goroutines. +type Plugins struct { + // HandleLogEntry handles log entries produced by a component logger. + HandleLogEntry func(context.Context, *protos.LogEntry) error + // HandleTraceSpans handles spans produced by inter-component communication. + HandleTraceSpans func(context.Context, []trace.ReadOnlySpan) error + // HandleMetrics is called periodically on a snapshot of all metric values. + HandleMetrics func(context.Context, []*metrics.MetricSnapshot) error +} + +// Run runs the "weaver-kube" binary. You can provide Run a set of Plugins to +// customize the behavior of "weaver-kube". The provided name is the name of +// the custom binary. +func Run(name string, plugins Plugins) { + swtool.Run(name, tool.Commands(impl.BabysitterOptions(plugins))) +}