Skip to content

Commit

Permalink
Fixes after CR
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Jul 8, 2024
1 parent 37b395c commit 0dfa3f3
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 165 deletions.
39 changes: 4 additions & 35 deletions charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2294,7 +2294,7 @@ certificateauthority:
expiresIn: "87600h"

snippetservice:
# -- Enable snippet-service service
# -- Enable snippet-service
enabled: true
# -- Name of component. Used in label selectors
name: snippet-service
Expand All @@ -2316,9 +2316,9 @@ snippetservice:
grpc:
# -- Service type
type: ClusterIP
# -- Labels for snippet-service service
# -- Labels for snippet-service
labels: {}
# -- Annotations for snippet-service service
# -- Annotations for snippet-service
annotations: {}
# -- Target port
targetPort: grpc
Expand Down Expand Up @@ -2456,7 +2456,7 @@ snippetservice:
encoderConfig:
# -- Time format for logs. The supported values are: "rfc3339nano", "rfc3339"
timeEncoder: rfc3339nano
# -- For complete snippet-service service configuration see [plgd/snippet-service](https://github.com/plgd-dev/hub/tree/main/snippet-service)
# -- For complete snippet-service configuration see [plgd/snippet-service](https://github.com/plgd-dev/hub/tree/main/snippet-service)
apis:
grpc:
address:
Expand Down Expand Up @@ -2525,37 +2525,6 @@ snippetservice:
keyFile:
certFile:
useSystemCAPool: false
bulkWrite:
# -- A time limit for write bulk to mongodb. A Timeout of zero means no timeout.
timeout: 1m0s
# -- The amount of time to wait until a record is written to mongodb. Any records collected during the throttle time will also be written. A throttle time of zero writes immediately. If recordLimit is reached, all records are written immediately
throttleTime: 500ms
# -- The maximum number of documents to cache before an immediate write.
documentLimit: 1000
cqlDB:
table: snippetServiceRecords
hosts: []
port: 9142
numConnections: 16
connectTimeout: 10s
# -- Resolve IP address to hostname before validate certificate. If false, the TLS validator will use ip/hostname advertised by the Cassandra node.
useHostnameResolution: true
reconnectionPolicy:
constant:
interval: 3s
# 0 - means infinity
maxRetries: 3
keyspace:
name: plgdhub
create: true
replication:
class: SimpleStrategy
replication_factor: 1
tls:
caPool:
keyFile:
certFile:
useSystemCAPool: false
resourceUpdater:
cleanUpExpiredUpdates: "0 * * * *"
grpc:
Expand Down
26 changes: 0 additions & 26 deletions snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,6 @@ clients:
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
bulkWrite:
timeout: 1m0s
throttleTime: 500ms
documentLimit: 1000
cqlDB:
table: "snippets"
hosts: []
port: 9142
numConnections: 16
connectTimeout: 10s
useHostnameResolution: true
reconnectionPolicy:
constant:
interval: 3s
maxRetries: 3
keyspace:
name: plgdhub
create: true
replication:
class: SimpleStrategy
replication_factor: 1
tls:
caPool: "/secrets/public/rootca.crt"
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
openTelemetryCollector:
grpc:
enabled: false
Expand Down
70 changes: 32 additions & 38 deletions snippet-service/service/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,29 @@ func errCannotGetConfigurations(err error) error {
return fmt.Errorf("cannot get configurations: %w", err)
}

