From 6bddcbdaee04b39bb0b107f7d28094e3db80fec7 Mon Sep 17 00:00:00 2001
From: Jesse Awan
Date: Tue, 9 Dec 2025 09:03:31 +0100
Subject: [PATCH 1/3] fix(in_kubernetes_events): buffer incomplete JSON across
 HTTP chunk boundaries

Fixes #11252

HTTP chunked transfer encoding can split a JSON event object across
chunk boundaries. Buffer the incomplete data until a complete JSON
message has been received. This prevents 'bad formed JSON' errors and
watch stream stalls.

- Add a chunk_buffer field to the k8s_events struct
- Modify process_http_chunk() to buffer incomplete JSON data
- Process buffered data when the stream closes
- Add a test with 400-byte chunks that split events into 3 parts

Signed-off-by: Jesse Awan
---
 .../in_kubernetes_events/kubernetes_events.c  | 130 ++++++++++++++++--
 .../in_kubernetes_events/kubernetes_events.h  |   3 +
 .../kubernetes_events_conf.c                  |   7 +
 tests/runtime/in_kubernetes_events.c          |  46 +++++++
 4 files changed, 176 insertions(+), 10 deletions(-)

diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c
index 79f3b16e676..9caece85ff8 100644
--- a/plugins/in_kubernetes_events/kubernetes_events.c
+++ b/plugins/in_kubernetes_events/kubernetes_events.c
@@ -746,32 +746,107 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
     size_t token_size = 0;
     char *token_start = 0;
     char *token_end = NULL;
+    char *search_start;
+    size_t remaining;
+    flb_sds_t working_buffer = NULL;
+
+    /*
+     * Prepend any buffered incomplete data from previous chunks.
+     * HTTP chunked encoding can split JSON objects across chunk boundaries,
+     * so we need to buffer incomplete data until we find a complete JSON line.
+     */
+    if (ctx->chunk_buffer != NULL) {
+        working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
+        if (!working_buffer) {
+            flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
+            flb_sds_destroy(ctx->chunk_buffer);
+            ctx->chunk_buffer = NULL;
+            return -1;
+        }
+        /*
+         * flb_sds_cat modifies and returns its first argument, so working_buffer
+         * IS ctx->chunk_buffer (possibly reallocated). Clear our reference to it.
+         */
+        ctx->chunk_buffer = NULL;
+        token_start = working_buffer;
+    }
+    else {
+        token_start = c->resp.payload;
+    }
 
-    token_start = c->resp.payload;
-    token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
-    while ( token_end != NULL && ret == 0 ) {
+    search_start = token_start;
+    token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+
+    while (token_end != NULL && ret == 0) {
         token_size = token_end - token_start;
+
+        /* Skip empty lines */
+        if (token_size == 0) {
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+            continue;
+        }
+
         ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
         if (ret == -1) {
-            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
-                          c->resp.payload);
+            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
        }
        else {
-            *bytes_consumed += token_size + 1;
+            /*
+             * For non-buffered data, track consumed bytes.
+             * For buffered data, we'll mark everything consumed after the loop.
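+             * Token sizes in the working buffer span bytes from earlier payloads.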
+             */
+            if (!working_buffer) {
+                *bytes_consumed += token_size + 1;
+            }
             ret = process_watched_event(ctx, buf_data, buf_size);
         }
 
         flb_free(buf_data);
-        if (buf_data) {
-            buf_data = NULL;
+        buf_data = NULL;
+
+        token_start = token_end + 1;
+        search_start = token_start;
+        token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+    }
+
+    /*
+     * Always consume all bytes from the current chunk since we've examined them all.
+     * Even if we buffer the data, we've still "consumed" it from the HTTP payload.
+     */
+    *bytes_consumed = c->resp.payload_size;
+
+    /*
+     * If there's remaining data without a newline delimiter, it means the JSON
+     * object is incomplete (split across chunk boundaries). Buffer it for the next chunk.
+     */
+    if (working_buffer) {
+        remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
+    }
+    else {
+        remaining = c->resp.payload_size - (token_start - c->resp.payload);
+    }
+
+    if (remaining > 0 && ret == 0) {
+        ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
+        if (!ctx->chunk_buffer) {
+            flb_plg_error(ctx->ins, "failed to create chunk buffer");
+            ret = -1;
+        }
+        else {
+            flb_plg_trace(ctx->ins, "buffering %zu bytes of incomplete JSON data", remaining);
         }
-        token_start = token_end+1;
-        token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
     }
 
+    if (working_buffer) {
+        flb_sds_destroy(working_buffer);
+    }
+
     if (buf_data) {
         flb_free(buf_data);
     }
+
     return ret;
 }
 
@@ -889,6 +964,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
         flb_upstream_conn_release(ctx->current_connection);
         ctx->current_connection = NULL;
     }
+
+    /* Clear any buffered incomplete data on failure */
+    if (ctx->chunk_buffer) {
+        flb_sds_destroy(ctx->chunk_buffer);
+        ctx->chunk_buffer = NULL;
+    }
+
     return FLB_FALSE;
 }
 
@@ -927,6 +1009,28 @@ static int k8s_events_collect(struct flb_input_instance *ins,
     }
     else if (ret == FLB_HTTP_OK) {
         flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
+
+        /*
+         * If there's buffered data when the stream closes, try to process it.
+         * This handles the case where the last chunk doesn't end with a newline.
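+         * Anything that still fails to parse is dropped along with the buffer below.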
+         */
+        if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
+            int buf_ret;
+            int root_type;
+            size_t consumed = 0;
+            char *buf_data = NULL;
+            size_t buf_size;
+
+            buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
+                                    &buf_data, &buf_size, &root_type, &consumed);
+            if (buf_ret == 0) {
+                process_watched_event(ctx, buf_data, buf_size);
+            }
+
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+        }
     }
     else {
         flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
@@ -938,6 +1042,12 @@ static int k8s_events_collect(struct flb_input_instance *ins,
         flb_upstream_conn_release(ctx->current_connection);
         ctx->streaming_client = NULL;
         ctx->current_connection = NULL;
+
+        /* Clear any buffered incomplete data when the stream closes */
+        if (ctx->chunk_buffer) {
+            flb_sds_destroy(ctx->chunk_buffer);
+            ctx->chunk_buffer = NULL;
+        }
     }
 
     pthread_mutex_unlock(&ctx->lock);

diff --git a/plugins/in_kubernetes_events/kubernetes_events.h b/plugins/in_kubernetes_events/kubernetes_events.h
index 1309406349f..43b136e0807 100644
--- a/plugins/in_kubernetes_events/kubernetes_events.h
+++ b/plugins/in_kubernetes_events/kubernetes_events.h
@@ -85,6 +85,9 @@ struct k8s_events {
     struct flb_connection *current_connection;
     struct flb_http_client *streaming_client;
 
+    /* Buffer for incomplete JSON data from chunked responses */
+    flb_sds_t chunk_buffer;
+
     /* limit for event queries */
     int limit_request;
     /* last highest seen resource_version */

diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.c b/plugins/in_kubernetes_events/kubernetes_events_conf.c
index e84872f70e7..067d84a265e 100644
--- a/plugins/in_kubernetes_events/kubernetes_events_conf.c
+++ b/plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
     pthread_mutexattr_init(&attr);
     pthread_mutex_init(&ctx->lock, &attr);
 
+    /* Initialize buffer for incomplete chunk data */
+    ctx->chunk_buffer = NULL;
+
     /* Load the config map */
     ret = flb_input_config_map_set(ins, (void *) ctx);
     if (ret == -1) {
@@ -289,6 +292,10 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
         flb_ra_destroy(ctx->ra_resource_version);
     }
 
+    if (ctx->chunk_buffer) {
+        flb_sds_destroy(ctx->chunk_buffer);
+    }
+
     if(ctx->streaming_client) {
         flb_http_client_destroy(ctx->streaming_client);
     }

diff --git a/tests/runtime/in_kubernetes_events.c b/tests/runtime/in_kubernetes_events.c
index d17707057f2..784bf2b140b 100644
--- a/tests/runtime/in_kubernetes_events.c
+++ b/tests/runtime/in_kubernetes_events.c
@@ -444,10 +444,56 @@ void flb_test_events_with_chunkedrecv()
     test_ctx_destroy(ctx);
 }
 
+/* Test with smaller chunks - splits a single event into 3 chunks */
+void flb_test_events_with_3chunks()
+{
+    struct flb_lib_out_cb cb_data;
+    struct test_ctx *ctx;
+    int trys;
+    int ret;
+    int num;
+    const char *filename = "eventlist_v1_with_lastTimestamp";
+    const char *stream_filename = "watch_v1_with_lastTimestamp";
+
+    clear_output_num();
+
+    /* Use 400-byte chunks to split the 1176-byte JSON into 3 chunks */
+    struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
+        filename, stream_filename, 400
+    );
+
+    cb_data.cb = cb_check_result_json;
+    cb_data.data = (void *)k8s_server;
+
+    ctx = test_ctx_create(&cb_data);
+    if (!TEST_CHECK(ctx != NULL)) {
+        TEST_MSG("test_ctx_create failed");
+        exit(EXIT_FAILURE);
+    }
+
+    ret = flb_start(ctx->flb);
+    TEST_CHECK(ret == 0);
+
+    /* waiting to flush */
+    for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
+        flb_time_msleep(1000);
+    }
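+    /* both events should have been reassembled despite the 400-byte chunk splits */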
+
+    num = get_output_num();
+    if (!TEST_CHECK(num >= 2)) {
+        TEST_MSG("2 output records are expected, found %d", num);
+    }
+
+    mock_k8s_api_destroy(k8s_server);
+    test_ctx_destroy(ctx);
+}
+
 TEST_LIST = {
     {"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
     {"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
     //{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
+    {"events_v1_with_3chunks", flb_test_events_with_3chunks},
     {NULL, NULL}
 };

From a6bf771313fb8e11d894a095bdd63e36cb07b69f Mon Sep 17 00:00:00 2001
From: Jesse Awan
Date: Tue, 9 Dec 2025 18:38:42 +0100
Subject: [PATCH 2/3] fix(in_kubernetes_events): add debug logging and fix
 buffering logic

- Add comprehensive debug logging to trace chunk processing
- Fix the buffering logic to only buffer when a parse fails (incomplete JSON)
- Previously, all data was buffered even on a successful parse

Signed-off-by: Jesse Awan
---
 .../in_kubernetes_events/kubernetes_events.c | 71 +++++++++++--------
 1 file changed, 43 insertions(+), 28 deletions(-)

diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c
index 9caece85ff8..63e90723194 100644
--- a/plugins/in_kubernetes_events/kubernetes_events.c
+++ b/plugins/in_kubernetes_events/kubernetes_events.c
@@ -756,6 +756,9 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
      * so we need to buffer incomplete data until we find a complete JSON line.
      */
     if (ctx->chunk_buffer != NULL) {
+        size_t buffer_len = flb_sds_len(ctx->chunk_buffer);
+        flb_plg_debug(ctx->ins, "prepending %zu bytes from chunk_buffer to %zu new bytes",
+                      buffer_len, c->resp.payload_size);
         working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
         if (!working_buffer) {
             flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
@@ -771,6 +774,7 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         token_start = working_buffer;
     }
     else {
+        flb_plg_debug(ctx->ins, "processing %zu bytes from new chunk", c->resp.payload_size);
         token_start = c->resp.payload;
     }
 
@@ -789,37 +793,32 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         }
 
         ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
-        if (ret == -1) {
-            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
-        }
-        else {
-            /*
-             * For non-buffered data, track consumed bytes.
-             * For buffered data, we'll mark everything consumed after the loop.
-             * Token sizes in the working buffer span bytes from earlier payloads.
-             */
+        if (ret == 0) {
+            /* Successfully parsed JSON */
+            flb_plg_debug(ctx->ins, "successfully parsed JSON event (%zu bytes)", token_size);
             if (!working_buffer) {
                 *bytes_consumed += token_size + 1;
             }
             ret = process_watched_event(ctx, buf_data, buf_size);
+            flb_free(buf_data);
+            buf_data = NULL;
+
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+        }
+        else {
+            /* JSON parse failed - this line is incomplete, don't advance */
+            flb_plg_debug(ctx->ins, "JSON parse failed for %zu bytes at offset %td - will buffer",
+                          token_size, token_start - (working_buffer ? working_buffer : c->resp.payload));
+            break;
         }
-
-        flb_free(buf_data);
-        buf_data = NULL;
-
-        token_start = token_end + 1;
-        search_start = token_start;
-        token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
     }
 
     /*
-     * Always consume all bytes from the current chunk since we've examined them all.
-     * Even if we buffer the data, we've still "consumed" it from the HTTP payload.
-     */
-    *bytes_consumed = c->resp.payload_size;
-
-    /*
-     * If there's remaining data without a newline delimiter, it means the JSON
-     * object is incomplete (split across chunk boundaries). Buffer it for the next chunk.
+     * Calculate the remaining unparsed data.
+     * If we broke out of the loop due to a parse failure or no newline found,
+     * buffer the remaining data for the next chunk.
      */
     if (working_buffer) {
         remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
@@ -828,16 +827,32 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         remaining = c->resp.payload_size - (token_start - c->resp.payload);
     }
 
-    if (remaining > 0 && ret == 0) {
+    if (remaining > 0) {
+        /* We have unparsed data - buffer it for the next chunk */
+        flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
         ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
         if (!ctx->chunk_buffer) {
             flb_plg_error(ctx->ins, "failed to create chunk buffer");
-            ret = -1;
-        }
-        else {
-            flb_plg_trace(ctx->ins, "buffering %zu bytes of incomplete JSON data", remaining);
+            if (working_buffer) {
+                flb_sds_destroy(working_buffer);
+            }
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+            return -1;
         }
     }
+    else {
+        flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+    }
+
+    /*
+     * Mark bytes consumed from the current HTTP chunk.
+     * If we used working_buffer, all original payload bytes are consumed.
+     */
+    if (working_buffer) {
+        *bytes_consumed = c->resp.payload_size;
+    }
 
     if (working_buffer) {
         flb_sds_destroy(working_buffer);

From 357d6761acceb3f86421656b81582cc87b194538 Mon Sep 17 00:00:00 2001
From: Jesse Awan
Date: Tue, 9 Dec 2025 18:52:50 +0100
Subject: [PATCH 3/3] fix(in_kubernetes_events): always consume entire payload
 to avoid duplicates

Critical fix: always mark the entire HTTP payload as consumed, not just
the parsed lines. Otherwise the HTTP layer keeps the trailing fragment
and we get duplicates when prepending chunk_buffer on the next call.

This was causing 'bad formed JSON' errors in production because
fragments were duplicated at chunk boundaries.

Signed-off-by: Jesse Awan
---
 plugins/in_kubernetes_events/kubernetes_events.c | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c
index 63e90723194..c09981239d3 100644
--- a/plugins/in_kubernetes_events/kubernetes_events.c
+++ b/plugins/in_kubernetes_events/kubernetes_events.c
@@ -796,9 +796,6 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         if (ret == 0) {
             /* Successfully parsed JSON */
             flb_plg_debug(ctx->ins, "successfully parsed JSON event (%zu bytes)", token_size);
-            if (!working_buffer) {
-                *bytes_consumed += token_size + 1;
-            }
             ret = process_watched_event(ctx, buf_data, buf_size);
             flb_free(buf_data);
             buf_data = NULL;
@@ -847,12 +844,12 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
     }
 
     /*
-     * Mark bytes consumed from the current HTTP chunk.
-     * If we used working_buffer, all original payload bytes are consumed.
+     * At this point we have parsed every complete line and buffered any
+     * remaining tail into ctx->chunk_buffer, so we no longer need any
+     * bytes from this HTTP payload. Tell the HTTP client that the whole
+     * payload has been consumed to avoid duplicates.
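+     * Leaving a tail unconsumed would make the HTTP layer deliver it again,
+     * duplicating the fragment we already hold in chunk_buffer.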
*/ - if (working_buffer) { - *bytes_consumed = c->resp.payload_size; - } + *bytes_consumed = c->resp.payload_size; if (working_buffer) { flb_sds_destroy(working_buffer);