From 21a10e6cd006280e4181354cea756e14a5db62bb Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Thu, 14 Sep 2023 20:16:32 +0200 Subject: [PATCH] chore: use v2 client with context support (#497) --- CHANGELOG.md | 1 + controllers/basic_controller.go | 22 +++++------ controllers/cassandra_controller.go | 4 +- controllers/clickhouse_controller.go | 4 +- controllers/clickhouseuser_controller.go | 20 +++++----- controllers/common.go | 6 +-- controllers/connectionpool_controller.go | 41 ++++++++++---------- controllers/database_controller.go | 31 ++++++++------- controllers/generic_service_handler.go | 39 ++++++++++--------- controllers/grafana_controller.go | 4 +- controllers/kafka_controller.go | 6 +-- controllers/kafkaacl_controller.go | 35 +++++++++-------- controllers/kafkaconnect_controller.go | 4 +- controllers/kafkaconnector_controller.go | 36 ++++++++--------- controllers/kafkaschema_controller.go | 29 +++++++------- controllers/kafkatopic_controller.go | 38 +++++++++--------- controllers/mysql_controller.go | 4 +- controllers/opensearch_controller.go | 4 +- controllers/postgresql_controller.go | 4 +- controllers/project_controller.go | 36 ++++++++--------- controllers/projectvpc_controller.go | 24 ++++++------ controllers/redis_controller.go | 4 +- controllers/serviceintegration_controller.go | 26 +++++++------ controllers/serviceuser_controller.go | 30 +++++++------- go.mod | 2 +- go.sum | 4 +- tests/cassandra_test.go | 6 ++- tests/clickhouse_test.go | 6 ++- tests/clickhouseuser_test.go | 8 ++-- tests/connectionpool_test.go | 12 +++--- tests/database_test.go | 8 ++-- tests/generic_service_handler_test.go | 6 ++- tests/grafana_test.go | 6 ++- tests/kafka_test.go | 6 ++- tests/kafka_with_projectvpc_ref_test.go | 6 ++- tests/kafkaacl_test.go | 12 +++--- tests/kafkaconnect_test.go | 4 +- tests/kafkaschema_test.go | 6 ++- tests/kafkatopic_test.go | 12 +++--- tests/mysql_test.go | 6 ++- tests/opensearch_test.go | 6 ++- tests/postgresql_test.go | 15 ++++--- tests/project_test.go | 4 +- tests/projectvpc_test.go | 12 +++--- tests/redis_test.go | 6 ++- tests/serviceintegration_test.go | 27 +++++++------ tests/serviceuser_test.go | 8 ++-- tests/session.go | 2 +- tests/suite_test.go | 2 +- 49 files changed, 347 insertions(+), 297 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88cc62c3..918055a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Make `projectVpcId` and `projectVPCRef` mutable - Fix panic on `nil` user config conversion +- Use aiven-go-client with context support ## v0.13.0 - 2023-08-18 diff --git a/controllers/basic_controller.go b/controllers/basic_controller.go index e0b29dbb..75ef35a3 100644 --- a/controllers/basic_controller.go +++ b/controllers/basic_controller.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" "github.com/go-logr/logr" "github.com/hashicorp/go-multierror" "github.com/pkg/errors" @@ -45,20 +45,20 @@ type ( // of the Aiven services lifecycle. Handlers interface { // create or updates an instance on the Aiven side. - createOrUpdate(*aiven.Client, client.Object, []client.Object) error + createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error // delete removes an instance on Aiven side. // If an object is already deleted and cannot be found, it should not be an error. For other deletion // errors, return an error. - delete(*aiven.Client, client.Object) (bool, error) + delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) // get retrieve an object and a secret (for example, connection credentials) that is generated on the // fly based on data from Aiven API. When not applicable to service, it should return nil. - get(*aiven.Client, client.Object) (*corev1.Secret, error) + get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) // checkPreconditions check whether all preconditions for creating (or updating) the resource are in place. // For example, it is applicable when a service needs to be running before this resource can be created. - checkPreconditions(*aiven.Client, client.Object) (bool, error) + checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) } aivenManagedObject interface { @@ -212,7 +212,7 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien if !isAlreadyProcessed(o) { i.rec.Event(o, corev1.EventTypeNormal, eventCreateOrUpdatedAtAiven, "about to create instance at aiven") - if err := i.createOrUpdateInstance(o, refs); err != nil { + if err := i.createOrUpdateInstance(ctx, o, refs); err != nil { i.rec.Event(o, corev1.EventTypeWarning, eventUnableToCreateOrUpdateAtAiven, err.Error()) return ctrl.Result{}, fmt.Errorf("unable to create or update instance at aiven: %w", err) } @@ -262,7 +262,7 @@ func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o clie i.log.Info("all references are good") } - check, err := i.h.checkPreconditions(i.avn, o) + check, err := i.h.checkPreconditions(ctx, i.avn, o) if err != nil { i.rec.Event(o, corev1.EventTypeWarning, eventUnableToWaitForPreconditions, err.Error()) return false, fmt.Errorf("unable to wait for preconditions: %w", err) @@ -317,7 +317,7 @@ func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Ob func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) (ctrl.Result, error) { i.rec.Event(o, corev1.EventTypeNormal, eventTryingToDeleteAtAiven, "trying to delete instance at aiven") - finalised, err := i.h.delete(i.avn, o) + finalised, err := i.h.delete(ctx, i.avn, o) // There are dependencies on Aiven side, resets error, so it goes for requeue // Handlers does not have logger, it goes here @@ -376,13 +376,13 @@ func (i instanceReconcilerHelper) isInvalidTokenError(err error) bool { return strings.Contains(msg, "Invalid token") || strings.Contains(msg, "Missing (expired) db token") } -func (i instanceReconcilerHelper) createOrUpdateInstance(o client.Object, refs []client.Object) error { +func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o client.Object, refs []client.Object) error { i.log.Info("generation wasn't processed, creation or updating instance on aiven side") a := o.GetAnnotations() delete(a, processedGenerationAnnotation) delete(a, instanceIsRunningAnnotation) - if err := i.h.createOrUpdate(i.avn, o, refs); err != nil { + if err := i.h.createOrUpdate(ctx, i.avn, o, refs); err != nil { return fmt.Errorf("unable to create or update aiven instance: %w", err) } @@ -415,7 +415,7 @@ func (i instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx c err = err.(*multierror.Error).ErrorOrNil() }() - serviceSecret, err := i.h.get(i.avn, o) + serviceSecret, err := i.h.get(ctx, i.avn, o) if err != nil { return false, err } else if serviceSecret != nil { diff --git a/controllers/cassandra_controller.go b/controllers/cassandra_controller.go index 3406190c..bbf40ed8 100644 --- a/controllers/cassandra_controller.go +++ b/controllers/cassandra_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strings" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -66,7 +66,7 @@ func (a *cassandraAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *cassandraAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *cassandraAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { stringData := map[string]string{ "HOST": s.URIParams["host"], "PORT": s.URIParams["port"], diff --git a/controllers/clickhouse_controller.go b/controllers/clickhouse_controller.go index d5cf0e11..75d68651 100644 --- a/controllers/clickhouse_controller.go +++ b/controllers/clickhouse_controller.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -65,7 +65,7 @@ func (a *clickhouseAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *clickhouseAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *clickhouseAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { prefix := getSecretPrefix(a) stringData := map[string]string{ prefix + "HOST": s.URIParams["host"], diff --git a/controllers/clickhouseuser_controller.go b/controllers/clickhouseuser_controller.go index 93655ef4..77631eff 100644 --- a/controllers/clickhouseuser_controller.go +++ b/controllers/clickhouseuser_controller.go @@ -9,7 +9,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,13 +42,13 @@ func (r *ClickhouseUserReconciler) SetupWithManager(mgr ctrl.Manager) error { type clickhouseUserHandler struct{} -func (h *clickhouseUserHandler) createOrUpdate(avn *aiven.Client, obj client.Object, _ []client.Object) error { +func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { user, err := h.convert(obj) if err != nil { return err } - r, err := avn.ClickhouseUser.Create(user.Spec.Project, user.Spec.ServiceName, user.Name) + r, err := avn.ClickhouseUser.Create(ctx, user.Spec.Project, user.Spec.ServiceName, user.Name) if err != nil { return err } @@ -73,7 +73,7 @@ func (h *clickhouseUserHandler) createOrUpdate(avn *aiven.Client, obj client.Obj return nil } -func (h *clickhouseUserHandler) delete(avn *aiven.Client, obj client.Object) (bool, error) { +func (h *clickhouseUserHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { user, err := h.convert(obj) if err != nil { return false, err @@ -84,7 +84,7 @@ func (h *clickhouseUserHandler) delete(avn *aiven.Client, obj client.Object) (bo return true, nil } - err = avn.ClickhouseUser.Delete(user.Spec.Project, user.Spec.ServiceName, user.Status.UUID) + err = avn.ClickhouseUser.Delete(ctx, user.Spec.Project, user.Spec.ServiceName, user.Status.UUID) if !aiven.IsNotFound(err) { return false, err } @@ -92,13 +92,13 @@ func (h *clickhouseUserHandler) delete(avn *aiven.Client, obj client.Object) (bo return true, nil } -func (h *clickhouseUserHandler) get(avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { +func (h *clickhouseUserHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { user, err := h.convert(obj) if err != nil { return nil, err } - s, err := avn.Services.Get(user.Spec.Project, user.Spec.ServiceName) + s, err := avn.Services.Get(ctx, user.Spec.Project, user.Spec.ServiceName) if err != nil { return nil, err } @@ -108,7 +108,7 @@ func (h *clickhouseUserHandler) get(avn *aiven.Client, obj client.Object) (*core // And all other GET methods return empty password, even this one. // So the only way to have a secret here is to reset it manually password := randPassword(maxUserPasswordLength) - _, err = avn.ClickhouseUser.ResetPassword(user.Spec.Project, user.Spec.ServiceName, user.Status.UUID, password) + _, err = avn.ClickhouseUser.ResetPassword(ctx, user.Spec.Project, user.Spec.ServiceName, user.Status.UUID, password) if err != nil { return nil, err } @@ -136,7 +136,7 @@ func (h *clickhouseUserHandler) get(avn *aiven.Client, obj client.Object) (*core return secret, nil } -func (h *clickhouseUserHandler) checkPreconditions(avn *aiven.Client, obj client.Object) (bool, error) { +func (h *clickhouseUserHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { user, err := h.convert(obj) if err != nil { return false, err @@ -145,7 +145,7 @@ func (h *clickhouseUserHandler) checkPreconditions(avn *aiven.Client, obj client meta.SetStatusCondition(&user.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(avn, user.Spec.Project, user.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, user.Spec.Project, user.Spec.ServiceName) } func (h *clickhouseUserHandler) convert(i client.Object) (*v1alpha1.ClickhouseUser, error) { diff --git a/controllers/common.go b/controllers/common.go index ae33e537..0e255d33 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -8,7 +8,7 @@ import ( "strconv" "strings" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" "github.com/liip/sheriff" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,8 +35,8 @@ var ( errTerminationProtectionOn = errors.New("termination protection is on") ) -func checkServiceIsRunning(c *aiven.Client, project, serviceName string) (bool, error) { - s, err := c.Services.Get(project, serviceName) +func checkServiceIsRunning(ctx context.Context, c *aiven.Client, project, serviceName string) (bool, error) { + s, err := c.Services.Get(ctx, project, serviceName) if err != nil { // if service is not found, it is not running if aiven.IsNotFound(err) { diff --git a/controllers/connectionpool_controller.go b/controllers/connectionpool_controller.go index 2d8b20b6..d3843a8a 100644 --- a/controllers/connectionpool_controller.go +++ b/controllers/connectionpool_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,19 +40,19 @@ func (r *ConnectionPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h ConnectionPoolHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - cp, err := h.convert(i) +func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, _ []client.Object) error { + cp, err := h.convert(obj) if err != nil { return err } - exists, err := h.exists(avn, cp) + exists, err := h.exists(ctx, avn, cp) if err != nil { return err } var reason string if !exists { - _, err := avn.ConnectionPools.Create(cp.Spec.Project, cp.Spec.ServiceName, + _, err := avn.ConnectionPools.Create(ctx, cp.Spec.Project, cp.Spec.ServiceName, aiven.CreateConnectionPoolRequest{ Database: cp.Spec.DatabaseName, PoolMode: cp.Spec.PoolMode, @@ -65,7 +65,7 @@ func (h ConnectionPoolHandler) createOrUpdate(avn *aiven.Client, i client.Object } reason = "Created" } else { - _, err := avn.ConnectionPools.Update(cp.Spec.Project, cp.Spec.ServiceName, cp.Name, + _, err := avn.ConnectionPools.Update(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name, aiven.UpdateConnectionPoolRequest{ Database: cp.Spec.DatabaseName, PoolMode: cp.Spec.PoolMode, @@ -92,14 +92,13 @@ func (h ConnectionPoolHandler) createOrUpdate(avn *aiven.Client, i client.Object return nil } -func (h ConnectionPoolHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - cp, err := h.convert(i) +func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + cp, err := h.convert(obj) if err != nil { return false, err } - err = avn.ConnectionPools.Delete( - cp.Spec.Project, cp.Spec.ServiceName, cp.Name) + err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name) if err != nil && !aiven.IsNotFound(err) { return false, err } @@ -107,8 +106,8 @@ func (h ConnectionPoolHandler) delete(avn *aiven.Client, i client.Object) (bool, return true, nil } -func (h ConnectionPoolHandler) exists(avn *aiven.Client, cp *v1alpha1.ConnectionPool) (bool, error) { - conPool, err := avn.ConnectionPools.Get(cp.Spec.Project, cp.Spec.ServiceName, cp.Name) +func (h ConnectionPoolHandler) exists(ctx context.Context, avn *aiven.Client, cp *v1alpha1.ConnectionPool) (bool, error) { + conPool, err := avn.ConnectionPools.Get(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name) if err != nil { if aiven.IsNotFound(err) { return false, nil @@ -119,18 +118,18 @@ func (h ConnectionPoolHandler) exists(avn *aiven.Client, cp *v1alpha1.Connection return conPool != nil, nil } -func (h ConnectionPoolHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - connPool, err := h.convert(i) +func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + connPool, err := h.convert(obj) if err != nil { return nil, err } - cp, err := avn.ConnectionPools.Get(connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Name) + cp, err := avn.ConnectionPools.Get(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Name) if err != nil { return nil, fmt.Errorf("cannot get ConnectionPool: %w", err) } - s, err := avn.Services.Get(connPool.Spec.Project, connPool.Spec.ServiceName) + s, err := avn.Services.Get(ctx, connPool.Spec.Project, connPool.Spec.ServiceName) if err != nil { return nil, fmt.Errorf("cannot get service: %w", err) } @@ -164,7 +163,7 @@ func (h ConnectionPoolHandler) get(avn *aiven.Client, i client.Object) (*corev1. return newSecret(connPool, stringData, false), nil } - u, err := avn.ServiceUsers.Get(connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Spec.Username) + u, err := avn.ServiceUsers.Get(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Spec.Username) if err != nil { return nil, fmt.Errorf("cannot get user: %w", err) } @@ -190,8 +189,8 @@ func (h ConnectionPoolHandler) get(avn *aiven.Client, i client.Object) (*corev1. return newSecret(connPool, stringData, false), nil } -func (h ConnectionPoolHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - cp, err := h.convert(i) +func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + cp, err := h.convert(obj) if err != nil { return false, err } @@ -199,13 +198,13 @@ func (h ConnectionPoolHandler) checkPreconditions(avn *aiven.Client, i client.Ob meta.SetStatusCondition(&cp.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - check, err := checkServiceIsRunning(avn, cp.Spec.Project, cp.Spec.ServiceName) + check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName) if err != nil { return false, err } if check { - db, err := avn.Databases.Get(cp.Spec.Project, cp.Spec.ServiceName, cp.Spec.DatabaseName) + db, err := avn.Databases.Get(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Spec.DatabaseName) if err != nil { return false, err } diff --git a/controllers/database_controller.go b/controllers/database_controller.go index de9141a2..b782108e 100644 --- a/controllers/database_controller.go +++ b/controllers/database_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,20 +38,20 @@ func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h DatabaseHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - db, err := h.convert(i) +func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + db, err := h.convert(obj) if err != nil { return err } - exists, err := h.exists(avn, db) + exists, err := h.exists(ctx, avn, db) if err != nil { return err } if !exists { - _, err := avn.Databases.Create(db.Spec.Project, db.Spec.ServiceName, aiven.CreateDatabaseRequest{ + _, err := avn.Databases.Create(ctx, db.Spec.Project, db.Spec.ServiceName, aiven.CreateDatabaseRequest{ Database: db.Name, LcCollate: db.Spec.LcCollate, LcType: db.Spec.LcCtype, @@ -75,8 +75,8 @@ func (h DatabaseHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs return nil } -func (h DatabaseHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - db, err := h.convert(i) +func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + db, err := h.convert(obj) if err != nil { return false, err } @@ -86,6 +86,7 @@ func (h DatabaseHandler) delete(avn *aiven.Client, i client.Object) (bool, error } err = avn.Databases.Delete( + ctx, db.Spec.Project, db.Spec.ServiceName, db.Name) @@ -96,8 +97,8 @@ func (h DatabaseHandler) delete(avn *aiven.Client, i client.Object) (bool, error return true, nil } -func (h DatabaseHandler) exists(avn *aiven.Client, db *v1alpha1.Database) (bool, error) { - d, err := avn.Databases.Get(db.Spec.Project, db.Spec.ServiceName, db.Name) +func (h DatabaseHandler) exists(ctx context.Context, avn *aiven.Client, db *v1alpha1.Database) (bool, error) { + d, err := avn.Databases.Get(ctx, db.Spec.Project, db.Spec.ServiceName, db.Name) if aiven.IsNotFound(err) { return false, nil } @@ -105,13 +106,13 @@ func (h DatabaseHandler) exists(avn *aiven.Client, db *v1alpha1.Database) (bool, return d != nil, nil } -func (h DatabaseHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - db, err := h.convert(i) +func (h DatabaseHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + db, err := h.convert(obj) if err != nil { return nil, err } - _, err = avn.Databases.Get(db.Spec.Project, db.Spec.ServiceName, db.Name) + _, err = avn.Databases.Get(ctx, db.Spec.Project, db.Spec.ServiceName, db.Name) if err != nil { return nil, err } @@ -125,8 +126,8 @@ func (h DatabaseHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret return nil, nil } -func (h DatabaseHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - db, err := h.convert(i) +func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + db, err := h.convert(obj) if err != nil { return false, err } @@ -134,7 +135,7 @@ func (h DatabaseHandler) checkPreconditions(avn *aiven.Client, i client.Object) meta.SetStatusCondition(&db.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(avn, db.Spec.Project, db.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName) } func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) { diff --git a/controllers/generic_service_handler.go b/controllers/generic_service_handler.go index a844ed29..181dcba6 100644 --- a/controllers/generic_service_handler.go +++ b/controllers/generic_service_handler.go @@ -1,10 +1,11 @@ package controllers import ( + "context" "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,8 +24,8 @@ type genericServiceHandler struct { fabric serviceAdapterFabric } -func (h *genericServiceHandler) createOrUpdate(a *aiven.Client, object client.Object, refs []client.Object) error { - o, err := h.fabric(a, object) +func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + o, err := h.fabric(avn, obj) if err != nil { return err } @@ -41,7 +42,7 @@ func (h *genericServiceHandler) createOrUpdate(a *aiven.Client, object client.Ob } } - _, err = a.Services.Get(spec.Project, ometa.Name) + _, err = avn.Services.Get(ctx, spec.Project, ometa.Name) exists := err == nil if !exists && !aiven.IsNotFound(err) { return fmt.Errorf("failed to fetch service: %w", err) @@ -77,7 +78,7 @@ func (h *genericServiceHandler) createOrUpdate(a *aiven.Client, object client.Ob req.ServiceIntegrations = append(req.ServiceIntegrations, i) } - _, err = a.Services.Create(spec.Project, req) + _, err = avn.Services.Create(ctx, spec.Project, req) if err != nil { return fmt.Errorf("failed to create service: %w", err) } @@ -98,7 +99,7 @@ func (h *genericServiceHandler) createOrUpdate(a *aiven.Client, object client.Ob TerminationProtection: fromAnyPointer(spec.TerminationProtection), UserConfig: userConfig, } - _, err = a.Services.Update(spec.Project, ometa.Name, req) + _, err = avn.Services.Update(ctx, spec.Project, ometa.Name, req) if err != nil { return fmt.Errorf("failed to update service: %w", err) } @@ -113,7 +114,7 @@ func (h *genericServiceHandler) createOrUpdate(a *aiven.Client, object client.Ob if spec.Tags != nil { req.Tags = spec.Tags } - _, err = a.ServiceTags.Set(spec.Project, ometa.Name, req) + _, err = avn.ServiceTags.Set(ctx, spec.Project, ometa.Name, req) if err != nil { return fmt.Errorf("failed to update tags: %w", err) } @@ -127,13 +128,13 @@ func (h *genericServiceHandler) createOrUpdate(a *aiven.Client, object client.Ob metav1.SetMetaDataAnnotation( o.getObjectMeta(), processedGenerationAnnotation, - strconv.FormatInt(object.GetGeneration(), formatIntBaseDecimal), + strconv.FormatInt(obj.GetGeneration(), formatIntBaseDecimal), ) return nil } -func (h *genericServiceHandler) delete(a *aiven.Client, object client.Object) (bool, error) { - o, err := h.fabric(a, object) +func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + o, err := h.fabric(avn, obj) if err != nil { return false, err } @@ -143,7 +144,7 @@ func (h *genericServiceHandler) delete(a *aiven.Client, object client.Object) (b return false, errTerminationProtectionOn } - err = a.Services.Delete(spec.Project, o.getObjectMeta().Name) + err = avn.Services.Delete(ctx, spec.Project, o.getObjectMeta().Name) if err == nil || aiven.IsNotFound(err) { return true, nil } @@ -151,13 +152,13 @@ func (h *genericServiceHandler) delete(a *aiven.Client, object client.Object) (b return false, fmt.Errorf("failed to delete service in Aiven: %w", err) } -func (h *genericServiceHandler) get(a *aiven.Client, object client.Object) (*corev1.Secret, error) { - o, err := h.fabric(a, object) +func (h *genericServiceHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + o, err := h.fabric(avn, obj) if err != nil { return nil, err } - s, err := a.Services.Get(o.getServiceCommonSpec().Project, o.getObjectMeta().Name) + s, err := avn.Services.Get(ctx, o.getServiceCommonSpec().Project, o.getObjectMeta().Name) if err != nil { return nil, fmt.Errorf("failed to get service from Aiven: %w", err) } @@ -172,14 +173,14 @@ func (h *genericServiceHandler) get(a *aiven.Client, object client.Object) (*cor // Some services get secrets after they are running only, // like ip addresses (hosts) - return o.newSecret(s) + return o.newSecret(ctx, s) } return nil, nil } // checkPreconditions not required for now by services to be implemented -func (h *genericServiceHandler) checkPreconditions(a *aiven.Client, object client.Object) (bool, error) { - o, err := h.fabric(a, object) +func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + o, err := h.fabric(avn, obj) if err != nil { return false, err } @@ -189,7 +190,7 @@ func (h *genericServiceHandler) checkPreconditions(a *aiven.Client, object clien // Validates that read_replica is running // If not, the wrapper controller will try later if s.IntegrationType == "read_replica" { - r, err := checkServiceIsRunning(a, spec.Project, s.SourceServiceName) + r, err := checkServiceIsRunning(ctx, avn, spec.Project, s.SourceServiceName) if !(r && err == nil) { return false, nil } @@ -209,5 +210,5 @@ type serviceAdapter interface { getServiceType() string getDiskSpace() string getUserConfig() any - newSecret(*aiven.Service) (*corev1.Secret, error) + newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) } diff --git a/controllers/grafana_controller.go b/controllers/grafana_controller.go index bb539873..4ca7531b 100644 --- a/controllers/grafana_controller.go +++ b/controllers/grafana_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strings" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -66,7 +66,7 @@ func (a *grafanaAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *grafanaAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *grafanaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { stringData := map[string]string{ "HOST": s.URIParams["host"], "PORT": s.URIParams["port"], diff --git a/controllers/kafka_controller.go b/controllers/kafka_controller.go index 1acb0d7e..07f4dad2 100644 --- a/controllers/kafka_controller.go +++ b/controllers/kafka_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -66,14 +66,14 @@ func (a *kafkaAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *kafkaAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *kafkaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { var userName, password string if len(s.Users) > 0 { userName = s.Users[0].Username password = s.Users[0].Password } - caCert, err := a.avn.CA.Get(a.getServiceCommonSpec().Project) + caCert, err := a.avn.CA.Get(ctx, a.getServiceCommonSpec().Project) if err != nil { return nil, fmt.Errorf("aiven client error %w", err) } diff --git a/controllers/kafkaacl_controller.go b/controllers/kafkaacl_controller.go index 5fcefd47..9e6af9ad 100644 --- a/controllers/kafkaacl_controller.go +++ b/controllers/kafkaacl_controller.go @@ -8,7 +8,7 @@ import ( "net/http" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,21 +38,22 @@ func (r *KafkaACLReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaACLHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - acl, err := h.convert(i) +func (h KafkaACLHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + acl, err := h.convert(obj) if err != nil { return err } // ACL can't be really modified // Tries to delete it instead - _, err = h.delete(avn, i) + _, err = h.delete(ctx, avn, obj) if err != nil { return err } // Creates it from scratch r, err := avn.KafkaACLs.Create( + ctx, acl.Spec.Project, acl.Spec.ServiceName, aiven.CreateKafkaACLRequest{ @@ -81,15 +82,15 @@ func (h KafkaACLHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs return nil } -func (h KafkaACLHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - acl, err := h.convert(i) +func (h KafkaACLHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + acl, err := h.convert(obj) if err != nil { return false, err } - id, err := h.getID(avn, acl) + id, err := h.getID(ctx, avn, acl) if err == nil { - err = avn.KafkaACLs.Delete(acl.Spec.Project, acl.Spec.ServiceName, id) + err = avn.KafkaACLs.Delete(ctx, acl.Spec.Project, acl.Spec.ServiceName, id) } if err != nil && !aiven.IsNotFound(err) { @@ -101,7 +102,7 @@ func (h KafkaACLHandler) delete(avn *aiven.Client, i client.Object) (bool, error // todo: remove in v1 // getID returns ACL's ID in < v0.5.1 compatible mode -func (h KafkaACLHandler) getID(avn *aiven.Client, acl *v1alpha1.KafkaACL) (string, error) { +func (h KafkaACLHandler) getID(ctx context.Context, avn *aiven.Client, acl *v1alpha1.KafkaACL) (string, error) { // ACLs made prior to v0.5.1 doesn't have an ID. // This block is for fresh made ACLs only // The rest of this function tries to guess it filtering the list. @@ -110,7 +111,7 @@ func (h KafkaACLHandler) getID(avn *aiven.Client, acl *v1alpha1.KafkaACL) (strin } // For old ACLs only - list, err := avn.KafkaACLs.List(acl.Spec.Project, acl.Spec.ServiceName) + list, err := avn.KafkaACLs.List(ctx, acl.Spec.Project, acl.Spec.ServiceName) if err != nil { return "", err } @@ -125,18 +126,18 @@ func (h KafkaACLHandler) getID(avn *aiven.Client, acl *v1alpha1.KafkaACL) (strin return "", aiven.Error{Status: http.StatusNotFound, Message: fmt.Sprintf("Kafka ACL %q not found", acl.Name)} } -func (h KafkaACLHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - acl, err := h.convert(i) +func (h KafkaACLHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + acl, err := h.convert(obj) if err != nil { return nil, err } - id, err := h.getID(avn, acl) + id, err := h.getID(ctx, avn, acl) if err != nil { return nil, err } - _, err = avn.KafkaACLs.Get(acl.Spec.Project, acl.Spec.ServiceName, id) + _, err = avn.KafkaACLs.Get(ctx, acl.Spec.Project, acl.Spec.ServiceName, id) if err != nil { return nil, err } @@ -150,8 +151,8 @@ func (h KafkaACLHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret return nil, nil } -func (h KafkaACLHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - acl, err := h.convert(i) +func (h KafkaACLHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + acl, err := h.convert(obj) if err != nil { return false, err } @@ -159,7 +160,7 @@ func (h KafkaACLHandler) checkPreconditions(avn *aiven.Client, i client.Object) meta.SetStatusCondition(&acl.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(avn, acl.Spec.Project, acl.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName) } func (h KafkaACLHandler) convert(i client.Object) (*v1alpha1.KafkaACL, error) { diff --git a/controllers/kafkaconnect_controller.go b/controllers/kafkaconnect_controller.go index 6cb81f31..ba294580 100644 --- a/controllers/kafkaconnect_controller.go +++ b/controllers/kafkaconnect_controller.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +63,7 @@ func (a *kafkaConnectAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *kafkaConnectAdapter) newSecret(_ *aiven.Service) (*corev1.Secret, error) { +func (a *kafkaConnectAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { return nil, nil } diff --git a/controllers/kafkaconnector_controller.go b/controllers/kafkaconnector_controller.go index b75534e3..afba0e5f 100644 --- a/controllers/kafkaconnector_controller.go +++ b/controllers/kafkaconnector_controller.go @@ -9,7 +9,7 @@ import ( "strconv" "text/template" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,13 +44,13 @@ func (r *KafkaConnectorReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaConnectorHandler) createOrUpdate(avn *aiven.Client, o client.Object, refs []client.Object) error { - conn, err := h.convert(o) +func (h KafkaConnectorHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + conn, err := h.convert(obj) if err != nil { return err } - exists, err := h.exists(avn, conn) + exists, err := h.exists(ctx, avn, conn) if err != nil { return fmt.Errorf("unable to check if kafka connector exists: %w", err) } @@ -62,13 +62,13 @@ func (h KafkaConnectorHandler) createOrUpdate(avn *aiven.Client, o client.Object var reason string if !exists { - err = avn.KafkaConnectors.Create(conn.Spec.Project, conn.Spec.ServiceName, connCfg) + err = avn.KafkaConnectors.Create(ctx, conn.Spec.Project, conn.Spec.ServiceName, connCfg) if err != nil && !aiven.IsAlreadyExists(err) { return err } reason = "Created" } else { - _, err := avn.KafkaConnectors.Update(conn.Spec.Project, conn.Spec.ServiceName, conn.Name, connCfg) + _, err := avn.KafkaConnectors.Update(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name, connCfg) if err != nil { return err } @@ -134,33 +134,33 @@ func (h KafkaConnectorHandler) buildConnectorConfig(conn *v1alpha1.KafkaConnecto return aiven.KafkaConnectorConfig(m), nil } -func (h KafkaConnectorHandler) delete(avn *aiven.Client, o client.Object) (bool, error) { - conn, err := h.convert(o) +func (h KafkaConnectorHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + conn, err := h.convert(obj) if err != nil { return false, err } - err = avn.KafkaConnectors.Delete(conn.Spec.Project, conn.Spec.ServiceName, conn.Name) + err = avn.KafkaConnectors.Delete(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name) if err != nil && !aiven.IsNotFound(err) { return false, fmt.Errorf("unable to delete kafka connector: %w", err) } return true, nil } -func (h KafkaConnectorHandler) exists(avn *aiven.Client, conn *v1alpha1.KafkaConnector) (bool, error) { - connector, err := avn.KafkaConnectors.Status(conn.Spec.Project, conn.Spec.ServiceName, conn.Name) +func (h KafkaConnectorHandler) exists(ctx context.Context, avn *aiven.Client, conn *v1alpha1.KafkaConnector) (bool, error) { + connector, err := avn.KafkaConnectors.Status(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name) if err != nil && !aiven.IsNotFound(err) { return false, err } return connector != nil, nil } -func (h KafkaConnectorHandler) get(avn *aiven.Client, o client.Object) (*corev1.Secret, error) { - conn, err := h.convert(o) +func (h KafkaConnectorHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + conn, err := h.convert(obj) if err != nil { return nil, err } - connAtAiven, err := avn.KafkaConnectors.GetByName(conn.Spec.Project, conn.Spec.ServiceName, conn.Name) + connAtAiven, err := avn.KafkaConnectors.GetByName(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name) if err != nil { return nil, err } @@ -173,7 +173,7 @@ func (h KafkaConnectorHandler) get(avn *aiven.Client, o client.Object) (*corev1. Version: connAtAiven.Plugin.Version, } - connStat, err := avn.KafkaConnectors.Status(conn.Spec.Project, conn.Spec.ServiceName, conn.Name) + connStat, err := avn.KafkaConnectors.Status(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name) if err != nil { return nil, err } @@ -206,8 +206,8 @@ func (h KafkaConnectorHandler) get(avn *aiven.Client, o client.Object) (*corev1. return nil, nil } -func (h KafkaConnectorHandler) checkPreconditions(avn *aiven.Client, o client.Object) (bool, error) { - conn, err := h.convert(o) +func (h KafkaConnectorHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + conn, err := h.convert(obj) if err != nil { return false, err } @@ -215,7 +215,7 @@ func (h KafkaConnectorHandler) checkPreconditions(avn *aiven.Client, o client.Ob meta.SetStatusCondition(&conn.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(avn, conn.Spec.Project, conn.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName) } func (h KafkaConnectorHandler) convert(o client.Object) (*v1alpha1.KafkaConnector, error) { diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index 2c82b71c..4e293c4a 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,14 +37,15 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaSchemaHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - schema, err := h.convert(i) +func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + schema, err := h.convert(obj) if err != nil { return err } // createOrUpdate Kafka Schema Subject _, err = avn.KafkaSubjectSchemas.Add( + ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, @@ -59,6 +60,7 @@ func (h KafkaSchemaHandler) createOrUpdate(avn *aiven.Client, i client.Object, r // set compatibility level if defined for a newly created Kafka Schema Subject if schema.Spec.CompatibilityLevel != "" { _, err := avn.KafkaSubjectSchemas.UpdateConfiguration( + ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, @@ -70,7 +72,7 @@ func (h KafkaSchemaHandler) createOrUpdate(avn *aiven.Client, i client.Object, r } // get last version - version, err := h.getLastVersion(avn, schema) + version, err := h.getLastVersion(ctx, avn, schema) if err != nil { return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err) } @@ -91,13 +93,13 @@ func (h KafkaSchemaHandler) createOrUpdate(avn *aiven.Client, i client.Object, r return nil } -func (h KafkaSchemaHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - schema, err := h.convert(i) +func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + schema, err := h.convert(obj) if err != nil { return false, err } - err = avn.KafkaSubjectSchemas.Delete(schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) if err != nil && !aiven.IsNotFound(err) { return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err) } @@ -105,8 +107,8 @@ func (h KafkaSchemaHandler) delete(avn *aiven.Client, i client.Object) (bool, er return true, nil } -func (h KafkaSchemaHandler) get(_ *aiven.Client, i client.Object) (*corev1.Secret, error) { - schema, err := h.convert(i) +func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + schema, err := h.convert(obj) if err != nil { return nil, err } @@ -120,13 +122,13 @@ func (h KafkaSchemaHandler) get(_ *aiven.Client, i client.Object) (*corev1.Secre return nil, nil } -func (h KafkaSchemaHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - schema, err := h.convert(i) +func (h KafkaSchemaHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + schema, err := h.convert(obj) if err != nil { return false, err } - return checkServiceIsRunning(avn, schema.Spec.Project, schema.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, schema.Spec.Project, schema.Spec.ServiceName) } func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, error) { @@ -138,8 +140,9 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err return schema, nil } -func (h KafkaSchemaHandler) getLastVersion(avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) { +func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) { ver, err := avn.KafkaSubjectSchemas.GetVersions( + ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index faab8314..12d4e320 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,8 +37,8 @@ func (r *KafkaTopicReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaTopicHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - topic, err := h.convert(i) +func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + topic, err := h.convert(obj) if err != nil { return err } @@ -51,14 +51,14 @@ func (h KafkaTopicHandler) createOrUpdate(avn *aiven.Client, i client.Object, re }) } - exists, err := h.exists(avn, topic) + exists, err := h.exists(ctx, avn, topic) if err != nil { return err } var reason string if !exists { - err = avn.KafkaTopics.Create(topic.Spec.Project, topic.Spec.ServiceName, aiven.CreateKafkaTopicRequest{ + err = avn.KafkaTopics.Create(ctx, topic.Spec.Project, topic.Spec.ServiceName, aiven.CreateKafkaTopicRequest{ Partitions: &topic.Spec.Partitions, Replication: &topic.Spec.Replication, TopicName: topic.GetTopicName(), @@ -71,7 +71,7 @@ func (h KafkaTopicHandler) createOrUpdate(avn *aiven.Client, i client.Object, re reason = "Created" } else { - err = avn.KafkaTopics.Update(topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName(), + err = avn.KafkaTopics.Update(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName(), aiven.UpdateKafkaTopicRequest{ Partitions: &topic.Spec.Partitions, Replication: &topic.Spec.Replication, @@ -99,8 +99,8 @@ func (h KafkaTopicHandler) createOrUpdate(avn *aiven.Client, i client.Object, re return nil } -func (h KafkaTopicHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - topic, err := h.convert(i) +func (h KafkaTopicHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + topic, err := h.convert(obj) if err != nil { return false, err } @@ -110,7 +110,7 @@ func (h KafkaTopicHandler) delete(avn *aiven.Client, i client.Object) (bool, err } // Delete project on Aiven side - err = avn.KafkaTopics.Delete(topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) + err = avn.KafkaTopics.Delete(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) if err != nil && !aiven.IsNotFound(err) { return false, err } @@ -118,8 +118,8 @@ func (h KafkaTopicHandler) delete(avn *aiven.Client, i client.Object) (bool, err return true, nil } -func (h KafkaTopicHandler) exists(avn *aiven.Client, topic *v1alpha1.KafkaTopic) (bool, error) { - t, err := avn.KafkaTopics.Get(topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) +func (h KafkaTopicHandler) exists(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (bool, error) { + t, err := avn.KafkaTopics.Get(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) if err != nil && !aiven.IsNotFound(err) { if aivenError, ok := err.(aiven.Error); ok { // Getting topic info can sometimes temporarily fail with 501 and 502. Don't @@ -135,13 +135,13 @@ func (h KafkaTopicHandler) exists(avn *aiven.Client, topic *v1alpha1.KafkaTopic) return t != nil, nil } -func (h KafkaTopicHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - topic, err := h.convert(i) +func (h KafkaTopicHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + topic, err := h.convert(obj) if err != nil { return nil, err } - state, err := h.getState(avn, topic) + state, err := h.getState(ctx, avn, topic) if err != nil { return nil, err } @@ -159,8 +159,8 @@ func (h KafkaTopicHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secr return nil, err } -func (h KafkaTopicHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - topic, err := h.convert(i) +func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + topic, err := h.convert(obj) if err != nil { return false, err } @@ -168,11 +168,11 @@ func (h KafkaTopicHandler) checkPreconditions(avn *aiven.Client, i client.Object meta.SetStatusCondition(&topic.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(avn, topic.Spec.Project, topic.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName) } -func (h KafkaTopicHandler) getState(avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) { - t, err := avn.KafkaTopics.Get(topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) +func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) { + t, err := avn.KafkaTopics.Get(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) if err != nil { if aivenError, ok := err.(aiven.Error); ok { // Getting topic info can sometimes temporarily fail with 501 and 502. Don't diff --git a/controllers/mysql_controller.go b/controllers/mysql_controller.go index 26d761f4..e7ed6ead 100644 --- a/controllers/mysql_controller.go +++ b/controllers/mysql_controller.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -65,7 +65,7 @@ func (a *mySQLAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *mySQLAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *mySQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { stringData := map[string]string{ "HOST": s.URIParams["host"], "PORT": s.URIParams["port"], diff --git a/controllers/opensearch_controller.go b/controllers/opensearch_controller.go index bbd26492..f9d1ef03 100644 --- a/controllers/opensearch_controller.go +++ b/controllers/opensearch_controller.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -67,7 +67,7 @@ func (a *opensearchAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *opensearchAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *opensearchAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { prefix := getSecretPrefix(a) stringData := map[string]string{ prefix + "HOST": s.URIParams["host"], diff --git a/controllers/postgresql_controller.go b/controllers/postgresql_controller.go index 5aad7829..b2d90a86 100644 --- a/controllers/postgresql_controller.go +++ b/controllers/postgresql_controller.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -64,7 +64,7 @@ func (a *postgresSQLAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *postgresSQLAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *postgresSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { prefix := getSecretPrefix(a) stringData := map[string]string{ prefix + "HOST": s.URIParams["host"], diff --git a/controllers/project_controller.go b/controllers/project_controller.go index 3902374b..8096c20f 100644 --- a/controllers/project_controller.go +++ b/controllers/project_controller.go @@ -8,7 +8,7 @@ import ( "regexp" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,12 +41,12 @@ func (r *ProjectReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h ProjectHandler) getLongCardID(client *aiven.Client, cardID string) (*string, error) { +func (h ProjectHandler) getLongCardID(ctx context.Context, client *aiven.Client, cardID string) (*string, error) { if cardID == "" { return nil, nil } - card, err := client.CardsHandler.Get(cardID) + card, err := client.CardsHandler.Get(ctx, cardID) if err != nil { return nil, err } @@ -59,8 +59,8 @@ func (h ProjectHandler) getLongCardID(client *aiven.Client, cardID string) (*str } // create creates a project on Aiven side -func (h ProjectHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - project, err := h.convert(i) +func (h ProjectHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + project, err := h.convert(obj) if err != nil { return err } @@ -75,12 +75,12 @@ func (h ProjectHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs technicalEmails = aiven.ContactEmailFromStringSlice(project.Spec.TechnicalEmails) } - exists, err := h.exists(avn, project) + exists, err := h.exists(ctx, avn, project) if err != nil { return fmt.Errorf("project does not exists: %w", err) } - cardID, err := h.getLongCardID(avn, project.Spec.CardID) + cardID, err := h.getLongCardID(ctx, avn, project.Spec.CardID) if err != nil { return fmt.Errorf("cannot get long card id: %w", err) } @@ -88,7 +88,7 @@ func (h ProjectHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs var reason string var p *aiven.Project if !exists { - p, err = avn.Projects.Create(aiven.CreateProjectRequest{ + p, err = avn.Projects.Create(ctx, aiven.CreateProjectRequest{ BillingAddress: toOptionalStringPointer(project.Spec.BillingAddress), BillingEmails: billingEmails, BillingExtraText: toOptionalStringPointer(project.Spec.BillingExtraText), @@ -111,7 +111,7 @@ func (h ProjectHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs reason = "Created" } else { - p, err = avn.Projects.Update(project.Name, aiven.UpdateProjectRequest{ + p, err = avn.Projects.Update(ctx, project.Name, aiven.UpdateProjectRequest{ BillingAddress: toOptionalStringPointer(project.Spec.BillingAddress), BillingEmails: billingEmails, BillingExtraText: toOptionalStringPointer(project.Spec.BillingExtraText), @@ -150,13 +150,13 @@ func (h ProjectHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs return nil } -func (h ProjectHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - project, err := h.convert(i) +func (h ProjectHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + project, err := h.convert(obj) if err != nil { return nil, err } - cert, err := avn.CA.Get(project.Name) + cert, err := avn.CA.Get(ctx, project.Name) if err != nil { return nil, fmt.Errorf("aiven client error %w", err) } @@ -177,8 +177,8 @@ func (h ProjectHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, } // exists checks if project already exists on Aiven side -func (h ProjectHandler) exists(avn *aiven.Client, project *v1alpha1.Project) (bool, error) { - pr, err := avn.Projects.Get(project.Name) +func (h ProjectHandler) exists(ctx context.Context, avn *aiven.Client, project *v1alpha1.Project) (bool, error) { + pr, err := avn.Projects.Get(ctx, project.Name) if aiven.IsNotFound(err) { return false, nil } @@ -187,14 +187,14 @@ func (h ProjectHandler) exists(avn *aiven.Client, project *v1alpha1.Project) (bo } // delete deletes Aiven project -func (h ProjectHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - project, err := h.convert(i) +func (h ProjectHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + project, err := h.convert(obj) if err != nil { return false, err } // Delete project on Aiven side - if err := avn.Projects.Delete(project.Name); err != nil { + if err := avn.Projects.Delete(ctx, project.Name); err != nil { var skip bool // If project not found then there is nothing to delete @@ -227,6 +227,6 @@ func (h ProjectHandler) convert(i client.Object) (*v1alpha1.Project, error) { return p, nil } -func (h ProjectHandler) checkPreconditions(_ *aiven.Client, _ client.Object) (bool, error) { +func (h ProjectHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { return true, nil } diff --git a/controllers/projectvpc_controller.go b/controllers/projectvpc_controller.go index 2791449e..7524523f 100644 --- a/controllers/projectvpc_controller.go +++ b/controllers/projectvpc_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,13 +42,13 @@ func (r *ProjectVPCReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h *ProjectVPCHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - projectVPC, err := h.convert(i) +func (h *ProjectVPCHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + projectVPC, err := h.convert(obj) if err != nil { return err } - vpc, err := avn.VPCs.Create(projectVPC.Spec.Project, aiven.CreateVPCRequest{ + vpc, err := avn.VPCs.Create(ctx, projectVPC.Spec.Project, aiven.CreateVPCRequest{ CloudName: projectVPC.Spec.CloudName, NetworkCIDR: projectVPC.Spec.NetworkCidr, }) @@ -72,13 +72,13 @@ func (h *ProjectVPCHandler) createOrUpdate(avn *aiven.Client, i client.Object, r return nil } -func (h *ProjectVPCHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - projectVPC, err := h.convert(i) +func (h *ProjectVPCHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + projectVPC, err := h.convert(obj) if err != nil { return false, err } - vpc, err := avn.VPCs.Get(projectVPC.Spec.Project, projectVPC.Status.ID) + vpc, err := avn.VPCs.Get(ctx, projectVPC.Spec.Project, projectVPC.Status.ID) if aiven.IsNotFound(err) { return true, nil } @@ -92,7 +92,7 @@ func (h *ProjectVPCHandler) delete(avn *aiven.Client, i client.Object) (bool, er return true, nil } - err = avn.VPCs.Delete(projectVPC.Spec.Project, projectVPC.Status.ID) + err = avn.VPCs.Delete(ctx, projectVPC.Spec.Project, projectVPC.Status.ID) if isDependencyError(err) { return false, fmt.Errorf("%w: %s", v1alpha1.ErrDeleteDependencies, err) } @@ -100,13 +100,13 @@ func (h *ProjectVPCHandler) delete(avn *aiven.Client, i client.Object) (bool, er return false, nil } -func (h *ProjectVPCHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - projectVPC, err := h.convert(i) +func (h *ProjectVPCHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + projectVPC, err := h.convert(obj) if err != nil { return nil, err } - vpc, err := avn.VPCs.Get(projectVPC.Spec.Project, projectVPC.Status.ID) + vpc, err := avn.VPCs.Get(ctx, projectVPC.Spec.Project, projectVPC.Status.ID) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (h *ProjectVPCHandler) get(avn *aiven.Client, i client.Object) (*corev1.Sec return nil, nil } -func (h *ProjectVPCHandler) checkPreconditions(_ *aiven.Client, _ client.Object) (bool, error) { +func (h *ProjectVPCHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { return true, nil } diff --git a/controllers/redis_controller.go b/controllers/redis_controller.go index 20434658..6416e0d6 100644 --- a/controllers/redis_controller.go +++ b/controllers/redis_controller.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -67,7 +67,7 @@ func (a *redisAdapter) getUserConfig() any { return &a.Spec.UserConfig } -func (a *redisAdapter) newSecret(s *aiven.Service) (*corev1.Secret, error) { +func (a *redisAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { prefix := getSecretPrefix(a) stringData := map[string]string{ prefix + "HOST": s.URIParams["host"], diff --git a/controllers/serviceintegration_controller.go b/controllers/serviceintegration_controller.go index ad2a2fec..a225954f 100644 --- a/controllers/serviceintegration_controller.go +++ b/controllers/serviceintegration_controller.go @@ -8,7 +8,7 @@ import ( "strconv" "strings" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,8 +38,8 @@ func (r *ServiceIntegrationReconciler) SetupWithManager(mgr ctrl.Manager) error Complete(r) } -func (h ServiceIntegrationHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - si, err := h.convert(i) +func (h ServiceIntegrationHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + si, err := h.convert(obj) if err != nil { return err } @@ -58,6 +58,7 @@ func (h ServiceIntegrationHandler) createOrUpdate(avn *aiven.Client, i client.Ob } integration, err = avn.ServiceIntegrations.Create( + ctx, si.Spec.Project, aiven.CreateServiceIntegrationRequest{ DestinationEndpointID: anyOptional(si.Spec.DestinationEndpointID), @@ -82,6 +83,7 @@ func (h ServiceIntegrationHandler) createOrUpdate(avn *aiven.Client, i client.Ob } integration, err = avn.ServiceIntegrations.Update( + ctx, si.Spec.Project, si.Status.ID, aiven.UpdateServiceIntegrationRequest{ @@ -113,13 +115,13 @@ func (h ServiceIntegrationHandler) createOrUpdate(avn *aiven.Client, i client.Ob return nil } -func (h ServiceIntegrationHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - si, err := h.convert(i) +func (h ServiceIntegrationHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + si, err := h.convert(obj) if err != nil { return false, err } - err = avn.ServiceIntegrations.Delete(si.Spec.Project, si.Status.ID) + err = avn.ServiceIntegrations.Delete(ctx, si.Spec.Project, si.Status.ID) if err != nil && !aiven.IsNotFound(err) { return false, fmt.Errorf("aiven client delete service ingtegration error: %w", err) } @@ -127,8 +129,8 @@ func (h ServiceIntegrationHandler) delete(avn *aiven.Client, i client.Object) (b return true, nil } -func (h ServiceIntegrationHandler) get(_ *aiven.Client, i client.Object) (*corev1.Secret, error) { - si, err := h.convert(i) +func (h ServiceIntegrationHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + si, err := h.convert(obj) if err != nil { return nil, err } @@ -142,8 +144,8 @@ func (h ServiceIntegrationHandler) get(_ *aiven.Client, i client.Object) (*corev return nil, nil } -func (h ServiceIntegrationHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - si, err := h.convert(i) +func (h ServiceIntegrationHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + si, err := h.convert(obj) if err != nil { return false, err } @@ -158,7 +160,7 @@ func (h ServiceIntegrationHandler) checkPreconditions(avn *aiven.Client, i clien if project == "" { project = si.Spec.Project } - running, err := checkServiceIsRunning(avn, project, si.Spec.SourceServiceName) + running, err := checkServiceIsRunning(ctx, avn, project, si.Spec.SourceServiceName) if !running || err != nil { return false, err } @@ -169,7 +171,7 @@ func (h ServiceIntegrationHandler) checkPreconditions(avn *aiven.Client, i clien if project == "" { project = si.Spec.Project } - running, err := checkServiceIsRunning(avn, project, si.Spec.DestinationServiceName) + running, err := checkServiceIsRunning(ctx, avn, project, si.Spec.DestinationServiceName) if !running || err != nil { return false, err } diff --git a/controllers/serviceuser_controller.go b/controllers/serviceuser_controller.go index af56007c..9a9a240a 100644 --- a/controllers/serviceuser_controller.go +++ b/controllers/serviceuser_controller.go @@ -7,7 +7,7 @@ import ( "fmt" "strconv" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,13 +38,13 @@ func (r *ServiceUserReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h ServiceUserHandler) createOrUpdate(avn *aiven.Client, i client.Object, refs []client.Object) error { - user, err := h.convert(i) +func (h ServiceUserHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error { + user, err := h.convert(obj) if err != nil { return err } - u, err := avn.ServiceUsers.Create(user.Spec.Project, user.Spec.ServiceName, + u, err := avn.ServiceUsers.Create(ctx, user.Spec.Project, user.Spec.ServiceName, aiven.CreateServiceUserRequest{ Username: user.Name, AccessControl: &aiven.AccessControl{ @@ -76,13 +76,13 @@ func (h ServiceUserHandler) createOrUpdate(avn *aiven.Client, i client.Object, r return nil } -func (h ServiceUserHandler) delete(avn *aiven.Client, i client.Object) (bool, error) { - user, err := h.convert(i) +func (h ServiceUserHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + user, err := h.convert(obj) if err != nil { return false, err } - err = avn.ServiceUsers.Delete(user.Spec.Project, user.Spec.ServiceName, user.Name) + err = avn.ServiceUsers.Delete(ctx, user.Spec.Project, user.Spec.ServiceName, user.Name) if !aiven.IsNotFound(err) { return false, err } @@ -90,25 +90,25 @@ func (h ServiceUserHandler) delete(avn *aiven.Client, i client.Object) (bool, er return true, nil } -func (h ServiceUserHandler) get(avn *aiven.Client, i client.Object) (*corev1.Secret, error) { - user, err := h.convert(i) +func (h ServiceUserHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) { + user, err := h.convert(obj) if err != nil { return nil, err } - u, err := avn.ServiceUsers.Get(user.Spec.Project, user.Spec.ServiceName, user.Name) + u, err := avn.ServiceUsers.Get(ctx, user.Spec.Project, user.Spec.ServiceName, user.Name) if err != nil { return nil, err } - s, err := avn.Services.Get(user.Spec.Project, user.Spec.ServiceName) + s, err := avn.Services.Get(ctx, user.Spec.Project, user.Spec.ServiceName) if err != nil { return nil, err } params := s.URIParams - caCert, err := avn.CA.Get(user.Spec.Project) + caCert, err := avn.CA.Get(ctx, user.Spec.Project) if err != nil { return nil, fmt.Errorf("aiven client error %w", err) } @@ -141,8 +141,8 @@ func (h ServiceUserHandler) get(avn *aiven.Client, i client.Object) (*corev1.Sec return newSecret(user, stringData, false), nil } -func (h ServiceUserHandler) checkPreconditions(avn *aiven.Client, i client.Object) (bool, error) { - user, err := h.convert(i) +func (h ServiceUserHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { + user, err := h.convert(obj) if err != nil { return false, err } @@ -150,7 +150,7 @@ func (h ServiceUserHandler) checkPreconditions(avn *aiven.Client, i client.Objec meta.SetStatusCondition(&user.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(avn, user.Spec.Project, user.Spec.ServiceName) + return checkServiceIsRunning(ctx, avn, user.Spec.Project, user.Spec.ServiceName) } func (h ServiceUserHandler) convert(i client.Object) (*v1alpha1.ServiceUser, error) { diff --git a/go.mod b/go.mod index a4c4127c..ab5c9e6a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/aiven/aiven-operator go 1.18 require ( - github.com/aiven/aiven-go-client v1.36.0 + github.com/aiven/aiven-go-client/v2 v2.0.0-rc.5 github.com/aiven/go-api-schemas v1.33.0 github.com/dave/jennifer v1.7.0 github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index d78b526b..02038515 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/aiven/aiven-go-client v1.36.0 h1:AUuldvYdk2b9wu4v7L9qx01d6ZB5VckiMgRW37yxZVE= -github.com/aiven/aiven-go-client v1.36.0/go.mod h1:3Hh1PDNcqNNCYrkU/jSAHMV/b/ynoy73fwhBPKnMe6I= +github.com/aiven/aiven-go-client/v2 v2.0.0-rc.5 h1:iftFzqK5NKsrwxVP/4rN6CG4UNKjAbyeO/ZGYZPDnQo= +github.com/aiven/aiven-go-client/v2 v2.0.0-rc.5/go.mod h1:x0xhzxWEKAwKv0xY5FvECiI6tesWshcPHvjwl0B/1SU= github.com/aiven/go-api-schemas v1.33.0 h1:ORMoEnVb04x6Ev4qezSkFvTfagSbVunwatxHxO7TxgM= github.com/aiven/go-api-schemas v1.33.0/go.mod h1:RmQ8MfxwxAP2ji9eJtP6dICOaTMcQD9b5aQT3Bp7uzI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/tests/cassandra_test.go b/tests/cassandra_test.go index a44cdb6d..121d035a 100644 --- a/tests/cassandra_test.go +++ b/tests/cassandra_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -48,6 +49,7 @@ func TestCassandra(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("cassandra") yml := getCassandraYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -64,7 +66,7 @@ func TestCassandra(t *testing.T) { require.NoError(t, s.GetRunning(cs, name)) // THEN - csAvn, err := avnClient.Services.Get(testProject, name) + csAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, csAvn.Name, cs.GetName()) assert.Equal(t, "RUNNING", cs.Status.State) @@ -74,7 +76,7 @@ func TestCassandra(t *testing.T) { assert.Equal(t, "450Gib", cs.Spec.DiskSpace) assert.Equal(t, 460800, csAvn.DiskSpaceMB) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, cs.Spec.Tags) - csResp, err := avnClient.ServiceTags.Get(testProject, name) + csResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, csResp.Tags, cs.Spec.Tags) diff --git a/tests/clickhouse_test.go b/tests/clickhouse_test.go index 9005bbb4..5a6a3ae9 100644 --- a/tests/clickhouse_test.go +++ b/tests/clickhouse_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -44,6 +45,7 @@ func TestClickhouse(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("clickhouse") yml := getClickhouseYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -60,7 +62,7 @@ func TestClickhouse(t *testing.T) { require.NoError(t, s.GetRunning(ch, name)) // THEN - chAvn, err := avnClient.Services.Get(testProject, name) + chAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, chAvn.Name, ch.GetName()) assert.Equal(t, "RUNNING", ch.Status.State) @@ -68,7 +70,7 @@ func TestClickhouse(t *testing.T) { assert.Equal(t, chAvn.Plan, ch.Spec.Plan) assert.Equal(t, chAvn.CloudName, ch.Spec.CloudName) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, ch.Spec.Tags) - chResp, err := avnClient.ServiceTags.Get(testProject, name) + chResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, chResp.Tags, ch.Spec.Tags) diff --git a/tests/clickhouseuser_test.go b/tests/clickhouseuser_test.go index 92d6ba9e..c6803d4c 100644 --- a/tests/clickhouseuser_test.go +++ b/tests/clickhouseuser_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -54,6 +55,7 @@ func TestClickhouseUser(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() chName := randName("clickhouse-user") userName := randName("clickhouse-user") yml := getClickhouseUserYaml(testProject, chName, userName, testPrimaryCloudName) @@ -71,7 +73,7 @@ func TestClickhouseUser(t *testing.T) { require.NoError(t, s.GetRunning(ch, chName)) // THEN - chAvn, err := avnClient.Services.Get(testProject, chName) + chAvn, err := avnClient.Services.Get(ctx, testProject, chName) require.NoError(t, err) assert.Equal(t, chAvn.Name, ch.GetName()) assert.Equal(t, "RUNNING", ch.Status.State) @@ -82,7 +84,7 @@ func TestClickhouseUser(t *testing.T) { user := new(v1alpha1.ClickhouseUser) require.NoError(t, s.GetRunning(user, userName)) - userAvn, err := avnClient.ClickhouseUser.Get(testProject, chName, user.Status.UUID) + userAvn, err := avnClient.ClickhouseUser.Get(ctx, testProject, chName, user.Status.UUID) require.NoError(t, err) assert.Equal(t, userName, user.GetName()) assert.Equal(t, userAvn.Name, user.GetName()) @@ -105,7 +107,7 @@ func TestClickhouseUser(t *testing.T) { // if service is deleted, user is destroyed in Aiven. No service — no user. No user — no user. // And we make sure that controller can delete user itself assert.NoError(t, s.Delete(user, func() error { - _, err = avnClient.ClickhouseUser.Get(testProject, chName, user.Status.UUID) + _, err = avnClient.ClickhouseUser.Get(ctx, testProject, chName, user.Status.UUID) return err })) } diff --git a/tests/connectionpool_test.go b/tests/connectionpool_test.go index efa9e30e..98a14e33 100644 --- a/tests/connectionpool_test.go +++ b/tests/connectionpool_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -78,6 +79,7 @@ func TestConnectionPool(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() pgName := randName("connection-pool") dbName := randName("connection-pool") userName := randName("connection-pool") @@ -107,7 +109,7 @@ func TestConnectionPool(t *testing.T) { // THEN // Validates PostgreSQL - pgAvn, err := avnClient.Services.Get(testProject, pgName) + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, pgAvn.Name, pg.GetName()) assert.Equal(t, "RUNNING", pg.Status.State) @@ -116,20 +118,20 @@ func TestConnectionPool(t *testing.T) { assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName) // Validates Database - dbAvn, err := avnClient.Databases.Get(testProject, pgName, dbName) + dbAvn, err := avnClient.Databases.Get(ctx, testProject, pgName, dbName) require.NoError(t, err) assert.Equal(t, dbName, db.GetName()) assert.Equal(t, dbAvn.DatabaseName, db.GetName()) // Validates ServiceUser - userAvn, err := avnClient.ServiceUsers.Get(testProject, pgName, userName) + userAvn, err := avnClient.ServiceUsers.Get(ctx, testProject, pgName, userName) require.NoError(t, err) assert.Equal(t, userName, user.GetName()) assert.Equal(t, userName, userAvn.Username) assert.Equal(t, pgName, user.Spec.ServiceName) // Validates ConnectionPool - poolAvn, err := avnClient.ConnectionPools.Get(testProject, pgName, poolName) + poolAvn, err := avnClient.ConnectionPools.Get(ctx, testProject, pgName, poolName) require.NoError(t, err) assert.Equal(t, pgName, pool.Spec.ServiceName) assert.Equal(t, poolName, pool.GetName()) @@ -168,7 +170,7 @@ func TestConnectionPool(t *testing.T) { // if service is deleted, pool is destroyed in Aiven. No service — no pool. No pool — no pool. // And we make sure that controller can delete db itself assert.NoError(t, s.Delete(pool, func() error { - _, err = avnClient.ConnectionPools.Get(testProject, pgName, poolName) + _, err = avnClient.ConnectionPools.Get(ctx, testProject, pgName, poolName) return err })) } diff --git a/tests/database_test.go b/tests/database_test.go index 78ba0457..a2db6e7a 100644 --- a/tests/database_test.go +++ b/tests/database_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -50,6 +51,7 @@ func TestDatabase(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() pgName := randName("database") dbName := randName("database") yml := getDatabaseYaml(testProject, pgName, dbName, testPrimaryCloudName) @@ -71,7 +73,7 @@ func TestDatabase(t *testing.T) { // THEN // Validates PostgreSQL - pgAvn, err := avnClient.Services.Get(testProject, pgName) + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, pgAvn.Name, pg.GetName()) assert.Equal(t, "RUNNING", pg.Status.State) @@ -80,7 +82,7 @@ func TestDatabase(t *testing.T) { assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName) // Validates Database - dbAvn, err := avnClient.Databases.Get(testProject, pgName, dbName) + dbAvn, err := avnClient.Databases.Get(ctx, testProject, pgName, dbName) require.NoError(t, err) assert.Equal(t, dbName, db.GetName()) assert.Equal(t, dbAvn.DatabaseName, db.GetName()) @@ -94,7 +96,7 @@ func TestDatabase(t *testing.T) { // if service is deleted, db is destroyed in Aiven. No service — no db. No db — no db. // And we make sure that controller can delete db itself assert.NoError(t, s.Delete(db, func() error { - _, err = avnClient.Databases.Get(testProject, pgName, dbName) + _, err = avnClient.Databases.Get(ctx, testProject, pgName, dbName) return err })) } diff --git a/tests/generic_service_handler_test.go b/tests/generic_service_handler_test.go index ae5ca4f4..c6a1aabf 100644 --- a/tests/generic_service_handler_test.go +++ b/tests/generic_service_handler_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -54,6 +55,7 @@ func TestCreateUpdateService(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() pgName := randName("generic-handler") ymlCreate := getCreateServiceYaml(testProject, pgName) s := NewSession(k8sClient, avnClient, testProject) @@ -71,7 +73,7 @@ func TestCreateUpdateService(t *testing.T) { // THEN // Validates tags - tagsCreatedAvn, err := avnClient.ServiceTags.Get(testProject, pgName) + tagsCreatedAvn, err := avnClient.ServiceTags.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, map[string]string{"env": "prod", "instance": "master"}, pg.Spec.Tags) @@ -84,7 +86,7 @@ func TestCreateUpdateService(t *testing.T) { pgUpdated := new(v1alpha1.PostgreSQL) require.NoError(t, s.GetRunning(pgUpdated, pgName)) - tagsUpdatedAvn, err := avnClient.ServiceTags.Get(testProject, pgName) + tagsUpdatedAvn, err := avnClient.ServiceTags.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Empty(t, tagsUpdatedAvn.Tags) // cleared tags } diff --git a/tests/grafana_test.go b/tests/grafana_test.go index ebbed62a..a8275451 100644 --- a/tests/grafana_test.go +++ b/tests/grafana_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -47,6 +48,7 @@ func TestGrafana(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("grafana") yml := getGrafanaYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -63,7 +65,7 @@ func TestGrafana(t *testing.T) { require.NoError(t, s.GetRunning(grafana, name)) // THEN - grafanaAvn, err := avnClient.Services.Get(testProject, name) + grafanaAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, grafanaAvn.Name, grafana.GetName()) assert.Equal(t, "RUNNING", grafana.Status.State) @@ -71,7 +73,7 @@ func TestGrafana(t *testing.T) { assert.Equal(t, grafanaAvn.Plan, grafana.Spec.Plan) assert.Equal(t, grafanaAvn.CloudName, grafana.Spec.CloudName) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, grafana.Spec.Tags) - grafanaResp, err := avnClient.ServiceTags.Get(testProject, name) + grafanaResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, grafanaResp.Tags, grafana.Spec.Tags) diff --git a/tests/kafka_test.go b/tests/kafka_test.go index 72d059ff..6ba75e2f 100644 --- a/tests/kafka_test.go +++ b/tests/kafka_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -49,6 +50,7 @@ func TestKafka(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("kafka") yml := getKafkaYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -65,7 +67,7 @@ func TestKafka(t *testing.T) { require.NoError(t, s.GetRunning(ks, name)) // THEN - ksAvn, err := avnClient.Services.Get(testProject, name) + ksAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, ksAvn.Name, ks.GetName()) assert.Equal(t, "RUNNING", ks.Status.State) @@ -75,7 +77,7 @@ func TestKafka(t *testing.T) { assert.Equal(t, "600Gib", ks.Spec.DiskSpace) assert.Equal(t, 614400, ksAvn.DiskSpaceMB) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, ks.Spec.Tags) - ksResp, err := avnClient.ServiceTags.Get(testProject, name) + ksResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, ksResp.Tags, ks.Spec.Tags) diff --git a/tests/kafka_with_projectvpc_ref_test.go b/tests/kafka_with_projectvpc_ref_test.go index 53000d99..3f17d9f7 100644 --- a/tests/kafka_with_projectvpc_ref_test.go +++ b/tests/kafka_with_projectvpc_ref_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -51,6 +52,7 @@ func TestKafkaWithProjectVPCRef(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() vpcName := randName("kafka-vpc") kafkaName := randName("kafka-vpc") yml := getKafkaWithProjectVPCRefYaml(testProject, vpcName, kafkaName, testPrimaryCloudName) @@ -71,7 +73,7 @@ func TestKafkaWithProjectVPCRef(t *testing.T) { require.NoError(t, s.GetRunning(vpc, vpcName)) // THEN - kafkaAvn, err := avnClient.Services.Get(testProject, kafkaName) + kafkaAvn, err := avnClient.Services.Get(ctx, testProject, kafkaName) require.NoError(t, err) assert.Equal(t, kafkaAvn.Name, kafka.GetName()) assert.Equal(t, "RUNNING", kafka.Status.State) @@ -84,7 +86,7 @@ func TestKafkaWithProjectVPCRef(t *testing.T) { assert.Equal(t, vpcName, kafka.Spec.ProjectVPCRef.Name) assert.Equal(t, anyPointer(vpc.Status.ID), kafkaAvn.ProjectVPCID) - vpcAvn, err := avnClient.VPCs.Get(testProject, vpc.Status.ID) + vpcAvn, err := avnClient.VPCs.Get(ctx, testProject, vpc.Status.ID) require.NoError(t, err) assert.Equal(t, "ACTIVE", vpcAvn.State) assert.Equal(t, vpcAvn.State, vpc.Status.State) diff --git a/tests/kafkaacl_test.go b/tests/kafkaacl_test.go index aa472f83..299528b6 100644 --- a/tests/kafkaacl_test.go +++ b/tests/kafkaacl_test.go @@ -68,6 +68,7 @@ func TestKafkaACL(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() kafkaName := randName("kafka-acl") topicName := randName("kafka-acl") aclName := randName("kafka-acl") @@ -93,7 +94,7 @@ func TestKafkaACL(t *testing.T) { // THEN // Kafka - kafkaAvn, err := avnClient.Services.Get(testProject, kafkaName) + kafkaAvn, err := avnClient.Services.Get(ctx, testProject, kafkaName) require.NoError(t, err) assert.Equal(t, kafkaAvn.Name, kafka.GetName()) assert.Equal(t, "RUNNING", kafka.Status.State) @@ -102,7 +103,7 @@ func TestKafkaACL(t *testing.T) { assert.Equal(t, kafkaAvn.CloudName, kafka.Spec.CloudName) // KafkaTopic - topicAvn, err := avnClient.KafkaTopics.Get(testProject, kafkaName, topic.GetTopicName()) + topicAvn, err := avnClient.KafkaTopics.Get(ctx, testProject, kafkaName, topic.GetTopicName()) require.NoError(t, err) assert.Equal(t, topicName, topic.GetName()) assert.Equal(t, topicName, topic.GetTopicName()) @@ -112,7 +113,7 @@ func TestKafkaACL(t *testing.T) { assert.Len(t, topicAvn.Partitions, topic.Spec.Partitions) // KafkaACL - aclAvn, err := avnClient.KafkaACLs.Get(testProject, kafkaName, acl.Status.ID) + aclAvn, err := avnClient.KafkaACLs.Get(ctx, testProject, kafkaName, acl.Status.ID) require.NoError(t, err) assert.True(t, meta.IsStatusConditionTrue(acl.Status.Conditions, "Running")) assert.Equal(t, "admin", acl.Spec.Permission) @@ -124,7 +125,6 @@ func TestKafkaACL(t *testing.T) { // KafkaACL Update // We check that update changes the ID - ctx := context.Background() aclCopy := acl.DeepCopyObject().(*v1alpha1.KafkaACL) aclCopy.Spec.Permission = "write" require.NoError(t, k8sClient.Update(ctx, aclCopy)) @@ -136,14 +136,14 @@ func TestKafkaACL(t *testing.T) { assert.NotEqual(t, aclWrite.Status.ID, acl.Status.ID) // Permission has changed on Aiven side too - aclWriteAvn, err := avnClient.KafkaACLs.Get(testProject, kafkaName, aclWrite.Status.ID) + aclWriteAvn, err := avnClient.KafkaACLs.Get(ctx, testProject, kafkaName, aclWrite.Status.ID) require.NoError(t, err) assert.Equal(t, "write", aclWrite.Spec.Permission) assert.Equal(t, aclWriteAvn.Permission, aclWrite.Spec.Permission) // Validate delete by new ID assert.NoError(t, s.Delete(aclWrite, func() error { - _, err = avnClient.KafkaACLs.Get(testProject, kafkaName, aclWrite.Status.ID) + _, err = avnClient.KafkaACLs.Get(ctx, testProject, kafkaName, aclWrite.Status.ID) return err })) } diff --git a/tests/kafkaconnect_test.go b/tests/kafkaconnect_test.go index ba0ce5c0..32978b36 100644 --- a/tests/kafkaconnect_test.go +++ b/tests/kafkaconnect_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -47,6 +48,7 @@ func TestKafkaConnect(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("kafka-connect") yml := getKafkaConnectYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -63,7 +65,7 @@ func TestKafkaConnect(t *testing.T) { require.NoError(t, s.GetRunning(kc, name)) // THEN - kcAvn, err := avnClient.Services.Get(testProject, name) + kcAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, kcAvn.Name, kc.GetName()) assert.Equal(t, "RUNNING", kc.Status.State) diff --git a/tests/kafkaschema_test.go b/tests/kafkaschema_test.go index 82874088..8d4e60a0 100644 --- a/tests/kafkaschema_test.go +++ b/tests/kafkaschema_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "encoding/json" "fmt" "testing" @@ -66,6 +67,7 @@ func TestKafkaSchema(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() kafkaName := randName("kafka-schema") schemaName := randName("kafka-schema") subjectName := randName("kafka-schema") @@ -85,7 +87,7 @@ func TestKafkaSchema(t *testing.T) { // THEN // Kafka test - kafkaAvn, err := avnClient.Services.Get(testProject, kafkaName) + kafkaAvn, err := avnClient.Services.Get(ctx, testProject, kafkaName) require.NoError(t, err) assert.Equal(t, kafkaAvn.Name, kafka.GetName()) assert.Equal(t, "RUNNING", kafka.Status.State) @@ -135,7 +137,7 @@ func TestKafkaSchema(t *testing.T) { // Validates deleting, because deleted kafka drops schemas, and we want be sure deletion works assert.NoError(t, s.Delete(schema, func() error { - _, err := avnClient.KafkaSubjectSchemas.Get(testProject, kafkaName, subjectName, 1) + _, err := avnClient.KafkaSubjectSchemas.Get(ctx, testProject, kafkaName, subjectName, 1) return err })) } diff --git a/tests/kafkatopic_test.go b/tests/kafkatopic_test.go index 0de5679c..b2ff1d01 100644 --- a/tests/kafkatopic_test.go +++ b/tests/kafkatopic_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -70,6 +71,7 @@ func TestKafkaTopic(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() ksName := randName("kafka-topic") yml := getKafkaTopicNameYaml(testProject, ksName, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -94,7 +96,7 @@ func TestKafkaTopic(t *testing.T) { // THEN // Validates Kafka - ksAvn, err := avnClient.Services.Get(testProject, ksName) + ksAvn, err := avnClient.Services.Get(ctx, testProject, ksName) require.NoError(t, err) assert.Equal(t, ksAvn.Name, ks.GetName()) assert.Equal(t, ksAvn.State, ks.Status.State) @@ -106,7 +108,7 @@ func TestKafkaTopic(t *testing.T) { assert.True(t, meta.IsStatusConditionTrue(barTopic.Status.Conditions, "Running")) // KafkaTopic with name `foo-topic` - fooAvn, err := avnClient.KafkaTopics.Get(testProject, ksName, fooTopic.GetTopicName()) + fooAvn, err := avnClient.KafkaTopics.Get(ctx, testProject, ksName, fooTopic.GetTopicName()) require.NoError(t, err) assert.Equal(t, "foo-topic", fooTopic.GetName()) assert.Equal(t, "foo-topic", fooTopic.GetTopicName()) @@ -120,7 +122,7 @@ func TestKafkaTopic(t *testing.T) { require.Equal(t, anyPointer(0.2), fooTopic.Spec.Config.MinCleanableDirtyRatio) // KafkaTopic with name `bar_topic_name_with_underscores` - barAvn, err := avnClient.KafkaTopics.Get(testProject, ksName, barTopic.GetTopicName()) + barAvn, err := avnClient.KafkaTopics.Get(ctx, testProject, ksName, barTopic.GetTopicName()) require.NoError(t, err) assert.Equal(t, "bar-topic", barTopic.GetName()) assert.Equal(t, "bar_topic_name_with_underscores", barTopic.GetTopicName()) @@ -138,12 +140,12 @@ func TestKafkaTopic(t *testing.T) { // if service is deleted, topic is destroyed in Aiven. No service — no topic. No topic — no topic. // And we make sure that controller can delete topic itself assert.NoError(t, s.Delete(fooTopic, func() error { - _, err = avnClient.KafkaTopics.Get(testProject, ksName, fooTopic.Name) + _, err = avnClient.KafkaTopics.Get(ctx, testProject, ksName, fooTopic.Name) return err })) assert.NoError(t, s.Delete(barTopic, func() error { - _, err = avnClient.KafkaTopics.Get(testProject, ksName, "bar_topic_name_with_underscores") + _, err = avnClient.KafkaTopics.Get(ctx, testProject, ksName, "bar_topic_name_with_underscores") return err })) } diff --git a/tests/mysql_test.go b/tests/mysql_test.go index b10f5a79..2bb6949c 100644 --- a/tests/mysql_test.go +++ b/tests/mysql_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -47,6 +48,7 @@ func TestMySQL(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("mysql") yml := getMySQLYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -63,7 +65,7 @@ func TestMySQL(t *testing.T) { require.NoError(t, s.GetRunning(ms, name)) // THEN - msAvn, err := avnClient.Services.Get(testProject, name) + msAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, msAvn.Name, ms.GetName()) assert.Equal(t, "RUNNING", ms.Status.State) @@ -73,7 +75,7 @@ func TestMySQL(t *testing.T) { assert.Equal(t, "100Gib", ms.Spec.DiskSpace) assert.Equal(t, 102400, msAvn.DiskSpaceMB) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, ms.Spec.Tags) - msResp, err := avnClient.ServiceTags.Get(testProject, name) + msResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, msResp.Tags, ms.Spec.Tags) diff --git a/tests/opensearch_test.go b/tests/opensearch_test.go index 206bd58c..085597e9 100644 --- a/tests/opensearch_test.go +++ b/tests/opensearch_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -52,6 +53,7 @@ func TestOpenSearch(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("opensearch") yml := getOpenSearchYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -68,7 +70,7 @@ func TestOpenSearch(t *testing.T) { require.NoError(t, s.GetRunning(os, name)) // THEN - osAvn, err := avnClient.Services.Get(testProject, name) + osAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, osAvn.Name, os.GetName()) assert.Equal(t, "RUNNING", os.Status.State) @@ -78,7 +80,7 @@ func TestOpenSearch(t *testing.T) { assert.Equal(t, "240Gib", os.Spec.DiskSpace) assert.Equal(t, 245760, osAvn.DiskSpaceMB) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, os.Spec.Tags) - osResp, err := avnClient.ServiceTags.Get(testProject, name) + osResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, osResp.Tags, os.Spec.Tags) diff --git a/tests/postgresql_test.go b/tests/postgresql_test.go index 61a34174..408ad0db 100644 --- a/tests/postgresql_test.go +++ b/tests/postgresql_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -65,6 +66,7 @@ func TestPgReadReplica(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() masterName := randName("pg-master") replicaName := randName("pg-replica") yml := getPgReadReplicaYaml(testProject, masterName, replicaName, testPrimaryCloudName) @@ -86,7 +88,7 @@ func TestPgReadReplica(t *testing.T) { // THEN // Validates instances - masterAvn, err := avnClient.Services.Get(testProject, masterName) + masterAvn, err := avnClient.Services.Get(ctx, testProject, masterName) require.NoError(t, err) assert.Equal(t, masterAvn.Name, master.GetName()) assert.Equal(t, "RUNNING", master.Status.State) @@ -96,11 +98,11 @@ func TestPgReadReplica(t *testing.T) { assert.NotNil(t, masterAvn.UserConfig) // "Aiven instance has defaults set" assert.Nil(t, master.Spec.UserConfig) assert.Equal(t, map[string]string{"env": "prod", "instance": "master"}, master.Spec.Tags) - masterResp, err := avnClient.ServiceTags.Get(testProject, masterName) + masterResp, err := avnClient.ServiceTags.Get(ctx, testProject, masterName) require.NoError(t, err) assert.Equal(t, masterResp.Tags, master.Spec.Tags) - replicaAvn, err := avnClient.Services.Get(testProject, replicaName) + replicaAvn, err := avnClient.Services.Get(ctx, testProject, replicaName) require.NoError(t, err) assert.Equal(t, replicaAvn.Name, replica.GetName()) assert.Equal(t, "RUNNING", replica.Status.State) @@ -108,7 +110,7 @@ func TestPgReadReplica(t *testing.T) { assert.Equal(t, replicaAvn.Plan, replica.Spec.Plan) assert.Equal(t, replicaAvn.CloudName, replica.Spec.CloudName) assert.Equal(t, map[string]string{"env": "test", "instance": "replica"}, replica.Spec.Tags) - replicaResp, err := avnClient.ServiceTags.Get(testProject, replicaName) + replicaResp, err := avnClient.ServiceTags.Get(ctx, testProject, replicaName) require.NoError(t, err) assert.Equal(t, replicaResp.Tags, replica.Spec.Tags) @@ -180,6 +182,7 @@ func TestPgCustomPrefix(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() pgName := randName("secret-prefix") yml := getPgCustomPrefixYaml(testProject, pgName, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -197,7 +200,7 @@ func TestPgCustomPrefix(t *testing.T) { // THEN // Validates instance - pgAvn, err := avnClient.Services.Get(testProject, pgName) + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, pgAvn.Name, pg.GetName()) assert.Equal(t, "RUNNING", pg.Status.State) @@ -205,7 +208,7 @@ func TestPgCustomPrefix(t *testing.T) { assert.Equal(t, pgAvn.Plan, pg.Spec.Plan) assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName) assert.Equal(t, map[string]string{"env": "prod", "instance": "pg"}, pg.Spec.Tags) - masterResp, err := avnClient.ServiceTags.Get(testProject, pgName) + masterResp, err := avnClient.ServiceTags.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, masterResp.Tags, pg.Spec.Tags) diff --git a/tests/project_test.go b/tests/project_test.go index 4e1867ed..cca1c88a 100644 --- a/tests/project_test.go +++ b/tests/project_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -34,6 +35,7 @@ func TestProject(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("project") yml := getProjectYaml(name) s := NewSession(k8sClient, avnClient, testProject) @@ -51,7 +53,7 @@ func TestProject(t *testing.T) { // THEN // Validates Project - projectAvn, err := avnClient.Projects.Get(name) + projectAvn, err := avnClient.Projects.Get(ctx, name) require.NoError(t, err) assert.Equal(t, name, project.GetName()) assert.Equal(t, projectAvn.Name, project.GetName()) diff --git a/tests/projectvpc_test.go b/tests/projectvpc_test.go index 64fc6ff7..753b9ce5 100644 --- a/tests/projectvpc_test.go +++ b/tests/projectvpc_test.go @@ -5,7 +5,7 @@ import ( "fmt" "testing" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/types" @@ -70,6 +70,7 @@ func TestProjectVPCID(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() vpcName1 := randName("project-vpc-id") vpcName2 := randName("project-vpc-id") vpcYaml := getProjectVPCsYaml(testProject, vpcName1, testSecondaryCloudName, vpcName2, testTertiaryCloudName) @@ -91,7 +92,7 @@ func TestProjectVPCID(t *testing.T) { // THEN // Validates VPC - vpc1Avn, err := avnClient.VPCs.Get(testProject, vpc1.Status.ID) + vpc1Avn, err := avnClient.VPCs.Get(ctx, testProject, vpc1.Status.ID) require.NoError(t, err) assert.Equal(t, "ACTIVE", vpc1Avn.State) assert.Equal(t, vpc1Avn.State, vpc1.Status.State) @@ -99,7 +100,7 @@ func TestProjectVPCID(t *testing.T) { assert.Equal(t, "10.0.0.0/24", vpc1.Spec.NetworkCidr) assert.Equal(t, vpc1Avn.NetworkCIDR, vpc1.Spec.NetworkCidr) - vpc2Avn, err := avnClient.VPCs.Get(testProject, vpc2.Status.ID) + vpc2Avn, err := avnClient.VPCs.Get(ctx, testProject, vpc2.Status.ID) require.NoError(t, err) assert.Equal(t, "ACTIVE", vpc2Avn.State) assert.Equal(t, vpc2Avn.State, vpc2.Status.State) @@ -124,7 +125,7 @@ func TestProjectVPCID(t *testing.T) { require.NoError(t, kafkaSession.GetRunning(kafka, kafkaName)) // THEN - kafkaAvn, err := avnClient.Services.Get(testProject, kafkaName) + kafkaAvn, err := avnClient.Services.Get(ctx, testProject, kafkaName) require.NoError(t, err) assert.Equal(t, kafkaAvn.Name, kafka.GetName()) assert.Equal(t, "RUNNING", kafka.Status.State) @@ -141,14 +142,13 @@ func TestProjectVPCID(t *testing.T) { require.NoError(t, kafkaSession.Apply(kafkaYamlUpd)) // This migration takes too long, so we don't wait it's being in the RUNNING state in kube - ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, waitRunningTimeout) defer cancel() // Gets Aiven object var kafkaAvnUpd *aiven.Service require.NoError(t, retryForever(ctx, func() (bool, error) { - kafkaAvnUpd, err = avnClient.Services.Get(testProject, kafkaName) + kafkaAvnUpd, err = avnClient.Services.Get(ctx, testProject, kafkaName) if err != nil { return false, err } diff --git a/tests/redis_test.go b/tests/redis_test.go index 0fa676f6..2578ec69 100644 --- a/tests/redis_test.go +++ b/tests/redis_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -44,6 +45,7 @@ func TestRedis(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() name := randName("redis") yml := getRedisYaml(testProject, name, testPrimaryCloudName) s := NewSession(k8sClient, avnClient, testProject) @@ -60,7 +62,7 @@ func TestRedis(t *testing.T) { require.NoError(t, s.GetRunning(rs, name)) // THEN - rsAvn, err := avnClient.Services.Get(testProject, name) + rsAvn, err := avnClient.Services.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, rsAvn.Name, rs.GetName()) assert.Equal(t, "RUNNING", rs.Status.State) @@ -68,7 +70,7 @@ func TestRedis(t *testing.T) { assert.Equal(t, rsAvn.Plan, rs.Spec.Plan) assert.Equal(t, rsAvn.CloudName, rs.Spec.CloudName) assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, rs.Spec.Tags) - rsResp, err := avnClient.ServiceTags.Get(testProject, name) + rsResp, err := avnClient.ServiceTags.Get(ctx, testProject, name) require.NoError(t, err) assert.Equal(t, rsResp.Tags, rs.Spec.Tags) diff --git a/tests/serviceintegration_test.go b/tests/serviceintegration_test.go index e229fd77..d0e33762 100644 --- a/tests/serviceintegration_test.go +++ b/tests/serviceintegration_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "os" "testing" @@ -77,6 +78,7 @@ func TestServiceIntegrationClickhousePostgreSQL(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() chName := randName("clickhouse-postgresql") pgName := randName("clickhouse-postgresql") siName := randName("clickhouse-postgresql") @@ -103,7 +105,7 @@ func TestServiceIntegrationClickhousePostgreSQL(t *testing.T) { // THEN // Validates Clickhouse - chAvn, err := avnClient.Services.Get(testProject, chName) + chAvn, err := avnClient.Services.Get(ctx, testProject, chName) require.NoError(t, err) assert.Equal(t, chAvn.Name, ch.GetName()) assert.Equal(t, chAvn.State, ch.Status.State) @@ -113,7 +115,7 @@ func TestServiceIntegrationClickhousePostgreSQL(t *testing.T) { assert.Equal(t, chAvn.MaintenanceWindow.TimeOfDay, ch.Spec.MaintenanceWindowTime) // Validates PostgreSQL - pgAvn, err := avnClient.Services.Get(testProject, pgName) + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, pgAvn.Name, pg.GetName()) assert.Equal(t, pgAvn.State, pg.Status.State) @@ -124,7 +126,7 @@ func TestServiceIntegrationClickhousePostgreSQL(t *testing.T) { assert.Equal(t, pgAvn.UserConfig["pg_version"].(string), *pg.Spec.UserConfig.PgVersion) // Validates ServiceIntegration - siAvn, err := avnClient.ServiceIntegrations.Get(testProject, si.Status.ID) + siAvn, err := avnClient.ServiceIntegrations.Get(ctx, testProject, si.Status.ID) require.NoError(t, err) assert.Equal(t, "clickhouse_postgresql", siAvn.IntegrationType) assert.Equal(t, siAvn.IntegrationType, si.Spec.IntegrationType) @@ -191,6 +193,7 @@ func TestServiceIntegrationKafkaLogs(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() ksName := randName("kafka-logs") ktName := randName("kafka-logs") siName := randName("kafka-logs") @@ -217,7 +220,7 @@ func TestServiceIntegrationKafkaLogs(t *testing.T) { // THEN // Validates Kafka - ksAvn, err := avnClient.Services.Get(testProject, ksName) + ksAvn, err := avnClient.Services.Get(ctx, testProject, ksName) require.NoError(t, err) assert.Equal(t, ksAvn.Name, ks.GetName()) assert.Equal(t, ksAvn.State, ks.Status.State) @@ -225,7 +228,7 @@ func TestServiceIntegrationKafkaLogs(t *testing.T) { assert.Equal(t, ksAvn.CloudName, ks.Spec.CloudName) // Validates KafkaTopic - ktAvn, err := avnClient.KafkaTopics.Get(testProject, ksName, ktName) + ktAvn, err := avnClient.KafkaTopics.Get(ctx, testProject, ksName, ktName) require.NoError(t, err) assert.Equal(t, ktAvn.TopicName, kt.GetName()) assert.Equal(t, ktAvn.State, kt.Status.State) @@ -233,7 +236,7 @@ func TestServiceIntegrationKafkaLogs(t *testing.T) { assert.Len(t, ktAvn.Partitions, kt.Spec.Partitions) // Validates ServiceIntegration - siAvn, err := avnClient.ServiceIntegrations.Get(testProject, si.Status.ID) + siAvn, err := avnClient.ServiceIntegrations.Get(ctx, testProject, si.Status.ID) require.NoError(t, err) assert.Equal(t, "kafka_logs", siAvn.IntegrationType) assert.Equal(t, siAvn.IntegrationType, si.Spec.IntegrationType) @@ -310,6 +313,7 @@ func TestServiceIntegrationKafkaConnect(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() ksName := randName("kafka-connect") kcName := randName("kafka-connect") siName := randName("kafka-connect") @@ -336,7 +340,7 @@ func TestServiceIntegrationKafkaConnect(t *testing.T) { // THEN // Validates Kafka - ksAvn, err := avnClient.Services.Get(testProject, ksName) + ksAvn, err := avnClient.Services.Get(ctx, testProject, ksName) require.NoError(t, err) assert.Equal(t, ksAvn.Name, ks.GetName()) assert.Equal(t, ksAvn.State, ks.Status.State) @@ -344,7 +348,7 @@ func TestServiceIntegrationKafkaConnect(t *testing.T) { assert.Equal(t, ksAvn.CloudName, ks.Spec.CloudName) // Validates KafkaConnect - kcAvn, err := avnClient.Services.Get(testProject, kcName) + kcAvn, err := avnClient.Services.Get(ctx, testProject, kcName) require.NoError(t, err) assert.Equal(t, kcAvn.Name, kc.GetName()) assert.Equal(t, kcAvn.State, kc.Status.State) @@ -354,7 +358,7 @@ func TestServiceIntegrationKafkaConnect(t *testing.T) { assert.True(t, *kc.Spec.UserConfig.PublicAccess.KafkaConnect) // Validates ServiceIntegration - siAvn, err := avnClient.ServiceIntegrations.Get(testProject, si.Status.ID) + siAvn, err := avnClient.ServiceIntegrations.Get(ctx, testProject, si.Status.ID) require.NoError(t, err) assert.Equal(t, "kafka_connect", siAvn.IntegrationType) assert.Equal(t, siAvn.IntegrationType, si.Spec.IntegrationType) @@ -418,6 +422,7 @@ func TestServiceIntegrationDatadog(t *testing.T) { } // GIVEN + ctx := context.Background() pgName := randName("datadog") siName := randName("datadog") @@ -440,14 +445,14 @@ func TestServiceIntegrationDatadog(t *testing.T) { // THEN // Validates PostgreSQL - pgAvn, err := avnClient.Services.Get(testProject, pgName) + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, pgAvn.Name, pg.GetName()) assert.Equal(t, pgAvn.State, pg.Status.State) assert.Equal(t, pgAvn.Plan, pg.Spec.Plan) // Validates Datadog - siAvn, err := avnClient.ServiceIntegrations.Get(testProject, si.Status.ID) + siAvn, err := avnClient.ServiceIntegrations.Get(ctx, testProject, si.Status.ID) require.NoError(t, err) assert.Equal(t, "datadog", siAvn.IntegrationType) assert.Equal(t, siAvn.IntegrationType, si.Spec.IntegrationType) diff --git a/tests/serviceuser_test.go b/tests/serviceuser_test.go index f9114ab5..9a85eb5d 100644 --- a/tests/serviceuser_test.go +++ b/tests/serviceuser_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "testing" @@ -53,6 +54,7 @@ func TestServiceUser(t *testing.T) { defer recoverPanic(t) // GIVEN + ctx := context.Background() pgName := randName("connection-pool") userName := randName("connection-pool") yml := getServiceUserYaml(testProject, pgName, userName, testPrimaryCloudName) @@ -74,7 +76,7 @@ func TestServiceUser(t *testing.T) { // THEN // Validates PostgreSQL - pgAvn, err := avnClient.Services.Get(testProject, pgName) + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) require.NoError(t, err) assert.Equal(t, pgAvn.Name, pg.GetName()) assert.Equal(t, "RUNNING", pg.Status.State) @@ -83,7 +85,7 @@ func TestServiceUser(t *testing.T) { assert.Equal(t, pgAvn.CloudName, pg.Spec.CloudName) // Validates ServiceUser - userAvn, err := avnClient.ServiceUsers.Get(testProject, pgName, userName) + userAvn, err := avnClient.ServiceUsers.Get(ctx, testProject, pgName, userName) require.NoError(t, err) assert.Equal(t, userName, user.GetName()) assert.Equal(t, userName, userAvn.Username) @@ -114,7 +116,7 @@ func TestServiceUser(t *testing.T) { // if service is deleted, pool is destroyed in Aiven. No service — no pool. No pool — no pool. // And we make sure that controller can delete db itself assert.NoError(t, s.Delete(user, func() error { - _, err = avnClient.ServiceUsers.Get(testProject, pgName, userName) + _, err = avnClient.ServiceUsers.Get(ctx, testProject, pgName, userName) return err })) } diff --git a/tests/session.go b/tests/session.go index 4f967dad..b023fd98 100644 --- a/tests/session.go +++ b/tests/session.go @@ -13,7 +13,7 @@ import ( "sync" "time" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" diff --git a/tests/suite_test.go b/tests/suite_test.go index c8b1359a..66f91993 100644 --- a/tests/suite_test.go +++ b/tests/suite_test.go @@ -9,7 +9,7 @@ import ( "strconv" "testing" - "github.com/aiven/aiven-go-client" + "github.com/aiven/aiven-go-client/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme"