diff --git a/tests/ctst/common/common.ts b/tests/ctst/common/common.ts index 264fa747c..4d20e6587 100644 --- a/tests/ctst/common/common.ts +++ b/tests/ctst/common/common.ts @@ -305,7 +305,7 @@ Then('object {string} should be {string} and have the storage class {string}', { }); When('i delete object {string}', async function (this: Zenko, objectName: string) { - const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved('objectName'); + const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved('objectName'); this.resetCommand(); this.addCommandParameter({ bucket: this.getSaved('bucketName') }); this.addCommandParameter({ key: objName }); @@ -328,12 +328,9 @@ Then('i {string} be able to add user metadata to object {string}', Then('kafka consumed messages should not take too much place on disk', { timeout: -1 }, async function (this: Zenko) { + let timeoutID; const kfkcIntervalSeconds = parseInt(this.parameters.KafkaCleanerInterval); - const checkInterval = kfkcIntervalSeconds * (1000 + 3000); - - const timeoutID = setTimeout(() => { - assert.fail('Kafka cleaner did not clean the topics'); - }, checkInterval * 5); // Timeout after 5 kafkacleaner intervals + const checkInterval = kfkcIntervalSeconds * 1000 + 5000; try { const ignoredTopics = ['dead-letter']; @@ -342,8 +339,13 @@ Then('kafka consumed messages should not take too much place on disk', { timeout .filter(t => (t.includes(this.parameters.InstanceID) && !ignoredTopics.some(e => t.includes(e)))); + timeoutID = setTimeout(() => { + assert.fail('Kafka cleaner did not clean the topics within the expected time'); + }, (topics.length || 1) * checkInterval * 5); // Timeout after 5 Kafka cleaner intervals + + const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin); + while (topics.length > 0) { - const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin); // Checking topics offsets before kafkacleaner passes to be sure kafkacleaner works // This function can be improved by consuming messages and // verify that the timestamp is not older than last kafkacleaner run @@ -357,32 +359,37 @@ Then('kafka consumed messages should not take too much place on disk', { timeout for (let i = 0; i < topics.length; i++) { this.logger.debug('Checking topic', { topic: topics[i] }); + let topicCleaned = false; for (let j = 0; j < newOffsets[i].partitions.length; j++) { - const newMessagesAfterClean = - newOffsets[i].partitions[j].low === previousOffsets[i].partitions[j].high && - newOffsets[i].partitions[j].high > previousOffsets[i].partitions[j].high; + const newOffsetPartition = newOffsets[i].partitions[j]; + const oldOffsetPartition = previousOffsets[i].partitions[j]; - if (newMessagesAfterClean) { - // If new messages appeared after we gathered the offsets, we need to recheck after - this.logger.warn('New messages after clean', { topic: topics[i] }); + if (!oldOffsetPartition) { continue; } - const lowOffsetIncreased = newOffsets[i].partitions[j].low > - previousOffsets[i].partitions[j].low; - const allMessagesCleaned = newOffsets[i].partitions[j].high === - newOffsets[i].partitions[j].low; - - // If the low offset increased it means the topic has been cleaned - // If low offset is the same as high offset, - // it means the topic is completly cleaned even though lowOffset didnt increased - assert.ok(lowOffsetIncreased || allMessagesCleaned, - `Topic ${topics[i]} partition ${j} offset has not increased, - previousOffsets: ${previousOffsets[i].partitions[j].low} /\ - ${previousOffsets[i].partitions[j].high}, - newOffsets: ${newOffsets[i].partitions[j].low} / ${newOffsets[i].partitions[j].high}`); - - // Topic is cleaned, we don't need to check it anymore + // Ensure we're accessing the correct partition details + const lowOffsetIncreased = parseInt(newOffsetPartition.low) > + parseInt(oldOffsetPartition.low); + // We tolerate one message not being cleaned, as it can be due to the + // message being consumed during the check + const allMessagesCleaned = parseInt(newOffsetPartition.low) + 1 >= + parseInt(newOffsetPartition.high); + + // We consider one topic as xleaned if kafkacleaner affected the + // offset (low) or all messages are cleaned. + if (lowOffsetIncreased || allMessagesCleaned) { + topicCleaned = true; + } else { + // Log warning if the condition is not met for this partition + this.logger.debug(`Partition ${j} of topic ${topics[i]} not cleaned as expected`, { + previousOffsets: oldOffsetPartition, + newOffsets: newOffsetPartition, + }); + } + } + if (topicCleaned) { + // All partitions of the topic are cleaned, remove from array topics.splice(i, 1); } }