Skip to content

Commit

Permalink
Remove public access to private encoder / decoder.
Browse files Browse the repository at this point in the history
Private encoder / decoder used for PDU encoding are now always varint as the fixed part of the PDU is handled directly through malbinary methods.
No longer change varint option on internal encoder / decoder (avoid a potential bug due to a lack of synchronization).
Fix termination handling.
  • Loading branch information
freyssin committed Jul 18, 2017
1 parent 1a6d321 commit 0b255c1
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions malzmq/src/malzmq_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@ struct _malzmq_ctx_t {
void *endpoints_socket; // inproc connected to endpoints
zloop_t *zloop;
malzmq_header_t *malzmq_header;
// Private encoder / decoder used for PDU, always varint as the fixed part of the PDU
// is handled directly through malbinary methods.
mal_encoder_t *encoder;
mal_decoder_t *decoder;
};

mal_encoder_t *malzmq_get_encoder(malzmq_ctx_t *self) {
return self->encoder;
void malzmq_ctx_set_encoder_log_level(malzmq_ctx_t *self, int level) {
if (self != NULL)
mal_encoder_set_log_level(self->encoder, level);
}

mal_decoder_t *malzmq_get_decoder(malzmq_ctx_t *self) {
return self->decoder;
void malzmq_ctx_set_decoder_log_level(malzmq_ctx_t *self, int level) {
if (self != NULL)
mal_decoder_set_log_level(self->decoder, level);
}

// --------------------------------------------------------------------------
Expand Down Expand Up @@ -240,6 +244,12 @@ int malzmq_ctx_mal_socket_handle(zloop_t *loop, zmq_pollitem_t *poller,
// zloop_fn interface for standard socket
int malzmq_ctx_mal_standard_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, void *arg) {
malzmq_ctx_t *self = (malzmq_ctx_t *) arg;
if (self->mal_socket == NULL) {
// The context is closed, return
clog_debug(malzmq_logger, "malzmq_ctx_mal_standard_socket_handle: socket (%d) closed\n", poller->fd);
return -1;
}

zmsg_t *zmsg = zmsg_recv(self->mal_socket);
if (zmsg) {
clog_debug(malzmq_logger, "malzmq_ctx_mal_standard_socket_handle: received zmsg size = %d\n", zmsg_size(zmsg));
Expand All @@ -251,10 +261,16 @@ int malzmq_ctx_mal_standard_socket_handle(zloop_t *loop, zmq_pollitem_t *poller,
// zloop_fn interface for pubsub socket
int malzmq_ctx_mal_pubsub_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, void *arg) {
malzmq_ctx_t *self = (malzmq_ctx_t *) arg;
if (self->mal_pubsub_socket == NULL) {
// The context is closed, return
clog_debug(malzmq_logger, "malzmq_ctx_mal_pubsub_socket_handle: socket (%d) closed\n", poller->fd);
return -1;
}

zmsg_t *zmsg = zmsg_recv(self->mal_pubsub_socket);
if (zmsg) {
clog_debug(malzmq_logger, "malzmq_ctx_mal_pubsub_socket_handle: received zmsg size = %d\n", zmsg_size(zmsg));
malzmq_ctx_mal_socket_handle(loop, poller, self, zmsg, true);
return malzmq_ctx_mal_socket_handle(loop, poller, self, zmsg, true);
}
return 0;
}
Expand All @@ -281,19 +297,12 @@ int malzmq_ctx_mal_socket_handle(zloop_t *loop, zmq_pollitem_t *poller,

clog_debug(malzmq_logger, "malzmq_ctx: frame size: %d\n", zframe_size(frame));

// Use Varint!
((mal_decoder_t *) self->decoder)->varint_supported = true;

mal_uri_t *uri_to;
if (malzmq_decode_uri_to(self->malzmq_header,
self->decoder, (char *) zframe_data(frame), zframe_size(frame), &uri_to) != 0) {
clog_error(malzmq_logger, "malzmq_ctx_mal_socket_handle, could not decode uri_to\n");
// Use Varint!
((mal_decoder_t *) self->decoder)->varint_supported = false;
return -1;
}
// Use Varint!
((mal_decoder_t *) self->decoder)->varint_supported = false;

clog_debug(malzmq_logger, "malzmq_ctx: zmsg decoded.\n");

Expand Down Expand Up @@ -342,8 +351,8 @@ malzmq_ctx_t *malzmq_ctx_new(mal_ctx_t *mal_ctx,
self->port = port;
self->malzmq_header = malzmq_header;

self->encoder = malbinary_encoder_new(false);
self->decoder = malbinary_decoder_new(false);
self->encoder = malbinary_encoder_new(true);
self->decoder = malbinary_decoder_new(true);

int mal_uri_len = strlen(hostname) + strlen(port) + 10;
mal_uri_t mal_uri[mal_uri_len + 1];
Expand All @@ -368,7 +377,7 @@ malzmq_ctx_t *malzmq_ctx_new(mal_ctx_t *mal_ctx,
assert(sub);
self->mal_pubsub_socket = sub;
zsocket_bind(self->mal_pubsub_socket, mcast_uri);
clog_debug(malzmq_logger, "malzmq_ctx: mcast bound to: %s\n", mcast_uri);
clog_debug(malzmq_logger, "malzmq_ctx: mcast bound to: %s / \"%s\"\n", mcast_uri, SUB_NAME);
zsocket_set_subscribe(self->mal_pubsub_socket, SUB_NAME);
} else {
self->mal_pubsub_socket = NULL;
Expand Down Expand Up @@ -418,8 +427,28 @@ int malzmq_ctx_start(void *self) {
}

int malzmq_ctx_stop(void *self) {
malzmq_ctx_t *mal_ctx = (malzmq_ctx_t *) self;

clog_debug(malzmq_logger, "malzmq_ctx: stop...\n");
zloop_destroy(&((malzmq_ctx_t *)self)->zloop);

if (mal_ctx->mal_socket != NULL) {
void* socket = mal_ctx->mal_socket;
mal_ctx->mal_socket = NULL;
zsocket_signal(socket);
zmq_close(socket);

clog_debug(malzmq_logger, "malzmq_ctx_stop: close socket.\n");
}

if (mal_ctx->mal_pubsub_socket != NULL) {
void* socket = mal_ctx->mal_pubsub_socket;
mal_ctx->mal_pubsub_socket = NULL;
zsocket_signal(socket);
zmq_close(socket);

clog_debug(malzmq_logger, "malzmq_ctx_stop: close pubsub.\n");
}

return 0;
}

Expand Down Expand Up @@ -561,10 +590,11 @@ int malzmq_ctx_send_message(void *self, mal_endpoint_t *mal_endpoint,
if (mcast_uri == NULL) {
// send one frame on send stage
rc = zframe_send(&frame, socket, 0);
assert(rc == 0);
} else {
// send two frames on publish stage
rc = zstr_sendm(socket, SUB_NAME);
clog_debug(malzmq_logger, "malzmq_ctx: send the SUB_NAME, rc = %d\n", rc);
clog_debug(malzmq_logger, "malzmq_ctx: send the SUB_NAME \"%s\", rc = %d\n", SUB_NAME, rc);
rc = zframe_send(&frame, socket, 0);
clog_debug(malzmq_logger, "malzmq_ctx: zframe_send the message, rc = %d\n", rc);
assert(rc == 0);
Expand Down

0 comments on commit 0b255c1

Please sign in to comment.