Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 130 additions & 24 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,20 +371,54 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu
return -1;
}

static int escape_stream_names(struct log_stream *stream,
char **group_name, size_t *group_name_size,
char **stream_name, size_t *stream_name_size)
{
int ret;

ret = flb_utils_write_str_buf(stream->group, strlen(stream->group),
group_name, group_name_size, FLB_FALSE);
if (ret < 0) {
return -1;
}

ret = flb_utils_write_str_buf(stream->name, strlen(stream->name),
stream_name, stream_name_size, FLB_FALSE);
if (ret < 0) {
flb_free(*group_name);
*group_name = NULL;
return -1;
}

return 0;
}

/*
* Writes the "header" for a put log events payload
*/
static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream, int *offset)
int flb_cloudwatch_init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream, int *offset)
{
char *group_name = NULL;
char *stream_name = NULL;
size_t group_name_size;
size_t stream_name_size;
int ret;

ret = escape_stream_names(stream, &group_name, &group_name_size,
&stream_name, &stream_name_size);
if (ret < 0) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"{\"logGroupName\":\"", 17)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
stream->group, 0)) {
group_name, group_name_size)) {
goto error;
}

Expand All @@ -394,7 +428,7 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
stream->name, 0)) {
stream_name, stream_name_size)) {
goto error;
}

Expand Down Expand Up @@ -443,9 +477,13 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
goto error;
}

flb_free(group_name);
flb_free(stream_name);
return 0;

error:
flb_free(group_name);
flb_free(stream_name);
return -1;
}

Expand Down Expand Up @@ -754,13 +792,25 @@ int process_event(struct flb_cloudwatch *ctx, struct cw_flush *buf,

/* Resets or inits a cw_flush struct */
void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
char *group_name = NULL;
char *stream_name = NULL;
size_t group_name_size = 0;
size_t stream_name_size = 0;
int ret;

buf->event_index = 0;
buf->tmp_buf_offset = 0;
buf->event_index = 0;
buf->data_size = PUT_LOG_EVENTS_HEADER_LEN + PUT_LOG_EVENTS_FOOTER_LEN;
if (buf->current_stream != NULL) {
buf->data_size += strlen(buf->current_stream->name);
buf->data_size += strlen(buf->current_stream->group);
ret = escape_stream_names(buf->current_stream, &group_name, &group_name_size,
&stream_name, &stream_name_size);
if (ret == 0) {
buf->data_size += group_name_size;
buf->data_size += stream_name_size;
flb_free(group_name);
flb_free(stream_name);
}
}
}

Expand All @@ -783,7 +833,7 @@ int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
buf->current_stream->oldest_event = 0;

offset = 0;
ret = init_put_payload(ctx, buf, buf->current_stream, &offset);
ret = flb_cloudwatch_init_put_payload(ctx, buf, buf->current_stream, &offset);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to initialize PutLogEvents payload");
return -1;
Expand Down Expand Up @@ -1888,39 +1938,95 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream)
return -1;
}

flb_sds_t flb_cloudwatch_create_log_stream_body(struct log_stream *stream)
{
const char *group_prefix = "{\"logGroupName\":\"";
const char *stream_prefix = "\",\"logStreamName\":\"";
const char *suffix = "\"}";
flb_sds_t body = NULL;
flb_sds_t tmp;
char *group_name = NULL;
char *stream_name = NULL;
size_t group_name_size = 0;
size_t stream_name_size = 0;
int ret;

ret = escape_stream_names(stream, &group_name, &group_name_size,
&stream_name, &stream_name_size);
if (ret < 0) {
return NULL;
}

body = flb_sds_create_size(50 + group_name_size + stream_name_size);
if (!body) {
flb_errno();
goto error;
}

tmp = flb_sds_cat(body, group_prefix, strlen(group_prefix));
if (!tmp) {
flb_errno();
goto error;
}
body = tmp;

tmp = flb_sds_cat(body, group_name, group_name_size);
if (!tmp) {
flb_errno();
goto error;
}
body = tmp;

tmp = flb_sds_cat(body, stream_prefix, strlen(stream_prefix));
if (!tmp) {
flb_errno();
goto error;
}
body = tmp;

tmp = flb_sds_cat(body, stream_name, stream_name_size);
if (!tmp) {
flb_errno();
goto error;
}
body = tmp;

tmp = flb_sds_cat(body, suffix, strlen(suffix));
if (!tmp) {
flb_errno();
goto error;
}
body = tmp;

flb_free(group_name);
flb_free(stream_name);

return body;

error:
flb_free(group_name);
flb_free(stream_name);
flb_sds_destroy(body);
return NULL;
}

