Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Nov 29, 2024
1 parent c91716f commit 4c197fe
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 48 deletions.
4 changes: 2 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}

baseTelset := telemetry.Setting{
baseTelset := telemetry.Settings{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: baseFactory,
Expand Down Expand Up @@ -219,7 +219,7 @@ func startQuery(
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) *queryApp.Server {
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
Expand Down
50 changes: 25 additions & 25 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Server struct {
httpServer *httpServer
separatePorts bool
bgFinished sync.WaitGroup
telemetry.Setting
telset telemetry.Settings
}

// NewServer creates and initializes Server
Expand All @@ -58,7 +58,7 @@ func NewServer(
metricsQuerySvc querysvc.MetricsQueryService,
options *QueryOptions,
tm *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTP.Endpoint)
if err != nil {
Expand Down Expand Up @@ -90,15 +90,15 @@ func NewServer(
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: separatePorts,
Setting: telset,
telset: telset,
}, nil
}

func registerGRPCHandlers(
server *grpc.Server,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
telset telemetry.Setting,
telset telemetry.Settings,
) {
reflection.Register(server)
handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{
Expand All @@ -121,7 +121,7 @@ func createGRPCServer(
ctx context.Context,
options *QueryOptions,
tm *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) (*grpc.Server, error) {
var grpcOpts []configgrpc.ToServerOption
unaryInterceptors := []grpc.UnaryServerInterceptor{
Expand Down Expand Up @@ -164,7 +164,7 @@ func initRouter(
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tenancyMgr *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) (http.Handler, io.Closer) {
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(telset.Logger),
Expand Down Expand Up @@ -206,7 +206,7 @@ func createHTTPServer(
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tm *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) (*httpServer, error) {
handler, staticHandlerCloser := initRouter(querySvc, metricsQuerySvc, queryOpts, tm, telset)
handler = recoveryhandler.NewRecoveryHandler(telset.Logger, true)(handler)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
if err != nil {
return nil, err
}
s.Logger.Info(
s.telset.Logger.Info(
"Query server started",
zap.String("http_addr", s.HTTPAddr()),
zap.String("grpc_addr", s.GRPCAddr()),
Expand All @@ -272,7 +272,7 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
tcpPort = port
}

s.Logger.Info(
s.telset.Logger.Info(
"Query server started",
zap.Int("port", tcpPort),
zap.String("addr", s.queryOptions.HTTP.Endpoint))
Expand Down Expand Up @@ -317,46 +317,46 @@ func (s *Server) Start(ctx context.Context) error {
s.bgFinished.Add(1)
go func() {
defer s.bgFinished.Done()
s.Logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
s.telset.Logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
err := s.httpServer.Serve(s.httpConn)
if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) {
s.Logger.Error("Could not start HTTP server", zap.Error(err))
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
s.telset.Logger.Error("Could not start HTTP server", zap.Error(err))
s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err))
return
}
s.Logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
s.telset.Logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
}()

// Start GRPC server concurrently
s.bgFinished.Add(1)
go func() {
defer s.bgFinished.Done()
s.Logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))
s.telset.Logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))

err := s.grpcServer.Serve(s.grpcConn)
if err != nil && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) {
s.Logger.Error("Could not start GRPC server", zap.Error(err))
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
s.telset.Logger.Error("Could not start GRPC server", zap.Error(err))
s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err))
return
}
s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))
s.telset.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))
}()

// Start cmux server concurrently.
if !s.separatePorts {
s.bgFinished.Add(1)
go func() {
defer s.bgFinished.Done()
s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
s.telset.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))

err := cmuxServer.Serve()
// TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.Logger.Error("Could not start multiplexed server", zap.Error(err))
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
s.telset.Logger.Error("Could not start multiplexed server", zap.Error(err))
s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err))
return
}
s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
s.telset.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
}()
}
return nil
Expand All @@ -374,20 +374,20 @@ func (s *Server) GRPCAddr() string {
func (s *Server) Close() error {
var errs []error

s.Logger.Info("Closing HTTP server")
s.telset.Logger.Info("Closing HTTP server")
if err := s.httpServer.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close HTTP server: %w", err))
}

s.Logger.Info("Stopping gRPC server")
s.telset.Logger.Info("Stopping gRPC server")
s.grpcServer.Stop()

if !s.separatePorts {
s.Logger.Info("Closing CMux server")
s.telset.Logger.Info("Closing CMux server")
s.cmuxServer.Close()
}
s.bgFinished.Wait()

s.Logger.Info("Server stopped")
s.telset.Logger.Info("Server stopped")
return errors.Join(errs...)
}
2 changes: 1 addition & 1 deletion cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (

var testCertKeyLocation = "../../../pkg/config/tlscfg/testdata"

func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthcheck.HealthCheck) telemetry.Setting {
func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthcheck.HealthCheck) telemetry.Settings {
telset := telemetry.NoopSettings()
telset.Logger = logger
telset.TracerProvider = tracerProvider.OTEL
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
}
}

baseTelset := telemetry.Setting{
baseTelset := telemetry.Settings{
Logger: logger,
Metrics: baseFactory,
TracerProvider: jt.OTEL,
Expand Down
14 changes: 7 additions & 7 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type Server struct {
grpcConn net.Listener
grpcServer *grpc.Server
wg sync.WaitGroup
telemetry.Setting
telset telemetry.Settings
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Setting) (*Server, error) {
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, telset.Logger)
if err != nil {
return nil, err
Expand All @@ -50,7 +50,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy
return &Server{
opts: options,
grpcServer: grpcServer,
Setting: telset,
telset: telset,
}, nil
}

Expand Down Expand Up @@ -128,14 +128,14 @@ func (s *Server) Start() error {
if err != nil {
return err
}
s.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.telset.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.grpcConn = listener
s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.Logger.Error("GRPC server exited", zap.Error(err))
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
s.telset.Logger.Error("GRPC server exited", zap.Error(err))
s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err))

Check warning on line 138 in cmd/remote-storage/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/remote-storage/app/server.go#L137-L138

Added lines #L137 - L138 were not covered by tests
}
}()

Expand All @@ -148,6 +148,6 @@ func (s *Server) Close() error {
s.grpcConn.Close()
s.opts.TLSGRPC.Close()
s.wg.Wait()
s.ReportStatus(componentstatus.NewEvent(componentstatus.StatusStopped))
s.telset.ReportStatus(componentstatus.NewEvent(componentstatus.StatusStopped))
return nil
}
6 changes: 3 additions & 3 deletions cmd/remote-storage/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestNewServer_TLSConfigError(t *testing.T) {
KeyPath: "invalid/path",
ClientCAPath: "invalid/path",
}
telset := telemetry.Setting{
telset := telemetry.Settings{
Logger: zap.NewNop(),
ReportStatus: telemetry.HCAdapter(healthcheck.New()),
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestServerGRPCTLS(t *testing.T) {
storageMocks.reader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

tm := tenancy.NewManager(&tenancy.Options{Enabled: true})
telset := telemetry.Setting{
telset := telemetry.Settings{
Logger: flagsSvc.Logger,
ReportStatus: telemetry.HCAdapter(flagsSvc.HC()),
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestServerHandlesPortZero(t *testing.T) {
zapCore, logs := observer.New(zap.InfoLevel)
flagsSvc.Logger = zap.New(zapCore)
storageMocks := newStorageMocks()
telset := telemetry.Setting{
telset := telemetry.Settings{
Logger: flagsSvc.Logger,
ReportStatus: telemetry.HCAdapter(flagsSvc.HC()),
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/remote-storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
logger.Fatal("Failed to parse options", zap.Error(err))
}

baseTelset := telemetry.Setting{
baseTelset := telemetry.Settings{
Logger: svc.Logger,
Metrics: baseFactory,
ReportStatus: telemetry.HCAdapter(svc.HC()),
Expand Down
10 changes: 5 additions & 5 deletions pkg/telemetry/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
)

type Setting struct {
type Settings struct {
Logger *zap.Logger
Metrics metrics.Factory
MeterProvider metric.MeterProvider
Expand Down Expand Up @@ -47,8 +47,8 @@ func HCAdapter(hc *healthcheck.HealthCheck) func(*componentstatus.Event) {
}
}

func NoopSettings() Setting {
return Setting{
func NoopSettings() Settings {
return Settings{
Logger: zap.NewNop(),
Metrics: metrics.NullFactory,
MeterProvider: noopmetric.NewMeterProvider(),
Expand All @@ -58,8 +58,8 @@ func NoopSettings() Setting {
}
}

func FromOtelComponent(telset component.TelemetrySettings, host component.Host) Setting {
return Setting{
func FromOtelComponent(telset component.TelemetrySettings, host component.Host) Settings {
return Settings{
Logger: telset.Logger,
Metrics: otelmetrics.NewFactory(telset.MeterProvider),
MeterProvider: telset.MeterProvider,
Expand Down
2 changes: 1 addition & 1 deletion pkg/telemetry/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestHCAdapter(t *testing.T) {
}
}

func TestNoopSettings(t *testing.T) {
func TestNoopSettingss(t *testing.T) {
telset := telemetry.NoopSettings()
assert.NotNil(t, telset.Logger)
assert.NotNil(t, telset.Metrics)
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var ( // interface comformance checks
// Factory implements storage.Factory and creates storage components backed by a storage plugin.
type Factory struct {
config Config
telset telemetry.Setting
telset telemetry.Settings
services *ClientPluginServices
tracedRemoteConn *grpc.ClientConn
untracedRemoteConn *grpc.ClientConn
Expand All @@ -59,7 +59,7 @@ func NewFactory() *Factory {
// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg Config,
telset telemetry.Setting,
telset telemetry.Settings,
) (*Factory, error) {
f := NewFactory()
f.config = cfg
Expand Down

0 comments on commit 4c197fe

Please sign in to comment.