-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[LTS v0.24][OpinionatedWatcher/Reconciler] Patch Against the Preferre…
…d Version (#520) # What this PR Does This is a duplicate of #511, applied to the `lts/v0.24` branch instead of `main`, to be a part of the next LTS release. ### Original PR Text To allow the OpinionatedWatcher and OpinionatedReconciler to issue patch requests to the preferred API version, instead of the version being watched (as conversion for the patching can lead to edge-case conversion problems), introduce `k8s.DynamicPatcher`, which issues patch requests to the preferred API version of a GroupKind, using the discovery client to get (and cache) preferred version(s). This change also introduces a wrapper for this client to avoid changing the `operator.PatchClient` used for OpinionatedWatcher and OpinionatedReconciler--this wrapper is unexported and used in `simple.App` and `simple.Operator` so that apps built that way will work properly. This is so the same change can also be made to the `lts/v0.24` branch. A later state of this will likely move the DynamicPatcher to only allow for patching of metadata, or at least having the opinionated watcher and reconciler use an interface that only patches metadata, rather than the whole object, which the DynamicPatcher can satisfy. Relates to #506
- Loading branch information
1 parent
bbb501e
commit b93f075
Showing
6 changed files
with
347 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package k8s | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"golang.org/x/sync/singleflight" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/client-go/discovery" | ||
"k8s.io/client-go/dynamic" | ||
"k8s.io/client-go/rest" | ||
|
||
"github.com/grafana/grafana-app-sdk/logging" | ||
"github.com/grafana/grafana-app-sdk/resource" | ||
) | ||
|
||
// DynamicPatcher is a client which will always patch against the current preferred version of a kind. | ||
type DynamicPatcher struct { | ||
client *dynamic.DynamicClient | ||
discovery *discovery.DiscoveryClient | ||
preferred map[string]metav1.APIResource | ||
mux sync.RWMutex | ||
lastUpdate time.Time | ||
updateInterval time.Duration | ||
group singleflight.Group | ||
} | ||
|
||
// NewDynamicPatcher returns a new DynamicPatcher using the provided rest.Config for its internal client(s), | ||
// and cacheUpdateInterval as the interval to refresh its preferred version cache from the API server. | ||
// To disable the cache refresh (and only update on first request and whenever ForceRefresh() is called), | ||
// set this value to <= 0. | ||
func NewDynamicPatcher(cfg *rest.Config, cacheUpdateInterval time.Duration) (*DynamicPatcher, error) { | ||
client, err := dynamic.NewForConfig(cfg) | ||
if err != nil { | ||
return nil, fmt.Errorf("error creating dynamic client: %w", err) | ||
} | ||
disc, err := discovery.NewDiscoveryClientForConfig(cfg) | ||
if err != nil { | ||
return nil, fmt.Errorf("error creating discovery client: %w", err) | ||
} | ||
return &DynamicPatcher{ | ||
client: client, | ||
discovery: disc, | ||
preferred: make(map[string]metav1.APIResource), | ||
updateInterval: cacheUpdateInterval, | ||
}, nil | ||
} | ||
|
||
type DynamicKindPatcher struct { | ||
patcher *DynamicPatcher | ||
groupKind schema.GroupKind | ||
} | ||
|
||
func (d *DynamicKindPatcher) Patch(ctx context.Context, identifier resource.Identifier, patch resource.PatchRequest, options resource.PatchOptions) (resource.Object, error) { | ||
return d.patcher.Patch(ctx, d.groupKind, identifier, patch, options) | ||
} | ||
|
||
func (d *DynamicPatcher) Patch(ctx context.Context, groupKind schema.GroupKind, identifier resource.Identifier, patch resource.PatchRequest, _ resource.PatchOptions) (*resource.UnstructuredWrapper, error) { | ||
preferred, err := d.getPreferred(groupKind) | ||
if err != nil { | ||
return nil, err | ||
} | ||
logging.FromContext(ctx).Debug("patching with dynamic client", "group", groupKind.Group, "version", preferred.Version, "kind", groupKind.Kind, "plural", preferred.Name) | ||
data, err := marshalJSONPatch(patch) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to marshal patch: %w", err) | ||
} | ||
res := d.client.Resource(schema.GroupVersionResource{ | ||
Group: preferred.Group, | ||
Version: preferred.Version, | ||
Resource: preferred.Name, | ||
}) | ||
if preferred.Namespaced { | ||
resp, err := res.Namespace(identifier.Namespace).Patch(ctx, identifier.Name, types.JSONPatchType, data, metav1.PatchOptions{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return resource.NewUnstructuredWrapper(resp), nil | ||
} | ||
resp, err := res.Patch(ctx, identifier.Name, types.JSONPatchType, data, metav1.PatchOptions{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return resource.NewUnstructuredWrapper(resp), nil | ||
} | ||
|
||
// ForKind returns a DynamicKindPatcher for the provided group and kind, which implements the Patch method from resource.Client. | ||
// It wraps DynamicPatcher's Patch method, and will use the same self-updating cache of the preferred version | ||
func (d *DynamicPatcher) ForKind(groupKind schema.GroupKind) *DynamicKindPatcher { | ||
return &DynamicKindPatcher{ | ||
patcher: d, | ||
groupKind: groupKind, | ||
} | ||
} | ||
|
||
// ForceRefresh forces an update of the DiscoveryClient's cache of preferred versions for kinds | ||
func (d *DynamicPatcher) ForceRefresh() error { | ||
return d.updatePreferred() | ||
} | ||
|
||
func (d *DynamicPatcher) getPreferred(kind schema.GroupKind) (*metav1.APIResource, error) { | ||
_, err, _ := d.group.Do("check-cache-update", func() (any, error) { | ||
if d.preferred == nil || (d.updateInterval >= 0 && d.lastUpdate.Before(now().Add(-d.updateInterval))) { | ||
if err := d.updatePreferred(); err != nil { | ||
return nil, err | ||
} | ||
} | ||
return nil, nil | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
d.mux.RLock() | ||
defer d.mux.RUnlock() | ||
|
||
preferred, ok := d.preferred[kind.String()] | ||
if !ok { | ||
return nil, fmt.Errorf("preferred resource not found for kind '%s'", kind) | ||
} | ||
return &preferred, nil | ||
} | ||
|
||
func (d *DynamicPatcher) updatePreferred() error { | ||
d.mux.Lock() | ||
defer d.mux.Unlock() | ||
preferred, err := d.discovery.ServerPreferredResources() | ||
if err != nil { | ||
return err | ||
} | ||
for _, pref := range preferred { | ||
for _, res := range pref.APIResources { | ||
d.preferred[schema.GroupKind{ | ||
Group: res.Group, | ||
Kind: res.Kind, | ||
}.String()] = res | ||
} | ||
} | ||
d.lastUpdate = now() | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package resource | ||
|
||
import ( | ||
"time" | ||
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/types" | ||
) | ||
|
||
var _ Object = &UnstructuredWrapper{} | ||
|
||
// UnstructuredWrapper wraps unstructured.Unstructured so that it implements resource.Object | ||
type UnstructuredWrapper struct { | ||
*unstructured.Unstructured `json:",inline"` | ||
} | ||
|
||
// NewUnstructuredWrapper creates a new UnstructuredWrapper wrapping the provided unstructured.Unstructured resource | ||
func NewUnstructuredWrapper(obj *unstructured.Unstructured) *UnstructuredWrapper { | ||
return &UnstructuredWrapper{obj} | ||
} | ||
|
||
func (u *UnstructuredWrapper) GetSpec() any { | ||
return u.Object["spec"] | ||
} | ||
|
||
func (u *UnstructuredWrapper) SetSpec(a any) error { | ||
u.Object["spec"] = a | ||
return nil | ||
} | ||
|
||
func (u *UnstructuredWrapper) GetSubresources() map[string]any { | ||
subresources := make(map[string]any) | ||
for k, v := range u.Object { | ||
if k == "metadata" || k == "apiVersion" || k == "kind" || k == "spec" { | ||
continue | ||
} | ||
subresources[k] = v | ||
} | ||
return subresources | ||
} | ||
|
||
func (u *UnstructuredWrapper) GetSubresource(s string) (any, bool) { | ||
sr, ok := u.Object[s] | ||
return sr, ok | ||
} | ||
|
||
func (u *UnstructuredWrapper) SetSubresource(key string, val any) error { | ||
u.Object[key] = val | ||
return nil | ||
} | ||
|
||
func (u *UnstructuredWrapper) GetStaticMetadata() StaticMetadata { | ||
return StaticMetadata{ | ||
Name: u.GetName(), | ||
Namespace: u.GetNamespace(), | ||
Group: u.GroupVersionKind().Group, | ||
Version: u.GroupVersionKind().Version, | ||
Kind: u.GroupVersionKind().Kind, | ||
} | ||
} | ||
|
||
func (u *UnstructuredWrapper) SetStaticMetadata(metadata StaticMetadata) { | ||
u.SetName(metadata.Name) | ||
u.SetNamespace(metadata.Namespace) | ||
u.SetGroupVersionKind(schema.GroupVersionKind{ | ||
Group: metadata.Group, | ||
Version: metadata.Version, | ||
Kind: metadata.Kind, | ||
}) | ||
} | ||
|
||
//nolint:revive,staticcheck | ||
func (u *UnstructuredWrapper) GetCommonMetadata() CommonMetadata { | ||
var err error | ||
dt := u.GetDeletionTimestamp() | ||
var deletionTimestamp *time.Time | ||
if dt != nil { | ||
deletionTimestamp = &dt.Time | ||
} | ||
updt := time.Time{} | ||
createdBy := "" | ||
updatedBy := "" | ||
if annotations := u.GetAnnotations(); annotations != nil { | ||
strUpdt, ok := annotations[AnnotationUpdateTimestamp] | ||
if ok { | ||
updt, err = time.Parse(time.RFC3339, strUpdt) | ||
if err != nil { | ||
// HMMMM | ||
} | ||
} | ||
createdBy = annotations[AnnotationCreatedBy] | ||
updatedBy = annotations[AnnotationUpdatedBy] | ||
} | ||
return CommonMetadata{ | ||
UID: string(u.GetUID()), | ||
ResourceVersion: u.GetResourceVersion(), | ||
Generation: u.GetGeneration(), | ||
Labels: u.GetLabels(), | ||
CreationTimestamp: u.GetCreationTimestamp().Time, | ||
DeletionTimestamp: deletionTimestamp, | ||
Finalizers: u.GetFinalizers(), | ||
UpdateTimestamp: updt, | ||
CreatedBy: createdBy, | ||
UpdatedBy: updatedBy, | ||
} | ||
} | ||
|
||
func (u *UnstructuredWrapper) SetCommonMetadata(metadata CommonMetadata) { | ||
u.SetUID(types.UID(metadata.UID)) | ||
u.SetResourceVersion(metadata.ResourceVersion) | ||
u.SetGeneration(metadata.Generation) | ||
u.SetLabels(metadata.Labels) | ||
u.SetCreationTimestamp(metav1.NewTime(metadata.CreationTimestamp)) | ||
if metadata.DeletionTimestamp != nil { | ||
dt := metav1.NewTime(*metadata.DeletionTimestamp) | ||
u.SetDeletionTimestamp(&dt) | ||
} else { | ||
u.SetDeletionTimestamp(nil) | ||
} | ||
u.SetFinalizers(metadata.Finalizers) | ||
annotations := u.GetAnnotations() | ||
if annotations == nil { | ||
annotations = make(map[string]string) | ||
} | ||
if !metadata.UpdateTimestamp.IsZero() { | ||
annotations[AnnotationUpdateTimestamp] = metadata.UpdateTimestamp.Format(time.RFC3339) | ||
} | ||
if metadata.CreatedBy != "" { | ||
annotations[AnnotationCreatedBy] = metadata.CreatedBy | ||
} | ||
if metadata.UpdatedBy != "" { | ||
annotations[AnnotationUpdatedBy] = metadata.UpdatedBy | ||
} | ||
u.SetAnnotations(annotations) | ||
} | ||
|
||
func (u *UnstructuredWrapper) Copy() Object { | ||
uns := unstructured.Unstructured{} | ||
u.DeepCopyInto(&uns) | ||
return &UnstructuredWrapper{&uns} | ||
} |
Oops, something went wrong.