Skip to content

Commit

Permalink
fixup! snippet-service: add ResourceChanged event subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Jun 12, 2024
1 parent 2467abf commit e8bfe80
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 85 deletions.
20 changes: 0 additions & 20 deletions snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,6 @@ apis:
readHeaderTimeout: 4s
writeTimeout: 16s
idleTimeout: 30s
tls:
caPool: "/secrets/public/rootca.crt"
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/private/cert.crt"
clientCertificateRequired: true
authorization:
ownerClaim: "sub"
authority: ""
audience: ""
http:
maxIdleConns: 16
maxConnsPerHost: 32
maxIdleConnsPerHost: 16
idleConnTimeout: "30s"
timeout: "10s"
tls:
caPool: "/secrets/public/rootca.crt"
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
clients:
storage:
use: mongoDB
Expand Down
6 changes: 3 additions & 3 deletions snippet-service/pb/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ message DeleteConfigurationsResponse { int64 count = 1; }

message AppliedDeviceConfiguration { //TODO naming
message Resource {
resourceaggregate.pb.ResourceId resource_id = 1; // TODO Jozo href only?
string href = 1;
string correlation_id = 2; // Reused from invoke command or generated. Can be used to retrieve corresponding pending command.
enum Status {
QUEUED = 0;
PENDING = 1;
DONE = 2; // If done look to resource_updated even update resource failed for resource aggregate.
DONE = 2; // If done look to resource_updated if update resource failed for resource aggregate.
TIMEOUT = 3;
};
Status status = 3;
Expand All @@ -141,7 +141,7 @@ message AppliedDeviceConfiguration { //TODO naming
repeated Resource resources = 6; //TODO naming
string owner = 7;
// Unix timestamp in ns when the applied device configuration has been created/updated
int64 timestamp = 8; // @gotags: bson:"timestamp"
int64 timestamp = 8;
}

message InvokeConfigurationRequest {
Expand Down
14 changes: 2 additions & 12 deletions snippet-service/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,22 @@ import (
"github.com/plgd-dev/hub/v2/pkg/config"
"github.com/plgd-dev/hub/v2/pkg/log"
grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client"
grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server"
httpServer "github.com/plgd-dev/hub/v2/pkg/net/http/server"
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server"
natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client"
grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc"
storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config"
)

type HTTPConfig struct {
Addr string `yaml:"address" json:"address"`
Server httpServer.Config `yaml:",inline" json:",inline"`
TLS certManagerServer.Config `yaml:"tls" json:"tls"`
Authorization grpcServer.AuthorizationConfig `yaml:"authorization" json:"authorization"`
Addr string `yaml:"address" json:"address"`
Server httpServer.Config `yaml:",inline" json:",inline"`
}

func (c *HTTPConfig) Validate() error {
if _, err := net.ResolveTCPAddr("tcp", c.Addr); err != nil {
return fmt.Errorf("address('%v') - %w", c.Addr, err)
}
if err := c.TLS.Validate(); err != nil {
return fmt.Errorf("tls.%w", err)
}
if err := c.Authorization.Validate(); err != nil {
return fmt.Errorf("authorization.%w", err)
}
return nil
}

Expand Down
19 changes: 0 additions & 19 deletions snippet-service/service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/plgd-dev/hub/v2/pkg/log"
grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server"
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server"
natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client"
"github.com/plgd-dev/hub/v2/snippet-service/service"
storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config"
Expand Down Expand Up @@ -78,24 +77,6 @@ func TestHTTPConfig(t *testing.T) {
}(),
wantErr: true,
},
{
name: "invalid - bad TLS",
cfg: func() service.HTTPConfig {
cfg := test.MakeHTTPConfig()
cfg.TLS = certManagerServer.Config{}
return cfg
}(),
wantErr: true,
},
{
name: "invalid - bad authorization",
cfg: func() service.HTTPConfig {
cfg := test.MakeHTTPConfig()
cfg.Authorization = grpcServer.AuthorizationConfig{}
return cfg
}(),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions snippet-service/service/resourceSubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/plgd-dev/hub/v2/pkg/log"
pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc"
grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client"
"github.com/plgd-dev/hub/v2/pkg/strings"
"github.com/plgd-dev/hub/v2/resource-aggregate/commands"
"github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus"
natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client"
Expand Down Expand Up @@ -121,14 +122,14 @@ func (h *resourceChangedHandler) getConfigurationsWithTokens(ctx context.Context

confsWithTokens := make(map[string]configurationWithTokens)
for _, c := range configurations {
tokens := confTokens[c.GetId()]
tokens := strings.Unique(confTokens[c.GetId()])
if len(tokens) == 0 {
h.logger.Errorf("no tokens found for configuration(id:%v)", c.GetId())
continue
}
confsWithTokens[c.GetId()] = configurationWithTokens{
configuration: c,
tokens: confTokens[c.GetId()],
tokens: tokens,
}
}

Expand All @@ -152,7 +153,7 @@ func (h *resourceChangedHandler) applyConfigurations(ctx context.Context, rc *ev
return err
}

// get configurations
// get configurations with tokens
confsWithTokens, err := h.getConfigurationsWithTokens(ctx, owner, conditions)
if err != nil {
return err
Expand Down
22 changes: 13 additions & 9 deletions snippet-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/plgd-dev/hub/v2/pkg/net/listener"
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server"
"github.com/plgd-dev/hub/v2/pkg/security/jwt/validator"
"github.com/plgd-dev/hub/v2/pkg/service"
grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc"
Expand Down Expand Up @@ -93,17 +94,17 @@ func newStore(ctx context.Context, config StorageConfig, fileWatcher *fsnotify.W
return db, fl.ToFunction(), nil
}

func newHttpService(ctx context.Context, config HTTPConfig, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) {
httpValidator, err := validator.New(ctx, config.Authorization.Config, fileWatcher, logger, tracerProvider)
func newHttpService(ctx context.Context, config HTTPConfig, validatorConfig validator.Config, tlsConfig certManagerServer.Config, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) {
httpValidator, err := validator.New(ctx, validatorConfig, fileWatcher, logger, tracerProvider)
if err != nil {
return nil, nil, fmt.Errorf("cannot create http validator: %w", err)
}
httpService, err := httpService.New(serviceName, httpService.Config{
Connection: listener.Config{
Addr: config.Addr,
TLS: config.TLS,
TLS: tlsConfig,
},
Authorization: config.Authorization.Config,
Authorization: validatorConfig,
Server: config.Server,
}, ss, httpValidator, fileWatcher, logger, tracerProvider)
if err != nil {
Expand Down Expand Up @@ -174,21 +175,24 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
}
})

httpService, httpServiceClose, err := newHttpService(ctx, config.APIs.HTTP, snippetService, fileWatcher, logger, tracerProvider)
grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider)
if err != nil {
closerFn.Execute()
return nil, err
}
closerFn.AddFunc(httpServiceClose)
closerFn.AddFunc(grpcServiceClose)
s := service.New(grpcService)

grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider)
httpService, httpServiceClose, err := newHttpService(ctx, config.APIs.HTTP, config.APIs.GRPC.Authorization.Config, config.APIs.GRPC.TLS,
snippetService, fileWatcher, logger, tracerProvider)
if err != nil {
grpcService.Close()
closerFn.Execute()
return nil, err
}
closerFn.AddFunc(grpcServiceClose)
closerFn.AddFunc(httpServiceClose)
s.Add(httpService)

s := service.New(httpService, grpcService)
s.AddCloseFunc(closerFn.Execute)
return &Service{
Service: s,
Expand Down
21 changes: 6 additions & 15 deletions snippet-service/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,37 +152,28 @@ func TestServiceNew(t *testing.T) {
wantErr: true,
},
{
name: "invalid HTTP validator config",
cfg: func() service.Config {
cfg := test.MakeConfig(t)
cfg.APIs.HTTP.Authorization.Config.HTTP = httpClient.Config{}
return cfg
}(),
wantErr: true,
},
{
name: "invalid HTTP config",
name: "invalid GRPC validator config",
cfg: func() service.Config {
cfg := test.MakeConfig(t)
cfg.APIs.HTTP.Addr = "invalid"
cfg.APIs.GRPC.Authorization.Config.HTTP = httpClient.Config{}
return cfg
}(),
wantErr: true,
},
{
name: "invalid GRPC validator config",
name: "invalid GRPC config",
cfg: func() service.Config {
cfg := test.MakeConfig(t)
cfg.APIs.GRPC.Authorization.Config.HTTP = httpClient.Config{}
cfg.APIs.GRPC.Addr = "invalid"
return cfg
}(),
wantErr: true,
},
{
name: "invalid GRPC config",
name: "invalid HTTP config",
cfg: func() service.Config {
cfg := test.MakeConfig(t)
cfg.APIs.GRPC.Addr = "invalid"
cfg.APIs.HTTP.Addr = "invalid"
return cfg
}(),
wantErr: true,
Expand Down
6 changes: 2 additions & 4 deletions snippet-service/test/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ func MakeHTTPConfig() service.HTTPConfig {
tls := config.MakeTLSServerConfig()
tls.ClientCertificateRequired = false
return service.HTTPConfig{
Addr: config.SNIPPET_SERVICE_HTTP_HOST,
Server: config.MakeHttpServerConfig(),
TLS: tls,
Authorization: config.MakeAuthorizationConfig(),
Addr: config.SNIPPET_SERVICE_HTTP_HOST,
Server: config.MakeHttpServerConfig(),
}
}

Expand Down

0 comments on commit e8bfe80

Please sign in to comment.