Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Alarm for notification queue overpassing a given threshold (#4113) #4375

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Add: alarm for notification queue overpassing a given threshold (new CLI `-notifAlarmThreshold` / env var `ORION_NOTIF_ALARM_THRESHOLD`) (#4113)
- Add: servicePath field to builtin attributes (#2877)
- Add: notification.mqtt.retain and notification.mqttCustom.retain flag for MQTT retain in notifications (#4388)
- Fix: logDeprecate not working correctly (`geo:json` wrongly considered as deprecated)
Expand Down
2 changes: 2 additions & 0 deletions doc/manuals/admin/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ The list of available options is the following:
to the `-k` or `--insecure` parameteres of the curl command.
- **-mqttMaxAge**. Max time (in minutes) that an unused MQTT connection is kept. Default: 60
- **-logDeprecate**. Log deprecation usages as warnings. More information in [this section of the documentation](../deprecated.md#log-deprecation-warnings). Default is: false. It can be changed after Orion startup with the [log admin REST API](management_api.md#log-configs-and-trace-levels), with the `deprecated` field
- **-notifAlarmThreshold**. Maximum threshold for notification queue alarms, as a percentage of the maximum queue size, default 0 (meaning no queue alarms are used)

## Configuration using environment variables

Expand Down Expand Up @@ -270,3 +271,4 @@ Two facts have to be taken into account:
| ORION_NGSIV1_AUTOCAST | ngsiv1Autocast |
| ORION_MQTT_MAX_AGE | mqttMaxAge |
| ORION_LOG_DEPRECATE | logDeprecate |
| ORION_NOTIF_ALARM_THRESHOLD | notifAlarmThreshold |
1 change: 1 addition & 0 deletions doc/manuals/admin/logs.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ Alarm conditions:
| 5 | WARNING | The following WARN text appears in the 'msg' field: "Raising alarm NotificationError `<url>`: `<detail>`". | The following WARN text appears in the 'msg' field: "Releasing alarm NotificationError `<url>`", where `<url>` is the same one that triggered the alarm. Orion prints this trace when it successfully sent a notification to that URL. | Notification Failure. The `<detail>`text contains the detailed information. | Orion is trying to send the HTTP notification to a given receiver and some problem has occurred. It could be due to a problem with the network connectivity or on the receiver, e.g. the receiver is down. In the second case, the owner of the receiver of the notification should be reported. No specific action has to be performed at Orion Context Broker service.
Anjali-NEC marked this conversation as resolved.
Show resolved Hide resolved
| 6 | WARNING | The following WARN text appears in the 'msg' field: "Raising alarm ForwardingError `<url>`": `<detail>`". | The following WARN text appears in the 'msg' field: "Releasing alarm ForwardingError `<url>`", where `<url>` is the same one that triggered the alarm. Orion prints this trace when it successfully interact with ContextProvider to that URL.| Forwarding Error. The `<detail>`text contains the detailed information. | Orion is trying to interact with ContextProvider and some problem has occurred. It may be due to context provider response for forwarded query or update is empty. No specific action has to be performed at Orion Context Broker service.
| 7 | WARNING | The following WARN text appears in the 'msg' field: "Raising alarm MqttConnectionError `<endpoint>`": `<detail>`". | The following WARN text appears in the 'msg' field: "Releasing alarm MqttConnectionError `<endpoint>`", where `<endpoint>` is the same one that triggered the alarm. Orion prints this trace when it successfully interact with ContextProvider to that URL.| Error connection to MQTT broker. The `<detail>`text contains the detailed information. | Orion is trying to connecto to an MQTT broker (associated to a subscription) and some problem has occurred. It may be due to several reasons: MQTT broker in unreachable, user/pass is wrong, etc. No specific action has to be performed at Orion Context Broker service, but maybe in the MQTT broker configuration or in the associated subscription.
| 8 | WARNING | The following WARN text appears in the 'msg' field: "Raising alarm NotificationQueue `<service>`: `<detail>`". | The following WARN text appears in the 'msg' field: "Releasing alarm NotificationQueue `<service>`", where `<service>` is the same one that triggered the alarm. Orion prints this trace when notification queue goes back below the threshold. | The notification queue associated to the service (or "default" for default queue) has overpassed the alarm threshold. The `<detail>` text described the particular threshold. | No specific action has to be performed at Orion Context Broker service, but the update flow causing the notification on that service (or default queue) should be lowered in order to reduce pressure on queue. Another possible problem is due to malfunctioning notification receivers, if they are slow processing notifications and responding to Orion.

By default, Orion only traces the origin (i.e. raising) and end (i.e. releasing) of an alarm, e.g:

Expand Down
15 changes: 13 additions & 2 deletions src/app/contextBroker/contextBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ unsigned long fcMaxInterval;
int mqttMaxAge;

bool logDeprecate;
int notifAlarmThreshold;



Expand Down Expand Up @@ -277,6 +278,7 @@ bool logDeprecate;
#define NGSIV1_AUTOCAST_DESC "automatic cast for number, booleans and dates in NGSIv1 update/create attribute operations"
#define MQTT_MAX_AGE_DESC "max time (in minutes) that an unused MQTT connection is kept, default: 60"
#define LOG_DEPRECATE_DESC "log deprecation usages as warnings"
#define NOTIF_ALARM_THRESHOLD_DESC "maximum threshold for notification queue alarms, as a percentage of the maximum queue size, default 0 (meaning no queue alarms are used)"
#define DBURI_DESC "complete URI for database connection"


Expand Down Expand Up @@ -336,10 +338,8 @@ PaArgument paArgs[] =
{ "-connectionMemory", &connectionMemory, "CONN_MEMORY", PaUInt, PaOpt, 64, 0, 1024, CONN_MEMORY_DESC },
{ "-maxConnections", &maxConnections, "MAX_CONN", PaUInt, PaOpt, 1020, 1, PaNL, MAX_CONN_DESC },
{ "-reqPoolSize", &reqPoolSize, "TRQ_POOL_SIZE", PaUInt, PaOpt, 0, 0, 1024, REQ_POOL_SIZE },

{ "-inReqPayloadMaxSize", &inReqPayloadMaxSize, "IN_REQ_PAYLOAD_MAX_SIZE", PaULong, PaOpt, DEFAULT_IN_REQ_PAYLOAD_MAX_SIZE, 0, PaNL, IN_REQ_PAYLOAD_MAX_SIZE_DESC },
{ "-outReqMsgMaxSize", &outReqMsgMaxSize, "OUT_REQ_MSG_MAX_SIZE", PaULong, PaOpt, DEFAULT_OUT_REQ_MSG_MAX_SIZE, 0, PaNL, OUT_REQ_MSG_MAX_SIZE_DESC },

{ "-notificationMode", &notificationMode, "NOTIF_MODE", PaString, PaOpt, _i "transient", PaNL, PaNL, NOTIFICATION_MODE_DESC },
{ "-notifFlowControl", &notifFlowControl, "NOTIF_FLOW_CONTROL", PaString, PaOpt, _i "", PaNL, PaNL, FLOW_CONTROL_DESC },
{ "-simulatedNotification", &simulatedNotification, "DROP_NOTIF", PaBool, PaOpt, false, false, true, SIMULATED_NOTIF_DESC },
Expand Down Expand Up @@ -371,6 +371,8 @@ PaArgument paArgs[] =

{ "-logDeprecate", &logDeprecate, "LOG_DEPRECATE", PaBool, PaOpt, false, false, true, LOG_DEPRECATE_DESC },

{ "-notifAlarmThreshold", &notifAlarmThreshold, "NOTIF_ALARM_THRESHOLD", PaInt, PaOpt, 0, PaNL, PaNL, NOTIF_ALARM_THRESHOLD_DESC },

PA_END_OF_ARGS
};

Expand Down Expand Up @@ -673,6 +675,15 @@ static void contextBrokerInit(void)
pNotifier = new Notifier();
}

if (notifAlarmThreshold < 0)
{
LM_X(1, ("Fatal Error (notifAlarmThreshold negative value not allowed)"));
}
if (notifAlarmThreshold > 100)
{
LM_X(1, ("Fatal Error (notifAlarmThreshold value is greater than 100)"));
}

/* Set notifier object (singleton) */
setNotifier(pNotifier);

Expand Down
87 changes: 87 additions & 0 deletions src/lib/alarmMgr/AlarmManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ AlarmManager::AlarmManager()
badInputResets(0),
notificationErrors(0),
notificationErrorResets(0),
notificationQueues(0),
notificationQueueResets(0),
forwardingErrors(0),
forwardingErrorResets(0),
mqttConnectionErrors(0),
Expand All @@ -66,6 +68,7 @@ AlarmManager::AlarmManager()
dbErrorResets(0),
dbOk(true),
notificationErrorLogAlways(false),
notificationQueueLogAlways(false),
forwardingErrorLogAlways(false),
mqttConnectionErrorLogAlways(false),
badInputLogAlways(false),
Expand All @@ -82,6 +85,7 @@ AlarmManager::AlarmManager()
int AlarmManager::init(bool logAlreadyRaisedAlarms)
{
notificationErrorLogAlways = logAlreadyRaisedAlarms;
notificationQueueLogAlways = logAlreadyRaisedAlarms;
badInputLogAlways = logAlreadyRaisedAlarms;
dbErrorLogAlways = logAlreadyRaisedAlarms;
mqttConnectionErrorLogAlways = logAlreadyRaisedAlarms;
Expand Down Expand Up @@ -246,6 +250,21 @@ void AlarmManager::notificationErrorGet(int64_t* active, int64_t* raised, int64_
}


/* ****************************************************************************
*
* AlarmManager::notificationQueueGet -
*
* NOTE
* To read values, no semaphore is used.
*/
void AlarmManager::notificationQueueGet(int64_t* active, int64_t* raised, int64_t* released)
{
*active = notificationQ.size();
*raised = notificationQueues;
*released = notificationQueueResets;
}



/* ****************************************************************************
*
Expand Down Expand Up @@ -589,6 +608,74 @@ bool AlarmManager::forwardingErrorReset(const std::string& url)



/* ****************************************************************************
*
* AlarmManager::notificationQueue -
*
* Returns false if no effective alarm transition occurs, otherwise, true is returned.
*/
bool AlarmManager::notificationQueue(const std::string& service, const std::string& details)
{
semTake();

std::map<std::string, int>::iterator iter = notificationQ.find(service);

if (iter != notificationQ.end())
{
iter->second += 1;

if (notificationQueueLogAlways)
{
LM_W(("Repeated NotificationQueue %s: %s", service.c_str(), details.c_str()));
}
else
{
// even if repeat alarms is off, this is a relevant event in debug level
LM_T(LmtNotifierQueue, ("Repeated NotificationQueue %s: %s", service.c_str(), details.c_str()));
}

semGive();
return false;
}

++notificationQueues;

notificationQ[service] = 1;
semGive();

LM_W(("Raising alarm NotificationQueue %s: %s", service.c_str(), details.c_str()));

return true;
}



/* ****************************************************************************
*
* AlarmManager::notificationQueuesResets -
*
* Returns false if no effective alarm transition occurs, otherwise, true is returned.
*/
bool AlarmManager::notificationQueueReset(const std::string& service)
{
semTake();

if (notificationQ.find(service) == notificationQ.end()) // Doesn't exist
{
semGive();
return false;
}

notificationQ.erase(service);
++notificationQueueResets;
semGive();

LM_W(("Releasing alarm NotificationQueue %s", service.c_str()));

return true;
}


/* ****************************************************************************
*
* AlarmManager::badInputReset -
Expand Down
8 changes: 8 additions & 0 deletions src/lib/alarmMgr/AlarmManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class AlarmManager
int64_t badInputResets;
int64_t notificationErrors;
int64_t notificationErrorResets;
int64_t notificationQueues;
int64_t notificationQueueResets;
int64_t forwardingErrors;
int64_t forwardingErrorResets;
int64_t mqttConnectionErrors;
Expand All @@ -53,11 +55,13 @@ class AlarmManager
bool dbOk;

std::map<std::string, int> notificationV;
std::map<std::string, int> notificationQ;
std::map<std::string, int> forwardingErrorV;
std::map<std::string, int> badInputV;
std::map<std::string, int> mqttConnectionErrorV;

bool notificationErrorLogAlways;
bool notificationQueueLogAlways;
bool forwardingErrorLogAlways;
bool mqttConnectionErrorLogAlways;
bool badInputLogAlways;
Expand All @@ -80,6 +84,9 @@ class AlarmManager
bool notificationError(const std::string& url, const std::string& details);
bool notificationErrorReset(const std::string& url);

bool notificationQueueReset(const std::string& service);
bool notificationQueue(const std::string& service, const std::string& details);

bool forwardingError(const std::string& url, const std::string& details);
bool forwardingErrorReset(const std::string& url);

Expand All @@ -93,6 +100,7 @@ class AlarmManager
void dbErrorsGet(bool* active, int64_t* raised, int64_t* released);
void badInputGet(int64_t* active, int64_t* raised, int64_t* released);
void notificationErrorGet(int64_t* active, int64_t* raised, int64_t* released);
void notificationQueueGet(int64_t* active, int64_t* raised, int64_t* released);
void forwardingErrorGet(int64_t* active, int64_t* raised, int64_t* released);
void mqttConnectionErrorGet(int64_t* active, int64_t* raised, int64_t* released);

Expand Down
1 change: 1 addition & 0 deletions src/lib/logMsg/traceLevels.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ typedef enum TraceLevels
LmtNotificationRequestPayload,
LmtNotificationResponsePayload,
LmtMqttNotif,
LmtNotifierQueue,

/* Input/Output payloads (180-199) */
LmtServiceInputPayload = 180,
Expand Down
1 change: 1 addition & 0 deletions src/lib/logSummary/logSummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ static void* logSummary(void* vP)
alarmMgr.dbErrorsGet(&deActive, &deRaised, &deReleased);
alarmMgr.badInputGet(&biActive, &biRaised, &biReleased);
alarmMgr.notificationErrorGet(&neActive, &neRaised, &neReleased);
alarmMgr.notificationQueueGet(&neActive, &neRaised, &neReleased);
alarmMgr.forwardingErrorGet(&fwdActive, &fwdRaised, &fwdReleased);
alarmMgr.mqttConnectionErrorGet(&mceActive, &mceRaised, &mceReleased);

Expand Down
19 changes: 19 additions & 0 deletions src/lib/ngsiNotify/QueueNotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ void QueueNotifier::sendNotifyContextRequest
}

bool enqueued = sq->try_push(paramsP);

if (!enqueued)
{
QueueStatistics::incReject(1);
Expand All @@ -186,5 +187,23 @@ void QueueNotifier::sendNotifyContextRequest
return;
}

extern int notifAlarmThreshold;

if (notifAlarmThreshold != 0)
{
std::string details = ("notification queue reached maximum threshold");

long unsigned int threshold = queueSize(service)*notifAlarmThreshold/100;

if (threshold >= queueSize(service))
{
alarmMgr.notificationQueue(queueName.c_str(), details.c_str());
}
}
Comment on lines +194 to +202
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code can be simplified this way:

Suggested change
std::string details = ("notification queue reached maximum threshold");
long unsigned int threshold = queueSize(service)*notifAlarmThreshold/100;
if (threshold >= queueSize(service))
{
alarmMgr.notificationQueue(queueName.c_str(), details.c_str());
}
}
long unsigned int threshold = queueSize(service)*notifAlarmThreshold/100;
if (threshold >= queueSize(service))
{
alarmMgr.notificationQueue(queueName.c_str(), "notification queue reached maximum threshold");
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover, the detail message could provide information about the particular threshold for this case. For instance, if we have a queue of size 6 and the threshold is 50%, something like this:

notification queue reached maximum threshold (3)

else
{
alarmMgr.notificationQueueReset(queueName.c_str());
}

QueueStatistics::incIn(1);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ Usage: contextBroker [option '-U' (extended usage)]
[option '-ngsiv1Autocast' (automatic cast for number, booleans and dates in NGSIv1 update/create attribute operations)]
[option '-mqttMaxAge' <max time (in minutes) that an unused MQTT connection is kept, default: 60>]
[option '-logDeprecate' (log deprecation usages as warnings)]
[option '-notifAlarmThreshold' <maximum threshold for notification queue alarms, as a percentage of the maximum queue size, default 0 (meaning no queue alarms are used)>]

--TEARDOWN--
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,6 @@ Usage: contextBroker [option '-U' (extended usage)]
[option '-ngsiv1Autocast' (automatic cast for number, booleans and dates in NGSIv1 update/create attribute operations)]
[option '-mqttMaxAge' <max time (in minutes) that an unused MQTT connection is kept, default: 60>]
[option '-logDeprecate' (log deprecation usages as warnings)]
[option '-notifAlarmThreshold' <maximum threshold for notification queue alarms, as a percentage of the maximum queue size, default 0 (meaning no queue alarms are used)>]

--TEARDOWN--
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,6 @@ Usage: contextBroker [option '-U' (extended usage)]
[option '-ngsiv1Autocast' (automatic cast for number, booleans and dates in NGSIv1 update/create attribute operations)]
[option '-mqttMaxAge' <max time (in minutes) that an unused MQTT connection is kept, default: 60>]
[option '-logDeprecate' (log deprecation usages as warnings)]
[option '-notifAlarmThreshold' <maximum threshold for notification queue alarms, as a percentage of the maximum queue size, default 0 (meaning no queue alarms are used)>]

--TEARDOWN--
1 change: 1 addition & 0 deletions test/functionalTest/cases/3658_env_vars/env_vars.test
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ Extended Usage: contextBroker [option '-U' (extended usage)]
[option '-ngsiv1Autocast' (automatic cast for number, booleans and dates in NGSI] ORION_NGSIV1_AUTOCAST FALSE /FALSE/
[option '-mqttMaxAge' <max time (in minutes) that an unused MQTT connection is k] ORION_MQTT_MAX_AGE 60 /60/
[option '-logDeprecate' (log deprecation usages as warnings)] ORION_LOG_DEPRECATE FALSE /FALSE/
[option '-notifAlarmThreshold' <maximum threshold for notification queue alarms,] ORION_NOTIF_ALARM_THRESHOLD 0 /0/



Expand Down
Loading
Loading