Skip to content

Commit

Permalink
[remote-storage] Add healthcheck to grpc server (#5461)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- The remote storage server is missing a gRPC health check. The current use
case is for test environments to check whether the gRPC server is ready
before running the tests.

## Description of the changes
- Add "google.golang.org/grpc/health/grpc_health_v1" to remote-storage
gRPC server.
- Ping health check for remote storage server before proceeding with
tests
- Add deduping of spans by ID when retrieved from storage (when storage
is not idempotent)
- Make `zaptest` loggers include the source file (caller) in log output

## How was this change tested?
- Tested with #5459 PR.

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: James Ryans <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
james-ryans and yurishkuro authored May 19, 2024
1 parent d4134d5 commit 3dbd02b
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 16 deletions.
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (storageHost) GetExporters() map[component.DataType]map[component.ID]compon
func TestServerDependencies(t *testing.T) {
expectedDependencies := []component.ID{jaegerstorage.ID}
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
}

server := newServer(createDefaultConfig().(*Config), telemetrySettings)
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestServerStart(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
}
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)
Expand Down
8 changes: 4 additions & 4 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type E2EStorageIntegration struct {
// It also initializes the SpanWriter and SpanReader below.
// This function should be called before any of the tests start.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
configFile := createStorageCleanerConfig(t, s.ConfigFile, storage)
t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

Expand Down Expand Up @@ -98,14 +98,14 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
// A Github Actions special annotation to create a foldable section
// in the Github runner output.
// https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines
fmt.Println("::group::Jaeger-v2 binary logs")
fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs")
outLogs, err := os.ReadFile(outFile.Name())
require.NoError(t, err)
fmt.Printf("Jaeger-v2 output logs:\n%s", outLogs)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)

errLogs, err := os.ReadFile(errFile.Name())
require.NoError(t, err)
fmt.Printf("Jaeger-v2 error logs:\n%s", errLogs)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
// End of Github Actions foldable section annotation.
fmt.Println("::endgroup::")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/span_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode
for i := range received.Spans {
spans = append(spans, &received.Spans[i])
}
r.logger.Info(fmt.Sprintf("GetTrace received %d spans (total %d)", len(received.Spans), len(spans)))
// r.logger.Info(fmt.Sprintf("GetTrace received %d spans (total %d)", len(received.Spans), len(spans)))
}
r.logger.Info(fmt.Sprintf("GetTraces received a total of %d spans", len(spans)))

Expand Down
4 changes: 3 additions & 1 deletion cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
Expand Down Expand Up @@ -115,8 +116,9 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa
}

server := grpc.NewServer(grpcOpts...)
healthServer := health.NewServer()
reflection.Register(server)
handler.Register(server)
handler.Register(server, healthServer)

return server, nil
}
Expand Down
14 changes: 13 additions & 1 deletion plugin/storage/grpc/shared/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -84,14 +86,24 @@ func NewGRPCHandlerWithPlugins(
}

// Register registers s as the implementation of the storage_v1 plugin
// services on ss, marks each of those services as SERVING on hs, and then
// registers hs on ss as the gRPC health service. It always returns nil.
//
// NOTE(review): hs is used unconditionally; presumably callers always pass a
// non-nil health server — confirm.
func (s *GRPCHandler) Register(ss *grpc.Server) error {
func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error {
storage_v1.RegisterSpanReaderPluginServer(ss, s)
storage_v1.RegisterSpanWriterPluginServer(ss, s)
storage_v1.RegisterArchiveSpanReaderPluginServer(ss, s)
storage_v1.RegisterArchiveSpanWriterPluginServer(ss, s)
storage_v1.RegisterPluginCapabilitiesServer(ss, s)
storage_v1.RegisterDependenciesReaderPluginServer(ss, s)
storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s)

// Report one health entry per storage service registered above.
// NOTE(review): the names presumably mirror the fully-qualified service
// names from the storage_v1 proto definitions — verify against the proto.
hs.SetServingStatus("jaeger.storage.v1.SpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.SpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.PluginCapabilities", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.DependenciesReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.StreamingSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
// Expose the health service on the same gRPC server as the storage services.
grpc_health_v1.RegisterHealthServer(ss, hs)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.factory = badger.NewFactory()
s.factory.Options.Primary.Ephemeral = false

logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
err := s.factory.Initialize(metrics.NullFactory, logger)
require.NoError(t, err)
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *CassandraStorageIntegration) cleanUp(t *testing.T) {
}

func (s *CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string) *cassandra.Factory {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
f := cassandra.NewFactory()
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags(flags))
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T) {
}

