From ee0a94379edffbef44f68f7ce158200dab038b83 Mon Sep 17 00:00:00 2001 From: Theodor Mihalache Date: Thu, 21 Nov 2024 13:17:26 -0500 Subject: [PATCH 1/4] Add support for db stores in feast go operator Signed-off-by: Theodor Mihalache --- .../api/v1alpha1/featurestore_types.go | 66 +++++- .../api/v1alpha1/zz_generated.deepcopy.go | 75 +++++++ .../crd/bases/feast.dev_featurestores.yaml | 201 ++++++++++++++++- infra/feast-operator/config/rbac/role.yaml | 7 + infra/feast-operator/dist/install.yaml | 208 ++++++++++++++++- .../controller/featurestore_controller.go | 1 + .../featurestore_controller_ephemeral_test.go | 2 +- .../featurestore_controller_pvc_test.go | 2 +- .../featurestore_controller_test.go | 2 +- .../controller/services/repo_config.go | 212 +++++++++++++++--- .../controller/services/repo_config_test.go | 140 ++++++++++-- .../internal/controller/services/services.go | 101 ++++++++- .../controller/services/services_types.go | 38 ++-- .../internal/controller/services/util.go | 201 ++++++++++++++--- .../test/api/featurestore_types_test.go | 82 +++---- 15 files changed, 1175 insertions(+), 163 deletions(-) diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 440518db74..50bf682213 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -75,11 +75,13 @@ type OfflineStore struct { } // OfflineStorePersistence configures the persistence settings for the offline store service +// +kubebuilder:validation:XValidation:rule="[has(self.file), has(self.store)].exists_one(c, c)",message="One selection required between file or store." type OfflineStorePersistence struct { - FilePersistence *OfflineStoreFilePersistence `json:"file,omitempty"` + FilePersistence *OfflineStoreFilePersistence `json:"file,omitempty"` + DBPersistence *OfflineStoreDBStorePersistence `json:"store,omitempty"` } -// OfflineStorePersistence configures the file-based persistence for the offline store service +// OfflineStoreFilePersistence configures the file-based persistence for the offline store service type OfflineStoreFilePersistence struct { // +kubebuilder:validation:Enum=dask;duckdb Type string `json:"type,omitempty"` @@ -91,6 +93,24 @@ var ValidOfflineStoreFilePersistenceTypes = []string{ "duckdb", } +// OfflineStoreDBStorePersistence configures the DB store persistence for the offline store service +type OfflineStoreDBStorePersistence struct { + // +kubebuilder:validation:Enum=snowflake.offline;bigquery;redshift;spark;postgres;feast_trino.trino.TrinoOfflineStore;redis + Type string `json:"type,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` + SecretKeyName string `json:"secretKeyName,omitempty"` +} + +var ValidOfflineStoreDBStorePersistenceTypes = []string{ + "snowflake.offline", + "bigquery", + "redshift", + "spark", + "postgres", + "feast_trino.trino.TrinoOfflineStore", + "redis", +} + // OnlineStore configures the deployed online store service type OnlineStore struct { ServiceConfigs `json:",inline"` @@ -98,8 +118,10 @@ type OnlineStore struct { } // OnlineStorePersistence configures the persistence settings for the online store service +// +kubebuilder:validation:XValidation:rule="[has(self.file), has(self.store)].exists_one(c, c)",message="One selection required between file or store." type OnlineStorePersistence struct { - FilePersistence *OnlineStoreFilePersistence `json:"file,omitempty"` + FilePersistence *OnlineStoreFilePersistence `json:"file,omitempty"` + DBPersistence *OnlineStoreDBStorePersistence `json:"store,omitempty"` } // OnlineStoreFilePersistence configures the file-based persistence for the offline store service @@ -111,6 +133,28 @@ type OnlineStoreFilePersistence struct { PvcConfig *PvcConfig `json:"pvc,omitempty"` } +// OnlineStoreDBStorePersistence configures the DB store persistence for the offline store service +type OnlineStoreDBStorePersistence struct { + // +kubebuilder:validation:Enum=snowflake.online;redis;ikv;datastore;dynamodb;bigtable;postgres;cassandra;mysql;hazelcast;singlestore + Type string `json:"type,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` + SecretKeyName string `json:"secretKeyName,omitempty"` +} + +var ValidOnlineStoreDBStorePersistenceTypes = []string{ + "snowflake.online", + "redis", + "ikv", + "datastore", + "dynamodb", + "bigtable", + "postgres", + "cassandra", + "mysql", + "hazelcast", + "singlestore", +} + // LocalRegistryConfig configures the deployed registry service type LocalRegistryConfig struct { ServiceConfigs `json:",inline"` @@ -119,7 +163,8 @@ type LocalRegistryConfig struct { // RegistryPersistence configures the persistence settings for the registry service type RegistryPersistence struct { - FilePersistence *RegistryFilePersistence `json:"file,omitempty"` + FilePersistence *RegistryFilePersistence `json:"file,omitempty"` + DBPersistence *RegistryDBStorePersistence `json:"store,omitempty"` } // RegistryFilePersistence configures the file-based persistence for the registry service @@ -133,6 +178,19 @@ type RegistryFilePersistence struct { S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` } +// RegistryDBStorePersistence configures the DB store persistence for the registry service +type RegistryDBStorePersistence struct { + // +kubebuilder:validation:Enum=sql;snowflake.registry + Type string `json:"type,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` + SecretKeyName string `json:"secretKeyName,omitempty"` +} + +var ValidRegistryDBStorePersistenceTypes = []string{ + "snowflake.registry", + "sql", +} + // PvcConfig defines the settings for a persistent file store based on PVCs. // We can refer to an existing PVC using the `Ref` field, or create a new one using the `Create` field. // +kubebuilder:validation:XValidation:rule="[has(self.ref), has(self.create)].exists_one(c, c)",message="One selection is required between ref and create." diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index c020af1121..196b214700 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -236,6 +236,26 @@ func (in *OfflineStore) DeepCopy() *OfflineStore { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OfflineStoreDBStorePersistence) DeepCopyInto(out *OfflineStoreDBStorePersistence) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.LocalObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStoreDBStorePersistence. +func (in *OfflineStoreDBStorePersistence) DeepCopy() *OfflineStoreDBStorePersistence { + if in == nil { + return nil + } + out := new(OfflineStoreDBStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OfflineStoreFilePersistence) DeepCopyInto(out *OfflineStoreFilePersistence) { *out = *in @@ -264,6 +284,11 @@ func (in *OfflineStorePersistence) DeepCopyInto(out *OfflineStorePersistence) { *out = new(OfflineStoreFilePersistence) (*in).DeepCopyInto(*out) } + if in.DBPersistence != nil { + in, out := &in.DBPersistence, &out.DBPersistence + *out = new(OfflineStoreDBStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OfflineStorePersistence. @@ -297,6 +322,26 @@ func (in *OnlineStore) DeepCopy() *OnlineStore { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OnlineStoreDBStorePersistence) DeepCopyInto(out *OnlineStoreDBStorePersistence) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.LocalObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStoreDBStorePersistence. +func (in *OnlineStoreDBStorePersistence) DeepCopy() *OnlineStoreDBStorePersistence { + if in == nil { + return nil + } + out := new(OnlineStoreDBStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OnlineStoreFilePersistence) DeepCopyInto(out *OnlineStoreFilePersistence) { *out = *in @@ -325,6 +370,11 @@ func (in *OnlineStorePersistence) DeepCopyInto(out *OnlineStorePersistence) { *out = new(OnlineStoreFilePersistence) (*in).DeepCopyInto(*out) } + if in.DBPersistence != nil { + in, out := &in.DBPersistence, &out.DBPersistence + *out = new(OnlineStoreDBStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OnlineStorePersistence. @@ -444,6 +494,26 @@ func (in *Registry) DeepCopy() *Registry { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RegistryDBStorePersistence) DeepCopyInto(out *RegistryDBStorePersistence) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.LocalObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryDBStorePersistence. +func (in *RegistryDBStorePersistence) DeepCopy() *RegistryDBStorePersistence { + if in == nil { + return nil + } + out := new(RegistryDBStorePersistence) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RegistryFilePersistence) DeepCopyInto(out *RegistryFilePersistence) { *out = *in @@ -483,6 +553,11 @@ func (in *RegistryPersistence) DeepCopyInto(out *RegistryPersistence) { *out = new(RegistryFilePersistence) (*in).DeepCopyInto(*out) } + if in.DBPersistence != nil { + in, out := &in.DBPersistence, &out.DBPersistence + *out = new(RegistryDBStorePersistence) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryPersistence. diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 99d9104b95..f1c7fca8f5 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -186,8 +186,8 @@ spec: settings for the offline store service properties: file: - description: OfflineStorePersistence configures the file-based - persistence for the offline store service + description: OfflineStoreFilePersistence configures the + file-based persistence for the offline store service properties: pvc: description: |- @@ -270,7 +270,40 @@ spec: - duckdb type: string type: object + store: + description: OfflineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.offline + - bigquery + - redshift + - spark + - postgres + - feast_trino.trino.TrinoOfflineStore + - redis + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -548,7 +581,44 @@ spec: - message: Online store does not support S3 or GS buckets. rule: has(self.path) && !self.path.startsWith('s3://') && !self.path.startsWith('gs://') + store: + description: OnlineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.online + - redis + - ikv + - datastore + - dynamodb + - bigtable + - postgres + - cassandra + - mysql + - hazelcast + - singlestore + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -843,6 +913,31 @@ spec: for S3 object store URIs. rule: '(has(self.s3_additional_kwargs) && has(self.path)) ? self.path.startsWith(''s3://'') : true' + store: + description: RegistryDBStorePersistence configures + the DB store persistence for the registry service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - sql + - snowflake.registry + type: string + type: object type: object resources: description: ResourceRequirements describes the compute @@ -1083,8 +1178,9 @@ spec: settings for the offline store service properties: file: - description: OfflineStorePersistence configures the - file-based persistence for the offline store service + description: OfflineStoreFilePersistence configures + the file-based persistence for the offline store + service properties: pvc: description: |- @@ -1167,7 +1263,41 @@ spec: - duckdb type: string type: object + store: + description: OfflineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.offline + - bigquery + - redshift + - spark + - postgres + - feast_trino.trino.TrinoOfflineStore + - redis + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, + c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -1449,7 +1579,45 @@ spec: buckets. rule: has(self.path) && !self.path.startsWith('s3://') && !self.path.startsWith('gs://') + store: + description: OnlineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.online + - redis + - ikv + - datastore + - dynamodb + - bigtable + - postgres + - cassandra + - mysql + - hazelcast + - singlestore + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, + c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -1751,6 +1919,31 @@ spec: only for S3 object store URIs. rule: '(has(self.s3_additional_kwargs) && has(self.path)) ? self.path.startsWith(''s3://'') : true' + store: + description: RegistryDBStorePersistence configures + the DB store persistence for the registry service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - sql + - snowflake.registry + type: string + type: object type: object resources: description: ResourceRequirements describes the compute diff --git a/infra/feast-operator/config/rbac/role.yaml b/infra/feast-operator/config/rbac/role.yaml index 6ca2085990..a4b0acfc1c 100644 --- a/infra/feast-operator/config/rbac/role.yaml +++ b/infra/feast-operator/config/rbac/role.yaml @@ -29,6 +29,13 @@ rules: - list - update - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list - apiGroups: - feast.dev resources: diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index f23dc8b208..83181f53b0 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -194,8 +194,8 @@ spec: settings for the offline store service properties: file: - description: OfflineStorePersistence configures the file-based - persistence for the offline store service + description: OfflineStoreFilePersistence configures the + file-based persistence for the offline store service properties: pvc: description: |- @@ -278,7 +278,40 @@ spec: - duckdb type: string type: object + store: + description: OfflineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.offline + - bigquery + - redshift + - spark + - postgres + - feast_trino.trino.TrinoOfflineStore + - redis + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -556,7 +589,44 @@ spec: - message: Online store does not support S3 or GS buckets. rule: has(self.path) && !self.path.startsWith('s3://') && !self.path.startsWith('gs://') + store: + description: OnlineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.online + - redis + - ikv + - datastore + - dynamodb + - bigtable + - postgres + - cassandra + - mysql + - hazelcast + - singlestore + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -851,6 +921,31 @@ spec: for S3 object store URIs. rule: '(has(self.s3_additional_kwargs) && has(self.path)) ? self.path.startsWith(''s3://'') : true' + store: + description: RegistryDBStorePersistence configures + the DB store persistence for the registry service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - sql + - snowflake.registry + type: string + type: object type: object resources: description: ResourceRequirements describes the compute @@ -1091,8 +1186,9 @@ spec: settings for the offline store service properties: file: - description: OfflineStorePersistence configures the - file-based persistence for the offline store service + description: OfflineStoreFilePersistence configures + the file-based persistence for the offline store + service properties: pvc: description: |- @@ -1175,7 +1271,41 @@ spec: - duckdb type: string type: object + store: + description: OfflineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.offline + - bigquery + - redshift + - spark + - postgres + - feast_trino.trino.TrinoOfflineStore + - redis + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, + c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -1457,7 +1587,45 @@ spec: buckets. rule: has(self.path) && !self.path.startsWith('s3://') && !self.path.startsWith('gs://') + store: + description: OnlineStoreDBStorePersistence configures + the DB store persistence for the offline store service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - snowflake.online + - redis + - ikv + - datastore + - dynamodb + - bigtable + - postgres + - cassandra + - mysql + - hazelcast + - singlestore + type: string + type: object type: object + x-kubernetes-validations: + - message: One selection required between file or store. + rule: '[has(self.file), has(self.store)].exists_one(c, + c)' resources: description: ResourceRequirements describes the compute resource requirements. @@ -1759,6 +1927,31 @@ spec: only for S3 object store URIs. rule: '(has(self.s3_additional_kwargs) && has(self.path)) ? self.path.startsWith(''s3://'') : true' + store: + description: RegistryDBStorePersistence configures + the DB store persistence for the registry service + properties: + secretKeyName: + type: string + secretRef: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + type: + enum: + - sql + - snowflake.registry + type: string + type: object type: object resources: description: ResourceRequirements describes the compute @@ -2080,6 +2273,13 @@ rules: - list - update - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list - apiGroups: - feast.dev resources: diff --git a/infra/feast-operator/internal/controller/featurestore_controller.go b/infra/feast-operator/internal/controller/featurestore_controller.go index 7c78b79d59..278ea4a78f 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller.go +++ b/infra/feast-operator/internal/controller/featurestore_controller.go @@ -54,6 +54,7 @@ type FeatureStoreReconciler struct { //+kubebuilder:rbac:groups=feast.dev,resources=featurestores/finalizers,verbs=update //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;create;update;watch;delete //+kubebuilder:rbac:groups=core,resources=services;configmaps;persistentvolumeclaims;serviceaccounts,verbs=get;list;create;update;watch;delete +//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. diff --git a/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go b/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go index bd597f987c..913f022022 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go @@ -319,7 +319,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { Provider: services.LocalProviderType, EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, OfflineStore: services.OfflineStoreConfig{ - Type: services.OfflineDuckDbConfigType, + Type: services.OfflineFilePersistenceDuckDbConfigType, }, Registry: regRemote, } diff --git a/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go b/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go index 33fdce38fe..f124db55a6 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go @@ -490,7 +490,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { Provider: services.LocalProviderType, EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, OfflineStore: services.OfflineStoreConfig{ - Type: services.OfflineDuckDbConfigType, + Type: services.OfflineFilePersistenceDuckDbConfigType, }, Registry: regRemote, } diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index 1e9bbc2f52..00a6e71c71 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -646,7 +646,7 @@ var _ = Describe("FeatureStore Controller", func() { Provider: services.LocalProviderType, EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, OfflineStore: services.OfflineStoreConfig{ - Type: services.OfflineDaskConfigType, + Type: services.OfflineFilePersistenceDaskConfigType, }, Registry: regRemote, } diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 34b62ac30a..899a9157d9 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -18,6 +18,7 @@ package services import ( "encoding/base64" + "fmt" "path" "strings" @@ -44,64 +45,41 @@ func (feast *FeastServices) getServiceFeatureStoreYaml(feastType FeastServiceTyp } func (feast *FeastServices) getServiceRepoConfig(feastType FeastServiceType) (RepoConfig, error) { - return getServiceRepoConfig(feastType, feast.FeatureStore) + return getServiceRepoConfig(feastType, feast.FeatureStore, feast.extractConfigFromSecret) } -func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1alpha1.FeatureStore) (RepoConfig, error) { +func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1alpha1.FeatureStore, secretExtractionFunc func(secretRef string, secretKeyName string) (map[string]interface{}, error)) (RepoConfig, error) { appliedSpec := featureStore.Status.Applied repoConfig := getClientRepoConfig(featureStore) - isLocalRegistry := isLocalRegistry(featureStore) + isLocalReg := isLocalRegistry(featureStore) if appliedSpec.Services != nil { services := appliedSpec.Services + switch feastType { case OfflineFeastType: // Offline server has an `offline_store` section and a remote `registry` if services.OfflineStore != nil { - fileType := string(OfflineDaskConfigType) - if services.OfflineStore.Persistence != nil && - services.OfflineStore.Persistence.FilePersistence != nil && - len(services.OfflineStore.Persistence.FilePersistence.Type) > 0 { - fileType = services.OfflineStore.Persistence.FilePersistence.Type - } - - repoConfig.OfflineStore = OfflineStoreConfig{ - Type: OfflineConfigType(fileType), + err := setRepoConfigOffline(services, secretExtractionFunc, &repoConfig) + if err != nil { + return repoConfig, err } - repoConfig.OnlineStore = OnlineStoreConfig{} } case OnlineFeastType: // Online server has an `online_store` section, a remote `registry` and a remote `offline_store` if services.OnlineStore != nil { - path := DefaultOnlineStoreEphemeralPath - if services.OnlineStore.Persistence != nil && services.OnlineStore.Persistence.FilePersistence != nil { - filePersistence := services.OnlineStore.Persistence.FilePersistence - path = getActualPath(filePersistence.Path, filePersistence.PvcConfig) - } - - repoConfig.OnlineStore = OnlineStoreConfig{ - Type: OnlineSqliteConfigType, - Path: path, + err := setRepoConfigOnline(services, secretExtractionFunc, &repoConfig) + if err != nil { + return repoConfig, err } } case RegistryFeastType: // Registry server only has a `registry` section - if isLocalRegistry { - path := DefaultRegistryEphemeralPath - var s3AdditionalKwargs *map[string]string - if services != nil && services.Registry != nil && services.Registry.Local != nil && - services.Registry.Local.Persistence != nil && services.Registry.Local.Persistence.FilePersistence != nil { - filePersistence := services.Registry.Local.Persistence.FilePersistence - path = getActualPath(filePersistence.Path, filePersistence.PvcConfig) - s3AdditionalKwargs = filePersistence.S3AdditionalKwargs + if isLocalReg { + err := setRepoConfigRegistry(services, secretExtractionFunc, &repoConfig) + if err != nil { + return repoConfig, err } - repoConfig.Registry = RegistryConfig{ - RegistryType: RegistryFileConfigType, - Path: path, - S3AdditionalKwargs: s3AdditionalKwargs, - } - repoConfig.OfflineStore = OfflineStoreConfig{} - repoConfig.OnlineStore = OnlineStoreConfig{} } } } @@ -109,6 +87,121 @@ func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1al return repoConfig, nil } +func setRepoConfigRegistry(services *feastdevv1alpha1.FeatureStoreServices, secretExtractionFunc func(secretRef string, secretKeyName string) (map[string]interface{}, error), repoConfig *RepoConfig) error { + repoConfig.Registry = RegistryConfig{} + repoConfig.Registry.Path = DefaultRegistryEphemeralPath + registryPersistence := services.Registry.Local.Persistence + + if registryPersistence != nil { + filePersistence := registryPersistence.FilePersistence + dbPersistence := registryPersistence.DBPersistence + + if filePersistence != nil { + repoConfig.Registry.RegistryType = RegistryFileConfigType + repoConfig.Registry.Path = getActualPath(filePersistence.Path, filePersistence.PvcConfig) + repoConfig.Registry.S3AdditionalKwargs = filePersistence.S3AdditionalKwargs + } else if dbPersistence != nil && len(dbPersistence.Type) > 0 { + repoConfig.Registry.Path = "" + repoConfig.Registry.RegistryType = RegistryConfigType(dbPersistence.Type) + secretKeyName := dbPersistence.SecretKeyName + if len(secretKeyName) == 0 { + secretKeyName = string(repoConfig.Registry.RegistryType) + } + parametersMap, err := secretExtractionFunc(dbPersistence.SecretRef.Name, secretKeyName) + if err != nil { + return err + } + + err = mergeStructWithDBParametersMap(¶metersMap, &repoConfig.Registry) + if err != nil { + return err + } + + repoConfig.Registry.DBParameters = parametersMap + } + } + + repoConfig.OfflineStore = OfflineStoreConfig{} + repoConfig.OnlineStore = OnlineStoreConfig{} + + return nil +} + +func setRepoConfigOnline(services *feastdevv1alpha1.FeatureStoreServices, secretExtractionFunc func(secretRef string, secretKeyName string) (map[string]interface{}, error), repoConfig *RepoConfig) error { + repoConfig.OnlineStore = OnlineStoreConfig{} + + repoConfig.OnlineStore.Path = DefaultOnlineStoreEphemeralPath + repoConfig.OnlineStore.Type = OnlineSqliteConfigType + onlineStorePersistence := services.OnlineStore.Persistence + + if onlineStorePersistence != nil { + filePersistence := onlineStorePersistence.FilePersistence + dbPersistence := onlineStorePersistence.DBPersistence + + if filePersistence != nil { + repoConfig.OnlineStore.Path = getActualPath(filePersistence.Path, filePersistence.PvcConfig) + } else if dbPersistence != nil && len(dbPersistence.Type) > 0 { + repoConfig.OnlineStore.Path = "" + repoConfig.OnlineStore.Type = OnlineConfigType(dbPersistence.Type) + secretKeyName := dbPersistence.SecretKeyName + if len(secretKeyName) == 0 { + secretKeyName = string(repoConfig.OnlineStore.Type) + } + + parametersMap, err := secretExtractionFunc(dbPersistence.SecretRef.Name, secretKeyName) + if err != nil { + return err + } + + err = mergeStructWithDBParametersMap(¶metersMap, &repoConfig.OnlineStore) + if err != nil { + return err + } + + repoConfig.OnlineStore.DBParameters = parametersMap + } + } + + return nil +} + +func setRepoConfigOffline(services *feastdevv1alpha1.FeatureStoreServices, secretExtractionFunc func(secretRef string, secretKeyName string) (map[string]interface{}, error), repoConfig *RepoConfig) error { + repoConfig.OfflineStore = OfflineStoreConfig{} + repoConfig.OfflineStore.Type = OfflineFilePersistenceDaskConfigType + offlineStorePersistence := services.OfflineStore.Persistence + + if offlineStorePersistence != nil { + dbPersistence := offlineStorePersistence.DBPersistence + filePersistence := offlineStorePersistence.FilePersistence + + if filePersistence != nil && len(filePersistence.Type) > 0 { + repoConfig.OfflineStore.Type = OfflineConfigType(filePersistence.Type) + } else if offlineStorePersistence.DBPersistence != nil && len(dbPersistence.Type) > 0 { + repoConfig.OfflineStore.Type = OfflineConfigType(dbPersistence.Type) + secretKeyName := dbPersistence.SecretKeyName + if len(secretKeyName) == 0 { + secretKeyName = string(repoConfig.OfflineStore.Type) + } + + parametersMap, err := secretExtractionFunc(dbPersistence.SecretRef.Name, secretKeyName) + if err != nil { + return err + } + + err = mergeStructWithDBParametersMap(¶metersMap, &repoConfig.OfflineStore) + if err != nil { + return err + } + + repoConfig.OfflineStore.DBParameters = parametersMap + } + } + + repoConfig.OnlineStore = OnlineStoreConfig{} + + return nil +} + func (feast *FeastServices) getClientFeatureStoreYaml() ([]byte, error) { return yaml.Marshal(getClientRepoConfig(feast.FeatureStore)) } @@ -148,3 +241,48 @@ func getActualPath(filePath string, pvcConfig *feastdevv1alpha1.PvcConfig) strin } return path.Join(pvcConfig.MountPath, filePath) } + +func (feast *FeastServices) extractConfigFromSecret(secretRef string, secretKeyName string) (map[string]interface{}, error) { + secret, err := feast.getSecret(secretRef) + if err != nil { + return nil, err + } + parameters := map[string]interface{}{} + + val, exists := secret.Data[secretKeyName] + if !exists { + return nil, fmt.Errorf("secret key %s doesn't exist in secret %s", secretKeyName, secretRef) + } + + err = yaml.Unmarshal(val, ¶meters) + if err != nil { + return nil, fmt.Errorf("secret %s contains invalid value", secretKeyName) + } + + _, exists = parameters["type"] + if exists { + return nil, fmt.Errorf("secret key %s in secret %s contains invalid tag named type", secretKeyName, secretRef) + } + + _, exists = parameters["registry_type"] + if exists { + return nil, fmt.Errorf("secret key %s in secret %s contains invalid tag named registry_type", secretKeyName, secretRef) + } + + return parameters, nil +} + +func mergeStructWithDBParametersMap(parametersMap *map[string]interface{}, s interface{}) error { + for key, val := range *parametersMap { + hasAttribute, err := hasAttrib(s, key, val) + if err != nil { + return err + } + + if hasAttribute { + delete(*parametersMap, key) + } + } + + return nil +} diff --git a/infra/feast-operator/internal/controller/services/repo_config_test.go b/infra/feast-operator/internal/controller/services/repo_config_test.go index 1868bf437b..eaeeb4ea09 100644 --- a/infra/feast-operator/internal/controller/services/repo_config_test.go +++ b/infra/feast-operator/internal/controller/services/repo_config_test.go @@ -17,8 +17,12 @@ limitations under the License. package services import ( + "fmt" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "gopkg.in/yaml.v3" + corev1 "k8s.io/api/core/v1" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" ) @@ -33,19 +37,19 @@ var _ = Describe("Repo Config", func() { featureStore := minimalFeatureStore() ApplyDefaultsToStatus(featureStore) var repoConfig RepoConfig - repoConfig, err := getServiceRepoConfig(OfflineFeastType, featureStore) + repoConfig, err := getServiceRepoConfig(OfflineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) @@ -69,19 +73,19 @@ var _ = Describe("Repo Config", func() { }, } ApplyDefaultsToStatus(featureStore) - repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) @@ -104,25 +108,25 @@ var _ = Describe("Repo Config", func() { }, } ApplyDefaultsToStatus(featureStore) - repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - By("Having the all the services") + By("Having the all the file services") featureStore = minimalFeatureStore() featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{ @@ -150,7 +154,7 @@ var _ = Describe("Repo Config", func() { }, } ApplyDefaultsToStatus(featureStore) - repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) expectedOfflineConfig := OfflineStoreConfig{ Type: "duckdb", @@ -159,7 +163,7 @@ var _ = Describe("Repo Config", func() { Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) expectedOnlineConfig := OnlineStoreConfig{ @@ -169,7 +173,7 @@ var _ = Describe("Repo Config", func() { Expect(repoConfig.OnlineStore).To(Equal(expectedOnlineConfig)) Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) - repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore) + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore, emptyMockExtractConfigFromSecret) Expect(err).NotTo(HaveOccurred()) Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) @@ -178,6 +182,82 @@ var _ = Describe("Repo Config", func() { Path: "/data/registry.db", } Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) + + By("Having the all the db services") + featureStore = minimalFeatureStore() + featureStore.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + OfflineStore: &feastdevv1alpha1.OfflineStore{ + Persistence: &feastdevv1alpha1.OfflineStorePersistence{ + DBPersistence: &feastdevv1alpha1.OfflineStoreDBStorePersistence{ + Type: string(OfflineDBPersistenceSnowflakeConfigType), + SecretRef: &corev1.LocalObjectReference{ + Name: "offline-test-secret", + }, + }, + }, + }, + OnlineStore: &feastdevv1alpha1.OnlineStore{ + Persistence: &feastdevv1alpha1.OnlineStorePersistence{ + DBPersistence: &feastdevv1alpha1.OnlineStoreDBStorePersistence{ + Type: string(OnlineDBPersistenceSnowflakeConfigType), + SecretRef: &corev1.LocalObjectReference{ + Name: "online-test-secret", + }, + }, + }, + }, + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + DBPersistence: &feastdevv1alpha1.RegistryDBStorePersistence{ + Type: string(RegistryDBPersistenceSnowflakeConfigType), + SecretRef: &corev1.LocalObjectReference{ + Name: "registry-test-secret", + }, + }, + }, + }, + }, + } + parameterMap := createParameterMap() + ApplyDefaultsToStatus(featureStore) + featureStore.Spec.Services.OfflineStore.Persistence.FilePersistence = nil + featureStore.Spec.Services.OnlineStore.Persistence.FilePersistence = nil + featureStore.Spec.Services.Registry.Local.Persistence.FilePersistence = nil + repoConfig, err = getServiceRepoConfig(OfflineFeastType, featureStore, mockExtractConfigFromSecret) + Expect(err).NotTo(HaveOccurred()) + newMap := CopyMap(parameterMap) + port := parameterMap["port"].(int) + delete(newMap, "port") + expectedOfflineConfig = OfflineStoreConfig{ + Type: OfflineDBPersistenceSnowflakeConfigType, + Port: port, + DBParameters: newMap, + } + Expect(repoConfig.OfflineStore).To(Equal(expectedOfflineConfig)) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(OnlineFeastType, featureStore, mockExtractConfigFromSecret) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + newMap = CopyMap(parameterMap) + expectedOnlineConfig = OnlineStoreConfig{ + Type: OnlineDBPersistenceSnowflakeConfigType, + DBParameters: newMap, + } + Expect(repoConfig.OnlineStore).To(Equal(expectedOnlineConfig)) + Expect(repoConfig.Registry).To(Equal(emptyRegistryConfig())) + + repoConfig, err = getServiceRepoConfig(RegistryFeastType, featureStore, mockExtractConfigFromSecret) + Expect(err).NotTo(HaveOccurred()) + Expect(repoConfig.OfflineStore).To(Equal(emptyOfflineStoreConfig())) + Expect(repoConfig.OnlineStore).To(Equal(emptyOnlineStoreConfig())) + expectedRegistryConfig = RegistryConfig{ + RegistryType: RegistryDBPersistenceSnowflakeConfigType, + DBParameters: parameterMap, + } + Expect(repoConfig.Registry).To(Equal(expectedRegistryConfig)) }) }) }) @@ -201,3 +281,37 @@ func minimalFeatureStore() *feastdevv1alpha1.FeatureStore { }, } } + +func emptyMockExtractConfigFromSecret(secretRef string, secretKeyName string) (map[string]interface{}, error) { + return map[string]interface{}{}, nil +} + +func mockExtractConfigFromSecret(secretRef string, secretKeyName string) (map[string]interface{}, error) { + return createParameterMap(), nil +} + +func createParameterMap() map[string]interface{} { + yamlString := ` +hosts: + - 192.168.1.1 + - 192.168.1.2 + - 192.168.1.3 +keyspace: KeyspaceName +port: 9042 +username: user +password: secret +protocol_version: 5 +load_balancing: + local_dc: datacenter1 + load_balancing_policy: TokenAwarePolicy(DCAwareRoundRobinPolicy) +read_concurrency: 100 +write_concurrency: 100 +` + var parameters map[string]interface{} + + err := yaml.Unmarshal([]byte(yamlString), ¶meters) + if err != nil { + fmt.Println(err) + } + return parameters +} diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 3a079b5f49..4667c8158a 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -43,14 +43,14 @@ func (feast *FeastServices) Deploy() error { services := feast.FeatureStore.Status.Applied.Services if services != nil { if services.OfflineStore != nil { - if services.OfflineStore.Persistence != nil && - services.OfflineStore.Persistence.FilePersistence != nil && - len(services.OfflineStore.Persistence.FilePersistence.Type) > 0 { - if err := checkOfflineStoreFilePersistenceType(services.OfflineStore.Persistence.FilePersistence.Type); err != nil { - return err - } + offlinePersistence := services.OfflineStore.Persistence + + err := feast.validateOfflineStorePersistence(offlinePersistence) + if err != nil { + return err } - if err := feast.deployFeastServiceByType(OfflineFeastType); err != nil { + + if err = feast.deployFeastServiceByType(OfflineFeastType); err != nil { return err } } else { @@ -60,7 +60,14 @@ func (feast *FeastServices) Deploy() error { } if services.OnlineStore != nil { - if err := feast.deployFeastServiceByType(OnlineFeastType); err != nil { + onlinePersistence := services.OnlineStore.Persistence + + err := feast.validateOnlineStorePersistence(onlinePersistence) + if err != nil { + return err + } + + if err = feast.deployFeastServiceByType(OnlineFeastType); err != nil { return err } } else { @@ -70,7 +77,14 @@ func (feast *FeastServices) Deploy() error { } if feast.isLocalRegistry() { - if err := feast.deployFeastServiceByType(RegistryFeastType); err != nil { + registryPersistence := services.Registry.Local.Persistence + + err := feast.validateRegistryPersistence(registryPersistence) + if err != nil { + return err + } + + if err = feast.deployFeastServiceByType(RegistryFeastType); err != nil { return err } } else { @@ -87,6 +101,75 @@ func (feast *FeastServices) Deploy() error { return nil } +func (feast *FeastServices) validateRegistryPersistence(registryPersistence *feastdevv1alpha1.RegistryPersistence) error { + if registryPersistence != nil { + dbPersistence := registryPersistence.DBPersistence + + if dbPersistence != nil && len(dbPersistence.Type) > 0 { + if err := checkRegistryDBStorePersistenceType(dbPersistence.Type); err != nil { + return err + } + + if dbPersistence.SecretRef != nil { + secretRef := dbPersistence.SecretRef.Name + if _, err := feast.getSecret(secretRef); err != nil { + return err + } + } + } + } + + return nil +} + +func (feast *FeastServices) validateOnlineStorePersistence(onlinePersistence *feastdevv1alpha1.OnlineStorePersistence) error { + if onlinePersistence != nil { + dbPersistence := onlinePersistence.DBPersistence + + if dbPersistence != nil && len(dbPersistence.Type) > 0 { + if err := checkOnlineStoreDBStorePersistenceType(dbPersistence.Type); err != nil { + return err + } + + if dbPersistence.SecretRef != nil { + secretRef := dbPersistence.SecretRef.Name + if _, err := feast.getSecret(secretRef); err != nil { + return err + } + } + } + } + + return nil +} + +func (feast *FeastServices) validateOfflineStorePersistence(offlinePersistence *feastdevv1alpha1.OfflineStorePersistence) error { + if offlinePersistence != nil { + filePersistence := offlinePersistence.FilePersistence + dbPersistence := offlinePersistence.DBPersistence + + if filePersistence != nil && len(filePersistence.Type) > 0 { + if err := checkOfflineStoreFilePersistenceType(filePersistence.Type); err != nil { + return err + } + } else if dbPersistence != nil && + len(dbPersistence.Type) > 0 { + if err := checkOfflineStoreDBStorePersistenceType(dbPersistence.Type); err != nil { + return err + } + + if dbPersistence.SecretRef != nil { + secretRef := dbPersistence.SecretRef.Name + if _, err := feast.getSecret(secretRef); err != nil { + return err + } + } + } + } + + return nil +} + func (feast *FeastServices) deployFeastServiceByType(feastType FeastServiceType) error { if pvcCreate, shouldCreate := shouldCreatePvc(feast.FeatureStore, feastType); shouldCreate { if err := feast.createPVC(pvcCreate, feastType); err != nil { diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index f67f8c0e46..03a99e2160 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -46,15 +46,20 @@ const ( RegistryFeastType FeastServiceType = "registry" ClientFeastType FeastServiceType = "client" - OfflineRemoteConfigType OfflineConfigType = "remote" - OfflineDaskConfigType OfflineConfigType = "dask" - OfflineDuckDbConfigType OfflineConfigType = "duckdb" + OfflineRemoteConfigType OfflineConfigType = "remote" + OfflineFilePersistenceDaskConfigType OfflineConfigType = "dask" + OfflineFilePersistenceDuckDbConfigType OfflineConfigType = "duckdb" + OfflineDBPersistenceSnowflakeConfigType OfflineConfigType = "snowflake.offline" - OnlineRemoteConfigType OnlineConfigType = "remote" - OnlineSqliteConfigType OnlineConfigType = "sqlite" + OnlineRemoteConfigType OnlineConfigType = "remote" + OnlineSqliteConfigType OnlineConfigType = "sqlite" + OnlineDBPersistenceSnowflakeConfigType OnlineConfigType = "snowflake.online" + OnlineDBPersistenceCassandraConfigType OnlineConfigType = "cassandra" - RegistryRemoteConfigType RegistryConfigType = "remote" - RegistryFileConfigType RegistryConfigType = "file" + RegistryRemoteConfigType RegistryConfigType = "remote" + RegistryFileConfigType RegistryConfigType = "file" + RegistryDBPersistenceSnowflakeConfigType RegistryConfigType = "snowflake.registry" + RegistryDBPersistenceSQLConfigType RegistryConfigType = "sql" LocalProviderType FeastProviderType = "local" ) @@ -172,22 +177,25 @@ type RepoConfig struct { // OfflineStoreConfig is the configuration that relates to reading from and writing to the Feast offline store. type OfflineStoreConfig struct { - Host string `yaml:"host,omitempty"` - Type OfflineConfigType `yaml:"type,omitempty"` - Port int `yaml:"port,omitempty"` + Host string `yaml:"host,omitempty"` + Type OfflineConfigType `yaml:"type,omitempty"` + Port int `yaml:"port,omitempty"` + DBParameters map[string]interface{} `yaml:",inline,omitempty"` } // OnlineStoreConfig is the configuration that relates to reading from and writing to the Feast online store. type OnlineStoreConfig struct { - Path string `yaml:"path,omitempty"` - Type OnlineConfigType `yaml:"type,omitempty"` + Path string `yaml:"path,omitempty"` + Type OnlineConfigType `yaml:"type,omitempty"` + DBParameters map[string]interface{} `yaml:",inline,omitempty"` } // RegistryConfig is the configuration that relates to reading from and writing to the Feast registry. type RegistryConfig struct { - Path string `yaml:"path,omitempty"` - RegistryType RegistryConfigType `yaml:"registry_type,omitempty"` - S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` + Path string `yaml:"path,omitempty"` + RegistryType RegistryConfigType `yaml:"registry_type,omitempty"` + S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` + DBParameters map[string]interface{} `yaml:",inline,omitempty"` } type deploymentSettings struct { diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index b68bf7916c..c366247b6c 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -2,12 +2,19 @@ package services import ( "fmt" + "reflect" "slices" + "strings" "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" ) func isLocalRegistry(featureStore *feastdevv1alpha1.FeatureStore) bool { @@ -63,55 +70,74 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { if services.Registry.Local.Persistence == nil { services.Registry.Local.Persistence = &feastdevv1alpha1.RegistryPersistence{} } - if services.Registry.Local.Persistence.FilePersistence == nil { - services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{} - } - if len(services.Registry.Local.Persistence.FilePersistence.Path) == 0 { - services.Registry.Local.Persistence.FilePersistence.Path = defaultRegistryPath(services.Registry.Local.Persistence.FilePersistence) - } - if services.Registry.Local.Persistence.FilePersistence.PvcConfig != nil { - pvc := services.Registry.Local.Persistence.FilePersistence.PvcConfig - if pvc.Create != nil { - ensureRequestedStorage(&pvc.Create.Resources, DefaultRegistryStorageRequest) + + if services.Registry.Local.Persistence.DBPersistence == nil { + if services.Registry.Local.Persistence.FilePersistence == nil { + services.Registry.Local.Persistence.FilePersistence = &feastdevv1alpha1.RegistryFilePersistence{} + } + + if len(services.Registry.Local.Persistence.FilePersistence.Path) == 0 { + services.Registry.Local.Persistence.FilePersistence.Path = defaultRegistryPath(services.Registry.Local.Persistence.FilePersistence) + } + + if services.Registry.Local.Persistence.FilePersistence.PvcConfig != nil { + pvc := services.Registry.Local.Persistence.FilePersistence.PvcConfig + if pvc.Create != nil { + ensureRequestedStorage(&pvc.Create.Resources, DefaultRegistryStorageRequest) + } } } + setServiceDefaultConfigs(&services.Registry.Local.ServiceConfigs.DefaultConfigs) } if services.OfflineStore != nil { - setServiceDefaultConfigs(&services.OfflineStore.ServiceConfigs.DefaultConfigs) if services.OfflineStore.Persistence == nil { services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{} } - if services.OfflineStore.Persistence.FilePersistence == nil { - services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{} - } - if len(services.OfflineStore.Persistence.FilePersistence.Type) == 0 { - services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineDaskConfigType) - } - if services.OfflineStore.Persistence.FilePersistence.PvcConfig != nil { - pvc := services.OfflineStore.Persistence.FilePersistence.PvcConfig - if pvc.Create != nil { - ensureRequestedStorage(&pvc.Create.Resources, DefaultOfflineStorageRequest) + + if services.OfflineStore.Persistence.DBPersistence == nil { + if services.OfflineStore.Persistence.FilePersistence == nil { + services.OfflineStore.Persistence.FilePersistence = &feastdevv1alpha1.OfflineStoreFilePersistence{} + } + + if len(services.OfflineStore.Persistence.FilePersistence.Type) == 0 { + services.OfflineStore.Persistence.FilePersistence.Type = string(OfflineFilePersistenceDaskConfigType) + } + + if services.OfflineStore.Persistence.FilePersistence.PvcConfig != nil { + pvc := services.OfflineStore.Persistence.FilePersistence.PvcConfig + if pvc.Create != nil { + ensureRequestedStorage(&pvc.Create.Resources, DefaultOfflineStorageRequest) + } } } + + setServiceDefaultConfigs(&services.OfflineStore.ServiceConfigs.DefaultConfigs) } + if services.OnlineStore != nil { - setServiceDefaultConfigs(&services.OnlineStore.ServiceConfigs.DefaultConfigs) if services.OnlineStore.Persistence == nil { services.OnlineStore.Persistence = &feastdevv1alpha1.OnlineStorePersistence{} } - if services.OnlineStore.Persistence.FilePersistence == nil { - services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{} - } - if len(services.OnlineStore.Persistence.FilePersistence.Path) == 0 { - services.OnlineStore.Persistence.FilePersistence.Path = defaultOnlineStorePath(services.OnlineStore.Persistence.FilePersistence) - } - if services.OnlineStore.Persistence.FilePersistence.PvcConfig != nil { - pvc := services.OnlineStore.Persistence.FilePersistence.PvcConfig - if pvc.Create != nil { - ensureRequestedStorage(&pvc.Create.Resources, DefaultOnlineStorageRequest) + + if services.OnlineStore.Persistence.DBPersistence == nil { + if services.OnlineStore.Persistence.FilePersistence == nil { + services.OnlineStore.Persistence.FilePersistence = &feastdevv1alpha1.OnlineStoreFilePersistence{} + } + + if len(services.OnlineStore.Persistence.FilePersistence.Path) == 0 { + services.OnlineStore.Persistence.FilePersistence.Path = defaultOnlineStorePath(services.OnlineStore.Persistence.FilePersistence) + } + + if services.OnlineStore.Persistence.FilePersistence.PvcConfig != nil { + pvc := services.OnlineStore.Persistence.FilePersistence.PvcConfig + if pvc.Create != nil { + ensureRequestedStorage(&pvc.Create.Resources, DefaultOnlineStorageRequest) + } } } + + setServiceDefaultConfigs(&services.OnlineStore.ServiceConfigs.DefaultConfigs) } // overwrite status.applied with every reconcile applied.DeepCopyInto(&cr.Status.Applied) @@ -127,7 +153,7 @@ func checkOfflineStoreFilePersistenceType(value string) error { if slices.Contains(feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes, value) { return nil } - return fmt.Errorf("invalid file type %s for offline store", value) + return fmt.Errorf("invalid file type %s for offline store", value) } func ensureRequestedStorage(resources *v1.VolumeResourceRequirements, requestedStorage string) { @@ -145,9 +171,118 @@ func defaultOnlineStorePath(persistence *feastdevv1alpha1.OnlineStoreFilePersist } return DefaultOnlineStorePvcPath } + func defaultRegistryPath(persistence *feastdevv1alpha1.RegistryFilePersistence) string { if persistence.PvcConfig == nil { return DefaultRegistryEphemeralPath } return DefaultRegistryPvcPath } + +func checkOfflineStoreDBStorePersistenceType(value string) error { + if slices.Contains(feastdevv1alpha1.ValidOfflineStoreDBStorePersistenceTypes, value) { + return nil + } + return fmt.Errorf("invalid DB store type %s for offline store", value) +} + +func checkOnlineStoreDBStorePersistenceType(value string) error { + if slices.Contains(feastdevv1alpha1.ValidOnlineStoreDBStorePersistenceTypes, value) { + return nil + } + return fmt.Errorf("invalid DB store type %s for online store", value) +} + +func checkRegistryDBStorePersistenceType(value string) error { + if slices.Contains(feastdevv1alpha1.ValidRegistryDBStorePersistenceTypes, value) { + return nil + } + return fmt.Errorf("invalid DB store type %s for registry", value) +} + +func (feast *FeastServices) getSecret(secretRef string) (*corev1.Secret, error) { + secret := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: secretRef, Namespace: feast.FeatureStore.Namespace}} + objectKey := client.ObjectKeyFromObject(secret) + if err := feast.Client.Get(feast.Context, objectKey, secret); err != nil { + if apierrors.IsNotFound(err) || err != nil { + logger := log.FromContext(feast.Context) + logger.Error(err, "invalid secret "+secretRef+" for offline store") + + return nil, err + } + } + + return secret, nil +} + +// Function to check if a struct has a specific field or field tag and sets the value in the field if empty +func hasAttrib(s interface{}, fieldName string, value interface{}) (bool, error) { + //t := reflect.TypeOf(s) + val := reflect.ValueOf(s) + + // Check that the object is a pointer so we can modify it + if val.Kind() != reflect.Ptr || val.IsNil() { + return false, fmt.Errorf("expected a pointer to struct, got %v", val.Kind()) + } + + // Check if the value is a pointer and dereference it if necessary + //if t.Kind() == reflect.Ptr { + // t = t.Elem() + //} + + val = val.Elem() + //t = t.Elem() + + // Loop through the fields and check the tag + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := val.Type().Field(i) + + tagVal := fieldType.Tag.Get("yaml") + //tagVal := field.Tag.Get("yaml") + + // Remove other metadata if exists + commaIndex := strings.Index(tagVal, ",") + + if commaIndex != -1 { + tagVal = tagVal[:commaIndex] + } + + // Check if the field name or the tag value matches the one we're looking for + if strings.EqualFold(fieldType.Name, fieldName) || strings.EqualFold(tagVal, fieldName) { + //val := reflect.ValueOf(s).Field(i) + + //// Check that the object is a pointer so we can modify it + //if val.Kind() != reflect.Ptr || val.IsNil() { + // return false, fmt.Errorf("expected a pointer to struct, got %v", val.Kind()) + //} + + // Ensure the field is settable + if !field.CanSet() { + return false, fmt.Errorf("cannot set field %s", fieldName) + } + + // Check if the field is empty (zero value) + if field.IsZero() { + // Set the field value only if it's empty + field.Set(reflect.ValueOf(value)) + } + + return true, nil + } + } + + return false, nil +} + +func CopyMap(original map[string]interface{}) map[string]interface{} { + // Create a new map to store the copy + newCopy := make(map[string]interface{}) + + // Loop through the original map and copy each key-value pair + for key, value := range original { + newCopy[key] = value + } + + return newCopy +} diff --git a/infra/feast-operator/test/api/featurestore_types_test.go b/infra/feast-operator/test/api/featurestore_types_test.go index ac690bb7c2..16af55b03b 100644 --- a/infra/feast-operator/test/api/featurestore_types_test.go +++ b/infra/feast-operator/test/api/featurestore_types_test.go @@ -40,8 +40,8 @@ func attemptInvalidCreationAndAsserts(ctx context.Context, featurestore *feastde } func onlineStoreWithAbsolutePathForPvc(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OnlineStore: &feastdevv1alpha1.OnlineStore{ Persistence: &feastdevv1alpha1.OnlineStorePersistence{ FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ @@ -51,11 +51,11 @@ func onlineStoreWithAbsolutePathForPvc(featureStore *feastdevv1alpha1.FeatureSto }, }, } - return copy + return fsCopy } func onlineStoreWithRelativePathForEphemeral(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OnlineStore: &feastdevv1alpha1.OnlineStore{ Persistence: &feastdevv1alpha1.OnlineStorePersistence{ FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ @@ -64,12 +64,12 @@ func onlineStoreWithRelativePathForEphemeral(featureStore *feastdevv1alpha1.Feat }, }, } - return copy + return fsCopy } func onlineStoreWithObjectStoreBucketForPvc(path string, featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OnlineStore: &feastdevv1alpha1.OnlineStore{ Persistence: &feastdevv1alpha1.OnlineStorePersistence{ FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ @@ -82,12 +82,12 @@ func onlineStoreWithObjectStoreBucketForPvc(path string, featureStore *feastdevv }, }, } - return copy + return fsCopy } func offlineStoreWithUnmanagedFileType(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{ Persistence: &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ @@ -96,12 +96,12 @@ func offlineStoreWithUnmanagedFileType(featureStore *feastdevv1alpha1.FeatureSto }, }, } - return copy + return fsCopy } func registryWithAbsolutePathForPvc(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ Persistence: &feastdevv1alpha1.RegistryPersistence{ @@ -112,11 +112,11 @@ func registryWithAbsolutePathForPvc(featureStore *feastdevv1alpha1.FeatureStore) }, }, } - return copy + return fsCopy } func registryWithRelativePathForEphemeral(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ Persistence: &feastdevv1alpha1.RegistryPersistence{ @@ -127,11 +127,11 @@ func registryWithRelativePathForEphemeral(featureStore *feastdevv1alpha1.Feature }, }, } - return copy + return fsCopy } func registryWithObjectStoreBucketForPvc(path string, featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ Persistence: &feastdevv1alpha1.RegistryPersistence{ @@ -146,11 +146,11 @@ func registryWithObjectStoreBucketForPvc(path string, featureStore *feastdevv1al }, }, } - return copy + return fsCopy } func registryWithS3AdditionalKeywordsForFile(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ Persistence: &feastdevv1alpha1.RegistryPersistence{ @@ -162,11 +162,11 @@ func registryWithS3AdditionalKeywordsForFile(featureStore *feastdevv1alpha1.Feat }, }, } - return copy + return fsCopy } func registryWithS3AdditionalKeywordsForGsBucket(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ Registry: &feastdevv1alpha1.Registry{ Local: &feastdevv1alpha1.LocalRegistryConfig{ Persistence: &feastdevv1alpha1.RegistryPersistence{ @@ -178,12 +178,12 @@ func registryWithS3AdditionalKeywordsForGsBucket(featureStore *feastdevv1alpha1. }, }, } - return copy + return fsCopy } func pvcConfigWithNeitherRefNorCreate(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{ Persistence: &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ @@ -192,11 +192,11 @@ func pvcConfigWithNeitherRefNorCreate(featureStore *feastdevv1alpha1.FeatureStor }, }, } - return copy + return fsCopy } func pvcConfigWithBothRefAndCreate(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{ Persistence: &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ @@ -210,12 +210,12 @@ func pvcConfigWithBothRefAndCreate(featureStore *feastdevv1alpha1.FeatureStore) }, }, } - return copy + return fsCopy } func pvcConfigWithNoResources(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := featureStore.DeepCopy() - copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + fsCopy := featureStore.DeepCopy() + fsCopy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{ Persistence: &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ @@ -249,27 +249,27 @@ func pvcConfigWithNoResources(featureStore *feastdevv1alpha1.FeatureStore) *feas }, }, } - return copy + return fsCopy } func pvcConfigWithResources(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { - copy := pvcConfigWithNoResources(featureStore) - copy.Spec.Services.OfflineStore.Persistence.FilePersistence.PvcConfig.Create.Resources = corev1.VolumeResourceRequirements{ + fsCopy := pvcConfigWithNoResources(featureStore) + fsCopy.Spec.Services.OfflineStore.Persistence.FilePersistence.PvcConfig.Create.Resources = corev1.VolumeResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceStorage: resource.MustParse("10Gi"), }, } - copy.Spec.Services.OnlineStore.Persistence.FilePersistence.PvcConfig.Create.Resources = corev1.VolumeResourceRequirements{ + fsCopy.Spec.Services.OnlineStore.Persistence.FilePersistence.PvcConfig.Create.Resources = corev1.VolumeResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceStorage: resource.MustParse("1Gi"), }, } - copy.Spec.Services.Registry.Local.Persistence.FilePersistence.PvcConfig.Create.Resources = corev1.VolumeResourceRequirements{ + fsCopy.Spec.Services.Registry.Local.Persistence.FilePersistence.PvcConfig.Create.Resources = corev1.VolumeResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceStorage: resource.MustParse("500Mi"), }, } - return copy + return fsCopy } const resourceName = "test-resource" From 42d0801bc62d889e7e1811e7d771f2cb4acd3f8c Mon Sep 17 00:00:00 2001 From: Theodor Mihalache Date: Thu, 21 Nov 2024 16:11:02 -0500 Subject: [PATCH 2/4] Added CR example for store persistence Signed-off-by: Theodor Mihalache --- .../v1alpha1_featurestore_db_persistence.yaml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 infra/feast-operator/config/samples/v1alpha1_featurestore_db_persistence.yaml diff --git a/infra/feast-operator/config/samples/v1alpha1_featurestore_db_persistence.yaml b/infra/feast-operator/config/samples/v1alpha1_featurestore_db_persistence.yaml new file mode 100644 index 0000000000..0540bc90cc --- /dev/null +++ b/infra/feast-operator/config/samples/v1alpha1_featurestore_db_persistence.yaml @@ -0,0 +1,15 @@ +apiVersion: feast.dev/v1alpha1 +kind: FeatureStore +metadata: + name: example + namespace: test +spec: + feastProject: my_project + services: + onlineStore: + persistence: + store: + type: postgres + secretRef: + name: my-secret + secretKeyName: mykey # optional From d6b6f17aabeff86a6593fed0878656a98ab0bda1 Mon Sep 17 00:00:00 2001 From: Theodor Mihalache Date: Thu, 21 Nov 2024 16:13:13 -0500 Subject: [PATCH 3/4] Fixed incorrect yaml tag in RegistryConfig struct Signed-off-by: Theodor Mihalache --- .../internal/controller/services/services_types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index 03a99e2160..1251a2ff9e 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -194,7 +194,7 @@ type OnlineStoreConfig struct { type RegistryConfig struct { Path string `yaml:"path,omitempty"` RegistryType RegistryConfigType `yaml:"registry_type,omitempty"` - S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` + S3AdditionalKwargs *map[string]string `yaml:"s3_additional_kwargs,omitempty"` DBParameters map[string]interface{} `yaml:",inline,omitempty"` } From aea02e6c8c25f7b166e6676d0d553d31a2334d61 Mon Sep 17 00:00:00 2001 From: Theodor Mihalache Date: Thu, 21 Nov 2024 16:14:58 -0500 Subject: [PATCH 4/4] Removed leftovers comments from hasAttrib function Signed-off-by: Theodor Mihalache --- .../internal/controller/services/util.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index c366247b6c..5e2daee673 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -217,7 +217,6 @@ func (feast *FeastServices) getSecret(secretRef string) (*corev1.Secret, error) // Function to check if a struct has a specific field or field tag and sets the value in the field if empty func hasAttrib(s interface{}, fieldName string, value interface{}) (bool, error) { - //t := reflect.TypeOf(s) val := reflect.ValueOf(s) // Check that the object is a pointer so we can modify it @@ -225,13 +224,7 @@ func hasAttrib(s interface{}, fieldName string, value interface{}) (bool, error) return false, fmt.Errorf("expected a pointer to struct, got %v", val.Kind()) } - // Check if the value is a pointer and dereference it if necessary - //if t.Kind() == reflect.Ptr { - // t = t.Elem() - //} - val = val.Elem() - //t = t.Elem() // Loop through the fields and check the tag for i := 0; i < val.NumField(); i++ { @@ -239,7 +232,6 @@ func hasAttrib(s interface{}, fieldName string, value interface{}) (bool, error) fieldType := val.Type().Field(i) tagVal := fieldType.Tag.Get("yaml") - //tagVal := field.Tag.Get("yaml") // Remove other metadata if exists commaIndex := strings.Index(tagVal, ",") @@ -250,12 +242,6 @@ func hasAttrib(s interface{}, fieldName string, value interface{}) (bool, error) // Check if the field name or the tag value matches the one we're looking for if strings.EqualFold(fieldType.Name, fieldName) || strings.EqualFold(tagVal, fieldName) { - //val := reflect.ValueOf(s).Field(i) - - //// Check that the object is a pointer so we can modify it - //if val.Kind() != reflect.Ptr || val.IsNil() { - // return false, fmt.Errorf("expected a pointer to struct, got %v", val.Kind()) - //} // Ensure the field is settable if !field.CanSet() {