Skip to content

KAFKA-19411: Fix deleteAcls bug which allows more deletions than max records per user op #19974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -184,6 +184,10 @@ ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filter
validateFilter(filter);
AclDeleteResult result = deleteAclsForFilter(filter, records);
results.add(result);
} catch (BoundedListTooLongException e) {
// we do not return partial results here because the fact that only a portion of the deletions
// succeeded can be easily missed due to response size. instead fail the entire response
throw new InvalidRequestException(e.getMessage(), e);
} catch (Throwable e) {
results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception()));
}
@@ -199,13 +203,14 @@ AclDeleteResult deleteAclsForFilter(AclBindingFilter filter,
StandardAcl acl = entry.getValue();
AclBinding binding = acl.toBinding();
if (filter.matches(binding)) {
deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
if (records.size() > MAX_RECORDS_PER_USER_OP) {
// check size limitation first before adding additional records
if (records.size() >= MAX_RECORDS_PER_USER_OP) {
throw new BoundedListTooLongException("Cannot remove more than " +
MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.");
}
deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
}
}
return new AclDeleteResult(deleted);
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.timeline.SnapshotRegistry;

import org.junit.jupiter.api.Test;
@@ -69,6 +70,7 @@
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.PatternType.MATCH;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -380,4 +382,60 @@ public void testDeleteDedupe() {
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id());
assertEquals(2, deleteAclResultsBothFilters.response().size());
}

@Test
public void testDeleteExceedsMaxRecords() {
AclControlManager manager = new AclControlManager.Builder().build();
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
authorizer.loadSnapshot(manager.idToAcl());

List<AclBinding> firstCreate = new ArrayList<>();
List<AclBinding> secondCreate = new ArrayList<>();

// create MAX_RECORDS_PER_USER_OP + 2 ACLs
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(),
new StandardAcl(
ResourceType.TOPIC,
"mytopic_" + i,
PatternType.LITERAL,
"User:alice",
"127.0.0.1",
AclOperation.READ,
AclPermissionType.ALLOW));

// split acl creations between two create requests
if (i % 2 == 0) {
firstCreate.add(acl.toBinding());
} else {
secondCreate.add(acl.toBinding());
}
}
ControllerResult<List<AclCreateResult>> firstCreateResult = manager.createAcls(firstCreate);
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, firstCreateResult.response().size());
for (AclCreateResult result : firstCreateResult.response()) {
assertTrue(result.exception().isEmpty());
}

ControllerResult<List<AclCreateResult>> secondCreateResult = manager.createAcls(secondCreate);
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, secondCreateResult.response().size());
for (AclCreateResult result : secondCreateResult.response()) {
assertTrue(result.exception().isEmpty());
}

RecordTestUtils.replayAll(manager, firstCreateResult.records());
RecordTestUtils.replayAll(manager, secondCreateResult.records());
assertFalse(manager.idToAcl().isEmpty());

ArrayList<AclBindingFilter> filters = new ArrayList<>();
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
filters.add(new AclBindingFilter(
new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, PatternType.LITERAL),
AccessControlEntryFilter.ANY));
}

Exception exception = assertThrows(InvalidRequestException.class, () -> manager.deleteAcls(filters));
assertEquals(BoundedListTooLongException.class, exception.getCause().getClass());
assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.", exception.getCause().getMessage());
}
}