Skip to content

Commit

Permalink
Address comments, add v1/v2 testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jolshan committed Sep 30, 2024
1 parent 75c2cff commit f115da8
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static UpdateFeaturesResponse parse(ByteBuffer buffer, short version) {

public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Set<String> updates, int throttleTimeMs) {
final UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection();
if (topLevelError.error() == Errors.NONE) {
if (topLevelError == ApiError.NONE) {
for (final String feature : updates) {
final UpdatableFeatureResult result = new UpdatableFeatureResult();
result.setFeature(feature)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6324,18 +6324,19 @@ private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
}

private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
ApiError topLevelError) throws Exception {
ApiError topLevelError,
Set<String> updates) throws Exception {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().prepareResponse(
body -> body instanceof UpdateFeaturesRequest,
UpdateFeaturesResponse.createWithErrors(topLevelError, Utils.mkSet(), 0));
UpdateFeaturesResponse.createWithErrors(topLevelError, updates, 0));
final Map<String, KafkaFuture<Void>> futures = env.adminClient().updateFeatures(
featureUpdates,
new UpdateFeaturesOptions().timeoutMs(10000)).values();
for (final Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
final KafkaFuture<Void> future = entry.getValue();
if (topLevelError.error() == Errors.NONE) {
// If any update fails, we should have a top level error. The future should be successful.
// Since the top level error was NONE, each future should be successful.
future.get();
} else {
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
Expand All @@ -6346,20 +6347,24 @@ private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
}
}

@Test
public void testUpdateFeaturesDuringSuccess() throws Exception {
@ParameterizedTest
@ValueSource(shorts = {1, 2})
public void testUpdateFeaturesDuringSuccess(short version) throws Exception {
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
testUpdateFeatures(updates, ApiError.NONE);
// Only v1 and below specifies error codes per feature for NONE error.
Set<String> features = version <= 1 ? updates.keySet() : Utils.mkSet();
testUpdateFeatures(updates, ApiError.NONE, features);
}

@Test
public void testUpdateFeaturesTopLevelError() throws Exception {
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
testUpdateFeatures(updates, new ApiError(Errors.INVALID_REQUEST));
testUpdateFeatures(updates, new ApiError(Errors.INVALID_REQUEST), Utils.mkSet());
}

@Test
public void testUpdateFeaturesHandleNotControllerException() throws Exception {
@ParameterizedTest
@ValueSource(shorts = {1, 2})
public void testUpdateFeaturesHandleNotControllerException(short version) throws Exception {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().prepareResponseFrom(
request -> request instanceof UpdateFeaturesRequest,
Expand All @@ -6373,11 +6378,13 @@ public void testUpdateFeaturesHandleNotControllerException() throws Exception {
env.cluster().clusterResource().clusterId(),
controllerId,
Collections.emptyList()));
// Only v1 and below specifies error codes per feature for NONE error.
Set<String> features = version <= 1 ? Utils.mkSet("test_feature_1", "test_feature_2") : Utils.mkSet();
env.kafkaClient().prepareResponseFrom(
request -> request instanceof UpdateFeaturesRequest,
UpdateFeaturesResponse.createWithErrors(
ApiError.NONE,
Utils.mkSet("test_feature_1", "test_feature_2"),
features,
0),
env.cluster().nodeById(controllerId));
final KafkaFuture<Void> future = env.adminClient().updateFeatures(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;

import java.util.Map;

Expand Down Expand Up @@ -58,4 +62,22 @@ public void testErrorCounts() {
assertEquals(2, errorCounts.get(Errors.UNKNOWN_SERVER_ERROR).intValue());
assertEquals(1, errorCounts.get(Errors.FEATURE_UPDATE_FAILED).intValue());
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.UPDATE_FEATURES)
public void testSerialization(short version) {
UpdateFeaturesResponse noErrorResponse = UpdateFeaturesResponse.parse(UpdateFeaturesResponse.createWithErrors(ApiError.NONE,
Utils.mkSet("feature-1", "feature-2"), 0).serialize(version), version);

// Versions 1 and below still contain feature level results when the error is NONE.
int expectedSize = version <= 1 ? 2 : 0;
assertEquals(ApiError.NONE, noErrorResponse.topLevelError());
assertEquals(expectedSize, noErrorResponse.data().results().size());

ApiError error = new ApiError(Errors.INVALID_UPDATE_VERSION);
UpdateFeaturesResponse errorResponse = UpdateFeaturesResponse.parse(UpdateFeaturesResponse.createWithErrors(error,
Utils.mkSet("feature-1", "feature-2"), 0).serialize(version), version);
assertEquals(error, errorResponse.topLevelError());
assertEquals(0, errorResponse.data().results().size());
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3581,7 +3581,7 @@ class KafkaApis(val requestChannel: RequestChannel,
Collections.emptySet(),
throttleTimeMs)
case Right(featureUpdateErrors) =>
// This is response is not correct, but since this is ZK specific code it will be removed in 4.0
// This response is not correct, but since this is ZK specific code it will be removed in 4.0
UpdateFeaturesResponse.createWithErrors(
ApiError.NONE,
featureUpdateErrors.asJava.keySet(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -2314,7 +2313,8 @@ public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
return featureControl.updateFeatures(updates, upgradeTypes, request.validateOnly());
}).thenApply(result -> {
UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();


// Only specify per feature responses if the error is None and request version is less than or equal to 1.
if (result != ApiError.NONE) {
responseData.setErrorCode(result.error().code());
responseData.setErrorMessage("The update failed for all features since the following feature had an error: " + result.message());
Expand All @@ -2324,7 +2324,8 @@ public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
responseData.results().add(
new UpdateFeaturesResponseData.UpdatableFeatureResult()
.setFeature(featureName.feature())
.setErrorCode(Errors.NONE.code())
.setErrorCode(result.error().code())
.setErrorMessage(result.error().message())
));
}
return responseData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testInvalidValidateVersion() {
assertThrows(IllegalArgumentException.class,
() -> Features.validateVersion(
TestFeatureVersion.TEST_2,
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_6_IV0.featureLevel())
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())
)
);
}
Expand Down

0 comments on commit f115da8

Please sign in to comment.