diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 041460992c5..c32536317c1 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -143,6 +143,7 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) flb_sds_len(ctx->o->access_token) + 2); if (!output) { flb_plg_error(ctx->ins, "error creating token buffer"); + pthread_mutex_unlock(&ctx->token_mutex); return NULL; } flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type, @@ -409,6 +410,7 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con struct azure_kusto_file *chunk; struct mk_list *tmp; struct mk_list *head; + struct mk_list *f_tmp; struct mk_list *f_head; struct flb_fstore_file *fsf; struct flb_fstore_stream *fs_stream; @@ -421,13 +423,16 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con int is_compressed = FLB_FALSE; flb_sds_t tag_sds; + /* Lock to protect list iteration from concurrent modifications */ + pthread_mutex_lock(&ctx->files_mutex); + mk_list_foreach_safe(head, tmp, &ctx->fs->streams) { fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); if (fs_stream == ctx->stream_upload) { continue; } - mk_list_foreach_safe(f_head, tmp, &fs_stream->files) { + mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) { fsf = mk_list_entry(f_head, struct flb_fstore_file, _head); chunk = fsf->data; @@ -450,12 +455,19 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con continue; } + /* Mark chunk as locked before releasing mutex for I/O */ + azure_kusto_store_file_lock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); + ret = construct_request_buffer(ctx, NULL, chunk, &buffer, &buffer_size); if (ret < 0) { flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Could not construct request buffer for %s", chunk->file_path); + pthread_mutex_lock(&ctx->files_mutex); + azure_kusto_store_file_unlock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } @@ -472,6 +484,9 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con "ingest_all_old_buffer_files :: cannot gzip payload"); flb_sds_destroy(payload); flb_sds_destroy(tag_sds); + pthread_mutex_lock(&ctx->files_mutex); + azure_kusto_store_file_unlock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } else { @@ -487,6 +502,14 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con ret = azure_kusto_load_ingestion_resources(ctx, config); if (ret != 0) { flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: cannot load ingestion resources"); + flb_sds_destroy(payload); + flb_sds_destroy(tag_sds); + if (is_compressed) { + flb_free(final_payload); + } + pthread_mutex_lock(&ctx->files_mutex); + azure_kusto_store_file_unlock(chunk); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } @@ -494,10 +517,12 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, chunk); if (ret != 0) { flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Failed to ingest data to Azure Kusto"); + pthread_mutex_lock(&ctx->files_mutex); if (chunk){ azure_kusto_store_file_unlock(chunk); chunk->failures += 1; } + pthread_mutex_unlock(&ctx->files_mutex); flb_sds_destroy(tag_sds); flb_sds_destroy(payload); if (is_compressed) { @@ -512,11 +537,25 @@ static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *con flb_free(final_payload); } - /* data was sent successfully- delete the local buffer */ - azure_kusto_store_file_cleanup(ctx, chunk); + /* data was sent successfully- cleanup the local buffer */ + if (ctx->buffer_file_delete_early == FLB_TRUE) { + /* File was already unlocked and deleted in azure_kusto_queued_ingestion */ + flb_plg_debug(ctx->ins, "ingest_all_chunks: buffer file already deleted (early delete mode)"); + } + else { + /* Unlock and delete the local buffer */ + azure_kusto_store_file_unlock(chunk); + azure_kusto_store_file_cleanup(ctx, chunk); + } + + /* Re-acquire lock before next iteration */ + pthread_mutex_lock(&ctx->files_mutex); } } + /* Release lock after iteration completes */ + pthread_mutex_unlock(&ctx->files_mutex); + return 0; } @@ -573,6 +612,9 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) flb_plg_debug(ctx->ins, "Running upload timer callback (scheduler_kusto_ingest).."); now = time(NULL); + /* Lock to protect list iteration from concurrent modifications */ + pthread_mutex_lock(&ctx->files_mutex); + /* Iterate over all files in the active stream */ mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { fsf = mk_list_entry(head, struct flb_fstore_file, _head); @@ -591,6 +633,12 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) continue; } + /* Mark file as locked before releasing mutex to prevent concurrent access */ + azure_kusto_store_file_lock(file); + + /* Release lock before I/O operations */ + pthread_mutex_unlock(&ctx->files_mutex); + retry_count = 0; backoff_time = 2; /* Initial backoff time in seconds */ @@ -703,6 +751,8 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) } /* Delete the file after successful ingestion */ + /* Unlock file before delete so the delete helper can proceed */ + azure_kusto_store_file_unlock(file); ret = azure_kusto_store_file_delete(ctx, file); if (ret == 0) { flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: deleted successfully ingested file"); @@ -730,6 +780,8 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) /* If the maximum number of retries is reached, log an error and move to the next file */ if (retry_count >= ctx->scheduler_max_retries) { flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Max retries reached for file %s", file->fsf->name); + /* Unlock file before delete/inactive so the helper can proceed */ + azure_kusto_store_file_unlock(file); if (ctx->delete_on_max_upload_error){ azure_kusto_store_file_delete(ctx, file); } @@ -737,7 +789,14 @@ static void cb_azure_kusto_ingest(struct flb_config *config, void *data) azure_kusto_store_file_inactive(ctx, file); } } + + /* Re-acquire lock before next iteration */ + pthread_mutex_lock(&ctx->files_mutex); } + + /* Release lock after iteration completes */ + pthread_mutex_unlock(&ctx->files_mutex); + /* Log the end of the upload timer callback */ flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_kusto_ingest).."); } @@ -911,6 +970,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi } ctx->timer_created = FLB_FALSE; + ctx->upload_timer = NULL; ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000; flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size); } @@ -928,6 +988,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi pthread_mutex_init(&ctx->token_mutex, NULL); pthread_mutex_init(&ctx->resources_mutex, NULL); pthread_mutex_init(&ctx->blob_mutex, NULL); + pthread_mutex_init(&ctx->files_mutex, NULL); /* * Create upstream context for Kusto Ingestion endpoint @@ -1196,7 +1257,7 @@ static void flush_init(void *out_context, struct flb_config *config) sched = flb_sched_ctx_get(); ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, - ctx->timer_ms, cb_azure_kusto_ingest, ctx, NULL); + ctx->timer_ms, cb_azure_kusto_ingest, ctx, &ctx->upload_timer); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to create upload timer"); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -1275,6 +1336,8 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, if (upload_file != NULL && upload_file->failures >= ctx->scheduler_max_retries) { flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not " "retry", event_chunk->tag, ctx->scheduler_max_retries); + /* Unlock file before delete/inactive since those skip locked files */ + azure_kusto_store_file_unlock(upload_file); if (ctx->delete_on_max_upload_error){ azure_kusto_store_file_delete(ctx, upload_file); } @@ -1318,16 +1381,18 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, if (ret == 0){ if (ctx->buffering_enabled == FLB_TRUE && ctx->buffer_file_delete_early == FLB_TRUE){ flb_plg_debug(ctx->ins, "buffer file already deleted after blob creation"); + /* File was already unlocked and deleted in ingest_to_kusto, do not access upload_file */ ret = FLB_OK; goto cleanup; } else{ + /* Unlock file before delete since delete skips locked files */ + azure_kusto_store_file_unlock(upload_file); ret = azure_kusto_store_file_delete(ctx, upload_file); if (ret != 0){ /* File couldn't be deleted */ ret = FLB_RETRY; if (upload_file){ - azure_kusto_store_file_unlock(upload_file); upload_file->failures += 1; } goto error; @@ -1488,6 +1553,12 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) return -1; } + /* Cancel upload timer first to prevent race with concurrent ingest callback */ + if (ctx->upload_timer != NULL) { + flb_sched_timer_cb_destroy(ctx->upload_timer); + ctx->upload_timer = NULL; + } + if (ctx->buffering_enabled == FLB_TRUE){ if (azure_kusto_store_has_data(ctx) == FLB_TRUE) { @@ -1508,6 +1579,7 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) pthread_mutex_destroy(&ctx->resources_mutex); pthread_mutex_destroy(&ctx->token_mutex); pthread_mutex_destroy(&ctx->blob_mutex); + pthread_mutex_destroy(&ctx->files_mutex); flb_azure_kusto_conf_destroy(ctx); diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 362b1379533..0a555a9a2c9 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -127,6 +127,7 @@ struct flb_azure_kusto { int timer_created; int timer_ms; + struct flb_sched_timer *upload_timer; /* timer handle for cancellation on exit */ /* mutex for acquiring oauth tokens */ pthread_mutex_t token_mutex; @@ -141,6 +142,9 @@ struct flb_azure_kusto { pthread_mutex_t buffer_mutex; + /* mutex protecting stream_active->files list from concurrent timer/delete races */ + pthread_mutex_t files_mutex; + int buffering_enabled; size_t file_size; diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 38d2aa076e5..2dfc2771758 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -632,18 +632,23 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, blob_uri = azure_kusto_create_blob(ctx, blob_id, payload, payload_size); if (blob_uri) { - if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) { - flb_plg_debug(ctx->ins, "buffering enabled, ingest to blob successfully done and now deleting the buffer file %s", blob_id); - if (azure_kusto_store_file_delete(ctx, upload_file) != 0) { - flb_plg_error(ctx->ins, "blob creation successful but error deleting buffer file %s", blob_id); - } - } ret = azure_kusto_enqueue_ingestion(ctx, blob_uri, payload_size); if (ret != 0) { flb_plg_error(ctx->ins, "failed to enqueue ingestion blob to queue"); ret = -1; } + else { + /* Only delete file after successful queue - preserves data for retry on queue failure */ + if (ctx->buffering_enabled == FLB_TRUE && upload_file != NULL && ctx->buffer_file_delete_early == FLB_TRUE) { + flb_plg_debug(ctx->ins, "queue succeeded, deleting buffer file %s", blob_id); + /* Unlock file before delete since delete skips locked files */ + azure_kusto_store_file_unlock(upload_file); + if (azure_kusto_store_file_delete(ctx, upload_file) != 0) { + flb_plg_error(ctx->ins, "queue successful but error deleting buffer file %s", blob_id); + } + } + } flb_sds_destroy(blob_uri); } diff --git a/plugins/out_azure_kusto/azure_kusto_store.c b/plugins/out_azure_kusto/azure_kusto_store.c index a1059edd651..5d27cceff25 100644 --- a/plugins/out_azure_kusto/azure_kusto_store.c +++ b/plugins/out_azure_kusto/azure_kusto_store.c @@ -140,6 +140,9 @@ struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, * Based in the current ctx->stream_name, locate a candidate file to * store the incoming data using as a lookup pattern the content Tag. */ + /* Lock to protect list iteration from concurrent modifications */ + pthread_mutex_lock(&ctx->files_mutex); + mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) { fsf = mk_list_entry(head, struct flb_fstore_file, _head); @@ -147,6 +150,7 @@ struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, if (fsf->data == NULL) { flb_plg_warn(ctx->ins, "BAD: found flb_fstore_file with NULL data reference, tag=%s, file=%s, will try to delete", tag, fsf->name); flb_fstore_file_delete(ctx->fs, fsf); + continue; } if (fsf->meta_size != tag_len) { @@ -171,6 +175,8 @@ struct azure_kusto_file *azure_kusto_store_file_get(struct flb_azure_kusto *ctx, } } + pthread_mutex_unlock(&ctx->files_mutex); + if (!found) { return NULL; } @@ -239,9 +245,13 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto flb_plg_debug(ctx->ins, "[azure_kusto] new buffer file: %s", name); + /* Lock to protect list modification and fsf->data assignment */ + pthread_mutex_lock(&ctx->files_mutex); + /* Create the file */ fsf = flb_fstore_file_create(ctx->fs, ctx->stream_active, name, bytes); if (!fsf) { + pthread_mutex_unlock(&ctx->files_mutex); flb_plg_error(ctx->ins, "could not create the file '%s' in the store", name); flb_sds_destroy(name); @@ -253,6 +263,7 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto if (ret == -1) { flb_plg_warn(ctx->ins, "Deleting buffer file because metadata could not be written"); flb_fstore_file_delete(ctx->fs, fsf); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } @@ -262,6 +273,7 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto flb_errno(); flb_plg_warn(ctx->ins, "Deleting buffer file because azure_kusto context creation failed"); flb_fstore_file_delete(ctx->fs, fsf); + pthread_mutex_unlock(&ctx->files_mutex); return -1; } azure_kusto_file->fsf = fsf; @@ -270,6 +282,9 @@ int azure_kusto_store_buffer_put(struct flb_azure_kusto *ctx, struct azure_kusto /* Use fstore opaque 'data' reference to keep our context */ fsf->data = azure_kusto_file; + + pthread_mutex_unlock(&ctx->files_mutex); + flb_sds_destroy(name); } @@ -510,6 +525,9 @@ int azure_kusto_store_exit(struct flb_azure_kusto *ctx) return 0; } + /* Lock to protect list access during cleanup */ + pthread_mutex_lock(&ctx->files_mutex); + /* release local context on non-multi upload files */ mk_list_foreach(head, &ctx->fs->streams) { fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head); @@ -522,10 +540,13 @@ int azure_kusto_store_exit(struct flb_azure_kusto *ctx) if (fsf->data != NULL) { azure_kusto_file = fsf->data; flb_free(azure_kusto_file); + fsf->data = NULL; /* Clear pointer to prevent use-after-free */ } } } + pthread_mutex_unlock(&ctx->files_mutex); + if (ctx->fs) { flb_fstore_destroy(ctx->fs); } @@ -639,11 +660,22 @@ int azure_kusto_store_file_inactive(struct flb_azure_kusto *ctx, struct azure_ku int ret; struct flb_fstore_file *fsf; + /* Lock to protect list modification from concurrent timer access */ + pthread_mutex_lock(&ctx->files_mutex); + + /* Skip if file is locked (being processed by timer callback) */ + if (azure_kusto_file->locked == FLB_TRUE) { + pthread_mutex_unlock(&ctx->files_mutex); + return -1; + } + fsf = azure_kusto_file->fsf; flb_free(azure_kusto_file); ret = flb_fstore_file_inactive(ctx->fs, fsf); + pthread_mutex_unlock(&ctx->files_mutex); + return ret; } @@ -663,12 +695,23 @@ int azure_kusto_store_file_cleanup(struct flb_azure_kusto *ctx, struct azure_kus { struct flb_fstore_file *fsf; + /* Lock to protect list modification from concurrent timer access */ + pthread_mutex_lock(&ctx->files_mutex); + + /* Skip if file is locked (being processed by timer callback) */ + if (azure_kusto_file->locked == FLB_TRUE) { + pthread_mutex_unlock(&ctx->files_mutex); + return -1; + } + fsf = azure_kusto_file->fsf; /* permanent deletion */ flb_fstore_file_delete(ctx->fs, fsf); flb_free(azure_kusto_file); + pthread_mutex_unlock(&ctx->files_mutex); + return 0; } @@ -690,6 +733,15 @@ int azure_kusto_store_file_delete(struct flb_azure_kusto *ctx, struct azure_kust { struct flb_fstore_file *fsf; + /* Lock to protect list modification from concurrent timer access */ + pthread_mutex_lock(&ctx->files_mutex); + + /* Skip if file is locked (being processed by timer callback) */ + if (azure_kusto_file->locked == FLB_TRUE) { + pthread_mutex_unlock(&ctx->files_mutex); + return -1; + } + fsf = azure_kusto_file->fsf; ctx->current_buffer_size -= azure_kusto_file->size; @@ -697,6 +749,8 @@ int azure_kusto_store_file_delete(struct flb_azure_kusto *ctx, struct azure_kust flb_fstore_file_delete(ctx->fs, fsf); flb_free(azure_kusto_file); + pthread_mutex_unlock(&ctx->files_mutex); + return 0; } diff --git a/tests/runtime/out_azure_kusto.c b/tests/runtime/out_azure_kusto.c index 6bf8499ba13..5aeed79c158 100644 --- a/tests/runtime/out_azure_kusto.c +++ b/tests/runtime/out_azure_kusto.c @@ -20,8 +20,30 @@ #include #include "flb_tests_runtime.h" +#include +#include +#include +#include +#include /* Test data */ + +static int flb_kusto_unlink_cb(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) +{ + return remove(fpath); +} + +static void flb_kusto_rm_rf(const char *path) +{ + struct stat st; + + if (stat(path, &st) != 0) { + return; + } + + nftw(path, flb_kusto_unlink_cb, 64, FTW_DEPTH | FTW_PHYS); +} + #include "data/common/json_invalid.h" /* JSON_INVALID */ /* Test functions */ @@ -30,6 +52,10 @@ void flb_test_azure_kusto_managed_identity_system(void); void flb_test_azure_kusto_managed_identity_user(void); void flb_test_azure_kusto_service_principal(void); void flb_test_azure_kusto_workload_identity(void); +void flb_test_azure_kusto_buffering_backlog(void); +#ifndef _WIN32 +void flb_test_azure_kusto_timer_flush_race(void); +#endif /* Test list */ TEST_LIST = { @@ -38,6 +64,10 @@ TEST_LIST = { {"managed_identity_user", flb_test_azure_kusto_managed_identity_user}, {"service_principal", flb_test_azure_kusto_service_principal}, {"workload_identity", flb_test_azure_kusto_workload_identity}, + {"buffering_backlog", flb_test_azure_kusto_buffering_backlog}, +#ifndef _WIN32 + {"timer_flush_race", flb_test_azure_kusto_timer_flush_race}, +#endif {NULL, NULL} }; @@ -210,4 +240,178 @@ void flb_test_azure_kusto_workload_identity(void) flb_stop(ctx); flb_destroy(ctx); -} \ No newline at end of file +} + +/* Regression: exercise buffering-enabled backlog processing on restart to validate nested mk_list_foreach_safe fix */ +void flb_test_azure_kusto_buffering_backlog(void) +{ + int i; + int ret; + int bytes; + char sample[] = "{\"k\":\"v\"}"; + size_t sample_size = sizeof(sample) - 1; + char buffer_dir[PATH_MAX]; + pid_t pid; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + pid = getpid(); + snprintf(buffer_dir, sizeof(buffer_dir), "/tmp/flb-kusto-test-%d", (int) pid); + + /* Ensure a clean buffer directory before starting */ + flb_kusto_rm_rf(buffer_dir); + mkdir(buffer_dir, 0700); + + /* First run: enable buffering and write data to disk */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + for (i = 0; i < 5; i++) { + bytes = flb_lib_push(ctx, in_ffd, sample, sample_size); + TEST_CHECK(bytes == (int) sample_size); + } + + sleep(1); /* allow flush to write buffered chunks */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Second run: restart to process backlog from buffer_dir */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + sleep(1); /* ingest_all_chunks runs on startup for buffered backlog */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup buffer directory after test */ + flb_kusto_rm_rf(buffer_dir); +} + +#ifndef _WIN32 +/* + * TDD RED PHASE: Timer/Flush race condition test + * + * This test exercises the race where cb_azure_kusto_ingest (timer callback) + * iterates ctx->stream_active->files while concurrent flush/exit paths + * delete files without synchronization, leading to UAF/SIGSEGV. + * + * Expected behavior: + * - CURRENT (unfixed): Should crash/fail under ASan due to UAF + * - AFTER FIX: Should pass cleanly when mutex protects file list access + */ +void flb_test_azure_kusto_timer_flush_race(void) +{ + int i; + int ret; + int bytes; + char sample[] = "{\"race\":\"test\"}"; + size_t sample_size = sizeof(sample) - 1; + char buffer_dir[PATH_MAX]; + pid_t pid; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + pid = getpid(); + snprintf(buffer_dir, sizeof(buffer_dir), "/tmp/flb-kusto-race-test-%d", (int) pid); + + /* Ensure clean buffer directory */ + flb_kusto_rm_rf(buffer_dir); + mkdir(buffer_dir, 0700); + + /* Create context with aggressive timing to trigger race */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "race.test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "race.test", NULL); + flb_output_set(ctx, out_ffd, "auth_type", "managed_identity", NULL); + flb_output_set(ctx, out_ffd, "client_id", "system", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "testdb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + /* Enable buffering with small timeout to trigger concurrent timer/flush */ + flb_output_set(ctx, out_ffd, "buffering_enabled", "true", NULL); + flb_output_set(ctx, out_ffd, "buffer_dir", buffer_dir, NULL); + flb_output_set(ctx, out_ffd, "upload_timeout", "1s", NULL); /* 1 second timeout triggers timer */ + flb_output_set(ctx, out_ffd, "upload_file_size", "1M", NULL); /* Minimum 1MB file size */ + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Push data to create buffered chunks */ + for (i = 0; i < 10; i++) { + bytes = flb_lib_push(ctx, in_ffd, sample, sample_size); + TEST_CHECK(bytes == (int) sample_size); + } + + /* + * Sleep enough to let: + * 1. Flush write chunks to disk + * 2. Timer callback (cb_azure_kusto_ingest) start iterating files + * 3. Concurrent flush/exit delete files while timer is running + * + * This timing creates the race window where timer accesses freed memory. + */ + sleep(2); + + /* + * Stop quickly to trigger exit-path race: + * cb_azure_kusto_exit will delete files while timer callback + * may still be iterating the list. + * + * Expected on CURRENT code: UAF crash under ASan + * Expected AFTER fix: Clean shutdown + */ + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup */ + flb_kusto_rm_rf(buffer_dir); +} +#endif /* _WIN32 */ \ No newline at end of file