diff --git a/cmd/ucpd/ucp-self-hosted-dev.yaml b/cmd/ucpd/ucp-self-hosted-dev.yaml index f5311cbee1..d69c202251 100644 --- a/cmd/ucpd/ucp-self-hosted-dev.yaml +++ b/cmd/ucpd/ucp-self-hosted-dev.yaml @@ -9,6 +9,7 @@ # - Talk to Portable Resources' Providers on port 8081 # - Disables metrics and profiler # +location: 'global' storageProvider: provider: "apiserver" apiserver: diff --git a/deploy/Chart/templates/ucp/configmaps.yaml b/deploy/Chart/templates/ucp/configmaps.yaml index 96b012f2b7..18fe1d60e1 100644 --- a/deploy/Chart/templates/ucp/configmaps.yaml +++ b/deploy/Chart/templates/ucp/configmaps.yaml @@ -10,6 +10,7 @@ data: ucp-config.yaml: |- # Radius configuration file. # See https://github.com/radius-project/radius/blob/main/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md for more information. + location: 'global' storageProvider: provider: "apiserver" apiserver: diff --git a/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md b/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md index 307e0e09b0..d6245a508a 100644 --- a/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md +++ b/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md @@ -218,6 +218,7 @@ ucp: ### UCP ```yaml +location: 'global' storageProvider: provider: "apiserver" apiserver: diff --git a/pkg/armrpc/asyncoperation/worker/worker.go b/pkg/armrpc/asyncoperation/worker/worker.go index 36486b73b3..20640b57d7 100644 --- a/pkg/armrpc/asyncoperation/worker/worker.go +++ b/pkg/armrpc/asyncoperation/worker/worker.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "runtime/debug" "strings" "time" @@ -252,6 +251,14 @@ func (w *AsyncRequestProcessWorker) runOperation(ctx context.Context, message *q logger.Info("Start processing operation.") result, err := asyncCtrl.Run(asyncReqCtx, asyncReq) + + code := "" + if result.Error != nil { + code = result.Error.Code + } + + logger.Info("Operation returned", "success", result.Error == nil, "code", code, "provisioningState", result.ProvisioningState(), "err", err) + // There are two cases when asyncReqCtx is canceled. // 1. When the operation is timed out, w.completeOperation will be called in L186 // 2. When parent context is canceled or done, we need to requeue the operation to reprocess the request. @@ -262,6 +269,7 @@ func (w *AsyncRequestProcessWorker) runOperation(ctx context.Context, message *q result.SetFailed(armErr, false) logger.Error(err, "Operation Failed") } + w.completeOperation(ctx, message, result, asyncCtrl.StorageClient()) } trace.SetAsyncResultStatus(result, span) @@ -347,10 +355,10 @@ func (w *AsyncRequestProcessWorker) updateResourceAndOperationStatus(ctx context return err } - opType, _ := v1.ParseOperationType(req.OperationType) - err = updateResourceState(ctx, sc, rID.String(), state) - if err != nil && !(opType.Method == http.MethodDelete && errors.Is(&store.ErrNotFound{ID: rID.String()}, err)) { + if errors.Is(err, &store.ErrNotFound{}) { + logger.Info("failed to update the provisioningState in resource because it no longer exists.") + } else if err != nil { logger.Error(err, "failed to update the provisioningState in resource.") return err } diff --git a/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go new file mode 100644 index 0000000000..5623d88518 --- /dev/null +++ b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go @@ -0,0 +1,94 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcegroups + +import ( + "context" + "errors" + "fmt" + "net/http" + + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/frontend/controller/resourcegroups" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/pkg/ucp/ucplog" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +var _ ctrl.Controller = (*TrackedResourceProcessController)(nil) + +type updater interface { + Update(ctx context.Context, downstreamURL string, originalID resources.ID, version string) error +} + +// TrackedResourceProcessController is the async operation controller to perform background processing on tracked resources. +type TrackedResourceProcessController struct { + ctrl.BaseController + + // Updater is the utility struct that can perform updates on tracked resources. This can be modified for testing. + updater updater +} + +// NewTrackedResourceProcessController creates a new TrackedResourceProcessController controller which is used to process resources asynchronously. +func NewTrackedResourceProcessController(opts ctrl.Options) (ctrl.Controller, error) { + transport := otelhttp.NewTransport(http.DefaultTransport) + return &TrackedResourceProcessController{ctrl.NewBaseAsyncController(opts), trackedresource.NewUpdater(opts.StorageClient, &http.Client{Transport: transport})}, nil +} + +// Run retrieves a resource from storage, parses the resource ID, and updates our tracked resource entry in the background. +func (c *TrackedResourceProcessController) Run(ctx context.Context, request *ctrl.Request) (ctrl.Result, error) { + resource, err := store.GetResource[datamodel.GenericResource](ctx, c.StorageClient(), request.ResourceID) + if errors.Is(err, &store.ErrNotFound{}) { + return ctrl.NewFailedResult(v1.ErrorDetails{Code: v1.CodeNotFound, Message: fmt.Sprintf("resource %q not found", request.ResourceID), Target: request.ResourceID}), nil + } else if err != nil { + return ctrl.Result{}, err + } + + originalID, err := resources.Parse(resource.Properties.ID) + if err != nil { + return ctrl.Result{}, err + } + + downstreamURL, err := resourcegroups.ValidateDownstream(ctx, c.StorageClient(), originalID) + if errors.Is(err, &resourcegroups.NotFoundError{}) { + return ctrl.NewFailedResult(v1.ErrorDetails{Code: v1.CodeNotFound, Message: err.Error(), Target: request.ResourceID}), nil + } else if errors.Is(err, &resourcegroups.InvalidError{}) { + return ctrl.NewFailedResult(v1.ErrorDetails{Code: v1.CodeInvalid, Message: err.Error(), Target: request.ResourceID}), nil + } else if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to validate downstream: %w", err) + } + + logger := ucplog.FromContextOrDiscard(ctx) + logger.Info("Processing tracked resource", "resourceID", originalID) + err = c.updater.Update(ctx, downstreamURL.String(), originalID, resource.Properties.APIVersion) + if errors.Is(err, &trackedresource.InProgressErr{}) { + // The resource is still being processed, so we can sleep for a while. + result := ctrl.Result{} + result.SetFailed(v1.ErrorDetails{Code: v1.CodeConflict, Message: err.Error(), Target: request.ResourceID}, true) + + return result, nil + } else if err != nil { + return ctrl.Result{}, err + } + + logger.Info("Completed processing tracked resource", "resourceID", originalID) + return ctrl.Result{}, nil +} diff --git a/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go new file mode 100644 index 0000000000..6ab1ad7cea --- /dev/null +++ b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go @@ -0,0 +1,180 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcegroups + +import ( + "context" + "fmt" + "testing" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +func Test_Run(t *testing.T) { + setup := func(t *testing.T) (*TrackedResourceProcessController, *mockUpdater, *store.MockStorageClient) { + ctrl := gomock.NewController(t) + storageClient := store.NewMockStorageClient(ctrl) + + pc, err := NewTrackedResourceProcessController(controller.Options{StorageClient: storageClient}) + require.NoError(t, err) + + updater := mockUpdater{} + pc.(*TrackedResourceProcessController).updater = &updater + return pc.(*TrackedResourceProcessController), &updater, storageClient + } + + id := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/Applications.Test/testResources/my-resource") + trackingID := trackedresource.IDFor(id) + + plane := datamodel.Plane{ + Properties: datamodel.PlaneProperties{ + Kind: datamodel.PlaneKind(v20231001preview.PlaneKindUCPNative), + ResourceProviders: map[string]*string{ + "Applications.Test": to.Ptr("https://localhost:1234"), + }, + }, + } + resourceGroup := datamodel.ResourceGroup{} + data := datamodel.GenericResourceFromID(id, trackingID) + + // Most of the heavy lifting is done by the updater. We just need to test that we're calling it correctly. + t.Run("Success", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, controller.Result{}, result) + require.NoError(t, err) + }) + + t.Run("retry", func(t *testing.T) { + pc, updater, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + // Force a retry. + updater.Result = &trackedresource.InProgressErr{} + + expected := controller.Result{} + expected.SetFailed(v1.ErrorDetails{Code: v1.CodeConflict, Message: updater.Result.Error(), Target: trackingID.String()}, true) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) + + t.Run("Failure (resource not found)", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + + expected := controller.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeNotFound, + Message: fmt.Sprintf("resource %q not found", trackingID.String()), + Target: trackingID.String(), + }) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) + + t.Run("Failure (validate downstream: not found)", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + + expected := controller.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeNotFound, + Message: "plane \"/planes/test/local\" not found", + Target: trackingID.String(), + }) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) + + t.Run("Failure (validate downstream: invalid downstream)", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: datamodel.Plane{}}, nil).Times(1) + + expected := controller.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeInvalid, + Message: "unexpected plane type ", + Target: trackingID.String(), + }) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) +} + +type mockUpdater struct { + Result error +} + +func (u *mockUpdater) Update(ctx context.Context, downstreamURL string, originalID resources.ID, version string) error { + return u.Result +} diff --git a/pkg/ucp/backend/service.go b/pkg/ucp/backend/service.go index ad3039e447..66c902fac7 100644 --- a/pkg/ucp/backend/service.go +++ b/pkg/ucp/backend/service.go @@ -20,12 +20,17 @@ import ( "context" "fmt" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" "github.com/radius-project/radius/pkg/armrpc/asyncoperation/worker" "github.com/radius-project/radius/pkg/armrpc/hostoptions" + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/backend/controller/resourcegroups" + "github.com/radius-project/radius/pkg/ucp/datamodel" ) const ( - UCPProviderName = "ucp" + UCPProviderName = "System.Resources" ) // Service is a service to run AsyncReqeustProcessWorker. @@ -65,5 +70,24 @@ func (w *Service) Run(ctx context.Context) error { } } + opts := ctrl.Options{ + DataProvider: w.StorageProvider, + } + + err := RegisterControllers(ctx, w.Controllers, opts) + if err != nil { + return err + } + return w.Start(ctx, workerOpts) } + +// RegisterControllers registers the controllers for the UCP backend. +func RegisterControllers(ctx context.Context, registry *worker.ControllerRegistry, opts ctrl.Options) error { + err := registry.Register(ctx, v20231001preview.ResourceType, v1.OperationMethod(datamodel.OperationProcess), resourcegroups.NewTrackedResourceProcessController, opts) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/ucp/datamodel/genericresource.go b/pkg/ucp/datamodel/genericresource.go index f80d60a3b2..f561569a54 100644 --- a/pkg/ucp/datamodel/genericresource.go +++ b/pkg/ucp/datamodel/genericresource.go @@ -16,7 +16,17 @@ limitations under the License. package datamodel -import v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" +import ( + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/ucp/resources" +) + +const ( + // OperationProcess is the operation type for processing a tracked resource. + OperationProcess = "PROCESS" + // ResourceType is the resource type for a generic resource. + ResourceType = "System.Resources/resources" +) // GenericResource represents a stored "tracked resource" within a UCP resource group. // @@ -37,7 +47,7 @@ type GenericResource struct { // ResourceTypeName gives the type of ucp resource. func (r *GenericResource) ResourceTypeName() string { - return "System.Resources/resources" + return ResourceType } // GenericResourceProperties stores the properties of the resource being tracked. @@ -52,4 +62,30 @@ type GenericResourceProperties struct { Name string `json:"name"` // Type is the resource type. Type string `json:"type"` + + // APIVersion is the version of the API that can be used to query the resource. + APIVersion string `json:"apiVersion"` + + // OperationID is the last operation that updated this entry. This is used when an operation + // is enqueued as a way to force a different Etag to be returned. This data doesn't need to be + // read or used, it's just acting as a "salt" for the Etag. + OperationID string `json:"operationId"` +} + +// GenericResourceFromID creates a new GenericResource from the given original resource ID and tracking ID. +func GenericResourceFromID(originalID resources.ID, trackingID resources.ID) *GenericResource { + return &GenericResource{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: trackingID.String(), + Type: trackingID.Type(), + Name: trackingID.Name(), + }, + }, + Properties: GenericResourceProperties{ + ID: originalID.String(), + Name: originalID.Name(), + Type: originalID.Type(), + }, + } } diff --git a/pkg/ucp/datamodel/genericresource_test.go b/pkg/ucp/datamodel/genericresource_test.go new file mode 100644 index 0000000000..e1e0054a38 --- /dev/null +++ b/pkg/ucp/datamodel/genericresource_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamodel + +import ( + "testing" + + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/stretchr/testify/require" +) + +func Test_GenericResourceFromID(t *testing.T) { + id := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/Applications.Test/testResources/my-resource") + trackingID := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/System.Resources/genericResources/asdf") + + actual := GenericResourceFromID(id, trackingID) + require.Equal(t, trackingID.String(), actual.ID) + require.Equal(t, trackingID.Type(), actual.Type) + require.Equal(t, trackingID.Name(), actual.Name) + require.Equal(t, id.String(), actual.Properties.ID) + require.Equal(t, id.Type(), actual.Properties.Type) + require.Equal(t, id.Name(), actual.Properties.Name) +} diff --git a/pkg/ucp/frontend/api/server.go b/pkg/ucp/frontend/api/server.go index 78f88b04a6..8222a316b1 100644 --- a/pkg/ucp/frontend/api/server.go +++ b/pkg/ucp/frontend/api/server.go @@ -25,6 +25,7 @@ import ( "net/http" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" armrpc_controller "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/frontend/defaultoperation" "github.com/radius-project/radius/pkg/armrpc/servicecontext" @@ -113,7 +114,6 @@ func (s *Service) Name() string { // registers the routes, configures the default planes, and sets up the http server with the appropriate middleware. It // returns an http server and an error if one occurs. func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { - var err error r := chi.NewRouter() s.storageProvider = dataprovider.NewStorageProvider(s.options.StorageProviderOptions) @@ -125,6 +125,13 @@ func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { return nil, err } + queueClient, err := s.queueProvider.GetClient(ctx) + if err != nil { + return nil, err + } + + statusManager := statusmanager.New(s.storageProvider, queueClient, s.options.Location) + moduleOptions := modules.Options{ Address: s.options.Address, PathBase: s.options.PathBase, @@ -134,6 +141,7 @@ func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { QueueProvider: s.queueProvider, SecretProvider: s.secretProvider, SpecLoader: specLoader, + StatusManager: statusManager, UCPConnection: s.options.UCPConnection, } diff --git a/pkg/ucp/frontend/controller/radius/proxy.go b/pkg/ucp/frontend/controller/radius/proxy.go new file mode 100644 index 0000000000..a11bd92c04 --- /dev/null +++ b/pkg/ucp/frontend/controller/radius/proxy.go @@ -0,0 +1,323 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package radius + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + armrpc_controller "github.com/radius-project/radius/pkg/armrpc/frontend/controller" + armrpc_rest "github.com/radius-project/radius/pkg/armrpc/rest" + "github.com/radius-project/radius/pkg/middleware" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/frontend/controller/resourcegroups" + "github.com/radius-project/radius/pkg/ucp/proxy" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/pkg/ucp/ucplog" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +const ( + PlanesPath = "/planes" + + // ProcessOperationTimeout is the timeout for processing a tracked resource in the background. + ProcessOperationTimeout = 12 * time.Hour + + // ProcessOperationRetryAfter is the retry interval for processing a tracked resource in the background. + // This is used when the tracked resource is not in a terminal state. + ProcessOperationRetryAfter = 5 * time.Second + + // EnqueueOperationRetryCount is the number of times to retry enqueueing an async operation before giving up. + EnqueueOperationRetryCount = 10 +) + +type updater interface { + Update(ctx context.Context, downstream string, id resources.ID, version string) error +} + +var _ armrpc_controller.Controller = (*ProxyController)(nil) + +// ProxyController is the controller implementation to proxy requests to appropriate RP or URL. +type ProxyController struct { + armrpc_controller.Operation[*datamodel.Plane, datamodel.Plane] + + // transport is the http.RoundTripper to use for proxying requests. Can be overridden for testing. + transport http.RoundTripper + + // updater is used to process tracked resources. Can be overridden for testing. + updater updater +} + +// # Function Explanation +// +// NewProxyController creates a new ProxyPlane controller with the given options and returns it, or returns an error if the +// controller cannot be created. +func NewProxyController(opts armrpc_controller.Options) (armrpc_controller.Controller, error) { + transport := otelhttp.NewTransport(http.DefaultTransport) + updater := trackedresource.NewUpdater(opts.StorageClient, &http.Client{Transport: transport}) + return &ProxyController{ + Operation: armrpc_controller.NewOperation(opts, armrpc_controller.ResourceOptions[datamodel.Plane]{}), + transport: transport, + updater: updater, + }, nil +} + +// # Function Explanation +// +// Run() takes in a request object and context, looks up the plane and resource provider associated with the +// request, and proxies the request to the appropriate resource provider. +func (p *ProxyController) Run(ctx context.Context, w http.ResponseWriter, req *http.Request) (armrpc_rest.Response, error) { + logger := ucplog.FromContextOrDiscard(ctx) + + logger.V(ucplog.LevelDebug).Info("starting proxy request") + for key, value := range req.Header { + logger.V(ucplog.LevelDebug).Info("incoming request header", "key", key, "value", value) + } + + // NOTE: avoid using the request URL directly as the casing may have been normalized. + // use the original URL instead. + requestCtx := v1.ARMRequestContextFromContext(ctx) + id := requestCtx.ResourceID + relativePath := middleware.GetRelativePath(p.Options().PathBase, requestCtx.OrignalURL.Path) + + downstreamURL, err := resourcegroups.ValidateDownstream(ctx, p.StorageClient(), id) + if errors.Is(err, &resourcegroups.NotFoundError{}) { + return armrpc_rest.NewNotFoundResponse(id), nil + } else if errors.Is(err, &resourcegroups.InvalidError{}) { + response := v1.ErrorResponse{Error: v1.ErrorDetails{Code: v1.CodeInvalid, Message: err.Error(), Target: id.String()}} + return armrpc_rest.NewBadRequestARMResponse(response), nil + } else if err != nil { + return nil, fmt.Errorf("failed to validate downstream: %w", err) + } + + proxyReq, err := p.PrepareProxyRequest(ctx, req, downstreamURL.String(), relativePath) + if err != nil { + return nil, err + } + + interceptor := &responseInterceptor{Inner: p.transport} + + sender := proxy.NewARMProxy(proxy.ReverseProxyOptions{RoundTripper: interceptor}, downstreamURL, nil) + sender.ServeHTTP(w, proxyReq) + + if interceptor.Response == nil { + logger.V(ucplog.LevelDebug).Error(err, "failed to proxy request") + return nil, nil + } + + // If we get here then we've successfully proxied the request. Now we interpret the response. + logger.V(ucplog.LevelDebug).Info("finished proxy request", "http.statuscode", interceptor.Response.StatusCode) + for key, value := range req.Header { + logger.V(ucplog.LevelDebug).Info("outgoing response header", "key", key, "value", value) + } + + if !p.ShouldTrackRequest(req.Method, id, interceptor.Response) { + logger.V(ucplog.LevelDebug).Info("request does not need to be tracked") + return nil, nil + } + + if p.IsTerminalResponse(interceptor.Response) { + logger.V(ucplog.LevelDebug).Info("response is terminal, updating tracked resource synchronously") + err = p.UpdateTrackedResource(ctx, downstreamURL.String(), id, requestCtx.APIVersion) + if errors.Is(err, &trackedresource.InProgressErr{}) { + logger.V(ucplog.LevelDebug).Info("synchronous update failed, updating tracked resource asynchronously") + // Continue executing + } else if err != nil { + // We can't return the response to the client if we failed to update the tracked resource. Instead + // fallback to the async path. + logger.Error(err, "failed to update tracked resource synchronously") + // Continue executing + } else { + logger.V(ucplog.LevelDebug).Info("tracked resource updated synchronously") + return nil, nil + } + } else { + logger.V(ucplog.LevelDebug).Info("response is not terminal, updating tracked resource asynchronously") + } + + // If we get here then we need to update the tracked resource, but the operation is not yet complete. + err = p.EnqueueTrackedResourceUpdate(ctx, id, requestCtx.APIVersion) + if err != nil { + logger.Error(err, "failed to enqueue tracked resource update") + return nil, nil + } + + return nil, nil +} + +// PrepareProxyRequest constructs and initializes the proxy request. +func (p *ProxyController) PrepareProxyRequest(ctx context.Context, originalReq *http.Request, downstream string, relativePath string) (*http.Request, error) { + proxyReq := originalReq.Clone(ctx) + requestURL, err := url.Parse(downstream) + if err != nil { + return nil, fmt.Errorf("failed to parse downstream URL: %w", err) + } + proxyReq.URL = requestURL + proxyReq.URL.Path = relativePath + proxyReq.URL.RawQuery = originalReq.URL.RawQuery + + refererURL := url.URL{ + Scheme: "http", + Host: originalReq.Host, + Path: originalReq.URL.Path, + RawQuery: originalReq.URL.RawQuery, + } + + // As per https://github.com/golang/go/issues/28940#issuecomment-441749380, the way to check + // for http vs https is check the TLS field + if originalReq.TLS != nil { + refererURL.Scheme = "https" + } + + proxyReq.Header.Set("X-Forwarded-Proto", refererURL.Scheme) + proxyReq.Header.Set(v1.RefererHeader, refererURL.String()) + + return proxyReq, nil +} + +// ShouldTrackRequest returns true if the request should be tracked. +func (p *ProxyController) ShouldTrackRequest(httpMethod string, id resources.ID, resp *http.Response) bool { + // Only track mutating requests. + if !strings.EqualFold(httpMethod, http.MethodPut) && !strings.EqualFold(httpMethod, http.MethodPatch) && !strings.EqualFold(httpMethod, http.MethodDelete) { + return false + } + + // For now we just track top-level resources. + if len(id.TypeSegments()) != 1 || !id.IsResource() { + return false + } + + if resp.StatusCode < 200 && resp.StatusCode >= 300 { + return false // Not a success + } + + return true +} + +// IsTerminalResponse returns true if the response is terminal. +func (p *ProxyController) IsTerminalResponse(resp *http.Response) bool { + return resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted +} + +// UpdateTrackedResource updates the tracked resource synchronously. +func (p *ProxyController) UpdateTrackedResource(ctx context.Context, downstream string, id resources.ID, apiVersion string) error { + return p.updater.Update(ctx, downstream, id, apiVersion) +} + +// EnqueueTrackedResourceUpdate enqueues an async operation to update the tracked resource. +func (p *ProxyController) EnqueueTrackedResourceUpdate(ctx context.Context, id resources.ID, apiVersion string) error { + logger := ucplog.FromContextOrDiscard(ctx) + + trackingID := trackedresource.IDFor(id) + + // Create a serviceCtx for the operation that we're going to process on the resource. + serviceCtx := *v1.ARMRequestContextFromContext(ctx) + serviceCtx.ResourceID = trackingID + serviceCtx.OperationType = v1.OperationType{Type: trackingID.Type(), Method: datamodel.OperationProcess} + + // Create the database entry for the tracked resource. + // + // If a non-terminal response was returned from the RP then at this instant the resource exists, even if it is + // being deleted. + entry := datamodel.GenericResourceFromID(id, trackingID) + entry.Properties.APIVersion = apiVersion + entry.Properties.OperationID = serviceCtx.OperationID.String() + + // We need to update the tracked resource entry in the database using optimistic concurrency. This means that we + // need to read the existing entry, update it, and then write it back. If the write fails then we need to retry. + // + // This concurrency scheme ensures that the background process will "observe" the last state of the resource. + // + // Think of it like this, each time the resource is changing we poke the background process and say "hey, the + // resource is changing, you should check it out". The background process then reads the resource and updates the + // state. + queueOperation := false +retry: + for retryCount := 1; retryCount <= EnqueueOperationRetryCount; retryCount++ { + obj, err := p.StorageClient().Get(ctx, trackingID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + // Safe to ignore. This means that the resource has not been tracked yet. + } else if err != nil { + return err + } + + etag := "" + if obj != nil { + etag = obj.ETag + err = obj.As(&entry) + if err != nil { + return err + } + } + + // Keep the existing provisioningState if possible. + if entry.InternalMetadata.AsyncProvisioningState == "" || entry.InternalMetadata.AsyncProvisioningState.IsTerminal() { + queueOperation = true + entry.InternalMetadata.AsyncProvisioningState = v1.ProvisioningStateAccepted + } + + logger.V(ucplog.LevelDebug).Info("enqueuing tracked resource update") + err = p.StorageClient().Save(ctx, &store.Object{Metadata: store.Metadata{ID: trackingID.String()}, Data: entry}, store.WithETag(etag)) + if errors.Is(err, &store.ErrConcurrency{}) { + // This means we hit a concurrency error saving the tracked resource entry. This means that the resource + // was updated in the background. We should retry. + logger.V(ucplog.LevelDebug).Info("enqueue tracked resource update failed due to concurrency error", "retryCount", retryCount) + continue + } else if err != nil { + return err + } + + break retry + } + + // Only queue an operation if necessary, eg: if we changed the provisioningState. + if !queueOperation { + return nil + } + + err := p.StatusManager().QueueAsyncOperation(ctx, &serviceCtx, statusmanager.QueueOperationOptions{OperationTimeout: ProcessOperationTimeout, RetryAfter: ProcessOperationRetryAfter}) + if err != nil { + return err + } + + return nil +} + +// responseInterceptor is a http.RoundTripper that records the response and error from the inner http.RoundTripper. +// +// This type is NOT thread-safe and should be created and used per-request. +type responseInterceptor struct { + Inner http.RoundTripper + + Response *http.Response + Error error +} + +// RoundTrip implements http.RoundTripper by capturing the response and error. +func (i *responseInterceptor) RoundTrip(req *http.Request) (*http.Response, error) { + i.Response, i.Error = i.Inner.RoundTrip(req) + return i.Response, i.Error +} diff --git a/pkg/ucp/frontend/controller/radius/proxy_test.go b/pkg/ucp/frontend/controller/radius/proxy_test.go new file mode 100644 index 0000000000..69212994e7 --- /dev/null +++ b/pkg/ucp/frontend/controller/radius/proxy_test.go @@ -0,0 +1,357 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package radius + +import ( + "context" + "crypto/tls" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + "github.com/radius-project/radius/pkg/armrpc/frontend/controller" + "github.com/radius-project/radius/pkg/armrpc/rest" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +// The Run function is also tested by integration tests in the pkg/ucp/integrationtests/radius package. + +func createController(t *testing.T) (*ProxyController, *store.MockStorageClient, *mockUpdater, *mockRoundTripper, *statusmanager.MockStatusManager) { + ctrl := gomock.NewController(t) + storageClient := store.NewMockStorageClient(ctrl) + statusManager := statusmanager.NewMockStatusManager(ctrl) + + p, err := NewProxyController(controller.Options{StorageClient: storageClient, StatusManager: statusManager}) + require.NoError(t, err) + + updater := mockUpdater{} + roundTripper := mockRoundTripper{} + + pc := p.(*ProxyController) + pc.updater = &updater + pc.transport = &roundTripper + + return pc, storageClient, &updater, &roundTripper, statusManager +} + +func Test_Run(t *testing.T) { + id := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/Applications.Test/testResources/my-resource") + + plane := datamodel.Plane{ + Properties: datamodel.PlaneProperties{ + Kind: datamodel.PlaneKind(v20231001preview.PlaneKindUCPNative), + ResourceProviders: map[string]*string{ + "Applications.Test": to.Ptr("https://localhost:1234"), + }, + }, + } + resourceGroup := datamodel.ResourceGroup{} + + t.Run("success (non-tracked)", func(t *testing.T) { + p, storageClient, _, roundTripper, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Not a mutating request + req := httptest.NewRequest(http.MethodGet, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusOK) + roundTripper.Response = downstreamResponse.Result() + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("success (tracked terminal response)", func(t *testing.T) { + p, storageClient, updater, roundTripper, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Mutating request that will complete synchronously + req := httptest.NewRequest(http.MethodDelete, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusOK) + roundTripper.Response = downstreamResponse.Result() + + // Successful update + updater.Result = nil + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("success (fallback to async)", func(t *testing.T) { + p, storageClient, updater, roundTripper, statusManager := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Mutating request that will complete synchronously + req := httptest.NewRequest(http.MethodDelete, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + // Tracking entry created + storageClient.EXPECT(). + Get(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + storageClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusOK) + roundTripper.Response = downstreamResponse.Result() + + // Contended update, fallback to async + updater.Result = &trackedresource.InProgressErr{} + + statusManager.EXPECT(). + QueueAsyncOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).Times(1) + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("success (fallback to async without workitem)", func(t *testing.T) { + p, storageClient, updater, roundTripper, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Mutating request that will complete asynchronously + req := httptest.NewRequest(http.MethodDelete, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + // Tracking entry created + existingEntry := &store.Object{ + Data: &datamodel.GenericResource{ + BaseResource: v1.BaseResource{ + InternalMetadata: v1.InternalMetadata{ + AsyncProvisioningState: v1.ProvisioningStateAccepted, + }, + }, + }, + } + storageClient.EXPECT(). + Get(gomock.Any(), gomock.Any(), gomock.Any()). + Return(existingEntry, nil).Times(1) + storageClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusAccepted) + roundTripper.Response = downstreamResponse.Result() + + // Contended update, fallback to async + updater.Result = &trackedresource.InProgressErr{} + + // No work item created, it was already in the queue. + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("failure (validate downstream: not found)", func(t *testing.T) { + p, storageClient, _, _, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPut, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + + expected := rest.NewNotFoundResponse(id) + + response, err := p.Run(ctx, w, req) + require.NoError(t, err) + require.Equal(t, expected, response) + }) + + t.Run("failure (validate downstream: invalid downstream)", func(t *testing.T) { + p, storageClient, _, _, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPut, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: datamodel.Plane{}}, nil).Times(1) + + expected := rest.NewBadRequestARMResponse(v1.ErrorResponse{Error: v1.ErrorDetails{Code: v1.CodeInvalid, Message: "unexpected plane type ", Target: id.String()}}) + response, err := p.Run(ctx, w, req) + require.NoError(t, err) + require.Equal(t, expected, response) + }) +} + +func Test_ProxyController_PrepareProxyRequest(t *testing.T) { + downstream := "http://localhost:7443" + relativePath := "/planes/radius/local/resourceGroups/test-group/providers/System.TestRP" + t.Run("success (http)", func(t *testing.T) { + originalURL, err := url.Parse("http://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes") + require.NoError(t, err) + originalReq := &http.Request{ + Host: originalURL.Host, + Header: http.Header{"Copied": []string{"yes"}}, + TLS: nil, + URL: originalURL} + + p, _, _, _, _ := createController(t) + proxyReq, err := p.PrepareProxyRequest(testcontext.New(t), originalReq, downstream, relativePath) + require.NoError(t, err) + require.NotNil(t, proxyReq) + + require.Equal(t, "http://localhost:7443/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.URL.String()) + require.Equal(t, "http", proxyReq.Header.Get("X-Forwarded-Proto")) + require.Equal(t, "http://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.Header.Get("Referer")) + require.Equal(t, "yes", proxyReq.Header.Get("Copied")) + }) + + t.Run("success (http)", func(t *testing.T) { + originalURL, err := url.Parse("http://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes") + require.NoError(t, err) + originalReq := &http.Request{ + Host: originalURL.Host, + Header: http.Header{"Copied": []string{"yes"}}, + TLS: &tls.ConnectionState{}, + URL: originalURL} + + p, _, _, _, _ := createController(t) + proxyReq, err := p.PrepareProxyRequest(testcontext.New(t), originalReq, downstream, relativePath) + require.NoError(t, err) + require.NotNil(t, proxyReq) + + require.Equal(t, "http://localhost:7443/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.URL.String()) + require.Equal(t, "https", proxyReq.Header.Get("X-Forwarded-Proto")) + require.Equal(t, "https://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.Header.Get("Referer")) + require.Equal(t, "yes", proxyReq.Header.Get("Copied")) + }) + + t.Run("invalid downstream URL", func(t *testing.T) { + originalReq := &http.Request{Header: http.Header{}, URL: &url.URL{}} + + p, _, _, _, _ := createController(t) + proxyReq, err := p.PrepareProxyRequest(testcontext.New(t), originalReq, "\ninvalid", relativePath) + require.Error(t, err) + require.Equal(t, "failed to parse downstream URL: parse \"\\ninvalid\": net/url: invalid control character in URL", err.Error()) + require.Nil(t, proxyReq) + }) +} + +type mockUpdater struct { + Result error +} + +func (u *mockUpdater) Update(ctx context.Context, downstreamURL string, originalID resources.ID, version string) error { + return u.Result +} + +type mockRoundTripper struct { + Response *http.Response + Err error +} + +func (rt *mockRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + if rt.Response != nil { + rt.Response.Request = r + } + return rt.Response, rt.Err +} diff --git a/pkg/ucp/frontend/controller/resourcegroups/util.go b/pkg/ucp/frontend/controller/resourcegroups/util.go new file mode 100644 index 0000000000..185500b8d9 --- /dev/null +++ b/pkg/ucp/frontend/controller/resourcegroups/util.go @@ -0,0 +1,111 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package resourcegroups + +import ( + "context" + "errors" + "fmt" + "net/url" + + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + resources_radius "github.com/radius-project/radius/pkg/ucp/resources/radius" + "github.com/radius-project/radius/pkg/ucp/rest" + "github.com/radius-project/radius/pkg/ucp/store" +) + +// NotFoundError is returned when a resource group or plane is not found. +type NotFoundError struct { + Message string +} + +// Error returns the error message. +func (e *NotFoundError) Error() string { + return e.Message +} + +// Is returns true if the error is a NotFoundError. +func (e *NotFoundError) Is(err error) bool { + _, ok := err.(*NotFoundError) + return ok +} + +// InvalidError is returned when the data is invalid. +type InvalidError struct { + Message string +} + +// Error returns the error message. +func (e *InvalidError) Error() string { + return e.Message +} + +// Is returns true if the error is a InvalidError. +func (e *InvalidError) Is(err error) bool { + _, ok := err.(*InvalidError) + return ok +} + +// ValidateDownstream can be used to find and validate the downstream URL for a resource. +// Returns NotFoundError for the case where the plane or resource group does not exist. +// Returns InvalidError for cases where the data is invalid, like when the resource provider is not configured. +func ValidateDownstream(ctx context.Context, client store.StorageClient, id resources.ID) (*url.URL, error) { + planeID, err := resources.ParseScope(id.PlaneScope()) + if err != nil { + // Not expected to happen. + return nil, err + } + + plane, err := store.GetResource[datamodel.Plane](ctx, client, planeID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + return nil, &NotFoundError{Message: fmt.Sprintf("plane %q not found", planeID.String())} + } else if err != nil { + return nil, fmt.Errorf("failed to find plane %q: %w", planeID.String(), err) + } + + if plane.Properties.Kind != rest.PlaneKindUCPNative { + return nil, &InvalidError{Message: fmt.Sprintf("unexpected plane type %s", plane.Properties.Kind)} + } + + // If the ID contains a resource group, validate it now. + if id.FindScope(resources_radius.ScopeResourceGroups) != "" { + resourceGroupID, err := resources.ParseScope(id.RootScope()) + if err != nil { + // Not expected to happen. + return nil, err + } + + _, err = store.GetResource[datamodel.ResourceGroup](ctx, client, resourceGroupID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + return nil, &NotFoundError{Message: fmt.Sprintf("resource group %q not found", resourceGroupID.String())} + } else if err != nil { + return nil, fmt.Errorf("failed to find resource group %q: %w", resourceGroupID.String(), err) + } + } + + downstream := plane.LookupResourceProvider(id.ProviderNamespace()) + if downstream == "" { + return nil, &InvalidError{Message: fmt.Sprintf("resource provider %s not configured", id.ProviderNamespace())} + } + + downstreamURL, err := url.Parse(downstream) + if err != nil { + return nil, &InvalidError{Message: fmt.Sprintf("failed to parse downstream URL: %v", err.Error())} + } + + return downstreamURL, nil +} diff --git a/pkg/ucp/frontend/controller/resourcegroups/util_test.go b/pkg/ucp/frontend/controller/resourcegroups/util_test.go new file mode 100644 index 0000000000..ff751721ff --- /dev/null +++ b/pkg/ucp/frontend/controller/resourcegroups/util_test.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package resourcegroups + +import ( + "errors" + "fmt" + "net/url" + "testing" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/rest" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +func Test_ValidateDownstream(t *testing.T) { + id, err := resources.ParseResource("/planes/radius/local/resourceGroups/test-group/providers/System.TestRP/testResources/name") + require.NoError(t, err) + + idWithoutResourceGroup, err := resources.Parse("/planes/radius/local/providers/System.TestRP/testResources") + require.NoError(t, err) + + downstream := "http://localhost:7443" + + plane := &datamodel.Plane{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.PlaneScope(), + }, + }, + Properties: datamodel.PlaneProperties{ + Kind: rest.PlaneKindUCPNative, + ResourceProviders: map[string]*string{ + "System.TestRP": to.Ptr(downstream), + }, + }, + } + + setup := func(t *testing.T) *store.MockStorageClient { + ctrl := gomock.NewController(t) + return store.NewMockStorageClient(ctrl) + } + + t.Run("success (resource group)", func(t *testing.T) { + resourceGroup := &datamodel.ResourceGroup{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.RootScope(), + }, + }, + } + + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + expectedURL, err := url.Parse(downstream) + require.NoError(t, err) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.NoError(t, err) + require.Equal(t, expectedURL, downstreamURL) + }) + + t.Run("success (non resource group)", func(t *testing.T) { + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), idWithoutResourceGroup.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + + expectedURL, err := url.Parse(downstream) + require.NoError(t, err) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, idWithoutResourceGroup) + require.NoError(t, err) + require.Equal(t, expectedURL, downstreamURL) + }) + + t.Run("plane not found", func(t *testing.T) { + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(nil, &store.ErrNotFound{}).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &NotFoundError{Message: "plane \"/planes/radius/local\" not found"}, err) + require.Nil(t, downstreamURL) + }) + + t.Run("plane retreival failure", func(t *testing.T) { + mock := setup(t) + + expected := fmt.Errorf("failed to find plane \"/planes/radius/local\": %w", errors.New("test error")) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(nil, errors.New("test error")).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, expected, err) + require.Nil(t, downstreamURL) + }) + + t.Run("resource group not found", func(t *testing.T) { + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(nil, &store.ErrNotFound{}).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &NotFoundError{Message: "resource group \"/planes/radius/local/resourceGroups/test-group\" not found"}, err) + require.Nil(t, downstreamURL) + }) + + t.Run("resource group err", func(t *testing.T) { + mock := setup(t) + + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(nil, errors.New("test error")).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, "failed to find resource group \"/planes/radius/local/resourceGroups/test-group\": test error", err.Error()) + require.Nil(t, downstreamURL) + }) + + t.Run("resource provider not found", func(t *testing.T) { + plane := &datamodel.Plane{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.PlaneScope(), + }, + }, + Properties: datamodel.PlaneProperties{ + Kind: rest.PlaneKindUCPNative, + ResourceProviders: map[string]*string{}, + }, + } + + resourceGroup := &datamodel.ResourceGroup{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.RootScope(), + }, + }, + } + + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &InvalidError{Message: "resource provider System.TestRP not configured"}, err) + require.Nil(t, downstreamURL) + }) + + t.Run("resource provider invalid URL", func(t *testing.T) { + plane := &datamodel.Plane{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.PlaneScope(), + }, + }, + Properties: datamodel.PlaneProperties{ + Kind: rest.PlaneKindUCPNative, + ResourceProviders: map[string]*string{ + "System.TestRP": to.Ptr("\ninvalid"), + }, + }, + } + + resourceGroup := &datamodel.ResourceGroup{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.RootScope(), + }, + }, + } + + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &InvalidError{Message: "failed to parse downstream URL: parse \"\\ninvalid\": net/url: invalid control character in URL"}, err) + require.Nil(t, downstreamURL) + }) +} diff --git a/pkg/ucp/frontend/modules/types.go b/pkg/ucp/frontend/modules/types.go index a8d4cd7d05..6b0a0d6e9a 100644 --- a/pkg/ucp/frontend/modules/types.go +++ b/pkg/ucp/frontend/modules/types.go @@ -20,6 +20,7 @@ import ( "context" "net/http" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/sdk" "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/hostoptions" @@ -71,6 +72,9 @@ type Options struct { // SpecLoader is the OpenAPI spec loader containing specs for the UCP APIs. SpecLoader *validator.Loader + // StatusManager is the async operation status manager. + StatusManager statusmanager.StatusManager + // UCPConnection is the connection used to communicate with UCP APIs. UCPConnection sdk.Connection } diff --git a/pkg/ucp/frontend/radius/routes.go b/pkg/ucp/frontend/radius/routes.go index b36810dff9..a6135f7c65 100644 --- a/pkg/ucp/frontend/radius/routes.go +++ b/pkg/ucp/frontend/radius/routes.go @@ -27,7 +27,7 @@ import ( "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" "github.com/radius-project/radius/pkg/ucp/datamodel" "github.com/radius-project/radius/pkg/ucp/datamodel/converter" - planes_ctrl "github.com/radius-project/radius/pkg/ucp/frontend/controller/planes" + radius_ctrl "github.com/radius-project/radius/pkg/ucp/frontend/controller/radius" resourcegroups_ctrl "github.com/radius-project/radius/pkg/ucp/frontend/controller/resourcegroups" "github.com/radius-project/radius/pkg/validator" ) @@ -117,21 +117,22 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: resourceGroupResourceRouter, Path: server.CatchAllPath, OperationType: &v1.OperationType{Type: OperationTypeUCPRadiusProxy, Method: v1.OperationProxy}, - ControllerFactory: planes_ctrl.NewProxyController, + ControllerFactory: radius_ctrl.NewProxyController, }, { // Proxy request should use CatchAllPath(/*) to process all requests under /planes/radius/{planeName}/. ParentRouter: baseRouter, Path: server.CatchAllPath, OperationType: &v1.OperationType{Type: OperationTypeUCPRadiusProxy, Method: v1.OperationProxy}, - ControllerFactory: planes_ctrl.NewProxyController, + ControllerFactory: radius_ctrl.NewProxyController, }, } ctrlOptions := controller.Options{ - Address: m.options.Address, - PathBase: m.options.PathBase, - DataProvider: m.options.DataProvider, + Address: m.options.Address, + PathBase: m.options.PathBase, + DataProvider: m.options.DataProvider, + StatusManager: m.options.StatusManager, } for _, h := range handlerOptions { diff --git a/pkg/ucp/integrationtests/radius/proxy_test.go b/pkg/ucp/integrationtests/radius/proxy_test.go index 462d0dc538..47d2dbd0ef 100644 --- a/pkg/ucp/integrationtests/radius/proxy_test.go +++ b/pkg/ucp/integrationtests/radius/proxy_test.go @@ -42,6 +42,9 @@ const ( testResourceGroupID = testRadiusPlaneID + "/resourceGroups/test-rg" testResourceCollectionID = testResourceGroupID + "/providers/System.Test/testResources" testResourceID = testResourceCollectionID + "/test-resource" + + assertTimeout = time.Second * 10 + assertRetry = time.Second * 2 ) func Test_RadiusPlane_Proxy_ResourceGroupDoesNotExist(t *testing.T) { @@ -72,6 +75,12 @@ func Test_RadiusPlane_ResourceSync(t *testing.T) { message := "here is some test data" + expectedTrackedResource := v20231001preview.GenericResource{ + ID: to.Ptr(testResourceID), + Name: to.Ptr("test-resource"), + Type: to.Ptr("System.Test/testResources"), + } + t.Run("PUT", func(t *testing.T) { data := testrp.TestResource{ Properties: testrp.TestResourceProperties{ @@ -101,6 +110,17 @@ func Test_RadiusPlane_ResourceSync(t *testing.T) { require.Equal(t, message, *resources.Value[0].Properties.Message) }) + t.Run("List - Tracked Resources", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) + }) + t.Run("GET", func(t *testing.T) { response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusOK) @@ -121,6 +141,16 @@ func Test_RadiusPlane_ResourceSync(t *testing.T) { response.EqualsStatusCode(http.StatusNotFound) }) + t.Run("List - Tracked Resources (after delete)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Empty(t, resources.Value) + }) + t.Run("DELETE (again)", func(t *testing.T) { response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusNoContent) @@ -132,16 +162,19 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { rp := testrp.Start(t) // Block background work item completion until we're ready. - putCh := make(chan struct{}) - deleteCh := make(chan struct{}) + putCh := make(chan backend_ctrl.Result) + deleteCh := make(chan backend_ctrl.Result) onPut := func(ctx context.Context, request *backend_ctrl.Request) (backend_ctrl.Result, error) { t.Log("PUT operation is waiting for completion") - <-putCh - return backend_ctrl.Result{}, nil + result := <-putCh + return result, nil } onDelete := func(ctx context.Context, request *backend_ctrl.Request) (backend_ctrl.Result, error) { t.Log("DELETE operation is waiting for completion") - <-deleteCh + result := <-deleteCh + if result.Requeue || result.Error != nil { + return result, nil + } client, err := ucp.Clients.StorageProvider.GetStorageClient(ctx, "System.Test/testResources") require.NoError(t, err) @@ -162,7 +195,14 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { message := "here is some test data" + expectedTrackedResource := v20231001preview.GenericResource{ + ID: to.Ptr(testResourceID), + Name: to.Ptr("test-resource"), + Type: to.Ptr("System.Test/testResources"), + } + t.Run("PUT", func(t *testing.T) { + t.Log("starting PUT operation") data := testrp.TestResource{ Properties: testrp.TestResourceProperties{ Message: to.Ptr(message), @@ -178,7 +218,7 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { err = json.Unmarshal(response.Body.Bytes(), resource) require.NoError(t, err) require.Equal(t, message, *resource.Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resource.Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resource.Properties.ProvisioningState).IsTerminal()) location := response.Raw.Header.Get("Location") azureAsyncOperation := response.Raw.Header.Get("Azure-AsyncOperation") @@ -195,7 +235,18 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { require.NoError(t, err) require.Len(t, resources.Value, 1) require.Equal(t, message, *resources.Value[0].Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resources.Value[0].Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resources.Value[0].Properties.ProvisioningState).IsTerminal()) + }) + + t.Run("List - Tracked Resources (during PUT)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) }) t.Run("GET (during PUT)", func(t *testing.T) { @@ -210,7 +261,8 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { }) t.Run("Complete PUT", func(t *testing.T) { - putCh <- struct{}{} + t.Log("completing PUT operation") + putCh <- backend_ctrl.Result{} require.EventuallyWithT(t, func(collect *assert.CollectT) { response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) assert.Equal(collect, http.StatusOK, response.Raw.StatusCode) @@ -219,10 +271,46 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { err := json.Unmarshal(response.Body.Bytes(), resource) assert.NoError(collect, err) assert.Equal(collect, string(v1.ProvisioningStateSucceeded), *resource.Properties.ProvisioningState) - }, time.Second*5, time.Millisecond*100) + }, assertTimeout, assertRetry) + }) + + t.Run("DELETE FAILURE", func(t *testing.T) { + t.Log("starting DELETE FAILURE operation") + response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) + response.EqualsStatusCode(http.StatusAccepted) + }) + + t.Run("Complete DELETE FAILURE", func(t *testing.T) { + t.Log("completing DELETE FAILURE operation") + deleteCh <- backend_ctrl.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeInternal, + Message: "Oh no!", + }) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) + assert.Equal(collect, http.StatusOK, response.Raw.StatusCode) + + resource := &testrp.TestResource{} + err := json.Unmarshal(response.Body.Bytes(), resource) + assert.NoError(collect, err) + assert.Equal(collect, string(v1.ProvisioningStateFailed), *resource.Properties.ProvisioningState) + t.Logf("Resource provisioning state: %s", *resource.Properties.ProvisioningState) + }, assertTimeout, assertRetry) + }) + + t.Run("List - Tracked Resources (after failed delete)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) }) t.Run("DELETE", func(t *testing.T) { + t.Log("starting DELETE operation") response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusAccepted) }) @@ -236,7 +324,18 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { require.NoError(t, err) require.Len(t, resources.Value, 1) require.Equal(t, message, *resources.Value[0].Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resources.Value[0].Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resources.Value[0].Properties.ProvisioningState).IsTerminal()) + }) + + t.Run("List - Tracked Resources (during delete)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) }) t.Run("GET (during delete)", func(t *testing.T) { @@ -247,15 +346,16 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { err := json.Unmarshal(response.Body.Bytes(), resource) require.NoError(t, err) require.Equal(t, message, *resource.Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resource.Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resource.Properties.ProvisioningState).IsTerminal()) }) t.Run("Complete DELETE", func(t *testing.T) { - deleteCh <- struct{}{} + t.Log("completing DELETE operation") + deleteCh <- backend_ctrl.Result{} require.EventuallyWithT(t, func(collect *assert.CollectT) { response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) assert.Equal(collect, http.StatusNotFound, response.Raw.StatusCode) - }, time.Second*5, time.Millisecond*100) + }, assertTimeout, assertRetry) }) t.Run("GET (after delete)", func(t *testing.T) { @@ -263,6 +363,19 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { response.EqualsStatusCode(http.StatusNotFound) }) + t.Run("List - Tracked Resources (after delete)", func(t *testing.T) { + // This is eventually consistent. + require.EventuallyWithT(t, func(collect *assert.CollectT) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + assert.Equal(collect, http.StatusOK, response.Raw.StatusCode) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + assert.NoError(collect, err) + assert.Empty(collect, resources.Value) + }, assertTimeout, assertRetry) + }) + t.Run("DELETE (again)", func(t *testing.T) { response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusNoContent) diff --git a/pkg/ucp/integrationtests/testrp/async.go b/pkg/ucp/integrationtests/testrp/async.go index 4723b359e6..144b598c8c 100644 --- a/pkg/ucp/integrationtests/testrp/async.go +++ b/pkg/ucp/integrationtests/testrp/async.go @@ -33,6 +33,7 @@ import ( "github.com/radius-project/radius/pkg/armrpc/servicecontext" "github.com/radius-project/radius/pkg/middleware" "github.com/radius-project/radius/pkg/ucp/integrationtests/testserver" + queueprovider "github.com/radius-project/radius/pkg/ucp/queue/provider" "github.com/radius-project/radius/test/testcontext" "github.com/stretchr/testify/require" ) @@ -58,7 +59,18 @@ func AsyncResource(t *testing.T, ts *testserver.TestServer, rootScope string, pu resourceType := "System.Test/testResources" - queueClient, err := ts.Clients.QueueProvider.GetClient(ctx) + // We can share the storage provider with the test server. + _, err := ts.Clients.StorageProvider.GetStorageClient(ctx, "System.Test/operationStatuses") + require.NoError(t, err) + + // Do not share the queue. + queueOptions := queueprovider.QueueProviderOptions{ + Provider: queueprovider.TypeInmemory, + InMemory: &queueprovider.InMemoryQueueOptions{}, + Name: "System.Test", + } + queueProvider := queueprovider.New(queueOptions) + queueClient, err := queueProvider.GetClient(ctx) require.NoError(t, err) statusManager := statusmanager.New(ts.Clients.StorageProvider, queueClient, v1.LocationGlobal) diff --git a/pkg/ucp/integrationtests/testserver/testserver.go b/pkg/ucp/integrationtests/testserver/testserver.go index fa38494e83..cbd0a9a8f7 100644 --- a/pkg/ucp/integrationtests/testserver/testserver.go +++ b/pkg/ucp/integrationtests/testserver/testserver.go @@ -37,9 +37,13 @@ import ( etcdclient "go.etcd.io/etcd/client/v3" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + backend_ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/worker" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/armrpc/servicecontext" "github.com/radius-project/radius/pkg/middleware" + "github.com/radius-project/radius/pkg/ucp/backend" "github.com/radius-project/radius/pkg/ucp/data" "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/frontend/api" @@ -50,6 +54,7 @@ import ( queueprovider "github.com/radius-project/radius/pkg/ucp/queue/provider" "github.com/radius-project/radius/pkg/ucp/secret" secretprovider "github.com/radius-project/radius/pkg/ucp/secret/provider" + "github.com/radius-project/radius/pkg/ucp/server" "github.com/radius-project/radius/pkg/ucp/store" "github.com/radius-project/radius/pkg/validator" "github.com/radius-project/radius/swagger" @@ -156,6 +161,8 @@ func StartWithMocks(t *testing.T, configureModules func(options modules.Options) secretProvider := secretprovider.NewSecretProvider(secretprovider.SecretProviderOptions{}) secretProvider.SetClient(secretClient) + statusManager := statusmanager.NewMockStatusManager(ctrl) + router := chi.NewRouter() router.Use(servicecontext.ARMRequestCtx(pathBase, "global")) @@ -176,6 +183,7 @@ func StartWithMocks(t *testing.T, configureModules func(options modules.Options) DataProvider: dataProvider, SecretProvider: secretProvider, SpecLoader: specLoader, + StatusManager: statusManager, } if configureModules == nil { @@ -222,6 +230,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) }) ctx, cancel := testcontext.NewWithCancel(t) + t.Cleanup(cancel) stoppedChan := make(chan struct{}) defer close(stoppedChan) @@ -235,7 +244,9 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) // and you'll be able to see the spam from etcd. // // This is caught by the race checker and will fail your pr if you do it. - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err := etcd.Run(ctx) if err != nil { t.Logf("error from etcd: %v", err) @@ -254,7 +265,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) ETCD: storageOptions.ETCD, } queueOptions := queueprovider.QueueProviderOptions{ - Name: "System.Resources", + Name: server.UCPProviderName, Provider: queueprovider.TypeInmemory, InMemory: &queueprovider.InMemoryQueueOptions{}, } @@ -265,6 +276,21 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) secretProvider := secretprovider.NewSecretProvider(secretOptions) queueProvider := queueprovider.New(queueOptions) + queueClient, err := queueProvider.GetClient(ctx) + require.NoError(t, err) + + statusManager := statusmanager.New(dataProvider, queueClient, v1.LocationGlobal) + + registry := worker.NewControllerRegistry(dataProvider) + err = backend.RegisterControllers(ctx, registry, backend_ctrl.Options{DataProvider: dataProvider}) + require.NoError(t, err) + + w := worker.New(worker.Options{}, statusManager, queueClient, registry) + go func() { + err = w.Start(ctx) + require.NoError(t, err) + }() + router := chi.NewRouter() router.Use(servicecontext.ARMRequestCtx(pathBase, "global")) @@ -285,6 +311,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) SecretProvider: secretProvider, SpecLoader: specLoader, QueueProvider: queueProvider, + StatusManager: statusManager, } if configureModules == nil { diff --git a/pkg/ucp/server/server.go b/pkg/ucp/server/server.go index 0896121804..a24af00caf 100644 --- a/pkg/ucp/server/server.go +++ b/pkg/ucp/server/server.go @@ -23,7 +23,7 @@ import ( "strings" "time" - hostOpts "github.com/radius-project/radius/pkg/armrpc/hostoptions" + hostopts "github.com/radius-project/radius/pkg/armrpc/hostoptions" "github.com/radius-project/radius/pkg/kubeutil" metricsprovider "github.com/radius-project/radius/pkg/metrics/provider" metricsservice "github.com/radius-project/radius/pkg/metrics/service" @@ -69,7 +69,7 @@ type Options struct { Location string } -const UCPProviderName = "ucp" +const UCPProviderName = "System.Resources" // NewServerOptionsFromEnvironment creates a new Options struct from environment variables and returns it along with any errors. func NewServerOptionsFromEnvironment() (Options, error) { @@ -187,8 +187,12 @@ func NewServer(options *Options) (*hosting.Host, error) { hostingServices = append(hostingServices, profilerservice.NewService(profilerOptions)) } - backendServiceOptions := hostOpts.HostOptions{ - Config: &hostOpts.ProviderConfig{ + backendServiceOptions := hostopts.HostOptions{ + + Config: &hostopts.ProviderConfig{ + Env: hostopts.EnvironmentOptions{ + RoleLocation: options.Config.Location, + }, StorageProvider: options.StorageProviderOptions, SecretProvider: options.SecretProviderOptions, QueueProvider: options.QueueProviderOptions, diff --git a/pkg/ucp/trackedresource/doc.go b/pkg/ucp/trackedresource/doc.go new file mode 100644 index 0000000000..2b93fc134b --- /dev/null +++ b/pkg/ucp/trackedresource/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// package trackedresource provides utility functionality for working with tracked resources. +// This functionality is shared between frontend and backend controllers for tracked resources. +package trackedresource diff --git a/pkg/ucp/trackedresource/name.go b/pkg/ucp/trackedresource/name.go new file mode 100644 index 0000000000..4e94796a7b --- /dev/null +++ b/pkg/ucp/trackedresource/name.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "crypto/sha1" + "fmt" + "strings" + + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/resources" +) + +// NameFor computes the resource name of a tracked resource from its ID. +// +// This can be used to compute the name of a tracked resource based on the resource that it is tracking. +// +// Names are computed by taking the name of the resource being tracked and appending a suffix to it based +// on the hash of the resource ID. This ensures that the name is unique and deterministic. +func NameFor(id resources.ID) string { + if id.IsEmpty() { + return "" + } + + // We need to generate a valid ARM/UCP name. The original resource name is used as a prefix for readability + // followed by the hash of the resource ID. + // + // example: my-resource-ec291e26078b7ea8a74abfac82530005a0ecbf15 + // + // We want this to fit in 63 characters so we allow a prefix of 22 characters a separator and a hash of 40 characters. + const prefixLength = 22 + + prefix := strings.ToLower(id.Name()) + if len(prefix) > prefixLength { + prefix = prefix[:prefixLength] + } + + hasher := sha1.New() + + // It's OK to ignore the error here, it's part of the API because io.Writer is being used, but the implementation + // does not return errors. + _, err := hasher.Write([]byte(strings.ToLower(id.String()))) + if err != nil { + panic("unexpected error writing to hash: " + err.Error()) + } + + hash := hasher.Sum(nil) + + return fmt.Sprintf("%s-%x", prefix, hash) +} + +// IDFor computes the resource ID of a tracked resource entry from the original resource ID. +func IDFor(id resources.ID) resources.ID { + if id.IsEmpty() { + return resources.ID{} + } + + // Tracking ID is the ID of the entry that will store the data. + // + // Example: + // id: /planes/radius/local/resourceGroups/test-group/providers/Applications.Core/applications/test-app + // trackingID: /planes/radius/local/resourceGroups/test-group/providers/System.Resources/genericResources/test-app-ec291e26078b7ea8a74abfac82530005a0ecbf15 + return resources.MustParse(resources.MakeUCPID( + id.ScopeSegments(), + []resources.TypeSegment{ + { + Type: v20231001preview.ResourceType, + Name: NameFor(id), + }, + }, nil)) +} diff --git a/pkg/ucp/trackedresource/name_test.go b/pkg/ucp/trackedresource/name_test.go new file mode 100644 index 0000000000..f23263253f --- /dev/null +++ b/pkg/ucp/trackedresource/name_test.go @@ -0,0 +1,38 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "testing" + + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/stretchr/testify/require" +) + +var ( + testID = resources.MustParse("/planes/radius/local/resourceGroups/test-group/providers/Applications.Core/applications/test-app") +) + +func Test_NameFor(t *testing.T) { + name := NameFor(testID) + require.Equal(t, "test-app-303153687ee5adbcf353bc6c2caa4373f31e04c6", name) +} + +func Test_IDFor(t *testing.T) { + id := IDFor(testID) + require.Equal(t, resources.MustParse("/planes/radius/local/resourceGroups/test-group/providers/System.Resources/resources/test-app-303153687ee5adbcf353bc6c2caa4373f31e04c6"), id) +} diff --git a/pkg/ucp/trackedresource/update.go b/pkg/ucp/trackedresource/update.go new file mode 100644 index 0000000000..49f9de3c8c --- /dev/null +++ b/pkg/ucp/trackedresource/update.go @@ -0,0 +1,294 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "mime" + "net/http" + "net/url" + "strings" + "time" + + "github.com/go-logr/logr" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/ucplog" +) + +const ( + retryCount = 10 + retryDelay = time.Second * 3 + requestTimeout = time.Second * 10 +) + +// NewUpdater creates a new Updater. +func NewUpdater(storeClient store.StorageClient, httpClient *http.Client) *Updater { + return &Updater{ + Store: storeClient, + Client: httpClient, + AttemptCount: retryCount, + RetryDelay: retryDelay, + RequestTimeout: requestTimeout, + } +} + +// Updater is a utility struct that can perform updates on tracked resources. +type Updater struct { + // Store is the storage client used to access the database. + Store store.StorageClient + + // Client is the HTTP client used to make requests to the downstream API. + Client *http.Client + + // AttemptCount is the number of times to attempt a request and database update. + AttemptCount int + + // RetryDelay is the delay between retries. + RetryDelay time.Duration + + // RequestTimeout is the timeout used for requests to the downstream API. + RequestTimeout time.Duration +} + +// InProgressErr signifies that the resource is currently in a non-terminal state. +type InProgressErr struct { +} + +// Error returns the error message. +func (e *InProgressErr) Error() string { + return "resource is still being provisioned" +} + +// Is returns true if the other error is an InProgressErr. +func (e *InProgressErr) Is(other error) bool { + _, ok := other.(*InProgressErr) + return ok +} + +// trackedResourceState holds the state of a tracked resource as reported by the downstream API. +// This only defines the fields we use, so many fields returned by the API are omitted. +type trackedResourceState struct { + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + Properties trackedResourceStateProperties `json:"properties,omitempty"` +} + +type trackedResourceStateProperties struct { + ProvisioningState *v1.ProvisioningState `json:"provisioningState,omitempty"` +} + +// Update updates a tracked resource. +// +// This function return attempt to update the state using optimistic concurrency and will retry on the following +// conditions: +// +// - Downstream failure or timeout +// - Database failure +// - Optimistic concurrency failure +// - Resource is still being provisioned (provisioning state is non-terminal) +func (u *Updater) Update(ctx context.Context, downstream string, id resources.ID, apiVersion string) error { + logger := ucplog.FromContextOrDiscard(ctx) + destination, err := url.Parse(downstream) + if err != nil { + return err + } + + destination = destination.JoinPath(id.String()) + + query := destination.Query() + query.Set("api-version", apiVersion) + destination.RawQuery = query.Encode() + + // Tracking ID is the ID of the TrackedResourceEntry that will store the data. + // + // Example: + // id: /planes/radius/local/resourceGroups/test-group/providers/Applications.Core/applications/test-app + // trackingID: /planes/radius/local/resourceGroups/test-group/providers/System.Resources/trackingResourceEntries/test-app-ec291e26078b7ea8a74abfac82530005a0ecbf15 + trackingID := IDFor(id) + + logger = logger.WithValues("id", id, "trackingID", trackingID, "destination", destination.String()) + logger.V(ucplog.LevelDebug).Info("updating tracked resource") + for attempt := 1; attempt <= u.AttemptCount; attempt++ { + logger.WithValues("attempt", attempt) + ctx := logr.NewContext(ctx, logger) + logger.V(ucplog.LevelDebug).Info("beginning attempt") + + err := u.run(ctx, id, trackingID, destination, apiVersion) + if errors.Is(err, &InProgressErr{}) && attempt == u.AttemptCount { + // Preserve the InprogressErr for the last attempt. + return err + } else if err != nil { + logger.Error(err, "attempt failed", "delay", u.RetryDelay) + time.Sleep(u.RetryDelay) + continue + } + + logger.V(ucplog.LevelDebug).Info("tracked resource processing completed successfully") + return nil + } + + return fmt.Errorf("failed to update tracked resource after %d attempts", u.AttemptCount) +} + +func (u *Updater) run(ctx context.Context, id resources.ID, trackingID resources.ID, destination *url.URL, apiVersion string) error { + logger := ucplog.FromContextOrDiscard(ctx) + obj, err := u.Store.Get(ctx, trackingID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + // This is fine. It might be a new resource. + } else if err != nil { + return err + } + + etag := "" + entry := datamodel.GenericResourceFromID(id, trackingID) + entry.Properties.APIVersion = apiVersion + if obj != nil { + etag = obj.ETag + err := obj.As(&entry) + if err != nil { + return err + } + } + + data, err := u.fetch(ctx, destination) + if err != nil { + return err + } + + if data == nil { + // Resource was not found. We can delete the tracked resource entry. + logger.V(ucplog.LevelDebug).Info("deleting tracked resource entry") + err = u.Store.Delete(ctx, trackingID.String(), store.WithETag(etag)) + if errors.Is(err, &store.ErrNotFound{}) { + return nil + } else if err != nil { + return err + } + + return nil + } else if data.Properties.ProvisioningState != nil && !data.Properties.ProvisioningState.IsTerminal() { + // Resource is still being provisioned. We should not update anything yet. + logger.V(ucplog.LevelDebug).Info("resource is still being provisioned") + return &InProgressErr{} + } + + // If we get here we're ready to save the changes for a create/update. + // + // Mark the resource as provisioned. This will will "reset" the lock on the resource. + entry.AsyncProvisioningState = v1.ProvisioningStateSucceeded + if data.Properties.ProvisioningState != nil { + entry.AsyncProvisioningState = *data.Properties.ProvisioningState + } + + obj = &store.Object{ + Metadata: store.Metadata{ + ID: trackingID.String(), + }, + Data: entry, + } + logger.V(ucplog.LevelDebug).Info("updating tracked resource entry") + err = u.Store.Save(ctx, obj, store.WithETag(etag)) + if errors.Is(err, &store.ErrConcurrency{}) { + logger.V(ucplog.LevelDebug).Info("tracked resource was updated concurrently") + return &InProgressErr{} + } else if err != nil { + return err + } + + return nil +} + +func (u *Updater) fetch(ctx context.Context, destination *url.URL) (*trackedResourceState, error) { + logger := ucplog.FromContextOrDiscard(ctx) + + ctx, cancel := context.WithTimeout(ctx, requestTimeout) + defer cancel() + + logger.V(ucplog.LevelDebug).Info("fetching resource") + request, err := http.NewRequestWithContext(ctx, http.MethodGet, destination.String(), nil) + if err != nil { + return nil, err + } + response, err := u.Client.Do(request) + if err != nil { + return nil, err + } + logger.V(ucplog.LevelDebug).Info("resource fetched", "status", response.StatusCode) + + defer response.Body.Close() + if !u.isJSONResponse(response) { + return nil, fmt.Errorf("response is not JSON. Content-Type: %q", response.Header.Get("Content-Type")) + } + + if response.StatusCode == 404 { + return nil, nil + } + + if response.StatusCode >= 400 { + return nil, u.reportRequestFailure(response) + } + + data := &trackedResourceState{} + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(data) + if err != nil { + return nil, err + } + + return data, nil +} + +func (u *Updater) isJSONResponse(response *http.Response) bool { + contentType, _, err := mime.ParseMediaType(response.Header.Get("Content-Type")) + if err != nil { + return false + } + + if contentType == "application/json" { + return true + } else if contentType == "text/json" { + return true + } else if strings.HasSuffix(contentType, "+json") { + return true + } + + return false +} + +func (u *Updater) reportRequestFailure(response *http.Response) error { + data := v1.ErrorResponse{} + + decoder := json.NewDecoder(response.Body) + err := decoder.Decode(&data) + if err != nil { + return err + } + + body, err := json.MarshalIndent(data, "", " ") + if err != nil { + return err + } + + return fmt.Errorf("request failed with status code %s:\n%s", response.Status, body) +} diff --git a/pkg/ucp/trackedresource/update_test.go b/pkg/ucp/trackedresource/update_test.go new file mode 100644 index 0000000000..bb252688a5 --- /dev/null +++ b/pkg/ucp/trackedresource/update_test.go @@ -0,0 +1,442 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +var ( + testURL = func() *url.URL { + u, err := url.Parse("http://example.com/some-url") + if err != nil { + panic(err) + } + return u + }() +) + +func setupUpdater(t *testing.T) (*Updater, *store.MockStorageClient, *mockRoundTripper) { + ctrl := gomock.NewController(t) + + storeClient := store.NewMockStorageClient(ctrl) + roundTripper := &mockRoundTripper{} + updater := NewUpdater(storeClient, &http.Client{Transport: roundTripper}) + + // Optimize these values for testability. We don't want to wait for retries or timeouts unless + // the test is specifically testing that behavior. + updater.RetryDelay = time.Millisecond * 100 + updater.AttemptCount = 1 + updater.RequestTimeout = time.Microsecond * 100 + + return updater, storeClient, roundTripper +} + +func Test_Update(t *testing.T) { + t.Run("successful update", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.NoError(t, err) + }) + + t.Run("retry then success", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + updater.AttemptCount = 2 + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + // Fail once, then succeed. + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, errors.New("this will be retried")). + Times(1) + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.NoError(t, err) + }) + + t.Run("resource still provisioning", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{ + "provisioningState": v1.ProvisioningStateAccepted, + }, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (non-terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.Error(t, err) + require.ErrorIs(t, err, &InProgressErr{}) + }) + + t.Run("tracked resource updated concurrently", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Fail the "Save" operation due to a concurrent update. + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&store.ErrConcurrency{}). + Times(1) + + // Mock a successful (non-terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.Error(t, err) + require.ErrorIs(t, err, &InProgressErr{}) + }) + + t.Run("retries exhausted", func(t *testing.T) { + updater, storeClient, _ := setupUpdater(t) + updater.AttemptCount = 3 + + apiVersion := "1234" + + // Fail enough times to exhaust our retries. + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, errors.New("this will be retried")). + Times(3) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.Error(t, err) + require.Equal(t, "failed to update tracked resource after 3 attempts", err.Error()) + }) +} + +func Test_run(t *testing.T) { + apiVersion := "1234" + + t.Run("successful update (new resource)", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.NoError(t, err) + }) + + t.Run("successful update (existing resource)", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + etag := "some-etag" + dm := datamodel.GenericResourceFromID(testID, IDFor(testID)) + dm.Properties.APIVersion = apiVersion + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(&store.Object{Metadata: store.Metadata{ETag: etag}, Data: dm}, nil). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.NoError(t, err) + }) + + t.Run("successful delete", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + etag := "some-etag" + dm := datamodel.GenericResourceFromID(testID, IDFor(testID)) + dm.Properties.APIVersion = apiVersion + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(&store.Object{Metadata: store.Metadata{ETag: etag}, Data: dm}, nil). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusNotFound, &v1.ErrorResponse{Error: v1.ErrorDetails{Code: v1.CodeNotFound}}) + + storeClient.EXPECT(). + Delete(gomock.Any(), IDFor(testID).String(), gomock.Any()). + Return(nil). + Times(1) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.NoError(t, err) + }) + + t.Run("resource still provisioning", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{ + "provisioningState": v1.ProvisioningStateAccepted, + }, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.Error(t, err) + require.ErrorIs(t, err, &InProgressErr{}) + }) +} + +func Test_fetch(t *testing.T) { + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{ + "provisioningState": v1.ProvisioningStateAccepted, + }, + } + + errorResponse := &v1.ErrorResponse{ + Error: v1.ErrorDetails{ + Code: "SomeErrorCode", + Message: "This is a test.", + }, + } + b, err := json.MarshalIndent(errorResponse, "", " ") + require.NoError(t, err) + errorResponseText := string(b) + + t.Run("successful fetch (200)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + expected := &trackedResourceState{ + ID: testID.String(), + Name: testID.Name(), + Type: testID.Type(), + Properties: trackedResourceStateProperties{ + ProvisioningState: to.Ptr(v1.ProvisioningStateAccepted), + }, + } + + state, err := updater.fetch(testcontext.New(t), testURL) + require.NoError(t, err) + require.Equal(t, expected, state) + }) + + t.Run("successful fetch (404)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + // We consider 404 a success case. + roundTripper.RespondWithJSON(t, http.StatusNotFound, errorResponse) + + state, err := updater.fetch(testcontext.New(t), testURL) + require.NoError(t, err) + require.Nil(t, state) + }) + + t.Run("failure (non-JSON)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + w := httptest.NewRecorder() + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("LOL here's some not-JSON")) + roundTripper.Response = w.Result() + + state, err := updater.fetch(testcontext.New(t), testURL) + require.Error(t, err) + require.Equal(t, "response is not JSON. Content-Type: \"text/plain\"", err.Error()) + require.Nil(t, state) + }) + + t.Run("failure (non-JSON)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + roundTripper.RespondWithJSON(t, http.StatusBadRequest, errorResponse) + + state, err := updater.fetch(testcontext.New(t), testURL) + require.Error(t, err) + require.Equal(t, "request failed with status code 400 Bad Request:\n"+errorResponseText, err.Error()) + require.Nil(t, state) + }) +} + +type mockRoundTripper struct { + Response *http.Response + Err error +} + +func (rt *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return rt.Response, rt.Err +} + +func (rt *mockRoundTripper) RespondWithJSON(t *testing.T, statusCode int, body any) { + t.Helper() + + b, err := json.Marshal(body) + require.NoError(t, err) + + w := httptest.NewRecorder() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + _, _ = w.Write(b) + + rt.Response = w.Result() +} diff --git a/test/functional/shared/test.go b/test/functional/shared/test.go index 87b795a6e5..5bec9096f6 100644 --- a/test/functional/shared/test.go +++ b/test/functional/shared/test.go @@ -82,16 +82,17 @@ func NewRPTestOptions(t *testing.T) RPTestOptions { return RPTestOptions{ TestOptions: test.NewTestOptions(t), + Workspace: workspace, CustomAction: customAction, ManagementClient: client, AWSClient: awsClient, Connection: connection, - Workspace: workspace, } } type RPTestOptions struct { test.TestOptions + CustomAction *clientv2.CustomActionClient ManagementClient clients.ApplicationsManagementClient AWSClient aws.AWSCloudControlClient diff --git a/test/functional/ucp/tracked_resource_test.go b/test/functional/ucp/tracked_resource_test.go new file mode 100644 index 0000000000..5ed7375afe --- /dev/null +++ b/test/functional/ucp/tracked_resource_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ucp + +import ( + "encoding/json" + "fmt" + "sort" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/google/uuid" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + aztoken "github.com/radius-project/radius/pkg/azure/tokencredentials" + corerp "github.com/radius-project/radius/pkg/corerp/api/v20231001preview" + "github.com/radius-project/radius/pkg/sdk" + ucp "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/test/functional/shared" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_TrackedResources(t *testing.T) { + log := func(message string, obj any) { + j, err := json.MarshalIndent(&obj, "", " ") + require.NoError(t, err) + t.Logf("%s:\n\n%+v", message, string(j)) + } + + ctx := testcontext.New(t) + options := shared.NewRPTestOptions(t) + resourceGroupID := resources.MustParse("/planes/radius/local/resourcegroups/test-" + uuid.New().String()) + + rgc, err := ucp.NewResourceGroupsClient(&aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + rc, err := ucp.NewResourcesClient(&aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + ac, err := corerp.NewApplicationsClient(resourceGroupID.String(), &aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + exc, err := corerp.NewExtendersClient(resourceGroupID.String(), &aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + + rg, err := rgc.CreateOrUpdate(ctx, "radius", "local", resourceGroupID.Name(), ucp.ResourceGroupResource{Location: to.Ptr(v1.LocationGlobal)}, nil) + require.NoError(t, err) + log("Created resource group", rg) + + t.Run("Resource group starts empty", func(t *testing.T) { + resources := []*ucp.GenericResource{} + pager := rc.NewListPager("radius", "local", resourceGroupID.Name(), nil) + for pager.More() { + page, err := pager.NextPage(ctx) + require.NoError(t, err) + log("Got resource page", page) + resources = append(resources, page.Value...) + } + require.Empty(t, resources) + }) + + t.Run("Create resources", func(t *testing.T) { + for i := 0; i < 3; i++ { + a, err := ac.CreateOrUpdate(ctx, fmt.Sprintf("app-%d", i), corerp.ApplicationResource{ + Location: to.Ptr(v1.LocationGlobal), + Properties: &corerp.ApplicationProperties{ + Environment: to.Ptr(options.Workspace.Environment), + }, + }, nil) + require.NoError(t, err) + log("Got application", a) + + // We're using extender here because its operations are asynchronous. + poller, err := exc.BeginCreateOrUpdate(ctx, fmt.Sprintf("ex-%d", i), corerp.ExtenderResource{ + Location: to.Ptr(v1.LocationGlobal), + Properties: &corerp.ExtenderProperties{ + Environment: to.Ptr(options.Workspace.Environment), + Application: to.Ptr(*a.ID), + ResourceProvisioning: to.Ptr(corerp.ResourceProvisioningManual), + }, + }, nil) + require.NoError(t, err) + + ex, err := poller.PollUntilDone(ctx, nil) + require.NoError(t, err) + log("Got extender", ex) + } + }) + + t.Run("Resource group contains resources", func(t *testing.T) { + expected := []*ucp.GenericResource{} + + for i := 0; i < 3; i++ { + expected = append(expected, &ucp.GenericResource{ + ID: to.Ptr(resourceGroupID.Append(resources.TypeSegment{Type: "Applications.Core/applications", Name: fmt.Sprintf("app-%d", i)}).String()), + Name: to.Ptr(fmt.Sprintf("app-%d", i)), + Type: to.Ptr("Applications.Core/applications"), + }) + expected = append(expected, &ucp.GenericResource{ + ID: to.Ptr(resourceGroupID.Append(resources.TypeSegment{Type: "Applications.Core/extenders", Name: fmt.Sprintf("ex-%d", i)}).String()), + Name: to.Ptr(fmt.Sprintf("ex-%d", i)), + Type: to.Ptr("Applications.Core/extenders"), + }) + } + + sort.Slice(expected, func(i, j int) bool { + return *expected[i].ID < *expected[j].ID + }) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + resources := []*ucp.GenericResource{} + pager := rc.NewListPager("radius", "local", resourceGroupID.Name(), nil) + for pager.More() { + page, err := pager.NextPage(ctx) + require.NoError(t, err) + log("Got resource page", page) + resources = append(resources, page.Value...) + } + + sort.Slice(resources, func(i, j int) bool { + return *resources[i].ID < *resources[j].ID + }) + assert.Equal(t, expected, resources) + }, time.Second*30, time.Millisecond*500) + }) + + t.Run("Delete resources", func(t *testing.T) { + for i := 0; i < 3; i++ { + // Delete in reverse order to make sure the extender is deleted before the application it + // belongs to. + poller, err := exc.BeginDelete(ctx, fmt.Sprintf("ex-%d", i), nil) + require.NoError(t, err) + + _, err = poller.PollUntilDone(ctx, nil) + require.NoError(t, err) + + _, err = ac.Delete(ctx, fmt.Sprintf("app-%d", i), nil) + require.NoError(t, err) + } + }) + + t.Run("Resource group is empty again", func(t *testing.T) { + require.EventuallyWithT(t, func(t *assert.CollectT) { + resources := []*ucp.GenericResource{} + pager := rc.NewListPager("radius", "local", resourceGroupID.Name(), nil) + for pager.More() { + page, err := pager.NextPage(ctx) + require.NoError(t, err) + log("Got resource page", page) + resources = append(resources, page.Value...) + } + assert.Empty(t, resources) + }, time.Second*30, time.Millisecond*500) + }) + + t.Run("Delete resource group", func(t *testing.T) { + _, err := rgc.Delete(ctx, "radius", "local", resourceGroupID.Name(), nil) + require.NoError(t, err) + }) +}