Skip to content

Commit

Permalink
Prevent potential deadlock on shutdown when using AsyncAppender (#396)
Browse files Browse the repository at this point in the history
* Use separate synchronization for the Hierarchy listener list

* Ensure the dispatch thread has terminated before destroying the std::thread
  • Loading branch information
swebb2066 authored Jul 23, 2024
1 parent b00813c commit 1c1cd79
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 37 deletions.
78 changes: 45 additions & 33 deletions src/main/cpp/asyncappender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
* limitations under the License.
*/


#include <log4cxx/asyncappender.h>


#include <log4cxx/helpers/loglog.h>
#include <log4cxx/spi/loggingevent.h>
#include <log4cxx/helpers/stringhelper.h>
Expand Down Expand Up @@ -105,30 +103,34 @@ typedef std::map<LogString, DiscardSummary> DiscardMap;
#endif

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
constexpr std::size_t hardware_constructive_interference_size = 64;
constexpr std::size_t hardware_destructive_interference_size = 64;
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
constexpr std::size_t hardware_constructive_interference_size = 64;
constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkeletonPrivate
{
AsyncAppenderPriv() :
AppenderSkeletonPrivate(),
buffer(DEFAULT_BUFFER_SIZE),
bufferSize(DEFAULT_BUFFER_SIZE),
dispatcher(),
locationInfo(false),
blocking(true)
AsyncAppenderPriv()
: AppenderSkeletonPrivate()
, buffer(DEFAULT_BUFFER_SIZE)
, bufferSize(DEFAULT_BUFFER_SIZE)
, dispatcher()
, locationInfo(false)
, blocking(true)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopDispatcher();})
#endif
, eventCount(0)
, dispatchedCount(0)
, commitCount(0)
{ }

~AsyncAppenderPriv()
{
stopDispatcher();
}

/**
Expand Down Expand Up @@ -171,10 +173,7 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele

void stopDispatcher()
{
{
std::lock_guard<std::mutex> lock(bufferMutex);
closed = true;
}
this->setClosed();
bufferNotEmpty.notify_all();
bufferNotFull.notify_all();

Expand Down Expand Up @@ -212,6 +211,18 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
* Used to communicate to the dispatch thread when an event is committed in buffer.
*/
alignas(hardware_constructive_interference_size) std::atomic<size_t> commitCount;

bool isClosed()
{
std::lock_guard<std::mutex> lock(this->bufferMutex);
return this->closed;
}

void setClosed()
{
std::lock_guard<std::mutex> lock(this->bufferMutex);
this->closed = true;
}
};


Expand Down Expand Up @@ -280,7 +291,7 @@ void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)

if (!priv->dispatcher.joinable())
{
std::lock_guard<std::mutex> lock(priv->bufferMutex);
std::lock_guard<std::recursive_mutex> lock(priv->mutex);
if (!priv->dispatcher.joinable())
priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this );
}
Expand Down Expand Up @@ -515,7 +526,7 @@ void AsyncAppender::dispatch()
{ return priv->dispatchedCount != priv->commitCount || priv->closed; }
);
}
isActive = !priv->closed;
isActive = !priv->isClosed();

while (events.size() < priv->bufferSize && priv->dispatchedCount != priv->commitCount)
{
Expand Down Expand Up @@ -545,34 +556,35 @@ void AsyncAppender::dispatch()
}
catch (std::exception& ex)
{
if (isActive)
if (!priv->isClosed())
{
priv->errorHandler->error(LOG4CXX_STR("async dispatcher"), ex, 0, item);
isActive = false;
}
}
catch (...)
{
if (isActive)
if (!priv->isClosed())
{
priv->errorHandler->error(LOG4CXX_STR("async dispatcher"));
isActive = false;
}
}
}
if (!isActive && LogLog::isDebugEnabled())
}
if (LogLog::isDebugEnabled())
{
Pool p;
LogString msg(LOG4CXX_STR("AsyncAppender"));
msg += LOG4CXX_STR(" discardCount ");
StringHelper::toString(discardCount, p, msg);
msg += LOG4CXX_STR(" pendingCountHistogram");
for (auto item : pendingCountHistogram)
{
LogString msg(LOG4CXX_STR("AsyncAppender"));
msg += LOG4CXX_STR(" discardCount ");
StringHelper::toString(discardCount, p, msg);
msg += LOG4CXX_STR(" pendingCountHistogram");
for (auto item : pendingCountHistogram)
{
msg += logchar(' ');
StringHelper::toString(item, p, msg);
}
LogLog::debug(msg);
msg += logchar(' ');
StringHelper::toString(item, p, msg);
}
LogLog::debug(msg);
}

}
10 changes: 6 additions & 4 deletions src/main/cpp/hierarchy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ struct Hierarchy::HierarchyPrivate
ProvisionNodeMap provisionNodes;

std::vector<AppenderPtr> allAppenders;

mutable std::mutex listenerMutex;
};

IMPLEMENT_LOG4CXX_OBJECT(Hierarchy)
Expand Down Expand Up @@ -91,7 +93,7 @@ Hierarchy::~Hierarchy()

void Hierarchy::addHierarchyEventListener(const spi::HierarchyEventListenerPtr& listener)
{
std::lock_guard<std::recursive_mutex> lock(m_priv->mutex);
std::lock_guard<std::mutex> lock(m_priv->listenerMutex);

if (std::find(m_priv->listeners.begin(), m_priv->listeners.end(), listener) != m_priv->listeners.end())
{
Expand All @@ -105,7 +107,7 @@ void Hierarchy::addHierarchyEventListener(const spi::HierarchyEventListenerPtr&

void Hierarchy::removeHierarchyEventListener(const spi::HierarchyEventListenerPtr& listener)
{
std::lock_guard<std::recursive_mutex> lock(m_priv->mutex);
std::lock_guard<std::mutex> lock(m_priv->listenerMutex);

auto found = std::find(m_priv->listeners.begin(), m_priv->listeners.end(), listener);
if(found != m_priv->listeners.end()){
Expand Down Expand Up @@ -194,7 +196,7 @@ void Hierarchy::fireAddAppenderEvent(const Logger* logger, const Appender* appen
setConfigured(true);
HierarchyEventListenerList clonedList;
{
std::lock_guard<std::recursive_mutex> lock(m_priv->mutex);
std::lock_guard<std::mutex> lock(m_priv->listenerMutex);
clonedList = m_priv->listeners;
}

Expand All @@ -207,7 +209,7 @@ void Hierarchy::fireRemoveAppenderEvent(const Logger* logger, const Appender* ap
{
HierarchyEventListenerList clonedList;
{
std::lock_guard<std::recursive_mutex> lock(m_priv->mutex);
std::lock_guard<std::mutex> lock(m_priv->listenerMutex);
clonedList = m_priv->listeners;
}
for (auto& item : clonedList)
Expand Down

0 comments on commit 1c1cd79

Please sign in to comment.