ZENKO-4833: rework kafkacleaner test
williamlardier committed Jun 25, 2024
1 parent fbdcb74 commit dd79b07
Showing 1 changed file with 35 additions and 28 deletions.
63 changes: 35 additions & 28 deletions tests/ctst/common/common.ts
@@ -305,7 +305,7 @@ Then('object {string} should be {string} and have the storage class {string}', {
});

When('i delete object {string}', async function (this: Zenko, objectName: string) {
const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved<string>('objectName');
const objName = getObjectNameWithBackendFlakiness.call(this, objectName) || this.getSaved<string>('objectName');
this.resetCommand();
this.addCommandParameter({ bucket: this.getSaved<string>('bucketName') });
this.addCommandParameter({ key: objName });
@@ -328,12 +328,9 @@ Then('i {string} be able to add user metadata to object {string}',

Then('kafka consumed messages should not take too much place on disk', { timeout: -1 },
async function (this: Zenko) {
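// Timeout handle, assigned once the topic list is known so the deadline can scale with the number of topics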
let timeoutID;
const kfkcIntervalSeconds = parseInt(this.parameters.KafkaCleanerInterval);
const checkInterval = kfkcIntervalSeconds * (1000 + 3000);

const timeoutID = setTimeout(() => {
assert.fail('Kafka cleaner did not clean the topics');
}, checkInterval * 5); // Timeout after 5 kafkacleaner intervals
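// One kafkacleaner interval in milliseconds, plus a 5-second safety margin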
const checkInterval = kfkcIntervalSeconds * 1000 + 5000;

try {
const ignoredTopics = ['dead-letter'];
@@ -342,8 +339,13 @@ Then('kafka consumed messages should not take too much place on disk', { timeout
.filter(t => (t.includes(this.parameters.InstanceID) &&
!ignoredTopics.some(e => t.includes(e))));

timeoutID = setTimeout(() => {
assert.fail('Kafka cleaner did not clean the topics within the expected time');
}, (topics.length || 1) * checkInterval * 5); // Timeout after 5 kafkacleaner intervals per topic

const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin);
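// These baseline offsets are compared against fresh offsets after each kafkacleaner pass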

while (topics.length > 0) {
const previousOffsets = await getTopicsOffsets(topics, kafkaAdmin);
// Check topic offsets before kafkacleaner passes, to be sure kafkacleaner works
// This check could be improved by consuming messages and verifying that
// their timestamps are not older than the last kafkacleaner run
@@ -357,32 +359,37 @@ Then('kafka consumed messages should not take too much place on disk', { timeout

for (let i = 0; i < topics.length; i++) {
this.logger.debug('Checking topic', { topic: topics[i] });
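// Flipped to true as soon as one partition shows evidence of cleaning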
let topicCleaned = false;
for (let j = 0; j < newOffsets[i].partitions.length; j++) {
const newMessagesAfterClean =
newOffsets[i].partitions[j].low === previousOffsets[i].partitions[j].high &&
newOffsets[i].partitions[j].high > previousOffsets[i].partitions[j].high;
const newOffsetPartition = newOffsets[i].partitions[j];
const oldOffsetPartition = previousOffsets[i].partitions[j];

if (newMessagesAfterClean) {
// If new messages appeared after we gathered the offsets, we need to recheck after
this.logger.warn('New messages after clean', { topic: topics[i] });
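// Skip partitions that have no baseline offsets to compare against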
if (!oldOffsetPartition) {
continue;
}

const lowOffsetIncreased = newOffsets[i].partitions[j].low >
previousOffsets[i].partitions[j].low;
const allMessagesCleaned = newOffsets[i].partitions[j].high ===
newOffsets[i].partitions[j].low;

// If the low offset increased it means the topic has been cleaned
// If low offset is the same as high offset,
// it means the topic is completly cleaned even though lowOffset didnt increased
assert.ok(lowOffsetIncreased || allMessagesCleaned,
`Topic ${topics[i]} partition ${j} offset has not increased,
previousOffsets: ${previousOffsets[i].partitions[j].low} /\
${previousOffsets[i].partitions[j].high},
newOffsets: ${newOffsets[i].partitions[j].low} / ${newOffsets[i].partitions[j].high}`);

// Topic is cleaned, we don't need to check it anymore
// Offsets are returned as strings, so parse them before comparing numerically
const lowOffsetIncreased = parseInt(newOffsetPartition.low) >
parseInt(oldOffsetPartition.low);
// We tolerate one message not being cleaned, as it can be due to the
// message being consumed during the check
const allMessagesCleaned = parseInt(newOffsetPartition.low) + 1 >=
parseInt(newOffsetPartition.high);

// We consider a topic cleaned if kafkacleaner increased the low
// offset or all messages are cleaned.
if (lowOffsetIncreased || allMessagesCleaned) {
topicCleaned = true;
} else {
// Log a debug message when this partition has not been cleaned yet
this.logger.debug(`Partition ${j} of topic ${topics[i]} not cleaned as expected`, {
previousOffsets: oldOffsetPartition,
newOffsets: newOffsetPartition,
});
}
}
if (topicCleaned) {
// At least one partition was cleaned, stop checking this topic
topics.splice(i, 1);
}
}
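// Topics still in the list are rechecked on the next pass of the surrounding while loop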
