Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.broker.Broker;
Expand Down Expand Up @@ -61,7 +63,6 @@
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.filter.DestinationPath;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
Expand All @@ -79,6 +80,7 @@ public class AdvisoryBroker extends BrokerFilter {
private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();

protected final AtomicReference<ConnectionContext> advisoryConnectionContext = new AtomicReference<>();
protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();

private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -107,13 +109,40 @@ public class AdvisoryBroker extends BrokerFilter {

private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();

private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher();
private final VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher();

public AdvisoryBroker(Broker next) {
super(next);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
}

@Override
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
super.setAdminConnectionContext(adminConnectionContext);
// Create a copy of the adminConnection context and set flow control false
// This will be used to publish all advisories. This will be called
// during broker construction and before the first advisories are sent.
ConnectionContext connectionContext = adminConnectionContext.copy();
connectionContext.setProducerFlowControl(false);
this.advisoryConnectionContext.set(connectionContext);
}

@Override
public void start() throws Exception {
super.start();
// Sanity check to make sure we setAdminConnectionContext() was called and
// we initialized the admin context
if (advisoryConnectionContext.get() == null) {
throw new IllegalArgumentException("AdminConnectionContext was not initialized");
}
}

@Override
public void stop() throws Exception {
super.stop();
this.advisoryConnectionContext.set(null);
}

@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
super.addConnection(context, info);
Expand Down Expand Up @@ -538,6 +567,46 @@ public void messageDiscarded(ConnectionContext context, Subscription sub, Messag
}
}

@Override
public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) {
super.messageNoConsumers(context, messageReference);
try {
if (!messageReference.isAdvisory()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
ActiveMQDestination destination = baseDestination.getActiveMQDestination();
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {

Message message = messageReference.getMessage().copy();
// The original destination and transaction id do not get
// filled when the message is first sent,
// it is only populated if the message is routed to another
// destination like the DLQ
if (message.getOriginalDestination() != null) {
message.setOriginalDestination(message.getDestination());
}
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
}

ActiveMQTopic advisoryTopic;
if (destination.isQueue()) {
advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
} else {
advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
}
message.setDestination(advisoryTopic);
message.setTransactionId(null);

context.getBroker().send(newAdvisoryProducerExchange(), message);
}
}
} catch (Exception e) {
handleFireFailure("discarded", e);
}
}

@Override
public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
super.slowConsumer(context, destination, subs);
Expand Down Expand Up @@ -775,10 +844,7 @@ public void nowMasterBroker() {
try {
ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(getBrokerService().getBroker());
fireAdvisory(context, topic, null, null, advisoryMessage);
fireAdvisory(topic, null, advisoryMessage);
} catch (Exception e) {
handleFireFailure("now master broker", e);
}
Expand Down Expand Up @@ -817,12 +883,7 @@ public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex,
advisoryMessage.setStringProperty("remoteIp", remoteIp);
networkBridges.putIfAbsent(brokerInfo, advisoryMessage);

ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();

ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(getBrokerService().getBroker());
fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(), brokerInfo, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("network bridge started", e);
Expand All @@ -837,12 +898,7 @@ public void networkBridgeStopped(BrokerInfo brokerInfo) {
advisoryMessage.setBooleanProperty("started", false);
networkBridges.remove(brokerInfo);

ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();

ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(getBrokerService().getBroker());
fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
fireAdvisory(AdvisorySupport.getNetworkBridgeAdvisoryTopic(), brokerInfo, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("network bridge stopped", e);
Expand Down Expand Up @@ -899,7 +955,19 @@ protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestinati
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}

public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
public void fireFailedForwardAdvisory(Message message, Throwable error) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());

fireAdvisory(AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), message, advisoryMessage);
}

private void fireAdvisory(ActiveMQTopic topic, Command command, ActiveMQMessage advisoryMessage) throws Exception {
fireAdvisory(advisoryConnectionContext.get(), topic, command, null, advisoryMessage);
}

private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command,
ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
//set properties
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
Expand All @@ -925,17 +993,11 @@ public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
next.send(producerExchange, advisoryMessage);
} finally {
context.setProducerFlowControl(originalFlowControl);
}

