From 2467abf51d6c5cf33782e25afb7882920867f634 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Wed, 12 Jun 2024 12:35:51 +0200 Subject: [PATCH] fixup! snippet-service: add ResourceChanged event subscription --- cloud2cloud-gateway/test/test.go | 4 +- coap-gateway/service/clientObserveHandler.go | 4 +- .../service/observation/deviceObserver.go | 4 +- coap-gateway/service/signIn.go | 4 +- grpc-gateway/client/client_test.go | 21 -- grpc-gateway/client/createResource_test.go | 3 +- grpc-gateway/client/deleteResource_test.go | 5 +- .../client/deviceSubscriptions_test.go | 7 +- grpc-gateway/client/getDevice_test.go | 3 +- grpc-gateway/client/getDevices_test.go | 3 +- grpc-gateway/client/getResource_test.go | 3 +- grpc-gateway/client/maintenance.go | 4 +- grpc-gateway/client/maintenance_test.go | 5 +- .../client/observeDeviceResources_test.go | 3 +- grpc-gateway/client/observeDevices_test.go | 3 +- grpc-gateway/client/observeResource_test.go | 3 +- grpc-gateway/client/updateResource_test.go | 5 +- grpc-gateway/test/test.go | 21 ++ .../service/getDeviceResourceLinks_test.go | 6 +- pkg/strings/slice.go | 28 +-- pkg/strings/slice_test.go | 43 ---- snippet-service/config.yaml | 19 ++ snippet-service/service/config.go | 27 ++- snippet-service/service/config_test.go | 10 + snippet-service/service/grpc/server.go | 15 ++ snippet-service/service/resourceSubscriber.go | 215 ++++++++++++++++++ snippet-service/service/service.go | 117 +++++----- snippet-service/service/service_test.go | 120 ++++++++++ snippet-service/store/condition.go | 14 +- snippet-service/store/mongodb/condition.go | 53 ++--- .../store/mongodb/getLatestConditions_test.go | 109 +++++---- snippet-service/test/condition.go | 7 +- snippet-service/test/service.go | 6 + test/test.go | 4 +- 34 files changed, 636 insertions(+), 262 deletions(-) diff --git a/cloud2cloud-gateway/test/test.go b/cloud2cloud-gateway/test/test.go index f75879c09..500dbee87 100644 --- a/cloud2cloud-gateway/test/test.go +++ b/cloud2cloud-gateway/test/test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "net" + "slices" "sync" "testing" "time" @@ -15,7 +16,6 @@ import ( "github.com/plgd-dev/hub/v2/pkg/mongodb" "github.com/plgd-dev/hub/v2/pkg/net/http" "github.com/plgd-dev/hub/v2/pkg/security/certManager/server" - pkgStrings "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/test/config" testHttp "github.com/plgd-dev/hub/v2/test/http" "github.com/stretchr/testify/require" @@ -98,7 +98,7 @@ func C2CURI(uri string) string { func GetUniqueSubscriptionID(subIDS ...string) string { id := uuid.NewString() for { - if !pkgStrings.Contains(subIDS, id) { + if !slices.Contains(subIDS, id) { break } id = uuid.NewString() diff --git a/coap-gateway/service/clientObserveHandler.go b/coap-gateway/service/clientObserveHandler.go index 00f8d4813..a2f265505 100644 --- a/coap-gateway/service/clientObserveHandler.go +++ b/coap-gateway/service/clientObserveHandler.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "slices" "sync" "sync/atomic" @@ -17,7 +18,6 @@ import ( "github.com/plgd-dev/hub/v2/coap-gateway/service/message" "github.com/plgd-dev/hub/v2/grpc-gateway/pb" "github.com/plgd-dev/hub/v2/grpc-gateway/subscription" - "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/events" ) @@ -119,7 +119,7 @@ func (s *resourceSubscription) isDuplicateEvent(ev *events.ResourceChanged) bool func (s *resourceSubscription) eventHandler(e *pb.Event) error { switch { case e.GetResourceUnpublished() != nil: - if !strings.Contains(e.GetResourceUnpublished().GetHrefs(), s.href) { + if !slices.Contains(e.GetResourceUnpublished().GetHrefs(), s.href) { return nil } s.cancelSubscription(coapCodes.ServiceUnavailable) diff --git a/coap-gateway/service/observation/deviceObserver.go b/coap-gateway/service/observation/deviceObserver.go index baaf63218..cd99da812 100644 --- a/coap-gateway/service/observation/deviceObserver.go +++ b/coap-gateway/service/observation/deviceObserver.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "slices" "github.com/plgd-dev/device/v2/schema" "github.com/plgd-dev/device/v2/schema/interfaces" @@ -16,7 +17,6 @@ import ( "github.com/plgd-dev/hub/v2/grpc-gateway/pb" "github.com/plgd-dev/hub/v2/pkg/log" "github.com/plgd-dev/hub/v2/pkg/net/coap" - pkgStrings "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" pbRD "github.com/plgd-dev/hub/v2/resource-directory/pb" "google.golang.org/grpc" @@ -308,7 +308,7 @@ func IsDiscoveryResourceObservable(links schema.ResourceLinks) (bool, error) { return observable, nil } - return pkgStrings.Contains(res.Interfaces, observeInterface), nil + return slices.Contains(res.Interfaces, observeInterface), nil } func detectObservationType(links schema.ResourceLinks) (ObservationType, error) { diff --git a/coap-gateway/service/signIn.go b/coap-gateway/service/signIn.go index fcf6ff884..a3b5551c0 100644 --- a/coap-gateway/service/signIn.go +++ b/coap-gateway/service/signIn.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "slices" "time" "github.com/plgd-dev/go-coap/v3/message" @@ -14,7 +15,6 @@ import ( grpcgwClient "github.com/plgd-dev/hub/v2/grpc-gateway/client" "github.com/plgd-dev/hub/v2/identity-store/events" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" - "github.com/plgd-dev/hub/v2/pkg/strings" pkgTime "github.com/plgd-dev/hub/v2/pkg/time" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/kit/v2/codec/cbor" @@ -173,7 +173,7 @@ func subscribeToDeviceEvents(client *session, owner, deviceID string) error { if evt.GetOwner() != owner { return } - if !strings.Contains(evt.GetDeviceIds(), deviceID) { + if !slices.Contains(evt.GetDeviceIds(), deviceID) { return } client.Close() diff --git a/grpc-gateway/client/client_test.go b/grpc-gateway/client/client_test.go index 94555f8c9..bfb571a1d 100644 --- a/grpc-gateway/client/client_test.go +++ b/grpc-gateway/client/client_test.go @@ -2,13 +2,9 @@ package client_test import ( "context" - "crypto/tls" - "crypto/x509" - "testing" "github.com/plgd-dev/device/v2/schema/device" "github.com/plgd-dev/go-coap/v3/message" - "github.com/plgd-dev/hub/v2/grpc-gateway/client" "github.com/plgd-dev/hub/v2/grpc-gateway/pb" "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" @@ -16,7 +12,6 @@ import ( "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" "github.com/plgd-dev/kit/v2/codec/cbor" - "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -26,22 +21,6 @@ const ( TestManufacturer = "Test Manufacturer" ) -func NewTestClient(t *testing.T) *client.Client { - rootCAs := x509.NewCertPool() - for _, c := range test.GetRootCertificateAuthorities(t) { - rootCAs.AddCert(c) - } - tlsCfg := tls.Config{ - RootCAs: rootCAs, - } - clientConfig := client.Config{ - GatewayAddress: config.GRPC_GW_HOST, - } - c, err := client.NewFromConfig(&clientConfig, &tlsCfg) - require.NoError(t, err) - return c -} - func NewGateway(addr string) (*server.Server, error) { s, err := server.NewServer(addr) if err != nil { diff --git a/grpc-gateway/client/createResource_test.go b/grpc-gateway/client/createResource_test.go index aad482a4c..745e73b7e 100644 --- a/grpc-gateway/client/createResource_test.go +++ b/grpc-gateway/client/createResource_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/plgd-dev/device/v2/schema/device" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -67,7 +68,7 @@ func TestClient_CreateResource(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/deleteResource_test.go b/grpc-gateway/client/deleteResource_test.go index 9dbc20918..3d53a7fbc 100644 --- a/grpc-gateway/client/deleteResource_test.go +++ b/grpc-gateway/client/deleteResource_test.go @@ -10,6 +10,7 @@ import ( "github.com/plgd-dev/device/v2/schema/resources" "github.com/plgd-dev/hub/v2/grpc-gateway/client" extCodes "github.com/plgd-dev/hub/v2/grpc-gateway/pb/codes" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -83,7 +84,7 @@ func TestClientDeleteResource(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) @@ -172,7 +173,7 @@ func TestClientBatchDeleteResource(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/deviceSubscriptions_test.go b/grpc-gateway/client/deviceSubscriptions_test.go index 069fca867..ee672927b 100644 --- a/grpc-gateway/client/deviceSubscriptions_test.go +++ b/grpc-gateway/client/deviceSubscriptions_test.go @@ -9,6 +9,7 @@ import ( "github.com/plgd-dev/device/v2/schema/platform" "github.com/plgd-dev/go-coap/v3/message" "github.com/plgd-dev/hub/v2/grpc-gateway/pb" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" "github.com/plgd-dev/hub/v2/pkg/fsnotify" "github.com/plgd-dev/hub/v2/pkg/log" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" @@ -82,7 +83,7 @@ func TestObserveDeviceResourcesRetrieve(t *testing.T) { }() rac := raservice.NewResourceAggregateClient(raConn.GRPC()) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { errC := c.Close() require.NoError(t, errC) @@ -177,7 +178,7 @@ func TestObserveDeviceResourcesUpdate(t *testing.T) { }() rac := raservice.NewResourceAggregateClient(raConn.GRPC()) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { errC := c.Close() require.NoError(t, errC) @@ -306,7 +307,7 @@ func TestObserveDeviceResourcesCreateAndDelete(t *testing.T) { }() rac := raservice.NewResourceAggregateClient(raConn.GRPC()) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { errC := c.Close() require.NoError(t, errC) diff --git a/grpc-gateway/client/getDevice_test.go b/grpc-gateway/client/getDevice_test.go index 525d5f517..28399270f 100644 --- a/grpc-gateway/client/getDevice_test.go +++ b/grpc-gateway/client/getDevice_test.go @@ -10,6 +10,7 @@ import ( "github.com/plgd-dev/device/v2/test/resource/types" "github.com/plgd-dev/hub/v2/grpc-gateway/client" "github.com/plgd-dev/hub/v2/grpc-gateway/pb" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" test "github.com/plgd-dev/hub/v2/test" @@ -92,7 +93,7 @@ func TestClient_GetDevice(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/getDevices_test.go b/grpc-gateway/client/getDevices_test.go index e92774ab1..546bc7eb5 100644 --- a/grpc-gateway/client/getDevices_test.go +++ b/grpc-gateway/client/getDevices_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/plgd-dev/hub/v2/grpc-gateway/client" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" test "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -69,7 +70,7 @@ func TestClient_GetDevices(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/getResource_test.go b/grpc-gateway/client/getResource_test.go index 6131789f8..2467f73ab 100644 --- a/grpc-gateway/client/getResource_test.go +++ b/grpc-gateway/client/getResource_test.go @@ -8,6 +8,7 @@ import ( "github.com/plgd-dev/device/v2/schema/configuration" "github.com/plgd-dev/device/v2/schema/interfaces" "github.com/plgd-dev/hub/v2/grpc-gateway/client" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -100,7 +101,7 @@ func TestClientGetResource(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/maintenance.go b/grpc-gateway/client/maintenance.go index 088ee18ec..1201aa0cc 100644 --- a/grpc-gateway/client/maintenance.go +++ b/grpc-gateway/client/maintenance.go @@ -3,10 +3,10 @@ package client import ( "context" "net/http" + "slices" "github.com/plgd-dev/device/v2/schema/maintenance" "github.com/plgd-dev/hub/v2/pkg/net/grpc" - "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/events" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -65,7 +65,7 @@ func (c *Client) updateMaintenanceResource( } var href string for _, r := range v.GetResources() { - if r.GetDeviceId() == deviceID && strings.Contains(r.GetResourceTypes(), maintenance.ResourceType) { + if r.GetDeviceId() == deviceID && slices.Contains(r.GetResourceTypes(), maintenance.ResourceType) { href = r.GetHref() break } diff --git a/grpc-gateway/client/maintenance_test.go b/grpc-gateway/client/maintenance_test.go index 7256a3b0e..6ec137392 100644 --- a/grpc-gateway/client/maintenance_test.go +++ b/grpc-gateway/client/maintenance_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" test "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -50,7 +51,7 @@ func TestClientFactoryReset(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) @@ -104,7 +105,7 @@ func TestClientReboot(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/observeDeviceResources_test.go b/grpc-gateway/client/observeDeviceResources_test.go index 9a05ff6a8..0c29b82c7 100644 --- a/grpc-gateway/client/observeDeviceResources_test.go +++ b/grpc-gateway/client/observeDeviceResources_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/resource-aggregate/events" test "github.com/plgd-dev/hub/v2/test" @@ -24,7 +25,7 @@ func TestObserveDeviceResourcesPublish(t *testing.T) { defer tearDown() ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/observeDevices_test.go b/grpc-gateway/client/observeDevices_test.go index 6f32642a2..964bcf8cc 100644 --- a/grpc-gateway/client/observeDevices_test.go +++ b/grpc-gateway/client/observeDevices_test.go @@ -7,6 +7,7 @@ import ( "testing" client "github.com/plgd-dev/hub/v2/grpc-gateway/client" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -23,7 +24,7 @@ func TestObserveDevices(t *testing.T) { defer tearDown() ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/client/observeResource_test.go b/grpc-gateway/client/observeResource_test.go index cdeaca197..1e7e81c4b 100644 --- a/grpc-gateway/client/observeResource_test.go +++ b/grpc-gateway/client/observeResource_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/plgd-dev/device/v2/schema/configuration" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -25,7 +26,7 @@ func TestObservingResource(t *testing.T) { defer tearDown() ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { errC := c.Close() require.NoError(t, errC) diff --git a/grpc-gateway/client/updateResource_test.go b/grpc-gateway/client/updateResource_test.go index f77e1e8f9..e2a69ae1f 100644 --- a/grpc-gateway/client/updateResource_test.go +++ b/grpc-gateway/client/updateResource_test.go @@ -8,6 +8,7 @@ import ( "github.com/plgd-dev/device/v2/schema/configuration" "github.com/plgd-dev/device/v2/schema/interfaces" "github.com/plgd-dev/hub/v2/grpc-gateway/client" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" @@ -94,7 +95,7 @@ func TestClientUpdateResource(t *testing.T) { ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { errC := c.Close() require.NoError(t, errC) @@ -131,7 +132,7 @@ func TestUpdateConfigurationName(t *testing.T) { defer tearDown() ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) - c := NewTestClient(t) + c := grpcgwTest.NewTestClient(t) defer func() { err := c.Close() require.NoError(t, err) diff --git a/grpc-gateway/test/test.go b/grpc-gateway/test/test.go index 5cc26e62c..9ac9ebba1 100644 --- a/grpc-gateway/test/test.go +++ b/grpc-gateway/test/test.go @@ -2,16 +2,37 @@ package test import ( "context" + "crypto/tls" + "crypto/x509" "sync" + "testing" "time" + "github.com/plgd-dev/hub/v2/grpc-gateway/client" "github.com/plgd-dev/hub/v2/grpc-gateway/service" "github.com/plgd-dev/hub/v2/pkg/fsnotify" "github.com/plgd-dev/hub/v2/pkg/log" + "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" "github.com/stretchr/testify/require" ) +func NewTestClient(t *testing.T) *client.Client { + rootCAs := x509.NewCertPool() + for _, c := range test.GetRootCertificateAuthorities(t) { + rootCAs.AddCert(c) + } + tlsCfg := tls.Config{ + RootCAs: rootCAs, + } + clientConfig := client.Config{ + GatewayAddress: config.GRPC_GW_HOST, + } + c, err := client.NewFromConfig(&clientConfig, &tlsCfg) + require.NoError(t, err) + return c +} + func MakeConfig(t require.TestingT) service.Config { var cfg service.Config diff --git a/http-gateway/service/getDeviceResourceLinks_test.go b/http-gateway/service/getDeviceResourceLinks_test.go index a7a71b7e1..13a8a650c 100644 --- a/http-gateway/service/getDeviceResourceLinks_test.go +++ b/http-gateway/service/getDeviceResourceLinks_test.go @@ -6,6 +6,7 @@ import ( "errors" "io" "net/http" + "slices" "testing" "time" @@ -16,7 +17,6 @@ import ( httpgwTest "github.com/plgd-dev/hub/v2/http-gateway/test" "github.com/plgd-dev/hub/v2/http-gateway/uri" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" - "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/events" test "github.com/plgd-dev/hub/v2/test" @@ -100,8 +100,8 @@ func TestRequestHandlerGetDeviceResourceLinks(t *testing.T) { { DeviceId: deviceID, Resources: test.ResourceLinksToResources(deviceID, test.FilterResourceLink(func(rl schema.ResourceLink) bool { - return strings.Contains(rl.ResourceTypes, collection.ResourceType) || - strings.Contains(rl.ResourceTypes, types.BINARY_SWITCH) + return slices.Contains(rl.ResourceTypes, collection.ResourceType) || + slices.Contains(rl.ResourceTypes, types.BINARY_SWITCH) }, resourceLinks)), AuditContext: commands.NewAuditContext(oauthService.DeviceUserID, "", oauthService.DeviceUserID), }, diff --git a/pkg/strings/slice.go b/pkg/strings/slice.go index 264e4d171..c754aea59 100644 --- a/pkg/strings/slice.go +++ b/pkg/strings/slice.go @@ -1,6 +1,9 @@ package strings -import "errors" +import ( + "errors" + "slices" +) var ErrInvalidType = errors.New("invalid type") @@ -51,27 +54,8 @@ func Unique(s []string) []string { return nil } - set := make(map[string]struct{}) - for _, v := range s { - set[v] = struct{}{} - } - - keys := make([]string, len(set)) - i := 0 - for k := range set { - keys[i] = k - i++ - } - return keys -} - -func Contains(slice []string, s string) bool { - for _, v := range slice { - if v == s { - return true - } - } - return false + slices.Sort(s) + return slices.Compact(s) } // ToSlice converts a string or a []string. diff --git a/pkg/strings/slice_test.go b/pkg/strings/slice_test.go index 6fee31e9f..dd1d021d3 100644 --- a/pkg/strings/slice_test.go +++ b/pkg/strings/slice_test.go @@ -173,49 +173,6 @@ func TestUnique(t *testing.T) { } } -func TestContains(t *testing.T) { - type args struct { - slice []string - s string - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "Empty", - args: args{ - slice: nil, - s: "a", - }, - want: false, - }, - { - name: "Not found", - args: args{ - slice: []string{"a", "bb", "ccc"}, - s: "b", - }, - want: false, - }, - { - name: "Found", - args: args{ - slice: []string{"a", "bb", "ccc"}, - s: "bb", - }, - want: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := Contains(tt.args.slice, tt.args.s) - require.Equal(t, tt.want, got) - }) - } -} - func TestToSlice(t *testing.T) { s, err := ToSlice(nil) require.NoError(t, err) diff --git a/snippet-service/config.yaml b/snippet-service/config.yaml index 4b65c0826..52b629784 100644 --- a/snippet-service/config.yaml +++ b/snippet-service/config.yaml @@ -50,6 +50,11 @@ apis: readHeaderTimeout: 4s writeTimeout: 16s idleTimeout: 30s + tls: + caPool: "/secrets/public/rootca.crt" + keyFile: "/secrets/private/cert.key" + certFile: "/secrets/private/cert.crt" + clientCertificateRequired: true authorization: ownerClaim: "sub" authority: "" @@ -129,3 +134,17 @@ clients: keyFile: "/secrets/private/cert.key" certFile: "/secrets/public/cert.crt" useSystemCAPool: false + resourceAggregate: + grpc: + address: "" + sendMsgSize: 4194304 + recvMsgSize: 4194304 + keepAlive: + time: 10s + timeout: 20s + permitWithoutStream: true + tls: + caPool: "/secrets/public/rootca.crt" + keyFile: "/secrets/private/cert.key" + certFile: "/secrets/public/cert.crt" + useSystemCAPool: false diff --git a/snippet-service/service/config.go b/snippet-service/service/config.go index 8632663bd..e6461a587 100644 --- a/snippet-service/service/config.go +++ b/snippet-service/service/config.go @@ -9,9 +9,11 @@ import ( "github.com/google/uuid" "github.com/plgd-dev/hub/v2/pkg/config" "github.com/plgd-dev/hub/v2/pkg/log" + grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" httpServer "github.com/plgd-dev/hub/v2/pkg/net/http/server" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" + certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" @@ -20,6 +22,7 @@ import ( type HTTPConfig struct { Addr string `yaml:"address" json:"address"` Server httpServer.Config `yaml:",inline" json:",inline"` + TLS certManagerServer.Config `yaml:"tls" json:"tls"` Authorization grpcServer.AuthorizationConfig `yaml:"authorization" json:"authorization"` } @@ -27,6 +30,9 @@ func (c *HTTPConfig) Validate() error { if _, err := net.ResolveTCPAddr("tcp", c.Addr); err != nil { return fmt.Errorf("address('%v') - %w", c.Addr, err) } + if err := c.TLS.Validate(); err != nil { + return fmt.Errorf("tls.%w", err) + } if err := c.Authorization.Validate(); err != nil { return fmt.Errorf("authorization.%w", err) } @@ -81,10 +87,22 @@ func (c *StorageConfig) Validate() error { return nil } +type ResourceAggregateConfig struct { + Connection grpcClient.Config `yaml:"grpc" json:"grpc"` +} + +func (c *ResourceAggregateConfig) Validate() error { + if err := c.Connection.Validate(); err != nil { + return fmt.Errorf("grpc.%w", err) + } + return nil +} + type ClientsConfig struct { - Storage StorageConfig `yaml:"storage" json:"storage"` - OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"` - NATS natsClient.Config `yaml:"nats" json:"nats"` + Storage StorageConfig `yaml:"storage" json:"storage"` + OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"` + NATS natsClient.Config `yaml:"nats" json:"nats"` + ResourceAggregate ResourceAggregateConfig `yaml:"resourceAggregate" json:"resourceAggregate"` } func (c *ClientsConfig) Validate() error { @@ -97,6 +115,9 @@ func (c *ClientsConfig) Validate() error { if err := c.NATS.Validate(); err != nil { return fmt.Errorf("nats.%w", err) } + if err := c.ResourceAggregate.Validate(); err != nil { + return fmt.Errorf("resourceAggregate.%w", err) + } return nil } diff --git a/snippet-service/service/config_test.go b/snippet-service/service/config_test.go index de046bb2b..9bd469c7f 100644 --- a/snippet-service/service/config_test.go +++ b/snippet-service/service/config_test.go @@ -6,6 +6,7 @@ import ( "github.com/plgd-dev/hub/v2/pkg/log" grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" + certManagerServer "github.com/plgd-dev/hub/v2/pkg/security/certManager/server" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" "github.com/plgd-dev/hub/v2/snippet-service/service" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" @@ -77,6 +78,15 @@ func TestHTTPConfig(t *testing.T) { }(), wantErr: true, }, + { + name: "invalid - bad TLS", + cfg: func() service.HTTPConfig { + cfg := test.MakeHTTPConfig() + cfg.TLS = certManagerServer.Config{} + return cfg + }(), + wantErr: true, + }, { name: "invalid - bad authorization", cfg: func() service.HTTPConfig { diff --git a/snippet-service/service/grpc/server.go b/snippet-service/service/grpc/server.go index f6374b063..ac9603024 100644 --- a/snippet-service/service/grpc/server.go +++ b/snippet-service/service/grpc/server.go @@ -224,6 +224,21 @@ func (s *SnippetServiceServer) GetConditions(req *pb.GetConditionsRequest, srv p return nil } +func (s *SnippetServiceServer) GetLatestConditions(req *store.GetLatestConditionsQuery, srv pb.SnippetService_GetConditionsServer) error { + owner, err := s.checkOwner(srv.Context(), "") + if err != nil { + return s.logger.LogAndReturnError(status.Errorf(codes.PermissionDenied, "%v", errCannotGetConditions(err))) + } + + err = s.store.GetLatestConditions(srv.Context(), owner, req, func(c *store.Condition) error { + return sendCondition(srv, c) + }) + if err != nil { + return s.logger.LogAndReturnError(status.Errorf(codes.Internal, "%v", errCannotGetConditions(err))) + } + return nil +} + func errCannotDeleteConditions(err error) error { return fmt.Errorf("cannot delete conditions: %w", err) } diff --git a/snippet-service/service/resourceSubscriber.go b/snippet-service/service/resourceSubscriber.go index 875222fce..fb54575a3 100644 --- a/snippet-service/service/resourceSubscriber.go +++ b/snippet-service/service/resourceSubscriber.go @@ -2,19 +2,234 @@ package service import ( "context" + "errors" "fmt" "github.com/google/uuid" + "github.com/hashicorp/go-multierror" isEvents "github.com/plgd-dev/hub/v2/identity-store/events" + "github.com/plgd-dev/hub/v2/pkg/fn" "github.com/plgd-dev/hub/v2/pkg/fsnotify" "github.com/plgd-dev/hub/v2/pkg/log" + pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" + grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" + "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus" natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/subscriber" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/utils" "github.com/plgd-dev/hub/v2/resource-aggregate/events" + raService "github.com/plgd-dev/hub/v2/resource-aggregate/service" + "github.com/plgd-dev/hub/v2/snippet-service/pb" + "github.com/plgd-dev/hub/v2/snippet-service/store" + "go.opentelemetry.io/otel/trace" ) +type resourceChangedHandler struct { + storage store.Store + raConn *grpcClient.Client + raClient raService.ResourceAggregateClient + logger log.Logger +} + +func newResourceChangedHandler(config ResourceAggregateConfig, storage store.Store, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*resourceChangedHandler, error) { + var fl fn.FuncList + raConn, err := grpcClient.New(config.Connection, fileWatcher, logger, tracerProvider) + if err != nil { + return nil, fmt.Errorf("cannot connect to resource aggregate: %w", err) + } + fl.AddFunc(func() { + if err := raConn.Close(); err != nil && !pkgGrpc.IsContextCanceled(err) { + logger.Errorf("error occurs during closing of the connection to resource-aggregate: %w", err) + } + }) + return &resourceChangedHandler{ + storage: storage, + raConn: raConn, + raClient: raService.NewResourceAggregateClient(raConn.GRPC()), + logger: logger, + }, nil +} + +func (h *resourceChangedHandler) getConditions(ctx context.Context, owner, deviceID, resourceHref string, resourceTypes []string) ([]*pb.Condition, error) { + conditions := make([]*pb.Condition, 0, 4) + err := h.storage.GetLatestConditions(ctx, owner, &store.GetLatestConditionsQuery{ + DeviceID: deviceID, + ResourceHref: resourceHref, + ResourceTypeFilter: resourceTypes, + }, func(v *store.Condition) error { + c, errG := v.GetLatest() + if errG != nil { + return fmt.Errorf("cannot get condition: %w", errG) + } + conditions = append(conditions, c.Clone()) + return nil + }) + if err != nil { + return nil, fmt.Errorf("cannot get latest conditions: %w", err) + } + + // TODO: evaluate conditions + // https://github.com/itchyny/gojq + + return conditions, nil +} + +type configurationWithTokens struct { + configuration *pb.Configuration + tokens []string +} + +func (h *resourceChangedHandler) getConfigurationsWithTokens(ctx context.Context, owner string, conditions []*pb.Condition) (map[string]configurationWithTokens, error) { + confTokens := make(map[string][]string) + idFilter := make([]*pb.IDFilter, 0, len(conditions)) + for _, c := range conditions { + confID := c.GetConfigurationId() + if confID == "" { + h.logger.Warnf("invalid condition(%v)", c) + continue + } + tokens := confTokens[confID] + tokens = append(tokens, c.GetApiAccessToken()) + confTokens[confID] = tokens + idFilter = append(idFilter, &pb.IDFilter{ + Id: confID, + Version: &pb.IDFilter_Latest{ + Latest: true, + }, + }) + } + if (len(idFilter)) == 0 { + return map[string]configurationWithTokens{}, nil + } + + // get configurations + configurations := make([]*pb.Configuration, 0, 4) + err := h.storage.GetConfigurations(ctx, owner, &pb.GetConfigurationsRequest{ + IdFilter: idFilter, + }, func(v *store.Configuration) error { + c, errG := v.GetLatest() + if errG != nil { + return fmt.Errorf("cannot get configuration: %w", errG) + } + configurations = append(configurations, c.Clone()) + return nil + }) + if err != nil { + return nil, fmt.Errorf("cannot get configurations: %w", err) + } + + confsWithTokens := make(map[string]configurationWithTokens) + for _, c := range configurations { + tokens := confTokens[c.GetId()] + if len(tokens) == 0 { + h.logger.Errorf("no tokens found for configuration(id:%v)", c.GetId()) + continue + } + confsWithTokens[c.GetId()] = configurationWithTokens{ + configuration: c, + tokens: confTokens[c.GetId()], + } + } + + return confsWithTokens, nil +} + +func (h *resourceChangedHandler) applyConfigurations(ctx context.Context, rc *events.ResourceChanged) error { + owner := rc.GetAuditContext().GetOwner() + + if owner == "" { + return errors.New("owner not set") + } + + resourceID := rc.GetResourceId() + deviceID := resourceID.GetDeviceId() + resourceHref := resourceID.GetHref() + resourceTypes := rc.GetResourceTypes() + // get matching conditions + conditions, err := h.getConditions(ctx, owner, deviceID, resourceHref, resourceTypes) + if err != nil { + return err + } + + // get configurations + confsWithTokens, err := h.getConfigurationsWithTokens(ctx, owner, conditions) + if err != nil { + return err + } + + // apply configurations + var errors *multierror.Error + for confID, c := range confsWithTokens { + for _, cr := range c.configuration.GetResources() { + if cr.GetHref() != resourceHref { + continue + } + + // insert applied configuration + /// - ak force tak upsert + + // CorrelationId = vygenerovat uuid.NewString, cez InvokeConfiguration sa moze nastavit + // + + for _, token := range c.tokens { + h.logger.Infof("applying configuration(id:%v) to resource(%v)", confID, resourceHref) + ctxWithToken := pkgGrpc.CtxWithToken(ctx, token) + upd := &commands.UpdateResourceRequest{ + ResourceId: resourceID, + CorrelationId: "snippet-service configuration apply", + Content: cr.GetContent(), + TimeToLive: cr.GetTimeToLive(), + CommandMetadata: &commands.CommandMetadata{ + ConnectionId: confID, + }, + } + _, err := h.raClient.UpdateResource(ctxWithToken, upd) + if err != nil { + errors = multierror.Append(errors, err) + continue + } + + // v response je validUntil, ak uplynie cas a zostane v stave pending tak nastavit timeout + + // zapis do AppliedConfigurations + + h.logger.Infof("configuration(id:%v) applied to resource(%v)", confID, resourceHref) + break + } + // TODO: write applied configuration to storage + // ak sa nepodari ziadny update tak zapisat ResourceUpdated so statusom chybou + } + } + return errors.ErrorOrNil() +} + +func (h *resourceChangedHandler) Handle(ctx context.Context, iter eventbus.Iter) error { + for { + ev, ok := iter.Next(ctx) + if !ok { + return iter.Err() + } + var s events.ResourceChanged + if ev.EventType() != s.EventType() { + h.logger.Errorf("unexpected event type: %v", ev.EventType()) + continue + } + if err := ev.Unmarshal(&s); err != nil { + h.logger.Errorf("cannot unmarshal event: %w", err) + continue + } + h.logger.Infof("resource change received: %v", &s) + if err := h.applyConfigurations(ctx, &s); err != nil { + h.logger.Errorf("cannot apply configurations: %w", err) + } + } +} + +func (h *resourceChangedHandler) Close() error { + return h.raConn.Close() +} + type ResourceSubscriber struct { natsClient *natsClient.Client subscriptionHandler eventbus.Handler diff --git a/snippet-service/service/service.go b/snippet-service/service/service.go index 02331b9ea..eae0a7c48 100644 --- a/snippet-service/service/service.go +++ b/snippet-service/service/service.go @@ -14,8 +14,6 @@ import ( otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" "github.com/plgd-dev/hub/v2/pkg/security/jwt/validator" "github.com/plgd-dev/hub/v2/pkg/service" - "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus" - "github.com/plgd-dev/hub/v2/resource-aggregate/events" grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc" httpService "github.com/plgd-dev/hub/v2/snippet-service/service/http" "github.com/plgd-dev/hub/v2/snippet-service/store" @@ -30,26 +28,9 @@ const serviceName = "snippet-service" type Service struct { *service.Service - resourceSubscriber *ResourceSubscriber -} - -type changeHandler struct{} - -func (h *changeHandler) Handle(ctx context.Context, iter eventbus.Iter) (err error) { - for { - ev, ok := iter.Next(ctx) - if !ok { - return iter.Err() - } - var s events.ResourceChanged - if ev.EventType() != s.EventType() { - continue - } - if err := ev.Unmarshal(&s); err != nil { - return err - } - // TODO: handle resource changed event - } + snippetService *grpcService.SnippetServiceServer + resourceChangeHandler *resourceChangedHandler + resourceSubscriber *ResourceSubscriber } func createStore(ctx context.Context, config storeConfig.Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) { @@ -85,6 +66,7 @@ func newStore(ctx context.Context, config StorageConfig, fileWatcher *fsnotify.W if config.CleanUpRecords == "" { return db, fl.ToFunction(), nil } + // TODO: do we need a cron job? s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan if err != nil { fl.Execute() @@ -111,6 +93,39 @@ func newStore(ctx context.Context, config StorageConfig, fileWatcher *fsnotify.W return db, fl.ToFunction(), nil } +func newHttpService(ctx context.Context, config HTTPConfig, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) { + httpValidator, err := validator.New(ctx, config.Authorization.Config, fileWatcher, logger, tracerProvider) + if err != nil { + return nil, nil, fmt.Errorf("cannot create http validator: %w", err) + } + httpService, err := httpService.New(serviceName, httpService.Config{ + Connection: listener.Config{ + Addr: config.Addr, + TLS: config.TLS, + }, + Authorization: config.Authorization.Config, + Server: config.Server, + }, ss, httpValidator, fileWatcher, logger, tracerProvider) + if err != nil { + httpValidator.Close() + return nil, nil, fmt.Errorf("cannot create http service: %w", err) + } + return httpService, httpValidator.Close, nil +} + +func newGrpcService(ctx context.Context, config grpcService.Config, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*grpcService.Service, func(), error) { + grpcValidator, err := validator.New(ctx, config.Authorization.Config, fileWatcher, logger, tracerProvider) + if err != nil { + return nil, nil, fmt.Errorf("cannot create grpc validator: %w", err) + } + grpcService, err := grpcService.New(config, ss, grpcValidator, fileWatcher, logger, tracerProvider) + if err != nil { + grpcValidator.Close() + return nil, nil, fmt.Errorf("cannot create grpc service: %w", err) + } + return grpcService, grpcValidator.Close, nil +} + func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logger log.Logger) (*Service, error) { otelClient, err := otelClient.New(ctx, config.Clients.OpenTelemetryCollector, serviceName, fileWatcher, logger) if err != nil { @@ -127,61 +142,59 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg } closerFn.AddFunc(closeStore) - ca := grpcService.NewSnippetServiceServer(config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, dbStorage, logger) + snippetService := grpcService.NewSnippetServiceServer(config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, dbStorage, logger) closerFn.AddFunc(func() { - errC := ca.Close(ctx) + errC := snippetService.Close(ctx) if errC != nil { log.Errorf("failed to close grpc %s server: %w", serviceName, errC) } }) - resourceSubscriber, err := NewResourceSubscriber(ctx, config.Clients.NATS, fileWatcher, logger, &changeHandler{}) + resourceChangeHandler, err := newResourceChangedHandler(config.Clients.ResourceAggregate, dbStorage, fileWatcher, logger, tracerProvider) if err != nil { closerFn.Execute() - return nil, fmt.Errorf("cannot create resource subscriber: %w", err) + return nil, fmt.Errorf("cannot create resource change handler: %w", err) } closerFn.AddFunc(func() { - errC := resourceSubscriber.Close() + errC := resourceChangeHandler.Close() if errC != nil { - log.Errorf("failed to close resource subscriber: %w", errC) + log.Errorf("failed to close resource change handler: %w", errC) } }) - httpValidator, err := validator.New(ctx, config.APIs.HTTP.Authorization.Config, fileWatcher, logger, tracerProvider) + resourceSubscriber, err := NewResourceSubscriber(ctx, config.Clients.NATS, fileWatcher, logger, resourceChangeHandler) if err != nil { closerFn.Execute() - return nil, fmt.Errorf("cannot create http validator: %w", err) - } - closerFn.AddFunc(httpValidator.Close) - httpService, err := httpService.New(serviceName, httpService.Config{ - Connection: listener.Config{ - Addr: config.APIs.HTTP.Addr, - TLS: config.APIs.GRPC.TLS, - }, - Authorization: config.APIs.GRPC.Authorization.Config, - Server: config.APIs.HTTP.Server, - }, ca, httpValidator, fileWatcher, logger, tracerProvider) - if err != nil { - closerFn.Execute() - return nil, fmt.Errorf("cannot create http service: %w", err) + return nil, fmt.Errorf("cannot create resource subscriber: %w", err) } - grpcValidator, err := validator.New(ctx, config.APIs.GRPC.Authorization.Config, fileWatcher, logger, tracerProvider) + closerFn.AddFunc(func() { + errC := resourceSubscriber.Close() + if errC != nil { + log.Errorf("failed to close resource subscriber: %w", errC) + } + }) + + httpService, httpServiceClose, err := newHttpService(ctx, config.APIs.HTTP, snippetService, fileWatcher, logger, tracerProvider) if err != nil { - _ = httpService.Close() closerFn.Execute() - return nil, fmt.Errorf("cannot create grpc validator: %w", err) + return nil, err } - closerFn.AddFunc(grpcValidator.Close) - grpcService, err := grpcService.New(config.APIs.GRPC, ca, grpcValidator, fileWatcher, logger, tracerProvider) + closerFn.AddFunc(httpServiceClose) + + grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider) if err != nil { - _ = httpService.Close() closerFn.Execute() - return nil, fmt.Errorf("cannot create grpc service: %w", err) + return nil, err } + closerFn.AddFunc(grpcServiceClose) + s := service.New(httpService, grpcService) s.AddCloseFunc(closerFn.Execute) return &Service{ - Service: s, - resourceSubscriber: resourceSubscriber, + Service: s, + + snippetService: snippetService, + resourceChangeHandler: resourceChangeHandler, + resourceSubscriber: resourceSubscriber, }, nil } diff --git a/snippet-service/service/service_test.go b/snippet-service/service/service_test.go index c54cbfdec..56625c563 100644 --- a/snippet-service/service/service_test.go +++ b/snippet-service/service/service_test.go @@ -18,23 +18,35 @@ package service_test import ( "context" + "crypto/tls" "fmt" "testing" + "github.com/plgd-dev/go-coap/v3/message" + grpcgwTest "github.com/plgd-dev/hub/v2/grpc-gateway/test" "github.com/plgd-dev/hub/v2/pkg/config/database" "github.com/plgd-dev/hub/v2/pkg/fsnotify" "github.com/plgd-dev/hub/v2/pkg/log" "github.com/plgd-dev/hub/v2/pkg/mongodb" + pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" httpClient "github.com/plgd-dev/hub/v2/pkg/net/http/client" otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" + "github.com/plgd-dev/hub/v2/resource-aggregate/commands" + natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client" + "github.com/plgd-dev/hub/v2/snippet-service/pb" "github.com/plgd-dev/hub/v2/snippet-service/service" storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config" storeCqlDB "github.com/plgd-dev/hub/v2/snippet-service/store/cqldb" storeMongo "github.com/plgd-dev/hub/v2/snippet-service/store/mongodb" "github.com/plgd-dev/hub/v2/snippet-service/test" + hubTest "github.com/plgd-dev/hub/v2/test" "github.com/plgd-dev/hub/v2/test/config" + oauthService "github.com/plgd-dev/hub/v2/test/oauth-server/service" + oauthTest "github.com/plgd-dev/hub/v2/test/oauth-server/test" hubTestService "github.com/plgd-dev/hub/v2/test/service" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) func TestServiceNew(t *testing.T) { @@ -121,6 +133,24 @@ func TestServiceNew(t *testing.T) { }(), wantErr: true, }, + { + name: "invalid resource subscriber config", + cfg: func() service.Config { + cfg := test.MakeConfig(t) + cfg.Clients.NATS = natsClient.Config{} + return cfg + }(), + wantErr: true, + }, + { + name: "invalid resource aggregate client config", + cfg: func() service.Config { + cfg := test.MakeConfig(t) + cfg.Clients.ResourceAggregate = service.ResourceAggregateConfig{} + return cfg + }(), + wantErr: true, + }, { name: "invalid HTTP validator config", cfg: func() service.Config { @@ -176,3 +206,93 @@ func TestServiceNew(t *testing.T) { }) } } + +func TestService(t *testing.T) { + deviceID := hubTest.MustFindDeviceByName(hubTest.TestDeviceName) + ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT*100) + defer cancel() + + logCfg := log.MakeDefaultConfig() + logCfg.Level = log.DebugLevel + log.Setup(logCfg) + tearDown := hubTestService.SetUp(ctx, t) + defer tearDown() + + snippetCfg := test.MakeConfig(t) + shutdownSnippetService := test.New(t, snippetCfg) + // defer shutdownSnippetService() + + snippetClientConn, err := grpc.NewClient(config.SNIPPET_SERVICE_HOST, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + RootCAs: hubTest.GetRootCertificatePool(t), + }))) + require.NoError(t, err) + defer func() { + _ = snippetClientConn.Close() + }() + snippetClient := pb.NewSnippetServiceClient(snippetClientConn) + + token := oauthTest.GetDefaultAccessToken(t) + ctx = pkgGrpc.CtxWithToken(ctx, token) + + // configuration -> /light/1 -> { state: on, power: 42 } + conf, err := snippetClient.CreateConfiguration(ctx, &pb.Configuration{ + Name: "update light", + Owner: oauthService.DeviceUserID, + Resources: []*pb.Configuration_Resource{ + { + Href: hubTest.TestResourceLightInstanceHref("1"), + Content: &commands.Content{ + ContentType: message.AppOcfCbor.String(), + Data: hubTest.EncodeToCbor(t, map[string]interface{}{ + "state": true, + "power": 42, + }), + }, + }, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, conf.GetId()) + + // condition for /light/1 + _, err = snippetClient.CreateCondition(ctx, &pb.Condition{ + Name: "apply update light", + Owner: oauthService.DeviceUserID, + Enabled: true, + ConfigurationId: conf.GetId(), + DeviceIdFilter: []string{deviceID}, + ResourceHrefFilter: []string{hubTest.TestResourceLightInstanceHref("1")}, + ApiAccessToken: token, + }) + require.NoError(t, err) + + grpcClient := grpcgwTest.NewTestClient(t) + defer func() { + err = grpcClient.Close() + require.NoError(t, err) + }() + + resources := hubTest.GetAllBackendResourceLinks() + _, shutdownDevSim := hubTest.OnboardDevSim(ctx, t, grpcClient.GrpcGatewayClient(), deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, resources) + defer shutdownDevSim() + + var got map[interface{}]interface{} + err = grpcClient.GetResource(ctx, deviceID, hubTest.TestResourceLightInstanceHref("1"), &got) + require.NoError(t, err) + + require.Equal(t, map[interface{}]interface{}{ + "state": true, + "power": uint64(42), + "name": "Light", + }, got) + + // TODO: use defer after AppliedConfiguration is implemented + shutdownSnippetService() + + // restore state + err = grpcClient.UpdateResource(ctx, deviceID, hubTest.TestResourceLightInstanceHref("1"), map[string]interface{}{ + "state": false, + "power": uint64(0), + }, nil) + require.NoError(t, err) +} diff --git a/snippet-service/store/condition.go b/snippet-service/store/condition.go index c8c69c189..d7e3e288c 100644 --- a/snippet-service/store/condition.go +++ b/snippet-service/store/condition.go @@ -6,6 +6,7 @@ import ( "slices" "github.com/google/uuid" + "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/snippet-service/pb" ) @@ -20,11 +21,6 @@ func checkConfigurationId(c string, isUpdate bool) error { return nil } -func NormalizeSlice(s []string) []string { - slices.Sort(s) - return slices.Compact(s) -} - func ValidateAndNormalizeCondition(c *pb.Condition, isUpdate bool) error { if isUpdate || c.GetId() != "" { if _, err := uuid.Parse(c.GetId()); err != nil { @@ -38,9 +34,9 @@ func ValidateAndNormalizeCondition(c *pb.Condition, isUpdate bool) error { return errInvalidArgument(errors.New("missing owner")) } // ensure that filter arrays are sorted and compacted, so we can query for exact match instead of other more expensive queries - c.DeviceIdFilter = NormalizeSlice(c.GetDeviceIdFilter()) - c.ResourceTypeFilter = NormalizeSlice(c.GetResourceTypeFilter()) - c.ResourceHrefFilter = NormalizeSlice(c.GetResourceHrefFilter()) + c.DeviceIdFilter = strings.Unique(c.GetDeviceIdFilter()) + c.ResourceTypeFilter = strings.Unique(c.GetResourceTypeFilter()) + c.ResourceHrefFilter = strings.Unique(c.GetResourceHrefFilter()) return nil } @@ -161,6 +157,6 @@ func ValidateAndNormalizeConditionsQuery(q *GetLatestConditionsQuery) error { if q.DeviceID == "" && q.ResourceHref == "" && len(q.ResourceTypeFilter) == 0 { return errInvalidArgument(errors.New("at least one condition filter must be set")) } - q.ResourceTypeFilter = NormalizeSlice(q.ResourceTypeFilter) + q.ResourceTypeFilter = strings.Unique(q.ResourceTypeFilter) return nil } diff --git a/snippet-service/store/mongodb/condition.go b/snippet-service/store/mongodb/condition.go index 558091bbe..0b5af2274 100644 --- a/snippet-service/store/mongodb/condition.go +++ b/snippet-service/store/mongodb/condition.go @@ -2,11 +2,11 @@ package mongodb import ( "context" - "slices" "time" "github.com/google/uuid" "github.com/hashicorp/go-multierror" + "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/snippet-service/pb" "github.com/plgd-dev/hub/v2/snippet-service/store" "go.mongodb.org/mongo-driver/bson" @@ -14,28 +14,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -/* -Condition -> podmienka za akych okolnosti sa aplikuje - - id (identifikator) (user nevie menit) - - verzia (pri update sa verzia inkremente) - - pri ukladani checknem v DB ze predchadzajuca verzia je o 1 mensie - - t.j. check nenastala mi medzi tym zmena - - name - user-friendly meno - - enabled - - configuration id - - device_id_filter - OR - pride event a chcecknem ci device_id_filter obsahuje ID device - - ak je prazdny tak vsetko pustit - - resource_type_filter - AND - ked mam viac musia sa vsetky matchnut - - ak je prazdny tak vsetko pustit - - resource_href_filer - OR - musim matchnut aspon jeden href - - ak je prazdny tak vsetko pustit- resource_href_filter - - jq_expression - expression pustim nad obsahom ResourceChanged eventom, mal by vratit true/false (ci hodnota existuje) - - dalsia podmienka - https://github.com/itchyny/gojq - - api_access_token - TODO, zatial nechame otvorene; ked sa ide aplikovat konfiguraciu tak potrebujes token na autorizaciu - - owner -> musi sediet s tym co je v DB -*/ - func (s *Store) InsertConditions(ctx context.Context, conds ...*store.Condition) error { documents := make([]interface{}, 0, len(conds)) for _, cond := range conds { @@ -189,14 +167,9 @@ func (s *Store) getConditionsByAggregation(ctx context.Context, owner, id string return processCursor(ctx, cur, p) } -func normalizeSlice(s []string) []string { - slices.Sort(s) - return slices.Compact(s) -} - func (s *Store) GetConditions(ctx context.Context, owner string, query *pb.GetConditionsRequest, p store.Process[store.Condition]) error { vf := pb.PartitionIDFilter(query.GetIdFilter()) - confIdLatestFilter := normalizeSlice(query.GetConfigurationIdFilter()) + confIdLatestFilter := strings.Unique(query.GetConfigurationIdFilter()) var errors *multierror.Error if len(vf.All) > 0 || vf.IsEmpty() && len(confIdLatestFilter) == 0 { err := s.getConditionsByID(ctx, owner, vf.All, p) @@ -219,6 +192,11 @@ func (s *Store) DeleteConditions(ctx context.Context, owner string, query *pb.De return s.delete(ctx, conditionsCol, owner, query.GetIdFilter()) } +func toLatestEnabledQueryFilter() bson.D { + key := store.LatestKey + "." + store.EnabledKey + return bson.D{{Key: key, Value: true}} +} + func toLatestDeviceIDQueryFilter(deviceID string) bson.M { key := store.LatestKey + "." + store.DeviceIDFilterKey return bson.M{"$or": bson.A{ @@ -235,8 +213,17 @@ func toLatestResourceHrefQueryFilter(resourceHref string) bson.M { }} } +func toLatestResouceTypeQueryFilter(resourceTypeFilter []string) bson.M { + key := store.LatestKey + "." + store.ResourceTypeFilterKey + return bson.M{"$or": bson.A{ + bson.M{key: bson.M{"$exists": false}}, + bson.M{key: bson.M{"$all": resourceTypeFilter}}, + }} +} + func toLatestConditionsQueryFilter(owner string, queries *store.GetLatestConditionsQuery) interface{} { - filter := make([]interface{}, 0, 4) + filter := make([]interface{}, 0, 5) + filter = append(filter, toLatestEnabledQueryFilter()) if owner != "" { filter = append(filter, bson.D{{Key: store.OwnerKey, Value: owner}}) } @@ -246,9 +233,9 @@ func toLatestConditionsQueryFilter(owner string, queries *store.GetLatestConditi if queries.ResourceHref != "" { filter = append(filter, toLatestResourceHrefQueryFilter(queries.ResourceHref)) } - // if len(queries.ResourceTypeFilter) > 0 { - // filter = append(filter, toResouceTypeQueryFilter(queries.ResourceTypeFilter)) - // } + if len(queries.ResourceTypeFilter) > 0 { + filter = append(filter, toLatestResouceTypeQueryFilter(queries.ResourceTypeFilter)) + } if len(filter) == 0 { return bson.D{} } diff --git a/snippet-service/store/mongodb/getLatestConditions_test.go b/snippet-service/store/mongodb/getLatestConditions_test.go index fad0612e6..3d532d1a1 100644 --- a/snippet-service/store/mongodb/getLatestConditions_test.go +++ b/snippet-service/store/mongodb/getLatestConditions_test.go @@ -12,11 +12,11 @@ import ( "github.com/stretchr/testify/require" ) -func TestStoreFindConditions(t *testing.T) { +func TestStoreGetLatestConditions(t *testing.T) { s, cleanUpStore := test.NewMongoStore(t) defer cleanUpStore() - ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT*100) + ctx, cancel := context.WithTimeout(context.Background(), config.TEST_TIMEOUT) defer cancel() const deviceID1 = "deviceID1" @@ -35,6 +35,7 @@ func TestStoreFindConditions(t *testing.T) { cond1In := &pb.Condition{ Id: uuid.NewString(), Name: "c1", + Enabled: true, ConfigurationId: uuid.NewString(), Owner: owner1, } @@ -44,6 +45,7 @@ func TestStoreFindConditions(t *testing.T) { cond2In := &pb.Condition{ Id: uuid.NewString(), Name: "c2", + Enabled: true, ConfigurationId: uuid.NewString(), DeviceIdFilter: []string{deviceID1}, ResourceHrefFilter: []string{href1, href2, href3}, @@ -56,6 +58,7 @@ func TestStoreFindConditions(t *testing.T) { cond3In := &pb.Condition{ Id: uuid.NewString(), Name: "c3", + Enabled: true, ConfigurationId: uuid.NewString(), DeviceIdFilter: []string{deviceID2}, ResourceHrefFilter: []string{href3, href4, href5}, @@ -68,6 +71,7 @@ func TestStoreFindConditions(t *testing.T) { cond4In := &pb.Condition{ Id: uuid.NewString(), Name: "c4", + Enabled: true, ConfigurationId: uuid.NewString(), DeviceIdFilter: []string{deviceID1, deviceID3}, ResourceHrefFilter: []string{href1, href5}, @@ -80,6 +84,7 @@ func TestStoreFindConditions(t *testing.T) { cond5In := &pb.Condition{ Id: uuid.NewString(), Name: "c5", + Enabled: true, ConfigurationId: uuid.NewString(), DeviceIdFilter: []string{deviceID3}, ResourceHrefFilter: []string{href1, href2}, @@ -90,9 +95,10 @@ func TestStoreFindConditions(t *testing.T) { require.NoError(t, err) cond6In := &pb.Condition{ - Id: uuid.New().String(), + Id: uuid.NewString(), Name: "c6", - ConfigurationId: uuid.New().String(), + Enabled: true, + ConfigurationId: uuid.NewString(), DeviceIdFilter: []string{deviceID2, deviceID3}, ResourceHrefFilter: []string{href2, href3, href4}, ResourceTypeFilter: []string{type1, type2, type3}, @@ -101,6 +107,19 @@ func TestStoreFindConditions(t *testing.T) { cond6, err := s.CreateCondition(ctx, cond6In) require.NoError(t, err) + cond7In := &pb.Condition{ + Id: uuid.NewString(), + Name: "c7 - disabled", + Enabled: false, + ConfigurationId: uuid.NewString(), + DeviceIdFilter: []string{deviceID2, deviceID3}, + ResourceHrefFilter: []string{href2, href3, href4}, + ResourceTypeFilter: []string{type1, type2, type3}, + Owner: owner2, + } + _, err = s.CreateCondition(ctx, cond7In) + require.NoError(t, err) + type args struct { query *store.GetLatestConditionsQuery owner string @@ -207,47 +226,47 @@ func TestStoreFindConditions(t *testing.T) { }, want: []*pb.Condition{cond5, cond6}, }, - // { - // name: "[type2]", - // args: args{ - // query: &store.ConditionsQuery{ - // ResourceTypeFilter: []string{type2}, - // }, - // }, - // want: pb.Conditions{cond1, cond5}, - // }, - // { - // name: "deviceID2/[type3]", - // args: args{ - // query: &store.ConditionsQuery{ - // DeviceID: deviceID2, - // ResourceTypeFilter: []string{type3}, - // }, - // }, - // want: pb.Conditions{cond1, cond3}, - // }, - // { - // name: "owner2/[type1,type2,type3}", - // args: args{ - // query: &store.ConditionsQuery{ - // // order should not matter - // ResourceTypeFilter: []string{type2, type1, type3}, - // }, - // owner: owner2, - // }, - // want: pb.Conditions{cond6}, - // }, - // { - // name: "deviceID1/href5/[type3]", - // args: args{ - // query: &store.ConditionsQuery{ - // DeviceID: deviceID1, - // ResourceHref: href5, - // ResourceTypeFilter: []string{type3}, - // }, - // }, - // want: pb.Conditions{cond1, cond4}, - // }, + { + name: "[type2]", + args: args{ + query: &store.GetLatestConditionsQuery{ + ResourceTypeFilter: []string{type2}, + }, + }, + want: []*pb.Condition{cond1, cond2, cond5, cond6}, + }, + { + name: "deviceID2/[type3]", + args: args{ + query: &store.GetLatestConditionsQuery{ + DeviceID: deviceID2, + ResourceTypeFilter: []string{type3}, + }, + }, + want: []*pb.Condition{cond1, cond3, cond6}, + }, + { + name: "owner2/[type1,type2,type3}", + args: args{ + query: &store.GetLatestConditionsQuery{ + // order should not matter + ResourceTypeFilter: []string{type2, type1, type3}, + }, + owner: owner2, + }, + want: []*pb.Condition{cond6}, + }, + { + name: "deviceID1/href5/[type3]", + args: args{ + query: &store.GetLatestConditionsQuery{ + DeviceID: deviceID1, + ResourceHref: href5, + ResourceTypeFilter: []string{type3}, + }, + }, + want: []*pb.Condition{cond1, cond4}, + }, } for _, tt := range tests { diff --git a/snippet-service/test/condition.go b/snippet-service/test/condition.go index 3019b38b5..cf432ced1 100644 --- a/snippet-service/test/condition.go +++ b/snippet-service/test/condition.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" pkgGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" + "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/snippet-service/pb" "github.com/plgd-dev/hub/v2/snippet-service/store" "github.com/stretchr/testify/require" @@ -35,15 +36,15 @@ func stringSlice(prefix string, start, n int) []string { } func ConditionDeviceIdFilter(start, n int) []string { - return store.NormalizeSlice(stringSlice("device", start, n)) + return strings.Unique(stringSlice("device", start, n)) } func ConditionResourceTypeFilter(start, n int) []string { - return store.NormalizeSlice(stringSlice("rt", start, n)) + return strings.Unique(stringSlice("rt", start, n)) } func ConditionResourceHrefFilter(start, n int) []string { - return store.NormalizeSlice(stringSlice("/href/", start, n)) + return strings.Unique(stringSlice("/href/", start, n)) } func ConditionJqExpressionFilter(i int) string { diff --git a/snippet-service/test/service.go b/snippet-service/test/service.go index 84da9b007..bca8a9cfb 100644 --- a/snippet-service/test/service.go +++ b/snippet-service/test/service.go @@ -24,9 +24,12 @@ func HTTPURI(uri string) string { } func MakeHTTPConfig() service.HTTPConfig { + tls := config.MakeTLSServerConfig() + tls.ClientCertificateRequired = false return service.HTTPConfig{ Addr: config.SNIPPET_SERVICE_HTTP_HOST, Server: config.MakeHttpServerConfig(), + TLS: tls, Authorization: config.MakeAuthorizationConfig(), } } @@ -45,6 +48,9 @@ func MakeClientsConfig() service.ClientsConfig { Storage: MakeStorageConfig(), OpenTelemetryCollector: config.MakeOpenTelemetryCollectorClient(), NATS: config.MakeSubscriberConfig(), + ResourceAggregate: service.ResourceAggregateConfig{ + Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST), + }, } } diff --git a/test/test.go b/test/test.go index 605679d74..fad511dee 100644 --- a/test/test.go +++ b/test/test.go @@ -7,6 +7,7 @@ import ( "net" "os" "os/exec" + "slices" "sort" "strings" "testing" @@ -34,7 +35,6 @@ import ( "github.com/plgd-dev/hub/v2/grpc-gateway/client" "github.com/plgd-dev/hub/v2/grpc-gateway/pb" isEvents "github.com/plgd-dev/hub/v2/identity-store/events" - pkgStrings "github.com/plgd-dev/hub/v2/pkg/strings" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" "github.com/plgd-dev/hub/v2/resource-aggregate/events" "github.com/plgd-dev/hub/v2/test/config" @@ -941,7 +941,7 @@ func DeviceIsBatchObservable(ctx context.Context, t *testing.T, deviceID string) require.NoError(t, err) require.Len(t, links, 1) return links[0].Policy.BitMask.Has(schema.Observable) && - pkgStrings.Contains(links[0].Interfaces, interfaces.OC_IF_B) + slices.Contains(links[0].Interfaces, interfaces.OC_IF_B) } func GetAllBackendResourceLinks() schema.ResourceLinks {