Skip to content

Commit

Permalink
Replace runtime map with periodic database check
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Jul 3, 2024
1 parent f8f167d commit cfe0d44
Show file tree
Hide file tree
Showing 25 changed files with 604 additions and 548 deletions.
22 changes: 11 additions & 11 deletions charts/plgd-hub/templates/snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,21 @@ data:
{{- $cqlDbTls := .clients.storage.cqlDB.tls }}
{{- include "plgd-hub.certificateConfig" (list $ $cqlDbTls $cert ) | indent 10 }}
useSystemCAPool: {{ .clients.storage.cqlDB.tls.useSystemCAPool }}
resourceAggregate:
pendingCommandsCheckInterval: {{ .clients.resourceAggregate.pendingCommandsCheckInterval }}
resourceUpdater:
cleanUpExpiredUpdates: {{ .clients.resourceUpdater.cleanUpExpiredUpdates | quote }}
grpc:
{{- $resourceAggregate := .clients.resourceAggregate.grpc.address }}
address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceAggregate ) | quote }}
sendMsgSize: {{ int64 .clients.resourceAggregate.grpc.sendMsgSize | default 4194304 }}
recvMsgSize: {{ int64 .clients.resourceAggregate.grpc.recvMsgSize | default 4194304 }}
{{- $resourceUpdater := .clients.resourceUpdater.grpc.address }}
address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceUpdater ) | quote }}
sendMsgSize: {{ int64 .clients.resourceUpdater.grpc.sendMsgSize | default 4194304 }}
recvMsgSize: {{ int64 .clients.resourceUpdater.grpc.recvMsgSize | default 4194304 }}
keepAlive:
time: {{ .clients.resourceAggregate.grpc.keepAlive.time }}
timeout: {{ .clients.resourceAggregate.grpc.keepAlive.timeout }}
permitWithoutStream: {{ .clients.resourceAggregate.grpc.keepAlive.permitWithoutStream }}
time: {{ .clients.resourceUpdater.grpc.keepAlive.time }}
timeout: {{ .clients.resourceUpdater.grpc.keepAlive.timeout }}
permitWithoutStream: {{ .clients.resourceUpdater.grpc.keepAlive.permitWithoutStream }}
tls:
{{- $raClientTls := .clients.resourceAggregate.grpc.tls }}
{{- $raClientTls := .clients.resourceUpdater.grpc.tls }}
{{- include "plgd-hub.certificateConfig" (list $ $raClientTls $cert) | indent 10 }}
useSystemCAPool: {{ .clients.resourceAggregate.grpc.tls.useSystemCAPool }}
useSystemCAPool: {{ .clients.resourceUpdater.grpc.tls.useSystemCAPool }}
{{- include "plgd-hub.openTelemetryExporterConfig" (list $ $cert ) | nindent 6 }}
{{- end }}
{{- end }}
4 changes: 2 additions & 2 deletions charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2508,8 +2508,8 @@ snippetservice:
keyFile:
certFile:
useSystemCAPool: false
resourceAggregate:
pendingCommandsCheckInterval: 1m
resourceUpdater:
cleanUpExpiredUpdates: "0 * * * *"
grpc:
address: ""
sendMsgSize: 4194304
Expand Down
17 changes: 15 additions & 2 deletions pkg/mongodb/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type updateJSON = func(map[string]interface{})

