Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -184,6 +184,10 @@ ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filter
validateFilter(filter);
AclDeleteResult result = deleteAclsForFilter(filter, records);
results.add(result);
} catch (BoundedListTooLongException e) {
// we do not return partial results here because the fact that only a portion of the deletions
// succeeded can be easily missed due to response size. instead fail the entire response
throw new InvalidRequestException(e.getMessage(), e);
} catch (Throwable e) {
results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception()));
}
@@ -199,13 +203,14 @@ AclDeleteResult deleteAclsForFilter(AclBindingFilter filter,
StandardAcl acl = entry.getValue();
AclBinding binding = acl.toBinding();
if (filter.matches(binding)) {
deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
if (records.size() > MAX_RECORDS_PER_USER_OP) {
// check size limitation first before adding additional records
if (records.size() >= MAX_RECORDS_PER_USER_OP) {
throw new BoundedListTooLongException("Cannot remove more than " +
MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.");
}
deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
}
}
return new AclDeleteResult(deleted);
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.timeline.SnapshotRegistry;

import org.junit.jupiter.api.Test;
@@ -69,6 +70,7 @@
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.PatternType.MATCH;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -380,4 +382,60 @@ public void testDeleteDedupe() {
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id());
assertEquals(2, deleteAclResultsBothFilters.response().size());
}

@Test
public void testDeleteExceedsMaxRecords() {
AclControlManager manager = new AclControlManager.Builder().build();
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
authorizer.loadSnapshot(manager.idToAcl());

List<AclBinding> firstCreate = new ArrayList<>();
List<AclBinding> secondCreate = new ArrayList<>();

// create MAX_RECORDS_PER_USER_OP + 2 ACLs
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(),
new StandardAcl(
ResourceType.TOPIC,
"mytopic_" + i,
PatternType.LITERAL,
"User:alice",
"127.0.0.1",
AclOperation.READ,
AclPermissionType.ALLOW));

// split acl creations between two create requests
if (i % 2 == 0) {
firstCreate.add(acl.toBinding());
} else {
secondCreate.add(acl.toBinding());
}
}
ControllerResult<List<AclCreateResult>> firstCreateResult = manager.createAcls(firstCreate);
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, firstCreateResult.response().size());
for (AclCreateResult result : firstCreateResult.response()) {
assertTrue(result.exception().isEmpty());
}

ControllerResult<List<AclCreateResult>> secondCreateResult = manager.createAcls(secondCreate);
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, secondCreateResult.response().size());
for (AclCreateResult result : secondCreateResult.response()) {
assertTrue(result.exception().isEmpty());
}

RecordTestUtils.replayAll(manager, firstCreateResult.records());
RecordTestUtils.replayAll(manager, secondCreateResult.records());
assertFalse(manager.idToAcl().isEmpty());

ArrayList<AclBindingFilter> filters = new ArrayList<>();
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
filters.add(new AclBindingFilter(
new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, PatternType.LITERAL),
AccessControlEntryFilter.ANY));
}

Exception exception = assertThrows(InvalidRequestException.class, () -> manager.deleteAcls(filters));
assertEquals(BoundedListTooLongException.class, exception.getCause().getClass());
assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.", exception.getCause().getMessage());
}
}