Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public class TransportConstantTest {
@Test
public void testDefaultOnPom() {
assertEquals(0, TransportConstants.DEFAULT_QUIET_PERIOD, "It is expected to have the default at 0 on the testsuite");
assertEquals(0, TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT, "It is expected to have the default at 0 on the testsuite");
assertEquals(7200000, TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT, "It is expected to have the default at 0 on the testsuite");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -852,12 +852,16 @@ public synchronized void asyncStop(Runnable callback) {
}

private void closeChannelGroup(ChannelGroup channelGroup) {
if (channelGroup != null && !channelGroup.close().awaitUninterruptibly(shutdownTimeout, TimeUnit.MILLISECONDS)) {
ActiveMQServerLogger.LOGGER.nettyChannelGroupError(getName());
for (Channel channel : channelGroup) {
if (channel.isActive()) {
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, ProxyProtocolUtil.getRemoteAddress(channel), getName());
if (channelGroup != null) {
if (shutdownTimeout > 0 && !channelGroup.close().awaitUninterruptibly(shutdownTimeout, TimeUnit.MILLISECONDS)) {
ActiveMQServerLogger.LOGGER.nettyChannelGroupError(getName(), shutdownTimeout);
for (Channel channel : channelGroup) {
if (channel.isActive()) {
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, ProxyProtocolUtil.getRemoteAddress(channel), getName());
}
}
} else {
channelGroup.close();
}
}
}
Expand Down Expand Up @@ -895,13 +899,15 @@ public synchronized void pause() {

// We *pause* the acceptor so no new connections are made
if (serverChannelGroup != null) {
if (!serverChannelGroup.close().awaitUninterruptibly(shutdownTimeout, TimeUnit.MILLISECONDS)) {
ActiveMQServerLogger.LOGGER.nettyChannelGroupBindError();
if (shutdownTimeout > 0 && !serverChannelGroup.close().awaitUninterruptibly(shutdownTimeout, TimeUnit.MILLISECONDS)) {
ActiveMQServerLogger.LOGGER.nettyChannelGroupBindErrorOnPause(getName(), shutdownTimeout);
for (Channel channel : serverChannelGroup) {
if (channel.isActive()) {
ActiveMQServerLogger.LOGGER.nettyChannelStillBound(channel, ProxyProtocolUtil.getRemoteAddress(channel));
ActiveMQServerLogger.LOGGER.nettyChannelStillBoundOnPause(channel, ProxyProtocolUtil.getRemoteAddress(channel), getName());
}
}
} else {
serverChannelGroup.close();
}
}
paused = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,17 +492,17 @@ void slowConsumerDetected(String sessionID,
@LogMessage(id = 222072, value = "Timed out flushing channel on InVMConnection", level = LogMessage.Level.WARN)
void timedOutFlushingInvmChannel();

@LogMessage(id = 222074, value = "Netty ChannelGroup did not completely close for acceptor '{}'", level = LogMessage.Level.WARN)
void nettyChannelGroupError(String acceptor);
@LogMessage(id = 222074, value = "Netty ChannelGroup for acceptor '{}' did not completely close within {}ms timeout", level = LogMessage.Level.WARN)
void nettyChannelGroupError(String acceptor, int timeout);

@LogMessage(id = 222075, value = "{} is still connected to {} for acceptor '{}'", level = LogMessage.Level.WARN)
void nettyChannelStillOpen(Channel channel, String remoteAddress, String acceptor);

@LogMessage(id = 222076, value = "channel group did not completely unbind", level = LogMessage.Level.WARN)
void nettyChannelGroupBindError();
@LogMessage(id = 222076, value = "Netty ChannelGroup did not completely unbind within {}ms timeout when pausing acceptor '{}'", level = LogMessage.Level.WARN)
void nettyChannelGroupBindErrorOnPause(String acceptor, int timeout);

@LogMessage(id = 222077, value = "{} is still bound to {}", level = LogMessage.Level.WARN)
void nettyChannelStillBound(Channel channel, String remoteAddress);
@LogMessage(id = 222077, value = "{} is still bound to {} when pausing acceptor '{}'", level = LogMessage.Level.WARN)
void nettyChannelStillBoundOnPause(Channel channel, String remoteAddress, String acceptor);

@LogMessage(id = 222080, value = "Error creating acceptor: {}", level = LogMessage.Level.WARN)
void errorCreatingAcceptor(String name, Exception e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,43 @@ public void close() throws IOException {
}

/**
* is there any record matching Level?
* Determines whether there is any log entry matching the specified log level.
*
* @param level the log level to check for in the log entries
* @return true if a log entry matches the specified log level; otherwise, false
*/
public boolean hasLevel(LogLevel level) {
return hasLevel(level, null);
}

/**
* Determines whether there is any log entry matching the specified log level while optionally ignoring messages that
* contain specified substrings.
*
* @param level the log level to check for in the log entries
* @param ignores a list of substrings; log messages containing any of these substrings will be ignored during the
* check
* @return true if a log entry matches the specified log level and does not contain any of the ignored substrings;
* otherwise, false
*/
public boolean hasLevel(LogLevel level, List<String> ignores) {
Level implLevel = level.toImplLevel();
for (LogEntry logEntry : messages) {
if (implLevel == logEntry.level) {
return true;
if (ignores != null) {
boolean ignoreFound = false;
for (String ignore : ignores) {
if (logEntry.message.contains(ignore)) {
ignoreFound = true;
break;
}
}
if (!ignoreFound) {
return true;
}
} else {
return true;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion docs/user-manual/configuring-transports.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ shutdownTimeout::
This is only valid for acceptors.
It is the number of milliseconds the broker will wait when shutting down each of the various Netty `ChannelGroup` instances as well as the `EventLoopGroup` instance associated with the acceptor.
The default is `3000`.
The default can also be set with the Java system property `DEFAULT_SHUTDOWN_TIMEOUT`.
The default can also be set with the Java system property `org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT`.

=== Configuring Netty Native Transport

Expand Down
11 changes: 8 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,14 @@
<!-- for tests that will need a new server created -->
<artemis.distribution.output>${activemq.basedir}/artemis-distribution/target/apache-artemis-${project.version}-bin/apache-artemis-${project.version}</artemis.distribution.output>

<activemq-surefire-argline>-Dbrokerconfig.maxDiskUsage=100 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=0
-Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost
-Djava.net.preferIPv4Stack=true -Dbasedir=${basedir}
<activemq-surefire-argline>
-Dbrokerconfig.maxDiskUsage=100
-Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0
-Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=7200000
-Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686"
-Djgroups.bind_addr=localhost
-Djava.net.preferIPv4Stack=true
-Dbasedir=${basedir}
-Djdk.attach.allowAttachSelf=true
-Dartemis.distribution.output="${artemis.distribution.output}"
-Dlog4j2.configurationFile="file:${activemq.basedir}/tests/config/${logging.config}"
Expand Down