-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16308 [3/N]: Introduce feature dependency validation to UpdateFeatures command #16443
Conversation
I will probably just include this in 4.0 so we can drop the ZK support as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the PR. Made a pass of non-testing files. Left a couple of comments.
@@ -3603,13 +3603,16 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = { | |||
def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = { | |||
errors match { | |||
// Hard-code version to 1 since version 2 will not be implemented for 4.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, not sure that I follow. This PR is for 4.0, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want to remove the ZK code in this PR so I hard-coded this so tests pass. When we remove the ZK, this will go away too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we prevent the client from issuing V2 updateFeature request in ZK mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code will be removed before the release. I wasn't sure if it was necessary to include handling at all if folks are running off of trunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could have removed as part of this PR but I thought that should be done separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Perhaps make it clear that this is for ZK, which is not supported in 4.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
.setErrorMessage(error.message())); | ||
}); | ||
// If the request is a newer version, indicate the update failed with a top level error if any update failed. | ||
if (context.requestHeader().requestApiVersion() > 1 && featuresWithErrors.size() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a partial failure, it seems weird to return an error at the top level but with no error at some features. In that case, none of the updated features is persisted. For example, in produce response, we set the partition level error to be the same as the top level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can change all the partitions to the top level error if that is preferred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the other question I have is whether it is better to fail fast or potentially show what other errors could be.
For example, if one feature is invalid because it not a real feature and one is invalid because of a dependency, should we fail with just the not real feature error? Or should we include both in the error message?
It is a little annoying to have to rerun the command for each error, but it could also be confusing if the error code is not the same for the different errors and we set them all to be the same.
Perhaps failing fast with one error at at time is the best option at the expense of the user having to rerun the command for each error. In that case, we can also change the error message logic to just share the error of the one error we found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my latest push, I took the approach of failing fast and only replying with the features we have seen. Another option is to mark the remaining features as also failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. A few more comments.
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
if (validateOnly) { | ||
if (validateOnly || updateFailed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We changed the implementation such that if one feature has an error, none of the features will be processed. It seems that we only need to return a top level error in UpdateFeaturesResponse. There is no need to have the per feature error code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works too. I can do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking a closer look at this -- are you suggesting to return a different type of controller response here? There are a few options. One is to simply return the feature and error that caused the failure above (in the if block). Another is to modify the type to account for top level errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of just having the following schema. We can return the cause in the top level error message.
{
"apiKey": 57,
"type": "response",
"name": "UpdateFeaturesResponse",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or `0` if there was no top-level error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The top-level error message, or `null` if there was no top-level error." }
]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. This makes sense. I think the part I was not sure about was the return type of ControllerResult. I think the approach I am taking right now is to return ControllerResult with a single entry containing the feature with the error. In QuorumController, I convert this to the top level error to return.
Ie.
if (!error.error().equals(Errors.NONE)) {
return ControllerResult.of(Collections.emptyList(), Collections.singletonMap(entry.getKey(), error));
}
if (errorEntry.isPresent()) {
String errorFeatureName = errorEntry.get().getKey();
ApiError topError = errorEntry.get().getValue();
String errorString = errorFeatureName + ":" + topError.error().exceptionName() + " (" + topError.message() + ")";
responseData.setErrorCode(topError.error().code());
responseData.setErrorMessage("The update failed for all features since the following feature had an error: " + errorString);
thinking about ways to clean up how we find the one error. Will update again after lunch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Maybe ControllerResult just need to include a single error code and error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've pushed the new approach. Let me know if it makes sense. (I can also clean up the surrounding code if we want to do it this way. 👍
String errorFeatureName = errorEntry.get().getKey(); | ||
ApiError topError = errorEntry.get().getValue(); | ||
String errorString = errorFeatureName + ":" + topError.error().exceptionName() + " (" + topError.message() + ")"; | ||
responseData.setErrorCode(Errors.INVALID_UPDATE_VERSION.code()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use the error code from topError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KIP specified this error code. We can change this if we think this should not be the case.
The UpdateFeaturesReponse will contain a top level INVALID_UPDATE_VERSION error if any feature fails to update and no updates will persist if a single feature in the requrest fails validation. The same INVALID_UPDATE_VERSION error will be specified in UpdatableFeatureResult.ErrorCode for the features that failed. This will be accompanied with UpdatableFeatureResult.ErrorMessage an error message explaining the dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, but is ControllerResult.response returned from featureControl.updateFeatures
supposed to contain the error code for the client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I included it there so the error message could contain the "real" error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@junrao I guess to summarize the conversation -- does it make sense to simply return whatever error occurred as the top level error and not invalid update version? I would think we would still want the specific feature in the response message so it can be communicated to the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine to convert arbitrary server errors to just INVALID_UPDATE_VERSION
. I was just wondering where the conversion happens. It seems that we already did that conversion in FeatureControlManager.updateFeature()
. If FeatureControlManager.updateFeature()
always returns INVALID_UPDATE_VERSION
, we could just pick it up here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. I see what you are saying. So we should just take the error and not further convert it. The main thing that is important is to also include the feature name.
@@ -3603,13 +3603,16 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = { | |||
def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = { | |||
errors match { | |||
// Hard-code version to 1 since version 2 will not be implemented for 4.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we prevent the client from issuing V2 updateFeature request in ZK mode?
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
Outdated
Show resolved
Hide resolved
…the top level error (not change to the invalid update version for all)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. A few more comments.
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
Outdated
Show resolved
Hide resolved
results.add(result); | ||
Optional<Map.Entry<String, ApiError>> errorEntry = Optional.empty(); | ||
if (version > 1) { | ||
Stream<Map.Entry<String, ApiError>> errorEntries = updateErrors.entrySet().stream().filter(entry -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For v2, we can just ignore updateErrors and only consider topLevelError, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is interesting, because this code is really only currently called in the ZK path. In the case where it would be called with a partial update, we would potentially want to catch the error? But I can remove if it is confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case where it would be called with a partial update, we would potentially want to catch the error?
But the ZK response is hard coded for v1, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I wrote this in case someone would use this in the the future for some Kraft case. But maybe that is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative is to just remove the ability to pass in a collection of updates if ZK path is fully removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok -- actually I think the best way forward is to leave as is and not update the createWithErrors method. When we remove the ZK code, we can remove the ability to pass in updateErrors. I can leave a todo for this if it helps.
} | ||
|
||
if (errorEntry.isPresent()) { | ||
String errorFeatureName = errorEntry.get().getKey(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For v1, the existing behavior is that if updateErrors is not empty, we set topLevelError to be none. It seems that we are changing the behavior here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we are changing it to be none. We set it as usual in line 114.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, we set topLevelError
to topError in line 100. This means that if there is a feature level error, the top level error is not none, which is different from the current behavior in v1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errorEntry will never be present if version is 1 or below. I agree this is not ideal for readability though.
…FeaturesResponse to be simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. A couple of more comments.
@@ -17,7 +17,7 @@ | |||
"apiKey": 57, | |||
"type": "response", | |||
"name": "UpdateFeaturesResponse", | |||
"validVersions": "0-1", | |||
"validVersions": "0-2", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove the Results
field in V2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose so. I will need to update a few parts of the KIP 😅 but good to get it right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way the future is structured, it expects results for all the features.
kafka/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Line 4555 in be11615
// The server should send back a response for every feature, but we do a sanity check anyway. |
We can do this, but that will mean I will have to change this code too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is not a great way to check version here, so the only thing I can do easily is remove this line. :(
I'm wondering if this is the best move.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we make UpdateFeaturesResponse.Results
ignorable from V2, the field will be null in V2 response. In V1 response, it can only be an empty array. We can tell whether this is V2 response based on that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In V1, if there is a top level error, we don't populate the results array, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we don't, but we also don't set a top level error if a feture fails. I can amend this to not return the results array in the error case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In V1, if we get a top level error like NOT_CONTROLLER, we don't populate the results array, right?
If we fail fast for V1, we could just do the same thing, i.e. set a top level error and don't populate the results array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup -- the key difference between not controller and a feature specific error is that we want the user to know which feature failed still. That's why the error message is crucial.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I believe I've pushed the code with this implementation. Please take a look when you get a chance :)
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records)); | ||
ApiError error = updateFeature(entry.getKey(), entry.getValue(), | ||
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions); | ||
if (requestVersion > 1 && !error.error().equals(Errors.NONE)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about a bit more. Even for V0/V1 request, it's probably better to enforce the dependency check since it's more correct. Perhaps it's better to just fail fast for all versions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a breaking change to only return partial results though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In 3.9, there is only one feature that one could set. So that behavior won't be changing even if we fail fast?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm -- 3.9 has kraft and metadata version right?
I'm also wondering about the case where someone is using an old client on a new server (maybe not common) and there could be many features
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm -- 3.9 has kraft and metadata version right?
That's true.
Here is my reasoning. The old behavior of v1 allows the setting of kraft.version=1 and metadata.version=3.8. However, this is incorrect since kraft.version=1 depends on metadata.version=3.9. So, by failing fast, we are fixing a bug here.
Failing fast does mean that we disallow certain old behavior like accepting metadata.version=3.8 and rejecting kraft.version=-1. But the impact is not as important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. A few more comments.
clients/src/main/resources/common/message/UpdateFeaturesResponse.json
Outdated
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
Outdated
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
Outdated
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
Outdated
Show resolved
Hide resolved
if (errorEntry.isPresent()) { | ||
String errorFeatureName = errorEntry.get().getKey(); | ||
ApiError topError = errorEntry.get().getValue(); | ||
String errorString = errorFeatureName + ":" + topError.error().exceptionName() + " (" + topError.message() + ")"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to construct the new error message? FeatureControlManager.invalidUpdateVersion
already includes the feature name in the error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I originally did this to also include the exception name but if we just take it as the top level exception we don't need to do this.
2 test failures look related so i will take a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. Made a pass of all files. A few more comments.
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
Show resolved
Hide resolved
UpdateFeaturesResponse.createWithErrors( | ||
ApiError.NONE, | ||
featureUpdateErrors.asJava, | ||
featureUpdateErrors.asJava.keySet(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We added updates
in UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Set<String> updates, int throttleTimeMs)
just for this call. But if this call is wrong anyway, it seems it's simpler to just remove updates
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also used for None errors in the KafkaAdminClient test testUpdateFeaturesHandleNotControllerException
. Maybe it is not totally necessary though. The other thing that is a little interesting about that test is that it doesn't serialize the result -- so it will always look like the v1 response.
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
Outdated
Show resolved
Hide resolved
final ExecutionException e = assertThrows(ExecutionException.class, future::get); | ||
assertEquals(e.getCause().getClass(), error.exception().getClass()); | ||
} | ||
// If any update fails, we should have a top level error. The future should be successful. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If any update fails, we should have a top level error.
But this path has no top level error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment was trying to say that if the feature (what is being checked) failed, there should be a non-NONE error. Since there is a NONE error, the future should be successful. I can try to re-word to make it clearer.
@@ -6410,8 +6377,7 @@ public void testUpdateFeaturesHandleNotControllerException() throws Exception { | |||
request -> request instanceof UpdateFeaturesRequest, | |||
UpdateFeaturesResponse.createWithErrors( | |||
ApiError.NONE, | |||
Utils.mkMap(Utils.mkEntry("test_feature_1", ApiError.NONE), | |||
Utils.mkEntry("test_feature_2", ApiError.NONE)), | |||
Utils.mkSet("test_feature_1", "test_feature_2"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which version of the response are we testing here? If it's the latest version, the set should be empty, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, I referenced this above. There is no serialization so this is going to respond like a v1 request (we won't ignore the feature fields).
I can change it to not do this, but I wonder if we have testing for v1 responses now. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I will add some v1/v2 tests here, and one for the serialization in the UpdateFeaturesResponseTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. Just one more comment.
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the updated PR. LGTM. It would be useful to update the KIP and the mailing list with the changes.
Yes! I will do that. Thanks for the thorough reviews Jun! |
I also noticed my comment about version 2 is off. I will update that in the json. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jolshan : Thanks for the latest update. LGTM
This change includes: