From e8bfe804024dc02273c98d2a96d6717ab4253ef7 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Wed, 12 Jun 2024 13:57:02 +0200 Subject: [PATCH] fixup! snippet-service: add ResourceChanged event subscription --- snippet-service/config.yaml | 20 ----------------- snippet-service/pb/service.proto | 6 ++--- snippet-service/service/config.go | 14 ++---------- snippet-service/service/config_test.go | 19 ---------------- snippet-service/service/resourceSubscriber.go | 7 +++--- snippet-service/service/service.go | 22 +++++++++++-------- snippet-service/service/service_test.go | 21 +++++------------- snippet-service/test/service.go | 6 ++--- 8 files changed, 30 insertions(+), 85 deletions(-) diff --git a/snippet-service/config.yaml b/snippet-service/config.yaml index 52b629784..d99d931f5 100644 --- a/snippet-service/config.yaml +++ b/snippet-service/config.yaml @@ -50,26 +50,6 @@ apis: readHeaderTimeout: 4s writeTimeout: 16s idleTimeout: 30s - tls: - caPool: "/secrets/public/rootca.crt" - keyFile: "/secrets/private/cert.key" - certFile: "/secrets/private/cert.crt" - clientCertificateRequired: true - authorization: - ownerClaim: "sub" - authority: "" - audience: "" - http: - maxIdleConns: 16 - maxConnsPerHost: 32 - maxIdleConnsPerHost: 16 - idleConnTimeout: "30s" - timeout: "10s" - tls: - caPool: "/secrets/public/rootca.crt" - keyFile: "/secrets/private/cert.key" - certFile: "/secrets/public/cert.crt" - useSystemCAPool: false clients: storage: use: mongoDB diff --git a/snippet-service/pb/service.proto b/snippet-service/pb/service.proto index 3b02e5a18..648f1df68 100644 --- a/snippet-service/pb/service.proto +++ b/snippet-service/pb/service.proto @@ -116,12 +116,12 @@ message DeleteConfigurationsResponse { int64 count = 1; } message AppliedDeviceConfiguration { //TODO naming message Resource { - resourceaggregate.pb.ResourceId resource_id = 1; // TODO Jozo href only? + string href = 1; string correlation_id = 2; // Reused from invoke command or generated. Can be used to retrieve corresponding pending command. enum Status { QUEUED = 0; PENDING = 1; - DONE = 2; // If done look to resource_updated even update resource failed for resource aggregate. + DONE = 2; // If done look to resource_updated if update resource failed for resource aggregate. TIMEOUT = 3; }; Status status = 3; @@ -141,7 +141,7 @@ message AppliedDeviceConfiguration { //TODO naming repeated Resource resources = 6; //TODO naming string owner = 7; // Unix timestamp in ns when the applied device configuration has been created/updated - int64 timestamp = 8; // @gotags: bson:"timestamp" + int64 timestamp = 8; } message InvokeConfigurationRequest { diff --git a/snippet-service/service/config.go b/snippet-service/service/config.go index e6461a587..ce6a166ed 100644 --- a/snippet-service/service/config.go +++ b/snippet-service/service/config.go @@ -10,32 +10,22 @@ import ( "github.com/plgd-dev/hub/v2/pkg/config" "github.com/plgd-dev/hub/v2/pkg/log" grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" - grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" httpServer "github.com/plgd-dev/hub/v2/pkg/net/http/server" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" - certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" ) type HTTPConfig struct { - Addr string `yaml:"address" json:"address"` - Server httpServer.Config `yaml:",inline" json:",inline"` - TLS certManagerServer.Config `yaml:"tls" json:"tls"` - Authorization grpcServer.AuthorizationConfig `yaml:"authorization" json:"authorization"` + Addr string `yaml:"address" json:"address"` + Server httpServer.Config `yaml:",inline" json:",inline"` } func (c *HTTPConfig) Validate() error { if _, err := net.ResolveTCPAddr("tcp", c.Addr); err != nil { return fmt.Errorf("address('%v') - %w", c.Addr, err) } - if err := c.TLS.Validate(); err != nil { - return fmt.Errorf("tls.%w", err) - } - if err := c.Authorization.Validate(); err != nil { - return fmt.Errorf("authorization.%w", err) - } return nil } diff --git a/snippet-service/service/config_test.go b/snippet-service/service/config_test.go index 9bd469c7f..5a0f4e02b 100644 --- a/snippet-service/service/config_test.go +++ b/snippet-service/service/config_test.go @@ -6,7 +6,6 @@ import ( "github.com/plgd-dev/hub/v2/pkg/log" grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" - certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" "github.com/plgd-dev/hub/v2/snippet-service/service" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" @@ -78,24 +77,6 @@ func TestHTTPConfig(t *testing.T) { }(), wantErr: true, }, - { - name: "invalid - bad TLS", - cfg: func() service.HTTPConfig { - cfg := test.MakeHTTPConfig() - cfg.TLS = certManagerServer.Config{} - return cfg - }(), - wantErr: true, - }, - { - name: "invalid - bad authorization", - cfg: func() service.HTTPConfig { - cfg := test.MakeHTTPConfig() - cfg.Authorization = grpcServer.AuthorizationConfig{} - return cfg - }(), - wantErr: true, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/snippet-service/service/resourceSubscriber.go b/snippet-service/service/resourceSubscriber.go index fb54575a3..d12706b6a 100644 --- a/snippet-service/service/resourceSubscriber.go +++ b/snippet-service/service/resourceSubscriber.go @@ -13,6 +13,7 @@ import ( "github.com/plgd-dev/hub/v2/pkg/log" pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" + "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" @@ -121,14 +122,14 @@ func (h *resourceChangedHandler) getConfigurationsWithTokens(ctx context.Context confsWithTokens := make(map[string]configurationWithTokens) for _, c := range configurations { - tokens := confTokens[c.GetId()] + tokens := strings.Unique(confTokens[c.GetId()]) if len(tokens) == 0 { h.logger.Errorf("no tokens found for configuration(id:%v)", c.GetId()) continue } confsWithTokens[c.GetId()] = configurationWithTokens{ configuration: c, - tokens: confTokens[c.GetId()], + tokens: tokens, } } @@ -152,7 +153,7 @@ func (h *resourceChangedHandler) applyConfigurations(ctx context.Context, rc *ev return err } - // get configurations + // get configurations with tokens confsWithTokens, err := h.getConfigurationsWithTokens(ctx, owner, conditions) if err != nil { return err diff --git a/snippet-service/service/service.go b/snippet-service/service/service.go index eae0a7c48..d6794d464 100644 --- a/snippet-service/service/service.go +++ b/snippet-service/service/service.go @@ -12,6 +12,7 @@ import ( "github.com/plgd-dev/hub/v2/pkg/log" "github.com/plgd-dev/hub/v2/pkg/net/listener" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" + certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server" "github.com/plgd-dev/hub/v2/pkg/security/jwt/validator" "github.com/plgd-dev/hub/v2/pkg/service" grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc" @@ -93,17 +94,17 @@ func newStore(ctx context.Context, config StorageConfig, fileWatcher *fsnotify.W return db, fl.ToFunction(), nil } -func newHttpService(ctx context.Context, config HTTPConfig, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) { - httpValidator, err := validator.New(ctx, config.Authorization.Config, fileWatcher, logger, tracerProvider) +func newHttpService(ctx context.Context, config HTTPConfig, validatorConfig validator.Config, tlsConfig certManagerServer.Config, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) { + httpValidator, err := validator.New(ctx, validatorConfig, fileWatcher, logger, tracerProvider) if err != nil { return nil, nil, fmt.Errorf("cannot create http validator: %w", err) } httpService, err := httpService.New(serviceName, httpService.Config{ Connection: listener.Config{ Addr: config.Addr, - TLS: config.TLS, + TLS: tlsConfig, }, - Authorization: config.Authorization.Config, + Authorization: validatorConfig, Server: config.Server, }, ss, httpValidator, fileWatcher, logger, tracerProvider) if err != nil { @@ -174,21 +175,24 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg } }) - httpService, httpServiceClose, err := newHttpService(ctx, config.APIs.HTTP, snippetService, fileWatcher, logger, tracerProvider) + grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider) if err != nil { closerFn.Execute() return nil, err } - closerFn.AddFunc(httpServiceClose) + closerFn.AddFunc(grpcServiceClose) + s := service.New(grpcService) - grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider) + httpService, httpServiceClose, err := newHttpService(ctx, config.APIs.HTTP, config.APIs.GRPC.Authorization.Config, config.APIs.GRPC.TLS, + snippetService, fileWatcher, logger, tracerProvider) if err != nil { + grpcService.Close() closerFn.Execute() return nil, err } - closerFn.AddFunc(grpcServiceClose) + closerFn.AddFunc(httpServiceClose) + s.Add(httpService) - s := service.New(httpService, grpcService) s.AddCloseFunc(closerFn.Execute) return &Service{ Service: s, diff --git a/snippet-service/service/service_test.go b/snippet-service/service/service_test.go index 56625c563..698c1fdea 100644 --- a/snippet-service/service/service_test.go +++ b/snippet-service/service/service_test.go @@ -152,37 +152,28 @@ func TestServiceNew(t *testing.T) { wantErr: true, }, { - name: "invalid HTTP validator config", - cfg: func() service.Config { - cfg := test.MakeConfig(t) - cfg.APIs.HTTP.Authorization.Config.HTTP = httpClient.Config{} - return cfg - }(), - wantErr: true, - }, - { - name: "invalid HTTP config", + name: "invalid GRPC validator config", cfg: func() service.Config { cfg := test.MakeConfig(t) - cfg.APIs.HTTP.Addr = "invalid" + cfg.APIs.GRPC.Authorization.Config.HTTP = httpClient.Config{} return cfg }(), wantErr: true, }, { - name: "invalid GRPC validator config", + name: "invalid GRPC config", cfg: func() service.Config { cfg := test.MakeConfig(t) - cfg.APIs.GRPC.Authorization.Config.HTTP = httpClient.Config{} + cfg.APIs.GRPC.Addr = "invalid" return cfg }(), wantErr: true, }, { - name: "invalid GRPC config", + name: "invalid HTTP config", cfg: func() service.Config { cfg := test.MakeConfig(t) - cfg.APIs.GRPC.Addr = "invalid" + cfg.APIs.HTTP.Addr = "invalid" return cfg }(), wantErr: true, diff --git a/snippet-service/test/service.go b/snippet-service/test/service.go index bca8a9cfb..d433dd8cd 100644 --- a/snippet-service/test/service.go +++ b/snippet-service/test/service.go @@ -27,10 +27,8 @@ func MakeHTTPConfig() service.HTTPConfig { tls := config.MakeTLSServerConfig() tls.ClientCertificateRequired = false return service.HTTPConfig{ - Addr: config.SNIPPET_SERVICE_HTTP_HOST, - Server: config.MakeHttpServerConfig(), - TLS: tls, - Authorization: config.MakeAuthorizationConfig(), + Addr: config.SNIPPET_SERVICE_HTTP_HOST, + Server: config.MakeHttpServerConfig(), } }