diff --git a/k8s/dynamic.go b/k8s/dynamic.go new file mode 100644 index 00000000..8fb84915 --- /dev/null +++ b/k8s/dynamic.go @@ -0,0 +1,144 @@ +package k8s + +import ( + "context" + "fmt" + "sync" + "time" + + "golang.org/x/sync/singleflight" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + + "github.com/grafana/grafana-app-sdk/logging" + "github.com/grafana/grafana-app-sdk/resource" +) + +// DynamicPatcher is a client which will always patch against the current preferred version of a kind. +type DynamicPatcher struct { + client *dynamic.DynamicClient + discovery *discovery.DiscoveryClient + preferred map[string]metav1.APIResource + mux sync.RWMutex + lastUpdate time.Time + updateInterval time.Duration + group singleflight.Group +} + +// NewDynamicPatcher returns a new DynamicPatcher using the provided rest.Config for its internal client(s), +// and cacheUpdateInterval as the interval to refresh its preferred version cache from the API server. +// To disable the cache refresh (and only update on first request and whenever ForceRefresh() is called), +// set this value to <= 0. +func NewDynamicPatcher(cfg *rest.Config, cacheUpdateInterval time.Duration) (*DynamicPatcher, error) { + client, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("error creating dynamic client: %w", err) + } + disc, err := discovery.NewDiscoveryClientForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("error creating discovery client: %w", err) + } + return &DynamicPatcher{ + client: client, + discovery: disc, + preferred: make(map[string]metav1.APIResource), + updateInterval: cacheUpdateInterval, + }, nil +} + +type DynamicKindPatcher struct { + patcher *DynamicPatcher + groupKind schema.GroupKind +} + +func (d *DynamicKindPatcher) Patch(ctx context.Context, identifier resource.Identifier, patch resource.PatchRequest, options resource.PatchOptions) (resource.Object, error) { + return d.patcher.Patch(ctx, d.groupKind, identifier, patch, options) +} + +func (d *DynamicPatcher) Patch(ctx context.Context, groupKind schema.GroupKind, identifier resource.Identifier, patch resource.PatchRequest, _ resource.PatchOptions) (*resource.UnstructuredWrapper, error) { + preferred, err := d.getPreferred(groupKind) + if err != nil { + return nil, err + } + logging.FromContext(ctx).Debug("patching with dynamic client", "group", groupKind.Group, "version", preferred.Version, "kind", groupKind.Kind, "plural", preferred.Name) + data, err := marshalJSONPatch(patch) + if err != nil { + return nil, fmt.Errorf("failed to marshal patch: %w", err) + } + res := d.client.Resource(schema.GroupVersionResource{ + Group: preferred.Group, + Version: preferred.Version, + Resource: preferred.Name, + }) + if preferred.Namespaced { + resp, err := res.Namespace(identifier.Namespace).Patch(ctx, identifier.Name, types.JSONPatchType, data, metav1.PatchOptions{}) + if err != nil { + return nil, err + } + return resource.NewUnstructuredWrapper(resp), nil + } + resp, err := res.Patch(ctx, identifier.Name, types.JSONPatchType, data, metav1.PatchOptions{}) + if err != nil { + return nil, err + } + return resource.NewUnstructuredWrapper(resp), nil +} + +// ForKind returns a DynamicKindPatcher for the provided group and kind, which implements the Patch method from resource.Client. +// It wraps DynamicPatcher's Patch method, and will use the same self-updating cache of the preferred version +func (d *DynamicPatcher) ForKind(groupKind schema.GroupKind) *DynamicKindPatcher { + return &DynamicKindPatcher{ + patcher: d, + groupKind: groupKind, + } +} + +// ForceRefresh forces an update of the DiscoveryClient's cache of preferred versions for kinds +func (d *DynamicPatcher) ForceRefresh() error { + return d.updatePreferred() +} + +func (d *DynamicPatcher) getPreferred(kind schema.GroupKind) (*metav1.APIResource, error) { + _, err, _ := d.group.Do("check-cache-update", func() (any, error) { + if d.preferred == nil || (d.updateInterval >= 0 && d.lastUpdate.Before(now().Add(-d.updateInterval))) { + if err := d.updatePreferred(); err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil { + return nil, err + } + d.mux.RLock() + defer d.mux.RUnlock() + + preferred, ok := d.preferred[kind.String()] + if !ok { + return nil, fmt.Errorf("preferred resource not found for kind '%s'", kind) + } + return &preferred, nil +} + +func (d *DynamicPatcher) updatePreferred() error { + d.mux.Lock() + defer d.mux.Unlock() + preferred, err := d.discovery.ServerPreferredResources() + if err != nil { + return err + } + for _, pref := range preferred { + for _, res := range pref.APIResources { + d.preferred[schema.GroupKind{ + Group: res.Group, + Kind: res.Kind, + }.String()] = res + } + } + d.lastUpdate = now() + return nil +} diff --git a/operator/runner.go b/operator/runner.go index b45b4a9f..3e6a61e7 100644 --- a/operator/runner.go +++ b/operator/runner.go @@ -155,6 +155,12 @@ func (s *Runner) Run(ctx context.Context, provider app.Provider) error { for _, kind := range manifestData.Kinds { for _, version := range kind.Versions { if version.Admission == nil { + if kind.Conversion { + anyWebhooks = true + vkCapabilities[fmt.Sprintf("%s/%s", kind.Kind, version.Name)] = capabilities{ + conversion: kind.Conversion, + } + } continue } vkCapabilities[fmt.Sprintf("%s/%s", kind.Kind, version.Name)] = capabilities{ diff --git a/resource/unstructured.go b/resource/unstructured.go new file mode 100644 index 00000000..575e7225 --- /dev/null +++ b/resource/unstructured.go @@ -0,0 +1,143 @@ +package resource + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" +) + +var _ Object = &UnstructuredWrapper{} + +// UnstructuredWrapper wraps unstructured.Unstructured so that it implements resource.Object +type UnstructuredWrapper struct { + *unstructured.Unstructured `json:",inline"` +} + +// NewUnstructuredWrapper creates a new UnstructuredWrapper wrapping the provided unstructured.Unstructured resource +func NewUnstructuredWrapper(obj *unstructured.Unstructured) *UnstructuredWrapper { + return &UnstructuredWrapper{obj} +} + +func (u *UnstructuredWrapper) GetSpec() any { + return u.Object["spec"] +} + +func (u *UnstructuredWrapper) SetSpec(a any) error { + u.Object["spec"] = a + return nil +} + +func (u *UnstructuredWrapper) GetSubresources() map[string]any { + subresources := make(map[string]any) + for k, v := range u.Object { + if k == "metadata" || k == "apiVersion" || k == "kind" || k == "spec" { + continue + } + subresources[k] = v + } + return subresources +} + +func (u *UnstructuredWrapper) GetSubresource(s string) (any, bool) { + sr, ok := u.Object[s] + return sr, ok +} + +func (u *UnstructuredWrapper) SetSubresource(key string, val any) error { + u.Object[key] = val + return nil +} + +func (u *UnstructuredWrapper) GetStaticMetadata() StaticMetadata { + return StaticMetadata{ + Name: u.GetName(), + Namespace: u.GetNamespace(), + Group: u.GroupVersionKind().Group, + Version: u.GroupVersionKind().Version, + Kind: u.GroupVersionKind().Kind, + } +} + +func (u *UnstructuredWrapper) SetStaticMetadata(metadata StaticMetadata) { + u.SetName(metadata.Name) + u.SetNamespace(metadata.Namespace) + u.SetGroupVersionKind(schema.GroupVersionKind{ + Group: metadata.Group, + Version: metadata.Version, + Kind: metadata.Kind, + }) +} + +//nolint:revive,staticcheck +func (u *UnstructuredWrapper) GetCommonMetadata() CommonMetadata { + var err error + dt := u.GetDeletionTimestamp() + var deletionTimestamp *time.Time + if dt != nil { + deletionTimestamp = &dt.Time + } + updt := time.Time{} + createdBy := "" + updatedBy := "" + if annotations := u.GetAnnotations(); annotations != nil { + strUpdt, ok := annotations[AnnotationUpdateTimestamp] + if ok { + updt, err = time.Parse(time.RFC3339, strUpdt) + if err != nil { + // HMMMM + } + } + createdBy = annotations[AnnotationCreatedBy] + updatedBy = annotations[AnnotationUpdatedBy] + } + return CommonMetadata{ + UID: string(u.GetUID()), + ResourceVersion: u.GetResourceVersion(), + Generation: u.GetGeneration(), + Labels: u.GetLabels(), + CreationTimestamp: u.GetCreationTimestamp().Time, + DeletionTimestamp: deletionTimestamp, + Finalizers: u.GetFinalizers(), + UpdateTimestamp: updt, + CreatedBy: createdBy, + UpdatedBy: updatedBy, + } +} + +func (u *UnstructuredWrapper) SetCommonMetadata(metadata CommonMetadata) { + u.SetUID(types.UID(metadata.UID)) + u.SetResourceVersion(metadata.ResourceVersion) + u.SetGeneration(metadata.Generation) + u.SetLabels(metadata.Labels) + u.SetCreationTimestamp(metav1.NewTime(metadata.CreationTimestamp)) + if metadata.DeletionTimestamp != nil { + dt := metav1.NewTime(*metadata.DeletionTimestamp) + u.SetDeletionTimestamp(&dt) + } else { + u.SetDeletionTimestamp(nil) + } + u.SetFinalizers(metadata.Finalizers) + annotations := u.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + if !metadata.UpdateTimestamp.IsZero() { + annotations[AnnotationUpdateTimestamp] = metadata.UpdateTimestamp.Format(time.RFC3339) + } + if metadata.CreatedBy != "" { + annotations[AnnotationCreatedBy] = metadata.CreatedBy + } + if metadata.UpdatedBy != "" { + annotations[AnnotationUpdatedBy] = metadata.UpdatedBy + } + u.SetAnnotations(annotations) +} + +func (u *UnstructuredWrapper) Copy() Object { + uns := unstructured.Unstructured{} + u.DeepCopyInto(&uns) + return &UnstructuredWrapper{&uns} +} diff --git a/simple/app.go b/simple/app.go index 2e6b7fc9..db641ebc 100644 --- a/simple/app.go +++ b/simple/app.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime/schema" @@ -102,6 +103,7 @@ type App struct { cfg AppConfig converters map[string]Converter customRoutes map[string]AppCustomRouteHandler + patcher *k8s.DynamicPatcher } // AppConfig is the configuration used by App @@ -112,6 +114,11 @@ type AppConfig struct { ManagedKinds []AppManagedKind UnmanagedKinds []AppUnmanagedKind Converters map[schema.GroupKind]Converter + // DiscoveryRefreshInterval is the interval at which the API discovery cache should be refreshed. + // This is primarily used by the DynamicPatcher in the OpinionatedWatcher/OpinionatedReconciler + // for sending finalizer add/remove patches to the latest version of the kind. + // This defaults to 10 minutes. + DiscoveryRefreshInterval time.Duration } // AppInformerConfig contains configuration for the App's internal operator.InformerController @@ -206,6 +213,15 @@ func NewApp(config AppConfig) (*App, error) { customRoutes: make(map[string]AppCustomRouteHandler), cfg: config, } + discoveryRefresh := config.DiscoveryRefreshInterval + if discoveryRefresh == 0 { + discoveryRefresh = time.Minute * 10 + } + p, err := k8s.NewDynamicPatcher(&config.KubeConfig, discoveryRefresh) + if err != nil { + return nil, err + } + a.patcher = p for _, kind := range config.ManagedKinds { err := a.manageKind(kind) if err != nil { @@ -338,7 +354,7 @@ func (a *App) watchKind(kind AppUnmanagedKind) error { if kind.Reconciler != nil { reconciler := kind.Reconciler if !kind.ReconcileOptions.UsePlain { - op, err := operator.NewOpinionatedReconciler(client, a.getFinalizer(kind.Kind)) + op, err := operator.NewOpinionatedReconciler(&watchPatcher{a.patcher.ForKind(kind.Kind.GroupVersionKind().GroupKind())}, a.getFinalizer(kind.Kind)) if err != nil { return err } @@ -353,7 +369,7 @@ func (a *App) watchKind(kind AppUnmanagedKind) error { if kind.Watcher != nil { watcher := kind.Watcher if !kind.ReconcileOptions.UsePlain { - op, err := operator.NewOpinionatedWatcherWithFinalizer(kind.Kind, client, a.getFinalizer) + op, err := operator.NewOpinionatedWatcherWithFinalizer(kind.Kind, &watchPatcher{a.patcher.ForKind(kind.Kind.GroupVersionKind().GroupKind())}, a.getFinalizer) if err != nil { return err } @@ -479,3 +495,19 @@ type k8sRunnable struct { func (k *k8sRunnable) Run(ctx context.Context) error { return k.runner.Run(ctx.Done()) } + +var _ operator.PatchClient = &watchPatcher{} + +type watchPatcher struct { + patcher *k8s.DynamicKindPatcher +} + +func (w *watchPatcher) PatchInto(ctx context.Context, identifier resource.Identifier, req resource.PatchRequest, options resource.PatchOptions, into resource.Object) error { + obj, err := w.patcher.Patch(ctx, identifier, req, options) + if err != nil { + return err + } + // This is only used to update the finalizers list, so we just need to update metadata + into.SetCommonMetadata(obj.GetCommonMetadata()) + return nil +} diff --git a/simple/operator.go b/simple/operator.go index 867c22be..dcc7cc78 100644 --- a/simple/operator.go +++ b/simple/operator.go @@ -28,6 +28,11 @@ type OperatorConfig struct { // the finalizer name MUST be 63 chars or fewer, and should be unique to the operator FinalizerGenerator func(kind resource.Schema) string InformerCacheResyncInterval time.Duration + // DiscoveryRefreshInterval is the interval at which the API discovery cache should be refreshed. + // This is primarily used by the DynamicPatcher in the OpinionatedWatcher/OpinionatedReconciler + // for sending finalizer add/remove patches to the latest version of the kind. + // This defaults to 10 minutes. + DiscoveryRefreshInterval time.Duration } // WebhookConfig is a configuration for exposed kubernetes webhooks for an Operator @@ -86,6 +91,14 @@ func NewOperator(cfg OperatorConfig) (*Operator, error) { return nil, err } } + discoveryRefresh := cfg.DiscoveryRefreshInterval + if discoveryRefresh == 0 { + discoveryRefresh = time.Minute * 10 + } + patcher, err := k8s.NewDynamicPatcher(&cfg.KubeConfig, discoveryRefresh) + if err != nil { + return nil, err + } informerControllerConfig := operator.DefaultInformerControllerConfig() informerControllerConfig.MetricsConfig.Namespace = cfg.Metrics.Namespace @@ -121,6 +134,7 @@ func NewOperator(cfg OperatorConfig) (*Operator, error) { admission: ws, metricsExporter: me, cacheResyncInterval: cfg.InformerCacheResyncInterval, + patcher: patcher, } op.controller.ErrorHandler = op.ErrorHandler return op, nil @@ -143,6 +157,7 @@ type Operator struct { admission *k8s.WebhookServer metricsExporter *metrics.Exporter cacheResyncInterval time.Duration + patcher *k8s.DynamicPatcher } // SyncWatcher extends operator.ResourceWatcher with a Sync method which can be called by the operator.OpinionatedWatcher @@ -203,7 +218,7 @@ func (o *Operator) WatchKind(kind resource.Kind, watcher SyncWatcher, options op if err != nil { return err } - ow, err := operator.NewOpinionatedWatcherWithFinalizer(kind, client, func(sch resource.Schema) string { + ow, err := operator.NewOpinionatedWatcherWithFinalizer(kind, &watchPatcher{o.patcher.ForKind(kind.GroupVersionKind().GroupKind())}, func(sch resource.Schema) string { if o.FinalizerGenerator != nil { return o.FinalizerGenerator(sch) } @@ -246,7 +261,7 @@ func (o *Operator) ReconcileKind(kind resource.Kind, reconciler operator.Reconci } else if o.Name != "" { finalizer = fmt.Sprintf("%s-%s-finalizer", o.Name, kind.Plural()) } - or, err := operator.NewOpinionatedReconciler(client, finalizer) + or, err := operator.NewOpinionatedReconciler(&watchPatcher{o.patcher.ForKind(kind.GroupVersionKind().GroupKind())}, finalizer) if err != nil { return err }