Skip to content

Commit

Permalink
Added basic plugin system to customize telemetry.
Browse files Browse the repository at this point in the history
This PR introduces a very basic plugin system that allows developers to
customize how `weaver-kube` manages logs, metrics, and traces. For
example, consider the following `foo/main.go`:

```
// This is file foo/main.go.
package main

import ...

func main() {
    plugins := tool.Plugins{
        HandleLogEntry: func(context.Context, *protos.LogEntry) error {
            ...
        },
        HandleTraceSpans: func(context.Context, []trace.ReadOnlySpan) error {
            ...
        },
        HandleMetrics: func(context.Context, []*metrics.MetricSnapshot) error {
            ...
        },
    }
    tool.Run("foo", plugins)
}
```

We can build this file into an executable `foo` that behaves the same as
`weaver-kube`, but uses the provided plugins:

```shell
$ foo --help
Deploy and manage Service Weaver programs.

Usage:
  foo <command> ...

Available Commands:
  deploy       Deploy a Service Weaver app
  help         Print help for a sub-command
  version      Show foo version

Flags:
  -h, --help   Print this help message.

Use "foo help <command>" for more information about a command.
```

See https://github.com/mwhittaker/jeagar for a complete example.

In the long term, we'll replace this basic plugin mechanism with
something more general. This PR allows customers to onboard to Service
Weaver in the short term.
  • Loading branch information
mwhittaker committed Oct 25, 2023
1 parent 024f4ad commit dec2d7f
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 87 deletions.
14 changes: 6 additions & 8 deletions cmd/weaver-kube/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@

package main

import "github.com/ServiceWeaver/weaver/runtime/tool"
import (
"github.com/ServiceWeaver/weaver-kube/internal/impl"
"github.com/ServiceWeaver/weaver-kube/internal/tool"
swtool "github.com/ServiceWeaver/weaver/runtime/tool"
)

