diff --git a/Makefile b/Makefile
index 3e5cbf8fe..afdd4f2f3 100644
--- a/Makefile
+++ b/Makefile
@@ -115,6 +115,8 @@ scylla: scylla/clean
--entrypoint /bin/cp \
scylladb/scylla \
/etc/scylla/scylla.yaml /etc-scylla-tmp/scylla.yaml
+ sudo chown $(shell whoami) $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
+
yq -i '.server_encryption_options.internode_encryption="all"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
yq -i '.server_encryption_options.certificate="/certs/http.crt"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
yq -i '.server_encryption_options.keyfile="/certs/http.key"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
diff --git a/certificate-authority/store/cqldb/signingRecords.go b/certificate-authority/store/cqldb/signingRecords.go
index 73924579c..feaa18b9b 100644
--- a/certificate-authority/store/cqldb/signingRecords.go
+++ b/certificate-authority/store/cqldb/signingRecords.go
@@ -76,7 +76,7 @@ func (s *Store) CreateSigningRecord(ctx context.Context, signingRecord *store.Si
if err != nil {
return err
}
- applied, err := s.client.Session().Query(insertQuery).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
+ applied, err := s.Session().Query(insertQuery).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
if err != nil {
return err
}
@@ -95,7 +95,7 @@ func (s *Store) UpdateSigningRecord(ctx context.Context, signingRecord *store.Si
return err
}
- return s.client.Session().Query(insertQuery).WithContext(ctx).Exec()
+ return s.Session().Query(insertQuery).WithContext(ctx).Exec()
}
func stringsToCQLSet(filter []string, str bool) string {
@@ -232,7 +232,7 @@ func (s *Store) readPrimaryKeys(ctx context.Context, where string) (primaryKeysV
b.WriteString(s.Table())
b.WriteString(" " + cqldb.WhereClause + " ")
b.WriteString(where)
- iter := s.client.Session().Query(b.String()).WithContext(ctx).Iter()
+ iter := s.Session().Query(b.String()).WithContext(ctx).Iter()
defer iter.Close()
return readPrimaryKeys(iter)
}
@@ -323,7 +323,7 @@ func (s *Store) DeleteSigningRecords(ctx context.Context, owner string, query *s
b.WriteString(" IF EXISTS")
var count int64
- applied, err := s.client.Session().Query(b.String()).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
+ applied, err := s.Session().Query(b.String()).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
if err == nil {
if applied {
count++
@@ -377,7 +377,7 @@ func (i *SigningRecordsIterator) nextQuery() bool {
b.WriteString(i.s.Table())
b.WriteString(" " + cqldb.WhereClause + " ")
b.WriteString(i.queries[i.queriesIdx])
- i.iter = i.s.client.Session().Query(b.String()).WithContext(i.ctx).Iter()
+ i.iter = i.s.Session().Query(b.String()).WithContext(i.ctx).Iter()
i.queriesIdx++
return true
}
diff --git a/certificate-authority/store/cqldb/store.go b/certificate-authority/store/cqldb/store.go
index ebdcf0075..ea8fcc084 100644
--- a/certificate-authority/store/cqldb/store.go
+++ b/certificate-authority/store/cqldb/store.go
@@ -33,11 +33,8 @@ type Index struct {
// clustering key: deviceIDKey
var primaryKey = []string{idKey, ownerKey, commonNameKey}
-// Store implements an Store for cqldb.
type Store struct {
- client *cqldb.Client
- config *Config
- logger log.Logger
+ *cqldb.Store
}
func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
@@ -92,36 +89,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}
return &Store{
- client: client,
- logger: logger,
- config: config,
+ Store: cqldb.NewStore(config.Table, client, logger),
}, nil
}
-
-// Clear clears the event storage.
-func (s *Store) Clear(ctx context.Context) error {
- err := s.client.DropKeyspace(ctx)
- if err != nil {
- return fmt.Errorf("cannot clear: %w", err)
- }
- return nil
-}
-
-func (s *Store) Table() string {
- return s.client.Keyspace() + "." + s.config.Table
-}
-
-// Clear documents in collections, but don't drop the database or the collections
-func (s *Store) ClearTable(ctx context.Context) error {
- return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
-}
-
-// Close closes the database session.
-func (s *Store) Close(_ context.Context) error {
- s.client.Close()
- return nil
-}
-
-func (s *Store) AddCloseFunc(f func()) {
- s.client.AddCloseFunc(f)
-}
diff --git a/identity-store/persistence/cqldb/persist.go b/identity-store/persistence/cqldb/persist.go
index c4875c8e2..5e3702989 100644
--- a/identity-store/persistence/cqldb/persist.go
+++ b/identity-store/persistence/cqldb/persist.go
@@ -25,7 +25,7 @@ type PersistenceTx struct {
// tx := s.persistence.NewTransaction()
// defer tx.Close()
func (s *Store) NewTransaction(ctx context.Context) persistence.PersistenceTx {
- return &PersistenceTx{tx: s.client.Session(), table: s.Table(), err: nil, ctx: ctx}
+ return &PersistenceTx{tx: s.Session(), table: s.Table(), err: nil, ctx: ctx}
}
func (p *PersistenceTx) retrieveDeviceByQuery(whereCondition string) (_ *persistence.AuthorizedDevice, ok bool, err error) {
diff --git a/identity-store/persistence/cqldb/persist_test.go b/identity-store/persistence/cqldb/persist_test.go
index 4aefa1071..ef573bd58 100644
--- a/identity-store/persistence/cqldb/persist_test.go
+++ b/identity-store/persistence/cqldb/persist_test.go
@@ -314,7 +314,7 @@ func TestPersistenceTxPersist(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- err := p.TruncateTable(ctx)
+ err := p.ClearTable(ctx)
require.NoError(t, err)
if tt.actionBeforePersist != nil {
tt.actionBeforePersist(t, tx)
diff --git a/identity-store/persistence/cqldb/store.go b/identity-store/persistence/cqldb/store.go
index 2c73504d9..f593b2821 100644
--- a/identity-store/persistence/cqldb/store.go
+++ b/identity-store/persistence/cqldb/store.go
@@ -32,9 +32,7 @@ var indexes = []cqldb.Index{
// Store implements an Store for cqldb.
type Store struct {
- client *cqldb.Client
- config *Config
- logger log.Logger
+ *cqldb.Store
}
func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
@@ -90,37 +88,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}
return &Store{
- client: client,
- logger: logger,
- config: config,
+ Store: cqldb.NewStore(config.Table, client, logger),
}, nil
}
-
-// Clear clears the event storage.
-func (s *Store) Clear(ctx context.Context) error {
- err := s.client.DropKeyspace(ctx)
- if err != nil {
- return fmt.Errorf("cannot clear: %w", err)
- }
-
- return nil
-}
-
-func (s *Store) Table() string {
- return s.client.Keyspace() + "." + s.config.Table
-}
-
-// Truncate records in table, but don't drop the keybase or the table.
-func (s *Store) TruncateTable(ctx context.Context) error {
- return s.client.Session().Query("TRUNCATE TABLE " + s.Table()).WithContext(ctx).Exec()
-}
-
-// Close closes the database session.
-func (s *Store) Close(_ context.Context) error {
- s.client.Close()
- return nil
-}
-
-func (s *Store) AddCloseFunc(f func()) {
- s.client.AddCloseFunc(f)
-}
diff --git a/pkg/cqldb/store.go b/pkg/cqldb/store.go
new file mode 100644
index 000000000..4162b58bc
--- /dev/null
+++ b/pkg/cqldb/store.go
@@ -0,0 +1,55 @@
+package cqldb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/gocql/gocql"
+ "github.com/plgd-dev/hub/v2/pkg/log"
+)
+
+// Store implements an Store for cqldb.
+type Store struct {
+ table string
+ client *Client
+ logger log.Logger
+}
+
+func NewStore(table string, client *Client, logger log.Logger) *Store {
+ return &Store{
+ table: table,
+ client: client,
+ logger: logger,
+ }
+}
+
+func (s *Store) Table() string {
+ return s.client.Keyspace() + "." + s.table
+}
+
+func (s *Store) Session() *gocql.Session {
+ return s.client.Session()
+}
+
+func (s *Store) AddCloseFunc(f func()) {
+ s.client.AddCloseFunc(f)
+}
+
+func (s *Store) Clear(ctx context.Context) error {
+ err := s.client.DropKeyspace(ctx)
+ if err != nil {
+ return fmt.Errorf("cannot clear: %w", err)
+ }
+ return nil
+}
+
+// Clear documents in collections, but don't drop the database or the collections
+func (s *Store) ClearTable(ctx context.Context) error {
+ return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
+}
+
+// Close closes the database session.
+func (s *Store) Close(_ context.Context) error {
+ s.client.Close()
+ return nil
+}
diff --git a/resource-aggregate/cqrs/eventstore/cqldb/delete.go b/resource-aggregate/cqrs/eventstore/cqldb/delete.go
index a5240ba60..26c878c92 100644
--- a/resource-aggregate/cqrs/eventstore/cqldb/delete.go
+++ b/resource-aggregate/cqrs/eventstore/cqldb/delete.go
@@ -33,5 +33,5 @@ func (s *EventStore) Delete(ctx context.Context, queries []eventstore.DeleteQuer
return errors.New("failed to delete documents: invalid query")
}
- return s.client.Session().Query("delete from " + s.Table() + " " + cqldb.WhereClause + " " + deviceIDKey + " in (" + deviceIDFilter + ");").WithContext(ctx).Exec()
+ return s.Session().Query("delete from " + s.Table() + " " + cqldb.WhereClause + " " + deviceIDKey + " in (" + deviceIDFilter + ");").WithContext(ctx).Exec()
}
diff --git a/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go b/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go
index bd9fc0a9f..ab63d6ef7 100644
--- a/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go
+++ b/resource-aggregate/cqrs/eventstore/cqldb/eventstore.go
@@ -42,13 +42,9 @@ var indexes = []cqldb.Index{
// EventStore implements an EventStore for cqldb.
type EventStore struct {
- client *cqldb.Client
- config *Config
- logger log.Logger
-}
-
-func (s *EventStore) AddCloseFunc(f func()) {
- s.client.AddCloseFunc(f)
+ *cqldb.Store
+ marshalerFunc MarshalerFunc
+ unmarshalerFunc UnmarshalerFunc
}
func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider, opts ...Option) (*EventStore, error) {
@@ -124,9 +120,9 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}
return &EventStore{
- client: client,
- logger: logger,
- config: config,
+ Store: cqldb.NewStore(config.Table, client, logger),
+ marshalerFunc: config.marshalerFunc,
+ unmarshalerFunc: config.unmarshalerFunc,
}, nil
}
@@ -151,28 +147,3 @@ func getLatestEventsSnapshot(events []eventstore.Event, marshaler MarshalerFunc)
}
return lastEvent, snapshot, nil
}
-
-// Clear clears the event storage.
-func (s *EventStore) Clear(ctx context.Context) error {
- err := s.client.DropKeyspace(ctx)
- if err != nil {
- return fmt.Errorf("cannot clear: %w", err)
- }
-
- return nil
-}
-
-func (s *EventStore) Table() string {
- return s.client.Keyspace() + "." + s.config.Table
-}
-
-// Clear documents in collections, but don't drop the database or the collections
-func (s *EventStore) ClearTable(ctx context.Context) error {
- return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
-}
-
-// Close closes the database session.
-func (s *EventStore) Close(_ context.Context) error {
- s.client.Close()
- return nil
-}
diff --git a/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go b/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go
index b3f052fcb..858a7b0c1 100644
--- a/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go
+++ b/resource-aggregate/cqrs/eventstore/cqldb/getLatestDeviceETags.go
@@ -49,7 +49,7 @@ func (s *EventStore) GetLatestDeviceETags(ctx context.Context, deviceID string,
q.WriteString(" " + cqldb.WhereClause)
q.WriteString(" " + deviceIDKey + "=" + deviceID)
- iter := s.client.Session().Query(q.String()).WithContext(ctx).Iter()
+ iter := s.Session().Query(q.String()).WithContext(ctx).Iter()
if iter == nil {
return nil, errors.New("cannot create iterator")
}
diff --git a/resource-aggregate/cqrs/eventstore/cqldb/load.go b/resource-aggregate/cqrs/eventstore/cqldb/load.go
index d0d1b57d9..3e14ae108 100644
--- a/resource-aggregate/cqrs/eventstore/cqldb/load.go
+++ b/resource-aggregate/cqrs/eventstore/cqldb/load.go
@@ -109,8 +109,8 @@ func (s *EventStore) loadEventsQuery(ctx context.Context, eh eventstore.Handler,
q.WriteString(" " + cqldb.WhereClause + " " + filter)
}
q.WriteString(";")
- iter := s.client.Session().Query(q.String()).WithContext(ctx).Iter()
- i := newIterator(iter, s.config.unmarshalerFunc)
+ iter := s.Session().Query(q.String()).WithContext(ctx).Iter()
+ i := newIterator(iter, s.unmarshalerFunc)
err := eh.Handle(ctx, i)
errClose := iter.Close()
if err == nil {
diff --git a/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go b/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go
index efb8c78f0..770199d59 100644
--- a/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go
+++ b/resource-aggregate/cqrs/eventstore/cqldb/loadDeviceMetadataByServiceIDs.go
@@ -21,7 +21,7 @@ func (s *EventStore) LoadDeviceMetadataByServiceIDs(ctx context.Context, service
}
serviceIDs = pkgStrings.Unique(serviceIDs)
q := cqldb.SelectCommand + " " + deviceIDKey + "," + serviceIDKey + " " + cqldb.FromClause + " " + s.Table() + " " + cqldb.WhereClause + " " + serviceIDKey + " in (" + strings.Join(serviceIDs, ",") + ") LIMIT " + strconv.FormatInt(limit, 10) + ";"
- iter := s.client.Session().Query(q).WithContext(ctx).Iter()
+ iter := s.Session().Query(q).WithContext(ctx).Iter()
if iter == nil {
return nil, errors.New("cannot create iterator")
}
diff --git a/resource-aggregate/cqrs/eventstore/cqldb/save.go b/resource-aggregate/cqrs/eventstore/cqldb/save.go
index 9d2255c35..240e19553 100644
--- a/resource-aggregate/cqrs/eventstore/cqldb/save.go
+++ b/resource-aggregate/cqrs/eventstore/cqldb/save.go
@@ -84,13 +84,13 @@ func eventsToCQLSetValue(event eventstore.Event, data []byte) string {
}
func (s *EventStore) saveEvent(ctx context.Context, events []eventstore.Event) (status eventstore.SaveStatus, err error) {
- lastEvent, snapshotBinary, err := getLatestEventsSnapshot(events, s.config.marshalerFunc)
+ lastEvent, snapshotBinary, err := getLatestEventsSnapshot(events, s.marshalerFunc)
if err != nil {
return eventstore.Fail, err
}
setters := eventsToCQLSetValue(lastEvent, snapshotBinary)
- q := "update " + s.client.Keyspace() + "." + s.config.Table + " set " + setters + " " + cqldb.WhereClause + " " + deviceIDKey + "=" + lastEvent.GroupID() + " and " + idKey + "=" + lastEvent.AggregateID() + " if " + versionKey + "=" + strconv.FormatUint(events[0].Version()-1, 10) + ";"
- ok, err := s.client.Session().Query(q).WithContext(ctx).ScanCAS(nil)
+ q := "update " + s.Table() + " set " + setters + " " + cqldb.WhereClause + " " + deviceIDKey + "=" + lastEvent.GroupID() + " and " + idKey + "=" + lastEvent.AggregateID() + " if " + versionKey + "=" + strconv.FormatUint(events[0].Version()-1, 10) + ";"
+ ok, err := s.Session().Query(q).WithContext(ctx).ScanCAS(nil)
if err != nil {
return eventstore.Fail, fmt.Errorf("cannot update snapshot event('%v'): %w", events, err)
}
@@ -112,7 +112,7 @@ func (s *EventStore) Save(ctx context.Context, events ...eventstore.Event) (even
if events[0].Version() != 0 {
return s.saveEvent(ctx, events)
}
- lastEvent, data, err := getLatestEventsSnapshot(events, s.config.marshalerFunc)
+ lastEvent, data, err := getLatestEventsSnapshot(events, s.marshalerFunc)
if err != nil {
return eventstore.Fail, err
}
@@ -121,7 +121,7 @@ func (s *EventStore) Save(ctx context.Context, events ...eventstore.Event) (even
values := kvs.Values()
q := "insert into " + s.Table() + " (" + strings.Join(keys, ",") + ") values (" + strings.Join(values, ",") + ") if not exists;"
- ok, err := s.client.Session().Query(q).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil, nil, nil, nil, nil)
+ ok, err := s.Session().Query(q).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if errors.Is(err, gocql.ErrNotFound) {
return eventstore.Ok, nil
diff --git a/snapshot-service/.plantuml b/snapshot-service/.plantuml
deleted file mode 100644
index dfd743248..000000000
--- a/snapshot-service/.plantuml
+++ /dev/null
@@ -1,91 +0,0 @@
-@startuml
-
-title "Rule Execution Flow"
-
-actor "Client/ResourceChanged event" as Client
-participant "Rule Engine" as RuleEngine
-participant "Rule Engine (Evaluation)" as RuleEvaluationService
-database "RuleActionLink Store" as RuleActionLinkStore
-database "AppliedRuleActionLink Store" as AppliedRuleActionLinkStore
-database "Event Store" as EventStore
-control "Event Bus" as Bus
-
-participant "Rule Engine (Scene and Rule Trigger)" as SceneRuleTrigger
-participant "Resource Aggregate" as RA
-
-Client -> RuleEngine: InvokeRule
-activate RuleEngine
-
-RuleEngine -> RuleEvaluationService: EvaluateRule
-activate RuleEvaluationService
-RuleEvaluationService -> EventStore: Load all needed resources for evaluation.
-EventStore --> RuleEvaluationService: Resources
-
-alt Rule evaluation successful
- RuleEvaluationService --> RuleEngine: Success
- RuleEngine -> RuleActionLinkStore: GetAssociatedRuleActionLinks
- activate RuleActionLinkStore
-
- loop for each RuleActionLink
- RuleActionLinkStore --> RuleEngine: RuleActionLink
- RuleEngine -> AppliedRuleActionLinkStore: CreateAppliedRuleActionLink
- activate AppliedRuleActionLinkStore
- AppliedRuleActionLinkStore --> RuleEngine: AppliedRuleActionLink
- RuleEngine -> SceneRuleTrigger: ApplyScenesAndTriggerRules
- activate SceneRuleTrigger
- loop for each resource in scene
- SceneRuleTrigger -> RA: Update resource from scene
- RA -> Bus: Resource Update Pending
- Bus --> SceneRuleTrigger: Resource updated
- end
- loop for each trigger rule
- SceneRuleTrigger -> RuleEngine: Invoke rule
- RuleEngine --> SceneRuleTrigger: Success
- end
- SceneRuleTrigger --> RuleEngine: Success
- deactivate SceneRuleTrigger
- RuleEngine -> AppliedRuleActionLinkStore: UpdateStatus
- AppliedRuleActionLinkStore --> RuleEngine: Updated Status
- deactivate AppliedRuleActionLinkStore
- end
-
-else Rule evaluation failed
- RuleEvaluationService --> RuleEngine: Failure
-end
-deactivate RuleEvaluationService
-
-RuleEngine --> Client: InvokeRuleResponse
-deactivate RuleEngine
-@enduml
-
-@startuml
-
-title "Create Configuration Workflow""
-actor Client
-participant "Rule Engine" as RuleEngine
-database "Rules Store" as RulesStore
-database "RuleActionLink Store" as RuleActionLinkStore
-database "Scene Store" as SceneStore
-
-Client -> RuleEngine: CreateSceneRequest
-activate RuleEngine
-RuleEngine -> SceneStore: Scene
-SceneStore --> RuleEngine: id
-RuleEngine --> Client: CreateSceneResponse
-deactivate RuleEngine
-
-Client -> RuleEngine: CreateRuleRequest
-activate RuleEngine
-RuleEngine -> RulesStore: Rule
-RulesStore --> RuleEngine: id
-RuleEngine --> Client: CreateRuleResponse
-deactivate RuleEngine
-
-Client -> RuleEngine: CreateRuleActionLinkRequest
-activate RuleEngine
-RuleEngine -> RuleActionLinkStore: RuleActionLink
-RuleActionLinkStore --> RuleEngine: id
-RuleEngine --> Client: CreateRuleActionLinkResponse
-deactivate RuleEngine
-
-@enduml
diff --git a/snapshot-service/Makefile b/snapshot-service/Makefile
index 3d4969461..25a1a528d 100644
--- a/snapshot-service/Makefile
+++ b/snapshot-service/Makefile
@@ -48,6 +48,7 @@ GRPCGATEWAY_MODULE_PATH := $(shell go list -m -f '{{.Dir}}' github.com/grpc-ecos
proto/generate:
protoc -I=. -I=$(REPOSITORY_DIRECTORY) -I=$(GOPATH)/src -I=$(GOOGLEAPIS_PATH) -I=$(GRPCGATEWAY_MODULE_PATH) --go_out=$(GOPATH)/src $(WORKING_DIRECTORY)/pb/service.proto
+ protoc-go-inject-tag -remove_tag_comment -input=$(WORKING_DIRECTORY)/pb/service.pb.go
protoc -I=. -I=$(REPOSITORY_DIRECTORY) -I=$(GOPATH)/src -I=$(GOOGLEAPIS_PATH) -I=$(GRPCGATEWAY_MODULE_PATH) --openapiv2_out=$(REPOSITORY_DIRECTORY) \
--openapiv2_opt logtostderr=true \
$(WORKING_DIRECTORY)/pb/service.proto
diff --git a/snapshot-service/pb/README.md b/snapshot-service/pb/README.md
index 93920f126..e4bbc3699 100644
--- a/snapshot-service/pb/README.md
+++ b/snapshot-service/pb/README.md
@@ -100,17 +100,25 @@ driven by resource change event
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
-| id | [string](#string) | | |
-| version | [uint64](#uint64) | | |
-| name | [string](#string) | | |
-| enabled | [bool](#bool) | | |
-| configuration_id | [string](#string) | | |
-| device_id_filter | [string](#string) | repeated | |
-| resource_type_filter | [string](#string) | repeated | |
-| resource_href_filter | [string](#string) | repeated | |
-| jq_expression_filter | [string](#string) | | |
-| api_access_token | [string](#string) | | token used to update resources in the configuration |
-| owner | [string](#string) | | |
+| id | [string](#string) | | Unique condition ID
+
+@gotags: bson:"_id" |
+| version | [uint64](#uint64) | | @gotags: bson:"version" |
+| name | [string](#string) | | @gotags: bson:"name,omitempty" |
+| enabled | [bool](#bool) | | @gotags: bson:"enabled,omitempty" |
+| configuration_id | [string](#string) | | ID of the configuration to be applied when the condition is satisfied
+
+@gotags: bson:"configurationId" |
+| device_id_filter | [string](#string) | repeated | @gotags: bson:"deviceIdFilter,omitempty" |
+| resource_type_filter | [string](#string) | repeated | @gotags: bson:"resourceTypeFilter,omitempty" |
+| resource_href_filter | [string](#string) | repeated | @gotags: bson:"resourceHrefFilter,omitempty" |
+| jq_expression_filter | [string](#string) | | @gotags: bson:"jqExpressionFilter,omitempty" |
+| api_access_token | [string](#string) | | Token used to update resources in the configuration
+
+@gotags: bson:"-" |
+| owner | [string](#string) | | Condition owner
+
+@gotags: bson:"owner" |
diff --git a/snapshot-service/pb/condition.go b/snapshot-service/pb/condition.go
new file mode 100644
index 000000000..37874e0ab
--- /dev/null
+++ b/snapshot-service/pb/condition.go
@@ -0,0 +1,51 @@
+package pb
+
+import (
+ "errors"
+ "fmt"
+ "slices"
+ "sort"
+
+ "github.com/google/uuid"
+)
+
+type Conditions []*Condition
+
+func (cs Conditions) Sort() {
+ sort.Slice(cs, func(i, j int) bool {
+ return cs[i].GetId() < cs[j].GetId()
+ })
+}
+
+func (c *Condition) CopyData(condition *Condition) {
+ c.Id = condition.GetId()
+ c.Version = condition.GetVersion()
+ c.Name = condition.GetName()
+ c.Enabled = condition.GetEnabled()
+ c.ConfigurationId = condition.GetConfigurationId()
+ c.DeviceIdFilter = slices.Clone(condition.GetDeviceIdFilter())
+ c.ResourceTypeFilter = slices.Clone(condition.GetResourceTypeFilter())
+ c.ResourceHrefFilter = slices.Clone(condition.GetResourceHrefFilter())
+ c.JqExpressionFilter = condition.GetJqExpressionFilter()
+ c.ApiAccessToken = condition.GetApiAccessToken()
+ c.Owner = condition.GetOwner()
+}
+
+func (c *Condition) Validate() error {
+ if c.GetId() == "" {
+ return errors.New("empty condition ID")
+ }
+ if _, err := uuid.Parse(c.GetId()); err != nil {
+ return fmt.Errorf("invalid condition ID(%v): %w", c.GetId(), err)
+ }
+ if c.GetConfigurationId() == "" {
+ return errors.New("empty configuration ID")
+ }
+ if _, err := uuid.Parse(c.GetConfigurationId()); err != nil {
+ return fmt.Errorf("invalid configuration ID(%v): %w", c.GetConfigurationId(), err)
+ }
+ if c.GetOwner() == "" {
+ return errors.New("empty condition owner")
+ }
+ return nil
+}
diff --git a/snapshot-service/pb/doc.html b/snapshot-service/pb/doc.html
index d9d66cfb1..7f945e7a0 100644
--- a/snapshot-service/pb/doc.html
+++ b/snapshot-service/pb/doc.html
@@ -428,77 +428,85 @@
Condition
id |
string |
|
- |
+ Unique condition ID
+
+@gotags: bson:"_id" |
version |
uint64 |
|
- |
+ @gotags: bson:"version" |
name |
string |
|
- |
+ @gotags: bson:"name,omitempty" |
enabled |
bool |
|
- |
+ @gotags: bson:"enabled,omitempty" |
configuration_id |
string |
|
- |
+ ID of the configuration to be applied when the condition is satisfied
+
+@gotags: bson:"configurationId" |
device_id_filter |
string |
repeated |
- |
+ @gotags: bson:"deviceIdFilter,omitempty" |
resource_type_filter |
string |
repeated |
- |
+ @gotags: bson:"resourceTypeFilter,omitempty" |
resource_href_filter |
string |
repeated |
- |
+ @gotags: bson:"resourceHrefFilter,omitempty" |
jq_expression_filter |
string |
|
- |
+ @gotags: bson:"jqExpressionFilter,omitempty" |
api_access_token |
string |
|
- token used to update resources in the configuration |
+ Token used to update resources in the configuration
+
+@gotags: bson:"-" |
owner |
string |
|
- |
+ Condition owner
+
+@gotags: bson:"owner" |
diff --git a/snapshot-service/pb/service.pb.go b/snapshot-service/pb/service.pb.go
index 46b44cf4c..ae140bec8 100644
--- a/snapshot-service/pb/service.pb.go
+++ b/snapshot-service/pb/service.pb.go
@@ -193,17 +193,21 @@ type Condition struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
- Version uint64 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
- Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
- Enabled bool `protobuf:"varint,4,opt,name=enabled,proto3" json:"enabled,omitempty"`
- ConfigurationId string `protobuf:"bytes,5,opt,name=configuration_id,json=configurationId,proto3" json:"configuration_id,omitempty"`
- DeviceIdFilter []string `protobuf:"bytes,6,rep,name=device_id_filter,json=deviceIdFilter,proto3" json:"device_id_filter,omitempty"`
- ResourceTypeFilter []string `protobuf:"bytes,7,rep,name=resource_type_filter,json=resourceTypeFilter,proto3" json:"resource_type_filter,omitempty"`
- ResourceHrefFilter []string `protobuf:"bytes,8,rep,name=resource_href_filter,json=resourceHrefFilter,proto3" json:"resource_href_filter,omitempty"`
- JqExpressionFilter string `protobuf:"bytes,9,opt,name=jq_expression_filter,json=jqExpressionFilter,proto3" json:"jq_expression_filter,omitempty"`
- ApiAccessToken string `protobuf:"bytes,10,opt,name=api_access_token,json=apiAccessToken,proto3" json:"api_access_token,omitempty"` // token used to update resources in the configuration
- Owner string `protobuf:"bytes,11,opt,name=owner,proto3" json:"owner,omitempty"`
+ // Unique condition ID
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty" bson:"_id"`
+ Version uint64 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty" bson:"version"`
+ Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty" bson:"name,omitempty"`
+ Enabled bool `protobuf:"varint,4,opt,name=enabled,proto3" json:"enabled,omitempty" bson:"enabled,omitempty"`
+ // ID of the configuration to be applied when the condition is satisfied
+ ConfigurationId string `protobuf:"bytes,5,opt,name=configuration_id,json=configurationId,proto3" json:"configuration_id,omitempty" bson:"configurationId"`
+ DeviceIdFilter []string `protobuf:"bytes,6,rep,name=device_id_filter,json=deviceIdFilter,proto3" json:"device_id_filter,omitempty" bson:"deviceIdFilter,omitempty"`
+ ResourceTypeFilter []string `protobuf:"bytes,7,rep,name=resource_type_filter,json=resourceTypeFilter,proto3" json:"resource_type_filter,omitempty" bson:"resourceTypeFilter,omitempty"`
+ ResourceHrefFilter []string `protobuf:"bytes,8,rep,name=resource_href_filter,json=resourceHrefFilter,proto3" json:"resource_href_filter,omitempty" bson:"resourceHrefFilter,omitempty"`
+ JqExpressionFilter string `protobuf:"bytes,9,opt,name=jq_expression_filter,json=jqExpressionFilter,proto3" json:"jq_expression_filter,omitempty" bson:"jqExpressionFilter,omitempty"`
+ // Token used to update resources in the configuration
+ ApiAccessToken string `protobuf:"bytes,10,opt,name=api_access_token,json=apiAccessToken,proto3" json:"api_access_token,omitempty" bson:"-"`
+ // Condition owner
+ Owner string `protobuf:"bytes,11,opt,name=owner,proto3" json:"owner,omitempty" bson:"owner"`
}
func (x *Condition) Reset() {
diff --git a/snapshot-service/pb/service.proto b/snapshot-service/pb/service.proto
index f8261704b..6c8092048 100644
--- a/snapshot-service/pb/service.proto
+++ b/snapshot-service/pb/service.proto
@@ -49,17 +49,21 @@ message IDFilter {
}
message Condition { // driven by resource change event
- string id = 1;
- uint64 version = 2;
- string name = 3;
- bool enabled = 4;
- string configuration_id = 5;
- repeated string device_id_filter = 6;
- repeated string resource_type_filter = 7;
- repeated string resource_href_filter = 8;
- string jq_expression_filter = 9;
- string api_access_token = 10; // token used to update resources in the configuration
- string owner = 11;
+ // Unique condition ID
+ string id = 1; // @gotags: bson:"_id"
+ uint64 version = 2; // @gotags: bson:"version"
+ string name = 3; // @gotags: bson:"name,omitempty"
+ bool enabled = 4; // @gotags: bson:"enabled,omitempty"
+ // ID of the configuration to be applied when the condition is satisfied
+ string configuration_id = 5; // @gotags: bson:"configurationId"
+ repeated string device_id_filter = 6; // @gotags: bson:"deviceIdFilter,omitempty"
+ repeated string resource_type_filter = 7; // @gotags: bson:"resourceTypeFilter,omitempty"
+ repeated string resource_href_filter = 8; // @gotags: bson:"resourceHrefFilter,omitempty"
+ string jq_expression_filter = 9; // @gotags: bson:"jqExpressionFilter,omitempty"
+ // Token used to update resources in the configuration
+ string api_access_token = 10; // @gotags: bson:"-"
+ // Condition owner
+ string owner = 11; // @gotags: bson:"owner"
}
message GetConditionsRequest { repeated IDFilter id_filter = 1; }
diff --git a/snapshot-service/pb/service.swagger.json b/snapshot-service/pb/service.swagger.json
index 47265c7c8..bfae974db 100644
--- a/snapshot-service/pb/service.swagger.json
+++ b/snapshot-service/pb/service.swagger.json
@@ -132,6 +132,7 @@
"parameters": [
{
"name": "id",
+ "description": "Unique condition ID\n\n@gotags: bson:\"_id\"",
"in": "path",
"required": true,
"type": "string"
@@ -450,44 +451,56 @@
"properties": {
"version": {
"type": "string",
- "format": "uint64"
+ "format": "uint64",
+ "title": "@gotags: bson:\"version\""
},
"name": {
- "type": "string"
+ "type": "string",
+ "title": "@gotags: bson:\"name,omitempty\""
},
"enabled": {
- "type": "boolean"
+ "type": "boolean",
+ "title": "@gotags: bson:\"enabled,omitempty\""
},
"configurationId": {
- "type": "string"
+ "type": "string",
+ "description": "@gotags: bson:\"configurationId\"",
+ "title": "ID of the configuration to be applied when the condition is satisfied"
},
"deviceIdFilter": {
"type": "array",
"items": {
"type": "string"
- }
+ },
+ "title": "@gotags: bson:\"deviceIdFilter,omitempty\""
},
"resourceTypeFilter": {
"type": "array",
"items": {
"type": "string"
- }
+ },
+ "title": "@gotags: bson:\"resourceTypeFilter,omitempty\""
},
"resourceHrefFilter": {
"type": "array",
"items": {
"type": "string"
- }
+ },
+ "title": "@gotags: bson:\"resourceHrefFilter,omitempty\""
},
"jqExpressionFilter": {
- "type": "string"
+ "type": "string",
+ "title": "@gotags: bson:\"jqExpressionFilter,omitempty\""
},
"apiAccessToken": {
"type": "string",
- "title": "token used to update resources in the configuration"
+ "description": "@gotags: bson:\"-\"",
+ "title": "Token used to update resources in the configuration"
},
"owner": {
- "type": "string"
+ "type": "string",
+ "description": "@gotags: bson:\"owner\"",
+ "title": "Condition owner"
}
},
"title": "driven by resource change event"
@@ -603,48 +616,62 @@
"type": "object",
"properties": {
"id": {
- "type": "string"
+ "type": "string",
+ "description": "@gotags: bson:\"_id\"",
+ "title": "Unique condition ID"
},
"version": {
"type": "string",
- "format": "uint64"
+ "format": "uint64",
+ "title": "@gotags: bson:\"version\""
},
"name": {
- "type": "string"
+ "type": "string",
+ "title": "@gotags: bson:\"name,omitempty\""
},
"enabled": {
- "type": "boolean"
+ "type": "boolean",
+ "title": "@gotags: bson:\"enabled,omitempty\""
},
"configurationId": {
- "type": "string"
+ "type": "string",
+ "description": "@gotags: bson:\"configurationId\"",
+ "title": "ID of the configuration to be applied when the condition is satisfied"
},
"deviceIdFilter": {
"type": "array",
"items": {
"type": "string"
- }
+ },
+ "title": "@gotags: bson:\"deviceIdFilter,omitempty\""
},
"resourceTypeFilter": {
"type": "array",
"items": {
"type": "string"
- }
+ },
+ "title": "@gotags: bson:\"resourceTypeFilter,omitempty\""
},
"resourceHrefFilter": {
"type": "array",
"items": {
"type": "string"
- }
+ },
+ "title": "@gotags: bson:\"resourceHrefFilter,omitempty\""
},
"jqExpressionFilter": {
- "type": "string"
+ "type": "string",
+ "title": "@gotags: bson:\"jqExpressionFilter,omitempty\""
},
"apiAccessToken": {
"type": "string",
- "title": "token used to update resources in the configuration"
+ "description": "@gotags: bson:\"-\"",
+ "title": "Token used to update resources in the configuration"
},
"owner": {
- "type": "string"
+ "type": "string",
+ "description": "@gotags: bson:\"owner\"",
+ "title": "Condition owner"
}
},
"title": "driven by resource change event"
diff --git a/snapshot-service/service/condition/createCondition.go b/snapshot-service/service/condition/createCondition.go
new file mode 100644
index 000000000..ff2fec882
--- /dev/null
+++ b/snapshot-service/service/condition/createCondition.go
@@ -0,0 +1 @@
+package condition
diff --git a/snapshot-service/service/service.go b/snapshot-service/service/service.go
index 82e87789f..b6feee37e 100644
--- a/snapshot-service/service/service.go
+++ b/snapshot-service/service/service.go
@@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/otel/trace"
)
-const serviceName = "certificate-authority"
+const serviceName = "snapshot-service"
func createStore(ctx context.Context, config storeConfig.Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) {
switch config.Use {
diff --git a/snapshot-service/store/condition.go b/snapshot-service/store/condition.go
new file mode 100644
index 000000000..d595c94d2
--- /dev/null
+++ b/snapshot-service/store/condition.go
@@ -0,0 +1,12 @@
+package store
+
+import (
+ "github.com/plgd-dev/hub/v2/snapshot-service/pb"
+)
+
+const (
+ DeviceIDFilterKey = "deviceIdFilter" // must match with pb.Condition.DeviceIdFilter tag
+ OwnerKey = "owner" // must match with pb.Condition.Owner tag
+)
+
+type Condition = pb.Condition
diff --git a/snapshot-service/store/cqldb/condition.go b/snapshot-service/store/cqldb/condition.go
new file mode 100644
index 000000000..3d6cffcdf
--- /dev/null
+++ b/snapshot-service/store/cqldb/condition.go
@@ -0,0 +1,15 @@
+package cqldb
+
+import (
+ "context"
+
+ "github.com/plgd-dev/hub/v2/snapshot-service/store"
+)
+
+func (s *Store) CreateCondition(context.Context, *store.Condition) error {
+ return store.ErrNotSupported
+}
+
+func (s *Store) LoadConditions(context.Context, string, *store.ConditionsQuery, store.LoadConditionsFunc) error {
+ return store.ErrNotSupported
+}
diff --git a/snapshot-service/store/cqldb/store.go b/snapshot-service/store/cqldb/store.go
index ebdcf0075..7a4536d18 100644
--- a/snapshot-service/store/cqldb/store.go
+++ b/snapshot-service/store/cqldb/store.go
@@ -35,9 +35,7 @@ var primaryKey = []string{idKey, ownerKey, commonNameKey}
// Store implements an Store for cqldb.
type Store struct {
- client *cqldb.Client
- config *Config
- logger log.Logger
+ *cqldb.Store
}
func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
@@ -83,7 +81,7 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}
if config.Table == "" {
- config.Table = "signedCertificateRecords"
+ config.Table = "snapshots"
}
err := createEventsTable(ctx, client, config.Table)
@@ -92,36 +90,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}
return &Store{
- client: client,
- logger: logger,
- config: config,
+ Store: cqldb.NewStore(config.Table, client, logger),
}, nil
}
-
-// Clear clears the event storage.
-func (s *Store) Clear(ctx context.Context) error {
- err := s.client.DropKeyspace(ctx)
- if err != nil {
- return fmt.Errorf("cannot clear: %w", err)
- }
- return nil
-}
-
-func (s *Store) Table() string {
- return s.client.Keyspace() + "." + s.config.Table
-}
-
-// Clear documents in collections, but don't drop the database or the collections
-func (s *Store) ClearTable(ctx context.Context) error {
- return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
-}
-
-// Close closes the database session.
-func (s *Store) Close(_ context.Context) error {
- s.client.Close()
- return nil
-}
-
-func (s *Store) AddCloseFunc(f func()) {
- s.client.AddCloseFunc(f)
-}
diff --git a/snapshot-service/store/mongodb/condition.go b/snapshot-service/store/mongodb/condition.go
new file mode 100644
index 000000000..c2ce9857b
--- /dev/null
+++ b/snapshot-service/store/mongodb/condition.go
@@ -0,0 +1,105 @@
+package mongodb
+
+import (
+ "context"
+ "errors"
+
+ "github.com/hashicorp/go-multierror"
+ "github.com/plgd-dev/hub/v2/snapshot-service/store"
+ "go.mongodb.org/mongo-driver/bson"
+ "go.mongodb.org/mongo-driver/mongo"
+)
+
+/*
+Condition -> podmienka za akych okolnosti sa aplikuje
+ - id (identifikator) (user nevie menit)
+ - verzia (pri update sa verzia inkremente)
+ - pri ukladani checknem v DB ze predchadzajuca verzia je o 1 mensie
+ - t.j. check nenastala mi medzi tym zmena
+ - name - user-friendly meno
+ - enabled
+ - configuration id
+ - device_id_filter - OR - pride event a chcecknem ci device_id_filter obsahuje ID device
+ - ak je prazdny tak vsetko pustit
+ - resource_type_filter - AND - ked mam viac musia sa vsetky matchnut
+ - ak je prazdny tak vsetko pustit
+ - resource_href_filer - OR - musim matchnut aspon jeden href
+ - ak je prazdny tak vsetko pustit- resource_href_filter
+ - jq_expression - expression pustim nad obsahom ResourceChanged eventom, mal by vratit true/false (ci hodnota existuje)
+ - dalsia podmienka
+ https://github.com/itchyny/gojq
+ - api_access_token - TODO, zatial nechame otvorene; ked sa ide aplikovat konfiguraciu tak potrebujes token na autorizaciu
+ - owner -> musi sediet s tym co je v DB
+*/
+
+func (s *Store) CreateCondition(ctx context.Context, cond *store.Condition) error {
+ if err := cond.Validate(); err != nil {
+ return err
+ }
+ _, err := s.Collection(conditionsCol).InsertOne(ctx, cond)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+type conditionsIterator struct {
+ iter *mongo.Cursor
+}
+
+func (i *conditionsIterator) Next(ctx context.Context, s *store.Condition) bool {
+ if !i.iter.Next(ctx) {
+ return false
+ }
+ err := i.iter.Decode(s)
+ return err == nil
+}
+
+func (i *conditionsIterator) Err() error {
+ return i.iter.Err()
+}
+
+func toDeviceIDQueryFilter(deviceID string) bson.M {
+ return bson.M{"$or": []bson.D{
+ {{Key: store.DeviceIDFilterKey, Value: bson.M{"$exists": false}}},
+ {{Key: store.DeviceIDFilterKey, Value: deviceID}},
+ }}
+}
+
+func toConditionsQueryFilter(owner string, queries *store.ConditionsQuery) interface{} {
+ filter := make([]interface{}, 0, 2)
+ if owner != "" {
+ filter = append(filter, bson.D{{Key: store.OwnerKey, Value: owner}})
+ }
+ if queries.DeviceID != "" {
+ filter = append(filter, toDeviceIDQueryFilter(queries.DeviceID))
+ }
+ if len(filter) == 0 {
+ return bson.D{}
+ }
+ if len(filter) == 1 {
+ return filter[0]
+ }
+ return bson.M{"$and": filter}
+}
+
+func (s *Store) LoadConditions(ctx context.Context, owner string, query *store.ConditionsQuery, h store.LoadConditionsFunc) error {
+ col := s.Collection(conditionsCol)
+ iter, err := col.Find(ctx, toConditionsQueryFilter(owner, query))
+ if errors.Is(err, mongo.ErrNilDocument) {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ var errors *multierror.Error
+ i := conditionsIterator{
+ iter: iter,
+ }
+ err = h(ctx, &i)
+ errors = multierror.Append(errors, err)
+ errClose := iter.Close(ctx)
+ errors = multierror.Append(errors, errClose)
+ return errors.ErrorOrNil()
+}
diff --git a/snapshot-service/store/mongodb/condition_test.go b/snapshot-service/store/mongodb/condition_test.go
new file mode 100644
index 000000000..8c4ac5551
--- /dev/null
+++ b/snapshot-service/store/mongodb/condition_test.go
@@ -0,0 +1,295 @@
+package mongodb_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/plgd-dev/hub/v2/snapshot-service/pb"
+ "github.com/plgd-dev/hub/v2/snapshot-service/store"
+ "github.com/plgd-dev/hub/v2/snapshot-service/test"
+ "github.com/plgd-dev/hub/v2/test/config"
+ "github.com/stretchr/testify/require"
+)
+
+func TestStoreCreateCondition(t *testing.T) {
+ s, cleanUpStore := test.NewMongoStore(t)
+ defer cleanUpStore()
+
+ ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT)
+ defer cancel()
+
+ condID := uuid.New().String()
+
+ type args struct {
+ cond *store.Condition
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "valid",
+ args: args{
+ cond: &store.Condition{
+ Id: condID,
+ ConfigurationId: uuid.New().String(),
+ Owner: "owner",
+ },
+ },
+ },
+ {
+ name: "missing ID",
+ args: args{
+ cond: &store.Condition{
+ ConfigurationId: uuid.New().String(),
+ Owner: "owner",
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "invalid ID",
+ args: args{
+ cond: &store.Condition{
+ Id: "invalid",
+ ConfigurationId: uuid.New().String(),
+ Owner: "owner",
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "missing ConfigurationId",
+ args: args{
+ cond: &store.Condition{
+ Id: uuid.New().String(),
+ Owner: "owner",
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "invalid ID",
+ args: args{
+ cond: &store.Condition{
+ Id: uuid.New().String(),
+ ConfigurationId: "invalid",
+ Owner: "owner",
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "missing Owner",
+ args: args{
+ cond: &store.Condition{
+ Id: uuid.New().String(),
+ ConfigurationId: uuid.New().String(),
+ },
+ },
+ wantErr: true,
+ },
+ {
+ name: "duplicit ID",
+ args: args{
+ cond: &store.Condition{
+ Id: condID,
+ ConfigurationId: uuid.New().String(),
+ Owner: "owner",
+ },
+ },
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := s.CreateCondition(ctx, tt.args.cond)
+ if tt.wantErr {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+ })
+ }
+}
+
+func TestStoreLoadConditionsByDeviceIDAndOwner(t *testing.T) {
+ s, cleanUpStore := test.NewMongoStore(t)
+ defer cleanUpStore()
+
+ ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT)
+ defer cancel()
+
+ const deviceID1 = "deviceID1"
+ const deviceID2 = "deviceID2"
+ const deviceID3 = "deviceID3"
+ const owner1 = "owner1"
+ const owner2 = "owner2"
+ cond1 := &store.Condition{
+ Id: uuid.New().String(),
+ Name: "c1",
+ ConfigurationId: uuid.New().String(),
+ Owner: owner1,
+ }
+ err := s.CreateCondition(ctx, cond1)
+ require.NoError(t, err)
+
+ cond2 := &store.Condition{
+ Id: uuid.New().String(),
+ Name: "c2",
+ ConfigurationId: uuid.New().String(),
+ DeviceIdFilter: []string{deviceID1},
+ Owner: owner1,
+ }
+ err = s.CreateCondition(ctx, cond2)
+ require.NoError(t, err)
+
+ cond3 := &store.Condition{
+ Id: uuid.New().String(),
+ Name: "c3",
+ ConfigurationId: uuid.New().String(),
+ DeviceIdFilter: []string{deviceID2},
+ Owner: owner1,
+ }
+ err = s.CreateCondition(ctx, cond3)
+ require.NoError(t, err)
+
+ cond4 := &store.Condition{
+ Id: uuid.New().String(),
+ Name: "c4",
+ ConfigurationId: uuid.New().String(),
+ DeviceIdFilter: []string{deviceID1, deviceID3},
+ Owner: owner1,
+ }
+ err = s.CreateCondition(ctx, cond4)
+ require.NoError(t, err)
+
+ cond5 := &store.Condition{
+ Id: uuid.New().String(),
+ Name: "c5",
+ ConfigurationId: uuid.New().String(),
+ DeviceIdFilter: []string{deviceID3},
+ Owner: owner2,
+ }
+ err = s.CreateCondition(ctx, cond5)
+ require.NoError(t, err)
+
+ type args struct {
+ query *store.ConditionsQuery
+ owner string
+ }
+
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ want pb.Conditions
+ }{
+ {
+ name: "all",
+ args: args{
+ query: &store.ConditionsQuery{},
+ },
+ want: pb.Conditions{cond1, cond2, cond3, cond4, cond5},
+ },
+ {
+ name: "deviceID3",
+ args: args{
+ query: &store.ConditionsQuery{
+ DeviceID: deviceID3,
+ },
+ },
+ want: pb.Conditions{cond1, cond4, cond5},
+ },
+ {
+ name: "owner1",
+ args: args{
+ query: &store.ConditionsQuery{},
+ owner: owner1,
+ },
+ want: pb.Conditions{cond1, cond2, cond3, cond4},
+ },
+ {
+ name: "owner1/deviceID1",
+ args: args{
+ query: &store.ConditionsQuery{
+ DeviceID: deviceID1,
+ },
+ owner: owner1,
+ },
+ want: pb.Conditions{cond1, cond2, cond4},
+ },
+ {
+ name: "owner1/deviceID2",
+ args: args{
+ query: &store.ConditionsQuery{
+ DeviceID: deviceID2,
+ },
+ owner: owner1,
+ },
+ want: pb.Conditions{cond1, cond3},
+ },
+ {
+ name: "owner1/new deviceID",
+ args: args{
+ query: &store.ConditionsQuery{
+ DeviceID: uuid.New().String(),
+ },
+ owner: owner1,
+ },
+ want: pb.Conditions{cond1},
+ },
+ {
+ name: "owner2/deviceID1",
+ args: args{
+ query: &store.ConditionsQuery{
+ DeviceID: deviceID1,
+ },
+ owner: owner2,
+ },
+ want: pb.Conditions{},
+ },
+ {
+ name: "owner2/deviceID3",
+ args: args{
+ query: &store.ConditionsQuery{
+ DeviceID: deviceID3,
+ },
+ owner: owner2,
+ },
+ want: pb.Conditions{cond5},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var got pb.Conditions
+ err := s.LoadConditions(ctx, tt.args.owner, tt.args.query, func(ctx context.Context, iter store.ConditionIter) error {
+ var c store.Condition
+ for iter.Next(ctx, &c) {
+ cond := &store.Condition{}
+ cond.CopyData(&c)
+ got = append(got, cond)
+ }
+ return iter.Err()
+ })
+ if tt.wantErr {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+
+ require.Equal(t, len(tt.want), len(got))
+ if len(tt.want) == len(got) {
+ got.Sort()
+ tt.want.Sort()
+ for i := range got {
+ test.CmpCondition(t, tt.want[i], got[i])
+ }
+ }
+ })
+ }
+}
diff --git a/snapshot-service/store/mongodb/store.go b/snapshot-service/store/mongodb/store.go
index c4265d6b9..5b2c095b3 100644
--- a/snapshot-service/store/mongodb/store.go
+++ b/snapshot-service/store/mongodb/store.go
@@ -15,7 +15,7 @@ type Store struct {
*pkgMongo.Store
}
-const snapshotsCol = "snapshots"
+const conditionsCol = "conditions"
func New(ctx context.Context, cfg *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
certManager, err := client.New(cfg.Mongo.TLS, fileWatcher, logger)
@@ -34,7 +34,7 @@ func New(ctx context.Context, cfg *Config, fileWatcher *fsnotify.Watcher, logger
}
func (s *Store) clearDatabases(ctx context.Context) error {
- return s.Collection(snapshotsCol).Drop(ctx)
+ return s.Collection(conditionsCol).Drop(ctx)
}
func (s *Store) Close(ctx context.Context) error {
diff --git a/snapshot-service/store/store.go b/snapshot-service/store/store.go
index a1c489dea..f736ee808 100644
--- a/snapshot-service/store/store.go
+++ b/snapshot-service/store/store.go
@@ -2,8 +2,33 @@ package store
import (
"context"
+ "errors"
)
+type (
+ ConditionsQuery struct {
+ DeviceID string
+ }
+)
+
+type ConditionIter interface {
+ Next(ctx context.Context, condition *Condition) bool
+ Err() error
+}
+
+type (
+ LoadConditionsFunc = func(ctx context.Context, iter ConditionIter) (err error)
+)
+
+var ErrNotSupported = errors.New("not supported")
+
type Store interface {
+ // CreateCondition creates a new condition. If the condition already exists, it will throw an error.
+ CreateCondition(ctx context.Context, condition *Condition) error
+ // UpdateSigningRecord updates an existing signing record. If the record does not exist, it will create a new one.
+ // UpdateSigningRecord(ctx context.Context, record *SigningRecord) error
+ // DeleteSigningRecords(ctx context.Context, ownerID string, query *DeleteSigningRecordsQuery) (int64, error)
+ LoadConditions(ctx context.Context, ownerID string, query *ConditionsQuery, h LoadConditionsFunc) error
+
Close(ctx context.Context) error
}
diff --git a/snapshot-service/test/test.go b/snapshot-service/test/test.go
new file mode 100644
index 000000000..a3aec27c2
--- /dev/null
+++ b/snapshot-service/test/test.go
@@ -0,0 +1,17 @@
+package test
+
+import (
+ "testing"
+
+ "github.com/plgd-dev/hub/v2/snapshot-service/store"
+ "github.com/plgd-dev/kit/v2/codec/json"
+ "github.com/stretchr/testify/require"
+)
+
+func CmpCondition(t *testing.T, want, got *store.Condition) {
+ wantJson, err := json.Encode(want)
+ require.NoError(t, err)
+ gotJson, err := json.Encode(got)
+ require.NoError(t, err)
+ require.JSONEq(t, string(wantJson), string(gotJson))
+}