KAFKA-17026: Implement updateCacheAndOffsets functionality on LSO movement #16459
base: trunk
LGTM overall, some minor comments.
@@ -44,7 +44,7 @@
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
     <suppress checks="JavaNCSS"
-              files="RemoteLogManagerTest.java"/>
+              files="(RemoteLogManagerTest|SharePartitionTest).java"/>
Is there a way to avoid this suppression by refactoring the tests?
Hi Apoorv, I tried some small refactors, such as extracting helper functions like assertBatchEquals / acquireRecords (commit), but the improvements were too small to resolve the NCSS issue. From reading about this checkstyle check, it counts the total number of non-commenting source statements. So when we add new test cases, the violation may come back, hence I added the suppression.
If it's not required now, then remove it. If it happens again in the future, we can add it back.
@@ -903,7 +1023,7 @@ private boolean checkForFullMatch(InFlightBatch inFlightBatch, long firstOffsetT
      * @return True if the start offset has moved and within the request first and last offset, false otherwise.
      */
     private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
-        return batchFirstOffset < startOffset && batchLastOffset >= startOffset;
+        return batchFirstOffset < startOffset() && batchLastOffset >= startOffset();
👍 Though the method is called from methods that already hold the lock, to be safe you might want to read startOffset() once into a single variable and then use that variable. That way the value is guaranteed to be the same, rather than possibly changing between two separate calls to startOffset().
I checked the usages of the function checkForStartOffsetWithinBatch. Every caller acquires the write lock before reading the start offset, so instead of calling startOffset() we can just use the variable directly. I will remove this line change from the PR.
I wouldn't suggest that; the method should be defined so that it is thread-safe on its own. I'd suggest something like the following:
long localStartOffset = startOffset();
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
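For context, here is a minimal, self-contained sketch of the pattern suggested above. It is not the code from this PR: the class name OffsetHolder, the updateStartOffset method, and the use of a ReentrantReadWriteLock are assumptions made purely for illustration. The point is that the offset is read once into a local variable, so both comparisons see the same value even if a writer moves the offset concurrently.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the share partition; only the offset handling is modeled.
class OffsetHolder {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private long startOffset;

    OffsetHolder(long startOffset) {
        this.startOffset = startOffset;
    }

    // Reads the start offset under the read lock.
    long startOffset() {
        lock.readLock().lock();
        try {
            return startOffset;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Moves the start offset under the write lock (assumed writer path).
    void updateStartOffset(long newStartOffset) {
        lock.writeLock().lock();
        try {
            startOffset = newStartOffset;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Takes a single snapshot of the start offset so that both comparisons
    // use the same value, regardless of whether the caller holds a lock.
    boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
        long localStartOffset = startOffset();
        return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
    }
}
```

With two separate calls to startOffset(), a writer could in principle change the value between the first and second comparison; the local variable removes that window without requiring callers to hold the lock.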
LGTM! Just minor suggestions.
About
Implemented the functionality which takes care of archiving the records when LSO moves past them. Implemented the following functions -
updateCacheAndOffsets - Updates the cached state, start and end offsets of the share partition as per the new log start offset. The method is called when the log start offset is moved for the share partition.
archiveAvailableRecordsOnLsoMovement - This function archives all the available records when they are behind the LSO.
archivePerOffsetBatchRecords - It archives all the available records in the per offset tracked batch passed to this function.
archiveCompleteBatch - It archives all the available records of the complete batch passed to this function.
Testing
The added functionality has been tested with unit tests.
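To illustrate the behavior described for updateCacheAndOffsets, here is a heavily simplified sketch. It is not the implementation in this PR: the class LsoArchiveSketch, the RecordState enum, the per-offset TreeMap, and the way the end offset is adjusted are all assumptions for illustration. The real cache tracks in-flight batches (complete or per offset) and uses locking, both of which are omitted here.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: one state per offset instead of in-flight batches; no locking.
class LsoArchiveSketch {
    enum RecordState { AVAILABLE, ACQUIRED, ARCHIVED }

    private final TreeMap<Long, RecordState> cachedState = new TreeMap<>();
    private long startOffset;
    private long endOffset;

    LsoArchiveSketch(long startOffset, long endOffset) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    void track(long offset, RecordState state) {
        cachedState.put(offset, state);
    }

    RecordState stateOf(long offset) {
        return cachedState.get(offset);
    }

    long startOffset() {
        return startOffset;
    }

    long endOffset() {
        return endOffset;
    }

    // Called when the log start offset (LSO) moves forward.
    void updateCacheAndOffsets(long logStartOffset) {
        if (logStartOffset <= startOffset) {
            return; // LSO has not moved past the current start offset
        }
        // Archive only AVAILABLE records strictly behind the new LSO;
        // records in other states are left untouched in this sketch.
        for (Map.Entry<Long, RecordState> entry : cachedState.headMap(logStartOffset, false).entrySet()) {
            if (entry.getValue() == RecordState.AVAILABLE) {
                entry.setValue(RecordState.ARCHIVED);
            }
        }
        startOffset = logStartOffset;
        // Assumption: the end offset never trails the start offset.
        endOffset = Math.max(endOffset, logStartOffset);
    }
}
```

In this sketch, moving the LSO to 3 with offsets 0 to 4 tracked archives the available records at 0, 1 and 2, while a record that is currently acquired keeps its state.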
Suppression
In order for the build to pass, I had to add a JavaNCSS suppression for SharePartitionTest. The check counts the total number of non-commenting source statements. Agreed that we can optimize the current SharePartitionTest code in a few places; however, when we add new test cases, we will again breach this limit, which is currently set at 1500. Hence, adding the suppression made sense to me.