Skip to content

Commit

Permalink
[OpinionatedWatcher/Reconciler] Patch Against the Preferred Version (#…
Browse files Browse the repository at this point in the history
…511)

To allow the OpinionatedWatcher and OpinionatedReconciler to issue patch
requests to the preferred API version, instead of the version being
watched (as conversion for the patching can lead to edge-case conversion
problems), introduce `k8s.DynamicPatcher`, which issues patch requests
to the preferred API version of a GroupKind, using the discovery client
to get (and cache) preferred version(s).

This change also introduces a wrapper for this client to avoid changing
the `operator.PatchClient` used for OpinionatedWatcher and
OpinionatedReconciler--this wrapper is unexported and used in
`simple.App` and `simple.Operator` so that apps built that way will work
properly. This is so the same change can also be made to the `lts/v0.24`
branch. A later state of this will likely move the DynamicPatcher to
only allow for patching of metadata, or at least having the opinionated
watcher and reconciler use an interface that only patches metadata,
rather than the whole object, which the DynamicPatcher can satisfy.

Relates to #506

---------

Co-authored-by: Igor Suleymanov <[email protected]>
  • Loading branch information
IfSentient and radiohead authored Dec 3, 2024
1 parent 5562d49 commit afa5519
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 4 deletions.
144 changes: 144 additions & 0 deletions k8s/dynamic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package k8s

import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/sync/singleflight"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"

"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana-app-sdk/resource"
)

// DynamicPatcher is a client which will always patch against the current preferred version of a kind.
type DynamicPatcher struct {
client *dynamic.DynamicClient
discovery *discovery.DiscoveryClient
preferred map[string]metav1.APIResource
mux sync.RWMutex
lastUpdate time.Time
updateInterval time.Duration
group singleflight.Group
}

// NewDynamicPatcher returns a new DynamicPatcher using the provided rest.Config for its internal client(s),
// and cacheUpdateInterval as the interval to refresh its preferred version cache from the API server.
// To disable the cache refresh (and only update on first request and whenever ForceRefresh() is called),
// set this value to <= 0.
func NewDynamicPatcher(cfg *rest.Config, cacheUpdateInterval time.Duration) (*DynamicPatcher, error) {
client, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("error creating dynamic client: %w", err)
}
disc, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("error creating discovery client: %w", err)
}
return &DynamicPatcher{
client: client,
discovery: disc,
preferred: make(map[string]metav1.APIResource),
updateInterval: cacheUpdateInterval,
}, nil
}

type DynamicKindPatcher struct {
patcher *DynamicPatcher
groupKind schema.GroupKind
}

func (d *DynamicKindPatcher) Patch(ctx context.Context, identifier resource.Identifier, patch resource.PatchRequest, options resource.PatchOptions) (resource.Object, error) {
return d.patcher.Patch(ctx, d.groupKind, identifier, patch, options)
}

func (d *DynamicPatcher) Patch(ctx context.Context, groupKind schema.GroupKind, identifier resource.Identifier, patch resource.PatchRequest, _ resource.PatchOptions) (*resource.UnstructuredWrapper, error) {
preferred, err := d.getPreferred(groupKind)
if err != nil {
return nil, err
}
logging.FromContext(ctx).Debug("patching with dynamic client", "group", groupKind.Group, "version", preferred.Version, "kind", groupKind.Kind, "plural", preferred.Name)
data, err := marshalJSONPatch(patch)
if err != nil {
return nil, fmt.Errorf("failed to marshal patch: %w", err)
}
res := d.client.Resource(schema.GroupVersionResource{
Group: preferred.Group,
Version: preferred.Version,
Resource: preferred.Name,
})
if preferred.Namespaced {
resp, err := res.Namespace(identifier.Namespace).Patch(ctx, identifier.Name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
return nil, err
}
return resource.NewUnstructuredWrapper(resp), nil
}
resp, err := res.Patch(ctx, identifier.Name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
return nil, err
}
return resource.NewUnstructuredWrapper(resp), nil
}

// ForKind returns a DynamicKindPatcher for the provided group and kind, which implements the Patch method from resource.Client.
// It wraps DynamicPatcher's Patch method, and will use the same self-updating cache of the preferred version
func (d *DynamicPatcher) ForKind(groupKind schema.GroupKind) *DynamicKindPatcher {
return &DynamicKindPatcher{
patcher: d,
groupKind: groupKind,
}
}

// ForceRefresh forces an update of the DiscoveryClient's cache of preferred versions for kinds
func (d *DynamicPatcher) ForceRefresh() error {
return d.updatePreferred()
}

