Skip to content

Commit

Permalink
fixup! snapshot-service: persistent Condition storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed May 17, 2024
1 parent 82c7726 commit 835a887
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 82 deletions.
23 changes: 12 additions & 11 deletions certificate-authority/store/mongodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
pkgMongo "github.com/plgd-dev/hub/v2/pkg/mongodb"
"github.com/plgd-dev/hub/v2/pkg/security/certManager/client"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -18,18 +19,18 @@ type Store struct {
bulkWriter *bulkWriter
}

var DeviceIDKeyQueryIndex = bson.D{
{Key: store.DeviceIDKey, Value: 1},
{Key: store.OwnerKey, Value: 1},
var deviceIDKeyQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: store.DeviceIDKey, Value: 1},
{Key: store.OwnerKey, Value: 1},
},
}

var CommonNameKeyQueryIndex = bson.D{
{Key: store.CommonNameKey, Value: 1},
{Key: store.OwnerKey, Value: 1},
}

var PublicKeyQueryIndex = bson.D{
{Key: store.CommonNameKey, Value: 1},
var commonNameKeyQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: store.CommonNameKey, Value: 1},
{Key: store.OwnerKey, Value: 1},
},
}

func New(ctx context.Context, cfg *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
Expand All @@ -44,7 +45,7 @@ func New(ctx context.Context, cfg *Config, fileWatcher *fsnotify.Watcher, logger
}
bulkWriter := newBulkWriter(m.Collection(signingRecordsCol), cfg.BulkWrite.DocumentLimit, cfg.BulkWrite.ThrottleTime, cfg.BulkWrite.Timeout, logger)
s := Store{Store: m, bulkWriter: bulkWriter}
err = s.EnsureIndex(ctx, signingRecordsCol, CommonNameKeyQueryIndex, DeviceIDKeyQueryIndex)
err = s.EnsureIndex(ctx, signingRecordsCol, commonNameKeyQueryIndex, deviceIDKeyQueryIndex)
if err != nil {
certManager.Close()
return nil, err
Expand Down
32 changes: 20 additions & 12 deletions cloud2cloud-gateway/store/mongodb/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,32 @@ const (
initializedKey = "initialized"
)

var typeQueryIndex = bson.D{
{Key: typeKey, Value: 1},
var typeQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: typeKey, Value: 1},
},
}

var typeDeviceIDQueryIndex = bson.D{
{Key: typeKey, Value: 1},
{Key: deviceIDKey, Value: 1},
var typeDeviceIDQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: typeKey, Value: 1},
{Key: deviceIDKey, Value: 1},
},
}

var typeResourceIDQueryIndex = bson.D{
{Key: typeKey, Value: 1},
{Key: deviceIDKey, Value: 1},
{Key: hrefKey, Value: 1},
var typeResourceIDQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: typeKey, Value: 1},
{Key: deviceIDKey, Value: 1},
{Key: hrefKey, Value: 1},
},
}

var typeInitializedIDQueryIndex = bson.D{
{Key: "_id", Value: 1},
{Key: initializedKey, Value: 1},
var typeInitializedIDQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: "_id", Value: 1},
{Key: initializedKey, Value: 1},
},
}

type DBSub struct {
Expand Down
15 changes: 10 additions & 5 deletions identity-store/persistence/mongodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,23 @@ import (
pkgMongo "github.com/plgd-dev/hub/v2/pkg/mongodb"
"github.com/plgd-dev/hub/v2/pkg/security/certManager/client"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.opentelemetry.io/otel/trace"
)

const userDevicesCName = "userdevices"

var userDeviceQueryIndex = bson.D{
{Key: ownerKey, Value: 1},
{Key: deviceIDKey, Value: 1},
var userDeviceQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: ownerKey, Value: 1},
{Key: deviceIDKey, Value: 1},
},
}