func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
f := es.NewFactory()
v, command := config.Viperize(f.AddFlags)
args := []string{
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config"
Expand All @@ -34,7 +35,7 @@ type GRPCStorageIntegrationTestSuite struct {
}

func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
logger := zaptest.NewLogger(t)
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
s.remoteStorage = StartNewRemoteMemoryStorage(t)

f := grpc.NewFactory()
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID)
return err == nil && len(actual.Spans) == len(expected.Spans)
return err == nil && len(actual.Spans) >= len(expected.Spans)
})
if !assert.True(t, found) {
CompareTraces(t, expected, actual)
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type KafkaIntegrationTestSuite struct {
}

func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
const encoding = "json"
const groupID = "kafka-integration-test"
const clientID = "kafka-integration-test"
Expand Down
28 changes: 27 additions & 1 deletion plugin/storage/integration/remote_memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
package integration

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/jaegertracing/jaeger/cmd/remote-storage/app"
"github.com/jaegertracing/jaeger/pkg/config"
Expand All @@ -25,7 +31,7 @@ type RemoteMemoryStorage struct {
}

func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
logger := zaptest.NewLogger(t)
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
opts := &app.Options{
GRPCHostPort: ports.PortToHostPort(ports.RemoteStorageGRPC),
Tenancy: tenancy.Options{
Expand All @@ -45,6 +51,26 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
require.NoError(t, err)
require.NoError(t, server.Start())

conn, err := grpc.NewClient(
opts.GRPCHostPort,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
defer conn.Close()
healthClient := grpc_health_v1.NewHealthClient(conn)
require.Eventually(t, func() bool {
req := &grpc_health_v1.HealthCheckRequest{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
resp, err := healthClient.Check(ctx, req)
if err != nil {
t.Logf("remote storage server is not ready: err=%v", err)
return false
}
t.Logf("remote storage server status: %v", resp.Status)
return resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
}, 30*time.Second, time.Second, "failed to ensure remote storage server is ready")

return &RemoteMemoryStorage{
server: server,
storageFactory: storageFactory,
Expand Down
22 changes: 22 additions & 0 deletions plugin/storage/integration/trace_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ func CompareSliceOfTraces(t *testing.T, expected []*model.Trace, actual []*model
}
}

// dedupeSpans drops every span whose SpanID has already appeared earlier in
// trace.Spans, so only the first span seen for each ID survives. Surviving
// spans keep their original relative order, and trace.Spans is replaced with
// the filtered slice.
func dedupeSpans(trace *model.Trace) {
	var unique []*model.Span
	visited := map[model.SpanID]struct{}{}
	for i := range trace.Spans {
		current := trace.Spans[i]
		if _, duplicate := visited[current.SpanID]; duplicate {
			continue
		}
		visited[current.SpanID] = struct{}{}
		unique = append(unique, current)
	}
	trace.Spans = unique
}

// CompareTraces compares two traces
func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) {
if expected.Spans == nil {
Expand All @@ -55,6 +69,14 @@ func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) {
}
require.NotNil(t, actual)
require.NotNil(t, actual.Spans)

// Some storage implementations may retry writing spans and end up with duplicates.
countBefore := len(actual.Spans)
dedupeSpans(actual)
if countAfter := len(actual.Spans); countAfter != countBefore {
t.Logf("Removed spans with duplicate span IDs; before=%d, after=%d", countBefore, countAfter)
}

model.SortTrace(expected)
model.SortTrace(actual)
checkSize(t, expected, actual)
Expand Down
30 changes: 30 additions & 0 deletions plugin/storage/integration/trace_compare_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
)

// TestDedupeSpans verifies that dedupeSpans removes spans with a repeated
// SpanID, keeps the first occurrence of each ID, and leaves the remaining
// spans in their original order.
func TestDedupeSpans(t *testing.T) {
	first := &model.Span{SpanID: 1}
	duplicate := &model.Span{SpanID: 1}
	other := &model.Span{SpanID: 2}
	trace := &model.Trace{
		Spans: []*model.Span{first, duplicate, other},
	}

	dedupeSpans(trace)

	// assert.Len does not abort the test on failure, so guard the index
	// accesses below to avoid an out-of-range panic masking the real failure.
	if assert.Len(t, trace.Spans, 2) {
		// Pointer identity proves the first occurrence was kept, not the duplicate.
		assert.Same(t, first, trace.Spans[0])
		assert.Same(t, other, trace.Spans[1])
		assert.Equal(t, model.SpanID(1), trace.Spans[0].SpanID)
		assert.Equal(t, model.SpanID(2), trace.Spans[1].SpanID)
	}
}

0 comments on commit 3dbd02b

Please sign in to comment.