From 687662b1d7f52c3ae5e81c785e31e7c2ca0c9fc3 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Wed, 3 Jul 2024 19:00:40 +0200 Subject: [PATCH] Replace runtime map with periodic database check --- .../templates/snippet-service/config.yaml | 23 +- charts/plgd-hub/values.yaml | 3 +- pkg/mongodb/marshal.go | 17 +- snippet-service/config.yaml | 5 +- snippet-service/{store => jq}/jq.go | 2 +- snippet-service/{store => jq}/jq_test.go | 6 +- snippet-service/pb/appliedConfiguration.go | 12 +- snippet-service/service/config.go | 63 +--- snippet-service/service/config_test.go | 85 +---- snippet-service/service/grpc/server.go | 5 +- .../service/http/invokeConfiguration_test.go | 6 +- snippet-service/service/service.go | 65 +--- snippet-service/service/service_test.go | 134 +------- .../store/cqldb/appliedConfiguration.go | 4 + .../store/mongodb/appliedConfiguration.go | 41 ++- .../mongodb/getAppliedConfigurations_test.go | 35 +++ snippet-service/store/store.go | 3 + snippet-service/test/appliedConfiguration.go | 3 + snippet-service/test/service.go | 56 ++-- snippet-service/updater/config.go | 46 +++ snippet-service/updater/config_test.go | 73 +++++ snippet-service/updater/expiredUpdates.go | 26 ++ .../{store => updater}/resourceUpdater.go | 290 +++++++----------- .../updater/resourceUpdater_test.go | 112 +++++++ snippet-service/updater/updateExecution.go | 67 ++++ 25 files changed, 636 insertions(+), 546 deletions(-) rename snippet-service/{store => jq}/jq.go (97%) rename snippet-service/{store => jq}/jq_test.go (95%) create mode 100644 snippet-service/updater/config.go create mode 100644 snippet-service/updater/config_test.go create mode 100644 snippet-service/updater/expiredUpdates.go rename snippet-service/{store => updater}/resourceUpdater.go (76%) create mode 100644 snippet-service/updater/resourceUpdater_test.go create mode 100644 snippet-service/updater/updateExecution.go diff --git a/charts/plgd-hub/templates/snippet-service/config.yaml b/charts/plgd-hub/templates/snippet-service/config.yaml index a14e5b04c..dcde0518c 100644 --- a/charts/plgd-hub/templates/snippet-service/config.yaml +++ b/charts/plgd-hub/templates/snippet-service/config.yaml @@ -101,21 +101,22 @@ data: {{- $cqlDbTls := .clients.storage.cqlDB.tls }} {{- include "plgd-hub.certificateConfig" (list $ $cqlDbTls $cert ) | indent 10 }} useSystemCAPool: {{ .clients.storage.cqlDB.tls.useSystemCAPool }} - resourceAggregate: - pendingCommandsCheckInterval: {{ .clients.resourceAggregate.pendingCommandsCheckInterval }} + resourceUpdater: + cleanUpExpiredUpdates: {{ .clients.resourceUpdater.cleanUpExpiredUpdates | quote }} + pendingCommandsCheckInterval: {{ .clients.resourceUpdater.pendingCommandsCheckInterval }} grpc: - {{- $resourceAggregate := .clients.resourceAggregate.grpc.address }} - address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceAggregate ) | quote }} - sendMsgSize: {{ int64 .clients.resourceAggregate.grpc.sendMsgSize | default 4194304 }} - recvMsgSize: {{ int64 .clients.resourceAggregate.grpc.recvMsgSize | default 4194304 }} + {{- $resourceUpdater := .clients.resourceUpdater.grpc.address }} + address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceUpdater ) | quote }} + sendMsgSize: {{ int64 .clients.resourceUpdater.grpc.sendMsgSize | default 4194304 }} + recvMsgSize: {{ int64 .clients.resourceUpdater.grpc.recvMsgSize | default 4194304 }} keepAlive: - time: {{ .clients.resourceAggregate.grpc.keepAlive.time }} - timeout: {{ .clients.resourceAggregate.grpc.keepAlive.timeout }} - permitWithoutStream: {{ .clients.resourceAggregate.grpc.keepAlive.permitWithoutStream }} + time: {{ .clients.resourceUpdater.grpc.keepAlive.time }} + timeout: {{ .clients.resourceUpdater.grpc.keepAlive.timeout }} + permitWithoutStream: {{ .clients.resourceUpdater.grpc.keepAlive.permitWithoutStream }} tls: - {{- $raClientTls := .clients.resourceAggregate.grpc.tls }} + {{- $raClientTls := .clients.resourceUpdater.grpc.tls }} {{- include "plgd-hub.certificateConfig" (list $ $raClientTls $cert) | indent 10 }} - useSystemCAPool: {{ .clients.resourceAggregate.grpc.tls.useSystemCAPool }} + useSystemCAPool: {{ .clients.resourceUpdater.grpc.tls.useSystemCAPool }} {{- include "plgd-hub.openTelemetryExporterConfig" (list $ $cert ) | nindent 6 }} {{- end }} {{- end }} diff --git a/charts/plgd-hub/values.yaml b/charts/plgd-hub/values.yaml index c2c502137..98dd2cd77 100644 --- a/charts/plgd-hub/values.yaml +++ b/charts/plgd-hub/values.yaml @@ -2508,8 +2508,9 @@ snippetservice: keyFile: certFile: useSystemCAPool: false - resourceAggregate: + resourceUpdater: pendingCommandsCheckInterval: 1m + cleanUpExpiredUpdates: "0 * * * *" grpc: address: "" sendMsgSize: 4194304 diff --git a/pkg/mongodb/marshal.go b/pkg/mongodb/marshal.go index 7893cf72c..54dbd8acb 100644 --- a/pkg/mongodb/marshal.go +++ b/pkg/mongodb/marshal.go @@ -12,7 +12,7 @@ import ( type updateJSON = func(map[string]interface{}) -func ConvertStringValueToInt(json map[string]interface{}, path string) { +func ConvertStringValueToInt64(json map[string]interface{}, path string) { pos := strings.Index(path, ".") if pos == -1 { valueI, ok := json[path] @@ -36,11 +36,24 @@ func ConvertStringValueToInt(json map[string]interface{}, path string) { if !ok { return } + elemArray, ok := elem.([]interface{}) + if ok { + for i, elem := range elemArray { + elemMap, ok2 := elem.(map[string]interface{}) + if !ok2 { + continue + } + ConvertStringValueToInt64(elemMap, path[pos+1:]) + elemArray[i] = elemMap + } + json[elemPath] = elemArray + return + } elemMap, ok := elem.(map[string]interface{}) if !ok { return } - ConvertStringValueToInt(elemMap, path[pos+1:]) + ConvertStringValueToInt64(elemMap, path[pos+1:]) json[elemPath] = elemMap } diff --git a/snippet-service/config.yaml b/snippet-service/config.yaml index 24b61966c..7d0342316 100644 --- a/snippet-service/config.yaml +++ b/snippet-service/config.yaml @@ -114,7 +114,9 @@ clients: keyFile: "/secrets/private/cert.key" certFile: "/secrets/public/cert.crt" useSystemCAPool: false - resourceAggregate: + resourceUpdater: + pendingCommandsCheckInterval: 1m + cleanUpExpiredUpdates: "0 * * * *" grpc: address: "" sendMsgSize: 4194304 @@ -128,4 +130,3 @@ clients: keyFile: "/secrets/private/cert.key" certFile: "/secrets/public/cert.crt" useSystemCAPool: false - pendingCommandsCheckInterval: 1m diff --git a/snippet-service/store/jq.go b/snippet-service/jq/jq.go similarity index 97% rename from snippet-service/store/jq.go rename to snippet-service/jq/jq.go index 3fa07ddd8..470135e09 100644 --- a/snippet-service/store/jq.go +++ b/snippet-service/jq/jq.go @@ -1,4 +1,4 @@ -package store +package jq import ( "fmt" diff --git a/snippet-service/store/jq_test.go b/snippet-service/jq/jq_test.go similarity index 95% rename from snippet-service/store/jq_test.go rename to snippet-service/jq/jq_test.go index 792e52ea8..5b41aa707 100644 --- a/snippet-service/store/jq_test.go +++ b/snippet-service/jq/jq_test.go @@ -1,11 +1,11 @@ -package store_test +package jq_test import ( "testing" "github.com/plgd-dev/go-coap/v3/message" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" - "github.com/plgd-dev/hub/v2/snippet-service/store" + "github.com/plgd-dev/hub/v2/snippet-service/jq" hubTest "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/kit/v2/codec/json" "github.com/stretchr/testify/require" @@ -117,7 +117,7 @@ func TestEvalJQCondition(t *testing.T) { err = commands.DecodeContent(tt.content, &json) } require.NoError(t, err) - got, err := store.EvalJQCondition(tt.jq, json) + got, err := jq.EvalJQCondition(tt.jq, json) if tt.wantErr { require.Error(t, err) return diff --git a/snippet-service/pb/appliedConfiguration.go b/snippet-service/pb/appliedConfiguration.go index a1e4398d5..12b0052af 100644 --- a/snippet-service/pb/appliedConfiguration.go +++ b/snippet-service/pb/appliedConfiguration.go @@ -88,6 +88,7 @@ func (r *AppliedConfiguration_Resource) Clone() *AppliedConfiguration_Resource { CorrelationId: r.GetCorrelationId(), Status: r.GetStatus(), ResourceUpdated: r.GetResourceUpdated().Clone(), + ValidUntil: r.GetValidUntil(), } } @@ -95,8 +96,12 @@ func (r *AppliedConfiguration_Resource) UnmarshalBSON(data []byte) error { return pkgMongo.UnmarshalProtoBSON(data, r, nil) } +func (c *AppliedConfiguration_Resource) jsonToBSONTag(json map[string]interface{}) { + pkgMongo.ConvertStringValueToInt64(json, "validUntil") +} + func (r *AppliedConfiguration_Resource) MarshalBSON() ([]byte, error) { - return pkgMongo.MarshalProtoBSON(r, nil) + return pkgMongo.MarshalProtoBSON(r, r.jsonToBSONTag) } func (c *AppliedConfiguration) CloneExecutedBy() isAppliedConfiguration_ExecutedBy { @@ -133,8 +138,9 @@ func (c *AppliedConfiguration) Clone() *AppliedConfiguration { } func (c *AppliedConfiguration) jsonToBSONTag(json map[string]interface{}) { - pkgMongo.ConvertStringValueToInt(json, "configurationId.version") - pkgMongo.ConvertStringValueToInt(json, "conditionId.version") + pkgMongo.ConvertStringValueToInt64(json, "configurationId.version") + pkgMongo.ConvertStringValueToInt64(json, "conditionId.version") + pkgMongo.ConvertStringValueToInt64(json, "resources.validUntil") } func (c *AppliedConfiguration) MarshalBSON() ([]byte, error) { diff --git a/snippet-service/service/config.go b/snippet-service/service/config.go index ea832e033..99397ffb0 100644 --- a/snippet-service/service/config.go +++ b/snippet-service/service/config.go @@ -3,18 +3,16 @@ package service import ( "fmt" "net" - "time" - "github.com/go-co-op/gocron/v2" "github.com/google/uuid" "github.com/plgd-dev/hub/v2/pkg/config" "github.com/plgd-dev/hub/v2/pkg/log" - grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" httpServer "github.com/plgd-dev/hub/v2/pkg/net/http/server" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" + "github.com/plgd-dev/hub/v2/snippet-service/updater" ) type HTTPConfig struct { @@ -45,53 +43,6 @@ func (c *APIsConfig) Validate() error { return nil } -type StorageConfig struct { - Embedded storeConfig.Config `yaml:",inline" json:",inline"` - ExtendCronParserBySeconds bool `yaml:"-" json:"-"` - CleanUpRecords string `yaml:"cleanUpRecords" json:"cleanUpRecords"` -} - -func (c *StorageConfig) Validate() error { - if err := c.Embedded.Validate(); err != nil { - return err - } - if c.CleanUpRecords == "" { - return nil - } - s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan - if err != nil { - return fmt.Errorf("cannot create cron job: %w", err) - } - defer func() { - if errS := s.Shutdown(); errS != nil { - log.Errorf("failed to shutdown cron job: %w", errS) - } - }() - _, err = s.NewJob(gocron.CronJob(c.CleanUpRecords, c.ExtendCronParserBySeconds), - gocron.NewTask(func() { - // do nothing - })) - if err != nil { - return fmt.Errorf("cleanUpRecords('%v') - %w", c.CleanUpRecords, err) - } - return nil -} - -type ResourceAggregateConfig struct { - Connection grpcClient.Config `yaml:"grpc" json:"grpc"` - PendingCommandsCheckInterval time.Duration `yaml:"pendingCommandsCheckInterval" json:"pendingCommandsCheckInterval"` -} - -func (c *ResourceAggregateConfig) Validate() error { - if err := c.Connection.Validate(); err != nil { - return fmt.Errorf("grpc.%w", err) - } - if c.PendingCommandsCheckInterval <= 0 { - return fmt.Errorf("pendingCommandsCheckInterval('%v') - must be greater than 0", c.PendingCommandsCheckInterval) - } - return nil -} - type EventBusConfig struct { NATS natsClient.Config `yaml:"nats" json:"nats"` } @@ -104,10 +55,10 @@ func (c *EventBusConfig) Validate() error { } type ClientsConfig struct { - Storage StorageConfig `yaml:"storage" json:"storage"` - OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"` - EventBus EventBusConfig `yaml:"eventBus" json:"eventBus"` - ResourceAggregate ResourceAggregateConfig `yaml:"resourceAggregate" json:"resourceAggregate"` + Storage storeConfig.Config `yaml:"storage" json:"storage"` + OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"` + EventBus EventBusConfig `yaml:"eventBus" json:"eventBus"` + ResourceUpdater updater.ResourceUpdaterConfig `yaml:"resourceUpdater" json:"resourceUpdater"` } func (c *ClientsConfig) Validate() error { @@ -120,8 +71,8 @@ func (c *ClientsConfig) Validate() error { if err := c.EventBus.Validate(); err != nil { return fmt.Errorf("eventBus.%w", err) } - if err := c.ResourceAggregate.Validate(); err != nil { - return fmt.Errorf("resourceAggregate.%w", err) + if err := c.ResourceUpdater.Validate(); err != nil { + return fmt.Errorf("resourceUpdater.%w", err) } return nil } diff --git a/snippet-service/service/config_test.go b/snippet-service/service/config_test.go index ec00ee24d..2e1b4574e 100644 --- a/snippet-service/service/config_test.go +++ b/snippet-service/service/config_test.go @@ -2,7 +2,6 @@ package service_test import ( "testing" - "time" "github.com/plgd-dev/hub/v2/pkg/log" grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" @@ -11,7 +10,7 @@ import ( "github.com/plgd-dev/hub/v2/snippet-service/service" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" "github.com/plgd-dev/hub/v2/snippet-service/test" - "github.com/plgd-dev/hub/v2/test/config" + "github.com/plgd-dev/hub/v2/snippet-service/updater" "github.com/stretchr/testify/require" ) @@ -95,90 +94,20 @@ func TestHTTPConfig(t *testing.T) { func TestStorageConfig(t *testing.T) { tests := []struct { name string - cfg service.StorageConfig + cfg storeConfig.Config wantErr bool }{ { name: "valid", - cfg: test.MakeStorageConfig(), + cfg: test.MakeStoreConfig(), wantErr: false, }, { - name: "valid - no cron", - cfg: func() service.StorageConfig { - cfg := test.MakeStorageConfig() - cfg.CleanUpRecords = "" - return cfg - }(), - wantErr: false, - }, - { - name: "invalid - no storage", - cfg: func() service.StorageConfig { - cfg := test.MakeStorageConfig() - cfg.Embedded = storeConfig.Config{} - return cfg - }(), - wantErr: true, - }, - { - name: "invalid - bad cron expression", - cfg: func() service.StorageConfig { - cfg := test.MakeStorageConfig() - cfg.CleanUpRecords = "bad" - return cfg - }(), - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := tt.cfg.Validate() - if tt.wantErr { - require.Error(t, err) - return - } - require.NoError(t, err) - }) - } -} - -func TestResourceAggregateConfig(t *testing.T) { - tests := []struct { - name string - cfg service.ResourceAggregateConfig - wantErr bool - }{ - { - name: "valid", - cfg: service.ResourceAggregateConfig{ - Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), - PendingCommandsCheckInterval: time.Second * 10, - }, - }, - { - name: "invalid - no connection", - cfg: func() service.ResourceAggregateConfig { - cfg := service.ResourceAggregateConfig{ - PendingCommandsCheckInterval: time.Second * 10, - } - return cfg - }(), - wantErr: true, - }, - { - name: "invalid - pending commands check interval", - cfg: func() service.ResourceAggregateConfig { - cfg := service.ResourceAggregateConfig{ - Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), - PendingCommandsCheckInterval: -1, - } - return cfg - }(), + name: "invalid", + cfg: storeConfig.Config{}, wantErr: true, }, } - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { err := tt.cfg.Validate() @@ -205,7 +134,7 @@ func TestClientsConfig(t *testing.T) { name: "invalid - no storage", cfg: func() service.ClientsConfig { cfg := test.MakeClientsConfig() - cfg.Storage = service.StorageConfig{} + cfg.Storage = storeConfig.Config{} return cfg }(), wantErr: true, @@ -238,7 +167,7 @@ func TestClientsConfig(t *testing.T) { name: "invalid ResourceAggregate", cfg: func() service.ClientsConfig { cfg := test.MakeClientsConfig() - cfg.ResourceAggregate = service.ResourceAggregateConfig{} + cfg.ResourceUpdater = updater.ResourceUpdaterConfig{} return cfg }(), wantErr: true, diff --git a/snippet-service/service/grpc/server.go b/snippet-service/service/grpc/server.go index df8c93e1d..3d26dc71e 100644 --- a/snippet-service/service/grpc/server.go +++ b/snippet-service/service/grpc/server.go @@ -9,6 +9,7 @@ import ( pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/snippet-service/pb" "github.com/plgd-dev/hub/v2/snippet-service/store" + "github.com/plgd-dev/hub/v2/snippet-service/updater" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -18,13 +19,13 @@ type SnippetServiceServer struct { pb.UnimplementedSnippetServiceServer store store.Store - resourceUpdater *store.ResourceUpdater + resourceUpdater *updater.ResourceUpdater ownerClaim string hubID string logger log.Logger } -func NewSnippetServiceServer(store store.Store, resourceUpdater *store.ResourceUpdater, ownerClaim string, hubID string, logger log.Logger) *SnippetServiceServer { +func NewSnippetServiceServer(store store.Store, resourceUpdater *updater.ResourceUpdater, ownerClaim string, hubID string, logger log.Logger) *SnippetServiceServer { return &SnippetServiceServer{ store: store, resourceUpdater: resourceUpdater, diff --git a/snippet-service/service/http/invokeConfiguration_test.go b/snippet-service/service/http/invokeConfiguration_test.go index 0aa65346f..188d05db8 100644 --- a/snippet-service/service/http/invokeConfiguration_test.go +++ b/snippet-service/service/http/invokeConfiguration_test.go @@ -23,8 +23,8 @@ import ( "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/snippet-service/pb" snippetHttp "github.com/plgd-dev/hub/v2/snippet-service/service/http" - "github.com/plgd-dev/hub/v2/snippet-service/store" snippetTest "github.com/plgd-dev/hub/v2/snippet-service/test" + "github.com/plgd-dev/hub/v2/snippet-service/updater" hubTest "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" httpTest "github.com/plgd-dev/hub/v2/test/http" @@ -136,7 +136,7 @@ func getPendingCommands(ctx context.Context, t *testing.T, c grpcGwPb.GrpcGatewa id = c.ResourceUpdatePending.GetAuditContext().GetCorrelationId() } - appliedConfID, _, _, ok := store.SplitCorrelationID(id) + appliedConfID, _, _, ok := updater.SplitCorrelationID(id) if !ok { continue } @@ -175,7 +175,7 @@ func TestRequestHandlerInvokeConfiguration(t *testing.T) { }() snippetCfg := snippetTest.MakeConfig(t) - snippetCfg.Clients.ResourceAggregate.PendingCommandsCheckInterval = time.Millisecond * 500 + snippetCfg.Clients.ResourceUpdater.PendingCommandsCheckInterval = time.Millisecond * 500 _, shutdownHttp := snippetTest.New(t, snippetCfg) defer shutdownHttp() logger := log.NewLogger(snippetCfg.Log) diff --git a/snippet-service/service/service.go b/snippet-service/service/service.go index e72ceab8f..7973940cc 100644 --- a/snippet-service/service/service.go +++ b/snippet-service/service/service.go @@ -3,9 +3,7 @@ package service import ( "context" "fmt" - "time" - "github.com/go-co-op/gocron/v2" "github.com/plgd-dev/hub/v2/pkg/config/database" "github.com/plgd-dev/hub/v2/pkg/fn" "github.com/plgd-dev/hub/v2/pkg/fsnotify" @@ -20,6 +18,7 @@ import ( "github.com/plgd-dev/hub/v2/snippet-service/store" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" "github.com/plgd-dev/hub/v2/snippet-service/store/mongodb" + "github.com/plgd-dev/hub/v2/snippet-service/updater" "go.opentelemetry.io/otel/trace" ) @@ -29,7 +28,7 @@ type Service struct { *service.Service snippetService *grpcService.SnippetServiceServer - resourceUpdater *store.ResourceUpdater + resourceUpdater *updater.ResourceUpdater resourceSubscriber *ResourceSubscriber } @@ -44,49 +43,6 @@ func createStore(ctx context.Context, config storeConfig.Config, fileWatcher *fs return nil, fmt.Errorf("invalid store use('%v')", config.Use) } -func newStore(ctx context.Context, config StorageConfig, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, func(), error) { - var fl fn.FuncList - db, err := createStore(ctx, config.Embedded, fileWatcher, logger, tracerProvider) - if err != nil { - fl.Execute() - return nil, nil, err - } - fl.AddFunc(func() { - if errC := db.Close(ctx); errC != nil { - log.Errorf("failed to close mongodb store: %w", errC) - } - }) - if config.CleanUpRecords == "" { - return db, fl.ToFunction(), nil - } - // TODO: do we need a cron job? - s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan - if err != nil { - fl.Execute() - return nil, nil, fmt.Errorf("cannot create cron job: %w", err) - } - _, err = s.NewJob(gocron.CronJob(config.CleanUpRecords, config.ExtendCronParserBySeconds), gocron.NewTask(func() { - // db.CancelExpiredPendingCommands(ctx, time.Now()) - /* - _, errDel := db.DeleteNonDeviceExpiredRecords(ctx, time.Now()) - if errDel != nil && !errors.Is(errDel, store.ErrNotSupported) { - log.Errorf("failed to delete expired signing records: %w", errDel) - } - */ - })) - if err != nil { - fl.Execute() - return nil, nil, fmt.Errorf("cannot create cron job: %w", err) - } - fl.AddFunc(func() { - if errS := s.Shutdown(); errS != nil { - log.Errorf("failed to shutdown cron job: %w", errS) - } - }) - s.Start() - return db, fl.ToFunction(), nil -} - func newHttpService(ctx context.Context, config HTTPConfig, validatorConfig validator.Config, tlsConfig certManagerServer.Config, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) { httpValidator, err := validator.New(ctx, validatorConfig, fileWatcher, logger, tracerProvider) if err != nil { @@ -129,14 +85,18 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg closerFn.AddFunc(otelClient.Close) tracerProvider := otelClient.GetTracerProvider() - dbStorage, closeStore, err := newStore(ctx, config.Clients.Storage, fileWatcher, logger, tracerProvider) + db, err := createStore(ctx, config.Clients.Storage, fileWatcher, logger, tracerProvider) if err != nil { closerFn.Execute() return nil, fmt.Errorf("cannot create store: %w", err) } - closerFn.AddFunc(closeStore) + closerFn.AddFunc(func() { + if errC := db.Close(ctx); errC != nil { + log.Errorf("failed to close store: %w", errC) + } + }) - resourceUpdater, err := store.NewResourceUpdater(ctx, config.Clients.ResourceAggregate.Connection, config.Clients.ResourceAggregate.PendingCommandsCheckInterval, dbStorage, fileWatcher, logger, tracerProvider) + resourceUpdater, err := updater.NewResourceUpdater(ctx, config.Clients.ResourceUpdater, db, fileWatcher, logger, tracerProvider) if err != nil { closerFn.Execute() return nil, fmt.Errorf("cannot create resource change handler: %w", err) @@ -160,7 +120,7 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg } }) - snippetService := grpcService.NewSnippetServiceServer(dbStorage, resourceUpdater, config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, logger) + snippetService := grpcService.NewSnippetServiceServer(db, resourceUpdater, config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, logger) grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider) if err != nil { @@ -192,3 +152,8 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg func (s *Service) SnippetServiceServer() *grpcService.SnippetServiceServer { return s.snippetService } + +func (s *Service) SetToken(token string) { + // TODO: remove once m2m-token service is integrated + s.resourceUpdater.SetToken(token) +} diff --git a/snippet-service/service/service_test.go b/snippet-service/service/service_test.go index 4cdfc2023..a654cdaff 100644 --- a/snippet-service/service/service_test.go +++ b/snippet-service/service/service_test.go @@ -39,11 +39,11 @@ import ( natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" "github.com/plgd-dev/hub/v2/snippet-service/pb" "github.com/plgd-dev/hub/v2/snippet-service/service" - "github.com/plgd-dev/hub/v2/snippet-service/store" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" storeCqlDB "github.com/plgd-dev/hub/v2/snippet-service/store/cqldb" storeMongo "github.com/plgd-dev/hub/v2/snippet-service/store/mongodb" "github.com/plgd-dev/hub/v2/snippet-service/test" + "github.com/plgd-dev/hub/v2/snippet-service/updater" hubTest "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" oauthService "github.com/plgd-dev/hub/v2/test/oauth-server/service" @@ -88,10 +88,8 @@ func TestServiceNew(t *testing.T) { name: "invalid DB", cfg: service.Config{ Clients: service.ClientsConfig{ - Storage: service.StorageConfig{ - Embedded: storeConfig.Config{ - Use: "invalid", - }, + Storage: storeConfig.Config{ + Use: "invalid", }, }, }, @@ -101,13 +99,11 @@ func TestServiceNew(t *testing.T) { name: "invalid mongoDB config", cfg: service.Config{ Clients: service.ClientsConfig{ - Storage: service.StorageConfig{ - Embedded: storeConfig.Config{ - Use: database.MongoDB, - MongoDB: &storeMongo.Config{ - Mongo: mongodb.Config{ - URI: "invalid", - }, + Storage: storeConfig.Config{ + Use: database.MongoDB, + MongoDB: &storeMongo.Config{ + Mongo: mongodb.Config{ + URI: "invalid", }, }, }, @@ -119,25 +115,14 @@ func TestServiceNew(t *testing.T) { name: "invalid cqlDB config", cfg: service.Config{ Clients: service.ClientsConfig{ - Storage: service.StorageConfig{ - Embedded: storeConfig.Config{ - Use: database.CqlDB, - CqlDB: &storeCqlDB.Config{}, - }, + Storage: storeConfig.Config{ + Use: database.CqlDB, + CqlDB: &storeCqlDB.Config{}, }, }, }, wantErr: true, }, - { - name: "invalid CronJob config", - cfg: func() service.Config { - cfg := test.MakeConfig(t) - cfg.Clients.Storage.CleanUpRecords = "invalid" - return cfg - }(), - wantErr: true, - }, { name: "invalid resource subscriber config", cfg: func() service.Config { @@ -151,7 +136,7 @@ func TestServiceNew(t *testing.T) { name: "invalid resource aggregate client config", cfg: func() service.Config { cfg := test.MakeConfig(t) - cfg.Clients.ResourceAggregate = service.ResourceAggregateConfig{} + cfg.Clients.ResourceUpdater = updater.ResourceUpdaterConfig{} return cfg }(), wantErr: true, @@ -237,7 +222,7 @@ func TestService(t *testing.T) { defer tearDown() snippetCfg := test.MakeConfig(t) - snippetCfg.Clients.ResourceAggregate.PendingCommandsCheckInterval = time.Millisecond * 500 + snippetCfg.Clients.ResourceUpdater.PendingCommandsCheckInterval = time.Millisecond * 500 _, shutdownSnippetService := test.New(t, snippetCfg) defer shutdownSnippetService() @@ -448,7 +433,7 @@ func TestService(t *testing.T) { if appliedConf1Status != pb.AppliedConfiguration_Resource_TIMEOUT && appliedConf1Status != pb.AppliedConfiguration_Resource_DONE { // -> wait enough time to timeout pending commands - time.Sleep(2 * snippetCfg.Clients.ResourceAggregate.PendingCommandsCheckInterval) + time.Sleep(2 * snippetCfg.Clients.ResourceUpdater.PendingCommandsCheckInterval) } var got map[interface{}]interface{} @@ -523,94 +508,3 @@ func TestService(t *testing.T) { }, nil) require.NoError(t, err) } - -func TestServiceCleanUp(t *testing.T) { - deviceID := hubTest.MustFindDeviceByName(hubTest.TestDeviceName) - ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT) - defer cancel() - - tearDown := hubTestService.SetUp(ctx, t) - defer tearDown() - - needsShutdown := true - snippetCfg := test.MakeConfig(t) - _, shutdownSnippetService := test.New(t, snippetCfg) - defer func() { - if needsShutdown { - needsShutdown = false - shutdownSnippetService() - } - }() - - snippetClientConn, err := grpc.NewClient(config.SNIPPET_SERVICE_HOST, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ - RootCAs: hubTest.GetRootCertificatePool(t), - }))) - require.NoError(t, err) - defer func() { - _ = snippetClientConn.Close() - }() - snippetClient := pb.NewSnippetServiceClient(snippetClientConn) - - ctx = pkgGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - - grpcClient := grpcgwTest.NewTestClient(t) - defer func() { - err = grpcClient.Close() - require.NoError(t, err) - }() - _, shutdownDevSim := hubTest.OnboardDevSim(ctx, t, grpcClient.GrpcGatewayClient(), deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, hubTest.GetAllBackendResourceLinks()) - defer shutdownDevSim() - - notExistingResourceHref := "/not/existing" - // configuration - // -> /not/existing -> { value: 42 } - conf, err := snippetClient.CreateConfiguration(ctx, &pb.Configuration{ - Name: "update", - Owner: oauthService.DeviceUserID, - Resources: []*pb.Configuration_Resource{ - { - Href: notExistingResourceHref, - Content: &commands.Content{ - ContentType: message.AppOcfCbor.String(), - Data: hubTest.EncodeToCbor(t, map[string]interface{}{ - "value": 42, - }), - }, - TimeToLive: int64(5 * time.Minute), // will not timeout during test - }, - }, - }) - require.NoError(t, err) - require.NotEmpty(t, conf.GetId()) - - // invoke configuration with long TimeToLive - resp, err := snippetClient.InvokeConfiguration(ctx, &pb.InvokeConfigurationRequest{ - ConfigurationId: conf.GetId(), - DeviceId: deviceID, - }) - require.NoError(t, err) - - // stop service - shutdownSnippetService() - needsShutdown = false - - // check that all configurations are either in timeout or done state - s, cleanUpStore := test.NewStore(t) - defer cleanUpStore() - - appliedConfs := make(map[string]*pb.AppliedConfiguration) - err = s.GetAppliedConfigurations(ctx, oauthService.DeviceUserID, &pb.GetAppliedConfigurationsRequest{ - IdFilter: []string{resp.GetAppliedConfigurationId()}, - }, func(appliedConf *store.AppliedConfiguration) error { - appliedConfs[appliedConf.GetId()] = appliedConf.GetAppliedConfiguration().Clone() - return nil - }) - require.NoError(t, err) - require.Len(t, appliedConfs, 1) - appliedConf, ok := appliedConfs[resp.GetAppliedConfigurationId()] - require.True(t, ok) - for _, r := range appliedConf.GetResources() { - status := r.GetStatus() - require.True(t, pb.AppliedConfiguration_Resource_TIMEOUT == status || pb.AppliedConfiguration_Resource_DONE == status) - } -} diff --git a/snippet-service/store/cqldb/appliedConfiguration.go b/snippet-service/store/cqldb/appliedConfiguration.go index 7063b8354..b3e38cb71 100644 --- a/snippet-service/store/cqldb/appliedConfiguration.go +++ b/snippet-service/store/cqldb/appliedConfiguration.go @@ -30,3 +30,7 @@ func (s *Store) UpdateAppliedConfiguration(context.Context, *pb.AppliedConfigura func (s *Store) UpdateAppliedConfigurationResource(context.Context, string, store.UpdateAppliedConfigurationResourceRequest) (*pb.AppliedConfiguration, error) { return nil, store.ErrNotSupported } + +func (s *Store) GetExpiredAppliedConfigurationResourceUpdates(context.Context, store.ProccessAppliedConfigurations) (int64, error) { + return 0, store.ErrNotSupported +} diff --git a/snippet-service/store/mongodb/appliedConfiguration.go b/snippet-service/store/mongodb/appliedConfiguration.go index 410792d9e..2a18b10ad 100644 --- a/snippet-service/store/mongodb/appliedConfiguration.go +++ b/snippet-service/store/mongodb/appliedConfiguration.go @@ -85,11 +85,11 @@ func (s *Store) UpdateAppliedConfiguration(ctx context.Context, adc *pb.AppliedC if result.Err() != nil { return nil, result.Err() } - updatedAdc := pb.AppliedConfiguration{} + updatedAdc := store.AppliedConfiguration{} if err = result.Decode(&updatedAdc); err != nil { return nil, err } - return &updatedAdc, nil + return updatedAdc.GetAppliedConfiguration(), nil } func toAppliedDeviceConfigurationsVersionFilter(idKey, versionsKey string, vf pb.VersionFilter) interface{} { @@ -293,3 +293,40 @@ func (s *Store) UpdateAppliedConfigurationResource(ctx context.Context, owner st return updatedAppliedCfg.GetAppliedConfiguration(), nil } + +func (s *Store) GetExpiredAppliedConfigurationResourceUpdates(ctx context.Context, p store.ProccessAppliedConfigurations) (int64, error) { + validUntil := time.Now().UnixNano() + pl := mongo.Pipeline{ + // match resources that have a resource in pending state and expired + bson.D{{Key: mongodb.Match, Value: bson.M{ + store.ResourcesKey: bson.M{ + "$elemMatch": bson.M{ + store.StatusKey: pb.AppliedConfiguration_Resource_PENDING.String(), + store.ValidUntil: bson.M{"$lte": validUntil}, + }, + }, + }}}, + } + // project only the resources that are expired + pl = append(pl, bson.D{{Key: mongodb.Set, Value: bson.M{ + store.ResourcesKey: bson.M{ + "$filter": bson.M{ + "input": "$" + store.ResourcesKey, + "as": "elem", + "cond": bson.M{ + mongodb.And: bson.A{ + bson.M{"$eq": bson.A{"$$elem." + store.StatusKey, pb.AppliedConfiguration_Resource_PENDING.String()}}, + bson.M{"$gt": bson.A{"$$elem." + store.ValidUntil, 0}}, + bson.M{"$lte": bson.A{"$$elem." + store.ValidUntil, validUntil}}, + }, + }, + }, + }, + }}}) + + cur, err := s.Collection(appliedConfigurationsCol).Aggregate(ctx, pl) + if err != nil { + return 0, err + } + return validUntil, processCursor(ctx, cur, p) +} diff --git a/snippet-service/store/mongodb/getAppliedConfigurations_test.go b/snippet-service/store/mongodb/getAppliedConfigurations_test.go index 7e0369db3..f1c187257 100644 --- a/snippet-service/store/mongodb/getAppliedConfigurations_test.go +++ b/snippet-service/store/mongodb/getAppliedConfigurations_test.go @@ -403,3 +403,38 @@ func TestStoreGetAppliedConfigurations(t *testing.T) { }) } } + +func TestGetExpiredAppliedConfigurationResourceUpdates(t *testing.T) { + s, cleanUpStore := test.NewMongoStore(t) + defer cleanUpStore() + + ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT) + defer cancel() + stored := test.AddAppliedConfigurationsToStore(ctx, t, s) + + got := make(map[string]*pb.AppliedConfiguration) + validUntil, err := s.GetExpiredAppliedConfigurationResourceUpdates(ctx, func(ac *store.AppliedConfiguration) error { + got[ac.GetId()] = ac.GetAppliedConfiguration().Clone() + return nil + }) + require.NoError(t, err) + + expiredStored := make(map[string]*pb.AppliedConfiguration) + for _, ac := range stored { + var resources []*pb.AppliedConfiguration_Resource + for _, r := range ac.GetResources() { + if r.GetStatus() == pb.AppliedConfiguration_Resource_PENDING && + r.GetValidUntil() > 0 && r.GetValidUntil() <= validUntil { + resources = append(resources, r) + } + } + if len(resources) > 0 { + newAc := ac.Clone() + newAc.Resources = resources + expiredStored[ac.GetId()] = newAc + } + } + + require.Len(t, got, len(expiredStored)) + test.CmpAppliedDeviceConfigurationsMaps(t, expiredStored, got, false) +} diff --git a/snippet-service/store/store.go b/snippet-service/store/store.go index 114520dc6..f227ece1b 100644 --- a/snippet-service/store/store.go +++ b/snippet-service/store/store.go @@ -25,6 +25,7 @@ const ( TimestampKey = "timestamp" // must match with Timestamp field tag StatusKey = "status" // must match with Status field tag HrefKey = "href" // must match with Href field tag + ValidUntil = "validUntil" // must match with ValidUntil field tag ApiAccessTokenKey = "apiAccessToken" // must match with Condition.ApiAccessToken field tag DeviceIDFilterKey = "deviceIdFilter" // must match with Condition.DeviceIdFilter tag @@ -132,6 +133,8 @@ type Store interface { UpdateAppliedConfiguration(ctx context.Context, conf *pb.AppliedConfiguration) (*pb.AppliedConfiguration, error) // UpdateAppliedConfigurationResource updates an existing applied device configuration resource in the database. UpdateAppliedConfigurationResource(ctx context.Context, owner string, query UpdateAppliedConfigurationResourceRequest) (*pb.AppliedConfiguration, error) + // GetExpiredAppliedConfigurationResourceUpdates loads applied device configuration with expired (validUntil <= now) resource updates from the database. + GetExpiredAppliedConfigurationResourceUpdates(ctx context.Context, p ProccessAppliedConfigurations) (int64, error) Close(ctx context.Context) error } diff --git a/snippet-service/test/appliedConfiguration.go b/snippet-service/test/appliedConfiguration.go index b843eaebd..1a6ab4ad1 100644 --- a/snippet-service/test/appliedConfiguration.go +++ b/snippet-service/test/appliedConfiguration.go @@ -46,6 +46,9 @@ func AppliedConfigurationResource(t *testing.T, deviceID string, start, n int) [ CorrelationId: correlationID, Status: pb.AppliedConfiguration_Resource_Status(1 + i%4), } + if resource.GetStatus() == pb.AppliedConfiguration_Resource_PENDING { + resource.ValidUntil = time.Now().Add(time.Minute * -3).Add(time.Minute * time.Duration(i)).UnixNano() + } if resource.GetStatus() == pb.AppliedConfiguration_Resource_DONE { resource.ResourceUpdated = pbTest.MakeResourceUpdated(t, deviceID, diff --git a/snippet-service/test/service.go b/snippet-service/test/service.go index 9542b327a..9be85839b 100644 --- a/snippet-service/test/service.go +++ b/snippet-service/test/service.go @@ -14,6 +14,7 @@ import ( storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" storeCqlDB "github.com/plgd-dev/hub/v2/snippet-service/store/cqldb" storeMongo "github.com/plgd-dev/hub/v2/snippet-service/store/mongodb" + "github.com/plgd-dev/hub/v2/snippet-service/updater" "github.com/plgd-dev/hub/v2/test/config" httpTest "github.com/plgd-dev/hub/v2/test/http" "github.com/stretchr/testify/require" @@ -40,41 +41,44 @@ func MakeAPIsConfig() service.APIsConfig { } } +func MakeResourceUpdaterConfig() updater.ResourceUpdaterConfig { + return updater.ResourceUpdaterConfig{ + Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), + PendingCommandsCheckInterval: time.Second * 10, + CleanUpExpiredUpdates: "0 * * * *", + ExtendCronParserBySeconds: false, + } +} + func MakeClientsConfig() service.ClientsConfig { return service.ClientsConfig{ - Storage: MakeStorageConfig(), + Storage: MakeStoreConfig(), OpenTelemetryCollector: config.MakeOpenTelemetryCollectorClient(), EventBus: service.EventBusConfig{ NATS: config.MakeSubscriberConfig(), }, - ResourceAggregate: service.ResourceAggregateConfig{ - Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), - PendingCommandsCheckInterval: time.Second * 10, - }, + ResourceUpdater: MakeResourceUpdaterConfig(), } } -func MakeStorageConfig() service.StorageConfig { - return service.StorageConfig{ - CleanUpRecords: "0 1 * * *", - Embedded: storeConfig.Config{ - // TODO: add cqldb support - // Use: config.ACTIVE_DATABASE(), - Use: database.MongoDB, - MongoDB: &storeMongo.Config{ - Mongo: mongodb.Config{ - MaxPoolSize: 16, - MaxConnIdleTime: time.Minute * 4, - URI: config.MONGODB_URI, - Database: "snippetService", - TLS: config.MakeTLSClientConfig(), - }, - }, - CqlDB: &storeCqlDB.Config{ - Embedded: config.MakeCqlDBConfig(), - Table: "snippets", +func MakeStoreConfig() storeConfig.Config { + return storeConfig.Config{ + // TODO: add cqldb support + // Use: config.ACTIVE_DATABASE(), + Use: database.MongoDB, + MongoDB: &storeMongo.Config{ + Mongo: mongodb.Config{ + MaxPoolSize: 16, + MaxConnIdleTime: time.Minute * 4, + URI: config.MONGODB_URI, + Database: "snippetService", + TLS: config.MakeTLSClientConfig(), }, }, + CqlDB: &storeCqlDB.Config{ + Embedded: config.MakeCqlDBConfig(), + Table: "snippets", + }, } } @@ -124,7 +128,7 @@ func New(t require.TestingT, cfg service.Config) (*service.Service, func()) { func NewStore(t require.TestingT) (store.Store, func()) { cfg := MakeConfig(t) - if cfg.Clients.Storage.Embedded.Use == database.CqlDB { + if cfg.Clients.Storage.Use == database.CqlDB { return nil, nil } return NewMongoStore(t) @@ -138,7 +142,7 @@ func NewMongoStore(t require.TestingT) (*storeMongo.Store, func()) { require.NoError(t, err) ctx := context.Background() - store, err := storeMongo.New(ctx, cfg.Clients.Storage.Embedded.MongoDB, fileWatcher, logger, noop.NewTracerProvider()) + store, err := storeMongo.New(ctx, cfg.Clients.Storage.MongoDB, fileWatcher, logger, noop.NewTracerProvider()) require.NoError(t, err) cleanUp := func() { diff --git a/snippet-service/updater/config.go b/snippet-service/updater/config.go new file mode 100644 index 000000000..080634506 --- /dev/null +++ b/snippet-service/updater/config.go @@ -0,0 +1,46 @@ +package updater + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" + "github.com/plgd-dev/hub/v2/pkg/log" + grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" +) + +type ResourceUpdaterConfig struct { + Connection grpcClient.Config `yaml:"grpc" json:"grpc"` + PendingCommandsCheckInterval time.Duration `yaml:"pendingCommandsCheckInterval" json:"pendingCommandsCheckInterval"` + CleanUpExpiredUpdates string `yaml:"cleanUpExpiredUpdates" json:"cleanUpExpiredUpdates"` + ExtendCronParserBySeconds bool `yaml:"-" json:"-"` +} + +func (c *ResourceUpdaterConfig) Validate() error { + if err := c.Connection.Validate(); err != nil { + return fmt.Errorf("grpc.%w", err) + } + if c.PendingCommandsCheckInterval <= 0 { + return fmt.Errorf("pendingCommandsCheckInterval('%v') - must be greater than 0", c.PendingCommandsCheckInterval) + } + if c.CleanUpExpiredUpdates == "" { + return nil + } + s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan + if err != nil { + return fmt.Errorf("cannot create cron job: %w", err) + } + defer func() { + if errS := s.Shutdown(); errS != nil { + log.Errorf("failed to shutdown cron job: %w", errS) + } + }() + _, err = s.NewJob(gocron.CronJob(c.CleanUpExpiredUpdates, c.ExtendCronParserBySeconds), + gocron.NewTask(func() { + // do nothing + })) + if err != nil { + return fmt.Errorf("cleanUpExpiredUpdates('%v') - %w", c.CleanUpExpiredUpdates, err) + } + return nil +} diff --git a/snippet-service/updater/config_test.go b/snippet-service/updater/config_test.go new file mode 100644 index 000000000..3cabd00e8 --- /dev/null +++ b/snippet-service/updater/config_test.go @@ -0,0 +1,73 @@ +package updater_test + +import ( + "testing" + "time" + + "github.com/plgd-dev/hub/v2/snippet-service/test" + "github.com/plgd-dev/hub/v2/snippet-service/updater" + "github.com/plgd-dev/hub/v2/test/config" + "github.com/stretchr/testify/require" +) + +func TestResourceAggregateConfig(t *testing.T) { + tests := []struct { + name string + cfg updater.ResourceUpdaterConfig + wantErr bool + }{ + { + name: "valid", + cfg: test.MakeResourceUpdaterConfig(), + }, + { + name: "valid - no cron", + cfg: func() updater.ResourceUpdaterConfig { + cfg := test.MakeResourceUpdaterConfig() + cfg.CleanUpExpiredUpdates = "" + return cfg + }(), + }, + { + name: "invalid - no connection", + cfg: func() updater.ResourceUpdaterConfig { + cfg := updater.ResourceUpdaterConfig{ + PendingCommandsCheckInterval: time.Second * 10, + } + return cfg + }(), + wantErr: true, + }, + { + name: "invalid - bad cron expression", + cfg: func() updater.ResourceUpdaterConfig { + cfg := test.MakeResourceUpdaterConfig() + cfg.CleanUpExpiredUpdates = "bad" + return cfg + }(), + wantErr: true, + }, + { + name: "invalid - pending commands check interval", + cfg: func() updater.ResourceUpdaterConfig { + cfg := updater.ResourceUpdaterConfig{ + Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), + PendingCommandsCheckInterval: -1, + } + return cfg + }(), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} diff --git a/snippet-service/updater/expiredUpdates.go b/snippet-service/updater/expiredUpdates.go new file mode 100644 index 000000000..9cbcdd347 --- /dev/null +++ b/snippet-service/updater/expiredUpdates.go @@ -0,0 +1,26 @@ +package updater + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" +) + +func NewExpiredUpdatesChecker(cleanUpExpiredUpdates string, withSeconds bool, updater *ResourceUpdater) (gocron.Scheduler, error) { + if cleanUpExpiredUpdates == "" { + return nil, nil + } + s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan + if err != nil { + return nil, fmt.Errorf("cannot create cron job: %w", err) + } + _, err = s.NewJob(gocron.CronJob(cleanUpExpiredUpdates, withSeconds), gocron.NewTask(func() { + updater.TimeoutPendingResourceUpdates() + })) + if err != nil { + return nil, fmt.Errorf("cannot create cron job: %w", err) + } + s.Start() + return s, nil +} diff --git a/snippet-service/store/resourceUpdater.go b/snippet-service/updater/resourceUpdater.go similarity index 76% rename from snippet-service/store/resourceUpdater.go rename to snippet-service/updater/resourceUpdater.go index 13e172b6a..37d0a98ab 100644 --- a/snippet-service/store/resourceUpdater.go +++ b/snippet-service/updater/resourceUpdater.go @@ -1,4 +1,4 @@ -package store +package updater import ( "cmp" @@ -9,78 +9,66 @@ import ( "strings" "time" + "github.com/go-co-op/gocron/v2" "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/plgd-dev/go-coap/v3/message" - "github.com/plgd-dev/go-coap/v3/pkg/cache" - "github.com/plgd-dev/go-coap/v3/pkg/runner/periodic" "github.com/plgd-dev/hub/v2/pkg/fsnotify" "github.com/plgd-dev/hub/v2/pkg/log" pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" - pkgTime "github.com/plgd-dev/hub/v2/pkg/time" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus" "github.com/plgd-dev/hub/v2/resource-aggregate/events" raService "github.com/plgd-dev/hub/v2/resource-aggregate/service" + "github.com/plgd-dev/hub/v2/snippet-service/jq" "github.com/plgd-dev/hub/v2/snippet-service/pb" + "github.com/plgd-dev/hub/v2/snippet-service/store" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" ) -type pendingConfiguration struct { - id string - configurationID string - owner string - correlationID string - token string - resourceID *commands.ResourceId -} - type ResourceUpdater struct { - ctx context.Context - storage Store - raConn *grpcClient.Client - raClient raService.ResourceAggregateClient - pendingConfigurations *cache.Cache[string, *pendingConfiguration] - logger log.Logger -} + ctx context.Context + storage store.Store + raConn *grpcClient.Client + raClient raService.ResourceAggregateClient + scheduler gocron.Scheduler + logger log.Logger -func newPendingConfigurationsCache(ctx context.Context, interval time.Duration) *cache.Cache[string, *pendingConfiguration] { - c := cache.NewCache[string, *pendingConfiguration]() - add := periodic.New(ctx.Done(), interval) - add(func(now time.Time) bool { - c.CheckExpirations(now) - return true - }) - return c + token string } -func NewResourceUpdater(ctx context.Context, config grpcClient.Config, pendingCommandsCheckInterval time.Duration, storage Store, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*ResourceUpdater, error) { - raConn, err := grpcClient.New(config, fileWatcher, logger, tracerProvider) +func NewResourceUpdater(ctx context.Context, config ResourceUpdaterConfig, storage store.Store, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*ResourceUpdater, error) { + raConn, err := grpcClient.New(config.Connection, fileWatcher, logger, tracerProvider) if err != nil { return nil, fmt.Errorf("cannot connect to resource aggregate: %w", err) } - return &ResourceUpdater{ - ctx: ctx, - storage: storage, - raConn: raConn, - raClient: raService.NewResourceAggregateClient(raConn.GRPC()), - pendingConfigurations: newPendingConfigurationsCache(ctx, pendingCommandsCheckInterval), - logger: logger, - }, nil + ru := &ResourceUpdater{ + ctx: ctx, + storage: storage, + raConn: raConn, + raClient: raService.NewResourceAggregateClient(raConn.GRPC()), + logger: logger, + } + scheduler, err := NewExpiredUpdatesChecker(config.CleanUpExpiredUpdates, config.ExtendCronParserBySeconds, ru) + if err != nil { + return nil, fmt.Errorf("cannot create scheduler: %w", err) + } + ru.scheduler = scheduler + return ru, nil } type evaluateCondition = func(condition *pb.Condition) bool func (h *ResourceUpdater) getConditions(ctx context.Context, owner, deviceID, resourceHref string, resourceTypes []string, eval evaluateCondition) ([]*pb.Condition, error) { conditions := make([]*pb.Condition, 0, 4) - err := h.storage.GetLatestEnabledConditions(ctx, owner, &GetLatestConditionsQuery{ + err := h.storage.GetLatestEnabledConditions(ctx, owner, &store.GetLatestConditionsQuery{ DeviceID: deviceID, ResourceHref: resourceHref, ResourceTypeFilter: resourceTypes, - }, func(v *Condition) error { + }, func(v *store.Condition) error { c, errG := v.GetLatest() if errG != nil { return fmt.Errorf("cannot get condition: %w", errG) @@ -98,70 +86,6 @@ func (h *ResourceUpdater) getConditions(ctx context.Context, owner, deviceID, re return conditions, nil } -type executeByType int - -const ( - executeByTypeFindCondition executeByType = iota - executeByTypeCondition - executeByTypeOnDemand -) - -type appliedCondition struct { - id string - version uint64 - token string -} - -type appliedOnDemand struct { - token string -} - -type execution struct { - condition appliedCondition // executedBy = executeByTypeCondition - conditions []*pb.Condition // executedBy = executeByTypeFindCondition - onDemand appliedOnDemand // executedBy = executeByTypeOnDemand - force bool - executeBy executeByType -} - -func (e execution) token() string { - if e.executeBy == executeByTypeOnDemand { - return e.onDemand.token - } - if e.executeBy == executeByTypeCondition { - return e.condition.token - } - return "" -} - -func (e execution) setExecutedBy(ac *pb.AppliedConfiguration) { - if e.executeBy == executeByTypeOnDemand { - ac.ExecutedBy = pb.MakeExecutedByOnDemand() - return - } - if e.condition.id != "" { - ac.ExecutedBy = pb.MakeExecutedByConditionId(e.condition.id, e.condition.version) - return - } - firstCondition := e.conditions[0] - ac.ExecutedBy = pb.MakeExecutedByConditionId(firstCondition.GetId(), firstCondition.GetVersion()) -} - -type executionResult struct { - validUntil int64 - condition appliedCondition // executedBy = executeByTypeCondition or executeByTypeFindCondition - onDemand appliedOnDemand // executedBy = executeByTypeOnDemand - executedBy executeByType - err error -} - -func (er executionResult) token() string { - if er.executedBy == executeByTypeOnDemand { - return er.onDemand.token - } - return er.condition.token -} - func (h *ResourceUpdater) applyExecution(ctx context.Context, execution execution, resourceID *commands.ResourceId, configurationID, correlationID string, cr *pb.Configuration_Resource) executionResult { if execution.executeBy == executeByTypeOnDemand { validUntil, err := h.applyConfigurationToResource(ctx, resourceID, configurationID, correlationID, cr, execution.onDemand.token) @@ -236,7 +160,7 @@ func (h *ResourceUpdater) getConfigurationsByConditions(ctx context.Context, own configurations := make([]*pb.Configuration, 0, 4) err := h.storage.GetConfigurations(ctx, owner, &pb.GetConfigurationsRequest{ IdFilter: idFilter, - }, func(v *Configuration) error { + }, func(v *store.Configuration) error { c, errG := v.GetLatest() if errG != nil { return fmt.Errorf("cannot get configuration: %w", errG) @@ -338,26 +262,6 @@ func (h *ResourceUpdater) findTokenAndApplyConfigurationToResource(ctx context.C return 0, appliedCondition{}, errors.New("cannot apply configuration: no valid token found") } -func (h *ResourceUpdater) timeoutAppliedConfigurationPendingResource(ctx context.Context, pd *pendingConfiguration) error { - _, err := h.storage.UpdateAppliedConfigurationResource(ctx, pd.owner, UpdateAppliedConfigurationResourceRequest{ - AppliedConfigurationID: pd.id, - StatusFilter: []pb.AppliedConfiguration_Resource_Status{pb.AppliedConfiguration_Resource_PENDING}, - Resource: &pb.AppliedConfiguration_Resource{ - Href: pd.resourceID.GetHref(), - CorrelationId: pd.correlationID, - Status: pb.AppliedConfiguration_Resource_TIMEOUT, - ResourceUpdated: &events.ResourceUpdated{ - ResourceId: &commands.ResourceId{ - DeviceId: pd.resourceID.GetDeviceId(), - Href: pd.resourceID.GetHref(), - }, - Status: commands.Status_ERROR, - }, - }, - }) - return err -} - func resourceCorrelationID(ids ...string) string { cID := "" for _, id := range ids { @@ -423,8 +327,8 @@ func makeResourceUpdatedWithError(confID, owner, correlationID string, resourceI } } -func getUpdateAppliedConfigurationResourceRequest(appliedConfID, confID, owner, correlationID string, resourceID *commands.ResourceId, execution executionResult, setExecutionConditionID bool) UpdateAppliedConfigurationResourceRequest { - update := UpdateAppliedConfigurationResourceRequest{ +func getUpdateAppliedConfigurationResourceRequest(appliedConfID, confID, owner, correlationID string, resourceID *commands.ResourceId, execution executionResult, setExecutionConditionID bool) store.UpdateAppliedConfigurationResourceRequest { + update := store.UpdateAppliedConfigurationResourceRequest{ AppliedConfigurationID: appliedConfID, StatusFilter: []pb.AppliedConfiguration_Resource_Status{pb.AppliedConfiguration_Resource_QUEUED}, Resource: &pb.AppliedConfiguration_Resource{ @@ -480,21 +384,6 @@ func (h *ResourceUpdater) cancelPendingResourceUpdates(appliedConf *pb.AppliedCo } } -func (h *ResourceUpdater) scheduleAppliedConfigurationPendingResourceTimeout(pd *pendingConfiguration, validUntil int64) { - h.logger.Debugf("timeout(%v) for pending resource(%v) update scheduled for %v", pd.correlationID, pd.resourceID.GetHref(), time.Unix(0, validUntil)) - h.pendingConfigurations.Store( - pd.correlationID, cache.NewElement( - pd, - pkgTime.Unix(0, validUntil), - func(d *pendingConfiguration) { - h.logger.Debugf("timeout for pending resource(%v) update reached", d.resourceID.GetHref()) - if errT := h.timeoutAppliedConfigurationPendingResource(h.ctx, d); errT != nil { - h.logger.Errorf("failed to timeout pending applied configuration for resource(%v): %w", d.resourceID.GetHref(), errT) - } - }), - ) -} - func (h *ResourceUpdater) applyConfigurationToResources(ctx context.Context, owner, deviceID, correlationID string, confWithExecution *configurationWithExecution) (*pb.AppliedConfiguration, error) { h.logger.Debugf("applying configuration(id:%v)", confWithExecution.configuration.GetId()) appliedConfID := uuid.NewString() @@ -536,7 +425,7 @@ func (h *ResourceUpdater) applyConfigurationToResources(ctx context.Context, own var err error updatedAppliedConf, err := h.storage.UpdateAppliedConfigurationResource(ctx, owner, update) if err != nil { - if errors.Is(err, ErrNotFound) { // the appliedConfiguration doesnt exists -> it was removed by forced InvokeConfiguration from other thread + if errors.Is(err, store.ErrNotFound) { // the appliedConfiguration doesnt exists -> it was removed by forced InvokeConfiguration from other thread ctxWithToken := ctx if exRes.token() != "" { ctxWithToken = pkgGrpc.CtxWithToken(ctx, exRes.token()) @@ -550,16 +439,6 @@ func (h *ResourceUpdater) applyConfigurationToResources(ctx context.Context, own continue } appliedConf = updatedAppliedConf - if exRes.validUntil > 0 { - h.scheduleAppliedConfigurationPendingResourceTimeout(&pendingConfiguration{ - id: appliedConf.GetId(), - configurationID: confID, - owner: owner, - correlationID: resCorrelationID, - token: exRes.token(), - resourceID: resourceID, - }, exRes.validUntil) - } } return appliedConf, errs.ErrorOrNil() } @@ -591,11 +470,11 @@ func (h *ResourceUpdater) applyConfigurationsByConditions(ctx context.Context, r } eval := func(condition *pb.Condition) bool { - jq := condition.GetJqExpressionFilter() - if jq == "" { + jqe := condition.GetJqExpressionFilter() + if jqe == "" { return true } - ok, errE := EvalJQCondition(jq, rcData) + ok, errE := jq.EvalJQCondition(jqe, rcData) if errE != nil { h.logger.Error(errE) return false @@ -623,7 +502,7 @@ func (h *ResourceUpdater) applyConfigurationsByConditions(ctx context.Context, r continue } _, errA := h.applyConfigurationToResources(ctx, owner, deviceID, "", &c) - if IsDuplicateKeyError(errA) { + if store.IsDuplicateKeyError(errA) { // applied configuration already exists h.logger.Debugf("applied configuration already exists for device(%s) and configuration(%s): %v", deviceID, c.configuration.GetId(), errA) @@ -648,13 +527,9 @@ func (h *ResourceUpdater) finishPendingConfiguration(ctx context.Context, update if !ok || !isValidUUID(appliedConfID) || !isValidUUID(resourcesCorrelationID) { return nil } - h.logger.Debugf("finishing pending configuration(%v) for resource(%v:%v): %v", appliedConfID, updated.GetResourceId().GetDeviceId(), updated.GetResourceId().GetHref(), updated) - pc, ok := h.pendingConfigurations.LoadAndDelete(correlationID) - if ok { - h.logger.Debugf("pending configuration(%v) for resource(%v:%v) update expiration handler removed", pc.Data().id, updated.GetResourceId().GetDeviceId(), updated.GetResourceId().GetHref()) - } + h.logger.Debugf("finishing pending configuration(%v) update for resource(%v:%v): %v", appliedConfID, updated.GetResourceId().GetDeviceId(), updated.GetResourceId().GetHref(), updated) owner := updated.GetAuditContext().GetOwner() - _, err := h.storage.UpdateAppliedConfigurationResource(ctx, owner, UpdateAppliedConfigurationResourceRequest{ + _, err := h.storage.UpdateAppliedConfigurationResource(ctx, owner, store.UpdateAppliedConfigurationResourceRequest{ AppliedConfigurationID: appliedConfID, Resource: &pb.AppliedConfiguration_Resource{ Href: updated.GetResourceId().GetHref(), @@ -663,7 +538,7 @@ func (h *ResourceUpdater) finishPendingConfiguration(ctx context.Context, update ResourceUpdated: updated, }, }) - if updated.GetStatus() == commands.Status_CANCELED && errors.Is(err, ErrNotFound) { + if updated.GetStatus() == commands.Status_CANCELED && errors.Is(err, store.ErrNotFound) { // the pending update was canceled by h.cancelPendingResourceUpdates and the configuration was removed return nil } @@ -686,7 +561,7 @@ func (h *ResourceUpdater) handleResourceUpdated(ctx context.Context, ev eventbus if err := ev.Unmarshal(&updated); err != nil { return fmt.Errorf("cannot unmarshal ResourceUpdated event: %w", err) } - if err := h.finishPendingConfiguration(ctx, &updated); err != nil && !errors.Is(err, ErrNotFound) { + if err := h.finishPendingConfiguration(ctx, &updated); err != nil && !errors.Is(err, store.ErrNotFound) { return fmt.Errorf("failed to finish pending applied configuration for resource(%v): %w", updated.GetResourceId().GetHref(), err) } return nil @@ -734,12 +609,12 @@ func (h *ResourceUpdater) applyConfigurationOnDemand(ctx context.Context, conf * } func (h *ResourceUpdater) InvokeConfiguration(ctx context.Context, token, owner string, req *pb.InvokeConfigurationRequest) (*pb.AppliedConfiguration, error) { - if err := ValidateInvokeConfigurationRequest(req); err != nil { + if err := store.ValidateInvokeConfigurationRequest(req); err != nil { return nil, err } // find configuration var confs []*pb.Configuration - err := h.storage.GetLatestConfigurationsByID(ctx, owner, []string{req.GetConfigurationId()}, func(v *Configuration) error { + err := h.storage.GetLatestConfigurationsByID(ctx, owner, []string{req.GetConfigurationId()}, func(v *store.Configuration) error { c, err := v.GetLatest() if err != nil { return err @@ -760,29 +635,72 @@ func (h *ResourceUpdater) InvokeConfiguration(ctx context.Context, token, owner return appliedConf, nil } -func (h *ResourceUpdater) Close() error { - elements := h.pendingConfigurations.LoadAndDeleteAll() - var errs *multierror.Error - for _, e := range elements { - pc := e.Data() - h.cancelPendingResourceUpdate(pkgGrpc.CtxWithToken(h.ctx, pc.token), pc.resourceID, pc.correlationID, pc.configurationID) - _, err := h.storage.UpdateAppliedConfigurationResource(h.ctx, pc.owner, UpdateAppliedConfigurationResourceRequest{ - AppliedConfigurationID: pc.id, - StatusFilter: []pb.AppliedConfiguration_Resource_Status{pb.AppliedConfiguration_Resource_PENDING}, - Resource: &pb.AppliedConfiguration_Resource{ - Href: pc.resourceID.GetHref(), - CorrelationId: pc.correlationID, - Status: pb.AppliedConfiguration_Resource_DONE, - ResourceUpdated: makeResourceUpdatedWithError(pc.configurationID, pc.owner, pc.correlationID, pc.resourceID, errors.New("pending resource update canceled")), +func (h *ResourceUpdater) timeoutAppliedConfigurationPendingResource(ctx context.Context, owner, appliedConfigurationID, correlationID string, resourceID *commands.ResourceId) { + _, err := h.storage.UpdateAppliedConfigurationResource(ctx, owner, store.UpdateAppliedConfigurationResourceRequest{ + AppliedConfigurationID: appliedConfigurationID, + StatusFilter: []pb.AppliedConfiguration_Resource_Status{pb.AppliedConfiguration_Resource_PENDING}, + Resource: &pb.AppliedConfiguration_Resource{ + Href: resourceID.GetHref(), + CorrelationId: correlationID, + Status: pb.AppliedConfiguration_Resource_TIMEOUT, + ResourceUpdated: &events.ResourceUpdated{ + ResourceId: &commands.ResourceId{ + DeviceId: resourceID.GetDeviceId(), + Href: resourceID.GetHref(), + }, + Status: commands.Status_ERROR, }, - }) - if err != nil { - h.logger.Debugf("failed to update applied configuration resource(%v): %v", pc.resourceID.ToString(), err) - } else { - h.logger.Debugf("applied configuration resource(%v) canceled", pc.resourceID.ToString()) + }, + }) + if err != nil { + h.logger.Errorf("failed to timeout pending applied configuration for resource(%v): %w", resourceID.GetHref(), err) + } +} + +func (h *ResourceUpdater) TimeoutPendingResourceUpdates() { + h.logger.Debug("checking pending resource updates for timeout") + // get expired pending updates from the database + var pendingUpdates []*store.AppliedConfiguration + _, err := h.storage.GetExpiredAppliedConfigurationResourceUpdates(h.ctx, func(ac *store.AppliedConfiguration) error { + pendingUpdates = append(pendingUpdates, ac) + return nil + }) + if err != nil { + h.logger.Errorf("cannot get expired pending resource updates: %v", err) + return + } + if len(pendingUpdates) == 0 { + return + } + + ctx := h.ctx + if h.token != "" { + ctx = pkgGrpc.CtxWithToken(ctx, h.token) + } + // cancel pending updates + for _, ac := range pendingUpdates { + for _, res := range ac.GetResources() { + if res.GetStatus() != pb.AppliedConfiguration_Resource_PENDING { + continue + } + resourceID := &commands.ResourceId{DeviceId: ac.GetDeviceId(), Href: res.GetHref()} + h.logger.Debugf("timeout for pending resource(%v) update reached", resourceID.GetHref()) + h.timeoutAppliedConfigurationPendingResource(ctx, ac.GetOwner(), ac.GetId(), res.GetCorrelationId(), resourceID) } } +} + +func (h *ResourceUpdater) Close() error { + var errs *multierror.Error + if h.scheduler != nil { + err := h.scheduler.Shutdown() + errs = multierror.Append(errs, err) + } err := h.raConn.Close() errs = multierror.Append(errs, err) return errs.ErrorOrNil() } + +func (h *ResourceUpdater) SetToken(token string) { + h.token = token +} diff --git a/snippet-service/updater/resourceUpdater_test.go b/snippet-service/updater/resourceUpdater_test.go new file mode 100644 index 000000000..feca90f3c --- /dev/null +++ b/snippet-service/updater/resourceUpdater_test.go @@ -0,0 +1,112 @@ +package updater_test + +import ( + "context" + "crypto/tls" + "testing" + "time" + + "github.com/plgd-dev/go-coap/v3/message" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" + pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" + "github.com/plgd-dev/hub/v2/resource-aggregate/commands" + "github.com/plgd-dev/hub/v2/snippet-service/pb" + "github.com/plgd-dev/hub/v2/snippet-service/store" + "github.com/plgd-dev/hub/v2/snippet-service/test" + hubTest "github.com/plgd-dev/hub/v2/test" + "github.com/plgd-dev/hub/v2/test/config" + oauthService "github.com/plgd-dev/hub/v2/test/oauth-server/service" + oauthTest "github.com/plgd-dev/hub/v2/test/oauth-server/test" + hubTestService "github.com/plgd-dev/hub/v2/test/service" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +func TestCleanUpExpiredUpdates(t *testing.T) { + deviceID := hubTest.MustFindDeviceByName(hubTest.TestDeviceName) + ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT) + defer cancel() + + tearDown := hubTestService.SetUp(ctx, t) + defer tearDown() + + snippetCfg := test.MakeConfig(t) + snippetCfg.Clients.ResourceUpdater.ExtendCronParserBySeconds = true + snippetCfg.Clients.ResourceUpdater.CleanUpExpiredUpdates = "*/5 * * * * *" + snippetService, shutdownSnippetService := test.New(t, snippetCfg) + defer shutdownSnippetService() + + token := oauthTest.GetDefaultAccessToken(t) + snippetService.SetToken(token) + + snippetClientConn, err := grpc.NewClient(config.SNIPPET_SERVICE_HOST, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + RootCAs: hubTest.GetRootCertificatePool(t), + }))) + require.NoError(t, err) + defer func() { + _ = snippetClientConn.Close() + }() + snippetClient := pb.NewSnippetServiceClient(snippetClientConn) + + ctx = pkgGrpc.CtxWithToken(ctx, token) + + grpcClient := grpcgwTest.NewTestClient(t) + defer func() { + err = grpcClient.Close() + require.NoError(t, err) + }() + _, shutdownDevSim := hubTest.OnboardDevSim(ctx, t, grpcClient.GrpcGatewayClient(), deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, hubTest.GetAllBackendResourceLinks()) + defer shutdownDevSim() + + notExistingResourceHref := "/not/existing" + // configuration + // -> /not/existing -> { value: 42 } + conf, err := snippetClient.CreateConfiguration(ctx, &pb.Configuration{ + Name: "update", + Owner: oauthService.DeviceUserID, + Resources: []*pb.Configuration_Resource{ + { + Href: notExistingResourceHref, + Content: &commands.Content{ + ContentType: message.AppOcfCbor.String(), + Data: hubTest.EncodeToCbor(t, map[string]interface{}{ + "value": 42, + }), + }, + TimeToLive: int64(100 * time.Millisecond), + }, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, conf.GetId()) + + // invoke configuration with long TimeToLive + resp, err := snippetClient.InvokeConfiguration(ctx, &pb.InvokeConfigurationRequest{ + ConfigurationId: conf.GetId(), + DeviceId: deviceID, + }) + require.NoError(t, err) + + time.Sleep(time.Second * 10) + + // check that all configurations are either in timeout or done state + s, cleanUpStore := test.NewStore(t) + defer cleanUpStore() + + appliedConfs := make(map[string]*pb.AppliedConfiguration) + err = s.GetAppliedConfigurations(ctx, oauthService.DeviceUserID, &pb.GetAppliedConfigurationsRequest{ + IdFilter: []string{resp.GetAppliedConfigurationId()}, + }, func(appliedConf *store.AppliedConfiguration) error { + appliedConfs[appliedConf.GetId()] = appliedConf.GetAppliedConfiguration().Clone() + return nil + }) + require.NoError(t, err) + require.Len(t, appliedConfs, 1) + appliedConf, ok := appliedConfs[resp.GetAppliedConfigurationId()] + require.True(t, ok) + for _, r := range appliedConf.GetResources() { + status := r.GetStatus() + require.True(t, pb.AppliedConfiguration_Resource_TIMEOUT == status || pb.AppliedConfiguration_Resource_DONE == status) + } +} diff --git a/snippet-service/updater/updateExecution.go b/snippet-service/updater/updateExecution.go new file mode 100644 index 000000000..a3aa5201a --- /dev/null +++ b/snippet-service/updater/updateExecution.go @@ -0,0 +1,67 @@ +package updater + +import "github.com/plgd-dev/hub/v2/snippet-service/pb" + +type executeByType int + +const ( + executeByTypeFindCondition executeByType = iota + executeByTypeCondition + executeByTypeOnDemand +) + +type appliedCondition struct { + id string + version uint64 + token string +} + +type appliedOnDemand struct { + token string +} + +type execution struct { + condition appliedCondition // executedBy = executeByTypeCondition + conditions []*pb.Condition // executedBy = executeByTypeFindCondition + onDemand appliedOnDemand // executedBy = executeByTypeOnDemand + force bool + executeBy executeByType +} + +func (e execution) token() string { + if e.executeBy == executeByTypeOnDemand { + return e.onDemand.token + } + if e.executeBy == executeByTypeCondition { + return e.condition.token + } + return "" +} + +func (e execution) setExecutedBy(ac *pb.AppliedConfiguration) { + if e.executeBy == executeByTypeOnDemand { + ac.ExecutedBy = pb.MakeExecutedByOnDemand() + return + } + if e.condition.id != "" { + ac.ExecutedBy = pb.MakeExecutedByConditionId(e.condition.id, e.condition.version) + return + } + firstCondition := e.conditions[0] + ac.ExecutedBy = pb.MakeExecutedByConditionId(firstCondition.GetId(), firstCondition.GetVersion()) +} + +type executionResult struct { + validUntil int64 + condition appliedCondition // executedBy = executeByTypeCondition or executeByTypeFindCondition + onDemand appliedOnDemand // executedBy = executeByTypeOnDemand + executedBy executeByType + err error +} + +func (er executionResult) token() string { + if er.executedBy == executeByTypeOnDemand { + return er.onDemand.token + } + return er.condition.token +}