Fix Kafka cleaner & Azure Location creation flakies #2097

Open · wants to merge 6 commits into base: development/2.6
58 changes: 32 additions & 26 deletions tests/ctst/common/common.ts
@@ -305,7 +305,7 @@ Then('object {string} should be {string} and have the storage class {string}', {
 });
 
 When('i delete object {string}', async function (this: Zenko, objectName: string) {
-    const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved<string>('objectName');
+    const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved<string>('objectName');
     this.resetCommand();
     this.addCommandParameter({ bucket: this.getSaved<string>('bucketName') });
     this.addCommandParameter({ key: objName });
@@ -329,11 +329,11 @@ Then('i {string} be able to add user metadata to object {string}',
 Then('kafka consumed messages should not take too much place on disk', { timeout: -1 },
     async function (this: Zenko) {
         const kfkcIntervalSeconds = parseInt(this.parameters.KafkaCleanerInterval);
-        const checkInterval = kfkcIntervalSeconds * (1000 + 3000);
+        const checkInterval = kfkcIntervalSeconds * (1000 + 5000);
 
         const timeoutID = setTimeout(() => {
-            assert.fail('Kafka cleaner did not clean the topics');
-        }, checkInterval * 5); // Timeout after 5 kafkacleaner intervals
+            assert.fail('Kafka cleaner did not clean the topics within the expected time');
+        }, checkInterval * 10); // Timeout after 10 Kafka cleaner intervals
 
         try {
             const ignoredTopics = ['dead-letter'];
@@ -342,8 +342,9 @@ Then('kafka consumed messages should not take too much place on disk', { timeout: -1 },
                 .filter(t => (t.includes(this.parameters.InstanceID) &&
                     !ignoredTopics.some(e => t.includes(e))));
 
+            const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin);
+
             while (topics.length > 0) {
-                const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin);
                 // Checking topics offsets before kafkacleaner passes to be sure kafkacleaner works
                 // This function can be improved by consuming messages and
                 // verify that the timestamp is not older than last kafkacleaner run
@@ -357,32 +358,37 @@ Then('kafka consumed messages should not take too much place on disk', { timeout: -1 },
 
                 for (let i = 0; i < topics.length; i++) {
                     this.logger.debug('Checking topic', { topic: topics[i] });
+                    let topicCleaned = false;
                     for (let j = 0; j < newOffsets[i].partitions.length; j++) {
-                        const newMessagesAfterClean =
-                            newOffsets[i].partitions[j].low === previousOffsets[i].partitions[j].high &&
-                            newOffsets[i].partitions[j].high > previousOffsets[i].partitions[j].high;
+                        const newOffsetPartition = newOffsets[i].partitions[j];
+                        const oldOffsetPartition = previousOffsets[i].partitions[j];
 
-                        if (newMessagesAfterClean) {
-                            // If new messages appeared after we gathered the offsets, we need to recheck after
-                            this.logger.warn('New messages after clean', { topic: topics[i] });
+                        if (!oldOffsetPartition) {
                             continue;
                         }
 
-                        const lowOffsetIncreased = newOffsets[i].partitions[j].low >
-                            previousOffsets[i].partitions[j].low;
-                        const allMessagesCleaned = newOffsets[i].partitions[j].high ===
-                            newOffsets[i].partitions[j].low;
-
-                        // If the low offset increased it means the topic has been cleaned
-                        // If low offset is the same as high offset,
-                        // it means the topic is completly cleaned even though lowOffset didnt increased
-                        assert.ok(lowOffsetIncreased || allMessagesCleaned,
-                            `Topic ${topics[i]} partition ${j} offset has not increased,
-                            previousOffsets: ${previousOffsets[i].partitions[j].low} /\
-                            ${previousOffsets[i].partitions[j].high},
-                            newOffsets: ${newOffsets[i].partitions[j].low} / ${newOffsets[i].partitions[j].high}`);
-
-                        // Topic is cleaned, we don't need to check it anymore
+                        // Ensure we're accessing the correct partition details
+                        const lowOffsetIncreased = parseInt(newOffsetPartition.low) >
+                            parseInt(oldOffsetPartition.low);
+                        // We tolerate one message not being cleaned, as it can be due to the
+                        // message being consumed during the check
+                        const allMessagesCleaned = parseInt(newOffsetPartition.low) + 1 >=
+                            parseInt(newOffsetPartition.high);
+
+                        // We consider one topic as cleaned if kafkacleaner affected the
+                        // offset (low) or all messages are cleaned.
+                        if (lowOffsetIncreased || allMessagesCleaned) {
+                            topicCleaned = true;
+                        } else {
+                            // Log a debug message if the condition is not met for this partition
+                            this.logger.debug(`Partition ${j} of topic ${topics[i]} not cleaned as expected`, {
+                                previousOffsets: oldOffsetPartition,
+                                newOffsets: newOffsetPartition,
+                            });
+                        }
                     }
+                    if (topicCleaned) {
+                        // All partitions of the topic are cleaned, remove from array
+                        topics.splice(i, 1);
+                    }
                 }
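
For context on what this scenario compares: getTopicsOffsets (used above but defined elsewhere in the test suite) returns, for each topic, the low and high watermark of every partition. The one-message tolerance introduced above means that with low = 41 and high = 42, 41 + 1 >= 42 holds, so a single message consumed mid-check no longer fails the scenario. Below is a minimal sketch of what such a helper can look like, assuming the kafkajs Admin client, whose fetchTopicOffsets returns offsets as strings (which is why the code above wraps them in parseInt):

import { Admin } from 'kafkajs';

type TopicOffsets = {
    topic: string,
    partitions: { partition: number, low: string, high: string }[],
};

// Fetch the per-partition low/high watermarks for each topic.
// kafkajs reports offsets as strings to avoid precision loss on 64-bit values.
async function getTopicsOffsets(topics: string[], kafkaAdmin: Admin): Promise<TopicOffsets[]> {
    const offsets: TopicOffsets[] = [];
    for (const topic of topics) {
        const partitions = await kafkaAdmin.fetchTopicOffsets(topic);
        offsets.push({
            topic,
            partitions: partitions.map(p => ({ partition: p.partition, low: p.low, high: p.high })),
        });
    }
    return offsets;
}
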
4 changes: 2 additions & 2 deletions tests/ctst/common/hooks.ts
@@ -6,7 +6,7 @@ import {
 } from '@cucumber/cucumber';
 import Zenko from '../world/Zenko';
 import { Identity } from 'cli-testing';
-import { prepareQuotaScenarios, quotaScenarioteardown } from 'steps/quotas/quotas';
+import { prepareQuotaScenarios, teardownQuotaScenarios } from 'steps/quotas/quotas';
 
 // HTTPS should not cause any error for CTST
 process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
@@ -27,7 +27,7 @@ Before({ tags: '@Quotas', timeout: 1200000 }, async function (scenarioOptions) {
 });
 
 After({ tags: '@Quotas' }, async function () {
-    await quotaScenarioteardown(this as Zenko);
+    await teardownQuotaScenarios(this as Zenko);
 });
 
 export default Zenko;
2 changes: 1 addition & 1 deletion tests/ctst/package.json
@@ -26,7 +26,7 @@
     "@typescript-eslint/eslint-plugin": "^5.45.0",
     "@typescript-eslint/parser": "^5.45.0",
     "babel-jest": "^29.3.1",
-    "cli-testing": "github:scality/cli-testing.git#1.0.0",
+    "cli-testing": "github:scality/cli-testing.git#1.0.1",
     "eslint": "^8.28.0"
   },
   "scripts": {
2 changes: 1 addition & 1 deletion tests/ctst/steps/quotas/quotas.ts
@@ -89,7 +89,7 @@ export async function prepareQuotaScenarios(world: Zenko, scenarioConfiguration:
     Identity.addIdentity(IdentityEnum.ACCOUNT, key, configuration[key], undefined, true, true);
 }
 
-export async function quotaScenarioteardown(world: Zenko) {
+export async function teardownQuotaScenarios(world: Zenko) {
     // Remove any quota at the end of the scenario, in case
     // the account gets reused, placed after the global After
     // hook to make sure it is executed first.
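
The ordering mentioned in this comment relies on a Cucumber.js guarantee: After hooks run in the reverse order of their definition, so a hook registered after the global teardown executes before it. A minimal illustration of that behavior (hypothetical hooks, not part of this diff):

import { After } from '@cucumber/cucumber';

After(async function () {
    // Registered first, so it runs LAST among After hooks:
    // this is where a global teardown would live.
});

After({ tags: '@Quotas' }, async function () {
    // Registered second, so it runs FIRST: quotas are removed
    // before the global teardown reuses or releases the account.
});
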
211 changes: 210 additions & 1 deletion tests/ctst/steps/utils/kubernetes.ts
@@ -1,7 +1,25 @@
 /* eslint-disable @typescript-eslint/no-unsafe-member-access */
 import { KubernetesHelper, Utils } from 'cli-testing';
 import Zenko from 'world/Zenko';
-import { V1Job, Watch, V1ObjectMeta } from '@kubernetes/client-node';
+import {
+    V1Job,
+    Watch,
+    V1ObjectMeta,
+    AppsV1Api,
+    V1Deployment,
+    AppsApi,
+    CustomObjectsApi,
+} from '@kubernetes/client-node';
+
+type ZenkoStatusValue = {
+    lastTransitionTime: string,
+    message: string,
+    status: 'True' | 'False',
+    reason?: string,
+    type: 'DeploymentFailure' | 'DeploymentInProgress' | 'Available',
+};
+
+type ZenkoStatus = ZenkoStatusValue[];
 
 export function createKubeBatchClient(world: Zenko) {
     if (!KubernetesHelper.clientBatch) {
@@ -24,6 +42,27 @@ export function createKubeWatchClient(world: Zenko) {
     return KubernetesHelper.clientWatch as Watch;
 }
 
+export function createKubeAppsV1Client(world: Zenko) {
+    if (!KubernetesHelper.clientAppsV1) {
+        KubernetesHelper.init(world.parameters);
+    }
+    return KubernetesHelper.clientAppsV1 as AppsV1Api;
+}
+
+export function createKubeAppsClient(world: Zenko) {
+    if (!KubernetesHelper.clientApps) {
+        KubernetesHelper.init(world.parameters);
+    }
+    return KubernetesHelper.clientApps as AppsApi;
+}
+
+export function createKubeCustomObjectClient(world: Zenko) {
+    if (!KubernetesHelper.customObject) {
+        KubernetesHelper.init(world.parameters);
+    }
+    return KubernetesHelper.customObject as CustomObjectsApi;
+}
+
 export async function createJobAndWaitForCompletion(world: Zenko, jobName: string, customMetadata?: string) {
     const batchClient = createKubeBatchClient(world);
     const watchClient = createKubeWatchClient(world);
@@ -84,3 +123,173 @@ export async function createJobAndWaitForCompletion(world: Zenko, jobName: string, customMetadata?: string) {
         throw err;
     }
 }
+
+export async function waitForZenkoToStabilize(
+    world: Zenko, needsReconciliation = false, timeout = 15 * 60 * 1000, namespace = 'default') {
+    // ZKOP pulls the overlay configuration from Pensieve every 5 seconds,
+    // so the status might not be updated immediately after the overlay is applied.
+    // This function will first wait till we detect a reconciliation
+    // (deploymentInProgress = true), and then wait for the status to be available
+    const startTime = Date.now();
+    let status = false;
+    let deploymentFailure: ZenkoStatusValue = {
+        lastTransitionTime: '',
+        message: '',
+        status: 'False',
+        type: 'DeploymentFailure',
+    };
+    let deploymentInProgress: ZenkoStatusValue = {
+        lastTransitionTime: '',
+        message: '',
+        status: 'False',
+        type: 'DeploymentInProgress',
+    };
+    let available: ZenkoStatusValue = {
+        lastTransitionTime: '',
+        message: '',
+        status: 'False',
+        type: 'Available',
+    };
+    // If needsReconciliation is true, we expect a reconciliation;
+    // otherwise, we can use the function as a sanity check of the
+    // zenko status.
+    let reconciliationDetected = !needsReconciliation;
+
+    world.logger.debug('Waiting for Zenko to stabilize');
+    const zenkoClient = createKubeCustomObjectClient(world);
+
+    while (!status && Date.now() - startTime < timeout) {
+        const zenkoCR = await zenkoClient.getNamespacedCustomObject(
+            'zenko.io',
+            'v1alpha2',
+            namespace,
+            'zenkos',
+            'end2end',
+        ).catch(err => {
+            world.logger.error('Error getting Zenko CR', {
+                err: err as unknown,
+            });
+            return null;
+        });
+
+        if (!zenkoCR) {
+            await Utils.sleep(1000);
+            continue;
+        }
+
+        const conditions: ZenkoStatus = (zenkoCR.body as {
+            status: {
+                conditions: ZenkoStatus,
+            },
+        })?.status?.conditions || [];
+
+        conditions.forEach(condition => {
+            if (condition.type === 'DeploymentFailure') {
+                deploymentFailure = condition;
+            } else if (condition.type === 'DeploymentInProgress') {
+                deploymentInProgress = condition;
+            } else if (condition.type === 'Available') {
+                available = condition;
+            }
+        });
+
+        world.logger.debug('Checking Zenko CR status', {
+            conditions,
+            deploymentFailure,
+            deploymentInProgress,
+            available,
+        });
+
+        if (!reconciliationDetected &&
+            deploymentInProgress.status === 'True' &&
+            deploymentInProgress.reason === 'Reconciling'
+        ) {
+            reconciliationDetected = true;
+            continue;
+        }
+
+        if (reconciliationDetected &&
+            deploymentFailure.status === 'False' &&
+            deploymentInProgress.status === 'False' &&
+            available.status === 'True'
+        ) {
+            status = true;
+        }
+
+        await Utils.sleep(1000);
+    }
+
+    if (!status) {
+        throw new Error('Zenko did not stabilize');
+    }
+}
+
+export async function waitForDataServicesToStabilize(world: Zenko, timeout = 15 * 60 * 1000, namespace = 'default') {
+    let allRunning = false;
+    const startTime = Date.now();
+    const annotationKey = 'operator.zenko.io/dependencies';
+    const dataServices = ['connector-cloudserver-config', 'backbeat-config'];
+
+    const appsClient = createKubeAppsV1Client(world);
+
+    world.logger.debug('Waiting for data services to stabilize', {
+        namespace,
+    });
+
+    // First list all deployments, and then filter the ones with an annotation that matches the data services
+    const deployments: V1Deployment[] = [];
+    const serviceDeployments = await appsClient.listNamespacedDeployment(namespace);
+    for (const deployment of serviceDeployments.body.items) {
+        const annotations = deployment.metadata?.annotations;
+        if (annotations && dataServices.some(service => annotations[annotationKey]?.includes(service))) {
+            deployments.push(deployment);
+        }
+    }
+
+    world.logger.debug('Got the list of deployments to check for stabilization', {
+        deployments: deployments.map(deployment => deployment.metadata?.name),
+    });
+
+    while (!allRunning && Date.now() - startTime < timeout) {
+        allRunning = true;
+
+        // Get the deployments in the array, and check in a loop if they are ready
+        for (const deployment of deployments) {
+            const deploymentName = deployment.metadata?.name;
+            if (!deploymentName) {
+                throw new Error('Deployment name not found');
+            }
+
+            const deploymentStatus = await appsClient.readNamespacedDeploymentStatus(deploymentName, namespace);
+            const replicas = deploymentStatus.body.status?.replicas;
+            const readyReplicas = deploymentStatus.body.status?.readyReplicas;
+            const updatedReplicas = deploymentStatus.body.status?.updatedReplicas;
+            const availableReplicas = deploymentStatus.body.status?.availableReplicas;
+
+            world.logger.debug('Checking deployment status', {
+                deployment: deploymentName,
+                replicas,
+                readyReplicas,
+                updatedReplicas,
+                availableReplicas,
+            });
+
+            if (replicas !== readyReplicas || replicas !== updatedReplicas || replicas !== availableReplicas) {
+                allRunning = false;
+                world.logger.debug('Waiting for data service to stabilize', {
+                    deployment: deploymentName,
+                    replicas,
+                    readyReplicas,
+                });
+            }
+        }
+
+        await Utils.sleep(1000);
+    }
+
+    if (!allRunning) {
+        throw new Error('Data services did not stabilize');
+    }
+
+    return allRunning;
+}
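
These helpers underpin the Azure Location creation fix: after a location change is applied to the Zenko CR, a test can wait for the operator to reconcile and for the config-dependent data services to roll out before continuing. A hypothetical step showing how they could be wired together (the step text and timeout are illustrative, not part of this diff):

import { Then } from '@cucumber/cucumber';
import Zenko from 'world/Zenko';
import { waitForZenkoToStabilize, waitForDataServicesToStabilize } from 'steps/utils/kubernetes';

Then('zenko should stabilize after the location change', { timeout: 31 * 60 * 1000 },
    async function (this: Zenko) {
        // Expect a reconciliation first (needsReconciliation = true), then Available === 'True'
        await waitForZenkoToStabilize(this, true);
        // Then wait for the cloudserver/backbeat deployments that consume the new config
        await waitForDataServicesToStabilize(this);
    });
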
6 changes: 3 additions & 3 deletions tests/ctst/yarn.lock
@@ -4807,9 +4807,9 @@ cli-table3@^0.6.0:
   optionalDependencies:
     "@colors/colors" "1.5.0"
 
-"cli-testing@github:scality/cli-testing.git#1.0.0":
-  version "1.0.0"
-  resolved "git+ssh://git@github.com/scality/cli-testing.git#7fd04802dd895f4db460078b53465c6d5b6452ed"
+"cli-testing@github:scality/cli-testing.git#1.0.1":
+  version "1.0.1"
+  resolved "git+ssh://git@github.com/scality/cli-testing.git#46c8ec4b9b22928ae806786f1175416949fa30eb"
   dependencies:
     "@aws-crypto/sha256-universal" "^5.2.0"
     "@aws-sdk/client-iam" "^3.484.0"