var userDevicesQueryIndex = bson.D{
{Key: ownerKey, Value: 1},
var userDevicesQueryIndex = mongo.IndexModel{
Keys: bson.D{
{Key: ownerKey, Value: 1},
},
}

type Store struct {
Expand Down
14 changes: 4 additions & 10 deletions pkg/mongodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/hashicorp/go-multierror"
"github.com/plgd-dev/hub/v2/pkg/fn"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
Expand Down Expand Up @@ -59,7 +58,7 @@ func NewStore(ctx context.Context, cfg *Config, tls *tls.Config, tracerProvider
return s, nil
}

func NewStoreWithCollection(ctx context.Context, cfg *Config, tls *tls.Config, tracerProvider trace.TracerProvider, collection string, indexes ...bson.D) (*Store, error) {
func NewStoreWithCollection(ctx context.Context, cfg *Config, tls *tls.Config, tracerProvider trace.TracerProvider, collection string, indexes ...mongo.IndexModel) (*Store, error) {
s, err := NewStore(ctx, cfg, tls, tracerProvider)
if err != nil {
return nil, err
Expand All @@ -76,7 +75,7 @@ func NewStoreWithCollection(ctx context.Context, cfg *Config, tls *tls.Config, t
return s, nil
}

func (s *Store) EnsureIndex(ctx context.Context, collection string, indexes ...bson.D) error {
func (s *Store) EnsureIndex(ctx context.Context, collection string, indexes ...mongo.IndexModel) error {
if len(indexes) == 0 {
return nil
}
Expand All @@ -87,13 +86,8 @@ func (s *Store) EnsureIndex(ctx context.Context, collection string, indexes ...b
return nil
}

func ensureIndex(ctx context.Context, col *mongo.Collection, indexes ...bson.D) error {
for _, keys := range indexes {
opts := &options.IndexOptions{}
index := mongo.IndexModel{
Keys: keys,
Options: opts,
}
func ensureIndex(ctx context.Context, col *mongo.Collection, indexes ...mongo.IndexModel) error {
for _, index := range indexes {
_, err := col.Indexes().CreateOne(ctx, index)
if err != nil {
if strings.HasPrefix(err.Error(), "(IndexKeySpecsConflict)") {
Expand Down
10 changes: 7 additions & 3 deletions snapshot-service/pb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,20 @@ driven by resource change event
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | Unique condition ID

@gotags: bson:"_id" |
@gotags: bson:"id" |
| version | [uint64](#uint64) | | @gotags: bson:"version" |
| name | [string](#string) | | @gotags: bson:"name,omitempty" |
| enabled | [bool](#bool) | | @gotags: bson:"enabled,omitempty" |
| configuration_id | [string](#string) | | ID of the configuration to be applied when the condition is satisfied

@gotags: bson:"configurationId" |
| device_id_filter | [string](#string) | repeated | @gotags: bson:"deviceIdFilter,omitempty" |
| device_id_filter | [string](#string) | repeated | list of device IDs to which the condition applies

@gotags: bson:"deviceIdFilter,omitempty" |
| resource_type_filter | [string](#string) | repeated | @gotags: bson:"resourceTypeFilter,omitempty" |
| resource_href_filter | [string](#string) | repeated | @gotags: bson:"resourceHrefFilter,omitempty" |
| resource_href_filter | [string](#string) | repeated | list of resource hrefs to which the condition applies

@gotags: bson:"resourceHrefFilter,omitempty" |
| jq_expression_filter | [string](#string) | | @gotags: bson:"jqExpressionFilter,omitempty" |
| api_access_token | [string](#string) | | Token used to update resources in the configuration

Expand Down
10 changes: 7 additions & 3 deletions snapshot-service/pb/doc.html
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ <h3 id="snapshotservice.pb.Condition">Condition</h3>
<td></td>
<td><p>Unique condition ID

@gotags: bson:&#34;_id&#34; </p></td>
@gotags: bson:&#34;id&#34; </p></td>
</tr>

<tr>
Expand Down Expand Up @@ -467,7 +467,9 @@ <h3 id="snapshotservice.pb.Condition">Condition</h3>
<td>device_id_filter</td>
<td><a href="#string">string</a></td>
<td>repeated</td>
<td><p>@gotags: bson:&#34;deviceIdFilter,omitempty&#34; </p></td>
<td><p>list of device IDs to which the condition applies

@gotags: bson:&#34;deviceIdFilter,omitempty&#34; </p></td>
</tr>

<tr>
Expand All @@ -481,7 +483,9 @@ <h3 id="snapshotservice.pb.Condition">Condition</h3>
<td>resource_href_filter</td>
<td><a href="#string">string</a></td>
<td>repeated</td>
<td><p>@gotags: bson:&#34;resourceHrefFilter,omitempty&#34; </p></td>
<td><p>list of resource hrefs to which the condition applies

@gotags: bson:&#34;resourceHrefFilter,omitempty&#34; </p></td>
</tr>

<tr>
Expand Down
6 changes: 4 additions & 2 deletions snapshot-service/pb/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion snapshot-service/pb/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ message IDFilter {

message Condition { // driven by resource change event
// Unique condition ID
string id = 1; // @gotags: bson:"_id"
string id = 1; // @gotags: bson:"id"
uint64 version = 2; // @gotags: bson:"version"
string name = 3; // @gotags: bson:"name,omitempty"
bool enabled = 4; // @gotags: bson:"enabled,omitempty"
// ID of the configuration to be applied when the condition is satisfied
string configuration_id = 5; // @gotags: bson:"configurationId"
// list of device IDs to which the condition applies
repeated string device_id_filter = 6; // @gotags: bson:"deviceIdFilter,omitempty"
repeated string resource_type_filter = 7; // @gotags: bson:"resourceTypeFilter,omitempty"
// list of resource hrefs to which the condition applies
repeated string resource_href_filter = 8; // @gotags: bson:"resourceHrefFilter,omitempty"
string jq_expression_filter = 9; // @gotags: bson:"jqExpressionFilter,omitempty"
// Token used to update resources in the configuration
Expand Down
16 changes: 10 additions & 6 deletions snapshot-service/pb/service.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
"parameters": [
{
"name": "id",
"description": "Unique condition ID\n\n@gotags: bson:\"_id\"",
"description": "Unique condition ID\n\n@gotags: bson:\"id\"",
"in": "path",
"required": true,
"type": "string"
Expand Down Expand Up @@ -472,7 +472,8 @@
"items": {
"type": "string"
},
"title": "@gotags: bson:\"deviceIdFilter,omitempty\""
"description": "@gotags: bson:\"deviceIdFilter,omitempty\"",
"title": "list of device IDs to which the condition applies"
},
"resourceTypeFilter": {
"type": "array",
Expand All @@ -486,7 +487,8 @@
"items": {
"type": "string"
},
"title": "@gotags: bson:\"resourceHrefFilter,omitempty\""
"description": "@gotags: bson:\"resourceHrefFilter,omitempty\"",
"title": "list of resource hrefs to which the condition applies"
},
"jqExpressionFilter": {
"type": "string",
Expand Down Expand Up @@ -617,7 +619,7 @@
"properties": {
"id": {
"type": "string",
"description": "@gotags: bson:\"_id\"",
"description": "@gotags: bson:\"id\"",
"title": "Unique condition ID"
},
"version": {
Expand All @@ -643,7 +645,8 @@
"items": {
"type": "string"
},
"title": "@gotags: bson:\"deviceIdFilter,omitempty\""
"description": "@gotags: bson:\"deviceIdFilter,omitempty\"",
"title": "list of device IDs to which the condition applies"
},
"resourceTypeFilter": {
"type": "array",
Expand All @@ -657,7 +660,8 @@
"items": {
"type": "string"
},
"title": "@gotags: bson:\"resourceHrefFilter,omitempty\""
"description": "@gotags: bson:\"resourceHrefFilter,omitempty\"",
"title": "list of resource hrefs to which the condition applies"
},
"jqExpressionFilter": {
"type": "string",
Expand Down
7 changes: 5 additions & 2 deletions snapshot-service/store/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
)

const (
DeviceIDFilterKey = "deviceIdFilter" // must match with pb.Condition.DeviceIdFilter tag
OwnerKey = "owner" // must match with pb.Condition.Owner tag
IDKey = "id" // must match with pb.Condition.Id tag
VersionKey = "version" // must match with pb.Condition.Version tag
DeviceIDFilterKey = "deviceIdFilter" // must match with pb.Condition.DeviceIdFilter tag
ResourceHrefFilterKey = "resourceHrefFilter" // must match with pb.Condition.ResourceHrefFilter tag
OwnerKey = "owner" // must match with pb.Condition.Owner tag
)

type Condition = pb.Condition
15 changes: 13 additions & 2 deletions snapshot-service/store/mongodb/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mongodb
import (
"context"
"errors"
"slices"

"github.com/hashicorp/go-multierror"
"github.com/plgd-dev/hub/v2/snapshot-service/store"
Expand Down Expand Up @@ -36,6 +37,7 @@ func (s *Store) CreateCondition(ctx context.Context, cond *store.Condition) erro
if err := cond.Validate(); err != nil {
return err
}
slices.Sort(cond.GetResourceTypeFilter())
_, err := s.Collection(conditionsCol).InsertOne(ctx, cond)
if err != nil {
return err
Expand Down Expand Up @@ -66,6 +68,13 @@ func toDeviceIDQueryFilter(deviceID string) bson.M {
}}
}

func toResourceHrefQueryFilter(href string) bson.M {
return bson.M{"$or": []bson.D{
{{Key: store.ResourceHrefFilterKey, Value: bson.M{"$exists": false}}},
{{Key: store.ResourceHrefFilterKey, Value: href}},
}}
}

func toConditionsQueryFilter(owner string, queries *store.ConditionsQuery) interface{} {
filter := make([]interface{}, 0, 2)
if owner != "" {
Expand All @@ -74,6 +83,9 @@ func toConditionsQueryFilter(owner string, queries *store.ConditionsQuery) inter
if queries.DeviceID != "" {
filter = append(filter, toDeviceIDQueryFilter(queries.DeviceID))
}
if queries.Href != "" {
filter = append(filter, toResourceHrefQueryFilter(queries.Href))
}
if len(filter) == 0 {
return bson.D{}
}
Expand All @@ -89,10 +101,9 @@ func (s *Store) LoadConditions(ctx context.Context, owner string, query *store.C
if errors.Is(err, mongo.ErrNilDocument) {
return nil
}
if err != nil {
if h == nil || err != nil {
return err
}

var errors *multierror.Error
i := conditionsIterator{
iter: iter,
Expand Down
Loading

0 comments on commit 835a887

Please sign in to comment.