From 66c36d4d94535ac940adf28b82445a3d37316f1a Mon Sep 17 00:00:00 2001
From: williamlardier
Date: Mon, 24 Jun 2024 09:40:37 +0200
Subject: [PATCH 1/6] ZENKO-4833: rename quota teardown function

---
 tests/ctst/common/hooks.ts        | 4 ++--
 tests/ctst/steps/quotas/quotas.ts | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/ctst/common/hooks.ts b/tests/ctst/common/hooks.ts
index e5ecef70eb..80c8fd4c37 100644
--- a/tests/ctst/common/hooks.ts
+++ b/tests/ctst/common/hooks.ts
@@ -6,7 +6,7 @@ import {
 } from '@cucumber/cucumber';
 import Zenko from '../world/Zenko';
 import { Identity } from 'cli-testing';
-import { prepareQuotaScenarios, quotaScenarioteardown } from 'steps/quotas/quotas';
+import { prepareQuotaScenarios, teardownQuotaScenarios } from 'steps/quotas/quotas';
 
 // HTTPS should not cause any error for CTST
 process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
@@ -27,7 +27,7 @@ Before({ tags: '@Quotas', timeout: 1200000 }, async function (scenarioOptions) {
 });
 
 After({ tags: '@Quotas' }, async function () {
-    await quotaScenarioteardown(this as Zenko);
+    await teardownQuotaScenarios(this as Zenko);
 });
 
 export default Zenko;
diff --git a/tests/ctst/steps/quotas/quotas.ts b/tests/ctst/steps/quotas/quotas.ts
index b820b5e953..56128e3d7b 100644
--- a/tests/ctst/steps/quotas/quotas.ts
+++ b/tests/ctst/steps/quotas/quotas.ts
@@ -89,7 +89,7 @@ export async function prepareQuotaScenarios(world: Zenko, scenarioConfiguration:
     Identity.addIdentity(IdentityEnum.ACCOUNT, key, configuration[key], undefined, true, true);
 }
 
-export async function quotaScenarioteardown(world: Zenko) {
+export async function teardownQuotaScenarios(world: Zenko) {
     // Remove any quota at the end of the scenario, in case
     // the account gets reused, placed after the global After
     // hook to make sure it is executed first.
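
The renamed helper keeps the comment about being "placed after the global After hook to make sure it is executed first": cucumber-js runs After hooks in the reverse order of their registration, so a hook registered later executes earlier. A minimal sketch of that ordering, assuming a hypothetical global cleanup hook (only the @Quotas hook mirrors the actual code in hooks.ts):

    import { After } from '@cucumber/cucumber';
    import Zenko from '../world/Zenko';
    import { teardownQuotaScenarios } from 'steps/quotas/quotas';

    // Hypothetical stand-in for the global cleanup hook registered elsewhere in
    // the suite; because it is registered first, cucumber-js runs it last.
    After(async function (this: Zenko) {
        this.logger.debug('global account cleanup runs after the quota teardown');
    });

    // Registered after the global hook, therefore executed before it: quotas are
    // removed before the shared account is cleaned up or reused.
    After({ tags: '@Quotas' }, async function (this: Zenko) {
        await teardownQuotaScenarios(this);
    });
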
From c3fc72adc1b444b2595954e21db0c6fc26f77e3a Mon Sep 17 00:00:00 2001 From: williamlardier Date: Mon, 24 Jun 2024 09:41:42 +0200 Subject: [PATCH 2/6] ZENKO-4833: add kube client for reconciliation and deploy watchers --- tests/ctst/steps/utils/kubernetes.ts | 211 ++++++++++++++++++++++++++- 1 file changed, 210 insertions(+), 1 deletion(-) diff --git a/tests/ctst/steps/utils/kubernetes.ts b/tests/ctst/steps/utils/kubernetes.ts index f53d8a15cc..16cea40f3f 100644 --- a/tests/ctst/steps/utils/kubernetes.ts +++ b/tests/ctst/steps/utils/kubernetes.ts @@ -1,7 +1,25 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ import { KubernetesHelper, Utils } from 'cli-testing'; import Zenko from 'world/Zenko'; -import { V1Job, Watch, V1ObjectMeta } from '@kubernetes/client-node'; +import { + V1Job, + Watch, + V1ObjectMeta, + AppsV1Api, + V1Deployment, + AppsApi, + CustomObjectsApi, +} from '@kubernetes/client-node'; + +type ZenkoStatusValue = { + lastTransitionTime: string, + message: string, + status: 'True' | 'False', + reason?: string, + type: 'DeploymentFailure' | 'DeploymentInProgress' | 'Available', +}; + +type ZenkoStatus = ZenkoStatusValue[]; export function createKubeBatchClient(world: Zenko) { if (!KubernetesHelper.clientBatch) { @@ -24,6 +42,27 @@ export function createKubeWatchClient(world: Zenko) { return KubernetesHelper.clientWatch as Watch; } +export function createKubeAppsV1Client(world: Zenko) { + if (!KubernetesHelper.clientAppsV1) { + KubernetesHelper.init(world.parameters); + } + return KubernetesHelper.clientAppsV1 as AppsV1Api; +} + +export function createKubeAppsClient(world: Zenko) { + if (!KubernetesHelper.clientApps) { + KubernetesHelper.init(world.parameters); + } + return KubernetesHelper.clientApps as AppsApi; +} + +export function createKubeCustomObjectClient(world: Zenko) { + if (!KubernetesHelper.customObject) { + KubernetesHelper.init(world.parameters); + } + return KubernetesHelper.customObject as CustomObjectsApi; +} + export async function createJobAndWaitForCompletion(world: Zenko, jobName: string, customMetadata?: string) { const batchClient = createKubeBatchClient(world); const watchClient = createKubeWatchClient(world); @@ -84,3 +123,173 @@ export async function createJobAndWaitForCompletion(world: Zenko, jobName: strin throw err; } } + +export async function waitForZenkoToStabilize( + world: Zenko, needsReconciliation = false, timeout = 15 * 60 * 1000, namespace = 'default') { + // ZKOP pulls the overlay configuration from Pensieve every 5 seconds + // So the status might not be updated immediately after the overlay is applied. + // So, this function will first wait till we detect a reconciliation + // (deploymentInProgress = true), and then wait for the status to be available + const startTime = Date.now(); + let status = false; + let deploymentFailure: ZenkoStatusValue = { + lastTransitionTime: '', + message: '', + status: 'False', + type: 'DeploymentFailure', + }; + let deploymentInProgress: ZenkoStatusValue = { + lastTransitionTime: '', + message: '', + status: 'False', + type: 'DeploymentInProgress', + }; + let available: ZenkoStatusValue = { + lastTransitionTime: '', + message: '', + status: 'False', + type: 'Available', + }; + // If needsReconciliation is true, we expect a reconciliation + // otherwise, we can use the function as a sanity check of the + // zenko status. 
+ let reconciliationDetected = !needsReconciliation; + + world.logger.debug('Waiting for Zenko to stabilize'); + const zenkoClient = createKubeCustomObjectClient(world); + + while (!status && Date.now() - startTime < timeout) { + const zenkoCR = await zenkoClient.getNamespacedCustomObject( + 'zenko.io', + 'v1alpha2', + namespace, + 'zenkos', + 'end2end', + ).catch(err => { + world.logger.error('Error getting Zenko CR', { + err: err as unknown, + }); + return null; + }); + + if (!zenkoCR) { + await Utils.sleep(1000); + continue; + } + + const conditions: ZenkoStatus = (zenkoCR.body as { + status: { + conditions: ZenkoStatus, + }, + })?.status?.conditions || []; + + conditions.forEach(condition => { + if (condition.type === 'DeploymentFailure') { + deploymentFailure = condition; + } else if (condition.type === 'DeploymentInProgress') { + deploymentInProgress = condition; + } else if (condition.type === 'Available') { + available = condition; + } + }); + + world.logger.debug('Checking Zenko CR status', { + conditions, + deploymentFailure, + deploymentInProgress, + available, + }); + + if (!reconciliationDetected && + deploymentInProgress.status === 'True' && + deploymentInProgress.reason === 'Reconciling' + ) { + reconciliationDetected = true; + continue; + } + + if (reconciliationDetected && + deploymentFailure.status === 'False' && + deploymentInProgress.status === 'False' && + available.status === 'True' + ) { + status = true; + } + + await Utils.sleep(1000); + } + + if (!status) { + throw new Error('Zenko did not stabilize'); + } +} + +export async function waitForDataServicesToStabilize(world: Zenko, timeout = 15 * 60 * 1000, namespace = 'default') { + let allRunning = false; + const startTime = Date.now(); + const annotationKey = 'operator.zenko.io/dependencies'; + const dataServices = ['connector-cloudserver-config', 'backbeat-config']; + + const appsClient = createKubeAppsV1Client(world); + + world.logger.debug('Waiting for data services to stabilize', { + namespace, + }); + + // First list all deployments, and then filter the ones with an annotation that matches the data services + const deployments: V1Deployment[] = []; + const serviceDeployments = await appsClient.listNamespacedDeployment(namespace); + for (const deployment of serviceDeployments.body.items) { + const annotations = deployment.metadata?.annotations; + if (annotations && dataServices.some(service => annotations[annotationKey]?.includes(service))) { + deployments.push(deployment); + } + } + + world.logger.debug('Got the list of deployments to check for stabilization', { + deployments: deployments.map(deployment => deployment.metadata?.name), + }); + + while (!allRunning && Date.now() - startTime < timeout) { + allRunning = true; + + // get the deployments in the array, and check in loop if they are ready + for (const deployment of deployments) { + const deploymentName = deployment.metadata?.name; + if (!deploymentName) { + throw new Error('Deployment name not found'); + } + + const deploymentStatus = await appsClient.readNamespacedDeploymentStatus(deploymentName, namespace); + const replicas = deploymentStatus.body.status?.replicas; + const readyReplicas = deploymentStatus.body.status?.readyReplicas; + const updatedReplicas = deploymentStatus.body.status?.updatedReplicas; + const availableReplicas = deploymentStatus.body.status?.availableReplicas; + + world.logger.debug('Checking deployment status', { + deployment: deploymentName, + replicas, + readyReplicas, + updatedReplicas, + availableReplicas, + }); + + if 
(replicas !== readyReplicas || replicas !== updatedReplicas || replicas !== availableReplicas) { + allRunning = false; + world.logger.debug('Waiting for data service to stabilize', { + deployment: deploymentName, + replicas, + readyReplicas, + }); + } + } + + await Utils.sleep(1000); + } + + if (!allRunning) { + throw new Error('Data services did not stabilize'); + } + + return allRunning; +} From b04b791c172038d7127c8c5c72f8b6ecf8ba63fb Mon Sep 17 00:00:00 2001 From: williamlardier Date: Mon, 24 Jun 2024 09:43:45 +0200 Subject: [PATCH 3/6] ZENKO-4833: bump ctst to 1.0.1 --- tests/ctst/package.json | 2 +- tests/ctst/yarn.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/ctst/package.json b/tests/ctst/package.json index e5324e6424..66a014cf6e 100644 --- a/tests/ctst/package.json +++ b/tests/ctst/package.json @@ -26,7 +26,7 @@ "@typescript-eslint/eslint-plugin": "^5.45.0", "@typescript-eslint/parser": "^5.45.0", "babel-jest": "^29.3.1", - "cli-testing": "github:scality/cli-testing.git#1.0.0", + "cli-testing": "github:scality/cli-testing.git#1.0.1", "eslint": "^8.28.0" }, "scripts": { diff --git a/tests/ctst/yarn.lock b/tests/ctst/yarn.lock index 69ee8980e0..8721d6c831 100644 --- a/tests/ctst/yarn.lock +++ b/tests/ctst/yarn.lock @@ -4807,9 +4807,9 @@ cli-table3@^0.6.0: optionalDependencies: "@colors/colors" "1.5.0" -"cli-testing@github:scality/cli-testing.git#1.0.0": - version "1.0.0" - resolved "git+ssh://git@github.com/scality/cli-testing.git#7fd04802dd895f4db460078b53465c6d5b6452ed" +"cli-testing@github:scality/cli-testing.git#1.0.1": + version "1.0.1" + resolved "git+ssh://git@github.com/scality/cli-testing.git#46c8ec4b9b22928ae806786f1175416949fa30eb" dependencies: "@aws-crypto/sha256-universal" "^5.2.0" "@aws-sdk/client-iam" "^3.484.0" From 746a5c6c5c1513df388a75f825942702ff507ba9 Mon Sep 17 00:00:00 2001 From: williamlardier Date: Mon, 24 Jun 2024 09:46:26 +0200 Subject: [PATCH 4/6] ZENKO-4833: rework kafkacleaner test --- tests/ctst/common/common.ts | 63 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/tests/ctst/common/common.ts b/tests/ctst/common/common.ts index 264fa747c5..4d20e65876 100644 --- a/tests/ctst/common/common.ts +++ b/tests/ctst/common/common.ts @@ -305,7 +305,7 @@ Then('object {string} should be {string} and have the storage class {string}', { }); When('i delete object {string}', async function (this: Zenko, objectName: string) { - const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved('objectName'); + const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved('objectName'); this.resetCommand(); this.addCommandParameter({ bucket: this.getSaved('bucketName') }); this.addCommandParameter({ key: objName }); @@ -328,12 +328,9 @@ Then('i {string} be able to add user metadata to object {string}', Then('kafka consumed messages should not take too much place on disk', { timeout: -1 }, async function (this: Zenko) { + let timeoutID; const kfkcIntervalSeconds = parseInt(this.parameters.KafkaCleanerInterval); - const checkInterval = kfkcIntervalSeconds * (1000 + 3000); - - const timeoutID = setTimeout(() => { - assert.fail('Kafka cleaner did not clean the topics'); - }, checkInterval * 5); // Timeout after 5 kafkacleaner intervals + const checkInterval = kfkcIntervalSeconds * 1000 + 5000; try { const ignoredTopics = ['dead-letter']; @@ -342,8 +339,13 @@ Then('kafka consumed messages should not take too much place on disk', { 
timeout .filter(t => (t.includes(this.parameters.InstanceID) && !ignoredTopics.some(e => t.includes(e)))); + timeoutID = setTimeout(() => { + assert.fail('Kafka cleaner did not clean the topics within the expected time'); + }, (topics.length || 1) * checkInterval * 5); // Timeout after 5 Kafka cleaner intervals + + const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin); + while (topics.length > 0) { - const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin); // Checking topics offsets before kafkacleaner passes to be sure kafkacleaner works // This function can be improved by consuming messages and // verify that the timestamp is not older than last kafkacleaner run @@ -357,32 +359,37 @@ Then('kafka consumed messages should not take too much place on disk', { timeout for (let i = 0; i < topics.length; i++) { this.logger.debug('Checking topic', { topic: topics[i] }); + let topicCleaned = false; for (let j = 0; j < newOffsets[i].partitions.length; j++) { - const newMessagesAfterClean = - newOffsets[i].partitions[j].low === previousOffsets[i].partitions[j].high && - newOffsets[i].partitions[j].high > previousOffsets[i].partitions[j].high; + const newOffsetPartition = newOffsets[i].partitions[j]; + const oldOffsetPartition = previousOffsets[i].partitions[j]; - if (newMessagesAfterClean) { - // If new messages appeared after we gathered the offsets, we need to recheck after - this.logger.warn('New messages after clean', { topic: topics[i] }); + if (!oldOffsetPartition) { continue; } - const lowOffsetIncreased = newOffsets[i].partitions[j].low > - previousOffsets[i].partitions[j].low; - const allMessagesCleaned = newOffsets[i].partitions[j].high === - newOffsets[i].partitions[j].low; - - // If the low offset increased it means the topic has been cleaned - // If low offset is the same as high offset, - // it means the topic is completly cleaned even though lowOffset didnt increased - assert.ok(lowOffsetIncreased || allMessagesCleaned, - `Topic ${topics[i]} partition ${j} offset has not increased, - previousOffsets: ${previousOffsets[i].partitions[j].low} /\ - ${previousOffsets[i].partitions[j].high}, - newOffsets: ${newOffsets[i].partitions[j].low} / ${newOffsets[i].partitions[j].high}`); - - // Topic is cleaned, we don't need to check it anymore + // Ensure we're accessing the correct partition details + const lowOffsetIncreased = parseInt(newOffsetPartition.low) > + parseInt(oldOffsetPartition.low); + // We tolerate one message not being cleaned, as it can be due to the + // message being consumed during the check + const allMessagesCleaned = parseInt(newOffsetPartition.low) + 1 >= + parseInt(newOffsetPartition.high); + + // We consider one topic as xleaned if kafkacleaner affected the + // offset (low) or all messages are cleaned. 
+ if (lowOffsetIncreased || allMessagesCleaned) { + topicCleaned = true; + } else { + // Log warning if the condition is not met for this partition + this.logger.debug(`Partition ${j} of topic ${topics[i]} not cleaned as expected`, { + previousOffsets: oldOffsetPartition, + newOffsets: newOffsetPartition, + }); + } + } + if (topicCleaned) { + // All partitions of the topic are cleaned, remove from array topics.splice(i, 1); } } From bbd4e40f6261ac860d5ed9b13aedf91a84d77fe3 Mon Sep 17 00:00:00 2001 From: williamlardier Date: Mon, 1 Jul 2024 09:12:39 +0200 Subject: [PATCH 5/6] ZENKO-4833: wait once for all topics --- tests/ctst/common/common.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/ctst/common/common.ts b/tests/ctst/common/common.ts index 4d20e65876..746b1cec27 100644 --- a/tests/ctst/common/common.ts +++ b/tests/ctst/common/common.ts @@ -328,10 +328,13 @@ Then('i {string} be able to add user metadata to object {string}', Then('kafka consumed messages should not take too much place on disk', { timeout: -1 }, async function (this: Zenko) { - let timeoutID; const kfkcIntervalSeconds = parseInt(this.parameters.KafkaCleanerInterval); const checkInterval = kfkcIntervalSeconds * 1000 + 5000; + const timeoutID = setTimeout(() => { + assert.fail('Kafka cleaner did not clean the topics within the expected time'); + }, checkInterval * 5); // Timeout after 5 Kafka cleaner intervals + try { const ignoredTopics = ['dead-letter']; const kafkaAdmin = new Kafka({ brokers: [this.parameters.KafkaHosts] }).admin(); @@ -339,10 +342,6 @@ Then('kafka consumed messages should not take too much place on disk', { timeout .filter(t => (t.includes(this.parameters.InstanceID) && !ignoredTopics.some(e => t.includes(e)))); - timeoutID = setTimeout(() => { - assert.fail('Kafka cleaner did not clean the topics within the expected time'); - }, (topics.length || 1) * checkInterval * 5); // Timeout after 5 Kafka cleaner intervals - const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin); while (topics.length > 0) { @@ -376,7 +375,7 @@ Then('kafka consumed messages should not take too much place on disk', { timeout const allMessagesCleaned = parseInt(newOffsetPartition.low) + 1 >= parseInt(newOffsetPartition.high); - // We consider one topic as xleaned if kafkacleaner affected the + // We consider one topic as cleaned if kafkacleaner affected the // offset (low) or all messages are cleaned. 
if (lowOffsetIncreased || allMessagesCleaned) { topicCleaned = true; From e331443ee8eb4ad4cfb3bebe2993640e50505263 Mon Sep 17 00:00:00 2001 From: williamlardier Date: Mon, 1 Jul 2024 12:10:11 +0200 Subject: [PATCH 6/6] ZENKO-4833: improve waiting duration --- tests/ctst/common/common.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ctst/common/common.ts b/tests/ctst/common/common.ts index 746b1cec27..e0e07d4421 100644 --- a/tests/ctst/common/common.ts +++ b/tests/ctst/common/common.ts @@ -329,11 +329,11 @@ Then('i {string} be able to add user metadata to object {string}', Then('kafka consumed messages should not take too much place on disk', { timeout: -1 }, async function (this: Zenko) { const kfkcIntervalSeconds = parseInt(this.parameters.KafkaCleanerInterval); - const checkInterval = kfkcIntervalSeconds * 1000 + 5000; + const checkInterval = kfkcIntervalSeconds * (1000 + 5000); const timeoutID = setTimeout(() => { assert.fail('Kafka cleaner did not clean the topics within the expected time'); - }, checkInterval * 5); // Timeout after 5 Kafka cleaner intervals + }, checkInterval * 10); // Timeout after 10 Kafka cleaner intervals try { const ignoredTopics = ['dead-letter'];
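
With this last change both durations scale with the Kafka cleaner interval: checkInterval becomes kfkcIntervalSeconds * (1000 + 5000) ms, i.e. six cleaner intervals, and the assert.fail timer fires after checkInterval * 10. A quick worked example of the resulting waits, assuming a hypothetical KafkaCleanerInterval of 30 seconds (the real value comes from this.parameters.KafkaCleanerInterval):

    // Hypothetical parameter value, for illustration only.
    const kfkcIntervalSeconds = parseInt('30');
    const checkInterval = kfkcIntervalSeconds * (1000 + 5000); // 180000 ms = 3 minutes
    const timeoutMs = checkInterval * 10;                      // 1800000 ms = 30 minutes
    console.log({ checkInterval, timeoutMs });
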