Skip to content

Commit

Permalink
snapshot-service: persistent Condition storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed May 17, 2024
1 parent 255741c commit 82c7726
Show file tree
Hide file tree
Showing 31 changed files with 728 additions and 316 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ scylla: scylla/clean
--entrypoint /bin/cp \
scylladb/scylla \
/etc/scylla/scylla.yaml /etc-scylla-tmp/scylla.yaml
sudo chown $(shell whoami) $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml

yq -i '.server_encryption_options.internode_encryption="all"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
yq -i '.server_encryption_options.certificate="/certs/http.crt"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
yq -i '.server_encryption_options.keyfile="/certs/http.key"' $(WORKING_DIRECTORY)/.tmp/scylla/etc/scylla.yaml
Expand Down
10 changes: 5 additions & 5 deletions certificate-authority/store/cqldb/signingRecords.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *Store) CreateSigningRecord(ctx context.Context, signingRecord *store.Si
if err != nil {
return err
}
applied, err := s.client.Session().Query(insertQuery).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
applied, err := s.Session().Query(insertQuery).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -95,7 +95,7 @@ func (s *Store) UpdateSigningRecord(ctx context.Context, signingRecord *store.Si
return err
}

return s.client.Session().Query(insertQuery).WithContext(ctx).Exec()
return s.Session().Query(insertQuery).WithContext(ctx).Exec()
}

