Skip to content

Commit af865c9

Browse files
committed
#120 add second channel, and use it for queue declarations
1 parent 797fed4 commit af865c9

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

sr_consume.c

+8-3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ int sr_consume_setup(struct sr_context *sr_c)
8686
amqp_boolean_t passive = 0;
8787
amqp_boolean_t exclusive = 0;
8888
amqp_boolean_t auto_delete = 0;
89+
amqp_queue_declare_ok_t *r;
8990
struct sr_binding_s *t;
9091
static amqp_basic_properties_t props;
9192
static amqp_table_t table;
@@ -121,12 +122,16 @@ int sr_consume_setup(struct sr_context *sr_c)
121122

122123
//amqp_queue_declare_ok_t *r =
123124
if (sr_c->cfg->queueDeclare) {
124-
amqp_queue_declare(sr_c->cfg->broker->conn,
125-
1,
125+
r = amqp_queue_declare(sr_c->cfg->broker->conn,
126+
2,
126127
amqp_cstring_bytes(sr_c->cfg->queuename),
127128
passive, sr_c->cfg->durable, exclusive, auto_delete, table);
128129
/* FIXME how to parse r for error? */
129-
130+
131+
if (r) {
132+
sr_log_msg(LOG_INFO, "queue declared: %p messages in queue: %d\n",
133+
sr_c->cfg->queuename, r->message_count );
134+
}
130135
reply = amqp_get_rpc_reply(sr_c->cfg->broker->conn);
131136
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
132137
sr_amqp_reply_print(reply, "queue declare failed");

sr_context.c

+17-5
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,13 @@ struct sr_broker_s *sr_broker_connect(struct sr_broker_s *broker)
166166

167167
open_status = amqp_channel_open(broker->conn, 1);
168168
if (open_status == NULL) {
169-
sr_log_msg(LOG_ERROR, "failed AMQP amqp_channel_open\n");
169+
sr_log_msg(LOG_ERROR, "failed AMQP amqp_channel_open 1\n");
170+
goto have_channel;
171+
}
172+
173+
open_status = amqp_channel_open(broker->conn, 2);
174+
if (open_status == NULL) {
175+
sr_log_msg(LOG_ERROR, "failed AMQP amqp_channel_open 2\n");
170176
goto have_channel;
171177
}
172178

@@ -192,6 +198,7 @@ struct sr_broker_s *sr_broker_connect(struct sr_broker_s *broker)
192198

193199
have_channel:
194200
reply = amqp_channel_close(broker->conn, 1, AMQP_REPLY_SUCCESS);
201+
reply = amqp_channel_close(broker->conn, 2, AMQP_REPLY_SUCCESS);
195202

196203
have_socket:
197204
reply = amqp_connection_close(broker->conn, AMQP_REPLY_SUCCESS);
@@ -357,13 +364,18 @@ void sr_broker_close(struct sr_broker_s *broker)
357364
//sr_log_msg(LOG_DEBUG, "amqp broker close: no connection present.\n");
358365
return;
359366
}
360-
reply = amqp_channel_close(broker->conn, 1, AMQP_REPLY_SUCCESS);
367+
reply = amqp_channel_close(broker->conn, 2, AMQP_REPLY_SUCCESS);
361368
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
362-
sr_log_msg(LOG_ERROR, "amqp channel close failed.\n");
369+
sr_log_msg(LOG_ERROR, "amqp channel close 2 failed.\n");
363370
} else {
364-
reply = amqp_connection_close(broker->conn, AMQP_REPLY_SUCCESS);
371+
reply = amqp_channel_close(broker->conn, 1, AMQP_REPLY_SUCCESS);
365372
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
366-
sr_log_msg(LOG_ERROR, "amqp connection close failed.\n");
373+
sr_log_msg(LOG_ERROR, "amqp channel close 1 failed.\n");
374+
} else {
375+
reply = amqp_connection_close(broker->conn, AMQP_REPLY_SUCCESS);
376+
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
377+
sr_log_msg(LOG_ERROR, "amqp connection close failed.\n");
378+
}
367379
}
368380
}
369381

sr_context.h

-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ struct sr_context {
5757
const char *file;
5858
const char *post_baseUrl;
5959
amqp_socket_t *socket;
60-
amqp_connection_state_t conn;
6160
int port;
6261
struct sr_config_s *cfg;
6362
struct sr_metrics_s metrics;

0 commit comments

Comments
 (0)