From 66a8f739bcecc4b719adc9a063d0b17ec766d76e Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 29 Nov 2024 04:57:58 +0200 Subject: [PATCH 01/95] [fix][build] Fix error "Element encoding is not allowed here" in pom.xml (#23655) --- pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index de05eae5755c0..e6d154c1b34d6 100644 --- a/pom.xml +++ b/pom.xml @@ -2432,7 +2432,8 @@ flexible messaging model and an intuitive client API. ${pulsar.basedir}/buildtools/src/main/resources/pulsar/checkstyle.xml ${pulsar.basedir}/buildtools/src/main/resources/pulsar/suppressions.xml - UTF-8 + UTF-8 + UTF-8 **/proto/* @@ -2496,7 +2497,7 @@ flexible messaging model and an intuitive client API. ${pulsar.basedir}/buildtools/src/main/resources/pulsar/checkstyle.xml ${pulsar.basedir}/buildtools/src/main/resources/pulsar/suppressions.xml - UTF-8 + UTF-8 **/proto/* From 32b3ccfd331a1a6093aff80b78512dfd0809992f Mon Sep 17 00:00:00 2001 From: zhou zhuohan <843520313@qq.com> Date: Fri, 29 Nov 2024 10:59:19 +0800 Subject: [PATCH 02/95] [improve][client] Replace NameUtil#generateRandomName with RandomStringUtils#randomAlphanumeric (#23645) --- .../pulsar/broker/service/ServerCnxTest.java | 10 +++---- .../pulsar/client/impl/ConsumerBase.java | 5 ++-- .../pulsar/client/impl/ConsumerImpl.java | 4 +-- .../client/impl/MultiTopicsConsumerImpl.java | 6 ++-- .../apache/pulsar/client/util/NameUtil.java | 28 ------------------- .../pulsar/client/impl/ConsumerImplTest.java | 7 +++++ 6 files changed, 20 insertions(+), 40 deletions(-) delete mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/util/NameUtil.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 59e9847b75a33..b1c99940827c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -86,6 +86,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -108,7 +109,6 @@ import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.client.util.NameUtil; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.AuthMethod; import org.apache.pulsar.common.api.proto.BaseCommand; @@ -1084,8 +1084,8 @@ public void testHandleConsumerAfterClientChannelInactive() throws Exception { final long consumerId = 1; final MutableInt requestId = new MutableInt(1); final String sName = successSubName; - final String cName1 = NameUtil.generateRandomName(); - final String cName2 = NameUtil.generateRandomName(); + final String cName1 = RandomStringUtils.randomAlphanumeric(5); + final String cName2 = RandomStringUtils.randomAlphanumeric(5); resetChannel(); setChannelConnected(); @@ -1126,8 +1126,8 @@ public void test2ndSubFailedIfDisabledConCheck() final long consumerId = 1; final MutableInt requestId = new MutableInt(1); final String sName = successSubName; - final String cName1 = NameUtil.generateRandomName(); - final String cName2 = NameUtil.generateRandomName(); + final String cName1 = RandomStringUtils.randomAlphanumeric(5); + final String cName2 = RandomStringUtils.randomAlphanumeric(5); // Disabled connection check. pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1); resetChannel(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 31aef2fd25abb..1ad8c6d28f1d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReentrantLock; import lombok.Getter; import lombok.Setter; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -61,7 +62,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ExecutorProvider; -import org.apache.pulsar.client.util.NameUtil; import org.apache.pulsar.client.util.NoOpLock; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -132,7 +132,8 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.maxReceiverQueueSize = receiverQueueSize; this.subscription = conf.getSubscriptionName(); this.conf = conf; - this.consumerName = conf.getConsumerName() == null ? NameUtil.generateRandomName() : conf.getConsumerName(); + this.consumerName = + conf.getConsumerName() == null ? RandomStringUtils.randomAlphanumeric(5) : conf.getConsumerName(); this.subscribeFuture = subscribeFuture; this.listener = conf.getMessageListener(); this.consumerEventListener = conf.getConsumerEventListener(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index d46f4af1be748..390a70095182f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -69,6 +69,7 @@ import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.Getter; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.Consumer; @@ -98,7 +99,6 @@ import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ExecutorProvider; -import org.apache.pulsar.client.util.NameUtil; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; @@ -2267,7 +2267,7 @@ private void initDeadLetterProducerIfNeeded() { .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) .topic(this.deadLetterPolicy.getDeadLetterTopic()) .producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription, - this.consumerName, NameUtil.generateRandomName())) + this.consumerName, RandomStringUtils.randomAlphanumeric(5))) .blockIfQueueFull(false) .enableBatching(false) .enableChunking(true) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index dd6a304d9985c..6f9c5b47c55bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -53,6 +53,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.annotation.Nullable; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; @@ -69,7 +70,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ExecutorProvider; -import org.apache.pulsar.client.util.NameUtil; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.TopicName; @@ -113,7 +113,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { - this(client, DUMMY_TOPIC_NAME_PREFIX + NameUtil.generateRandomName(), conf, executorProvider, + this(client, DUMMY_TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric(5), conf, executorProvider, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist); } @@ -121,7 +121,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, long startMessageRollbackDurationInSec) { - this(client, DUMMY_TOPIC_NAME_PREFIX + NameUtil.generateRandomName(), conf, executorProvider, + this(client, DUMMY_TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric(5), conf, executorProvider, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, startMessageId, startMessageRollbackDurationInSec); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/NameUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/NameUtil.java deleted file mode 100644 index 4c416b6152cf8..0000000000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/NameUtil.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.util; - -import java.util.UUID; -import org.apache.commons.codec.digest.DigestUtils; - -public class NameUtil { - public static String generateRandomName() { - return DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5); - } -} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index e62958eb96887..4831f1e384d4b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import lombok.Cleanup; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -296,4 +297,10 @@ public void testSeekAsyncInternal() { assertTrue(secondResult.isCompletedExceptionally()); verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong()); } + + @Test(invocationTimeOut = 1000) + public void testAutoGenerateConsumerName() { + Pattern consumerNamePattern = Pattern.compile("[a-zA-Z0-9]{5}"); + assertTrue(consumerNamePattern.matcher(consumer.getConsumerName()).matches()); + } } From 3a502552f5cbe3717c1039028a8e07d2abef4b06 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 29 Nov 2024 11:03:23 +0800 Subject: [PATCH 03/95] [improve] [pip] PIP-373: Add a topic's system prop that indicates whether users have published TXN messages in before. (#23210) [improve] [pip] PIP-373: Add a topic's system prop that indicates whether users have published TXN messages in before. (#23210) --- pip/pip-373.md | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 pip/pip-373.md diff --git a/pip/pip-373.md b/pip/pip-373.md new file mode 100644 index 0000000000000..b40824a186a73 --- /dev/null +++ b/pip/pip-373.md @@ -0,0 +1,78 @@ +# PIP-373: Add a topic's system prop that indicates whether users have published TXN messages in before. + +# Background knowledge + +**Transaction Buffer** +- It maintains `aborted` messages indexes and `ongoing` TXN messages indexes in memory, we call them `Two Indexes` in the following sections. +- It helps to filter out the messages that have been aborted or are still binding with an `on-going` TXN, to avoid consuming aborted messages or TXN in-progress messages. + +**Transaction Buffer initializes when a topic is loading up** +- It reads messages that contain TXN state from the original topic, to re-build `Two Indexes` into memory. + +**Transaction Buffer closing when a topic is unloading** +- It takes a snapshot for `Two Indexes` to rebuild the states faster next loading up. + +# Motivation + +- Since TXN is a feature that across multiple namespaces, it can be enabled or disabled at the cluster level. +- Transaction Buffer will try to re-build `Two Indexes` for every topic if you enable TXN. +- If you have a huge number of topics in a namespace, the task that re-build `Two Indexes` costs huge resources(CPU, Memory). + +We'd better skip Transaction Buffer re-building `Two Indexes` if the topic it is related to does not contain TXN messages to save resources usage. + +# Goals + +Skip Transaction Buffer re-building `Two Indexes` if the topic it is related to does not contain TXN messages to save resources usage. + +## In Scope + +This PIP only focuses on improving the scenario in which users have never published TXN messages on a topic. + +## Out of Scope + +This PIP does not focus on the scenario that follows. +- Published TXN messages. +- Consumed all TXN messages that were sent before. +- Transaction Buffer keeps re-building `Two Indexes` even if there are no TXN messages in the topic anymore, and it costs resources. + +# Detailed Design + +## Design & Implementation Details + +- Add a topic-level system property named `__contains_txn_messages`, the default value is `false`, and it will be set to `true` when the first TXN message is publishing. +- Transaction Buffer skips re-building `Two Indexes` if the property is `false`. + +## Public-facing Changes + +The topic property `__contains_txn_messages` becomes to a system property, it can never be used by users anymore. + +### Public API +- You will get a `NotAllowedException` when you try to set/remove a topic property named `__contains_txn_messages` by the API `pulsar-admin topics update-properties/remove-properties`. +- The value of the property `__contains_txn_messages` can be queried by `pulsar-admin topics get-properties`. + + +### Metrics +| Name | Description | Attributes | Units| +|--------------------------------------------|-----------------------------------------------------| --- | --- | +| `pulsar_broker_using_txn_topics_count` | Counter. The number of topics contains TXN messages. | cluster | - | + +# Backward & Forward Compatibility + +## Upgrade + +There are `3` scenarios that the topic's property `__contains_txn_messages` is when users try to upgrade. +- `__contains_txn_messages` is empty: broker initializes it by confirming whether there is TXN messages or not. +- **(Highlight)** `__contains_txn_messages` is not empty and is not typed `boolean`, rollback to the original behavior that always re-building `Two Indexes`, but you can never modify it anymore. +- **(Highlight)** `__contains_txn_messages` is not empty and is typed `boolean`, but it is a users' property, broker assumed that it is a system property. + +## Downgrade / Rollback + +You can downgrade or roll back gracefully. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +The PIP does not affect Geo-Replication. + +# Links +* Mailing List discussion thread: https://lists.apache.org/thread/7mblhyvsrw5zybo0gs5512xg8f9sm67v +* Mailing List voting thread: https://lists.apache.org/thread/s6z5gcjyw081cxf9pwz361r8dt2k8gvl From d1753ee44221cd2bb9f16f18412617ab533112f0 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Fri, 29 Nov 2024 14:00:16 +0800 Subject: [PATCH 04/95] [fix][broker] Revert "[improve][client] Add log when can't add message to the container (#23657) --- .../java/org/apache/pulsar/client/impl/MessagesImpl.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index 9768fd7c74b0f..d4cd36a22e15f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -22,13 +22,11 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import lombok.extern.slf4j.Slf4j; import net.jcip.annotations.NotThreadSafe; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; @NotThreadSafe -@Slf4j public class MessagesImpl implements Messages { private final List> messageList; @@ -51,14 +49,10 @@ protected boolean canAdd(Message message) { return true; } if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) { - log.warn("can't add message to the container, has exceeded the maxNumberOfMessages : {} ", - maxNumberOfMessages); return false; } if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.size() > maxSizeOfMessages) { - log.warn("can't add message to the container, has exceeded the maxSizeOfMessages : {} ", - maxSizeOfMessages); return false; } From eb60d0ab2022f11ebd2217a32ef60886f757cddf Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Fri, 29 Nov 2024 16:30:48 +0800 Subject: [PATCH 05/95] [fix][broker]: support missing broker level fine-granted permissions (#23637) --- .../authorization/AuthorizationProvider.java | 10 + .../authorization/AuthorizationService.java | 23 ++ .../pulsar/broker/admin/impl/BrokersBase.java | 100 ++++++- .../BrokerEndpointsAuthorizationTest.java | 277 ++++++++++++++++++ .../client/admin/internal/BrokersImpl.java | 2 +- .../common/policies/data/BrokerOperation.java | 39 +++ 6 files changed, 437 insertions(+), 14 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerOperation.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index ffb38f770a9cc..48386265940a3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BrokerOperation; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; @@ -383,4 +384,13 @@ default CompletableFuture>> getPermissionsAsync(Name String.format("getPermissionsAsync on namespaceName %s is not supported by the Authorization", namespaceName))); } + + default CompletableFuture allowBrokerOperationAsync(String clusterName, + String brokerId, + BrokerOperation brokerOperation, + String role, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture( + new UnsupportedOperationException("allowBrokerOperationAsync is not supported yet.")); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 2951eb1f2973f..1348a405b0dfa 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -38,6 +38,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BrokerOperation; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; @@ -544,6 +545,28 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, } } + public CompletableFuture allowBrokerOperationAsync(String clusterName, + String brokerId, + BrokerOperation brokerOperation, + String originalRole, + String role, + AuthenticationDataSource authData) { + if (!isValidOriginalPrincipal(role, originalRole, authData)) { + return CompletableFuture.completedFuture(false); + } + + if (isProxyRole(role)) { + final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, + brokerOperation, role, authData); + final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, + brokerOperation, originalRole, authData); + return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, + (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); + } else { + return provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData); + } + } + /** * @deprecated - will be removed after 2.12. Use async variant. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index e397dbb64a075..a24a78d8e3102 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -69,6 +69,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerInfo; +import org.apache.pulsar.common.policies.data.BrokerOperation; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ThreadDumpUtil; @@ -107,7 +108,8 @@ public class BrokersBase extends AdminResource { @ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") }) public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse, @PathParam("cluster") String cluster) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(cluster == null ? pulsar().getConfiguration().getClusterName() + : cluster, pulsar().getBrokerId(), BrokerOperation.LIST_BROKERS) .thenCompose(__ -> validateClusterOwnershipAsync(cluster)) .thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync()) .thenAccept(activeBrokers -> { @@ -148,7 +150,9 @@ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse) throw @ApiResponse(code = 403, message = "This operation requires super-user access"), @ApiResponse(code = 404, message = "Leader broker not found") }) public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) { - validateSuperUserAccessAsync().thenAccept(__ -> { + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), + pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER) + .thenAccept(__ -> { LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader() .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker")); BrokerInfo brokerInfo = BrokerInfo.builder() @@ -175,7 +179,8 @@ public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) { public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse, @PathParam("clusterName") String cluster, @PathParam("brokerId") String brokerId) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), + pulsar().getBrokerId(), BrokerOperation.LIST_OWNED_NAMESPACES) .thenCompose(__ -> maybeRedirectToBroker(brokerId)) .thenCompose(__ -> validateClusterOwnershipAsync(cluster)) .thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync()) @@ -204,7 +209,8 @@ public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse, public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse, @PathParam("configName") String configName, @PathParam("configValue") String configValue) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.UPDATE_DYNAMIC_CONFIGURATION) .thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue)) .thenAccept(__ -> { LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue); @@ -228,7 +234,8 @@ public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse, public void deleteDynamicConfiguration( @Suspended AsyncResponse asyncResponse, @PathParam("configName") String configName) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.DELETE_DYNAMIC_CONFIGURATION) .thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName)) .thenAccept(__ -> { LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), configName); @@ -249,7 +256,8 @@ public void deleteDynamicConfiguration( @ApiResponse(code = 404, message = "Configuration not found"), @ApiResponse(code = 500, message = "Internal server error")}) public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS) .thenCompose(__ -> dynamicConfigurationResources().getDynamicConfigurationAsync()) .thenAccept(configOpt -> asyncResponse.resume(configOpt.orElseGet(Collections::emptyMap))) .exceptionally(ex -> { @@ -266,7 +274,8 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) @ApiResponses(value = { @ApiResponse(code = 403, message = "You don't have admin permission to get configuration")}) public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS) .thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration())) .exceptionally(ex -> { LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex); @@ -281,7 +290,8 @@ public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) response = String.class, responseContainer = "Map") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.LIST_RUNTIME_CONFIGURATIONS) .thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getRuntimeConfiguration())) .exceptionally(ex -> { LOG.error("[{}] Failed to get runtime configuration.", clientAppId(), ex); @@ -322,7 +332,8 @@ private synchronized CompletableFuture persistDynamicConfigurationAsync( @ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class) @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.GET_INTERNAL_CONFIGURATION_DATA) .thenAccept(__ -> asyncResponse.resume(pulsar().getInternalConfigurationData())) .exceptionally(ex -> { LOG.error("[{}] Failed to get internal configuration data.", clientAppId(), ex); @@ -339,7 +350,8 @@ public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 500, message = "Internal server error")}) public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.CHECK_BACKLOG_QUOTA) .thenAcceptAsync(__ -> { pulsar().getBrokerService().monitorBacklogQuota(); asyncResponse.resume(Response.noContent().build()); @@ -378,7 +390,8 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Topic Version") @QueryParam("topicVersion") TopicVersion topicVersion, @QueryParam("brokerId") String brokerId) { - validateSuperUserAccessAsync() + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), StringUtils.isBlank(brokerId) + ? pulsar().getBrokerId() : brokerId, BrokerOperation.HEALTH_CHECK) .thenAccept(__ -> checkDeadlockedThreads()) .thenCompose(__ -> maybeRedirectToBroker( StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId)) @@ -596,8 +609,9 @@ public void shutDownBrokerGracefully( @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic, @Suspended final AsyncResponse asyncResponse ) { - validateSuperUserAccess(); - doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic) + validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), + BrokerOperation.SHUTDOWN) + .thenCompose(__ -> doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic)) .thenAccept(__ -> { LOG.info("[{}] Successfully shutdown broker gracefully", clientAppId()); asyncResponse.resume(Response.noContent().build()); @@ -614,5 +628,65 @@ private CompletableFuture doShutDownBrokerGracefullyAsync(int maxConcurren pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic); return pulsar().closeAsync(); } + + + private CompletableFuture validateBothSuperuserAndBrokerOperation(String cluster, String brokerId, + BrokerOperation operation) { + final var superUserAccessValidation = validateSuperUserAccessAsync(); + final var brokerOperationValidation = validateBrokerOperationAsync(cluster, brokerId, operation); + return FutureUtil.waitForAll(List.of(superUserAccessValidation, brokerOperationValidation)) + .handle((result, err) -> { + if (!superUserAccessValidation.isCompletedExceptionally() + || !brokerOperationValidation.isCompletedExceptionally()) { + return null; + } + if (LOG.isDebugEnabled()) { + Throwable superUserValidationException = null; + try { + superUserAccessValidation.join(); + } catch (Throwable ex) { + superUserValidationException = FutureUtil.unwrapCompletionException(ex); + } + Throwable brokerOperationValidationException = null; + try { + brokerOperationValidation.join(); + } catch (Throwable ex) { + brokerOperationValidationException = FutureUtil.unwrapCompletionException(ex); + } + LOG.debug("validateBothSuperuserAndBrokerOperation failed." + + " originalPrincipal={} clientAppId={} operation={} broker={} " + + "superuserValidationError={} brokerOperationValidationError={}", + originalPrincipal(), clientAppId(), operation.toString(), brokerId, + superUserValidationException, brokerOperationValidationException); + } + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateBothSuperuserAndBrokerOperation for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] on broker [%s]", + originalPrincipal(), clientAppId(), operation.toString(), brokerId)); + }); + } + + + private CompletableFuture validateBrokerOperationAsync(String cluster, String brokerId, + BrokerOperation operation) { + final var pulsar = pulsar(); + if (pulsar.getBrokerService().isAuthenticationEnabled() + && pulsar.getBrokerService().isAuthorizationEnabled()) { + return pulsar.getBrokerService().getAuthorizationService() + .allowBrokerOperationAsync(cluster, brokerId, operation, originalPrincipal(), + clientAppId(), clientAuthData()) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateBrokerOperation for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] on broker [%s]", + originalPrincipal(), clientAppId(), operation.toString(), brokerId)); + } + }); + } + return CompletableFuture.completedFuture(null); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java new file mode 100644 index 0000000000000..ef66f005b3c58 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.naming.TopicVersion; +import org.apache.pulsar.common.policies.data.BrokerOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class BrokerEndpointsAuthorizationTest extends MockedPulsarStandalone { + private AuthorizationService orignalAuthorizationService; + private AuthorizationService spyAuthorizationService; + + private PulsarAdmin superUserAdmin; + private PulsarAdmin nobodyAdmin; + + @SneakyThrows + @BeforeClass(alwaysRun = true) + public void setup() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + this.nobodyAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(NOBODY_TOKEN)) + .build(); + } + + @BeforeMethod(alwaysRun = true) + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + spyAuthorizationService = spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + spyAuthorizationService, true); + } + + @AfterMethod(alwaysRun = true) + public void after() throws IllegalAccessException { + if (orignalAuthorizationService != null) { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", orignalAuthorizationService, true); + } + } + + @SneakyThrows + @AfterClass(alwaysRun = true) + public void cleanup() { + if (superUserAdmin != null) { + superUserAdmin.close(); + superUserAdmin = null; + } + spyAuthorizationService = null; + orignalAuthorizationService = null; + super.close(); + } + + @Test + public void testGetActiveBroker() throws PulsarAdminException { + superUserAdmin.brokers().getActiveBrokers(); + final String brokerId = getPulsarService().getBrokerId(); + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.LIST_BROKERS), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getActiveBrokers()); + } + + @Test + public void testGetActiveBrokerWithCluster() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.brokers().getActiveBrokers(clusterName); + final String brokerId = getPulsarService().getBrokerId(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.LIST_BROKERS), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getActiveBrokers(clusterName)); + } + + @Test + public void testGetLeaderBroker() throws PulsarAdminException { + superUserAdmin.brokers().getLeaderBroker(); + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.GET_LEADER_BROKER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getLeaderBroker()); + } + + @Test + public void testGetOwnedNamespaces() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().getOwnedNamespaces(clusterName, brokerId); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.LIST_OWNED_NAMESPACES), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getOwnedNamespaces(clusterName, brokerId)); + } + + @Test + public void testUpdateDynamicConfiguration() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().updateDynamicConfiguration("maxTenants", "10"); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.UPDATE_DYNAMIC_CONFIGURATION), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().updateDynamicConfiguration("maxTenants", "10")); + } + + @Test + public void testDeleteDynamicConfiguration() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().deleteDynamicConfiguration("maxTenants"); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.DELETE_DYNAMIC_CONFIGURATION), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().deleteDynamicConfiguration("maxTenants")); + } + + + @Test + public void testGetAllDynamicConfiguration() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().getAllDynamicConfigurations(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getAllDynamicConfigurations()); + } + + + @Test + public void testGetDynamicConfigurationName() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().getDynamicConfigurationNames(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getDynamicConfigurationNames()); + } + + + @Test + public void testGetRuntimeConfiguration() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().getRuntimeConfigurations(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.LIST_RUNTIME_CONFIGURATIONS), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getRuntimeConfigurations()); + } + + + @Test + public void testGetInternalConfigurationData() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().getInternalConfigurationData(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.GET_INTERNAL_CONFIGURATION_DATA), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().getInternalConfigurationData()); + } + + + @Test + public void testBacklogQuotaCheck() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().backlogQuotaCheck(); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.CHECK_BACKLOG_QUOTA), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().backlogQuotaCheck()); + } + + @Test + public void testHealthCheck() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + final String brokerId = getPulsarService().getBrokerId(); + superUserAdmin.brokers().healthcheck(TopicVersion.V2); + // test allow broker operation + verify(spyAuthorizationService) + .allowBrokerOperationAsync(eq(clusterName), eq(brokerId), eq(BrokerOperation.HEALTH_CHECK), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.brokers().healthcheck(TopicVersion.V2)); + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index 35b261b196eee..b0cd3edeb21fe 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -162,7 +162,7 @@ public void backlogQuotaCheck() throws PulsarAdminException { @Override public CompletableFuture backlogQuotaCheckAsync() { - WebTarget path = adminBrokers.path("backlogQuotaCheck"); + WebTarget path = adminBrokers.path("backlog-quota-check"); return asyncGetRequest(path, new FutureCallback() {}); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerOperation.java new file mode 100644 index 0000000000000..de053fea6ad4a --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerOperation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +public enum BrokerOperation { + LIST_BROKERS, + GET_BROKER, + + GET_LEADER_BROKER, + LIST_OWNED_NAMESPACES, + + LIST_DYNAMIC_CONFIGURATIONS, + UPDATE_DYNAMIC_CONFIGURATION, + DELETE_DYNAMIC_CONFIGURATION, + + LIST_RUNTIME_CONFIGURATIONS, + + GET_INTERNAL_CONFIGURATION_DATA, + + CHECK_BACKLOG_QUOTA, + HEALTH_CHECK, + SHUTDOWN +} From 9ed44dd77f81abeb63fd5ed2877a1601afdc0bcc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 29 Nov 2024 11:05:29 +0200 Subject: [PATCH 06/95] [improve] Upgrade OpenTelemetry library to 1.44.1 version (#23656) --- .../server/src/assemble/LICENSE.bin.txt | 41 +++++++++---------- .../shell/src/assemble/LICENSE.bin.txt | 6 +-- pom.xml | 2 +- .../stats/BrokerOpenTelemetryTestUtil.java | 13 ++++-- .../client/metrics/ClientMetricsTest.java | 18 ++++++-- 5 files changed, 47 insertions(+), 33 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index fd393cfec9b76..10899bc0ae7aa 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -338,12 +338,11 @@ The Apache Software License, Version 2.0 - io.prometheus-simpleclient_tracer_otel-0.16.0.jar - io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar * Prometheus exporter - - io.prometheus-prometheus-metrics-config-1.3.1.jar - - io.prometheus-prometheus-metrics-exporter-common-1.3.1.jar - - io.prometheus-prometheus-metrics-exporter-httpserver-1.3.1.jar - - io.prometheus-prometheus-metrics-exposition-formats-1.3.1.jar - - io.prometheus-prometheus-metrics-model-1.3.1.jar - - io.prometheus-prometheus-metrics-shaded-protobuf-1.3.1.jar + - io.prometheus-prometheus-metrics-config-1.3.3.jar + - io.prometheus-prometheus-metrics-exporter-common-1.3.3.jar + - io.prometheus-prometheus-metrics-exporter-httpserver-1.3.3.jar + - io.prometheus-prometheus-metrics-exposition-formats-1.3.3.jar + - io.prometheus-prometheus-metrics-model-1.3.3.jar * Jakarta Bean Validation API - jakarta.validation-jakarta.validation-api-2.0.2.jar - javax.validation-validation-api-1.1.0.Final.jar @@ -516,21 +515,21 @@ The Apache Software License, Version 2.0 * RoaringBitmap - org.roaringbitmap-RoaringBitmap-1.2.0.jar * OpenTelemetry - - io.opentelemetry-opentelemetry-api-1.41.0.jar - - io.opentelemetry-opentelemetry-api-incubator-1.41.0-alpha.jar - - io.opentelemetry-opentelemetry-context-1.41.0.jar - - io.opentelemetry-opentelemetry-exporter-common-1.41.0.jar - - io.opentelemetry-opentelemetry-exporter-otlp-1.41.0.jar - - io.opentelemetry-opentelemetry-exporter-otlp-common-1.41.0.jar - - io.opentelemetry-opentelemetry-exporter-prometheus-1.41.0-alpha.jar - - io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-common-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-logs-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-metrics-1.41.0.jar - - io.opentelemetry-opentelemetry-sdk-trace-1.41.0.jar + - io.opentelemetry-opentelemetry-api-1.44.1.jar + - io.opentelemetry-opentelemetry-api-incubator-1.44.1-alpha.jar + - io.opentelemetry-opentelemetry-context-1.44.1.jar + - io.opentelemetry-opentelemetry-exporter-common-1.44.1.jar + - io.opentelemetry-opentelemetry-exporter-otlp-1.44.1.jar + - io.opentelemetry-opentelemetry-exporter-otlp-common-1.44.1.jar + - io.opentelemetry-opentelemetry-exporter-prometheus-1.44.1-alpha.jar + - io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-common-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-logs-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-metrics-1.44.1.jar + - io.opentelemetry-opentelemetry-sdk-trace-1.44.1.jar - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.33.6.jar - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.33.6-alpha.jar - io.opentelemetry.instrumentation-opentelemetry-resources-1.33.6-alpha.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 1601f32bb2b34..07a40d3bc1bc8 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -388,9 +388,9 @@ The Apache Software License, Version 2.0 - log4j-slf4j2-impl-2.23.1.jar - log4j-web-2.23.1.jar * OpenTelemetry - - opentelemetry-api-1.41.0.jar - - opentelemetry-api-incubator-1.41.0-alpha.jar - - opentelemetry-context-1.41.0.jar + - opentelemetry-api-1.44.1.jar + - opentelemetry-api-incubator-1.44.1-alpha.jar + - opentelemetry-context-1.44.1.jar * BookKeeper - bookkeeper-common-allocator-4.17.1.jar diff --git a/pom.xml b/pom.xml index e6d154c1b34d6..4830358f5ea72 100644 --- a/pom.xml +++ b/pom.xml @@ -258,7 +258,7 @@ flexible messaging model and an intuitive client API. 3.4.3 1.5.2-3 2.0.6 - 1.41.0 + 1.44.1 ${opentelemetry.version}-alpha 1.33.6 ${opentelemetry.instrumentation.version}-alpha diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java index 0d46e80a70302..3bfbf2064e156 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.stats; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -52,13 +53,14 @@ public static void disableExporters(AutoConfiguredOpenTelemetrySdkBuilder sdkBui public static void assertMetricDoubleSumValue(Collection metrics, String metricName, Attributes attributes, Consumer valueConsumer) { + Map, Object> attributesMap = attributes.asMap(); assertThat(metrics) .anySatisfy(metric -> assertThat(metric) .hasName(metricName) .hasDoubleSumSatisfying(sum -> sum.satisfies( sumData -> assertThat(sumData.getPoints()).anySatisfy( point -> { - assertThat(point.getAttributes()).isEqualTo(attributes); + assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap); valueConsumer.accept(point.getValue()); })))); } @@ -70,13 +72,14 @@ public static void assertMetricLongSumValue(Collection metrics, Stri public static void assertMetricLongSumValue(Collection metrics, String metricName, Attributes attributes, Consumer valueConsumer) { + Map, Object> attributesMap = attributes.asMap(); assertThat(metrics) .anySatisfy(metric -> assertThat(metric) .hasName(metricName) .hasLongSumSatisfying(sum -> sum.satisfies( sumData -> assertThat(sumData.getPoints()).anySatisfy( point -> { - assertThat(point.getAttributes()).isEqualTo(attributes); + assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap); valueConsumer.accept(point.getValue()); })))); } @@ -88,13 +91,14 @@ public static void assertMetricLongGaugeValue(Collection metrics, St public static void assertMetricLongGaugeValue(Collection metrics, String metricName, Attributes attributes, Consumer valueConsumer) { + Map, Object> attributesMap = attributes.asMap(); assertThat(metrics) .anySatisfy(metric -> assertThat(metric) .hasName(metricName) .hasLongGaugeSatisfying(gauge -> gauge.satisfies( pointData -> assertThat(pointData.getPoints()).anySatisfy( point -> { - assertThat(point.getAttributes()).isEqualTo(attributes); + assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap); valueConsumer.accept(point.getValue()); })))); } @@ -106,13 +110,14 @@ public static void assertMetricDoubleGaugeValue(Collection metrics, public static void assertMetricDoubleGaugeValue(Collection metrics, String metricName, Attributes attributes, Consumer valueConsumer) { + Map, Object> attributesMap = attributes.asMap(); assertThat(metrics) .anySatisfy(metric -> assertThat(metric) .hasName(metricName) .hasDoubleGaugeSatisfying(gauge -> gauge.satisfies( pointData -> assertThat(pointData.getPoints()).anySatisfy( point -> { - assertThat(point.getAttributes()).isEqualTo(attributes); + assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap); valueConsumer.accept(point.getValue()); })))); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java index 31305123c4148..02b38acf865d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java @@ -21,7 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; -import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.metrics.SdkMeterProvider; @@ -48,7 +48,7 @@ public class ClientMetricsTest extends ProducerConsumerBase { InMemoryMetricReader reader; - OpenTelemetry otel; + OpenTelemetrySdk otel; @BeforeMethod @Override @@ -67,6 +67,14 @@ protected void setup() throws Exception { @Override protected void cleanup() throws Exception { super.internalCleanup(); + if (otel != null) { + otel.close(); + otel = null; + } + if (reader != null) { + reader.close(); + reader = null; + } } private Map collectMetrics() { @@ -88,8 +96,9 @@ private long getCounterValue(Map metrics, String name, assertNotNull(md, "metric not found: " + name); assertEquals(md.getType(), MetricDataType.LONG_SUM); + Map, Object> expectedAttributesMap = expectedAttributes.asMap(); for (var ex : md.getLongSumData().getPoints()) { - if (ex.getAttributes().equals(expectedAttributes)) { + if (ex.getAttributes().asMap().equals(expectedAttributesMap)) { return ex.getValue(); } } @@ -109,8 +118,9 @@ private long getHistoCountValue(Map metrics, String name, assertNotNull(md, "metric not found: " + name); assertEquals(md.getType(), MetricDataType.HISTOGRAM); + Map, Object> expectedAttributesMap = expectedAttributes.asMap(); for (var ex : md.getHistogramData().getPoints()) { - if (ex.getAttributes().equals(expectedAttributes)) { + if (ex.getAttributes().asMap().equals(expectedAttributesMap)) { return ex.getCount(); } } From 429f7de96826d20bc90eb54a3b40df0e31f4b42f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 29 Nov 2024 11:17:08 +0200 Subject: [PATCH 07/95] [improve][client] Reduce unshaded dependencies and shading warnings in shaded Java client modules (#23647) --- .../server/src/assemble/LICENSE.bin.txt | 2 - .../shell/src/assemble/LICENSE.bin.txt | 3 - jetcd-core-shaded/pom.xml | 4 + pom.xml | 30 +- pulsar-broker-auth-oidc/pom.xml | 4 + pulsar-broker-common/pom.xml | 4 +- pulsar-broker/pom.xml | 4 +- pulsar-client-admin-shaded/pom.xml | 311 ++++++++++------- ...sar.shade.javax.ws.rs.client.ClientBuilder | 1 + ...lsar.shade.javax.ws.rs.ext.RuntimeDelegate | 1 + pulsar-client-admin/pom.xml | 4 +- pulsar-client-all/pom.xml | 312 ++++++++++-------- ...sar.shade.javax.ws.rs.client.ClientBuilder | 1 + ...lsar.shade.javax.ws.rs.ext.RuntimeDelegate | 1 + pulsar-client-auth-athenz/pom.xml | 5 + pulsar-client-auth-sasl/pom.xml | 4 +- pulsar-client-shaded/pom.xml | 240 ++++++++------ pulsar-common/pom.xml | 4 +- pulsar-functions/proto/pom.xml | 6 +- pulsar-functions/runtime/pom.xml | 6 +- pulsar-functions/secrets/pom.xml | 6 +- pulsar-proxy/pom.xml | 4 +- tiered-storage/jcloud/pom.xml | 4 +- 23 files changed, 580 insertions(+), 381 deletions(-) create mode 100644 pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder create mode 100644 pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate create mode 100644 pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder create mode 100644 pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 10899bc0ae7aa..21422a41dcef9 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -571,10 +571,8 @@ Protocol Buffers License CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt * Java Annotations API - com.sun.activation-javax.activation-1.2.0.jar - - javax.xml.bind-jaxb-api-2.3.1.jar * Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar * WebSocket Server API -- javax.websocket-javax.websocket-client-api-1.0.jar - * Java Web Service REST API -- javax.ws.rs-javax.ws.rs-api-2.1.jar * HK2 - Dependency Injection Kernel - org.glassfish.hk2-hk2-api-2.6.1.jar - org.glassfish.hk2-hk2-locator-2.6.1.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 07a40d3bc1bc8..a21c272f91b1d 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -431,11 +431,8 @@ MIT License CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt * Java Annotations API - - javax.annotation-api-1.3.2.jar - javax.activation-1.2.0.jar - - jaxb-api-2.3.1.jar * WebSocket Server API -- javax.websocket-client-api-1.0.jar - * Java Web Service REST API -- javax.ws.rs-api-2.1.jar * HK2 - Dependency Injection Kernel - hk2-api-2.6.1.jar - hk2-locator-2.6.1.jar diff --git a/jetcd-core-shaded/pom.xml b/jetcd-core-shaded/pom.xml index 0b79df8278f81..2a5536987cd42 100644 --- a/jetcd-core-shaded/pom.xml +++ b/jetcd-core-shaded/pom.xml @@ -45,6 +45,10 @@ io.netty * + + javax.annotation + javax.annotation-api + diff --git a/pom.xml b/pom.xml index 4830358f5ea72..54744a253bb2b 100644 --- a/pom.xml +++ b/pom.xml @@ -220,7 +220,7 @@ flexible messaging model and an intuitive client API. 1.10 2.14.0 1.15 - 2.1 + 2.1.6 2.1.9 3.1.0 2.9.1 @@ -231,7 +231,7 @@ flexible messaging model and an intuitive client API. 2.1.0 3.24.2 1.18.32 - 1.3.2 + 1.3.5 2.3.1 1.2.0 1.2.2 @@ -854,9 +854,9 @@ flexible messaging model and an intuitive client API. - javax.ws.rs - javax.ws.rs-api - ${javax.ws.rs-api.version} + jakarta.ws.rs + jakarta.ws.rs-api + ${jakarta.ws.rs-api.version} @@ -990,6 +990,12 @@ flexible messaging model and an intuitive client API. com.yahoo.athenz athenz-zts-java-client-core ${athenz.version} + + + javax.ws.rs + javax.ws.rs-api + + @@ -1418,9 +1424,9 @@ flexible messaging model and an intuitive client API. - javax.annotation - javax.annotation-api - ${javax.annotation-api.version} + jakarta.annotation + jakarta.annotation-api + ${jakarta.annotation-api.version} @@ -1723,11 +1729,6 @@ flexible messaging model and an intuitive client API. lombok provided - - javax.annotation - javax.annotation-api - provided - @@ -2003,6 +2004,9 @@ flexible messaging model and an intuitive client API. src/assemble/LICENSE.bin.txt src/assemble/NOTICE.bin.txt + + **/META-INF/services/* + src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java diff --git a/pulsar-broker-auth-oidc/pom.xml b/pulsar-broker-auth-oidc/pom.xml index 72351bf47d288..2d0931c3f2dfe 100644 --- a/pulsar-broker-auth-oidc/pom.xml +++ b/pulsar-broker-auth-oidc/pom.xml @@ -95,6 +95,10 @@ bcprov-jdk18on org.bouncycastle + + javax.annotation + javax.annotation-api + diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 30b703cd78a92..858a234a32dcd 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -60,8 +60,8 @@ - javax.ws.rs - javax.ws.rs-api + jakarta.ws.rs + jakarta.ws.rs-api diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 97ede1f76e969..a34f566447006 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -446,8 +446,8 @@ - javax.xml.bind - jaxb-api + jakarta.xml.bind + jakarta.xml.bind-api javax.activation diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index ab42f0e2aef59..74ced063fbfd4 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -1,4 +1,4 @@ - + - + 4.0.0 org.apache.pulsar pulsar 4.1.0-SNAPSHOT - pulsar-client-admin Pulsar Client Admin - ${project.groupId} @@ -73,7 +70,6 @@ - maven-antrun-plugin @@ -86,15 +82,13 @@ - + - org.apache.maven.plugins maven-shade-plugin @@ -107,48 +101,55 @@ true true - - org.apache.pulsar:pulsar-client-original - org.apache.pulsar:pulsar-client-admin-original - org.apache.commons:commons-lang3 - commons-codec:commons-codec - commons-collections:commons-collections - org.asynchttpclient:* - org.reactivestreams:reactive-streams - com.typesafe.netty:netty-reactive-streams - org.javassist:javassist - com.google.guava:guava + com.fasterxml.jackson.*:* + com.google.*:* + com.google.auth:* com.google.code.gson:gson + com.google.guava:guava com.google.re2j:re2j com.spotify:completable-futures - com.fasterxml.jackson.*:* - io.netty:* - io.netty.incubator:* - org.apache.pulsar:pulsar-common - org.apache.bookkeeper:* + com.squareup.*:* + com.sun.activation:javax.activation + com.typesafe.netty:netty-reactive-streams + com.yahoo.datasketches:* com.yahoo.datasketches:sketches-core - org.glassfish.jersey*:* - javax.ws.rs:* - javax.xml.bind:jaxb-api - jakarta.annotation:* - org.glassfish.hk2*:* + commons-*:* + commons-codec:commons-codec + commons-collections:commons-collections + io.airlift:* io.grpc:* + io.netty.incubator:* + io.netty:* + io.opencensus:* io.perfmark:* - com.yahoo.datasketches:* - com.squareup.*:* - com.google.*:* - commons-*:* + io.swagger:* + jakarta.activation:jakarta.activation-api + jakarta.annotation:jakarta.annotation-api + jakarta.ws.rs:jakarta.ws.rs-api + jakarta.xml.bind:jakarta.xml.bind-api + javax.ws.rs:* + javax.xml.bind:jaxb-api + net.jcip:jcip-annotations + org.apache.bookkeeper:* + org.apache.commons:commons-compress + org.apache.commons:commons-lang3 + org.apache.pulsar:pulsar-client-admin-original + + org.apache.pulsar:pulsar-client-messagecrypto-bc + org.apache.pulsar:pulsar-client-original + org.apache.pulsar:pulsar-common + org.asynchttpclient:* + org.checkerframework:* org.eclipse.jetty:* - com.google.auth:* + org.glassfish.hk2*:* + org.glassfish.jersey*:* + org.javassist:javassist org.jvnet.mimepull:* - io.opencensus:* org.objenesis:* + org.reactivestreams:reactive-streams org.yaml:snakeyaml - io.swagger:* - - org.apache.pulsar:pulsar-client-messagecrypto-bc com.fasterxml.jackson.core:jackson-annotations @@ -162,7 +163,7 @@ ** - + org/bouncycastle/** @@ -172,19 +173,54 @@ ** - + org/bouncycastle/** + + org.asynchttpclient:async-http-client + + ** + + + org/asynchttpclient/config/ahc-default.properties + + + + *:* + + **/module-info.class + findbugsExclude.xml + META-INF/*-LICENSE + META-INF/*-NOTICE + META-INF/*.DSA + META-INF/*.RSA + META-INF/*.SF + META-INF/DEPENDENCIES* + META-INF/io.netty.versions.properties + META-INF/LICENSE* + META-INF/license/** + META-INF/MANIFEST.MF + META-INF/maven/** + META-INF/native-image/** + META-INF/NOTICE* + META-INF/proguard/** + + - - org.asynchttpclient - org.apache.pulsar.shade.org.asynchttpclient + + + (META-INF/native/(lib)?)(netty.+\.(so|jnilib|dll))$ + $1org_apache_pulsar_shade_$3 + true - org.apache.commons - org.apache.pulsar.shade.org.apache.commons + com.fasterxml.jackson + org.apache.pulsar.shade.com.fasterxml.jackson + + com.fasterxml.jackson.annotation.* + com.google @@ -198,44 +234,61 @@ org.apache.pulsar.shade.com.spotify.futures - com.fasterxml.jackson - org.apache.pulsar.shade.com.fasterxml.jackson - - com.fasterxml.jackson.annotation.* - + com.squareup + org.apache.pulsar.shade.com.squareup - io.netty - org.apache.pulsar.shade.io.netty + com.sun.activation + org.apache.pulsar.shade.com.sun.activation - org.apache.pulsar.policies - org.apache.pulsar.shade.org.apache.pulsar.policies - - - org.apache.pulsar.checksum - org.apache.pulsar.shade.org.apache.pulsar.checksum + com.typesafe + org.apache.pulsar.shade.com.typesafe com.yahoo org.apache.pulsar.shade.com.yahoo - com.typesafe - org.apache.pulsar.shade.com.typesafe + io.airlift + org.apache.pulsar.shade.io.airlift - org.glassfish - org.apache.pulsar.shade.org.glassfish + io.grpc + org.apache.pulsar.shade.io.grpc - javax.ws - org.apache.pulsar.shade.javax.ws + io.netty + org.apache.pulsar.shade.io.netty + + + io.opencensus + org.apache.pulsar.shade.io.opencensus + + + io.swagger + org.apache.pulsar.shade.io.swagger + + + javassist + org.apache.pulsar.shade.javassist + + + javax.activation + org.apache.pulsar.shade.javax.activation javax.annotation org.apache.pulsar.shade.javax.annotation + + javax.inject + org.apache.pulsar.shade.javax.inject + + + javax.ws + org.apache.pulsar.shade.javax.ws + javax.xml.bind org.apache.pulsar.shade.javax.xml.bind @@ -245,71 +298,95 @@ org.apache.pulsar.shade.jersey - org.jvnet - org.apache.pulsar.shade.org.jvnet + META-INF/versions/(\d+)/com/fasterxml/jackson/core/ + META-INF/versions/$1/org/apache/pulsar/shade/com/fasterxml/jackson/core/ + + true + + + META-INF/versions/(\d+)/javax/xml/bind/ + META-INF/versions/$1/org/apache/pulsar/shade/javax/xml/bind/ + true + + + META-INF/versions/(\d+)/org/glassfish/ + META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/ + true + + + META-INF/versions/(\d+)/org/yaml/ + META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/ + true + + + net.jcip + org.apache.pulsar.shade.net.jcip + + + okio + org.apache.pulsar.shade.okio org.aopalliance org.apache.pulsar.shade.org.aopalliance - javassist - org.apache.pulsar.shade.javassist + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper - javax.inject - org.apache.pulsar.shade.javax.inject + org.apache.commons + org.apache.pulsar.shade.org.apache.commons + + + org.apache.pulsar.checksum + org.apache.pulsar.shade.org.apache.pulsar.checksum + + + org.apache.pulsar.policies + org.apache.pulsar.shade.org.apache.pulsar.policies + + + org.asynchttpclient + org.apache.pulsar.shade.org.asynchttpclient + + + org.checkerframework + org.apache.pulsar.shade.org.checkerframework + + + org.eclipse.jetty + org.apache.pulsar.shade.org.eclipse.jetty + + + org.glassfish + org.apache.pulsar.shade.org.glassfish + + + org.jvnet + org.apache.pulsar.shade.org.jvnet + + + org.objenesis + org.apache.pulsar.shade.org.objenesis org.reactivestreams org.apache.pulsar.shade.org.reactivestreams - - io.grpc - org.apache.pulsar.shade.io.grpc - - - okio - org.apache.pulsar.shade.okio - - - com.squareup - org.apache.pulsar.shade.com.squareup - - - io.opencensus - org.apache.pulsar.shade.io.opencensus - - - org.eclipse.jetty - org.apache.pulsar.shade.org.eclipse.jetty - - - org.objenesis - org.apache.pulsar.shade.org.objenesis - - - org.yaml - org.apache.pulsar.shade.org.yaml - - - io.swagger - org.apache.pulsar.shade.io.swagger - - - org.apache.bookkeeper - org.apache.pulsar.shade.org.apache.bookkeeper - - - - (META-INF/native/(lib)?)(netty.+\.(so|jnilib|dll))$ - $1org_apache_pulsar_shade_$3 - true - + + org.yaml + org.apache.pulsar.shade.org.yaml + - - + + + + + true + + diff --git a/pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder b/pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder new file mode 100644 index 0000000000000..99a08cc8ca4d1 --- /dev/null +++ b/pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder @@ -0,0 +1 @@ +org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClientBuilder \ No newline at end of file diff --git a/pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate b/pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate new file mode 100644 index 0000000000000..0adc919b7f0c2 --- /dev/null +++ b/pulsar-client-admin-shaded/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate @@ -0,0 +1 @@ +org.apache.pulsar.shade.org.glassfish.jersey.internal.RuntimeDelegateImpl \ No newline at end of file diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index 36070618ed891..0a94e48e9b939 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -76,8 +76,8 @@ - javax.xml.bind - jaxb-api + jakarta.xml.bind + jakarta.xml.bind-api javax.activation diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml index e26f6eeac57bf..74007745c70ee 100644 --- a/pulsar-client-all/pom.xml +++ b/pulsar-client-all/pom.xml @@ -1,3 +1,4 @@ + + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.apache.pulsar pulsar 4.1.0-SNAPSHOT - pulsar-client-all Pulsar Client All - ${project.groupId} @@ -69,7 +67,6 @@ test - @@ -107,7 +104,6 @@ - maven-antrun-plugin @@ -120,15 +116,13 @@ - + - org.apache.maven.plugins @@ -145,67 +139,67 @@ true true false - - org.apache.pulsar:pulsar-client-original - org.apache.pulsar:pulsar-client-admin-original - org.apache.commons:commons-lang3 - commons-codec:commons-codec - commons-collections:commons-collections - org.asynchttpclient:* - io.netty:netty-codec-http - io.netty:netty-transport-native-epoll - org.reactivestreams:reactive-streams - com.typesafe.netty:netty-reactive-streams - org.javassist:javassist - com.google.guava:* - org.checkerframework:* + com.fasterxml.jackson.*:* + com.google.*:* + com.google.auth:* com.google.code.findbugs:* + com.google.code.gson:gson com.google.errorprone:* + com.google.guava:* com.google.j2objc:* - com.google.code.gson:gson com.google.re2j:re2j com.spotify:completable-futures - com.fasterxml.jackson.*:* - io.netty:netty - io.netty:netty-all - io.netty:netty-tcnative-boringssl-static - org.eclipse.jetty:* + com.squareup.*:* + com.sun.activation:javax.activation + + com.thoughtworks.paranamer:paranamer + com.typesafe.netty:netty-reactive-streams com.yahoo.datasketches:* + com.yahoo.datasketches:sketches-core commons-*:* - io.swagger:* + commons-codec:commons-codec + commons-collections:commons-collections io.airlift:* - - org.apache.pulsar:pulsar-common - org.apache.bookkeeper:* - com.yahoo.datasketches:sketches-core - org.glassfish.jersey*:* - javax.ws.rs:* - javax.xml.bind:jaxb-api - jakarta.annotation:* - org.glassfish.hk2*:* io.grpc:* - io.perfmark:* - com.yahoo.datasketches:* - io.netty:* io.netty.incubator:* - com.squareup.*:* - com.google.*:* - commons-*:* - org.eclipse.jetty:* - com.google.auth:* - org.jvnet.mimepull:* + io.netty:* + io.netty:netty + io.netty:netty-all + io.netty:netty-codec-http + io.netty:netty-tcnative-boringssl-static + io.netty:netty-transport-native-epoll io.opencensus:* - org.objenesis:* - org.yaml:snakeyaml + io.perfmark:* + io.swagger:* + jakarta.activation:jakarta.activation-api + jakarta.annotation:jakarta.annotation-api + jakarta.ws.rs:jakarta.ws.rs-api + jakarta.xml.bind:jakarta.xml.bind-api + javax.ws.rs:* + javax.xml.bind:jaxb-api + net.jcip:jcip-annotations org.apache.avro:* - - com.thoughtworks.paranamer:paranamer + org.apache.bookkeeper:* org.apache.commons:commons-compress - org.tukaani:xz + org.apache.commons:commons-lang3 + org.apache.pulsar:pulsar-client-admin-original org.apache.pulsar:pulsar-client-messagecrypto-bc + org.apache.pulsar:pulsar-client-original + org.apache.pulsar:pulsar-common + org.asynchttpclient:* + org.checkerframework:* + org.eclipse.jetty:* + org.glassfish.hk2*:* + org.glassfish.jersey*:* + org.javassist:javassist + org.jvnet.mimepull:* + org.objenesis:* + org.reactivestreams:reactive-streams + org.tukaani:xz + org.yaml:snakeyaml com.fasterxml.jackson.core:jackson-annotations @@ -219,23 +213,54 @@ ** - + org/bouncycastle/** + + org.asynchttpclient:async-http-client + + ** + + + org/asynchttpclient/config/ahc-default.properties + + + + *:* + + **/module-info.class + findbugsExclude.xml + META-INF/*-LICENSE + META-INF/*-NOTICE + META-INF/*.DSA + META-INF/*.RSA + META-INF/*.SF + META-INF/DEPENDENCIES* + META-INF/io.netty.versions.properties + META-INF/LICENSE* + META-INF/license/** + META-INF/MANIFEST.MF + META-INF/maven/** + META-INF/native-image/** + META-INF/NOTICE* + META-INF/proguard/** + + + - org.asynchttpclient - org.apache.pulsar.shade.org.asynchttpclient - - - org.apache.bookkeeper - org.apache.pulsar.shade.org.apache.bookkeeper + (META-INF/native/(lib)?)(netty.+\.(so|jnilib|dll))$ + $1org_apache_pulsar_shade_$3 + true - org.apache.commons - org.apache.pulsar.shade.org.apache.commons + com.fasterxml.jackson + org.apache.pulsar.shade.com.fasterxml.jackson + + com.fasterxml.jackson.annotation.* + com.google @@ -249,103 +274,109 @@ org.apache.pulsar.shade.com.spotify.futures - com.fasterxml.jackson - org.apache.pulsar.shade.com.fasterxml.jackson - - com.fasterxml.jackson.annotation.* - + com.squareup + org.apache.pulsar.shade.com.squareup - io.netty - org.apache.pulsar.shade.io.netty + com.sun.activation + org.apache.pulsar.shade.com.sun.activation - org.apache.pulsar.policies - org.apache.pulsar.shade.org.apache.pulsar.policies + com.thoughtworks.paranamer + org.apache.pulsar.shade.com.thoughtworks.paranamer - com.yahoo.datasketches - org.apache.pulsar.shade.com.yahoo.datasketches + com.typesafe + org.apache.pulsar.shade.com.typesafe com.yahoo org.apache.pulsar.shade.com.yahoo - org.eclipse.jetty - org.apache.pulsar.shade.org.eclipse - - - org.reactivestreams - org.apache.pulsar.shade.org.reactivestreams + com.yahoo.datasketches + org.apache.pulsar.shade.com.yahoo.datasketches - com.typesafe - org.apache.pulsar.shade.com.typesafe + io.airlift + org.apache.pulsar.shade.io.airlift - javax.ws - org.apache.pulsar.shade.javax.ws + io.grpc + org.apache.pulsar.shade.io.grpc - javax.annotation - org.apache.pulsar.shade.javax.annotation + io.netty + org.apache.pulsar.shade.io.netty - javax.xml.bind - org.apache.pulsar.shade.javax.xml.bind + io.opencensus + org.apache.pulsar.shade.io.opencensus - jersey - org.apache.pulsar.shade.jersey + io.swagger + org.apache.pulsar.shade.io.swagger - org.jvnet - org.apache.pulsar.shade.org.jvnet + javassist + org.apache.pulsar.shade.javassist - org.aopalliance - org.apache.pulsar.shade.org.aopalliance + javax.activation + org.apache.pulsar.shade.javax.activation - javassist - org.apache.pulsar.shade.javassist + javax.annotation + org.apache.pulsar.shade.javax.annotation javax.inject org.apache.pulsar.shade.javax.inject - org.glassfish - org.apache.pulsar.shade.org.glassfish + javax.ws + org.apache.pulsar.shade.javax.ws + + + javax.xml.bind + org.apache.pulsar.shade.javax.xml.bind + + + jersey + org.apache.pulsar.shade.jersey - io.grpc - org.apache.pulsar.shade.io.grpc + META-INF/versions/(\d+)/com/fasterxml/jackson/core/ + META-INF/versions/$1/org/apache/pulsar/shade/com/fasterxml/jackson/core/ + + true - okio - org.apache.pulsar.shade.okio + META-INF/versions/(\d+)/javax/xml/bind/ + META-INF/versions/$1/org/apache/pulsar/shade/javax/xml/bind/ + true - com.squareup - org.apache.pulsar.shade.com.squareup + META-INF/versions/(\d+)/org/glassfish/ + META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/ + true - io.opencensus - org.apache.pulsar.shade.io.opencensus + META-INF/versions/(\d+)/org/yaml/ + META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/ + true - org.eclipse.jetty - org.apache.pulsar.shade.org.eclipse.jetty + net.jcip + org.apache.pulsar.shade.net.jcip - org.objenesis - org.apache.pulsar.shade.org.objenesis + okio + org.apache.pulsar.shade.okio - org.yaml - org.apache.pulsar.shade.org.yaml + org.aopalliance + org.apache.pulsar.shade.org.aopalliance org.apache.avro @@ -363,45 +394,68 @@ org.apache.avro.reflect.Union - - - org.codehaus.jackson - org.apache.pulsar.shade.org.codehaus.jackson - - com.thoughtworks.paranamer - org.apache.pulsar.shade.com.thoughtworks.paranamer + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper org.apache.commons org.apache.pulsar.shade.org.apache.commons - io.airlift - org.apache.pulsar.shade.io.airlift + org.apache.pulsar.policies + org.apache.pulsar.shade.org.apache.pulsar.policies + + + org.asynchttpclient + org.apache.pulsar.shade.org.asynchttpclient org.checkerframework org.apache.pulsar.shade.org.checkerframework + - javax.annotation - org.apache.pulsar.shade.javax.annotation + org.codehaus.jackson + org.apache.pulsar.shade.org.codehaus.jackson + + + org.eclipse.jetty + org.apache.pulsar.shade.org.eclipse + + + org.glassfish + org.apache.pulsar.shade.org.glassfish + + + org.jvnet + org.apache.pulsar.shade.org.jvnet + + + org.objenesis + org.apache.pulsar.shade.org.objenesis + + + org.reactivestreams + org.apache.pulsar.shade.org.reactivestreams org.tukaani org.apache.pulsar.shade.org.tukaani - - (META-INF/native/(lib)?)(netty.+\.(so|jnilib|dll))$ - $1org_apache_pulsar_shade_$3 - true + org.yaml + org.apache.pulsar.shade.org.yaml - - + + + + + true + + diff --git a/pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder b/pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder new file mode 100644 index 0000000000000..99a08cc8ca4d1 --- /dev/null +++ b/pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder @@ -0,0 +1 @@ +org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyClientBuilder \ No newline at end of file diff --git a/pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate b/pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate new file mode 100644 index 0000000000000..0adc919b7f0c2 --- /dev/null +++ b/pulsar-client-all/src/main/resources/META-INF/services/org.apache.pulsar.shade.javax.ws.rs.ext.RuntimeDelegate @@ -0,0 +1 @@ +org.apache.pulsar.shade.org.glassfish.jersey.internal.RuntimeDelegateImpl \ No newline at end of file diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml index 99786b4d18dd8..0e994ce25c24e 100644 --- a/pulsar-client-auth-athenz/pom.xml +++ b/pulsar-client-auth-athenz/pom.xml @@ -46,6 +46,11 @@ athenz-zts-java-client-core + + jakarta.ws.rs + jakarta.ws.rs-api + + com.yahoo.athenz athenz-cert-refresher diff --git a/pulsar-client-auth-sasl/pom.xml b/pulsar-client-auth-sasl/pom.xml index 1a4720b3d589d..61d1157afda34 100644 --- a/pulsar-client-auth-sasl/pom.xml +++ b/pulsar-client-auth-sasl/pom.xml @@ -57,8 +57,8 @@ - javax.ws.rs - javax.ws.rs-api + jakarta.ws.rs + jakarta.ws.rs-api diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index cd87d901b21bb..1093b405731ea 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -1,3 +1,4 @@ + + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.apache.pulsar pulsar 4.1.0-SNAPSHOT - pulsar-client Pulsar Client Java - ${project.groupId} @@ -48,11 +46,8 @@ ${project.version} - - - org.apache.maven.plugins maven-dependency-plugin @@ -88,7 +83,6 @@ - maven-antrun-plugin @@ -101,15 +95,13 @@ - + - org.apache.maven.plugins @@ -124,50 +116,54 @@ true true false - - org.apache.pulsar:pulsar-client-original - org.apache.bookkeeper:* - org.apache.commons:commons-lang3 - commons-codec:commons-codec - commons-collections:commons-collections - org.asynchttpclient:* - io.netty:netty-codec-http - io.netty:netty-transport-native-epoll - org.reactivestreams:reactive-streams - com.typesafe.netty:netty-reactive-streams - org.javassist:javassist - com.google.guava:* - org.checkerframework:* + com.fasterxml.jackson.*:* com.google.code.findbugs:* + com.google.code.gson:gson com.google.errorprone:* + com.google.guava:* com.google.j2objc:* - com.google.code.gson:gson com.google.re2j:re2j com.spotify:completable-futures - com.fasterxml.jackson.*:* - io.netty:* - io.netty.incubator:* - io.perfmark:* - org.eclipse.jetty:* + com.sun.activation:javax.activation + + com.thoughtworks.paranamer:paranamer + com.typesafe.netty:netty-reactive-streams com.yahoo.datasketches:* + com.yahoo.datasketches:sketches-core commons-*:* - io.swagger:* + commons-codec:commons-codec + commons-collections:commons-collections io.airlift:* - - org.apache.pulsar:pulsar-common - com.yahoo.datasketches:sketches-core - org.objenesis:* - org.yaml:snakeyaml - + io.netty.incubator:* + io.netty:* + io.netty:netty-codec-http + io.netty:netty-transport-native-epoll + io.perfmark:* + io.swagger:* + jakarta.activation:jakarta.activation-api + jakarta.annotation:jakarta.annotation-api + jakarta.ws.rs:jakarta.ws.rs-api + jakarta.xml.bind:jakarta.xml.bind-api + javax.ws.rs:* + net.jcip:jcip-annotations org.apache.avro:* - - com.thoughtworks.paranamer:paranamer + org.apache.bookkeeper:* org.apache.commons:commons-compress - org.tukaani:xz + org.apache.commons:commons-lang3 org.apache.pulsar:pulsar-client-messagecrypto-bc + org.apache.pulsar:pulsar-client-original + org.apache.pulsar:pulsar-common + org.asynchttpclient:* + org.checkerframework:* + org.eclipse.jetty:* + org.javassist:javassist + org.objenesis:* + org.reactivestreams:reactive-streams + org.tukaani:xz + org.yaml:snakeyaml com.fasterxml.jackson.core:jackson-annotations @@ -180,23 +176,45 @@ ** - + org/bouncycastle/** + + *:* + + **/module-info.class + findbugsExclude.xml + META-INF/*-LICENSE + META-INF/*-NOTICE + META-INF/*.DSA + META-INF/*.RSA + META-INF/*.SF + META-INF/DEPENDENCIES* + META-INF/io.netty.versions.properties + META-INF/LICENSE* + META-INF/license/** + META-INF/MANIFEST.MF + META-INF/maven/** + META-INF/native-image/** + META-INF/NOTICE* + META-INF/proguard/** + + + - org.asynchttpclient - org.apache.pulsar.shade.org.asynchttpclient - - - org.apache.commons - org.apache.pulsar.shade.org.apache.commons + (META-INF/native/(lib)?)(netty.+\.(so|jnilib|dll))$ + $1org_apache_pulsar_shade_$3 + true - io.airlift - org.apache.pulsar.shade.io.airlift + com.fasterxml.jackson + org.apache.pulsar.shade.com.fasterxml.jackson + + com.fasterxml.jackson.annotation.* + com.google @@ -210,63 +228,67 @@ org.apache.pulsar.shade.com.spotify.futures - com.fasterxml.jackson - org.apache.pulsar.shade.com.fasterxml.jackson - - com.fasterxml.jackson.annotation.* - + com.sun.activation + org.apache.pulsar.shade.com.sun.activation - io.netty - org.apache.pulsar.shade.io.netty + com.thoughtworks.paranamer + org.apache.pulsar.shade.com.thoughtworks.paranamer - org.checkerframework - org.apache.pulsar.shade.org.checkerframework + com.typesafe + org.apache.pulsar.shade.com.typesafe - javax.annotation - org.apache.pulsar.shade.javax.annotation + com.yahoo.datasketches + org.apache.pulsar.shade.com.yahoo.datasketches - io.swagger - org.apache.pulsar.shade.io.swagger + com.yahoo.memory + org.apache.pulsar.shade.com.yahoo.memory - org.apache.pulsar.policies - org.apache.pulsar.shade.org.apache.pulsar.policies + com.yahoo.sketches + org.apache.pulsar.shade.com.yahoo.sketches - com.yahoo.datasketches - org.apache.pulsar.shade.com.yahoo.datasketches + io.airlift + org.apache.pulsar.shade.io.airlift - com.yahoo.sketches - org.apache.pulsar.shade.com.yahoo.sketches + io.netty + org.apache.pulsar.shade.io.netty - org.eclipse.jetty - org.apache.pulsar.shade.org.eclipse + io.swagger + org.apache.pulsar.shade.io.swagger - org.reactivestreams - org.apache.pulsar.shade.org.reactivestreams + javax.activation + org.apache.pulsar.shade.javax.activation - com.typesafe - org.apache.pulsar.shade.com.typesafe + javax.annotation + org.apache.pulsar.shade.javax.annotation - com.yahoo.memory - org.apache.pulsar.shade.com.yahoo.memory + javax.ws + org.apache.pulsar.shade.javax.ws - org.objenesis - org.apache.pulsar.shade.org.objenesis + META-INF/versions/(\d+)/com/fasterxml/jackson/core/ + META-INF/versions/$1/org/apache/pulsar/shade/com/fasterxml/jackson/core/ + + true - org.yaml - org.apache.pulsar.shade.org.yaml + META-INF/versions/(\d+)/org/yaml/ + META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/ + true + + + net.jcip + org.apache.pulsar.shade.net.jcip org.apache.avro @@ -284,43 +306,65 @@ org.apache.avro.reflect.Union - + + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper + + + org.apache.commons + org.apache.pulsar.shade.org.apache.commons + + + org.apache.pulsar.policies + org.apache.pulsar.shade.org.apache.pulsar.policies + + + org.asynchttpclient + org.apache.pulsar.shade.org.asynchttpclient + + + org.checkerframework + org.apache.pulsar.shade.org.checkerframework + + org.codehaus.jackson org.apache.pulsar.shade.org.codehaus.jackson - com.thoughtworks.paranamer - org.apache.pulsar.shade.com.thoughtworks.paranamer + org.eclipse.jetty + org.apache.pulsar.shade.org.eclipse - org.apache.commons - org.apache.pulsar.shade.org.apache.commons + org.objenesis + org.apache.pulsar.shade.org.objenesis - org.tukaani - org.apache.pulsar.shade.org.tukaani + org.reactivestreams + org.apache.pulsar.shade.org.reactivestreams - org.apache.bookkeeper - org.apache.pulsar.shade.org.apache.bookkeeper + org.tukaani + org.apache.pulsar.shade.org.tukaani - - (META-INF/native/(lib)?)(netty.+\.(so|jnilib|dll))$ - $1org_apache_pulsar_shade_$3 - true + org.yaml + org.apache.pulsar.shade.org.yaml - - + + + + + true + + - com.github.spotbugs spotbugs-maven-plugin diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 241dd173ea9a4..c823b8408c9bd 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -184,8 +184,8 @@ - javax.ws.rs - javax.ws.rs-api + jakarta.ws.rs + jakarta.ws.rs-api diff --git a/pulsar-functions/proto/pom.xml b/pulsar-functions/proto/pom.xml index db87ff7ef2031..e9e9678d1b9b7 100644 --- a/pulsar-functions/proto/pom.xml +++ b/pulsar-functions/proto/pom.xml @@ -44,10 +44,10 @@ com.google.protobuf protobuf-java-util - + - javax.annotation - javax.annotation-api + jakarta.annotation + jakarta.annotation-api diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml index b04d25a5af08a..b2df8f224bc26 100644 --- a/pulsar-functions/runtime/pom.xml +++ b/pulsar-functions/runtime/pom.xml @@ -77,6 +77,10 @@ bcprov-jdk18on org.bouncycastle + + javax.annotation + javax.annotation-api + @@ -106,7 +110,7 @@ - + org.apache.maven.plugins diff --git a/pulsar-functions/secrets/pom.xml b/pulsar-functions/secrets/pom.xml index c7ab69ec612db..ac7d89a0a0c27 100644 --- a/pulsar-functions/secrets/pom.xml +++ b/pulsar-functions/secrets/pom.xml @@ -48,6 +48,10 @@ bcprov-jdk18on org.bouncycastle + + javax.annotation + javax.annotation-api + @@ -76,7 +80,7 @@ - + com.github.spotbugs spotbugs-maven-plugin diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 73ed347d24fee..2cea6e0893005 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -134,8 +134,8 @@ - javax.xml.bind - jaxb-api + jakarta.xml.bind + jakarta.xml.bind-api diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml index 366ba9ae38cc9..8fa504227feb1 100644 --- a/tiered-storage/jcloud/pom.xml +++ b/tiered-storage/jcloud/pom.xml @@ -104,8 +104,8 @@ - javax.xml.bind - jaxb-api + jakarta.xml.bind + jakarta.xml.bind-api javax.activation From 280997e688722be7240e777d5ca3500362c689bf Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Fri, 29 Nov 2024 22:30:16 +0800 Subject: [PATCH 08/95] [fix][broker] support missing tenant level fine-granted permissions (#23660) --- .../pulsar/broker/admin/impl/TenantsBase.java | 48 +++++- .../TenantEndpointsAuthorizationTest.java | 160 ++++++++++++++++++ .../common/policies/data/TenantOperation.java | 6 + 3 files changed, 209 insertions(+), 5 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index 0d1f79a09dc14..ff32e41977aaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,7 @@ public class TenantsBase extends PulsarWebResource { @ApiResponse(code = 404, message = "Tenant doesn't exist")}) public void getTenants(@Suspended final AsyncResponse asyncResponse) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS) .thenCompose(__ -> tenantResources().listTenantsAsync()) .thenAccept(tenants -> { // deep copy the tenants to avoid concurrent sort exception @@ -84,7 +85,7 @@ public void getTenants(@Suspended final AsyncResponse asyncResponse) { public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.GET_TENANT) .thenCompose(__ -> tenantResources().getTenantAsync(tenant)) .thenApply(tenantInfo -> { if (!tenantInfo.isPresent()) { @@ -121,7 +122,7 @@ public void createTenant(@Suspended final AsyncResponse asyncResponse, asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); return; } - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.CREATE_TENANT) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> validateClustersAsync(tenantInfo)) .thenCompose(__ -> validateAdminRoleAsync(tenantInfo)) @@ -169,7 +170,7 @@ public void updateTenant(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.UPDATE_TENANT) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> validateClustersAsync(newTenantAdmin)) .thenCompose(__ -> validateAdminRoleAsync(newTenantAdmin)) @@ -206,7 +207,7 @@ public void deleteTenant(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant, @QueryParam("force") @DefaultValue("false") boolean force) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.DELETE_TENANT) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> internalDeleteTenant(tenant, force)) .thenAccept(__ -> { @@ -304,4 +305,41 @@ private CompletableFuture validateAdminRoleAsync(TenantInfoImpl info) { } return CompletableFuture.completedFuture(null); } + + private CompletableFuture validateBothSuperUserAndTenantOperation(String tenant, + TenantOperation operation) { + final var superUserValidationFuture = validateSuperUserAccessAsync(); + final var tenantOperationValidationFuture = validateTenantOperationAsync(tenant, operation); + return CompletableFuture.allOf(superUserValidationFuture, tenantOperationValidationFuture) + .handle((__, err) -> { + if (!superUserValidationFuture.isCompletedExceptionally() + || !tenantOperationValidationFuture.isCompletedExceptionally()) { + return true; + } + if (log.isDebugEnabled()) { + Throwable superUserValidationException = null; + try { + superUserValidationFuture.join(); + } catch (Throwable ex) { + superUserValidationException = FutureUtil.unwrapCompletionException(ex); + } + Throwable brokerOperationValidationException = null; + try { + tenantOperationValidationFuture.join(); + } catch (Throwable ex) { + brokerOperationValidationException = FutureUtil.unwrapCompletionException(ex); + } + log.debug("validateBothTenantOperationAndSuperUser failed." + + " originalPrincipal={} clientAppId={} operation={} " + + "superuserValidationError={} tenantOperationValidationError={}", + originalPrincipal(), clientAppId(), operation.toString(), + superUserValidationException, brokerOperationValidationException); + } + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateBothTenantOperationAndSuperUser for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] ", + originalPrincipal(), clientAppId(), operation.toString())); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java new file mode 100644 index 0000000000000..2cf3ea374c33c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TenantOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.Set; +import java.util.UUID; + +@Test(groups = "broker-admin") +public class TenantEndpointsAuthorizationTest extends MockedPulsarStandalone { + + private AuthorizationService orignalAuthorizationService; + private AuthorizationService spyAuthorizationService; + + private PulsarAdmin superUserAdmin; + private PulsarAdmin nobodyAdmin; + + @SneakyThrows + @BeforeClass(alwaysRun = true) + public void setup() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + this.nobodyAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(NOBODY_TOKEN)) + .build(); + } + + @BeforeMethod(alwaysRun = true) + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + spyAuthorizationService = spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + spyAuthorizationService, true); + } + + @AfterMethod(alwaysRun = true) + public void after() throws IllegalAccessException { + if (orignalAuthorizationService != null) { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", orignalAuthorizationService, true); + } + } + + @SneakyThrows + @AfterClass(alwaysRun = true) + public void cleanup() { + if (superUserAdmin != null) { + superUserAdmin.close(); + superUserAdmin = null; + } + spyAuthorizationService = null; + orignalAuthorizationService = null; + super.close(); + } + + @Test + public void testListTenants() throws PulsarAdminException { + superUserAdmin.tenants().getTenants(); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(isNull(), Mockito.eq(TenantOperation.LIST_TENANTS), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants().getTenants()); + } + + + @Test + public void testGetTenant() throws PulsarAdminException { + String tenantName = "public"; + superUserAdmin.tenants().getTenantInfo(tenantName); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(eq(tenantName), Mockito.eq(TenantOperation.GET_TENANT), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants().getTenantInfo(tenantName)); + } + + @Test + public void testUpdateTenant() throws PulsarAdminException { + String tenantName = "public"; + superUserAdmin.tenants().updateTenant(tenantName, TenantInfo.builder() + .allowedClusters(Set.of(getPulsarService().getConfiguration().getClusterName())) + .adminRoles(Set.of("example")).build()); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(eq(tenantName), Mockito.eq(TenantOperation.UPDATE_TENANT), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants() + .updateTenant(tenantName, TenantInfo.builder().adminRoles(Set.of("example")).build())); + } + + @Test + public void testDeleteTenant() throws PulsarAdminException { + String tenantName = UUID.randomUUID().toString(); + superUserAdmin.tenants().createTenant(tenantName, TenantInfo.builder() + .allowedClusters(Set.of(getPulsarService().getConfiguration().getClusterName())) + .adminRoles(Set.of("example")).build()); + + Mockito.clearInvocations(spyAuthorizationService); + superUserAdmin.tenants().deleteTenant(tenantName); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(eq(tenantName), Mockito.eq(TenantOperation.DELETE_TENANT), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants().deleteTenant(tenantName)); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java index 1c52f69006403..e0518e510f9dc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java @@ -25,4 +25,10 @@ public enum TenantOperation { CREATE_NAMESPACE, DELETE_NAMESPACE, LIST_NAMESPACES, + + LIST_TENANTS, + GET_TENANT, + CREATE_TENANT, + UPDATE_TENANT, + DELETE_TENANT, } From bf1f67742243f1b40c258c64e1fd6a960611a780 Mon Sep 17 00:00:00 2001 From: hanmz Date: Fri, 29 Nov 2024 22:56:03 +0800 Subject: [PATCH 09/95] [fix][client] Fix race-condition causing doReconsumeLater to hang when creating retryLetterProducer has failed (#23560) --- .../pulsar/client/api/RetryTopicTest.java | 68 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 2 + 2 files changed, 70 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 9cb82fde04118..cd598585c8e87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.Data; @@ -45,6 +46,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.testng.collections.Lists; @Test(groups = "broker-api") public class RetryTopicTest extends ProducerConsumerBase { @@ -713,4 +715,70 @@ public void testRetryProducerWillCloseByConsumer() throws Exception { admin.topics().delete(topicDLQ, false); } + + @Test(timeOut = 30000L) + public void testRetryTopicExceptionWithConcurrent() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 10; + // subscribe before publish + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .receiverQueueSize(100) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send(); + } + producer.close(); + + // mock a retry producer exception when reconsumelater is called + MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; + List> consumers = multiTopicsConsumer.getConsumers(); + for (ConsumerImpl c : consumers) { + Set deadLetterPolicyField = + ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); + + if (deadLetterPolicyField.size() != 0) { + Field field = deadLetterPolicyField.iterator().next(); + field.setAccessible(true); + DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); + deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#"); + } + } + + List> messages = Lists.newArrayList(); + for (int i = 0; i < sendMessages; i++) { + messages.add(consumer.receive()); + } + + // mock call the reconsumeLater method concurrently + CountDownLatch latch = new CountDownLatch(messages.size()); + for (Message message : messages) { + new Thread(() -> { + try { + consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); + } catch (Exception ignore) { + + } finally { + latch.countDown(); + } + }).start(); + } + + latch.await(); + consumer.close(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 390a70095182f..d2753856264fc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -752,6 +752,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a } catch (Exception e) { result.completeExceptionally(e); } + } else { + result.completeExceptionally(new PulsarClientException("Retry letter producer is null.")); } MessageId finalMessageId = messageId; result.exceptionally(ex -> { From 7fc88d650202bca5d0462b41a89c5189f90a3859 Mon Sep 17 00:00:00 2001 From: Yike Xiao Date: Fri, 29 Nov 2024 23:03:55 +0800 Subject: [PATCH 10/95] [improve][client] Enhance error handling for non-exist subscription in consumer creation (#23254) --- .../apache/pulsar/client/api/MultiTopicsConsumerTest.java | 6 ++++-- .../org/apache/pulsar/client/api/PulsarClientException.java | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index 7a12acd47edf9..ea8eb6e8cc081 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -413,8 +413,9 @@ public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClient .isAckReceiptEnabled(true) .subscribe(); assertTrue(singleTopicConsumer instanceof ConsumerImpl); + } catch (PulsarClientException.SubscriptionNotFoundException ignore) { } catch (Throwable t) { - assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException); + fail("Should throw PulsarClientException.SubscriptionNotFoundException instead"); } try { @@ -424,8 +425,9 @@ public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClient .isAckReceiptEnabled(true) .subscribe(); assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl); + } catch (PulsarClientException.SubscriptionNotFoundException ignore) { } catch (Throwable t) { - assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException); + fail("Should throw PulsarClientException.SubscriptionNotFoundException instead"); } pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 9eb6c612a52a2..b2c9b2b697b42 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -1113,6 +1113,8 @@ public static PulsarClientException unwrap(Throwable t) { newException = new TransactionConflictException(msg); } else if (cause instanceof TopicDoesNotExistException) { newException = new TopicDoesNotExistException(msg); + } else if (cause instanceof SubscriptionNotFoundException) { + newException = new SubscriptionNotFoundException(msg); } else if (cause instanceof ProducerFencedException) { newException = new ProducerFencedException(msg); } else if (cause instanceof MemoryBufferIsFullError) { From 46037229947c8031207eaed70dd937e6990de544 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sat, 30 Nov 2024 01:17:16 +0800 Subject: [PATCH 11/95] [feat][broker] Implement allowBrokerOperationAsync in PulsarAuthorizationProvider to avoid exception thrown (#23663) --- .../broker/authorization/PulsarAuthorizationProvider.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 0af63724cc812..50783c4d1338b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthPolicies; +import org.apache.pulsar.common.policies.data.BrokerOperation; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; @@ -690,6 +691,13 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, }); } + @Override + public CompletableFuture allowBrokerOperationAsync(String clusterName, String brokerId, + BrokerOperation brokerOperation, String role, + AuthenticationDataSource authData) { + return isSuperUser(role, authData, conf); + } + @Override public CompletableFuture allowTopicPolicyOperationAsync(TopicName topicName, String role, PolicyName policyName, From a61b4267a6e085a2d103b510de09d0a7b71cb3f3 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:11:34 +0300 Subject: [PATCH 12/95] Add solution to PulsarMockBookKeeper for intercepting reads --- .../client/PulsarMockBookKeeper.java | 8 +++- .../client/PulsarMockLedgerHandle.java | 2 +- .../client/PulsarMockReadHandle.java | 30 +++++++++------ .../PulsarMockReadHandleInterceptor.java | 37 +++++++++++++++++++ 4 files changed, 64 insertions(+), 13 deletions(-) create mode 100644 testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 11ec2dec938a8..b9e9042500eff 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -38,6 +38,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -93,6 +95,9 @@ public static Collection getMockEnsemble() { final Queue addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>(); final List> failures = new ArrayList<>(); final List> addEntryFailures = new ArrayList<>(); + @Setter + @Getter + private volatile PulsarMockReadHandleInterceptor readHandleInterceptor; public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { this.orderedExecutor = orderedExecutor; @@ -246,7 +251,8 @@ public CompletableFuture execute() { return FutureUtils.exception(new BKException.BKUnauthorizedAccessException()); } else { return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId, - lh.getLedgerMetadata(), lh.entries)); + lh.getLedgerMetadata(), lh.entries, + PulsarMockBookKeeper.this::getReadHandleInterceptor)); } }); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index aa61e541d0d6b..d30684e604670 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -73,7 +73,7 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, this.digest = digest; this.passwd = Arrays.copyOf(passwd, passwd.length); - readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries); + readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index a4361f62254e4..82a48472bbb46 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -40,28 +41,35 @@ class PulsarMockReadHandle implements ReadHandle { private final long ledgerId; private final LedgerMetadata metadata; private final List entries; + private final Supplier readHandleInterceptorSupplier; PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, - List entries) { + List entries, + Supplier readHandleInterceptorSupplier) { this.bk = bk; this.ledgerId = ledgerId; this.metadata = metadata; this.entries = entries; + this.readHandleInterceptorSupplier = readHandleInterceptorSupplier; } @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { return bk.getProgrammedFailure().thenComposeAsync((res) -> { - log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); - List seq = new ArrayList<>(); - long entryId = firstEntry; - while (entryId <= lastEntry && entryId < entries.size()) { - seq.add(entries.get((int) entryId++).duplicate()); - } - log.debug("Entries read: {}", seq); - - return FutureUtils.value(LedgerEntriesImpl.create(seq)); - }); + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + List seq = new ArrayList<>(); + long entryId = firstEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + seq.add(entries.get((int) entryId++).duplicate()); + } + log.debug("Entries read: {}", seq); + LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq); + PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = readHandleInterceptorSupplier.get(); + if (pulsarMockReadHandleInterceptor != null) { + return pulsarMockReadHandleInterceptor.interceptReadAsync(firstEntry, lastEntry, ledgerEntries); + } + return FutureUtils.value(ledgerEntries); + }); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java new file mode 100644 index 0000000000000..0886d1a15951c --- /dev/null +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.api.LedgerEntries; + +/** + * Interceptor interface for intercepting read handle readAsync operations. + * This is useful for testing purposes, for example for introducing delays. + */ +public interface PulsarMockReadHandleInterceptor { + /** + * Intercepts the readAsync operation on a read handle. + * @param firstEntry first entry to read + * @param lastEntry last entry to read + * @param entries entries that would be returned by the read operation + * @return CompletableFuture that will complete with the entries to return + */ + CompletableFuture interceptReadAsync(long firstEntry, long lastEntry, LedgerEntries entries); +} From a2de406a7df10582574d7a07bd4dc446fa5f7bbf Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:12:25 +0300 Subject: [PATCH 13/95] Add validation to getManagedLedgerMaxReadsInFlightSizeInMB --- .../pulsar/broker/ManagedLedgerClientFactory.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 737bc69bf24df..79611e17b1013 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -72,8 +72,18 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis( conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); + long managedLedgerMaxReadsInFlightSizeBytes = conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L; + if (managedLedgerMaxReadsInFlightSizeBytes > 0 && conf.getDispatcherMaxReadSizeBytes() > 0 + && managedLedgerMaxReadsInFlightSizeBytes < conf.getDispatcherMaxReadSizeBytes()) { + log.warn("Invalid configuration for managedLedgerMaxReadsInFlightSizeInMB: {}, " + + "dispatcherMaxReadSizeBytes: {}. managedLedgerMaxReadsInFlightSizeInMB in bytes should " + + "be greater than dispatcherMaxReadSizeBytes. You should set " + + "managedLedgerMaxReadsInFlightSizeInMB to at least {}", + conf.getManagedLedgerMaxReadsInFlightSizeInMB(), conf.getDispatcherMaxReadSizeBytes(), + (conf.getDispatcherMaxReadSizeBytes() / (1024L * 1024L)) + 1); + } managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize( - conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L); + managedLedgerMaxReadsInFlightSizeBytes); managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds( conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); From 29c89524f4878aa18c61be1f0dbfd688d81eb789 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:13:00 +0300 Subject: [PATCH 14/95] Don't divide by 0 if it can be avoided --- .../mledger/impl/ManagedCursorImpl.java | 18 +++++++++--------- .../bookkeeper/mledger/util/StatsBuckets.java | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 478c6a1b37976..f48eb4ab8db7c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3720,15 +3720,15 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { } double avgEntrySize = ledger.getStats().getEntrySizeAverage(); - if (!Double.isFinite(avgEntrySize)) { - // We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats - avgEntrySize = (double) entriesReadSize / (double) entriesReadCount; - } - - if (!Double.isFinite(avgEntrySize)) { - // If we still don't have any information, it means this is the first time we attempt reading - // and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats - return 1; + if (Double.isInfinite(avgEntrySize)) { + if (entriesReadCount != 0) { + // We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats + avgEntrySize = (double) entriesReadSize / (double) entriesReadCount; + } else { + // If we still don't have any information, it means this is the first time we attempt reading + // and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats + return 1; + } } int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java index 60c0a7f6c9d22..49cf79bb70ae4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/StatsBuckets.java @@ -97,7 +97,7 @@ public long getSum() { } public double getAvg() { - return sum / (double) count; + return count != 0 ? sum / (double) count : Double.POSITIVE_INFINITY; } public void addAll(StatsBuckets other) { From cd2427d85f07c1afde74783690035ae2ba24a037 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:14:20 +0300 Subject: [PATCH 15/95] Estimate entry size for InflightReadsLimiter by keeping stats --- .../impl/cache/RangeEntryCacheImpl.java | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index cb006a5f0cea9..865cc1dff88e5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -58,6 +59,7 @@ public class RangeEntryCacheImpl implements EntryCache { * Overhead per-entry to take into account the envelope. */ private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64; + private static final int DEFAULT_ESTIMATED_ENTRY_SIZE = 10 * 1024; private final RangeEntryCacheManagerImpl manager; final ManagedLedgerImpl ml; @@ -66,12 +68,13 @@ public class RangeEntryCacheImpl implements EntryCache { private final boolean copyEntries; private final PendingReadsManager pendingReadsManager; - private volatile long estimatedEntrySize = 10 * 1024; - private final long readEntryTimeoutMillis; private static final double MB = 1024 * 1024; + private final LongAdder totalAddedEntriesSize = new LongAdder(); + private final LongAdder totalAddedEntriesCount = new LongAdder(); + public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; this.ml = ml; @@ -150,6 +153,8 @@ public boolean insert(EntryImpl entry) { EntryImpl cacheEntry = EntryImpl.create(position, cachedData); cachedData.release(); if (entries.put(position, cacheEntry)) { + totalAddedEntriesSize.add(entry.getLength()); + totalAddedEntriesCount.increment(); manager.entryAdded(entry.getLength()); return true; } else { @@ -360,8 +365,14 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l if (pendingReadsLimiter.isDisabled()) { return originalCallback; } - long estimatedReadSize = (1 + lastEntry - firstEntry) - * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + + long estimatedEntrySize = getEstimatedEntrySize(); + long numberOfEntries = 1 + lastEntry - firstEntry; + long estimatedReadSize = numberOfEntries * estimatedEntrySize; + if (log.isDebugEnabled()) { + log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size", estimatedReadSize, + numberOfEntries, estimatedEntrySize); + } final AsyncCallbacks.ReadEntriesCallback callback; InflightReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle); if (!newHandle.success) { @@ -372,7 +383,7 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l + lh.getId() + ", " + getName() + ", estimated read size " + estimatedReadSize + " bytes" - + " for " + (1 + lastEntry - firstEntry) + + " for " + numberOfEntries + " entries (check managedLedgerMaxReadsInFlightSizeInMB)"; log.error(message); pendingReadsLimiter.release(newHandle); @@ -391,9 +402,6 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l @Override public void readEntriesComplete(List entries, Object ctx) { if (!entries.isEmpty()) { - long size = entries.get(0).getLength(); - estimatedEntrySize = size; - AtomicInteger remainingCount = new AtomicInteger(entries.size()); for (Entry entry : entries) { ((EntryImpl) entry).onDeallocate(() -> { @@ -418,6 +426,20 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { return callback; } + private long getEstimatedEntrySize() { + long estimatedEntrySize = getAvgEntrySize(); + if (estimatedEntrySize == 0) { + estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE; + } + return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY; + } + + private long getAvgEntrySize() { + long totalAddedEntriesCount = this.totalAddedEntriesCount.sum(); + long totalAddedEntriesSize = this.totalAddedEntriesSize.sum(); + return totalAddedEntriesCount != 0 ? totalAddedEntriesSize / totalAddedEntriesCount : 0; + } + /** * Reads the entries from Storage. * @param lh the handle From 6b63b32dbabf23472d1aec2dde07b2f56498e2e5 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:15:45 +0300 Subject: [PATCH 16/95] Limit replay messages by bytes size --- .../PersistentDispatcherMultipleConsumers.java | 11 ++++++++--- ...stentStickyKeyDispatcherMultipleConsumersTest.java | 4 ++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index b1cd186c31784..81f1a794bcfc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -384,7 +384,7 @@ public synchronized void readMoreEntries() { } Set messagesToReplayNow = - canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet(); + canReplayMessages() ? getMessagesToReplayNow(messagesToRead, bytesToRead) : Collections.emptySet(); if (!messagesToReplayNow.isEmpty()) { if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, @@ -1344,7 +1344,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata } } - protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { + protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead, long bytesToRead) { if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); NavigableSet messagesAvailableNow = @@ -1352,7 +1352,12 @@ protected synchronized NavigableSet getMessagesToReplayNow(int maxMess messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId())); } if (!redeliveryMessages.isEmpty()) { - return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead, createFilterForReplay()); + int cappedMaxMessagesToRead = cursor.applyMaxSizeCap(maxMessagesToRead, bytesToRead); + if (cappedMaxMessagesToRead < maxMessagesToRead && log.isDebugEnabled()) { + log.debug("[{}] Capped max messages to read from redelivery list to {} (max was {})", + name, cappedMaxMessagesToRead, maxMessagesToRead); + } + return redeliveryMessages.getMessagesToReplayNow(cappedMaxMessagesToRead, createFilterForReplay()); } else { return Collections.emptyNavigableSet(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 7234f0caefc63..aec161f237be0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -191,6 +191,10 @@ public void setup() throws Exception { doReturn(null).when(cursorMock).getLastIndividualDeletedRange(); doReturn(subscriptionName).when(cursorMock).getName(); doReturn(ledgerMock).when(cursorMock).getManagedLedger(); + doAnswer(invocation -> { + int max = invocation.getArgument(0); + return max; + }).when(cursorMock).applyMaxSizeCap(anyInt(), anyLong()); consumerMock = createMockConsumer(); channelMock = mock(ChannelPromise.class); From 25f1c318f4b9a5c2c20905cb8945d7b5994c4858 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:15:58 +0300 Subject: [PATCH 17/95] Test limiting by bytes size --- ...istentDispatcherMultipleConsumersTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index 772b1843d2894..f36db3765c6c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -19,23 +19,32 @@ package org.apache.pulsar.broker.service.persistent; import com.carrotsearch.hppc.ObjectSet; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; +import org.jetbrains.annotations.NotNull; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -53,6 +62,13 @@ protected void setup() throws Exception { super.producerBaseSetup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setManagedLedgerMaxReadsInFlightSizeInMB(10); + conf.setManagedLedgerCacheSizeMB(1); + } + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { @@ -169,4 +185,61 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception { // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); } + + @Test(timeOut = 30 * 1000) + public void testManagedLedgerMaxReadsInFlightSizeInMBForRedeliveries() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName( + "persistent://public/default/testManagedLedgerMaxReadsInFlightSizeInMBForRedeliveries"); + final String subscription = "sub"; + + // Create two consumers on a shared subscription + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(10000) + .subscribe(); + + @Cleanup + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared) + .startPaused(true) + .receiverQueueSize(10000) + .subscribe(); + + // Produce about 20MB of messages + @Cleanup + Producer producer = + pulsarClient.newProducer().enableBatching(false).topic(topicName).create(); + int numberOfMessages = 200; + byte[] payload = RandomUtils.nextBytes(1025 * 1024); // 1025kB + for (int i = 0; i < numberOfMessages; i++) { + producer.send(payload); + } + + // Consume messages with consumer1 but don't ack + for (int i = 0; i < numberOfMessages; i++) { + consumer1.receive(); + } + + // Close consumer1 and resume consumer2 + consumer1.close(); + + Executor executor = CompletableFuture.delayedExecutor(4, TimeUnit.SECONDS); + pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((firstEntry, lastEntry, entries) -> { + return CompletableFuture.supplyAsync(() -> entries, executor); + }); + + consumer2.resume(); + + // Verify that consumer2 can receive the messages + for (int i = 0; i < 100; i++) { + Message msg = consumer2.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(msg, "Consumer2 should receive the message"); + consumer2.acknowledge(msg); + } + } } From ddc9fa97221979df592e8e873f220e11b8cc63b3 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 15:25:28 +0300 Subject: [PATCH 18/95] Test --- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +- pulsar-broker/src/test/resources/log4j2.xml | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 81f1a794bcfc7..ee28d947b0d72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1352,7 +1352,7 @@ protected synchronized NavigableSet getMessagesToReplayNow(int maxMess messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId())); } if (!redeliveryMessages.isEmpty()) { - int cappedMaxMessagesToRead = cursor.applyMaxSizeCap(maxMessagesToRead, bytesToRead); + int cappedMaxMessagesToRead = maxMessagesToRead; // cursor.applyMaxSizeCap(maxMessagesToRead, bytesToRead); if (cappedMaxMessagesToRead < maxMessagesToRead && log.isDebugEnabled()) { log.debug("[{}] Capped max messages to read from redelivery list to {} (max was {})", name, cappedMaxMessagesToRead, maxMessagesToRead); diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index a0732096f2845..3cf7c502ac0f9 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -36,6 +36,9 @@ + + +