From dd41ffbf212e16774be3bd0efbc2f3755fdd128e Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Thu, 4 Jun 2026 18:35:10 -0400 Subject: [PATCH] Send advisory messages using Broker connection context (#2071) This updates the AdvisoryBroker to always publish advisory messages that were generated by other events to use the Broker's own ConnectionContext. Before this change the AdvisoryBroker was using the original ConnectionContext that used used for the action that triggered the advisory. This doesn't make sense because its actually the broker itself firing the advisory message and not the original connection. It also meant requiring all users to be given access to create new advisory topics that could be created on demand. After this update, all users no longer need permission to create advisory destinations which was required previously. Users only need read access to the temporary destination advisories for the AMQ client as the broker itself will now use its own context going forward to create all the destinations on demand and for publishing. This update also consolidates the on consumer with no messages advisory into the Advisory broker so it is all managed in one location. (cherry picked from commit 3598fc562337c467b3f6b59364b29ac2d9bdb068) --- .../activemq/advisory/AdvisoryBroker.java | 134 ++++++++++++++---- .../org/apache/activemq/broker/Broker.java | 8 ++ .../apache/activemq/broker/BrokerFilter.java | 5 + .../apache/activemq/broker/BrokerService.java | 16 ++- .../apache/activemq/broker/EmptyBroker.java | 5 + .../apache/activemq/broker/ErrorBroker.java | 5 + .../broker/region/BaseDestination.java | 50 +------ .../broker/util/LoggingBrokerPlugin.java | 9 ++ .../DemandForwardingBridgeSupport.java | 12 +- .../transport/mqtt/MQTTAuthTestSupport.java | 12 +- .../transport/stomp/StompTestSupport.java | 12 +- .../activemq/advisory/AdvisoryTests.java | 36 ++++- .../activemq/broker/policy/SecureDLQTest.java | 4 +- .../security/DestinationAdminAuthzTest.java | 9 +- .../SimpleSecurityBrokerSystemTest.java | 4 +- .../jaas-broker-guest-no-creds-only.xml | 9 +- .../activemq/security/jaas-broker-guest.xml | 9 +- .../apache/activemq/security/jaas-broker.xml | 11 +- assembly/src/release/conf/activemq.xml | 27 ++-- 19 files changed, 256 insertions(+), 121 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 38cd49bab07..5eda70dc24b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -23,9 +23,11 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.broker.Broker; @@ -61,7 +63,6 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SessionId; import org.apache.activemq.filter.DestinationPath; -import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; import org.apache.activemq.util.IdGenerator; @@ -79,6 +80,7 @@ public class AdvisoryBroker extends BrokerFilter { private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class); private static final IdGenerator ID_GENERATOR = new IdGenerator(); + protected final AtomicReference advisoryConnectionContext = new AtomicReference<>(); protected final ConcurrentMap connections = new ConcurrentHashMap(); private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); @@ -107,13 +109,40 @@ public class AdvisoryBroker extends BrokerFilter { private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); - private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); + private final VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); public AdvisoryBroker(Broker next) { super(next); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); } + @Override + public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { + super.setAdminConnectionContext(adminConnectionContext); + // Create a copy of the adminConnection context and set flow control false + // This will be used to publish all advisories. This will be called + // during broker construction and before the first advisories are sent. + ConnectionContext connectionContext = adminConnectionContext.copy(); + connectionContext.setProducerFlowControl(false); + this.advisoryConnectionContext.set(connectionContext); + } + + @Override + public void start() throws Exception { + super.start(); + // Sanity check to make sure we setAdminConnectionContext() was called and + // we initialized the admin context + if (advisoryConnectionContext.get() == null) { + throw new IllegalArgumentException("AdminConnectionContext was not initialized"); + } + } + + @Override + public void stop() throws Exception { + super.stop(); + this.advisoryConnectionContext.set(null); + } + @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { super.addConnection(context, info); @@ -538,6 +567,46 @@ public void messageDiscarded(ConnectionContext context, Subscription sub, Messag } } + @Override + public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) { + super.messageNoConsumers(context, messageReference); + try { + if (!messageReference.isAdvisory()) { + // allow messages with no consumers to be dispatched to a dead + // letter queue + BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); + ActiveMQDestination destination = baseDestination.getActiveMQDestination(); + if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { + + Message message = messageReference.getMessage().copy(); + // The original destination and transaction id do not get + // filled when the message is first sent, + // it is only populated if the message is routed to another + // destination like the DLQ + if (message.getOriginalDestination() != null) { + message.setOriginalDestination(message.getDestination()); + } + if (message.getOriginalTransactionId() != null) { + message.setOriginalTransactionId(message.getTransactionId()); + } + + ActiveMQTopic advisoryTopic; + if (destination.isQueue()) { + advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); + } else { + advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); + } + message.setDestination(advisoryTopic); + message.setTransactionId(null); + + context.getBroker().send(newAdvisoryProducerExchange(), message); + } + } + } catch (Exception e) { + handleFireFailure("discarded", e); + } + } + @Override public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { super.slowConsumer(context, destination, subs); @@ -775,10 +844,7 @@ public void nowMasterBroker() { try { ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - ConnectionContext context = new ConnectionContext(); - context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.setBroker(getBrokerService().getBroker()); - fireAdvisory(context, topic, null, null, advisoryMessage); + fireAdvisory(topic, null, advisoryMessage); } catch (Exception e) { handleFireFailure("now master broker", e); } @@ -817,12 +883,7 @@ public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, advisoryMessage.setStringProperty("remoteIp", remoteIp); networkBridges.putIfAbsent(brokerInfo, advisoryMessage); - ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); - - ConnectionContext context = new ConnectionContext(); - context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.setBroker(getBrokerService().getBroker()); - fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); + fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(), brokerInfo, advisoryMessage); } } catch (Exception e) { handleFireFailure("network bridge started", e); @@ -837,12 +898,7 @@ public void networkBridgeStopped(BrokerInfo brokerInfo) { advisoryMessage.setBooleanProperty("started", false); networkBridges.remove(brokerInfo); - ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); - - ConnectionContext context = new ConnectionContext(); - context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.setBroker(getBrokerService().getBroker()); - fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); + fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(), brokerInfo, advisoryMessage); } } catch (Exception e) { handleFireFailure("network bridge stopped", e); @@ -899,7 +955,19 @@ protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestinati fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { + public void fireFailedForwardAdvisory(Message message, Throwable error) throws Exception { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); + + fireAdvisory(AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), message, advisoryMessage); + } + + private void fireAdvisory(ActiveMQTopic topic, Command command, ActiveMQMessage advisoryMessage) throws Exception { + fireAdvisory(advisoryConnectionContext.get(), topic, command, null, advisoryMessage); + } + + private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, + ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { //set properties advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; @@ -925,17 +993,11 @@ public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command advisoryMessage.setDestination(topic); advisoryMessage.setResponseRequired(false); advisoryMessage.setProducerId(advisoryProducerId); - boolean originalFlowControl = context.isProducerFlowControl(); - final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); - producerExchange.setConnectionContext(context); - producerExchange.setMutable(true); - producerExchange.setProducerState(new ProducerState(new ProducerInfo())); - try { - context.setProducerFlowControl(false); - next.send(producerExchange, advisoryMessage); - } finally { - context.setProducerFlowControl(originalFlowControl); - } + + // The advisory messages are generated the broker itself, so this send will + // publish the advisory message using the Broker ConnectionContext so there will + // be admin permissions granted. + next.send(newAdvisoryProducerExchange(), advisoryMessage); } public Map getAdvisoryConnections() { @@ -963,7 +1025,7 @@ public ConcurrentMap getVirtualDestinationCons return virtualDestinationConsumers; } - private class VirtualConsumerPair { + protected class VirtualConsumerPair { private final VirtualDestination virtualDestination; //destination that matches this virtualDestination as part target @@ -1028,4 +1090,14 @@ private AdvisoryBroker getOuterType() { return AdvisoryBroker.this; } } + + protected ProducerBrokerExchange newAdvisoryProducerExchange() { + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setConnectionContext(Objects.requireNonNull(advisoryConnectionContext.get(), + "Advisory ConnectionContext must not be null")); + producerExchange.setMutable(true); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); + return producerExchange; + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index b96167c5efa..0462b76cfa5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -370,6 +370,14 @@ public interface Broker extends Region, Service { */ void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference); + /** + * Called when a message is processed with no consumers + * + * @param context connection context + * @param messageReference message reference + */ + void messageNoConsumers(ConnectionContext context, MessageReference messageReference); + /** * Called when there is a slow consumer * @param context connection context diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index b9374e352de..c0c5d05d632 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -361,6 +361,11 @@ public void messageDiscarded(ConnectionContext context,Subscription sub, Message getNext().messageDiscarded(context, sub, messageReference); } + @Override + public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) { + getNext().messageNoConsumers(context, messageReference); + } + @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { getNext().slowConsumer(context, destination,subs); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 99404db5dcc..4cb353ba3c1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -733,8 +733,18 @@ public void run() { } } + // Ensure the broker chain is fully initialized and we create the admin connection. + // The admin connection is needed to create destinations and for the AdvisoryBroker + // before broker startup. Creating the connection will also call the + // setAdminConnectionContext() callback on the Broker chain. This ensures initialization + // is done correctly even if someone overrides getAdminConnectionContext(); + private void initializeAdminConnection() throws Exception { + BrokerSupport.getConnectionContext(getBroker()); + } + private void doStartBroker() throws Exception { checkStartException(); + initializeAdminConnection(); startDestinations(); addShutdownHook(); @@ -2447,7 +2457,7 @@ protected DestinationInterceptor[] createDefaultDestinationInterceptor() { protected Broker addInterceptors(Broker broker) throws Exception { if (isAdvisorySupport()) { // AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker - broker = new AdvisoryBroker(broker); + broker = createAdvisoryBroker(broker); } if (isSchedulerSupport()) { SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); @@ -2515,6 +2525,10 @@ protected PersistenceAdapter createPersistenceAdapter() throws IOException { } } + protected AdvisoryBroker createAdvisoryBroker(Broker broker) { + return new AdvisoryBroker(broker); + } + protected ObjectName createBrokerObjectName() throws MalformedObjectNameException { return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 4872a5a0fa5..7bd65fcc716 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -317,6 +317,11 @@ public void messageDispatched(ConnectionContext context, Subscription sub, Messa public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { } + @Override + public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) { + + } + @Override public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) { } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 8c138e39457..ccfbc9daa86 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -358,6 +358,11 @@ public void messageDiscarded(ConnectionContext context, Subscription sub, Messag throw new BrokerStoppedException(this.message); } + @Override + public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) { + throw new BrokerStoppedException(this.message); + } + @Override public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { throw new BrokerStoppedException(this.message); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 82ed8784a26..6aaa317fe42 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -22,7 +22,6 @@ import jakarta.jms.ResourceAllocationException; -import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -643,50 +642,11 @@ public boolean isDisposed() { * Provides a hook to allow messages with no consumer to be processed in * some way - such as to send to a dead letter queue or something.. */ - protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { - if (!msg.isPersistent()) { - if (isSendAdvisoryIfNoConsumers()) { - // allow messages with no consumers to be dispatched to a dead - // letter queue - if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { - - Message message = msg.copy(); - // The original destination and transaction id do not get - // filled when the message is first sent, - // it is only populated if the message is routed to another - // destination like the DLQ - if (message.getOriginalDestination() != null) { - message.setOriginalDestination(message.getDestination()); - } - if (message.getOriginalTransactionId() != null) { - message.setOriginalTransactionId(message.getTransactionId()); - } - - ActiveMQTopic advisoryTopic; - if (destination.isQueue()) { - advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); - } else { - advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); - } - message.setDestination(advisoryTopic); - message.setTransactionId(null); - - // Disable flow control for this since since we don't want - // to block. - boolean originalFlowControl = context.isProducerFlowControl(); - try { - context.setProducerFlowControl(false); - ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); - producerExchange.setMutable(false); - producerExchange.setConnectionContext(context); - producerExchange.setProducerState(new ProducerState(new ProducerInfo())); - context.getBroker().send(producerExchange, message); - } finally { - context.setProducerFlowControl(originalFlowControl); - } - - } - } + protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) { + if (!msg.isPersistent() && isSendAdvisoryIfNoConsumers()) { + // allow messages with no consumers to be dispatched to a dead + // letter queue + broker.messageNoConsumers(context, msg); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index 798f5644cba..72c6f352d27 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -559,6 +559,15 @@ public void messageDiscarded(ConnectionContext context, Subscription sub, Messag super.messageDiscarded(context, sub, messageReference); } + @Override + public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) { + if (isLogAll() || isLogInternalEvents()) { + String msg = messageReference.getMessage().toString(); + LOG.info("Message without consumers: {}", msg); + } + super.messageNoConsumers(context, messageReference); + } + @Override public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 8d16445fb96..26a5769ed0d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1152,18 +1152,10 @@ private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwabl advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); if (advisoryBroker != null) { - ConnectionContext context = new ConnectionContext(); - context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); - context.setBroker(brokerService.getBroker()); - - ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); - advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, - advisoryMessage); - + advisoryBroker.fireFailedForwardAdvisory(messageDispatch.getMessage(), error); } } catch (Exception e) { - LOG.warn("failed to fire forward failure advisory, cause: {}", e); + LOG.warn("failed to fire forward failure advisory, cause: {}", e.getMessage()); LOG.debug("detail", e); } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java index 6e327bfe53e..11de2984198 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java @@ -103,10 +103,16 @@ protected BrokerPlugin configureAuthorization() throws Exception { entry.setAdmin("guests,users,anonymous"); authorizationEntries.add(entry); entry = new AuthorizationEntry(); - entry.setTopic("ActiveMQ.Advisory.>"); + entry.setTopic("ActiveMQ.Advisory.TempQueue"); entry.setRead("guests,users,anonymous"); - entry.setWrite("guests,users,anonymous"); - entry.setAdmin("guests,users,anonymous"); + entry.setWrite("admins"); + entry.setAdmin("admins"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("ActiveMQ.Advisory.TempTopic"); + entry.setRead("guests,users,anonymous"); + entry.setWrite("admins"); + entry.setAdmin("admins"); authorizationEntries.add(entry); TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java index ecdeae58331..e155599d2db 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java @@ -292,10 +292,16 @@ protected BrokerPlugin configureAuthorization() throws Exception { entry.setAdmin("guests,users"); authorizationEntries.add(entry); entry = new AuthorizationEntry(); - entry.setTopic("ActiveMQ.Advisory.>"); + entry.setTopic("ActiveMQ.Advisory.TempQueue"); entry.setRead("guests,users"); - entry.setWrite("guests,users"); - entry.setAdmin("guests,users"); + entry.setWrite("admins"); + entry.setAdmin("admins"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("ActiveMQ.Advisory.TempTopic"); + entry.setRead("guests,users"); + entry.setWrite("admins"); + entry.setAdmin("admins"); authorizationEntries.add(entry); TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 8b78f6849fb..40d09c35e79 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -19,12 +19,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; import java.util.HashSet; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import jakarta.jms.BytesMessage; import jakarta.jms.Connection; @@ -45,6 +47,8 @@ import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -53,6 +57,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.security.SecurityContext; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -687,7 +692,35 @@ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { } protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); + BrokerService answer = new BrokerService() { + // Wrap the broker used by the Advisory broker so we can intercept the send() + // calls and verify any messages published to advisory topics by the advisory broker + // use the right context. The AdvisoryBroker delegates to the "next" broker in the + // chain when sending generated advisories. + @Override + protected AdvisoryBroker createAdvisoryBroker(Broker broker) { + return new AdvisoryBroker(new BrokerFilter(broker) { + // track first connection context used for advisories + private final AtomicReference first = new AtomicReference<>(); + + @Override + public void send(ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) throws Exception { + super.send(producerExchange, messageSend); + // Verify all advisory topic publishes use the admin context + // This filter is only used by the advisory broker so all published + // advisories should be the broker security context + if (AdvisorySupport.isAdvisoryTopic(messageSend.getDestination())) { + first.compareAndSet(null, producerExchange.getConnectionContext()); + assertEquals(SecurityContext.BROKER_SECURITY_CONTEXT, + producerExchange.getConnectionContext().getSecurityContext()); + // ConnectionContext is reused for each message (but producer exchange is not) + assertSame(first.get(), producerExchange.getConnectionContext()); + } + } + }); + } + }; configureBroker(answer); answer.start(); return answer; @@ -742,6 +775,7 @@ public void preProcessDispatch(MessageDispatch messageDispatch) { super.preProcessDispatch(messageDispatch); } + }; } } }); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java index 2ef59938e4b..5a83787a7b9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java @@ -43,8 +43,8 @@ public static AuthorizationMap createAuthorizationMap() { writeAccess.put(new ActiveMQQueue("TEST"), USERS); writeAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS); - readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); - writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), WILDCARD); + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), WILDCARD); DestinationMap adminAccess = new DefaultAuthorizationMap(); adminAccess.put(new ActiveMQQueue("TEST"), ADMINS); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java index a06861fa0c0..08366af042d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java @@ -269,9 +269,12 @@ public static AuthorizationMap createAuthorizationMap() { adminAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); adminAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); - readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); - writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); - adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), WILDCARD); + writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), ADMINS); + adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), ADMINS); + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), WILDCARD); + writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), ADMINS); + adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), ADMINS); var authorizationMap = new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess); var tempDestinationAuthorizationEntry = new TempDestinationAuthorizationEntry(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java index fc9dc800732..fe237c7940e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java @@ -130,8 +130,8 @@ public static AuthorizationMap createAuthorizationMap() { writeAccess.put(new ActiveMQTopic("GUEST.>"), USERS); writeAccess.put(new ActiveMQTopic("GUEST.>"), GUESTS); - readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); - writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempQueue"), WILDCARD); + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.TempTopic"), WILDCARD); DestinationMap adminAccess = new DefaultAuthorizationMap(); adminAccess.put(new ActiveMQTopic(">"), ADMINS); diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml index 6ad1e72c2f1..4c171a05b25 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest-no-creds-only.xml @@ -41,7 +41,12 @@ - + + + + + + @@ -53,4 +58,4 @@ - \ No newline at end of file + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml index c8474a349e4..c2d10ba5ddc 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker-guest.xml @@ -41,7 +41,12 @@ - + + + + + + @@ -53,4 +58,4 @@ - \ No newline at end of file + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml index 246921584fd..a30f17aa41f 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/jaas-broker.xml @@ -46,13 +46,12 @@ - - + + - - - + + + diff --git a/assembly/src/release/conf/activemq.xml b/assembly/src/release/conf/activemq.xml index db176d44850..71285294a0b 100644 --- a/assembly/src/release/conf/activemq.xml +++ b/assembly/src/release/conf/activemq.xml @@ -74,17 +74,24 @@ entirely; specify only packages you explicitly trust). NOTE ABOUT ADVISORY TOPICS: - 1. All users need permission to create ActiveMQ.Advisory destinations, - which is given by the "admin" acl. However, normal users should - generally NOT be given access to read/write for advisories (except temp) - as those messages are meant for admin users. - 2. A notable exception to number 1 is regular users should be given access to + 1. Normal (non-admin) users should generally NOT be given access to + create/delete/read/write for advisories as those messages are meant for admin users. + 2. A notable exception to number 1 is normal users should be given read access to advisories for temporary destinations because ActiveMQConnection uses those advisories. + Temp dest advisory topics are: ActiveMQ.Advisory.TempQueue and ActiveMQ.Advisory.TempTopic 3. In addition, dynamic network connectors use advisories to determine consumer demand so the users that will be used to create bridges need access - consumer and virtual destination consumer advisories. + consumer and virtual destination consumer advisories. Example, assuming bridge-user + is the user that is used for the network connection: - For more information, see: + + ...... + + + ...... + + + For more information, see: https://activemq.apache.org/security -->