From 82c772676f7e2b6466df38f1db6ac7c3b0452a7f Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Thu, 16 May 2024 18:48:30 +0200 Subject: [PATCH] snapshot-service: persistent Condition storage --- Makefile | 2 + .../store/cqldb/signingRecords.go | 10 +- certificate-authority/store/cqldb/store.go | 37 +-- identity-store/persistence/cqldb/persist.go | 2 +- .../persistence/cqldb/persist_test.go | 2 +- identity-store/persistence/cqldb/store.go | 37 +-- pkg/cqldb/store.go | 55 ++++ .../cqrs/eventstore/cqldb/delete.go | 2 +- .../cqrs/eventstore/cqldb/eventstore.go | 41 +-- .../eventstore/cqldb/getLatestDeviceETags.go | 2 +- .../cqrs/eventstore/cqldb/load.go | 4 +- .../cqldb/loadDeviceMetadataByServiceIDs.go | 2 +- .../cqrs/eventstore/cqldb/save.go | 10 +- snapshot-service/.plantuml | 91 ------ snapshot-service/Makefile | 1 + snapshot-service/pb/README.md | 30 +- snapshot-service/pb/condition.go | 51 +++ snapshot-service/pb/doc.html | 30 +- snapshot-service/pb/service.pb.go | 26 +- snapshot-service/pb/service.proto | 26 +- snapshot-service/pb/service.swagger.json | 69 ++-- .../service/condition/createCondition.go | 1 + snapshot-service/service/service.go | 2 +- snapshot-service/store/condition.go | 12 + snapshot-service/store/cqldb/condition.go | 15 + snapshot-service/store/cqldb/store.go | 38 +-- snapshot-service/store/mongodb/condition.go | 105 +++++++ .../store/mongodb/condition_test.go | 295 ++++++++++++++++++ snapshot-service/store/mongodb/store.go | 4 +- snapshot-service/store/store.go | 25 ++ snapshot-service/test/test.go | 17 + 31 files changed, 728 insertions(+), 316 deletions(-) create mode 100644 pkg/cqldb/store.go delete mode 100644 snapshot-service/.plantuml create mode 100644 snapshot-service/pb/condition.go create mode 100644 snapshot-service/service/condition/createCondition.go create mode 100644 snapshot-service/store/condition.go create mode 100644 snapshot-service/store/cqldb/condition.go create mode 100644 snapshot-service/store/mongodb/condition.go create mode 100644 snapshot-service/store/mongodb/condition_test.go create mode 100644 snapshot-service/test/test.go diff --git a/Makefile b/Makefile index 3e5cbf8fe..afdd4f2f3 100644 --- a/Makefile +++ b/Makefile @@ -115,6 +115,8 @@ scylla: scylla/clean --entrypoint /bin/cp \ scylladb/scylla \ /etc/scylla/scylla.yaml /etc-scylla-tmp/scylla.yaml + sudo chown $(shell whoami) $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml + yq -i '.server_encryption_options.internode_encryption="all"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml yq -i '.server_encryption_options.certificate="/certs/http.crt"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml yq -i '.server_encryption_options.keyfile="/certs/http.key"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml diff --git a/certificate-authority/store/cqldb/signingRecords.go b/certificate-authority/store/cqldb/signingRecords.go index 73924579c..feaa18b9b 100644 --- a/certificate-authority/store/cqldb/signingRecords.go +++ b/certificate-authority/store/cqldb/signingRecords.go @@ -76,7 +76,7 @@ func (s *Store) CreateSigningRecord(ctx context.Context, signingRecord *store.Si if err != nil { return err } - applied, err := s.client.Session().Query(insertQuery).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil) + applied, err := s.Session().Query(insertQuery).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil) if err != nil { return err } @@ -95,7 +95,7 @@ func (s *Store) UpdateSigningRecord(ctx context.Context, signingRecord *store.Si return err } - return s.client.Session().Query(insertQuery).WithContext(ctx).Exec() + return s.Session().Query(insertQuery).WithContext(ctx).Exec() } func stringsToCQLSet(filter []string, str bool) string { @@ -232,7 +232,7 @@ func (s *Store) readPrimaryKeys(ctx context.Context, where string) (primaryKeysV b.WriteString(s.Table()) b.WriteString(" " + cqldb.WhereClause + " ") b.WriteString(where) - iter := s.client.Session().Query(b.String()).WithContext(ctx).Iter() + iter := s.Session().Query(b.String()).WithContext(ctx).Iter() defer iter.Close() return readPrimaryKeys(iter) } @@ -323,7 +323,7 @@ func (s *Store) DeleteSigningRecords(ctx context.Context, owner string, query *s b.WriteString(" IF EXISTS") var count int64 - applied, err := s.client.Session().Query(b.String()).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil) + applied, err := s.Session().Query(b.String()).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil) if err == nil { if applied { count++ @@ -377,7 +377,7 @@ func (i *SigningRecordsIterator) nextQuery() bool { b.WriteString(i.s.Table()) b.WriteString(" " + cqldb.WhereClause + " ") b.WriteString(i.queries[i.queriesIdx]) - i.iter = i.s.client.Session().Query(b.String()).WithContext(i.ctx).Iter() + i.iter = i.s.Session().Query(b.String()).WithContext(i.ctx).Iter() i.queriesIdx++ return true } diff --git a/certificate-authority/store/cqldb/store.go b/certificate-authority/store/cqldb/store.go index ebdcf0075..ea8fcc084 100644 --- a/certificate-authority/store/cqldb/store.go +++ b/certificate-authority/store/cqldb/store.go @@ -33,11 +33,8 @@ type Index struct { // clustering key: deviceIDKey var primaryKey = []string{idKey, ownerKey, commonNameKey} -// Store implements an Store for cqldb. type Store struct { - client *cqldb.Client - config *Config - logger log.Logger + *cqldb.Store } func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) { @@ -92,36 +89,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config * } return &Store{ - client: client, - logger: logger, - config: config, + Store: cqldb.NewStore(config.Table, client, logger), }, nil } - -// Clear clears the event storage. -func (s *Store) Clear(ctx context.Context) error { - err := s.client.DropKeyspace(ctx) - if err != nil { - return fmt.Errorf("cannot clear: %w", err) - } - return nil -} - -func (s *Store) Table() string { - return s.client.Keyspace() + "." + s.config.Table -} - -// Clear documents in collections, but don't drop the database or the collections -func (s *Store) ClearTable(ctx context.Context) error { - return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec() -} - -// Close closes the database session. -func (s *Store) Close(_ context.Context) error { - s.client.Close() - return nil -} - -func (s *Store) AddCloseFunc(f func()) { - s.client.AddCloseFunc(f) -} diff --git a/identity-store/persistence/cqldb/persist.go b/identity-store/persistence/cqldb/persist.go index c4875c8e2..5e3702989 100644 --- a/identity-store/persistence/cqldb/persist.go +++ b/identity-store/persistence/cqldb/persist.go @@ -25,7 +25,7 @@ type PersistenceTx struct { // tx := s.persistence.NewTransaction() // defer tx.Close() func (s *Store) NewTransaction(ctx context.Context) persistence.PersistenceTx { - return &PersistenceTx{tx: s.client.Session(), table: s.Table(), err: nil, ctx: ctx} + return &PersistenceTx{tx: s.Session(), table: s.Table(), err: nil, ctx: ctx} } func (p *PersistenceTx) retrieveDeviceByQuery(whereCondition string) (_ *persistence.AuthorizedDevice, ok bool, err error) { diff --git a/identity-store/persistence/cqldb/persist_test.go b/identity-store/persistence/cqldb/persist_test.go index 4aefa1071..ef573bd58 100644 --- a/identity-store/persistence/cqldb/persist_test.go +++ b/identity-store/persistence/cqldb/persist_test.go @@ -314,7 +314,7 @@ func TestPersistenceTxPersist(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := p.TruncateTable(ctx) + err := p.ClearTable(ctx) require.NoError(t, err) if tt.actionBeforePersist != nil { tt.actionBeforePersist(t, tx) diff --git a/identity-store/persistence/cqldb/store.go b/identity-store/persistence/cqldb/store.go index 2c73504d9..f593b2821 100644 --- a/identity-store/persistence/cqldb/store.go +++ b/identity-store/persistence/cqldb/store.go @@ -32,9 +32,7 @@ var indexes = []cqldb.Index{ // Store implements an Store for cqldb. type Store struct { - client *cqldb.Client - config *Config - logger log.Logger + *cqldb.Store } func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) { @@ -90,37 +88,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config * } return &Store{ - client: client, - logger: logger, - config: config, + Store: cqldb.NewStore(config.Table, client, logger), }, nil } - -// Clear clears the event storage. -func (s *Store) Clear(ctx context.Context) error { - err := s.client.DropKeyspace(ctx) - if err != nil { - return fmt.Errorf("cannot clear: %w", err) - } - - return nil -} - -func (s *Store) Table() string { - return s.client.Keyspace() + "." + s.config.Table -} - -// Truncate records in table, but don't drop the keybase or the table. -func (s *Store) TruncateTable(ctx context.Context) error { - return s.client.Session().Query("TRUNCATE TABLE " + s.Table()).WithContext(ctx).Exec() -} - -// Close closes the database session. -func (s *Store) Close(_ context.Context) error { - s.client.Close() - return nil -} - -func (s *Store) AddCloseFunc(f func()) { - s.client.AddCloseFunc(f) -} diff --git a/pkg/cqldb/store.go b/pkg/cqldb/store.go new file mode 100644 index 000000000..4162b58bc --- /dev/null +++ b/pkg/cqldb/store.go @@ -0,0 +1,55 @@ +package cqldb + +import ( + "context" + "fmt" + + "github.com/gocql/gocql" + "github.com/plgd-dev/hub/v2/pkg/log" +) + +// Store implements an Store for cqldb. +type Store struct { + table string + client *Client + logger log.Logger +} + +func NewStore(table string, client *Client, logger log.Logger) *Store { + return &Store{ + table: table, + client: client, + logger: logger, + } +} + +func (s *Store) Table() string { + return s.client.Keyspace() + "." + s.table +} + +func (s *Store) Session() *gocql.Session { + return s.client.Session() +} + +func (s *Store) AddCloseFunc(f func()) { + s.client.AddCloseFunc(f) +} + +func (s *Store) Clear(ctx context.Context) error { + err := s.client.DropKeyspace(ctx) + if err != nil { + return fmt.Errorf("cannot clear: %w", err) + } + return nil +} + +// Clear documents in collections, but don't drop the database or the collections +func (s *Store) ClearTable(ctx context.Context) error { + return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec() +} + +// Close closes the database session. +func (s *Store) Close(_ context.Context) error { + s.client.Close() + return nil +} diff --git a/resource-aggregate/cqrs/eventstore/cqldb/delete.go b/resource-aggregate/cqrs/eventstore/cqldb/delete.go index a5240ba60..26c878c92 100644 --- a/resource-aggregate/cqrs/eventstore/cqldb/delete.go +++ b/resource-aggregate/cqrs/eventstore/cqldb/delete.go @@ -33,5 +33,5 @@ func (s *EventStore) Delete(ctx context.Context, queries []eventstore.DeleteQuer return errors.New("failed to delete documents: invalid query") } - return s.client.Session().Query("delete from " + s.Table() + " " + cqldb.WhereClause + " " + deviceIDKey + " in (" + deviceIDFilter + ");").WithContext(ctx).Exec() + return s.Session().Query("delete from " + s.Table() + " " + cqldb.WhereClause + " " + deviceIDKey + " in (" + deviceIDFilter + ");").WithContext(ctx).Exec() } diff --git a/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go b/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go index bd9fc0a9f..ab63d6ef7 100644 --- a/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go +++ b/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go @@ -42,13 +42,9 @@ var indexes = []cqldb.Index{ // EventStore implements an EventStore for cqldb. type EventStore struct { - client *cqldb.Client - config *Config - logger log.Logger -} - -func (s *EventStore) AddCloseFunc(f func()) { - s.client.AddCloseFunc(f) + *cqldb.Store + marshalerFunc MarshalerFunc + unmarshalerFunc UnmarshalerFunc } func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider, opts ...Option) (*EventStore, error) { @@ -124,9 +120,9 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config * } return &EventStore{ - client: client, - logger: logger, - config: config, + Store: cqldb.NewStore(config.Table, client, logger), + marshalerFunc: config.marshalerFunc, + unmarshalerFunc: config.unmarshalerFunc, }, nil } @@ -151,28 +147,3 @@ func getLatestEventsSnapshot(events []eventstore.Event, marshaler MarshalerFunc) } return lastEvent, snapshot, nil } - -// Clear clears the event storage. -func (s *EventStore) Clear(ctx context.Context) error { - err := s.client.DropKeyspace(ctx) - if err != nil { - return fmt.Errorf("cannot clear: %w", err) - } - - return nil -} - -func (s *EventStore) Table() string { - return s.client.Keyspace() + "." + s.config.Table -} - -// Clear documents in collections, but don't drop the database or the collections -func (s *EventStore) ClearTable(ctx context.Context) error { - return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec() -} - -// Close closes the database session. -func (s *EventStore) Close(_ context.Context) error { - s.client.Close() - return nil -} diff --git a/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go b/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go index b3f052fcb..858a7b0c1 100644 --- a/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go +++ b/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go @@ -49,7 +49,7 @@ func (s *EventStore) GetLatestDeviceETags(ctx context.Context, deviceID string, q.WriteString(" " + cqldb.WhereClause) q.WriteString(" " + deviceIDKey + "=" + deviceID) - iter := s.client.Session().Query(q.String()).WithContext(ctx).Iter() + iter := s.Session().Query(q.String()).WithContext(ctx).Iter() if iter == nil { return nil, errors.New("cannot create iterator") } diff --git a/resource-aggregate/cqrs/eventstore/cqldb/load.go b/resource-aggregate/cqrs/eventstore/cqldb/load.go index d0d1b57d9..3e14ae108 100644 --- a/resource-aggregate/cqrs/eventstore/cqldb/load.go +++ b/resource-aggregate/cqrs/eventstore/cqldb/load.go @@ -109,8 +109,8 @@ func (s *EventStore) loadEventsQuery(ctx context.Context, eh eventstore.Handler, q.WriteString(" " + cqldb.WhereClause + " " + filter) } q.WriteString(";") - iter := s.client.Session().Query(q.String()).WithContext(ctx).Iter() - i := newIterator(iter, s.config.unmarshalerFunc) + iter := s.Session().Query(q.String()).WithContext(ctx).Iter() + i := newIterator(iter, s.unmarshalerFunc) err := eh.Handle(ctx, i) errClose := iter.Close() if err == nil { diff --git a/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go b/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go index efb8c78f0..770199d59 100644 --- a/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go +++ b/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go @@ -21,7 +21,7 @@ func (s *EventStore) LoadDeviceMetadataByServiceIDs(ctx context.Context, service } serviceIDs = pkgStrings.Unique(serviceIDs) q := cqldb.SelectCommand + " " + deviceIDKey + "," + serviceIDKey + " " + cqldb.FromClause + " " + s.Table() + " " + cqldb.WhereClause + " " + serviceIDKey + " in (" + strings.Join(serviceIDs, ",") + ") LIMIT " + strconv.FormatInt(limit, 10) + ";" - iter := s.client.Session().Query(q).WithContext(ctx).Iter() + iter := s.Session().Query(q).WithContext(ctx).Iter() if iter == nil { return nil, errors.New("cannot create iterator") } diff --git a/resource-aggregate/cqrs/eventstore/cqldb/save.go b/resource-aggregate/cqrs/eventstore/cqldb/save.go index 9d2255c35..240e19553 100644 --- a/resource-aggregate/cqrs/eventstore/cqldb/save.go +++ b/resource-aggregate/cqrs/eventstore/cqldb/save.go @@ -84,13 +84,13 @@ func eventsToCQLSetValue(event eventstore.Event, data []byte) string { } func (s *EventStore) saveEvent(ctx context.Context, events []eventstore.Event) (status eventstore.SaveStatus, err error) { - lastEvent, snapshotBinary, err := getLatestEventsSnapshot(events, s.config.marshalerFunc) + lastEvent, snapshotBinary, err := getLatestEventsSnapshot(events, s.marshalerFunc) if err != nil { return eventstore.Fail, err } setters := eventsToCQLSetValue(lastEvent, snapshotBinary) - q := "update " + s.client.Keyspace() + "." + s.config.Table + " set " + setters + " " + cqldb.WhereClause + " " + deviceIDKey + "=" + lastEvent.GroupID() + " and " + idKey + "=" + lastEvent.AggregateID() + " if " + versionKey + "=" + strconv.FormatUint(events[0].Version()-1, 10) + ";" - ok, err := s.client.Session().Query(q).WithContext(ctx).ScanCAS(nil) + q := "update " + s.Table() + " set " + setters + " " + cqldb.WhereClause + " " + deviceIDKey + "=" + lastEvent.GroupID() + " and " + idKey + "=" + lastEvent.AggregateID() + " if " + versionKey + "=" + strconv.FormatUint(events[0].Version()-1, 10) + ";" + ok, err := s.Session().Query(q).WithContext(ctx).ScanCAS(nil) if err != nil { return eventstore.Fail, fmt.Errorf("cannot update snapshot event('%v'): %w", events, err) } @@ -112,7 +112,7 @@ func (s *EventStore) Save(ctx context.Context, events ...eventstore.Event) (even if events[0].Version() != 0 { return s.saveEvent(ctx, events) } - lastEvent, data, err := getLatestEventsSnapshot(events, s.config.marshalerFunc) + lastEvent, data, err := getLatestEventsSnapshot(events, s.marshalerFunc) if err != nil { return eventstore.Fail, err } @@ -121,7 +121,7 @@ func (s *EventStore) Save(ctx context.Context, events ...eventstore.Event) (even values := kvs.Values() q := "insert into " + s.Table() + " (" + strings.Join(keys, ",") + ") values (" + strings.Join(values, ",") + ") if not exists;" - ok, err := s.client.Session().Query(q).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil, nil, nil, nil, nil) + ok, err := s.Session().Query(q).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil, nil, nil, nil, nil) if err != nil { if errors.Is(err, gocql.ErrNotFound) { return eventstore.Ok, nil diff --git a/snapshot-service/.plantuml b/snapshot-service/.plantuml deleted file mode 100644 index dfd743248..000000000 --- a/snapshot-service/.plantuml +++ /dev/null @@ -1,91 +0,0 @@ -@startuml - -title "Rule Execution Flow" - -actor "Client/ResourceChanged event" as Client -participant "Rule Engine" as RuleEngine -participant "Rule Engine (Evaluation)" as RuleEvaluationService -database "RuleActionLink Store" as RuleActionLinkStore -database "AppliedRuleActionLink Store" as AppliedRuleActionLinkStore -database "Event Store" as EventStore -control "Event Bus" as Bus - -participant "Rule Engine (Scene and Rule Trigger)" as SceneRuleTrigger -participant "Resource Aggregate" as RA - -Client -> RuleEngine: InvokeRule -activate RuleEngine - -RuleEngine -> RuleEvaluationService: EvaluateRule -activate RuleEvaluationService -RuleEvaluationService -> EventStore: Load all needed resources for evaluation. -EventStore --> RuleEvaluationService: Resources - -alt Rule evaluation successful - RuleEvaluationService --> RuleEngine: Success - RuleEngine -> RuleActionLinkStore: GetAssociatedRuleActionLinks - activate RuleActionLinkStore - - loop for each RuleActionLink - RuleActionLinkStore --> RuleEngine: RuleActionLink - RuleEngine -> AppliedRuleActionLinkStore: CreateAppliedRuleActionLink - activate AppliedRuleActionLinkStore - AppliedRuleActionLinkStore --> RuleEngine: AppliedRuleActionLink - RuleEngine -> SceneRuleTrigger: ApplyScenesAndTriggerRules - activate SceneRuleTrigger - loop for each resource in scene - SceneRuleTrigger -> RA: Update resource from scene - RA -> Bus: Resource Update Pending - Bus --> SceneRuleTrigger: Resource updated - end - loop for each trigger rule - SceneRuleTrigger -> RuleEngine: Invoke rule - RuleEngine --> SceneRuleTrigger: Success - end - SceneRuleTrigger --> RuleEngine: Success - deactivate SceneRuleTrigger - RuleEngine -> AppliedRuleActionLinkStore: UpdateStatus - AppliedRuleActionLinkStore --> RuleEngine: Updated Status - deactivate AppliedRuleActionLinkStore - end - -else Rule evaluation failed - RuleEvaluationService --> RuleEngine: Failure -end -deactivate RuleEvaluationService - -RuleEngine --> Client: InvokeRuleResponse -deactivate RuleEngine -@enduml - -@startuml - -title "Create Configuration Workflow"" -actor Client -participant "Rule Engine" as RuleEngine -database "Rules Store" as RulesStore -database "RuleActionLink Store" as RuleActionLinkStore -database "Scene Store" as SceneStore - -Client -> RuleEngine: CreateSceneRequest -activate RuleEngine -RuleEngine -> SceneStore: Scene -SceneStore --> RuleEngine: id -RuleEngine --> Client: CreateSceneResponse -deactivate RuleEngine - -Client -> RuleEngine: CreateRuleRequest -activate RuleEngine -RuleEngine -> RulesStore: Rule -RulesStore --> RuleEngine: id -RuleEngine --> Client: CreateRuleResponse -deactivate RuleEngine - -Client -> RuleEngine: CreateRuleActionLinkRequest -activate RuleEngine -RuleEngine -> RuleActionLinkStore: RuleActionLink -RuleActionLinkStore --> RuleEngine: id -RuleEngine --> Client: CreateRuleActionLinkResponse -deactivate RuleEngine - -@enduml diff --git a/snapshot-service/Makefile b/snapshot-service/Makefile index 3d4969461..25a1a528d 100644 --- a/snapshot-service/Makefile +++ b/snapshot-service/Makefile @@ -48,6 +48,7 @@ GRPCGATEWAY_MODULE_PATH := $(shell go list -m -f '{{.Dir}}' github.com/grpc-ecos proto/generate: protoc -I=. -I=$(REPOSITORY_DIRECTORY) -I=$(GOPATH)/src -I=$(GOOGLEAPIS_PATH) -I=$(GRPCGATEWAY_MODULE_PATH) --go_out=$(GOPATH)/src $(WORKING_DIRECTORY)/pb/service.proto + protoc-go-inject-tag -remove_tag_comment -input=$(WORKING_DIRECTORY)/pb/service.pb.go protoc -I=. -I=$(REPOSITORY_DIRECTORY) -I=$(GOPATH)/src -I=$(GOOGLEAPIS_PATH) -I=$(GRPCGATEWAY_MODULE_PATH) --openapiv2_out=$(REPOSITORY_DIRECTORY) \ --openapiv2_opt logtostderr=true \ $(WORKING_DIRECTORY)/pb/service.proto diff --git a/snapshot-service/pb/README.md b/snapshot-service/pb/README.md index 93920f126..e4bbc3699 100644 --- a/snapshot-service/pb/README.md +++ b/snapshot-service/pb/README.md @@ -100,17 +100,25 @@ driven by resource change event | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| id | [string](#string) | | | -| version | [uint64](#uint64) | | | -| name | [string](#string) | | | -| enabled | [bool](#bool) | | | -| configuration_id | [string](#string) | | | -| device_id_filter | [string](#string) | repeated | | -| resource_type_filter | [string](#string) | repeated | | -| resource_href_filter | [string](#string) | repeated | | -| jq_expression_filter | [string](#string) | | | -| api_access_token | [string](#string) | | token used to update resources in the configuration | -| owner | [string](#string) | | | +| id | [string](#string) | | Unique condition ID + +@gotags: bson:"_id" | +| version | [uint64](#uint64) | | @gotags: bson:"version" | +| name | [string](#string) | | @gotags: bson:"name,omitempty" | +| enabled | [bool](#bool) | | @gotags: bson:"enabled,omitempty" | +| configuration_id | [string](#string) | | ID of the configuration to be applied when the condition is satisfied + +@gotags: bson:"configurationId" | +| device_id_filter | [string](#string) | repeated | @gotags: bson:"deviceIdFilter,omitempty" | +| resource_type_filter | [string](#string) | repeated | @gotags: bson:"resourceTypeFilter,omitempty" | +| resource_href_filter | [string](#string) | repeated | @gotags: bson:"resourceHrefFilter,omitempty" | +| jq_expression_filter | [string](#string) | | @gotags: bson:"jqExpressionFilter,omitempty" | +| api_access_token | [string](#string) | | Token used to update resources in the configuration + +@gotags: bson:"-" | +| owner | [string](#string) | | Condition owner + +@gotags: bson:"owner" | diff --git a/snapshot-service/pb/condition.go b/snapshot-service/pb/condition.go new file mode 100644 index 000000000..37874e0ab --- /dev/null +++ b/snapshot-service/pb/condition.go @@ -0,0 +1,51 @@ +package pb + +import ( + "errors" + "fmt" + "slices" + "sort" + + "github.com/google/uuid" +) + +type Conditions []*Condition + +func (cs Conditions) Sort() { + sort.Slice(cs, func(i, j int) bool { + return cs[i].GetId() < cs[j].GetId() + }) +} + +func (c *Condition) CopyData(condition *Condition) { + c.Id = condition.GetId() + c.Version = condition.GetVersion() + c.Name = condition.GetName() + c.Enabled = condition.GetEnabled() + c.ConfigurationId = condition.GetConfigurationId() + c.DeviceIdFilter = slices.Clone(condition.GetDeviceIdFilter()) + c.ResourceTypeFilter = slices.Clone(condition.GetResourceTypeFilter()) + c.ResourceHrefFilter = slices.Clone(condition.GetResourceHrefFilter()) + c.JqExpressionFilter = condition.GetJqExpressionFilter() + c.ApiAccessToken = condition.GetApiAccessToken() + c.Owner = condition.GetOwner() +} + +func (c *Condition) Validate() error { + if c.GetId() == "" { + return errors.New("empty condition ID") + } + if _, err := uuid.Parse(c.GetId()); err != nil { + return fmt.Errorf("invalid condition ID(%v): %w", c.GetId(), err) + } + if c.GetConfigurationId() == "" { + return errors.New("empty configuration ID") + } + if _, err := uuid.Parse(c.GetConfigurationId()); err != nil { + return fmt.Errorf("invalid configuration ID(%v): %w", c.GetConfigurationId(), err) + } + if c.GetOwner() == "" { + return errors.New("empty condition owner") + } + return nil +} diff --git a/snapshot-service/pb/doc.html b/snapshot-service/pb/doc.html index d9d66cfb1..7f945e7a0 100644 --- a/snapshot-service/pb/doc.html +++ b/snapshot-service/pb/doc.html @@ -428,77 +428,85 @@

Condition

id string -

+

Unique condition ID + +@gotags: bson:"_id"

version uint64 -

+

@gotags: bson:"version"

name string -

+

@gotags: bson:"name,omitempty"

enabled bool -

+

@gotags: bson:"enabled,omitempty"

configuration_id string -

+

ID of the configuration to be applied when the condition is satisfied + +@gotags: bson:"configurationId"

device_id_filter string repeated -

+

@gotags: bson:"deviceIdFilter,omitempty"

resource_type_filter string repeated -

+

@gotags: bson:"resourceTypeFilter,omitempty"

resource_href_filter string repeated -

+

@gotags: bson:"resourceHrefFilter,omitempty"

jq_expression_filter string -

+

@gotags: bson:"jqExpressionFilter,omitempty"

api_access_token string -

token used to update resources in the configuration

+

Token used to update resources in the configuration + +@gotags: bson:"-"

owner string -

+

Condition owner + +@gotags: bson:"owner"

diff --git a/snapshot-service/pb/service.pb.go b/snapshot-service/pb/service.pb.go index 46b44cf4c..ae140bec8 100644 --- a/snapshot-service/pb/service.pb.go +++ b/snapshot-service/pb/service.pb.go @@ -193,17 +193,21 @@ type Condition struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Version uint64 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` - Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - Enabled bool `protobuf:"varint,4,opt,name=enabled,proto3" json:"enabled,omitempty"` - ConfigurationId string `protobuf:"bytes,5,opt,name=configuration_id,json=configurationId,proto3" json:"configuration_id,omitempty"` - DeviceIdFilter []string `protobuf:"bytes,6,rep,name=device_id_filter,json=deviceIdFilter,proto3" json:"device_id_filter,omitempty"` - ResourceTypeFilter []string `protobuf:"bytes,7,rep,name=resource_type_filter,json=resourceTypeFilter,proto3" json:"resource_type_filter,omitempty"` - ResourceHrefFilter []string `protobuf:"bytes,8,rep,name=resource_href_filter,json=resourceHrefFilter,proto3" json:"resource_href_filter,omitempty"` - JqExpressionFilter string `protobuf:"bytes,9,opt,name=jq_expression_filter,json=jqExpressionFilter,proto3" json:"jq_expression_filter,omitempty"` - ApiAccessToken string `protobuf:"bytes,10,opt,name=api_access_token,json=apiAccessToken,proto3" json:"api_access_token,omitempty"` // token used to update resources in the configuration - Owner string `protobuf:"bytes,11,opt,name=owner,proto3" json:"owner,omitempty"` + // Unique condition ID + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty" bson:"_id"` + Version uint64 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty" bson:"version"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty" bson:"name,omitempty"` + Enabled bool `protobuf:"varint,4,opt,name=enabled,proto3" json:"enabled,omitempty" bson:"enabled,omitempty"` + // ID of the configuration to be applied when the condition is satisfied + ConfigurationId string `protobuf:"bytes,5,opt,name=configuration_id,json=configurationId,proto3" json:"configuration_id,omitempty" bson:"configurationId"` + DeviceIdFilter []string `protobuf:"bytes,6,rep,name=device_id_filter,json=deviceIdFilter,proto3" json:"device_id_filter,omitempty" bson:"deviceIdFilter,omitempty"` + ResourceTypeFilter []string `protobuf:"bytes,7,rep,name=resource_type_filter,json=resourceTypeFilter,proto3" json:"resource_type_filter,omitempty" bson:"resourceTypeFilter,omitempty"` + ResourceHrefFilter []string `protobuf:"bytes,8,rep,name=resource_href_filter,json=resourceHrefFilter,proto3" json:"resource_href_filter,omitempty" bson:"resourceHrefFilter,omitempty"` + JqExpressionFilter string `protobuf:"bytes,9,opt,name=jq_expression_filter,json=jqExpressionFilter,proto3" json:"jq_expression_filter,omitempty" bson:"jqExpressionFilter,omitempty"` + // Token used to update resources in the configuration + ApiAccessToken string `protobuf:"bytes,10,opt,name=api_access_token,json=apiAccessToken,proto3" json:"api_access_token,omitempty" bson:"-"` + // Condition owner + Owner string `protobuf:"bytes,11,opt,name=owner,proto3" json:"owner,omitempty" bson:"owner"` } func (x *Condition) Reset() { diff --git a/snapshot-service/pb/service.proto b/snapshot-service/pb/service.proto index f8261704b..6c8092048 100644 --- a/snapshot-service/pb/service.proto +++ b/snapshot-service/pb/service.proto @@ -49,17 +49,21 @@ message IDFilter { } message Condition { // driven by resource change event - string id = 1; - uint64 version = 2; - string name = 3; - bool enabled = 4; - string configuration_id = 5; - repeated string device_id_filter = 6; - repeated string resource_type_filter = 7; - repeated string resource_href_filter = 8; - string jq_expression_filter = 9; - string api_access_token = 10; // token used to update resources in the configuration - string owner = 11; + // Unique condition ID + string id = 1; // @gotags: bson:"_id" + uint64 version = 2; // @gotags: bson:"version" + string name = 3; // @gotags: bson:"name,omitempty" + bool enabled = 4; // @gotags: bson:"enabled,omitempty" + // ID of the configuration to be applied when the condition is satisfied + string configuration_id = 5; // @gotags: bson:"configurationId" + repeated string device_id_filter = 6; // @gotags: bson:"deviceIdFilter,omitempty" + repeated string resource_type_filter = 7; // @gotags: bson:"resourceTypeFilter,omitempty" + repeated string resource_href_filter = 8; // @gotags: bson:"resourceHrefFilter,omitempty" + string jq_expression_filter = 9; // @gotags: bson:"jqExpressionFilter,omitempty" + // Token used to update resources in the configuration + string api_access_token = 10; // @gotags: bson:"-" + // Condition owner + string owner = 11; // @gotags: bson:"owner" } message GetConditionsRequest { repeated IDFilter id_filter = 1; } diff --git a/snapshot-service/pb/service.swagger.json b/snapshot-service/pb/service.swagger.json index 47265c7c8..bfae974db 100644 --- a/snapshot-service/pb/service.swagger.json +++ b/snapshot-service/pb/service.swagger.json @@ -132,6 +132,7 @@ "parameters": [ { "name": "id", + "description": "Unique condition ID\n\n@gotags: bson:\"_id\"", "in": "path", "required": true, "type": "string" @@ -450,44 +451,56 @@ "properties": { "version": { "type": "string", - "format": "uint64" + "format": "uint64", + "title": "@gotags: bson:\"version\"" }, "name": { - "type": "string" + "type": "string", + "title": "@gotags: bson:\"name,omitempty\"" }, "enabled": { - "type": "boolean" + "type": "boolean", + "title": "@gotags: bson:\"enabled,omitempty\"" }, "configurationId": { - "type": "string" + "type": "string", + "description": "@gotags: bson:\"configurationId\"", + "title": "ID of the configuration to be applied when the condition is satisfied" }, "deviceIdFilter": { "type": "array", "items": { "type": "string" - } + }, + "title": "@gotags: bson:\"deviceIdFilter,omitempty\"" }, "resourceTypeFilter": { "type": "array", "items": { "type": "string" - } + }, + "title": "@gotags: bson:\"resourceTypeFilter,omitempty\"" }, "resourceHrefFilter": { "type": "array", "items": { "type": "string" - } + }, + "title": "@gotags: bson:\"resourceHrefFilter,omitempty\"" }, "jqExpressionFilter": { - "type": "string" + "type": "string", + "title": "@gotags: bson:\"jqExpressionFilter,omitempty\"" }, "apiAccessToken": { "type": "string", - "title": "token used to update resources in the configuration" + "description": "@gotags: bson:\"-\"", + "title": "Token used to update resources in the configuration" }, "owner": { - "type": "string" + "type": "string", + "description": "@gotags: bson:\"owner\"", + "title": "Condition owner" } }, "title": "driven by resource change event" @@ -603,48 +616,62 @@ "type": "object", "properties": { "id": { - "type": "string" + "type": "string", + "description": "@gotags: bson:\"_id\"", + "title": "Unique condition ID" }, "version": { "type": "string", - "format": "uint64" + "format": "uint64", + "title": "@gotags: bson:\"version\"" }, "name": { - "type": "string" + "type": "string", + "title": "@gotags: bson:\"name,omitempty\"" }, "enabled": { - "type": "boolean" + "type": "boolean", + "title": "@gotags: bson:\"enabled,omitempty\"" }, "configurationId": { - "type": "string" + "type": "string", + "description": "@gotags: bson:\"configurationId\"", + "title": "ID of the configuration to be applied when the condition is satisfied" }, "deviceIdFilter": { "type": "array", "items": { "type": "string" - } + }, + "title": "@gotags: bson:\"deviceIdFilter,omitempty\"" }, "resourceTypeFilter": { "type": "array", "items": { "type": "string" - } + }, + "title": "@gotags: bson:\"resourceTypeFilter,omitempty\"" }, "resourceHrefFilter": { "type": "array", "items": { "type": "string" - } + }, + "title": "@gotags: bson:\"resourceHrefFilter,omitempty\"" }, "jqExpressionFilter": { - "type": "string" + "type": "string", + "title": "@gotags: bson:\"jqExpressionFilter,omitempty\"" }, "apiAccessToken": { "type": "string", - "title": "token used to update resources in the configuration" + "description": "@gotags: bson:\"-\"", + "title": "Token used to update resources in the configuration" }, "owner": { - "type": "string" + "type": "string", + "description": "@gotags: bson:\"owner\"", + "title": "Condition owner" } }, "title": "driven by resource change event" diff --git a/snapshot-service/service/condition/createCondition.go b/snapshot-service/service/condition/createCondition.go new file mode 100644 index 000000000..ff2fec882 --- /dev/null +++ b/snapshot-service/service/condition/createCondition.go @@ -0,0 +1 @@ +package condition diff --git a/snapshot-service/service/service.go b/snapshot-service/service/service.go index 82e87789f..b6feee37e 100644 --- a/snapshot-service/service/service.go +++ b/snapshot-service/service/service.go @@ -23,7 +23,7 @@ import ( "go.opentelemetry.io/otel/trace" ) -const serviceName = "certificate-authority" +const serviceName = "snapshot-service" func createStore(ctx context.Context, config storeConfig.Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) { switch config.Use { diff --git a/snapshot-service/store/condition.go b/snapshot-service/store/condition.go new file mode 100644 index 000000000..d595c94d2 --- /dev/null +++ b/snapshot-service/store/condition.go @@ -0,0 +1,12 @@ +package store + +import ( + "github.com/plgd-dev/hub/v2/snapshot-service/pb" +) + +const ( + DeviceIDFilterKey = "deviceIdFilter" // must match with pb.Condition.DeviceIdFilter tag + OwnerKey = "owner" // must match with pb.Condition.Owner tag +) + +type Condition = pb.Condition diff --git a/snapshot-service/store/cqldb/condition.go b/snapshot-service/store/cqldb/condition.go new file mode 100644 index 000000000..3d6cffcdf --- /dev/null +++ b/snapshot-service/store/cqldb/condition.go @@ -0,0 +1,15 @@ +package cqldb + +import ( + "context" + + "github.com/plgd-dev/hub/v2/snapshot-service/store" +) + +func (s *Store) CreateCondition(context.Context, *store.Condition) error { + return store.ErrNotSupported +} + +func (s *Store) LoadConditions(context.Context, string, *store.ConditionsQuery, store.LoadConditionsFunc) error { + return store.ErrNotSupported +} diff --git a/snapshot-service/store/cqldb/store.go b/snapshot-service/store/cqldb/store.go index ebdcf0075..7a4536d18 100644 --- a/snapshot-service/store/cqldb/store.go +++ b/snapshot-service/store/cqldb/store.go @@ -35,9 +35,7 @@ var primaryKey = []string{idKey, ownerKey, commonNameKey} // Store implements an Store for cqldb. type Store struct { - client *cqldb.Client - config *Config - logger log.Logger + *cqldb.Store } func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) { @@ -83,7 +81,7 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config * } if config.Table == "" { - config.Table = "signedCertificateRecords" + config.Table = "snapshots" } err := createEventsTable(ctx, client, config.Table) @@ -92,36 +90,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config * } return &Store{ - client: client, - logger: logger, - config: config, + Store: cqldb.NewStore(config.Table, client, logger), }, nil } - -// Clear clears the event storage. -func (s *Store) Clear(ctx context.Context) error { - err := s.client.DropKeyspace(ctx) - if err != nil { - return fmt.Errorf("cannot clear: %w", err) - } - return nil -} - -func (s *Store) Table() string { - return s.client.Keyspace() + "." + s.config.Table -} - -// Clear documents in collections, but don't drop the database or the collections -func (s *Store) ClearTable(ctx context.Context) error { - return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec() -} - -// Close closes the database session. -func (s *Store) Close(_ context.Context) error { - s.client.Close() - return nil -} - -func (s *Store) AddCloseFunc(f func()) { - s.client.AddCloseFunc(f) -} diff --git a/snapshot-service/store/mongodb/condition.go b/snapshot-service/store/mongodb/condition.go new file mode 100644 index 000000000..c2ce9857b --- /dev/null +++ b/snapshot-service/store/mongodb/condition.go @@ -0,0 +1,105 @@ +package mongodb + +import ( + "context" + "errors" + + "github.com/hashicorp/go-multierror" + "github.com/plgd-dev/hub/v2/snapshot-service/store" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +/* +Condition -> podmienka za akych okolnosti sa aplikuje + - id (identifikator) (user nevie menit) + - verzia (pri update sa verzia inkremente) + - pri ukladani checknem v DB ze predchadzajuca verzia je o 1 mensie + - t.j. check nenastala mi medzi tym zmena + - name - user-friendly meno + - enabled + - configuration id + - device_id_filter - OR - pride event a chcecknem ci device_id_filter obsahuje ID device + - ak je prazdny tak vsetko pustit + - resource_type_filter - AND - ked mam viac musia sa vsetky matchnut + - ak je prazdny tak vsetko pustit + - resource_href_filer - OR - musim matchnut aspon jeden href + - ak je prazdny tak vsetko pustit- resource_href_filter + - jq_expression - expression pustim nad obsahom ResourceChanged eventom, mal by vratit true/false (ci hodnota existuje) + - dalsia podmienka + https://github.com/itchyny/gojq + - api_access_token - TODO, zatial nechame otvorene; ked sa ide aplikovat konfiguraciu tak potrebujes token na autorizaciu + - owner -> musi sediet s tym co je v DB +*/ + +func (s *Store) CreateCondition(ctx context.Context, cond *store.Condition) error { + if err := cond.Validate(); err != nil { + return err + } + _, err := s.Collection(conditionsCol).InsertOne(ctx, cond) + if err != nil { + return err + } + return nil +} + +type conditionsIterator struct { + iter *mongo.Cursor +} + +func (i *conditionsIterator) Next(ctx context.Context, s *store.Condition) bool { + if !i.iter.Next(ctx) { + return false + } + err := i.iter.Decode(s) + return err == nil +} + +func (i *conditionsIterator) Err() error { + return i.iter.Err() +} + +func toDeviceIDQueryFilter(deviceID string) bson.M { + return bson.M{"$or": []bson.D{ + {{Key: store.DeviceIDFilterKey, Value: bson.M{"$exists": false}}}, + {{Key: store.DeviceIDFilterKey, Value: deviceID}}, + }} +} + +func toConditionsQueryFilter(owner string, queries *store.ConditionsQuery) interface{} { + filter := make([]interface{}, 0, 2) + if owner != "" { + filter = append(filter, bson.D{{Key: store.OwnerKey, Value: owner}}) + } + if queries.DeviceID != "" { + filter = append(filter, toDeviceIDQueryFilter(queries.DeviceID)) + } + if len(filter) == 0 { + return bson.D{} + } + if len(filter) == 1 { + return filter[0] + } + return bson.M{"$and": filter} +} + +func (s *Store) LoadConditions(ctx context.Context, owner string, query *store.ConditionsQuery, h store.LoadConditionsFunc) error { + col := s.Collection(conditionsCol) + iter, err := col.Find(ctx, toConditionsQueryFilter(owner, query)) + if errors.Is(err, mongo.ErrNilDocument) { + return nil + } + if err != nil { + return err + } + + var errors *multierror.Error + i := conditionsIterator{ + iter: iter, + } + err = h(ctx, &i) + errors = multierror.Append(errors, err) + errClose := iter.Close(ctx) + errors = multierror.Append(errors, errClose) + return errors.ErrorOrNil() +} diff --git a/snapshot-service/store/mongodb/condition_test.go b/snapshot-service/store/mongodb/condition_test.go new file mode 100644 index 000000000..8c4ac5551 --- /dev/null +++ b/snapshot-service/store/mongodb/condition_test.go @@ -0,0 +1,295 @@ +package mongodb_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/plgd-dev/hub/v2/snapshot-service/pb" + "github.com/plgd-dev/hub/v2/snapshot-service/store" + "github.com/plgd-dev/hub/v2/snapshot-service/test" + "github.com/plgd-dev/hub/v2/test/config" + "github.com/stretchr/testify/require" +) + +func TestStoreCreateCondition(t *testing.T) { + s, cleanUpStore := test.NewMongoStore(t) + defer cleanUpStore() + + ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT) + defer cancel() + + condID := uuid.New().String() + + type args struct { + cond *store.Condition + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "valid", + args: args{ + cond: &store.Condition{ + Id: condID, + ConfigurationId: uuid.New().String(), + Owner: "owner", + }, + }, + }, + { + name: "missing ID", + args: args{ + cond: &store.Condition{ + ConfigurationId: uuid.New().String(), + Owner: "owner", + }, + }, + wantErr: true, + }, + { + name: "invalid ID", + args: args{ + cond: &store.Condition{ + Id: "invalid", + ConfigurationId: uuid.New().String(), + Owner: "owner", + }, + }, + wantErr: true, + }, + { + name: "missing ConfigurationId", + args: args{ + cond: &store.Condition{ + Id: uuid.New().String(), + Owner: "owner", + }, + }, + wantErr: true, + }, + { + name: "invalid ID", + args: args{ + cond: &store.Condition{ + Id: uuid.New().String(), + ConfigurationId: "invalid", + Owner: "owner", + }, + }, + wantErr: true, + }, + { + name: "missing Owner", + args: args{ + cond: &store.Condition{ + Id: uuid.New().String(), + ConfigurationId: uuid.New().String(), + }, + }, + wantErr: true, + }, + { + name: "duplicit ID", + args: args{ + cond: &store.Condition{ + Id: condID, + ConfigurationId: uuid.New().String(), + Owner: "owner", + }, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := s.CreateCondition(ctx, tt.args.cond) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +func TestStoreLoadConditionsByDeviceIDAndOwner(t *testing.T) { + s, cleanUpStore := test.NewMongoStore(t) + defer cleanUpStore() + + ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT) + defer cancel() + + const deviceID1 = "deviceID1" + const deviceID2 = "deviceID2" + const deviceID3 = "deviceID3" + const owner1 = "owner1" + const owner2 = "owner2" + cond1 := &store.Condition{ + Id: uuid.New().String(), + Name: "c1", + ConfigurationId: uuid.New().String(), + Owner: owner1, + } + err := s.CreateCondition(ctx, cond1) + require.NoError(t, err) + + cond2 := &store.Condition{ + Id: uuid.New().String(), + Name: "c2", + ConfigurationId: uuid.New().String(), + DeviceIdFilter: []string{deviceID1}, + Owner: owner1, + } + err = s.CreateCondition(ctx, cond2) + require.NoError(t, err) + + cond3 := &store.Condition{ + Id: uuid.New().String(), + Name: "c3", + ConfigurationId: uuid.New().String(), + DeviceIdFilter: []string{deviceID2}, + Owner: owner1, + } + err = s.CreateCondition(ctx, cond3) + require.NoError(t, err) + + cond4 := &store.Condition{ + Id: uuid.New().String(), + Name: "c4", + ConfigurationId: uuid.New().String(), + DeviceIdFilter: []string{deviceID1, deviceID3}, + Owner: owner1, + } + err = s.CreateCondition(ctx, cond4) + require.NoError(t, err) + + cond5 := &store.Condition{ + Id: uuid.New().String(), + Name: "c5", + ConfigurationId: uuid.New().String(), + DeviceIdFilter: []string{deviceID3}, + Owner: owner2, + } + err = s.CreateCondition(ctx, cond5) + require.NoError(t, err) + + type args struct { + query *store.ConditionsQuery + owner string + } + + tests := []struct { + name string + args args + wantErr bool + want pb.Conditions + }{ + { + name: "all", + args: args{ + query: &store.ConditionsQuery{}, + }, + want: pb.Conditions{cond1, cond2, cond3, cond4, cond5}, + }, + { + name: "deviceID3", + args: args{ + query: &store.ConditionsQuery{ + DeviceID: deviceID3, + }, + }, + want: pb.Conditions{cond1, cond4, cond5}, + }, + { + name: "owner1", + args: args{ + query: &store.ConditionsQuery{}, + owner: owner1, + }, + want: pb.Conditions{cond1, cond2, cond3, cond4}, + }, + { + name: "owner1/deviceID1", + args: args{ + query: &store.ConditionsQuery{ + DeviceID: deviceID1, + }, + owner: owner1, + }, + want: pb.Conditions{cond1, cond2, cond4}, + }, + { + name: "owner1/deviceID2", + args: args{ + query: &store.ConditionsQuery{ + DeviceID: deviceID2, + }, + owner: owner1, + }, + want: pb.Conditions{cond1, cond3}, + }, + { + name: "owner1/new deviceID", + args: args{ + query: &store.ConditionsQuery{ + DeviceID: uuid.New().String(), + }, + owner: owner1, + }, + want: pb.Conditions{cond1}, + }, + { + name: "owner2/deviceID1", + args: args{ + query: &store.ConditionsQuery{ + DeviceID: deviceID1, + }, + owner: owner2, + }, + want: pb.Conditions{}, + }, + { + name: "owner2/deviceID3", + args: args{ + query: &store.ConditionsQuery{ + DeviceID: deviceID3, + }, + owner: owner2, + }, + want: pb.Conditions{cond5}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got pb.Conditions + err := s.LoadConditions(ctx, tt.args.owner, tt.args.query, func(ctx context.Context, iter store.ConditionIter) error { + var c store.Condition + for iter.Next(ctx, &c) { + cond := &store.Condition{} + cond.CopyData(&c) + got = append(got, cond) + } + return iter.Err() + }) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + require.Equal(t, len(tt.want), len(got)) + if len(tt.want) == len(got) { + got.Sort() + tt.want.Sort() + for i := range got { + test.CmpCondition(t, tt.want[i], got[i]) + } + } + }) + } +} diff --git a/snapshot-service/store/mongodb/store.go b/snapshot-service/store/mongodb/store.go index c4265d6b9..5b2c095b3 100644 --- a/snapshot-service/store/mongodb/store.go +++ b/snapshot-service/store/mongodb/store.go @@ -15,7 +15,7 @@ type Store struct { *pkgMongo.Store } -const snapshotsCol = "snapshots" +const conditionsCol = "conditions" func New(ctx context.Context, cfg *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) { certManager, err := client.New(cfg.Mongo.TLS, fileWatcher, logger) @@ -34,7 +34,7 @@ func New(ctx context.Context, cfg *Config, fileWatcher *fsnotify.Watcher, logger } func (s *Store) clearDatabases(ctx context.Context) error { - return s.Collection(snapshotsCol).Drop(ctx) + return s.Collection(conditionsCol).Drop(ctx) } func (s *Store) Close(ctx context.Context) error { diff --git a/snapshot-service/store/store.go b/snapshot-service/store/store.go index a1c489dea..f736ee808 100644 --- a/snapshot-service/store/store.go +++ b/snapshot-service/store/store.go @@ -2,8 +2,33 @@ package store import ( "context" + "errors" ) +type ( + ConditionsQuery struct { + DeviceID string + } +) + +type ConditionIter interface { + Next(ctx context.Context, condition *Condition) bool + Err() error +} + +type ( + LoadConditionsFunc = func(ctx context.Context, iter ConditionIter) (err error) +) + +var ErrNotSupported = errors.New("not supported") + type Store interface { + // CreateCondition creates a new condition. If the condition already exists, it will throw an error. + CreateCondition(ctx context.Context, condition *Condition) error + // UpdateSigningRecord updates an existing signing record. If the record does not exist, it will create a new one. + // UpdateSigningRecord(ctx context.Context, record *SigningRecord) error + // DeleteSigningRecords(ctx context.Context, ownerID string, query *DeleteSigningRecordsQuery) (int64, error) + LoadConditions(ctx context.Context, ownerID string, query *ConditionsQuery, h LoadConditionsFunc) error + Close(ctx context.Context) error } diff --git a/snapshot-service/test/test.go b/snapshot-service/test/test.go new file mode 100644 index 000000000..a3aec27c2 --- /dev/null +++ b/snapshot-service/test/test.go @@ -0,0 +1,17 @@ +package test + +import ( + "testing" + + "github.com/plgd-dev/hub/v2/snapshot-service/store" + "github.com/plgd-dev/kit/v2/codec/json" + "github.com/stretchr/testify/require" +) + +func CmpCondition(t *testing.T, want, got *store.Condition) { + wantJson, err := json.Encode(want) + require.NoError(t, err) + gotJson, err := json.Encode(got) + require.NoError(t, err) + require.JSONEq(t, string(wantJson), string(gotJson)) +}