func stringsToCQLSet(filter []string, str bool) string {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (s *Store) readPrimaryKeys(ctx context.Context, where string) (primaryKeysV
b.WriteString(s.Table())
b.WriteString(" " + cqldb.WhereClause + " ")
b.WriteString(where)
iter := s.client.Session().Query(b.String()).WithContext(ctx).Iter()
iter := s.Session().Query(b.String()).WithContext(ctx).Iter()
defer iter.Close()
return readPrimaryKeys(iter)
}
Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *Store) DeleteSigningRecords(ctx context.Context, owner string, query *s
b.WriteString(" IF EXISTS")

var count int64
applied, err := s.client.Session().Query(b.String()).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
applied, err := s.Session().Query(b.String()).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil)
if err == nil {
if applied {
count++
Expand Down Expand Up @@ -377,7 +377,7 @@ func (i *SigningRecordsIterator) nextQuery() bool {
b.WriteString(i.s.Table())
b.WriteString(" " + cqldb.WhereClause + " ")
b.WriteString(i.queries[i.queriesIdx])
i.iter = i.s.client.Session().Query(b.String()).WithContext(i.ctx).Iter()
i.iter = i.s.Session().Query(b.String()).WithContext(i.ctx).Iter()
i.queriesIdx++
return true
}
Expand Down
37 changes: 2 additions & 35 deletions certificate-authority/store/cqldb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ type Index struct {
// clustering key: deviceIDKey
var primaryKey = []string{idKey, ownerKey, commonNameKey}

// Store implements an Store for cqldb.
type Store struct {
client *cqldb.Client
config *Config
logger log.Logger
*cqldb.Store
}

func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
Expand Down Expand Up @@ -92,36 +89,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}

return &Store{
client: client,
logger: logger,
config: config,
Store: cqldb.NewStore(config.Table, client, logger),
}, nil
}

// Clear clears the event storage.
func (s *Store) Clear(ctx context.Context) error {
err := s.client.DropKeyspace(ctx)
if err != nil {
return fmt.Errorf("cannot clear: %w", err)
}
return nil
}

func (s *Store) Table() string {
return s.client.Keyspace() + "." + s.config.Table
}

// Clear documents in collections, but don't drop the database or the collections
func (s *Store) ClearTable(ctx context.Context) error {
return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
}

// Close closes the database session.
func (s *Store) Close(_ context.Context) error {
s.client.Close()
return nil
}

func (s *Store) AddCloseFunc(f func()) {
s.client.AddCloseFunc(f)
}
2 changes: 1 addition & 1 deletion identity-store/persistence/cqldb/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PersistenceTx struct {
// tx := s.persistence.NewTransaction()
// defer tx.Close()
func (s *Store) NewTransaction(ctx context.Context) persistence.PersistenceTx {
return &PersistenceTx{tx: s.client.Session(), table: s.Table(), err: nil, ctx: ctx}
return &PersistenceTx{tx: s.Session(), table: s.Table(), err: nil, ctx: ctx}
}

func (p *PersistenceTx) retrieveDeviceByQuery(whereCondition string) (_ *persistence.AuthorizedDevice, ok bool, err error) {
Expand Down
2 changes: 1 addition & 1 deletion identity-store/persistence/cqldb/persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestPersistenceTxPersist(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := p.TruncateTable(ctx)
err := p.ClearTable(ctx)
require.NoError(t, err)
if tt.actionBeforePersist != nil {
tt.actionBeforePersist(t, tx)
Expand Down
37 changes: 2 additions & 35 deletions identity-store/persistence/cqldb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ var indexes = []cqldb.Index{

// Store implements an Store for cqldb.
type Store struct {
client *cqldb.Client
config *Config
logger log.Logger
*cqldb.Store
}

func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*Store, error) {
Expand Down Expand Up @@ -90,37 +88,6 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}

return &Store{
client: client,
logger: logger,
config: config,
Store: cqldb.NewStore(config.Table, client, logger),
}, nil
}

// Clear clears the event storage.
func (s *Store) Clear(ctx context.Context) error {
err := s.client.DropKeyspace(ctx)
if err != nil {
return fmt.Errorf("cannot clear: %w", err)
}

return nil
}

func (s *Store) Table() string {
return s.client.Keyspace() + "." + s.config.Table
}

// Truncate records in table, but don't drop the keybase or the table.
func (s *Store) TruncateTable(ctx context.Context) error {
return s.client.Session().Query("TRUNCATE TABLE " + s.Table()).WithContext(ctx).Exec()
}

// Close closes the database session.
func (s *Store) Close(_ context.Context) error {
s.client.Close()
return nil
}

func (s *Store) AddCloseFunc(f func()) {
s.client.AddCloseFunc(f)
}
55 changes: 55 additions & 0 deletions pkg/cqldb/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cqldb

import (
"context"
"fmt"

"github.com/gocql/gocql"
"github.com/plgd-dev/hub/v2/pkg/log"
)

// Store implements an Store for cqldb.
type Store struct {
table string
client *Client
logger log.Logger
}

func NewStore(table string, client *Client, logger log.Logger) *Store {
return &Store{
table: table,
client: client,
logger: logger,
}
}

func (s *Store) Table() string {
return s.client.Keyspace() + "." + s.table
}

func (s *Store) Session() *gocql.Session {
return s.client.Session()
}

func (s *Store) AddCloseFunc(f func()) {
s.client.AddCloseFunc(f)
}

func (s *Store) Clear(ctx context.Context) error {
err := s.client.DropKeyspace(ctx)
if err != nil {
return fmt.Errorf("cannot clear: %w", err)
}
return nil
}

// Clear documents in collections, but don't drop the database or the collections
func (s *Store) ClearTable(ctx context.Context) error {
return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
}

// Close closes the database session.
func (s *Store) Close(_ context.Context) error {
s.client.Close()
return nil
}
2 changes: 1 addition & 1 deletion resource-aggregate/cqrs/eventstore/cqldb/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ func (s *EventStore) Delete(ctx context.Context, queries []eventstore.DeleteQuer
return errors.New("failed to delete documents: invalid query")
}

return s.client.Session().Query("delete from " + s.Table() + " " + cqldb.WhereClause + " " + deviceIDKey + " in (" + deviceIDFilter + ");").WithContext(ctx).Exec()
return s.Session().Query("delete from " + s.Table() + " " + cqldb.WhereClause + " " + deviceIDKey + " in (" + deviceIDFilter + ");").WithContext(ctx).Exec()
}
41 changes: 6 additions & 35 deletions resource-aggregate/cqrs/eventstore/cqldb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ var indexes = []cqldb.Index{

// EventStore implements an EventStore for cqldb.
type EventStore struct {
client *cqldb.Client
config *Config
logger log.Logger
}

func (s *EventStore) AddCloseFunc(f func()) {
s.client.AddCloseFunc(f)
*cqldb.Store
marshalerFunc MarshalerFunc
unmarshalerFunc UnmarshalerFunc
}

func New(ctx context.Context, config *Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider, opts ...Option) (*EventStore, error) {
Expand Down Expand Up @@ -124,9 +120,9 @@ func newEventStoreWithClient(ctx context.Context, client *cqldb.Client, config *
}

return &EventStore{
client: client,
logger: logger,
config: config,
Store: cqldb.NewStore(config.Table, client, logger),
marshalerFunc: config.marshalerFunc,
unmarshalerFunc: config.unmarshalerFunc,
}, nil
}

Expand All @@ -151,28 +147,3 @@ func getLatestEventsSnapshot(events []eventstore.Event, marshaler MarshalerFunc)
}
return lastEvent, snapshot, nil
}

// Clear clears the event storage.
func (s *EventStore) Clear(ctx context.Context) error {
err := s.client.DropKeyspace(ctx)
if err != nil {
return fmt.Errorf("cannot clear: %w", err)
}

return nil
}

func (s *EventStore) Table() string {
return s.client.Keyspace() + "." + s.config.Table
}

// Clear documents in collections, but don't drop the database or the collections
func (s *EventStore) ClearTable(ctx context.Context) error {
return s.client.Session().Query("truncate " + s.Table() + ";").WithContext(ctx).Exec()
}

// Close closes the database session.
func (s *EventStore) Close(_ context.Context) error {
s.client.Close()
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *EventStore) GetLatestDeviceETags(ctx context.Context, deviceID string,
q.WriteString(" " + cqldb.WhereClause)
q.WriteString(" " + deviceIDKey + "=" + deviceID)

iter := s.client.Session().Query(q.String()).WithContext(ctx).Iter()
iter := s.Session().Query(q.String()).WithContext(ctx).Iter()
if iter == nil {
return nil, errors.New("cannot create iterator")
}
Expand Down
4 changes: 2 additions & 2 deletions resource-aggregate/cqrs/eventstore/cqldb/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (s *EventStore) loadEventsQuery(ctx context.Context, eh eventstore.Handler,
q.WriteString(" " + cqldb.WhereClause + " " + filter)
}
q.WriteString(";")
iter := s.client.Session().Query(q.String()).WithContext(ctx).Iter()
i := newIterator(iter, s.config.unmarshalerFunc)
iter := s.Session().Query(q.String()).WithContext(ctx).Iter()
i := newIterator(iter, s.unmarshalerFunc)
err := eh.Handle(ctx, i)
errClose := iter.Close()
if err == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (s *EventStore) LoadDeviceMetadataByServiceIDs(ctx context.Context, service
}
serviceIDs = pkgStrings.Unique(serviceIDs)
q := cqldb.SelectCommand + " " + deviceIDKey + "," + serviceIDKey + " " + cqldb.FromClause + " " + s.Table() + " " + cqldb.WhereClause + " " + serviceIDKey + " in (" + strings.Join(serviceIDs, ",") + ") LIMIT " + strconv.FormatInt(limit, 10) + ";"
iter := s.client.Session().Query(q).WithContext(ctx).Iter()
iter := s.Session().Query(q).WithContext(ctx).Iter()
if iter == nil {
return nil, errors.New("cannot create iterator")
}
Expand Down
10 changes: 5 additions & 5 deletions resource-aggregate/cqrs/eventstore/cqldb/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func eventsToCQLSetValue(event eventstore.Event, data []byte) string {
}

func (s *EventStore) saveEvent(ctx context.Context, events []eventstore.Event) (status eventstore.SaveStatus, err error) {
lastEvent, snapshotBinary, err := getLatestEventsSnapshot(events, s.config.marshalerFunc)
lastEvent, snapshotBinary, err := getLatestEventsSnapshot(events, s.marshalerFunc)
if err != nil {
return eventstore.Fail, err
}
setters := eventsToCQLSetValue(lastEvent, snapshotBinary)
q := "update " + s.client.Keyspace() + "." + s.config.Table + " set " + setters + " " + cqldb.WhereClause + " " + deviceIDKey + "=" + lastEvent.GroupID() + " and " + idKey + "=" + lastEvent.AggregateID() + " if " + versionKey + "=" + strconv.FormatUint(events[0].Version()-1, 10) + ";"
ok, err := s.client.Session().Query(q).WithContext(ctx).ScanCAS(nil)
q := "update " + s.Table() + " set " + setters + " " + cqldb.WhereClause + " " + deviceIDKey + "=" + lastEvent.GroupID() + " and " + idKey + "=" + lastEvent.AggregateID() + " if " + versionKey + "=" + strconv.FormatUint(events[0].Version()-1, 10) + ";"
ok, err := s.Session().Query(q).WithContext(ctx).ScanCAS(nil)
if err != nil {
return eventstore.Fail, fmt.Errorf("cannot update snapshot event('%v'): %w", events, err)
}
Expand All @@ -112,7 +112,7 @@ func (s *EventStore) Save(ctx context.Context, events ...eventstore.Event) (even
if events[0].Version() != 0 {
return s.saveEvent(ctx, events)
}
lastEvent, data, err := getLatestEventsSnapshot(events, s.config.marshalerFunc)
lastEvent, data, err := getLatestEventsSnapshot(events, s.marshalerFunc)
if err != nil {
return eventstore.Fail, err
}
Expand All @@ -121,7 +121,7 @@ func (s *EventStore) Save(ctx context.Context, events ...eventstore.Event) (even
values := kvs.Values()

q := "insert into " + s.Table() + " (" + strings.Join(keys, ",") + ") values (" + strings.Join(values, ",") + ") if not exists;"
ok, err := s.client.Session().Query(q).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil, nil, nil, nil, nil)
ok, err := s.Session().Query(q).WithContext(ctx).ScanCAS(nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if errors.Is(err, gocql.ErrNotFound) {
return eventstore.Ok, nil
Expand Down
Loading

0 comments on commit 82c7726

Please sign in to comment.