func ConvertStringValueToInt(json map[string]interface{}, path string) {
func ConvertStringValueToInt64(json map[string]interface{}, path string) {
pos := strings.Index(path, ".")
if pos == -1 {
valueI, ok := json[path]
Expand All @@ -36,11 +36,24 @@ func ConvertStringValueToInt(json map[string]interface{}, path string) {
if !ok {
return
}
elemArray, ok := elem.([]interface{})
if ok {
for i, elem := range elemArray {
elemMap, ok2 := elem.(map[string]interface{})
if !ok2 {
continue
}
ConvertStringValueToInt64(elemMap, path[pos+1:])
elemArray[i] = elemMap
}
json[elemPath] = elemArray
return
}
elemMap, ok := elem.(map[string]interface{})
if !ok {
return
}
ConvertStringValueToInt(elemMap, path[pos+1:])
ConvertStringValueToInt64(elemMap, path[pos+1:])
json[elemPath] = elemMap
}

Expand Down
4 changes: 2 additions & 2 deletions snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ clients:
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
resourceAggregate:
resourceUpdater:
cleanUpExpiredUpdates: "0 * * * *"
grpc:
address: ""
sendMsgSize: 4194304
Expand All @@ -128,4 +129,3 @@ clients:
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
pendingCommandsCheckInterval: 1m
2 changes: 1 addition & 1 deletion snippet-service/store/jq.go → snippet-service/jq/jq.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package jq

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package store_test
package jq_test

import (
"testing"

"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/hub/v2/resource-aggregate/commands"
"github.com/plgd-dev/hub/v2/snippet-service/store"
"github.com/plgd-dev/hub/v2/snippet-service/jq"
hubTest "github.com/plgd-dev/hub/v2/test"
"github.com/plgd-dev/kit/v2/codec/json"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestEvalJQCondition(t *testing.T) {
err = commands.DecodeContent(tt.content, &json)
}
require.NoError(t, err)
got, err := store.EvalJQCondition(tt.jq, json)
got, err := jq.EvalJQCondition(tt.jq, json)
if tt.wantErr {
require.Error(t, err)
return
Expand Down
12 changes: 9 additions & 3 deletions snippet-service/pb/appliedConfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,20 @@ func (r *AppliedConfiguration_Resource) Clone() *AppliedConfiguration_Resource {
CorrelationId: r.GetCorrelationId(),
Status: r.GetStatus(),
ResourceUpdated: r.GetResourceUpdated().Clone(),
ValidUntil: r.GetValidUntil(),
}
}

func (r *AppliedConfiguration_Resource) UnmarshalBSON(data []byte) error {
return pkgMongo.UnmarshalProtoBSON(data, r, nil)
}

func (r *AppliedConfiguration_Resource) jsonToBSONTag(json map[string]interface{}) {
pkgMongo.ConvertStringValueToInt64(json, "validUntil")
}

func (r *AppliedConfiguration_Resource) MarshalBSON() ([]byte, error) {
return pkgMongo.MarshalProtoBSON(r, nil)
return pkgMongo.MarshalProtoBSON(r, r.jsonToBSONTag)
}

func (c *AppliedConfiguration) CloneExecutedBy() isAppliedConfiguration_ExecutedBy {
Expand Down Expand Up @@ -133,8 +138,9 @@ func (c *AppliedConfiguration) Clone() *AppliedConfiguration {
}

func (c *AppliedConfiguration) jsonToBSONTag(json map[string]interface{}) {
pkgMongo.ConvertStringValueToInt(json, "configurationId.version")
pkgMongo.ConvertStringValueToInt(json, "conditionId.version")
pkgMongo.ConvertStringValueToInt64(json, "configurationId.version")
pkgMongo.ConvertStringValueToInt64(json, "conditionId.version")
pkgMongo.ConvertStringValueToInt64(json, "resources.validUntil")
}

func (c *AppliedConfiguration) MarshalBSON() ([]byte, error) {
Expand Down
63 changes: 7 additions & 56 deletions snippet-service/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ package service
import (
"fmt"
"net"
"time"

"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
"github.com/plgd-dev/hub/v2/pkg/config"
"github.com/plgd-dev/hub/v2/pkg/log"
grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client"
httpServer "github.com/plgd-dev/hub/v2/pkg/net/http/server"
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client"
grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc"
storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config"
"github.com/plgd-dev/hub/v2/snippet-service/updater"
)

type HTTPConfig struct {
Expand Down Expand Up @@ -45,53 +43,6 @@ func (c *APIsConfig) Validate() error {
return nil
}

type StorageConfig struct {
Embedded storeConfig.Config `yaml:",inline" json:",inline"`
ExtendCronParserBySeconds bool `yaml:"-" json:"-"`
CleanUpRecords string `yaml:"cleanUpRecords" json:"cleanUpRecords"`
}

func (c *StorageConfig) Validate() error {
if err := c.Embedded.Validate(); err != nil {
return err
}
if c.CleanUpRecords == "" {
return nil
}
s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan
if err != nil {
return fmt.Errorf("cannot create cron job: %w", err)
}
defer func() {
if errS := s.Shutdown(); errS != nil {
log.Errorf("failed to shutdown cron job: %w", errS)
}
}()
_, err = s.NewJob(gocron.CronJob(c.CleanUpRecords, c.ExtendCronParserBySeconds),
gocron.NewTask(func() {
// do nothing
}))
if err != nil {
return fmt.Errorf("cleanUpRecords('%v') - %w", c.CleanUpRecords, err)
}
return nil
}

type ResourceAggregateConfig struct {
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
PendingCommandsCheckInterval time.Duration `yaml:"pendingCommandsCheckInterval" json:"pendingCommandsCheckInterval"`
}

func (c *ResourceAggregateConfig) Validate() error {
if err := c.Connection.Validate(); err != nil {
return fmt.Errorf("grpc.%w", err)
}
if c.PendingCommandsCheckInterval <= 0 {
return fmt.Errorf("pendingCommandsCheckInterval('%v') - must be greater than 0", c.PendingCommandsCheckInterval)
}
return nil
}

type EventBusConfig struct {
NATS natsClient.Config `yaml:"nats" json:"nats"`
}
Expand All @@ -104,10 +55,10 @@ func (c *EventBusConfig) Validate() error {
}

type ClientsConfig struct {
Storage StorageConfig `yaml:"storage" json:"storage"`
OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"`
EventBus EventBusConfig `yaml:"eventBus" json:"eventBus"`
ResourceAggregate ResourceAggregateConfig `yaml:"resourceAggregate" json:"resourceAggregate"`
Storage storeConfig.Config `yaml:"storage" json:"storage"`
OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"`
EventBus EventBusConfig `yaml:"eventBus" json:"eventBus"`
ResourceUpdater updater.ResourceUpdaterConfig `yaml:"resourceUpdater" json:"resourceUpdater"`
}

func (c *ClientsConfig) Validate() error {
Expand All @@ -120,8 +71,8 @@ func (c *ClientsConfig) Validate() error {
if err := c.EventBus.Validate(); err != nil {
return fmt.Errorf("eventBus.%w", err)
}
if err := c.ResourceAggregate.Validate(); err != nil {
return fmt.Errorf("resourceAggregate.%w", err)
if err := c.ResourceUpdater.Validate(); err != nil {
return fmt.Errorf("resourceUpdater.%w", err)
}
return nil
}
Expand Down
85 changes: 7 additions & 78 deletions snippet-service/service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package service_test

import (
"testing"
"time"

"github.com/plgd-dev/hub/v2/pkg/log"
grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server"
Expand All @@ -11,7 +10,7 @@ import (
"github.com/plgd-dev/hub/v2/snippet-service/service"
storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config"
"github.com/plgd-dev/hub/v2/snippet-service/test"
"github.com/plgd-dev/hub/v2/test/config"
"github.com/plgd-dev/hub/v2/snippet-service/updater"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -95,90 +94,20 @@ func TestHTTPConfig(t *testing.T) {
func TestStorageConfig(t *testing.T) {
tests := []struct {
name string
cfg service.StorageConfig
cfg storeConfig.Config
wantErr bool
}{
{
name: "valid",
cfg: test.MakeStorageConfig(),
cfg: test.MakeStoreConfig(),
wantErr: false,
},
{
name: "valid - no cron",
cfg: func() service.StorageConfig {
cfg := test.MakeStorageConfig()
cfg.CleanUpRecords = ""
return cfg
}(),
wantErr: false,
},
{
name: "invalid - no storage",
cfg: func() service.StorageConfig {
cfg := test.MakeStorageConfig()
cfg.Embedded = storeConfig.Config{}
return cfg
}(),
wantErr: true,
},
{
name: "invalid - bad cron expression",
cfg: func() service.StorageConfig {
cfg := test.MakeStorageConfig()
cfg.CleanUpRecords = "bad"
return cfg
}(),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
})
}
}

func TestResourceAggregateConfig(t *testing.T) {
tests := []struct {
name string
cfg service.ResourceAggregateConfig
wantErr bool
}{
{
name: "valid",
cfg: service.ResourceAggregateConfig{
Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST),
PendingCommandsCheckInterval: time.Second * 10,
},
},
{
name: "invalid - no connection",
cfg: func() service.ResourceAggregateConfig {
cfg := service.ResourceAggregateConfig{
PendingCommandsCheckInterval: time.Second * 10,
}
return cfg
}(),
wantErr: true,
},
{
name: "invalid - pending commands check interval",
cfg: func() service.ResourceAggregateConfig {
cfg := service.ResourceAggregateConfig{
Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST),
PendingCommandsCheckInterval: -1,
}
return cfg
}(),
name: "invalid",
cfg: storeConfig.Config{},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
Expand All @@ -205,7 +134,7 @@ func TestClientsConfig(t *testing.T) {
name: "invalid - no storage",
cfg: func() service.ClientsConfig {
cfg := test.MakeClientsConfig()
cfg.Storage = service.StorageConfig{}
cfg.Storage = storeConfig.Config{}
return cfg
}(),
wantErr: true,
Expand Down Expand Up @@ -238,7 +167,7 @@ func TestClientsConfig(t *testing.T) {
name: "invalid ResourceAggregate",
cfg: func() service.ClientsConfig {
cfg := test.MakeClientsConfig()
cfg.ResourceAggregate = service.ResourceAggregateConfig{}
cfg.ResourceUpdater = updater.ResourceUpdaterConfig{}
return cfg
}(),
wantErr: true,
Expand Down
Loading

0 comments on commit cfe0d44

Please sign in to comment.