// The advisory messages are generated the broker itself, so this send will
// publish the advisory message using the Broker ConnectionContext so there will
// be admin permissions granted.
next.send(newAdvisoryProducerExchange(), advisoryMessage);
}

public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
Expand Down Expand Up @@ -963,7 +1025,7 @@ public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationCons
return virtualDestinationConsumers;
}

private class VirtualConsumerPair {
protected class VirtualConsumerPair {
private final VirtualDestination virtualDestination;

//destination that matches this virtualDestination as part target
Expand Down Expand Up @@ -1028,4 +1090,14 @@ private AdvisoryBroker getOuterType() {
return AdvisoryBroker.this;
}
}

protected ProducerBrokerExchange newAdvisoryProducerExchange() {
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(Objects.requireNonNull(advisoryConnectionContext.get(),
"Advisory ConnectionContext must not be null"));
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
return producerExchange;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ public interface Broker extends Region, Service {
*/
void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);

/**
* Called when a message is processed with no consumers
*
* @param context connection context
* @param messageReference message reference
*/
void messageNoConsumers(ConnectionContext context, MessageReference messageReference);

/**
* Called when there is a slow consumer
* @param context connection context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ public void messageDiscarded(ConnectionContext context,Subscription sub, Message
getNext().messageDiscarded(context, sub, messageReference);
}

@Override
public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) {
getNext().messageNoConsumers(context, messageReference);
}

@Override
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
getNext().slowConsumer(context, destination,subs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,18 @@ public void run() {
}
}

// Ensure the broker chain is fully initialized and we create the admin connection.
// The admin connection is needed to create destinations and for the AdvisoryBroker
// before broker startup. Creating the connection will also call the
// setAdminConnectionContext() callback on the Broker chain. This ensures initialization
// is done correctly even if someone overrides getAdminConnectionContext();
private void initializeAdminConnection() throws Exception {
BrokerSupport.getConnectionContext(getBroker());
}

private void doStartBroker() throws Exception {
checkStartException();
initializeAdminConnection();
startDestinations();
addShutdownHook();

Expand Down Expand Up @@ -2447,7 +2457,7 @@ protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
protected Broker addInterceptors(Broker broker) throws Exception {
if (isAdvisorySupport()) {
// AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
broker = new AdvisoryBroker(broker);
broker = createAdvisoryBroker(broker);
}
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
Expand Down Expand Up @@ -2515,6 +2525,10 @@ protected PersistenceAdapter createPersistenceAdapter() throws IOException {
}
}

protected AdvisoryBroker createAdvisoryBroker(Broker broker) {
return new AdvisoryBroker(broker);
}

protected ObjectName createBrokerObjectName() throws MalformedObjectNameException {
return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ public void messageDispatched(ConnectionContext context, Subscription sub, Messa
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
}

@Override
public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) {

}

@Override
public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ public void messageDiscarded(ConnectionContext context, Subscription sub, Messag
throw new BrokerStoppedException(this.message);
}

@Override
public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}

@Override
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
throw new BrokerStoppedException(this.message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import jakarta.jms.ResourceAllocationException;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
Expand Down Expand Up @@ -643,50 +642,11 @@ public boolean isDisposed() {
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
if (!msg.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {

Message message = msg.copy();
// The original destination and transaction id do not get
// filled when the message is first sent,
// it is only populated if the message is routed to another
// destination like the DLQ
if (message.getOriginalDestination() != null) {
message.setOriginalDestination(message.getDestination());
}
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
}

ActiveMQTopic advisoryTopic;
if (destination.isQueue()) {
advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
} else {
advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
}
message.setDestination(advisoryTopic);
message.setTransactionId(null);

// Disable flow control for this since since we don't want
// to block.
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(false);
producerExchange.setConnectionContext(context);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
}

}
}
protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) {
if (!msg.isPersistent() && isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
broker.messageNoConsumers(context, msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,15 @@ public void messageDiscarded(ConnectionContext context, Subscription sub, Messag
super.messageDiscarded(context, sub, messageReference);
}

@Override
public void messageNoConsumers(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogInternalEvents()) {
String msg = messageReference.getMessage().toString();
LOG.info("Message without consumers: {}", msg);
}
super.messageNoConsumers(context, messageReference);
}

@Override
public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
Expand Down
Loading
Loading