Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate with existing observability systems #44

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions internal/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"os"
"path/filepath"
"sync"

"github.com/ServiceWeaver/weaver-kube/internal/proto"
Expand All @@ -35,6 +36,7 @@ import (
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -47,6 +49,9 @@ import (
// [1] https://prometheus.io
const prometheusEndpoint = "/metrics"

// The directory where "weaver kube" stores data.
var logDir = filepath.Join(runtime.LogsDir(), "kube")

// babysitter starts and manages a weavelet inside the Pod.
type babysitter struct {
ctx context.Context
Expand All @@ -55,6 +60,8 @@ type babysitter struct {
exportTraces func(spans *protos.TraceSpans) error
clientset *kubernetes.Clientset

logger *slog.Logger

// printer pretty prints log entries.
printer *logging.PrettyPrinter

Expand Down Expand Up @@ -90,23 +97,39 @@ func RunBabysitter(ctx context.Context) error {
return err
}

// Create the logger.
fs, err := logging.NewFileStore(logDir)
if err != nil {
return fmt.Errorf("cannot create log storage: %w", err)
}
logSaver := fs.Add
logger := slog.New(&logging.LogHandler{
Opts: logging.Options{
App: cfg.Deployment.App.Name,
Component: "deployer",
Weavelet: uuid.NewString(),
Attrs: []string{"serviceweaver/system", ""},
},
Write: logSaver,
})

// Create the trace exporter.
isTraceExporterServiceRunning := cfg.TraceExporterService != ""
shouldExportTraces := cfg.TraceServiceUrl != ""
var traceExporter *jaeger.Exporter
if isTraceExporterServiceRunning {
if shouldExportTraces {
// Export traces iff there is a tracing service running that is able to receive
// these traces.
traceExporter, err =
jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(cfg.TraceExporterService)))
jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(cfg.TraceServiceUrl)))
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to create trace exporter: %v\n", err)
logger.Error("Unable to create trace exporter", "err", err)
return err
}
}
defer traceExporter.Shutdown(ctx) //nolint:errcheck // response write error

