Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Alarm for notification queue overpassing a given threshold (#4113) #4375

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- Fix: Alarm for notification queue overpassing a given threshold (#4113)
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved
- Fix: logDeprecate not working correctly (`geo:json` wrongly considered as deprecated)
4 changes: 2 additions & 2 deletions src/app/contextBroker/contextBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ bool logDeprecate;
#define REQ_POOL_SIZE "size of thread pool for incoming connections"
#define IN_REQ_PAYLOAD_MAX_SIZE_DESC "maximum size (in bytes) of the payload of incoming requests"
#define OUT_REQ_MSG_MAX_SIZE_DESC "maximum size (in bytes) of outgoing forward and notification request messages"
#define THRESHOLD_MAX_SIZE_DESC "maximum threshold limit for notificationQueue"
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved
#define SIMULATED_NOTIF_DESC "simulate notifications instead of actual sending them (only for testing)"
#define STAT_COUNTERS "enable request/notification counters statistics"
#define STAT_SEM_WAIT "enable semaphore waiting time statistics"
Expand All @@ -275,6 +276,7 @@ bool logDeprecate;
#define INSECURE_NOTIF_DESC "allow HTTPS notifications to peers which certificate cannot be authenticated with known CA certificates"
#define NGSIV1_AUTOCAST_DESC "automatic cast for number, booleans and dates in NGSIv1 update/create attribute operations"
#define MQTT_MAX_AGE_DESC "max time (in minutes) that an unused MQTT connection is kept, default: 60"
//#define MAX_THRESHOLD_DESC "max time (in minutes) that an unused MQTT connection is kept, default: 60"
#define LOG_DEPRECATE_DESC "log deprecation usages as warnings"


Expand Down Expand Up @@ -333,10 +335,8 @@ PaArgument paArgs[] =
{ "-connectionMemory", &connectionMemory, "CONN_MEMORY", PaUInt, PaOpt, 64, 0, 1024, CONN_MEMORY_DESC },
{ "-maxConnections", &maxConnections, "MAX_CONN", PaUInt, PaOpt, 1020, 1, PaNL, MAX_CONN_DESC },
{ "-reqPoolSize", &reqPoolSize, "TRQ_POOL_SIZE", PaUInt, PaOpt, 0, 0, 1024, REQ_POOL_SIZE },

{ "-inReqPayloadMaxSize", &inReqPayloadMaxSize, "IN_REQ_PAYLOAD_MAX_SIZE", PaULong, PaOpt, DEFAULT_IN_REQ_PAYLOAD_MAX_SIZE, 0, PaNL, IN_REQ_PAYLOAD_MAX_SIZE_DESC },
{ "-outReqMsgMaxSize", &outReqMsgMaxSize, "OUT_REQ_MSG_MAX_SIZE", PaULong, PaOpt, DEFAULT_OUT_REQ_MSG_MAX_SIZE, 0, PaNL, OUT_REQ_MSG_MAX_SIZE_DESC },

{ "-notificationMode", &notificationMode, "NOTIF_MODE", PaString, PaOpt, _i "transient", PaNL, PaNL, NOTIFICATION_MODE_DESC },
{ "-notifFlowControl", &notifFlowControl, "NOTIF_FLOW_CONTROL", PaString, PaOpt, _i "", PaNL, PaNL, FLOW_CONTROL_DESC },
{ "-simulatedNotification", &simulatedNotification, "DROP_NOTIF", PaBool, PaOpt, false, false, true, SIMULATED_NOTIF_DESC },
Expand Down
87 changes: 87 additions & 0 deletions src/lib/alarmMgr/AlarmManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ AlarmManager::AlarmManager()
badInputResets(0),
notificationErrors(0),
notificationErrorResets(0),
notificationQueues(0),
notificationQueueResets(0),
forwardingErrors(0),
forwardingErrorResets(0),
mqttConnectionErrors(0),
Expand All @@ -66,6 +68,7 @@ AlarmManager::AlarmManager()
dbErrorResets(0),
dbOk(true),
notificationErrorLogAlways(false),
notificationQueueLogAlways(false),
forwardingErrorLogAlways(false),
mqttConnectionErrorLogAlways(false),
badInputLogAlways(false),
Expand All @@ -82,6 +85,7 @@ AlarmManager::AlarmManager()
int AlarmManager::init(bool logAlreadyRaisedAlarms)
{
notificationErrorLogAlways = logAlreadyRaisedAlarms;
notificationQueueLogAlways = logAlreadyRaisedAlarms;
badInputLogAlways = logAlreadyRaisedAlarms;
dbErrorLogAlways = logAlreadyRaisedAlarms;
mqttConnectionErrorLogAlways = logAlreadyRaisedAlarms;
Expand Down Expand Up @@ -246,6 +250,21 @@ void AlarmManager::notificationErrorGet(int64_t* active, int64_t* raised, int64_
}


/* ****************************************************************************
*
* AlarmManager::notificationQueueGet -
*
* NOTE
* To read values, no semaphore is used.
*/
void AlarmManager::notificationQueueGet(int64_t* active, int64_t* raised, int64_t* released)
{
*active = notificationQ.size();
*raised = notificationQueues;
*released = notificationQueueResets;
}



/* ****************************************************************************
*
Expand Down Expand Up @@ -589,6 +608,74 @@ bool AlarmManager::forwardingErrorReset(const std::string& url)



/* ****************************************************************************
*
* AlarmManager::notificationQueue -
*
* Returns false if no effective alarm transition occurs, otherwise, true is returned.
*/
bool AlarmManager::notificationQueue(const std::string& service, const std::string& details)
{
semTake();

std::map<std::string, int>::iterator iter = notificationQ.find(details);
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved

if (iter != notificationQ.end())
{
iter->second += 1;

if (notificationQueueLogAlways)
{
LM_W(("Repeated NotificationQueue %s: %s", service.c_str(), details.c_str()));
}
else
{
// even if repeat alarms is off, this is a relevant event in debug level
LM_T(LmtCPrForwardRequestPayload, ("Repeated NotificationQueue %s: %s", service.c_str(), details.c_str()));
}

semGive();
return false;
}

++notificationQueues;

notificationQ[details] = 1;
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved
semGive();

LM_W(("Raising alarm NotificationQueue %s: %s", service.c_str(), details.c_str()));

return true;
}



/* ****************************************************************************
*
* AlarmManager::notificationQueuesResets -
*
* Returns false if no effective alarm transition occurs, otherwise, true is returned.
*/
bool AlarmManager::notificationQueuesResets(const std::string& details)
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved
{
semTake();

if (notificationQ.find(details) == notificationQ.end()) // Doesn't exist
{
semGive();
return false;
}

notificationQ.erase(details);
++notificationQueues;
semGive();

LM_W(("Releasing alarm NotificationQueue %s:", details.c_str()));

return true;
}


/* ****************************************************************************
*
* AlarmManager::badInputReset -
Expand Down
8 changes: 8 additions & 0 deletions src/lib/alarmMgr/AlarmManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class AlarmManager
int64_t badInputResets;
int64_t notificationErrors;
int64_t notificationErrorResets;
int64_t notificationQueues;
int64_t notificationQueueResets;
int64_t forwardingErrors;
int64_t forwardingErrorResets;
int64_t mqttConnectionErrors;
Expand All @@ -53,11 +55,13 @@ class AlarmManager
bool dbOk;

std::map<std::string, int> notificationV;
std::map<std::string, int> notificationQ;
std::map<std::string, int> forwardingErrorV;
std::map<std::string, int> badInputV;
std::map<std::string, int> mqttConnectionErrorV;

bool notificationErrorLogAlways;
bool notificationQueueLogAlways;
bool forwardingErrorLogAlways;
bool mqttConnectionErrorLogAlways;
bool badInputLogAlways;
Expand All @@ -80,6 +84,9 @@ class AlarmManager
bool notificationError(const std::string& url, const std::string& details);
bool notificationErrorReset(const std::string& url);

bool notificationQueuesResets(const std::string& details);
bool notificationQueue(const std::string& url, const std::string& details);
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved

bool forwardingError(const std::string& url, const std::string& details);
bool forwardingErrorReset(const std::string& url);

Expand All @@ -93,6 +100,7 @@ class AlarmManager
void dbErrorsGet(bool* active, int64_t* raised, int64_t* released);
void badInputGet(int64_t* active, int64_t* raised, int64_t* released);
void notificationErrorGet(int64_t* active, int64_t* raised, int64_t* released);
void notificationQueueGet(int64_t* active, int64_t* raised, int64_t* released);
void forwardingErrorGet(int64_t* active, int64_t* raised, int64_t* released);
void mqttConnectionErrorGet(int64_t* active, int64_t* raised, int64_t* released);

Expand Down
1 change: 1 addition & 0 deletions src/lib/common/globals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ bool notifQueueStatistics = false;
bool checkIdv1 = false;
unsigned long long inReqPayloadMaxSize = DEFAULT_IN_REQ_PAYLOAD_MAX_SIZE;
unsigned long long outReqMsgMaxSize = DEFAULT_OUT_REQ_MSG_MAX_SIZE;
unsigned long long thresholdMaxSize = DEFAULT_THRESHOLD_MAX_SIZE;



Expand Down
2 changes: 2 additions & 0 deletions src/lib/common/limits.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
#define DEFAULT_OUT_REQ_MSG_MAX_SIZE (8 * 1024 * 1024) // 8 MB default max size of any outgoing request message (see CLI -outReqMsgMaxSize)


#define DEFAULT_THRESHOLD_MAX_SIZE 3
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved


/* ****************************************************************************
*
Expand Down
1 change: 1 addition & 0 deletions src/lib/logSummary/logSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ static void* logSummary(void* vP)
alarmMgr.dbErrorsGet(&deActive, &deRaised, &deReleased);
alarmMgr.badInputGet(&biActive, &biRaised, &biReleased);
alarmMgr.notificationErrorGet(&neActive, &neRaised, &neReleased);
alarmMgr.notificationQueueGet(&neActive, &neRaised, &neReleased);
alarmMgr.forwardingErrorGet(&fwdActive, &fwdRaised, &fwdReleased);
alarmMgr.mqttConnectionErrorGet(&mceActive, &mceRaised, &mceReleased);

Expand Down
17 changes: 17 additions & 0 deletions src/lib/ngsiNotify/QueueNotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ void QueueNotifier::sendNotifyContextRequest
}

bool enqueued = sq->try_push(paramsP);

if (enqueued)
{
std::string details;
extern int thresholdMaxSize;
details = "(Max threshold limit: 3)";

if (QueueStatistics::getIn() >= thresholdMaxSize)
{
alarmMgr.notificationQueue(service, "notification queue reached the threshold " + details);
}
else
{
alarmMgr.notificationQueuesResets(service);
}
}

if (!enqueued)
{
QueueStatistics::incReject(1);
Expand Down
Loading
Loading