Skip to content

Commit ccbc24d

Browse files
committed
Initial connection.blocked/unblocked support
Add encode/decode support for connection.blocked, connection.unblocked RabbitMQ AMQP extension
1 parent 27245a4 commit ccbc24d

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

librabbitmq/amqp_framing.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ char const *amqp_method_name(amqp_method_number_t methodNumber) {
105105
case AMQP_CONNECTION_OPEN_OK_METHOD: return "AMQP_CONNECTION_OPEN_OK_METHOD";
106106
case AMQP_CONNECTION_CLOSE_METHOD: return "AMQP_CONNECTION_CLOSE_METHOD";
107107
case AMQP_CONNECTION_CLOSE_OK_METHOD: return "AMQP_CONNECTION_CLOSE_OK_METHOD";
108+
case AMQP_CONNECTION_BLOCKED_METHOD: return "AMQP_CONNECTION_BLOCKED_METHOD";
109+
case AMQP_CONNECTION_UNBLOCKED_METHOD: return "AMQP_CONNECTION_UNBLOCKED_METHOD";
108110
case AMQP_CHANNEL_OPEN_METHOD: return "AMQP_CHANNEL_OPEN_METHOD";
109111
case AMQP_CHANNEL_OPEN_OK_METHOD: return "AMQP_CHANNEL_OPEN_OK_METHOD";
110112
case AMQP_CHANNEL_FLOW_METHOD: return "AMQP_CHANNEL_FLOW_METHOD";
@@ -326,6 +328,23 @@ int amqp_decode_method(amqp_method_number_t methodNumber,
326328
*decoded = m;
327329
return 0;
328330
}
331+
case AMQP_CONNECTION_BLOCKED_METHOD: {
332+
amqp_connection_blocked_t *m = (amqp_connection_blocked_t *) amqp_pool_alloc(pool, sizeof(amqp_connection_blocked_t));
333+
if (m == NULL) { return AMQP_STATUS_NO_MEMORY; }
334+
{
335+
uint8_t len;
336+
if (!amqp_decode_8(encoded, &offset, &len)
337+
|| !amqp_decode_bytes(encoded, &offset, &m->reason, len))
338+
return AMQP_STATUS_BAD_AMQP_DATA;
339+
}
340+
*decoded = m;
341+
return 0;
342+
}
343+
case AMQP_CONNECTION_UNBLOCKED_METHOD: {
344+
amqp_connection_unblocked_t *m = NULL; /* no fields */
345+
*decoded = m;
346+
return 0;
347+
}
329348
case AMQP_CHANNEL_OPEN_METHOD: {
330349
amqp_channel_open_t *m = (amqp_channel_open_t *) amqp_pool_alloc(pool, sizeof(amqp_channel_open_t));
331350
if (m == NULL) { return AMQP_STATUS_NO_MEMORY; }
@@ -1267,6 +1286,16 @@ int amqp_encode_method(amqp_method_number_t methodNumber,
12671286
case AMQP_CONNECTION_CLOSE_OK_METHOD: {
12681287
return offset;
12691288
}
1289+
case AMQP_CONNECTION_BLOCKED_METHOD: {
1290+
amqp_connection_blocked_t *m = (amqp_connection_blocked_t *) decoded;
1291+
if (!amqp_encode_8(encoded, &offset, m->reason.len)
1292+
|| !amqp_encode_bytes(encoded, &offset, m->reason))
1293+
return AMQP_STATUS_BAD_AMQP_DATA;
1294+
return offset;
1295+
}
1296+
case AMQP_CONNECTION_UNBLOCKED_METHOD: {
1297+
return offset;
1298+
}
12701299
case AMQP_CHANNEL_OPEN_METHOD: {
12711300
amqp_channel_open_t *m = (amqp_channel_open_t *) decoded;
12721301
if (!amqp_encode_8(encoded, &offset, m->out_of_band.len)

librabbitmq/amqp_framing.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,16 @@ typedef struct amqp_connection_close_ok_t_ {
183183
char dummy; /* Dummy field to avoid empty struct */
184184
} amqp_connection_close_ok_t;
185185

186+
#define AMQP_CONNECTION_BLOCKED_METHOD ((amqp_method_number_t) 0x000A003C) /* 10, 60; 655420 */
187+
typedef struct amqp_connection_blocked_t_ {
188+
amqp_bytes_t reason;
189+
} amqp_connection_blocked_t;
190+
191+
#define AMQP_CONNECTION_UNBLOCKED_METHOD ((amqp_method_number_t) 0x000A003D) /* 10, 61; 655421 */
192+
typedef struct amqp_connection_unblocked_t_ {
193+
char dummy; /* Dummy field to avoid empty struct */
194+
} amqp_connection_unblocked_t;
195+
186196
#define AMQP_CHANNEL_OPEN_METHOD ((amqp_method_number_t) 0x0014000A) /* 20, 10; 1310730 */
187197
typedef struct amqp_channel_open_t_ {
188198
amqp_bytes_t out_of_band;

0 commit comments

Comments
 (0)