exportTraces := func(spans *protos.TraceSpans) error {
if !isTraceExporterServiceRunning {
if !shouldExportTraces {
return nil
}

Expand Down Expand Up @@ -134,6 +157,7 @@ func RunBabysitter(ctx context.Context) error {
envelope: e,
exportTraces: exportTraces,
clientset: clientset,
logger: logger,
printer: logging.NewPrettyPrinter(false /*colors disabled*/),
watching: map[string]struct{}{},
}
Expand All @@ -148,9 +172,9 @@ func RunBabysitter(ctx context.Context) error {
if err != nil {
return fmt.Errorf("error getting local hostname: %w", err)
}
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, metricsPort))
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, defaultMetricsPort))
if err != nil {
return fmt.Errorf("unable to listen on port %d: %w", metricsPort, err)
return fmt.Errorf("unable to listen on port %d: %w", defaultMetricsPort, err)
}
mux := http.NewServeMux()
mux.HandleFunc(prometheusEndpoint, func(w http.ResponseWriter, r *http.Request) {
Expand Down
58 changes: 30 additions & 28 deletions internal/impl/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,25 @@ type KubeConfig struct {
// specified, the deployer will launch corresponding services for exporting logs,
// metrics and traces automatically.
//
// We support the following observability services:
// prometheus_service - to export metrics to Prometheus [1]
// jaeger_service - to export traces to Jaeger [2]
// loki_service - to export logs to Grafana Loki [3]
// grafana_service - to visualize/manipulate observability information [4]
// The key must be one of the following strings:
// "prometheus_service" - to export metrics to Prometheus [1]
// "jaeger_service" - to export traces to Jaeger [2]
// "loki_service" - to export logs to Grafana Loki [3]
// "grafana_service" - to visualize/manipulate observability information [4]
//
// Possible values for each service:
// 1) do not specify a value at all; leave it empty
// this is the default value; kube deployer will automatically create the
// this is the default behavior; kube deployer will automatically create the
// observability service for you.
//
// 2) "none"
// kube deployer will not export the corresponding observability information to
// any service. E.g., prometheus_service = "none", it means that the user will
// not be able to see any metrics at all. This can be useful for testing or
// any service. E.g., prometheus_service = "none" means that the user will not
// be able to see any metrics at all. This can be useful for testing or
// benchmarking the performance of your application.
//
// 3) "your_observability_service_name"
// if you already have a running service to collect metrics, traces or logs,
// if you already have a running service to collect metrics, traces or logs,
// then you can simply specify the service name, and your application will
// automatically export the corresponding information to your service. E.g.,
// jaeger_service = "jaeger-all-in-one" will enable your running Jaeger
Expand Down Expand Up @@ -323,12 +323,12 @@ func (r *replicaSetInfo) buildContainer() (corev1.Container, error) {
// docker image.
r.dep.App.Binary = fmt.Sprintf("/weaver/%s", filepath.Base(r.dep.App.Binary))
kubeCfgStr, err := proto.ToEnv(&ReplicaSetConfig{
Namespace: r.namespace,
Deployment: r.dep,
ReplicaSet: r.name,
ComponentsToStart: r.components,
InternalPort: int32(r.internalPort),
TraceExporterService: r.traceServiceURL,
Namespace: r.namespace,
Deployment: r.dep,
ReplicaSet: r.name,
ComponentsToStart: r.components,
InternalPort: int32(r.internalPort),
TraceServiceUrl: r.traceServiceURL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/Url/URL/

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TraceServiceUrl is the autogenerated proto field trace_service_url. Unless we define it as trace_serviceURL, I think protoc will convert URL to Url.

})
if err != nil {
return corev1.Container{}, err
Expand Down Expand Up @@ -360,7 +360,7 @@ func (r *replicaSetInfo) buildContainer() (corev1.Container, error) {

// Expose the metrics port from the container, so it can be discoverable for
// scraping by Prometheus.
Ports: []corev1.ContainerPort{{ContainerPort: metricsPort}},
Ports: []corev1.ContainerPort{{ContainerPort: defaultMetricsPort}},

// Enabling TTY and Stdin allows the user to run a shell inside the container,
// for debugging.
Expand Down Expand Up @@ -435,9 +435,9 @@ func GenerateKubeDeployment(image string, dep *protos.Deployment, cfg *KubeConfi
generated = append(generated, content...)

// Generate deployment info needed to get insights into the application.
content, err = generateObservabilityInfo(dep, cfg)
content, err = generateObservabilityConfigs(dep, cfg)
if err != nil {
return fmt.Errorf("unable to create observability information: %w", err)
return fmt.Errorf("unable to create configuration information: %w", err)
}
generated = append(generated, content...)

Expand Down Expand Up @@ -599,15 +599,17 @@ func buildReplicaSetSpecs(dep *protos.Deployment, image string, cfg *KubeConfig)
}

// Compute the URL of the export traces service.
var exportTracesURLInfo string
val := cfg.Observability[exportTracesURL]
exportTracesURLIsSet := val != "" && val != "none"
if exportTracesURLIsSet {
exportTracesURLInfo = fmt.Sprintf("http://%s:%d/api/traces", val, jaegerCollectorPort)
} else {
if val != "none" {
exportTracesURLInfo = fmt.Sprintf("http://%s:%d/api/traces", name{dep.App.Name, jaegerAppName}.DNSLabel(), jaegerCollectorPort)
}
var traceServiceURL string
jservice := cfg.Observability[tracesConfigKey]
switch {
case jservice == auto:
// Point to the service launched by the kube deployer.
traceServiceURL = fmt.Sprintf("http://%s:%d/api/traces", name{dep.App.Name, jaegerAppName}.DNSLabel(), defaultJaegerCollectorPort)
case jservice != disabled:
// Point to the service launched by the user.
traceServiceURL = fmt.Sprintf("http://%s:%d/api/traces", jservice, defaultJaegerCollectorPort)
default:
// No trace to export.
}

// Build the replica sets.
Expand All @@ -621,7 +623,7 @@ func buildReplicaSetSpecs(dep *protos.Deployment, image string, cfg *KubeConfig)
dep: dep,
components: map[string]*ReplicaSetConfig_Listeners{},
internalPort: internalPort,
traceServiceURL: exportTracesURLInfo,
traceServiceURL: traceServiceURL,
}
}

Expand Down
69 changes: 34 additions & 35 deletions internal/impl/kube.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/impl/kube.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ message ReplicaSetConfig {
map<string, Listeners> components_to_start = 3;
int32 internal_port = 4;
string namespace = 5;
string trace_exporter_service = 6;
string trace_service_url = 6;
}
Loading
Loading