func main() {
tool.Run("weaver kube", map[string]*tool.Command{
"version": &versionCmd,
"deploy": &deployCmd,

// Hidden commands.
"babysitter": &babysitterCmd,
})
swtool.Run("weaver kube", tool.Commands(impl.BabysitterOptions{}))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/ServiceWeaver/weaver v0.22.0
github.com/google/uuid v1.3.1
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.19.0
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
Expand Down
110 changes: 66 additions & 44 deletions internal/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ import (
"bytes"
"context"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"path/filepath"
"slices"
"sync"
"time"

"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/envelope"
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/metrics"
"github.com/ServiceWeaver/weaver/runtime/prometheus"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/ServiceWeaver/weaver/runtime/traces"
"github.com/google/uuid"
"go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -46,24 +47,28 @@ import (
// [1] https://prometheus.io
const prometheusEndpoint = "/metrics"

// The directory where "weaver kube" stores data.
var logDir = filepath.Join(runtime.LogsDir(), "kube")
// BabysitterOptions configure a babysitter. See tool.Plugins for details.
type BabysitterOptions struct {
HandleLogEntry func(context.Context, *protos.LogEntry) error
HandleTraceSpans func(context.Context, []trace.ReadOnlySpan) error
HandleMetrics func(context.Context, []*metrics.MetricSnapshot) error
}

// babysitter starts and manages a weavelet inside the Pod.
type babysitter struct {
ctx context.Context
cfg *BabysitterConfig
opts BabysitterOptions
app *protos.AppConfig
envelope *envelope.Envelope
logger *slog.Logger
clientset *kubernetes.Clientset
printer *logging.PrettyPrinter

mu sync.Mutex
watching map[string]struct{} // components being watched
}

func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *BabysitterConfig, components []string) (*babysitter, error) {
func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *BabysitterConfig, components []string, opts BabysitterOptions) (*babysitter, error) {
// Create the envelope.
wlet := &protos.EnvelopeInfo{
App: app.Name,
Expand All @@ -78,23 +83,7 @@ func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *Babysitte
return nil, fmt.Errorf("NewBabysitter: create envelope: %w", err)
}

// Create the logger.
fs, err := logging.NewFileStore(logDir)
if err != nil {
return nil, fmt.Errorf("NewBabysitter: create logger: %w", err)
}
logSaver := fs.Add
logger := slog.New(&logging.LogHandler{
Opts: logging.Options{
App: app.Name,
Component: "deployer",
Weavelet: uuid.NewString(),
Attrs: []string{"serviceweaver/system", ""},
},
Write: logSaver,
})

// Create a Kubernetes config.
// Create a Kubernetes client set.
kubeConfig, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("NewBabysitter: get kube config: %w", err)
Expand All @@ -108,14 +97,18 @@ func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *Babysitte
b := &babysitter{
ctx: ctx,
cfg: config,
opts: opts,
app: app,
envelope: e,
logger: logger,
clientset: clientset,
printer: logging.NewPrettyPrinter(false /*colors disabled*/),
watching: map[string]struct{}{},
}

// Create the pretty printer for logging, if there is no log handler.
if opts.HandleLogEntry == nil {
b.printer = logging.NewPrettyPrinter(false /*colors disabled*/)
}

// Inform the weavelet of the components it should host.
if err := b.envelope.UpdateComponents(components); err != nil {
return nil, fmt.Errorf("NewBabysitter: update components: %w", err)
Expand All @@ -125,23 +118,42 @@ func NewBabysitter(ctx context.Context, app *protos.AppConfig, config *Babysitte
}

func (b *babysitter) Serve() error {
// Run an HTTP server that exports metrics.
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return fmt.Errorf("Babysitter.Serve: listen on port %d: %w", prometheusPort, err)
group, ctx := errgroup.WithContext(b.ctx)

if b.opts.HandleMetrics == nil {
// Run an HTTP server that exports metrics.
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort))
if err != nil {
return fmt.Errorf("babysitter.Serve: listen on port %d: %w", prometheusPort, err)
}
mux := http.NewServeMux()
mux.HandleFunc(prometheusEndpoint, func(w http.ResponseWriter, r *http.Request) {
// Read the metrics.
metrics := b.readMetrics()
var b bytes.Buffer
prometheus.TranslateMetricsToPrometheusTextFormat(&b, metrics, r.Host, prometheusEndpoint)
w.Write(b.Bytes()) //nolint:errcheck // response write error
})
group.Go(func() error {
return serveHTTP(ctx, lis, mux)
})
} else {
// Periodically call b.opts.HandleMetrics with the set of metrics.
group.Go(func() error {
ticker := time.NewTimer(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := b.opts.HandleMetrics(ctx, b.readMetrics()); err != nil {
return err
}
}
}
})
}
mux := http.NewServeMux()
mux.HandleFunc(prometheusEndpoint, func(w http.ResponseWriter, r *http.Request) {
// Read the metrics.
metrics := b.readMetrics()
var b bytes.Buffer
prometheus.TranslateMetricsToPrometheusTextFormat(&b, metrics, r.Host, prometheusEndpoint)
w.Write(b.Bytes()) //nolint:errcheck // response write error
})
var group errgroup.Group
group.Go(func() error {
return serveHTTP(b.ctx, lis, mux)
})

// Run the envelope and handle messages from the weavelet.
group.Go(func() error {
Expand Down Expand Up @@ -248,15 +260,25 @@ func (b *babysitter) ExportListener(context.Context, *protos.ExportListenerReque
}

// HandleLogEntry implements the envelope.EnvelopeHandler interface.
func (b *babysitter) HandleLogEntry(_ context.Context, entry *protos.LogEntry) error {
func (b *babysitter) HandleLogEntry(ctx context.Context, entry *protos.LogEntry) error {
if b.opts.HandleLogEntry != nil {
return b.opts.HandleLogEntry(ctx, entry)
}
fmt.Println(b.printer.Format(entry))
return nil
}

// HandleTraceSpans implements the envelope.EnvelopeHandler interface.
func (b *babysitter) HandleTraceSpans(ctx context.Context, spans *protos.TraceSpans) error {
// TODO(mwhittaker): Implement with plugins.
return nil
if b.opts.HandleTraceSpans == nil {
return nil
}

var spansToExport []trace.ReadOnlySpan
for _, span := range spans.Span {
spansToExport = append(spansToExport, &traces.ReadSpan{Span: span})
}
return b.opts.HandleTraceSpans(ctx, spansToExport)
}

// GetSelfCertificate implements the envelope.EnvelopeHandler interface.
Expand Down
62 changes: 31 additions & 31 deletions cmd/weaver-kube/babysitter.go → internal/tool/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package main
package tool

import (
"context"
Expand All @@ -30,42 +30,42 @@ import (
"google.golang.org/protobuf/encoding/prototext"
)

var babysitterFlags = flag.NewFlagSet("babysitter", flag.ContinueOnError)

var babysitterCmd = tool.Command{
Name: "babysitter",
Flags: babysitterFlags,
Description: "The weaver kubernetes babysitter",
Help: `Usage:
func babysitterCmd(opts impl.BabysitterOptions) *tool.Command {
return &tool.Command{
Name: "babysitter",
Flags: flag.NewFlagSet("babysitter", flag.ContinueOnError),
Description: "The weaver kubernetes babysitter",
Help: `Usage:
weaver kube babysitter <weaver config file> <babysitter config file> <component>...
Flags:
-h, --help Print this help message.`,
Fn: func(ctx context.Context, args []string) error {
// Parse command line arguments.
if len(args) < 3 {
return fmt.Errorf("want >= 3 arguments, got %d", len(args))
}
app, err := parseWeaverConfig(args[0])
if err != nil {
return err
}
config, err := parseBabysitterConfig(args[1])
if err != nil {
return err
}
components := args[2:]
Fn: func(ctx context.Context, args []string) error {
// Parse command line arguments.
if len(args) < 3 {
return fmt.Errorf("want >= 3 arguments, got %d", len(args))
}
app, err := parseWeaverConfig(args[0])
if err != nil {
return err
}
config, err := parseBabysitterConfig(args[1])
if err != nil {
return err
}
components := args[2:]

// Create the babysitter.
b, err := impl.NewBabysitter(ctx, app, config, components)
if err != nil {
return err
}
// Create the babysitter.
b, err := impl.NewBabysitter(ctx, app, config, components, opts)
if err != nil {
return err
}

// Run the babysitter.
return b.Serve()
},
Hidden: true,
// Run the babysitter.
return b.Serve()
},
Hidden: true,
}
}

// parseWeaverConfig parses a weaver.toml config file.
Expand Down
4 changes: 2 additions & 2 deletions cmd/weaver-kube/deploy.go → internal/tool/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package main
package tool

import (
"context"
Expand Down Expand Up @@ -94,7 +94,7 @@ Container Image Names:
[1] https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
e) Configure probes [1]. The kube deployer allows you to configure readiness
d) Configure probes [1]. The kube deployer allows you to configure readiness
and liveness probes. For each probe, you can configure:
- how often to perform the probe "period_secs"
- how long to wait for a probe to respond before declaring a timeout "timeout_secs"
Expand Down
30 changes: 30 additions & 0 deletions internal/tool/tool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tool

import (
"github.com/ServiceWeaver/weaver-kube/internal/impl"
"github.com/ServiceWeaver/weaver/runtime/tool"
)

func Commands(opts impl.BabysitterOptions) map[string]*tool.Command {
return map[string]*tool.Command{
"version": &versionCmd,
"deploy": &deployCmd,

// Hidden commands.
"babysitter": babysitterCmd(opts),
}
}
2 changes: 1 addition & 1 deletion cmd/weaver-kube/version.go → internal/tool/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package main
package tool

import (
"context"
Expand Down
Loading

0 comments on commit dec2d7f

Please sign in to comment.