func sendConfiguration(srv pb.SnippetService_GetConfigurationsServer, c *store.Configuration) error {
var lastVersion *store.ConfigurationVersion
for i := range c.Versions {
err := srv.Send(c.GetConfiguration(i))
if err != nil {
return err
func sendConfiguration(srv pb.SnippetService_GetConfigurationsServer, sc *store.Configuration) error {
var err error
var lastVersion *pb.Configuration
sc.RangeVersions(func(_ int, c *pb.Configuration) bool {
errS := srv.Send(c)
if errS != nil {
err = errS
return false
}
lastVersion = &c.Versions[i]
lastVersion = c
return true
})
if err != nil {
return err
}
if c.Latest == nil {
if sc.Latest == nil {
return nil
}
latest, err := c.GetLatest()
latest, err := sc.GetLatest()
if err != nil {
return err
}
if lastVersion != nil && lastVersion.Version == latest.GetVersion() {
if lastVersion != nil && lastVersion.GetVersion() == latest.GetVersion() {
// already sent when iterating over versions array
return nil
}
Expand Down Expand Up @@ -233,23 +239,29 @@ func errCannotGetConditions(err error) error {
return fmt.Errorf("cannot get conditions: %w", err)
}

func sendCondition(srv pb.SnippetService_GetConditionsServer, c *store.Condition) error {
var lastVersion *store.ConditionVersion
for i := range c.Versions {
err := srv.Send(c.GetCondition(i))
if err != nil {
return err
func sendCondition(srv pb.SnippetService_GetConditionsServer, sc *store.Condition) error {
var err error
var lastVersion *pb.Condition
sc.RangeVersions(func(_ int, c *pb.Condition) bool {
errS := srv.Send(c)
if errS != nil {
err = errS
return false
}
lastVersion = &c.Versions[i]
lastVersion = c
return true
})
if err != nil {
return err
}
if c.Latest == nil {
if sc.Latest == nil {
return nil
}
latest, err := c.GetLatest()
latest, err := sc.GetLatest()
if err != nil {
return err
}
if lastVersion != nil && lastVersion.Version == latest.GetVersion() {
if lastVersion != nil && lastVersion.GetVersion() == latest.GetVersion() {
// already sent when iterating over versions array
return nil
}
Expand Down Expand Up @@ -296,24 +308,6 @@ func (s *SnippetServiceServer) DeleteConditions(ctx context.Context, req *pb.Del
}, nil
}

func errCannotCreateAppliedConfiguration(err error) error {
return fmt.Errorf("cannot create applied configuration: %w", err)
}

func (s *SnippetServiceServer) CreateAppliedConfiguration(ctx context.Context, configuration *pb.AppliedConfiguration) (*pb.AppliedConfiguration, error) {
owner, err := s.checkOwner(ctx, configuration.GetOwner())
if err != nil {
return nil, s.logger.LogAndReturnError(status.Errorf(codes.PermissionDenied, "%v", errCannotCreateAppliedConfiguration(err)))
}

configuration.Owner = owner
c, _, err := s.store.CreateAppliedConfiguration(ctx, configuration, false)
if err != nil {
return nil, s.logger.LogAndReturnError(status.Errorf(getGRPCErrorCode(err), "%v", errCannotCreateAppliedConfiguration(err)))
}
return c, nil
}

func errCannotGetAppliedConfigurations(err error) error {
return fmt.Errorf("cannot get applied configurations: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestRequestHandlerDeleteAppliedConfigurations(t *testing.T) {
ss, shutdownHttp := test.New(t, snippetCfg)
defer shutdownHttp()

_ = test.AddAppliedConfigurations(ctx, t, snippetCfg.APIs.GRPC.Authorization.OwnerClaim, ss)
_ = test.AddAppliedConfigurationsToStore(ctx, t, ss.SnippetServiceStore())

conn, err := grpc.NewClient(config.SNIPPET_SERVICE_HOST, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
RootCAs: hubTest.GetRootCertificatePool(t),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRequestHandlerGetAppliedConfigurations(t *testing.T) {
ss, shutdownHttp := test.New(t, snippetCfg)
defer shutdownHttp()

appliedConfs := test.AddAppliedConfigurations(ctx, t, snippetCfg.APIs.GRPC.Authorization.OwnerClaim, ss)
appliedConfs := test.AddAppliedConfigurationsToStore(ctx, t, ss.SnippetServiceStore())

type args struct {
token string
Expand Down
8 changes: 4 additions & 4 deletions snippet-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const serviceName = "snippet-service"
type Service struct {
*service.Service

snippetService *grpcService.SnippetServiceServer
store store.Store
resourceUpdater *updater.ResourceUpdater
resourceSubscriber *ResourceSubscriber
}
Expand Down Expand Up @@ -143,12 +143,12 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
return &Service{
Service: s,

snippetService: snippetService,
store: db,
resourceUpdater: resourceUpdater,
resourceSubscriber: resourceSubscriber,
}, nil
}

func (s *Service) SnippetServiceServer() *grpcService.SnippetServiceServer {
return s.snippetService
func (s *Service) SnippetServiceStore() store.Store {
return s.store
}
16 changes: 0 additions & 16 deletions snippet-service/service/service_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
// ************************************************************************
// Copyright (C) 2022 plgd.dev, s.r.o.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ************************************************************************

package service_test

import (
Expand Down
33 changes: 19 additions & 14 deletions snippet-service/store/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,25 @@ func (c *Condition) GetLatest() (*pb.Condition, error) {
}, nil
}

func (c *Condition) GetCondition(index int) *pb.Condition {
return &pb.Condition{
Id: c.Id,
Owner: c.Owner,
ConfigurationId: c.ConfigurationId,
Name: c.Versions[index].Name,
Enabled: c.Versions[index].Enabled,
Version: c.Versions[index].Version,
Timestamp: c.Versions[index].Timestamp,
DeviceIdFilter: c.Versions[index].DeviceIdFilter,
ResourceTypeFilter: c.Versions[index].ResourceTypeFilter,
ResourceHrefFilter: c.Versions[index].ResourceHrefFilter,
JqExpressionFilter: c.Versions[index].JqExpressionFilter,
ApiAccessToken: c.Versions[index].ApiAccessToken,
func (c *Condition) RangeVersions(f func(int, *pb.Condition) bool) {
for i := range c.Versions {
cond := &pb.Condition{
Id: c.Id,
Owner: c.Owner,
ConfigurationId: c.ConfigurationId,
Name: c.Versions[i].Name,
Enabled: c.Versions[i].Enabled,
Version: c.Versions[i].Version,
Timestamp: c.Versions[i].Timestamp,
DeviceIdFilter: c.Versions[i].DeviceIdFilter,
ResourceTypeFilter: c.Versions[i].ResourceTypeFilter,
ResourceHrefFilter: c.Versions[i].ResourceHrefFilter,
JqExpressionFilter: c.Versions[i].JqExpressionFilter,
ApiAccessToken: c.Versions[i].ApiAccessToken,
}
if !f(i, cond) {
break
}
}
}

Expand Down
21 changes: 13 additions & 8 deletions snippet-service/store/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,19 @@ func (c *Configuration) GetLatest() (*pb.Configuration, error) {
}, nil
}

func (c *Configuration) GetConfiguration(index int) *pb.Configuration {
return &pb.Configuration{
Id: c.Id,
Owner: c.Owner,
Name: c.Versions[index].Name,
Timestamp: c.Versions[index].Timestamp,
Version: c.Versions[index].Version,
Resources: c.Versions[index].Resources,
func (c *Configuration) RangeVersions(f func(int, *pb.Configuration) bool) {
for i := range c.Versions {
conf := &pb.Configuration{
Id: c.Id,
Owner: c.Owner,
Name: c.Versions[i].Name,
Timestamp: c.Versions[i].Timestamp,
Version: c.Versions[i].Version,
Resources: c.Versions[i].Resources,
}
if !f(i, conf) {
break
}
}
}

Expand Down
14 changes: 0 additions & 14 deletions snippet-service/test/appliedConfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"time"

"github.com/google/uuid"
pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc"
"github.com/plgd-dev/hub/v2/snippet-service/pb"
"github.com/plgd-dev/hub/v2/snippet-service/service"
"github.com/plgd-dev/hub/v2/snippet-service/store"
hubTest "github.com/plgd-dev/hub/v2/test"
pbTest "github.com/plgd-dev/hub/v2/test/pb"
Expand Down Expand Up @@ -106,15 +104,3 @@ func AddAppliedConfigurationsToStore(ctx context.Context, t *testing.T, s store.
require.NoError(t, err)
return acs
}

func AddAppliedConfigurations(ctx context.Context, t *testing.T, ownerClaim string, ss *service.Service) map[string]*store.AppliedConfiguration {
configurations := getAppliedConfigurations(t)
for _, c := range configurations {
ctxWithToken := pkgGrpc.CtxWithIncomingToken(ctx, GetTokenWithOwnerClaim(t, c.GetOwner(), ownerClaim))
newConf, err := ss.SnippetServiceServer().CreateAppliedConfiguration(ctxWithToken, c.GetAppliedConfiguration())
require.NoError(t, err)
c.Timestamp = newConf.GetTimestamp()
configurations[newConf.GetId()] = c
}
return configurations
}
8 changes: 4 additions & 4 deletions snippet-service/test/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,19 @@ func AddConditions(ctx context.Context, t *testing.T, ownerClaim string, ssc pb.
conditions := getConditions(n, calcVersion)
for _, c := range conditions {
ctxWithToken := pkgGrpc.CtxWithToken(ctx, GetTokenWithOwnerClaim(t, c.Owner, ownerClaim))
for i := range c.Versions {
cond := c.GetCondition(i)
c.RangeVersions(func(i int, cond *pb.Condition) bool {
if i == 0 {
createdCond, err := ssc.CreateCondition(ctxWithToken, cond)
require.NoError(t, err)
c.Latest.Timestamp = createdCond.GetTimestamp()
c.Versions[i].Timestamp = createdCond.GetTimestamp()
continue
return true
}
updatedCond, err := ssc.UpdateCondition(ctxWithToken, cond)
require.NoError(t, err)
c.Versions[i].Timestamp = updatedCond.GetTimestamp()
}
return true
})
conditions[c.Id] = c
}
return conditions
Expand Down
Loading

0 comments on commit 0dfa3f3

Please sign in to comment.