func (d *DynamicPatcher) getPreferred(kind schema.GroupKind) (*metav1.APIResource, error) {
_, err, _ := d.group.Do("check-cache-update", func() (any, error) {
if d.preferred == nil || (d.updateInterval >= 0 && d.lastUpdate.Before(now().Add(-d.updateInterval))) {
if err := d.updatePreferred(); err != nil {
return nil, err
}
}
return nil, nil
})
if err != nil {
return nil, err
}
d.mux.RLock()
defer d.mux.RUnlock()

preferred, ok := d.preferred[kind.String()]
if !ok {
return nil, fmt.Errorf("preferred resource not found for kind '%s'", kind)
}
return &preferred, nil
}

func (d *DynamicPatcher) updatePreferred() error {
d.mux.Lock()
defer d.mux.Unlock()
preferred, err := d.discovery.ServerPreferredResources()
if err != nil {
return err
}
for _, pref := range preferred {
for _, res := range pref.APIResources {
d.preferred[schema.GroupKind{
Group: res.Group,
Kind: res.Kind,
}.String()] = res
}
}
d.lastUpdate = now()
return nil
}
6 changes: 6 additions & 0 deletions operator/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (s *Runner) Run(ctx context.Context, provider app.Provider) error {
for _, kind := range manifestData.Kinds {
for _, version := range kind.Versions {
if version.Admission == nil {
if kind.Conversion {
anyWebhooks = true
vkCapabilities[fmt.Sprintf("%s/%s", kind.Kind, version.Name)] = capabilities{
conversion: kind.Conversion,
}
}
continue
}
vkCapabilities[fmt.Sprintf("%s/%s", kind.Kind, version.Name)] = capabilities{
Expand Down
143 changes: 143 additions & 0 deletions resource/unstructured.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package resource

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)

var _ Object = &UnstructuredWrapper{}

// UnstructuredWrapper wraps unstructured.Unstructured so that it implements resource.Object
type UnstructuredWrapper struct {
*unstructured.Unstructured `json:",inline"`
}

// NewUnstructuredWrapper creates a new UnstructuredWrapper wrapping the provided unstructured.Unstructured resource
func NewUnstructuredWrapper(obj *unstructured.Unstructured) *UnstructuredWrapper {
return &UnstructuredWrapper{obj}
}

func (u *UnstructuredWrapper) GetSpec() any {
return u.Object["spec"]
}

func (u *UnstructuredWrapper) SetSpec(a any) error {
u.Object["spec"] = a
return nil
}

func (u *UnstructuredWrapper) GetSubresources() map[string]any {
subresources := make(map[string]any)
for k, v := range u.Object {
if k == "metadata" || k == "apiVersion" || k == "kind" || k == "spec" {
continue
}
subresources[k] = v
}
return subresources
}

func (u *UnstructuredWrapper) GetSubresource(s string) (any, bool) {
sr, ok := u.Object[s]
return sr, ok
}

func (u *UnstructuredWrapper) SetSubresource(key string, val any) error {
u.Object[key] = val
return nil
}

func (u *UnstructuredWrapper) GetStaticMetadata() StaticMetadata {
return StaticMetadata{
Name: u.GetName(),
Namespace: u.GetNamespace(),
Group: u.GroupVersionKind().Group,
Version: u.GroupVersionKind().Version,
Kind: u.GroupVersionKind().Kind,
}
}

func (u *UnstructuredWrapper) SetStaticMetadata(metadata StaticMetadata) {
u.SetName(metadata.Name)
u.SetNamespace(metadata.Namespace)
u.SetGroupVersionKind(schema.GroupVersionKind{
Group: metadata.Group,
Version: metadata.Version,
Kind: metadata.Kind,
})
}

//nolint:revive,staticcheck
func (u *UnstructuredWrapper) GetCommonMetadata() CommonMetadata {
var err error
dt := u.GetDeletionTimestamp()
var deletionTimestamp *time.Time
if dt != nil {
deletionTimestamp = &dt.Time
}
updt := time.Time{}
createdBy := ""
updatedBy := ""
if annotations := u.GetAnnotations(); annotations != nil {
strUpdt, ok := annotations[AnnotationUpdateTimestamp]
if ok {
updt, err = time.Parse(time.RFC3339, strUpdt)
if err != nil {
// HMMMM
}
}
createdBy = annotations[AnnotationCreatedBy]
updatedBy = annotations[AnnotationUpdatedBy]
}
return CommonMetadata{
UID: string(u.GetUID()),
ResourceVersion: u.GetResourceVersion(),
Generation: u.GetGeneration(),
Labels: u.GetLabels(),
CreationTimestamp: u.GetCreationTimestamp().Time,
DeletionTimestamp: deletionTimestamp,
Finalizers: u.GetFinalizers(),
UpdateTimestamp: updt,
CreatedBy: createdBy,
UpdatedBy: updatedBy,
}
}

func (u *UnstructuredWrapper) SetCommonMetadata(metadata CommonMetadata) {
u.SetUID(types.UID(metadata.UID))
u.SetResourceVersion(metadata.ResourceVersion)
u.SetGeneration(metadata.Generation)
u.SetLabels(metadata.Labels)
u.SetCreationTimestamp(metav1.NewTime(metadata.CreationTimestamp))
if metadata.DeletionTimestamp != nil {
dt := metav1.NewTime(*metadata.DeletionTimestamp)
u.SetDeletionTimestamp(&dt)
} else {
u.SetDeletionTimestamp(nil)
}
u.SetFinalizers(metadata.Finalizers)
annotations := u.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
if !metadata.UpdateTimestamp.IsZero() {
annotations[AnnotationUpdateTimestamp] = metadata.UpdateTimestamp.Format(time.RFC3339)
}
if metadata.CreatedBy != "" {
annotations[AnnotationCreatedBy] = metadata.CreatedBy
}
if metadata.UpdatedBy != "" {
annotations[AnnotationUpdatedBy] = metadata.UpdatedBy
}
u.SetAnnotations(annotations)
}

func (u *UnstructuredWrapper) Copy() Object {
uns := unstructured.Unstructured{}
u.DeepCopyInto(&uns)
return &UnstructuredWrapper{&uns}
}
36 changes: 34 additions & 2 deletions simple/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -102,6 +103,7 @@ type App struct {
cfg AppConfig
converters map[string]Converter
customRoutes map[string]AppCustomRouteHandler
patcher *k8s.DynamicPatcher
}

// AppConfig is the configuration used by App
Expand All @@ -112,6 +114,11 @@ type AppConfig struct {
ManagedKinds []AppManagedKind
UnmanagedKinds []AppUnmanagedKind
Converters map[schema.GroupKind]Converter
// DiscoveryRefreshInterval is the interval at which the API discovery cache should be refreshed.
// This is primarily used by the DynamicPatcher in the OpinionatedWatcher/OpinionatedReconciler
// for sending finalizer add/remove patches to the latest version of the kind.
// This defaults to 10 minutes.
DiscoveryRefreshInterval time.Duration
}

// AppInformerConfig contains configuration for the App's internal operator.InformerController
Expand Down Expand Up @@ -206,6 +213,15 @@ func NewApp(config AppConfig) (*App, error) {
customRoutes: make(map[string]AppCustomRouteHandler),
cfg: config,
}
discoveryRefresh := config.DiscoveryRefreshInterval
if discoveryRefresh == 0 {
discoveryRefresh = time.Minute * 10
}
p, err := k8s.NewDynamicPatcher(&config.KubeConfig, discoveryRefresh)
if err != nil {
return nil, err
}
a.patcher = p
for _, kind := range config.ManagedKinds {
err := a.manageKind(kind)
if err != nil {
Expand Down Expand Up @@ -338,7 +354,7 @@ func (a *App) watchKind(kind AppUnmanagedKind) error {
if kind.Reconciler != nil {
reconciler := kind.Reconciler
if !kind.ReconcileOptions.UsePlain {
op, err := operator.NewOpinionatedReconciler(client, a.getFinalizer(kind.Kind))
op, err := operator.NewOpinionatedReconciler(&watchPatcher{a.patcher.ForKind(kind.Kind.GroupVersionKind().GroupKind())}, a.getFinalizer(kind.Kind))
if err != nil {
return err
}
Expand All @@ -353,7 +369,7 @@ func (a *App) watchKind(kind AppUnmanagedKind) error {
if kind.Watcher != nil {
watcher := kind.Watcher
if !kind.ReconcileOptions.UsePlain {
op, err := operator.NewOpinionatedWatcherWithFinalizer(kind.Kind, client, a.getFinalizer)
op, err := operator.NewOpinionatedWatcherWithFinalizer(kind.Kind, &watchPatcher{a.patcher.ForKind(kind.Kind.GroupVersionKind().GroupKind())}, a.getFinalizer)
if err != nil {
return err
}
Expand Down Expand Up @@ -479,3 +495,19 @@ type k8sRunnable struct {
func (k *k8sRunnable) Run(ctx context.Context) error {
return k.runner.Run(ctx.Done())
}

var _ operator.PatchClient = &watchPatcher{}

type watchPatcher struct {
patcher *k8s.DynamicKindPatcher
}

func (w *watchPatcher) PatchInto(ctx context.Context, identifier resource.Identifier, req resource.PatchRequest, options resource.PatchOptions, into resource.Object) error {
obj, err := w.patcher.Patch(ctx, identifier, req, options)
if err != nil {
return err
}
// This is only used to update the finalizers list, so we just need to update metadata
into.SetCommonMetadata(obj.GetCommonMetadata())
return nil
}
Loading

0 comments on commit afa5519

Please sign in to comment.