Skip to content

Commit 6549a46

Browse files
committed
for #120, adding reporting of the metric.
also messageRateMax implemented for consumer.
1 parent af865c9 commit 6549a46

File tree

3 files changed

+66
-10
lines changed

3 files changed

+66
-10
lines changed

sr_consume.c

+58-5
Original file line numberDiff line numberDiff line change
@@ -76,21 +76,22 @@ int sr_consume_cleanup(struct sr_context *sr_c)
7676
return (1);
7777
}
7878

79-
int sr_consume_setup(struct sr_context *sr_c)
79+
signed int sr_consume_queue_declare(struct sr_context *sr_c, amqp_boolean_t passive)
8080
/*
81-
declare a queue and bind it to the configured exchange.
81+
declare a queue it to the configured exchange.
82+
83+
passive means don't actually declare the queue, just pretend, used to get the message count
8284
8385
*/
8486
{
8587
amqp_rpc_reply_t reply;
86-
amqp_boolean_t passive = 0;
8788
amqp_boolean_t exclusive = 0;
8889
amqp_boolean_t auto_delete = 0;
8990
amqp_queue_declare_ok_t *r;
90-
struct sr_binding_s *t;
9191
static amqp_basic_properties_t props;
9292
static amqp_table_t table;
9393
static amqp_table_entry_t table_entries[2];
94+
signed int message_count;
9495

9596
int tecnt = 0;
9697

@@ -121,6 +122,7 @@ int sr_consume_setup(struct sr_context *sr_c)
121122
msg.user_headers = NULL;
122123

123124
//amqp_queue_declare_ok_t *r =
125+
message_count=-2;
124126
if (sr_c->cfg->queueDeclare) {
125127
r = amqp_queue_declare(sr_c->cfg->broker->conn,
126128
2,
@@ -131,13 +133,29 @@ int sr_consume_setup(struct sr_context *sr_c)
131133
if (r) {
132134
sr_log_msg(LOG_INFO, "queue declared: %p messages in queue: %d\n",
133135
sr_c->cfg->queuename, r->message_count );
136+
message_count = r->message_count;
137+
sr_c->metrics.brokerQueuedMessageCount = message_count;
134138
}
135139
reply = amqp_get_rpc_reply(sr_c->cfg->broker->conn);
136140
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
137141
sr_amqp_reply_print(reply, "queue declare failed");
138-
return (0);
142+
message_count = -1;
139143
}
140144
}
145+
return(message_count);
146+
}
147+
148+
int sr_consume_setup(struct sr_context *sr_c)
149+
{
150+
struct sr_binding_s *t;
151+
amqp_rpc_reply_t reply;
152+
int messageCount;
153+
154+
messageCount = sr_consume_queue_declare(sr_c, 0);
155+
156+
if (messageCount< 0 ) {
157+
return(0);
158+
}
141159

142160
/*
143161
FIXME: topic bindings are not working properly...
@@ -698,6 +716,8 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
698716
amqp_rpc_reply_t reply;
699717
amqp_frame_t frame;
700718
int result;
719+
static time_t next_qdeclare_time=0;
720+
static time_t now=0;
701721
static char buf[SR_SARRAC_MAXIMUM_MESSAGE_LEN];
702722
amqp_basic_deliver_t *d;
703723
amqp_basic_properties_t *p;
@@ -709,6 +729,31 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
709729
char tag[AMQP_MAX_SS];
710730
char value[AMQP_MAX_SS];
711731
struct sr_header_s *tmph;
732+
static time_t this_second = 0;
733+
static int consumed_this_second = 0;
734+
735+
736+
737+
if (now == 0) {
738+
time(&now);
739+
}
740+
741+
// rate limiting.
742+
sr_log_msg( LOG_INFO, "rateMax: %d, consumed_this_second: %d\n",
743+
sr_c->cfg->messageRateMax, consumed_this_second );
744+
if (sr_c->cfg->messageRateMax > 0) {
745+
if (consumed_this_second >= sr_c->cfg->messageRateMax) {
746+
sr_log_msg(LOG_INFO, "messageRateMax %d per second sleeping for a second.\n",
747+
sr_c->cfg->messageRateMax);
748+
sleep(1);
749+
time(&now);
750+
}
751+
if (now > this_second) {
752+
this_second = now;
753+
consumed_this_second = 0;
754+
}
755+
consumed_this_second++;
756+
}
712757

713758
while (msg.user_headers) {
714759
tmph = msg.user_headers;
@@ -717,6 +762,14 @@ struct sr_message_s *sr_consume(struct sr_context *sr_c)
717762
msg.user_headers = tmph->next;
718763
free(tmph);
719764
}
765+
time(&now);
766+
if (next_qdeclare_time == 0) {
767+
next_qdeclare_time=now+20;
768+
} else if ( now > next_qdeclare_time )
769+
{
770+
sr_consume_queue_declare(sr_c, 1);
771+
next_qdeclare_time += 20;
772+
}
720773

721774
/*
722775
basic_ack added as per michel's algorithm prior to consuming next.

sr_context.c

+7-5
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,9 @@ void sr_context_metrics_cumulative_write(struct sr_context *sr_c)
286286
datestamp[8] = c;
287287

288288
f = fopen( cumulativeFilename, "a+" );
289-
fprintf( f, "\"%s\": { \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f } }, \n" ,
290-
datestamp, sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping
289+
fprintf( f, "\"%s\": { \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f, \"brokerQueuedMessageCount\": %d } }, \n" ,
290+
datestamp, sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount,
291+
sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping, sr_c->metrics.brokerQueuedMessageCount
291292
);
292293
fclose(f);
293294

@@ -299,6 +300,7 @@ void sr_context_metrics_reset(struct sr_context *sr_c)
299300
struct timespec tnow;
300301

301302
sr_context_metrics_cumulative_write(sr_c);
303+
sr_c->metrics.brokerQueuedMessageCount = 0;
302304
sr_c->metrics.rxGoodCount = 0;
303305
sr_c->metrics.rxBadCount = 0;
304306
sr_c->metrics.rejectCount = 0;
@@ -457,9 +459,9 @@ void sr_context_metrics_write(struct sr_context *sr_c)
457459
FILE *f;
458460

459461
f = fopen( sr_c->cfg->metricsFilename, "w" );
460-
fprintf( f, "{ \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f } }\n" ,
461-
sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount, sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping
462-
);
462+
fprintf( f, "{ \"context\" : { \"rxGoodCount\": %d, \"rxBadCount\": %d, \"rejectCount\": %d, \"txGoodCount\": %d, \"last_housekeeping\": %f , \"brokerQueuedMessageCount\": %d } }\n" ,
463+
sr_c->metrics.rxGoodCount, sr_c->metrics.rxBadCount, sr_c->metrics.rejectCount,
464+
sr_c->metrics.txGoodCount, sr_c->metrics.last_housekeeping, sr_c->metrics.brokerQueuedMessageCount );
463465
fclose(f);
464466
}
465467

sr_context.h

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "sr_config.h"
4545

4646
struct sr_metrics_s {
47+
int brokerQueuedMessageCount;
4748
int rxGoodCount;
4849
int rxBadCount;
4950
int rejectCount;

0 commit comments

Comments
 (0)