KAFKA-17026: Implement updateCacheAndOffsets functionality on LSO movement #16459

Open
wants to merge 7 commits into
base: trunk

Conversation

Contributor

@adixitconfluent adixitconfluent commented Jun 26, 2024

About

Implemented the functionality that archives records once the log start offset (LSO) moves past them. The following functions were added:

  1. updateCacheAndOffsets - Updates the cached state and the start and end offsets of the share partition to reflect the new log start offset. It is called when the log start offset of the share partition moves.
  2. archiveAvailableRecordsOnLsoMovement - Archives all available records that are behind the LSO.
  3. archivePerOffsetBatchRecords - Archives all available records in the per-offset tracked batch passed to it.
  4. archiveCompleteBatch - Archives all available records of the complete batch passed to it.
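
As a rough illustration of the behavior described in the list above, here is a minimal sketch, not the code in this PR. The class LsoArchiveSketch, its State enum, the track and stateOf helpers, and the per-offset TreeMap are all hypothetical simplifications; the real SharePartition tracks batches and takes locks, both of which are omitted here.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a share partition's cached state.
class LsoArchiveSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Assumption: one entry per offset; the real cache tracks batches.
    private final TreeMap<Long, State> cachedState = new TreeMap<>();
    private long startOffset;

    LsoArchiveSketch(long startOffset) {
        this.startOffset = startOffset;
    }

    void track(long offset, State state) {
        cachedState.put(offset, state);
    }

    State stateOf(long offset) {
        return cachedState.get(offset);
    }

    long startOffset() {
        return startOffset;
    }

    // Archives AVAILABLE records strictly behind the new log start offset
    // and advances the start offset. Returns how many records were archived.
    int updateCacheAndOffsets(long logStartOffset) {
        if (logStartOffset <= startOffset) {
            return 0; // The log start offset has not moved forward.
        }
        int archived = 0;
        for (Map.Entry<Long, State> entry : cachedState.headMap(logStartOffset, false).entrySet()) {
            if (entry.getValue() == State.AVAILABLE) {
                entry.setValue(State.ARCHIVED);
                archived++;
            }
        }
        startOffset = logStartOffset;
        return archived;
    }

    public static void main(String[] args) {
        LsoArchiveSketch partition = new LsoArchiveSketch(0L);
        partition.track(0L, State.AVAILABLE);
        partition.track(1L, State.ACQUIRED);
        partition.track(2L, State.AVAILABLE);
        int archived = partition.updateCacheAndOffsets(2L);
        System.out.println("archived=" + archived + ", startOffset=" + partition.startOffset());
    }
}
```

In this sketch, records in any state other than AVAILABLE are left untouched, which mirrors the description's wording that only available records are archived.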

Testing

The added functionality has been tested with unit tests.

Suppression

For the build to pass, I had to add a JavaNCSS suppression for SharePartitionTest. The check counts non-commenting source statements, which is roughly the total lines of non-commented source code. Agreed, we can optimize the current SharePartitionTest code in a few places; however, when we add new test cases we will again breach this limit, which is currently set at 1500. Hence, adding the suppression made sense to me.

@adixitconfluent adixitconfluent marked this pull request as ready for review June 27, 2024 10:55
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM overall, some minor comments.

@@ -44,7 +44,7 @@
 <suppress checks="MethodLength"
 files="(KafkaClusterTestKit).java"/>
 <suppress checks="JavaNCSS"
-files="RemoteLogManagerTest.java"/>
+files="(RemoteLogManagerTest|SharePartitionTest).java"/>
Contributor

Is there a way to avoid this suppression by refactoring the tests?

Contributor Author

Hi Apoorv, I tried some small refactors, such as adding the helper functions assertBatchEquals / acquireRecords (commit), but the improvements were too small to resolve the NCSS issue. From what I read, this checkstyle rule takes into account the total lines of non-commented source code, so the issue may come up again when we add new test cases; hence I added the suppression.

Contributor

If it's not required now, then remove it. If it happens again in the future, we can add it back.

core/src/main/java/kafka/server/share/SharePartition.java (Outdated)
@@ -903,7 +1023,7 @@ private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetT
 * @return True if the start offset has moved and within the request first and last offset, false otherwise.
 */
 private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
-return batchFirstOffset < startOffset && batchLastOffset >= startOffset;
+return batchFirstOffset < startOffset() && batchLastOffset >= startOffset();
Contributor

👍 The method is only called from methods that already hold the lock, but to be safe you might want to read startOffset() once into a local variable and use that. This ensures the value stays the same and cannot change between two separate calls to startOffset().

Contributor Author

@adixitconfluent adixitconfluent Jun 28, 2024

I checked the usages of checkForStartOffsetWithinBatch. Every function that calls it acquires the write lock before reading the value, so instead of calling startOffset() we can just use the variable directly. I will remove this line change from the PR.

Contributor

I wouldn't suggest that; the method should be defined so that it is thread-safe on its own. I'd suggest something like the following:

long localStartOffset = startOffset();
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
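
For context, the suggestion above can be sketched in a self-contained form. This is a minimal illustration under stated assumptions, not the SharePartition code: the class StartOffsetSnapshotSketch and the updateStartOffset method are hypothetical, and it assumes startOffset() reads the field under the read lock of a ReentrantReadWriteLock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical holder of a lock-guarded start offset.
class StartOffsetSnapshotSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long startOffset;

    StartOffsetSnapshotSketch(long startOffset) {
        this.startOffset = startOffset;
    }

    // Reads the start offset under the read lock.
    long startOffset() {
        lock.readLock().lock();
        try {
            return startOffset;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Hypothetical writer, standing in for whatever moves the start offset.
    void updateStartOffset(long newStartOffset) {
        lock.writeLock().lock();
        try {
            startOffset = newStartOffset;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Reads startOffset() once so both comparisons see the same value,
    // even if another thread moves the start offset in between.
    boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
        long localStartOffset = startOffset();
        return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
    }

    public static void main(String[] args) {
        StartOffsetSnapshotSketch sketch = new StartOffsetSnapshotSketch(5L);
        System.out.println(sketch.checkForStartOffsetWithinBatch(3L, 7L));
    }
}
```

With two separate calls to startOffset(), a concurrent update between them could make the two comparisons use different values. ReentrantReadWriteLock also lets a thread that already holds the write lock acquire the read lock, so callers that hold the write lock can still call startOffset() safely.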

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM! Just minor suggestions.

2 participants