int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
int can_retry)
{

struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t body;
flb_sds_t tmp;
flb_sds_t error;
int ret;

flb_plg_info(ctx->ins, "Creating log stream %s in log group %s",
stream->name, stream->group);

body = flb_sds_create_size(50 + strlen(stream->group) +
strlen(stream->name));
body = flb_cloudwatch_create_log_stream_body(stream);
if (!body) {
flb_sds_destroy(body);
flb_errno();
return -1;
}

/* construct CreateLogStream request body */
tmp = flb_sds_printf(&body,
"{\"logGroupName\":\"%s\",\"logStreamName\":\"%s\"}",
stream->group,
stream->name);
if (!tmp) {
flb_sds_destroy(body);
flb_errno();
return -1;
}
body = tmp;

cw_client = ctx->cw_client;
if (plugin_under_test() == FLB_TRUE) {
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
const char *data, size_t bytes, int event_type,
struct flb_config *config);
int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry);
flb_sds_t flb_cloudwatch_create_log_stream_body(struct log_stream *stream);
int flb_cloudwatch_init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream, int *offset);
struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag,
const msgpack_object map);
int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream,
size_t payload_size);
int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream);
int compare_events(const void *a_arg, const void *b_arg);
void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf);

#endif
81 changes: 81 additions & 0 deletions tests/runtime/out_cloudwatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,85 @@ void flb_test_cloudwatch_error_put_retention_policy(void)
flb_destroy(ctx);
}

void flb_test_cloudwatch_create_stream_escapes_json(void)
{
struct log_stream stream;
flb_sds_t body;
char *expected;
int ret;

memset(&stream, 0, sizeof(struct log_stream));
stream.group = flb_sds_create("fluent");
stream.name = flb_sds_create("systemd-fsck@dev-disk-by\\x2dlabel-BOOT.service");
TEST_CHECK(stream.group != NULL);
TEST_CHECK(stream.name != NULL);

if (stream.group && stream.name) {
body = flb_cloudwatch_create_log_stream_body(&stream);
TEST_CHECK(body != NULL);

if (body) {
expected = "{\"logGroupName\":\"fluent\","
"\"logStreamName\":\"systemd-fsck@dev-disk-by\\\\x2dlabel-BOOT.service\"}";
ret = strcmp(body, expected);
TEST_CHECK(ret == 0);
flb_sds_destroy(body);
}
}

flb_sds_destroy(stream.group);
flb_sds_destroy(stream.name);
}

void flb_test_cloudwatch_put_events_escapes_stream_name(void)
{
struct flb_cloudwatch ctx;
struct log_stream stream;
struct cw_flush buf;
char out_buf[512];
char *expected_stream_name;
char *expected;
int offset;
int ret;

memset(&ctx, 0, sizeof(struct flb_cloudwatch));
memset(&stream, 0, sizeof(struct log_stream));
memset(&buf, 0, sizeof(struct cw_flush));

stream.group = flb_sds_create("fluent");
stream.name = flb_sds_create("systemd-fsck@dev-disk-by\\x2dlabel-BOOT.service");
TEST_CHECK(stream.group != NULL);
TEST_CHECK(stream.name != NULL);

if (stream.group && stream.name) {
offset = 0;
buf.out_buf = out_buf;
buf.out_buf_size = sizeof(out_buf);
buf.current_stream = &stream;

ret = flb_cloudwatch_init_put_payload(&ctx, &buf, &stream, &offset);
TEST_CHECK(ret == 0);

if (ret == 0) {
expected = "{\"logGroupName\":\"fluent\","
"\"logStreamName\":\"systemd-fsck@dev-disk-by\\\\x2dlabel-BOOT.service\","
"\"logEvents\":[";
TEST_CHECK(offset == strlen(expected));
TEST_CHECK(strncmp(out_buf, expected, offset) == 0);
}

expected_stream_name = "systemd-fsck@dev-disk-by\\\\x2dlabel-BOOT.service";
reset_flush_buf(&ctx, &buf);
TEST_CHECK(buf.data_size == PUT_LOG_EVENTS_HEADER_LEN +
PUT_LOG_EVENTS_FOOTER_LEN +
strlen("fluent") +
strlen(expected_stream_name));
}

flb_sds_destroy(stream.group);
flb_sds_destroy(stream.name);
}

/* Helper function to create a large JSON message of specified size */
static char* create_large_json_message(size_t target_size)
{
Expand Down Expand Up @@ -551,6 +630,8 @@ TEST_LIST = {
{"put_retention_policy_success", flb_test_cloudwatch_put_retention_policy_success },
{"already_exists_create_group_put_retention_policy", flb_test_cloudwatch_already_exists_create_group_put_retention_policy },
{"error_put_retention_policy", flb_test_cloudwatch_error_put_retention_policy },
{"create_stream_escapes_json", flb_test_cloudwatch_create_stream_escapes_json },
{"put_events_escapes_stream_name", flb_test_cloudwatch_put_events_escapes_stream_name },
{"event_size_at_limit", flb_test_cloudwatch_event_size_at_limit },
{"event_size_over_limit", flb_test_cloudwatch_event_size_over_limit },
{"event_truncation_with_backslash", flb_test_cloudwatch_event_truncation_with_backslash },
Expand Down
Loading