From e223ca86f11f70f0d7b16b850b7ea57588942ecb Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 10 Jun 2026 12:07:15 +0900 Subject: [PATCH 1/2] out_cloudwatch_logs: Ensure escaping for special characters Signed-off-by: Hiroshi Hatake --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 154 ++++++++++++++++--- plugins/out_cloudwatch_logs/cloudwatch_api.h | 4 + 2 files changed, 134 insertions(+), 24 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 9f79fe2208f..16c913ca567 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -371,20 +371,54 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu return -1; } +static int escape_stream_names(struct log_stream *stream, + char **group_name, size_t *group_name_size, + char **stream_name, size_t *stream_name_size) +{ + int ret; + + ret = flb_utils_write_str_buf(stream->group, strlen(stream->group), + group_name, group_name_size, FLB_FALSE); + if (ret < 0) { + return -1; + } + + ret = flb_utils_write_str_buf(stream->name, strlen(stream->name), + stream_name, stream_name_size, FLB_FALSE); + if (ret < 0) { + flb_free(*group_name); + *group_name = NULL; + return -1; + } + + return 0; +} + /* * Writes the "header" for a put log events payload */ -static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, - struct log_stream *stream, int *offset) +int flb_cloudwatch_init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, + struct log_stream *stream, int *offset) { + char *group_name = NULL; + char *stream_name = NULL; + size_t group_name_size; + size_t stream_name_size; int ret; + + ret = escape_stream_names(stream, &group_name, &group_name_size, + &stream_name, &stream_name_size); + if (ret < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "{\"logGroupName\":\"", 17)) { goto error; } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - stream->group, 0)) { + group_name, group_name_size)) { goto error; } @@ -394,7 +428,7 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - stream->name, 0)) { + stream_name, stream_name_size)) { goto error; } @@ -443,9 +477,13 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, goto error; } + flb_free(group_name); + flb_free(stream_name); return 0; error: + flb_free(group_name); + flb_free(stream_name); return -1; } @@ -754,13 +792,25 @@ int process_event(struct flb_cloudwatch *ctx, struct cw_flush *buf, /* Resets or inits a cw_flush struct */ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) { + char *group_name = NULL; + char *stream_name = NULL; + size_t group_name_size = 0; + size_t stream_name_size = 0; + int ret; + buf->event_index = 0; buf->tmp_buf_offset = 0; buf->event_index = 0; buf->data_size = PUT_LOG_EVENTS_HEADER_LEN + PUT_LOG_EVENTS_FOOTER_LEN; if (buf->current_stream != NULL) { - buf->data_size += strlen(buf->current_stream->name); - buf->data_size += strlen(buf->current_stream->group); + ret = escape_stream_names(buf->current_stream, &group_name, &group_name_size, + &stream_name, &stream_name_size); + if (ret == 0) { + buf->data_size += group_name_size; + buf->data_size += stream_name_size; + flb_free(group_name); + flb_free(stream_name); + } } } @@ -783,7 +833,7 @@ int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf) { buf->current_stream->oldest_event = 0; offset = 0; - ret = init_put_payload(ctx, buf, buf->current_stream, &offset); + ret = flb_cloudwatch_init_put_payload(ctx, buf, buf->current_stream, &offset); if (ret < 0) { flb_plg_error(ctx->ins, "Failed to initialize PutLogEvents payload"); return -1; @@ -1888,6 +1938,78 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream) return -1; } +flb_sds_t flb_cloudwatch_create_log_stream_body(struct log_stream *stream) +{ + const char *group_prefix = "{\"logGroupName\":\""; + const char *stream_prefix = "\",\"logStreamName\":\""; + const char *suffix = "\"}"; + flb_sds_t body = NULL; + flb_sds_t tmp; + char *group_name = NULL; + char *stream_name = NULL; + size_t group_name_size = 0; + size_t stream_name_size = 0; + int ret; + + ret = escape_stream_names(stream, &group_name, &group_name_size, + &stream_name, &stream_name_size); + if (ret < 0) { + return NULL; + } + + body = flb_sds_create_size(50 + group_name_size + stream_name_size); + if (!body) { + flb_errno(); + goto error; + } + + tmp = flb_sds_cat(body, group_prefix, strlen(group_prefix)); + if (!tmp) { + flb_errno(); + goto error; + } + body = tmp; + + tmp = flb_sds_cat(body, group_name, group_name_size); + if (!tmp) { + flb_errno(); + goto error; + } + body = tmp; + + tmp = flb_sds_cat(body, stream_prefix, strlen(stream_prefix)); + if (!tmp) { + flb_errno(); + goto error; + } + body = tmp; + + tmp = flb_sds_cat(body, stream_name, stream_name_size); + if (!tmp) { + flb_errno(); + goto error; + } + body = tmp; + + tmp = flb_sds_cat(body, suffix, strlen(suffix)); + if (!tmp) { + flb_errno(); + goto error; + } + body = tmp; + + flb_free(group_name); + flb_free(stream_name); + + return body; + +error: + flb_free(group_name); + flb_free(stream_name); + flb_sds_destroy(body); + return NULL; +} + int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry) { @@ -1895,32 +2017,16 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, struct flb_http_client *c = NULL; struct flb_aws_client *cw_client; flb_sds_t body; - flb_sds_t tmp; flb_sds_t error; int ret; flb_plg_info(ctx->ins, "Creating log stream %s in log group %s", stream->name, stream->group); - body = flb_sds_create_size(50 + strlen(stream->group) + - strlen(stream->name)); + body = flb_cloudwatch_create_log_stream_body(stream); if (!body) { - flb_sds_destroy(body); - flb_errno(); - return -1; - } - - /* construct CreateLogStream request body */ - tmp = flb_sds_printf(&body, - "{\"logGroupName\":\"%s\",\"logStreamName\":\"%s\"}", - stream->group, - stream->name); - if (!tmp) { - flb_sds_destroy(body); - flb_errno(); return -1; } - body = tmp; cw_client = ctx->cw_client; if (plugin_under_test() == FLB_TRUE) { diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index dff2dbf2599..021f2cc8625 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -69,6 +69,9 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, const char *data, size_t bytes, int event_type, struct flb_config *config); int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry); +flb_sds_t flb_cloudwatch_create_log_stream_body(struct log_stream *stream); +int flb_cloudwatch_init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, + struct log_stream *stream, int *offset); struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag, const msgpack_object map); int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, @@ -76,5 +79,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, size_t payload_size); int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream); int compare_events(const void *a_arg, const void *b_arg); +void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf); #endif From 2e1ac0c7d2fe9ebcf2e970f1ea194874be2878f7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 10 Jun 2026 12:07:54 +0900 Subject: [PATCH 2/2] tests: runtime: Add test cases for confirming escaped backslash Signed-off-by: Hiroshi Hatake --- tests/runtime/out_cloudwatch.c | 81 ++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/runtime/out_cloudwatch.c b/tests/runtime/out_cloudwatch.c index 41fc4fa9336..f5b215c306b 100644 --- a/tests/runtime/out_cloudwatch.c +++ b/tests/runtime/out_cloudwatch.c @@ -400,6 +400,85 @@ void flb_test_cloudwatch_error_put_retention_policy(void) flb_destroy(ctx); } +void flb_test_cloudwatch_create_stream_escapes_json(void) +{ + struct log_stream stream; + flb_sds_t body; + char *expected; + int ret; + + memset(&stream, 0, sizeof(struct log_stream)); + stream.group = flb_sds_create("fluent"); + stream.name = flb_sds_create("systemd-fsck@dev-disk-by\\x2dlabel-BOOT.service"); + TEST_CHECK(stream.group != NULL); + TEST_CHECK(stream.name != NULL); + + if (stream.group && stream.name) { + body = flb_cloudwatch_create_log_stream_body(&stream); + TEST_CHECK(body != NULL); + + if (body) { + expected = "{\"logGroupName\":\"fluent\"," + "\"logStreamName\":\"systemd-fsck@dev-disk-by\\\\x2dlabel-BOOT.service\"}"; + ret = strcmp(body, expected); + TEST_CHECK(ret == 0); + flb_sds_destroy(body); + } + } + + flb_sds_destroy(stream.group); + flb_sds_destroy(stream.name); +} + +void flb_test_cloudwatch_put_events_escapes_stream_name(void) +{ + struct flb_cloudwatch ctx; + struct log_stream stream; + struct cw_flush buf; + char out_buf[512]; + char *expected_stream_name; + char *expected; + int offset; + int ret; + + memset(&ctx, 0, sizeof(struct flb_cloudwatch)); + memset(&stream, 0, sizeof(struct log_stream)); + memset(&buf, 0, sizeof(struct cw_flush)); + + stream.group = flb_sds_create("fluent"); + stream.name = flb_sds_create("systemd-fsck@dev-disk-by\\x2dlabel-BOOT.service"); + TEST_CHECK(stream.group != NULL); + TEST_CHECK(stream.name != NULL); + + if (stream.group && stream.name) { + offset = 0; + buf.out_buf = out_buf; + buf.out_buf_size = sizeof(out_buf); + buf.current_stream = &stream; + + ret = flb_cloudwatch_init_put_payload(&ctx, &buf, &stream, &offset); + TEST_CHECK(ret == 0); + + if (ret == 0) { + expected = "{\"logGroupName\":\"fluent\"," + "\"logStreamName\":\"systemd-fsck@dev-disk-by\\\\x2dlabel-BOOT.service\"," + "\"logEvents\":["; + TEST_CHECK(offset == strlen(expected)); + TEST_CHECK(strncmp(out_buf, expected, offset) == 0); + } + + expected_stream_name = "systemd-fsck@dev-disk-by\\\\x2dlabel-BOOT.service"; + reset_flush_buf(&ctx, &buf); + TEST_CHECK(buf.data_size == PUT_LOG_EVENTS_HEADER_LEN + + PUT_LOG_EVENTS_FOOTER_LEN + + strlen("fluent") + + strlen(expected_stream_name)); + } + + flb_sds_destroy(stream.group); + flb_sds_destroy(stream.name); +} + /* Helper function to create a large JSON message of specified size */ static char* create_large_json_message(size_t target_size) { @@ -551,6 +630,8 @@ TEST_LIST = { {"put_retention_policy_success", flb_test_cloudwatch_put_retention_policy_success }, {"already_exists_create_group_put_retention_policy", flb_test_cloudwatch_already_exists_create_group_put_retention_policy }, {"error_put_retention_policy", flb_test_cloudwatch_error_put_retention_policy }, + {"create_stream_escapes_json", flb_test_cloudwatch_create_stream_escapes_json }, + {"put_events_escapes_stream_name", flb_test_cloudwatch_put_events_escapes_stream_name }, {"event_size_at_limit", flb_test_cloudwatch_event_size_at_limit }, {"event_size_over_limit", flb_test_cloudwatch_event_size_over_limit }, {"event_truncation_with_backslash", flb_test_cloudwatch_event_truncation_with_backslash },