From 0296ff2734341d7c30e5aad81229f319205b2ba2 Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Thu, 11 Dec 2025 20:27:25 +0900 Subject: [PATCH 1/3] tests: in_tail: add regression tests for DB offset and gzip - Added test for offset rewind on shutdown (prevent DB resurrection) - Added gzip tests: resume loss, inotify append, rotation, multi resume Signed-off-by: jinyong.choi --- tests/runtime/in_tail.c | 769 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 769 insertions(+) diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index 2b1ebb5228a..197208383b9 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -34,6 +34,10 @@ Approach for this tests is basing on filter_kubernetes tests #include #include #include +#include +#ifdef FLB_HAVE_SQLDB +#include +#endif #include "flb_tests_runtime.h" #define NEW_LINE "\n" @@ -267,10 +271,193 @@ static ssize_t write_msg(struct test_tail_ctx *ctx, char *msg, size_t msg_len) return w_byte; } +/* + * Write raw data to file with optional newline. + * If msg is NULL, only writes a newline (useful for completing incomplete lines). + * If add_newline is FLB_FALSE, data remains in tail buffer as incomplete line. + */ +static ssize_t write_raw(struct test_tail_ctx *ctx, char *msg, size_t msg_len, + int add_newline) +{ + int i; + ssize_t w_byte = 0; + + for (i = 0; i < ctx->fd_num; i++) { + if (msg != NULL && msg_len > 0) { + w_byte = write(ctx->fds[i], msg, msg_len); + if (!TEST_CHECK(w_byte == msg_len)) { + TEST_MSG("write failed ret=%ld", w_byte); + return -1; + } + } + if (add_newline) { + w_byte = write(ctx->fds[i], NEW_LINE, strlen(NEW_LINE)); + if (!TEST_CHECK(w_byte == strlen(NEW_LINE))) { + TEST_MSG("write newline failed ret=%ld", w_byte); + return -1; + } + } + fsync(ctx->fds[i]); + } + return w_byte; +} + #define DPATH FLB_TESTS_DATA_PATH "/data/tail" #define MAX_LINES 32 +/* Gzip helpers */ +static int create_gzip_file(const char *path, const char *data, size_t len) +{ + int ret; + void *gz_data; + size_t gz_len; + FILE *fp; + + ret = flb_gzip_compress((void *)data, len, &gz_data, &gz_len); + if (ret != 0) { + return -1; + } + + fp = fopen(path, "wb"); + if (!fp) { + flb_free(gz_data); + return -1; + } + + if (fwrite(gz_data, 1, gz_len, fp) != gz_len) { + fclose(fp); + flb_free(gz_data); + return -1; + } + fclose(fp); + printf("Created gzip file %s size=%lu\n", path, (unsigned long)gz_len); + flb_free(gz_data); + + return 0; +} + +static int append_gzip_file(const char *path, const char *data, size_t len) +{ + int ret; + void *gz_data; + size_t gz_len; + FILE *fp; + + ret = flb_gzip_compress((void *)data, len, &gz_data, &gz_len); + if (ret != 0) { + return -1; + } + + fp = fopen(path, "ab"); + if (!fp) { + flb_free(gz_data); + return -1; + } + + if (fwrite(gz_data, 1, gz_len, fp) != gz_len) { + fclose(fp); + flb_free(gz_data); + return -1; + } + fclose(fp); + flb_free(gz_data); + + return 0; +} + +struct test_ctx { + int count; + int found_line2; +}; + +static int cb_check_gzip_resume(void *record, size_t size, void *data) +{ + struct test_ctx *ctx = data; + msgpack_unpacked result; + msgpack_object root; + msgpack_object key; + msgpack_object val; + msgpack_object v; + size_t off = 0; + int i; + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) { + root = result.data; + if (root.type == MSGPACK_OBJECT_ARRAY && root.via.array.size == 2) { + ctx->count++; + + /* Check content for "line2" */ + val = root.via.array.ptr[1]; /* map */ + if (val.type == MSGPACK_OBJECT_MAP) { + for (i = 0; i < val.via.map.size; i++) { + key = val.via.map.ptr[i].key; + v = val.via.map.ptr[i].val; + if (key.type == MSGPACK_OBJECT_STR && + key.via.str.size == 3 && + memcmp(key.via.str.ptr, "log", 3) == 0) { + if (v.type == MSGPACK_OBJECT_STR) { + if (v.via.str.size >= 5 && + memcmp(v.via.str.ptr, "line2", 5) == 0) { + ctx->found_line2 = 1; + } + } + } + } + } + } + } + msgpack_unpacked_destroy(&result); + + flb_free(record); + return 0; +} + +#ifdef FLB_HAVE_SQLDB +/* + * Helper function to get current file offset from tail DB + */ +static int64_t get_db_offset(const char *db_path, const char *file_name) +{ + sqlite3 *db; + sqlite3_stmt *stmt; + int rc; + int64_t offset = -1; + char like_pattern[256]; + const char *query = "SELECT offset FROM in_tail_files WHERE name LIKE ? LIMIT 1;"; + + rc = sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, NULL); + if (rc != SQLITE_OK) { + return -1; + } + + /* + * Table name is 'in_tail_files' + * We need to find the offset for the specific file. + * Since the test uses a specific filename, we can query by name. + */ + rc = sqlite3_prepare_v2(db, query, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + sqlite3_close(db); + return -1; + } + + snprintf(like_pattern, sizeof(like_pattern), "%%%s%%", file_name); + sqlite3_bind_text(stmt, 1, like_pattern, -1, SQLITE_TRANSIENT); + + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW) { + offset = sqlite3_column_int64(stmt, 0); + } + + sqlite3_finalize(stmt); + sqlite3_close(db); + + return offset; +} +#endif /* FLB_HAVE_SQLDB */ + int64_t result_time; struct tail_test_result { const char *target; @@ -311,6 +498,37 @@ void wait_with_timeout(uint32_t timeout_ms, struct tail_test_result *result, int } } +/* + * Wait until output count reaches expected value or timeout. + * Returns the final count. + */ +static int wait_for_count_with_timeout(int *count_ptr, int expected, uint32_t timeout_ms) +{ + struct flb_time start_time; + struct flb_time end_time; + struct flb_time diff_time; + uint64_t elapsed_time_flb = 0; + + flb_time_get(&start_time); + + while (1) { + if (*count_ptr >= expected) { + return *count_ptr; + } + + flb_time_msleep(50); + flb_time_get(&end_time); + flb_time_diff(&end_time, &start_time, &diff_time); + elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; + + if (elapsed_time_flb > timeout_ms) { + break; + } + } + + return *count_ptr; +} + void wait_num_with_timeout(uint32_t timeout_ms, int *output_num) { struct flb_time start_time; @@ -2649,6 +2867,552 @@ void flb_test_db_compare_filename() test_tail_ctx_destroy(ctx); unlink(db); } + +/* + * Test: flb_test_db_offset_rewind_on_shutdown + * + * This test verifies that unprocessed buffered data is not lost on shutdown. + * When Fluent Bit shuts down with data still in the buffer, the offset should + * be rewound so that the data is re-read on restart. + * + * Scenario: + * 1. Start Fluent Bit with DB enabled + * 2. Write initial data and wait for it to be processed + * 3. Write additional data and immediately stop (before flush) + * 4. Restart Fluent Bit + * 5. Verify that the data written before shutdown is re-read + */ + +void flb_test_db_offset_rewind_on_shutdown() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"test_offset_rewind.log"}; + char *db = "test_offset_rewind.db"; + char *msg_init = "initial message"; + char *msg_before_shutdown = "message before shutdown"; + int i; + int ret; + int num; + int unused; + + unlink(file[0]); + unlink(db); + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + /* First run: write initial data */ + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Set debug log level to see offset rewind messages */ + flb_service_set(ctx->flb, "Log_Level", "debug", NULL); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Write initial message */ + ret = write_msg(ctx, msg_init, strlen(msg_init)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Wait for data to be processed */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 1)) { + TEST_MSG("initial message not received. expect=1 got=%d", num); + } + + /* + * Wait for tail to READ the file into buffer (advancing the DB offset). + * Instead of sleeping, we poll the DB until the offset increases. + * The offset should advance by the length of msg_before_shutdown. + */ + int64_t start_offset = -1; + int64_t current_offset = -1; + int attempts = 0; + + /* Get initial offset (should be after the first message) */ + while (start_offset == -1 && attempts < 20) { + start_offset = get_db_offset(db, file[0]); + if (start_offset != -1) { + break; + } + flb_time_msleep(100); + attempts++; + } + + if (!TEST_CHECK(start_offset >= 0)) { + TEST_MSG("failed to get initial db offset"); + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* + * Write message WITHOUT newline - this will remain in the tail buffer + * as an incomplete line, which is the scenario we want to test. + * The tail plugin processes complete lines (ending with \n), so + * data without newline stays in buf_len until more data arrives. + */ + ret = write_raw(ctx, msg_before_shutdown, strlen(msg_before_shutdown), FLB_FALSE); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Poll loop */ + attempts = 0; + while (attempts < 20) { /* Max 2 seconds wait */ + current_offset = get_db_offset(db, file[0]); + if (current_offset > start_offset) { + break; + } + flb_time_msleep(100); + attempts++; + } + + if (!TEST_CHECK(current_offset > start_offset)) { + TEST_MSG("DB offset did not advance. start=%ld current=%ld", + start_offset, current_offset); + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Close file descriptors before stopping */ + if (ctx->fds != NULL) { + for (i = 0; i < ctx->fd_num; i++) { + close(ctx->fds[i]); + } + flb_free(ctx->fds); + ctx->fds = NULL; + } + + /* Stop immediately - simulating abrupt shutdown */ + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + + /* Second run: restart and verify data is re-read */ + clear_output_num(); + num = get_output_num(); + if (!TEST_CHECK(num == 0)) { + TEST_MSG("output count not cleared. expect=0 got=%d", num); + } + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed on restart"); + unlink(file[0]); + unlink(db); + exit(EXIT_FAILURE); + } + + /* Set debug log level to see offset rewind messages */ + flb_service_set(ctx->flb, "Log_Level", "debug", NULL); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Restart the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* + * Write a newline to complete the incomplete line from before shutdown. + * This simulates the scenario where more data arrives after restart, + * completing the previously incomplete line. + */ + ret = write_raw(ctx, NULL, 0, FLB_TRUE); + if (!TEST_CHECK(ret > 0)) { + TEST_MSG("write newline failed"); + } + + /* Wait for data to be processed */ + flb_time_msleep(500); + + num = get_output_num(); + /* + * After restart, we expect to receive the message that was written + * before shutdown. If the offset rewind fix works correctly, + * msg_before_shutdown should be re-read. + * We expect at least 1 message (msg_before_shutdown). + */ + if (!TEST_CHECK(num == 1)) { + TEST_MSG("data loss detected after restart. expect==1 got=%d", num); + } + + test_tail_ctx_destroy(ctx); + unlink(file[0]); + unlink(db); +} + +/* Test case for Gzip resume data loss regression */ +void flb_test_db_gzip_resume_loss() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_resume.log.gz"; + char *db_file = "test_gzip_resume.db"; + const char *content1 = "line1\nline2"; + const char *content2 = "\nline3\n"; + + cb.cb = cb_check_gzip_resume; + cb.data = &t_ctx; + + unlink(log_file); + unlink(db_file); + + /* 1. Create Gzip file with incomplete line at end */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. Start Fluent Bit */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for output count to reach 1 */ + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); /* Only line1 */ + + /* 3. Restart Fluent Bit */ + TEST_CHECK(append_gzip_file(log_file, content2, strlen(content2)) == 0); + + t_ctx.count = 0; + t_ctx.found_line2 = 0; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for output count to reach 2 (line2 + line3) */ + wait_for_count_with_timeout(&t_ctx.count, 2, 2000); + + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.found_line2 == 1); + TEST_CHECK(t_ctx.count == 2); + + unlink(log_file); + unlink(db_file); +} + + +void flb_test_db_gzip_inotify_append() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_inotify.log.gz"; + char *db_file = "test_gzip_inotify.db"; + const char *content1 = "line1\n"; + const char *content2 = "line2\n"; + + cb.cb = cb_check_gzip_resume; /* Reusing callback as it counts lines */ + cb.data = &t_ctx; + + unlink(log_file); + unlink(db_file); + + /* 1. Create initial Gzip file */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. Start Fluent Bit */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + /* explicit refresh_interval to be sure, though inotify is default */ + "refresh_interval", "1", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for initial read */ + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + TEST_CHECK(t_ctx.count == 1); + + /* 3. Append to Gzip file while running (Simulate Inotify Event) */ + TEST_CHECK(append_gzip_file(log_file, content2, strlen(content2)) == 0); + + /* Wait for inotify/refresh and processing */ + wait_for_count_with_timeout(&t_ctx.count, 2, 2000); + + /* 4. Verify total count */ + TEST_CHECK(t_ctx.count == 2); + /* Verify line2 was actually processed */ + TEST_CHECK(t_ctx.found_line2 == 1); + + flb_stop(ctx); + flb_destroy(ctx); + unlink(log_file); + unlink(db_file); +} + + +void flb_test_db_gzip_rotation() +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_rotate.log.gz"; + char *rot_file = "test_gzip_rotate.log.gz.1"; + char *db_file = "test_gzip_rotate.db"; + const char *content1 = "line1\n"; + const char *content2 = "line2\n"; + const char *content3 = "line3_new\n"; + const char *content4 = "line4_old\n"; + + cb.cb = cb_check_gzip_resume; + cb.data = &t_ctx; + + unlink(log_file); + unlink(rot_file); + unlink(db_file); + + /* 1. Create initial Gzip file */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. Start Fluent Bit */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "refresh_interval", "1", + "rotate_wait", "5", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Wait for initial read */ + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + TEST_CHECK(t_ctx.count == 1); + + /* 3. Rotate file: Rename .gz -> .gz.1 */ + ret = rename(log_file, rot_file); + TEST_CHECK(ret == 0); + + /* 4. Create NEW file with same name immediately */ + TEST_CHECK(create_gzip_file(log_file, content2, strlen(content2)) == 0); + + /* Wait for rotation detection and new file processing */ + wait_for_count_with_timeout(&t_ctx.count, 2, 2000); + + /* 5. Append to BOTH files within rotate_wait window */ + /* 5a. Append to new file */ + TEST_CHECK(append_gzip_file(log_file, content3, strlen(content3)) == 0); + + /* 5b. Append to OLD file (rotated) - should still be monitored */ + TEST_CHECK(append_gzip_file(rot_file, content4, strlen(content4)) == 0); + + /* Wait for processing */ + wait_for_count_with_timeout(&t_ctx.count, 4, 2000); + + /* 6. Verify total count */ + TEST_CHECK(t_ctx.count == 4); + + flb_stop(ctx); + flb_destroy(ctx); + unlink(log_file); + unlink(rot_file); + unlink(db_file); +} + + +void flb_test_db_gzip_multi_resume() +{ + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + struct test_ctx t_ctx = {0}; + struct flb_lib_out_cb cb; + char *log_file = "test_gzip_multi.log.gz"; + char *db_file = "test_gzip_multi.db"; + const char *content1 = "line1\n"; + const char *content2 = "line2\n"; + const char *content3 = "line3\n"; + + cb.cb = cb_check_gzip_resume; + cb.data = &t_ctx; + + unlink(log_file); + unlink(db_file); + + /* 1. Create file with Line1 */ + TEST_CHECK(create_gzip_file(log_file, content1, strlen(content1)) == 0); + + /* 2. Start (Run 1) */ + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + flb_start(ctx); + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); /* Processed Line1 */ + t_ctx.count = 0; + + /* 3. Restart (Run 2) -> Should SKIP Line1 and process Line2 */ + TEST_CHECK(append_gzip_file(log_file, content2, strlen(content2)) == 0); + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + flb_start(ctx); + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); /* Should process ONLY line2 */ + t_ctx.count = 0; + + /* 4. Restart (Run 3) -> Should SKIP Line1+Line2 and process Line3 */ + TEST_CHECK(append_gzip_file(log_file, content3, strlen(content3)) == 0); + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "0.5", "Grace", "1", NULL); + in_ffd = flb_input(ctx, "tail", NULL); + flb_input_set(ctx, in_ffd, + "path", log_file, + "read_from_head", "true", + "db", db_file, + "db.sync", "full", + NULL); + + out_ffd = flb_output(ctx, "lib", &cb); + flb_output_set(ctx, out_ffd, "match", "*", NULL); + + flb_start(ctx); + wait_for_count_with_timeout(&t_ctx.count, 1, 2000); + flb_stop(ctx); + flb_destroy(ctx); + + TEST_CHECK(t_ctx.count == 1); + + unlink(log_file); + unlink(db_file); +} + #endif /* FLB_HAVE_SQLDB */ /* Test list */ @@ -2680,6 +3444,11 @@ TEST_LIST = { {"db", flb_test_db}, {"db_delete_stale_file", flb_test_db_delete_stale_file}, {"db_compare_filename", flb_test_db_compare_filename}, + {"db_offset_rewind_on_shutdown", flb_test_db_offset_rewind_on_shutdown}, + {"db_gzip_resume_loss", flb_test_db_gzip_resume_loss }, + {"db_gzip_inotify_append", flb_test_db_gzip_inotify_append }, + {"db_gzip_rotation", flb_test_db_gzip_rotation }, + {"db_gzip_multi_resume", flb_test_db_gzip_multi_resume }, #endif #ifdef FLB_HAVE_UNICODE_ENCODER From 9cc380ffeb7a6ca35e9f94efab3e705c21c7517b Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Sun, 14 Dec 2025 21:41:46 +0900 Subject: [PATCH 2/3] in_tail: fix data loss on shutdown and prevent DB resurrection Unprocessed buffered data is lost on shutdown because the file offset is saved ahead of processing. This patch rewinds the offset by the remaining buffer length on exit, ensuring data is re-read on restart. Also, to prevent resurrecting deleted files in the DB, db_id is reset to 0 upon deletion. The offset update logic now checks db_id > 0 to ensure only active files are updated. Signed-off-by: jinyong.choi --- plugins/in_tail/tail.h | 3 +++ plugins/in_tail/tail_db.c | 1 + plugins/in_tail/tail_file.c | 34 +++++++++++++++++++++++++++++++++- 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/plugins/in_tail/tail.h b/plugins/in_tail/tail.h index a1c7784707d..dfb241a93df 100644 --- a/plugins/in_tail/tail.h +++ b/plugins/in_tail/tail.h @@ -33,6 +33,9 @@ #define FLB_TAIL_STATIC 0 /* Data is being consumed through read(2) */ #define FLB_TAIL_EVENT 1 /* Data is being consumed through inotify */ +/* Database */ +#define FLB_TAIL_DB_ID_NONE 0 /* File not in database or deleted */ + /* Config */ #define FLB_TAIL_CHUNK "32768" /* buffer chunk = 32KB */ #define FLB_TAIL_REFRESH 60 /* refresh every 60 seconds */ diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c index a8190e121d0..2e926459c07 100644 --- a/plugins/in_tail/tail_db.c +++ b/plugins/in_tail/tail_db.c @@ -363,6 +363,7 @@ int flb_tail_db_file_delete(struct flb_tail_file *file, } flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); + file->db_id = FLB_TAIL_DB_ID_NONE; return 0; } diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 1624bd94c7a..18e2ec92c16 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -1274,7 +1274,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->dmode_lastline = flb_sds_create_size(ctx->docker_mode == FLB_TRUE ? 20000 : 0); file->dmode_firstline = false; #ifdef FLB_HAVE_SQLDB - file->db_id = 0; + file->db_id = FLB_TAIL_DB_ID_NONE; #endif file->skip_next = FLB_FALSE; file->skip_warn = FLB_FALSE; @@ -1448,6 +1448,38 @@ void flb_tail_file_remove(struct flb_tail_file *file) flb_plg_debug(ctx->ins, "inode=%"PRIu64" removing file name %s", file->inode, file->name); + if (file->buf_len > 0) { + if (file->decompression_context == NULL) { + /* + * If there is data in the buffer, it means it was not processed. + * We must rewind the offset to ensure this data is re-read on restart. + */ + off_t old_offset = file->offset; + + if (file->offset > file->buf_len) { + file->offset -= file->buf_len; + } else { + file->offset = 0; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" rewind offset for %s: " + "old=%"PRId64" new=%"PRId64" (buf_len=%lu)", + file->inode, file->name, old_offset, file->offset, + (unsigned long)file->buf_len); + +#ifdef FLB_HAVE_SQLDB + if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) { + flb_tail_db_file_offset(file, ctx); + } +#endif + } + else { + flb_plg_warn(ctx->ins, "inode=%"PRIu64" cannot rewind compressed file %s; " + "%lu decompressed bytes in buffer may be lost on restart", + file->inode, file->name, (unsigned long)file->buf_len); + } + } + if (file->decompression_context != NULL) { flb_decompression_context_destroy(file->decompression_context); } From fdd817452d12aa6a738cde060c3a0b292c0ef040 Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Fri, 19 Dec 2025 15:45:14 +0900 Subject: [PATCH 3/3] in_tail: fix gzip data loss on resume and update DB schema Previously, when tailing gzip files, there was no mechanism to persistently store the uncompressed position ('skip_bytes'). This meant that upon restart, the plugin could not correctly locate the reading position, identifying it as a rotation or new file case, potentially leading to data loss. To fix this, 'skip_bytes' is now stored in the database to persist the uncompressed offset. Additionally, 'exclude_bytes' is introduced to track runtime skipping without interfering with the persistent value. The SQLite schema has been updated to include 'anchor_offset' and 'skip_bytes' columns to support these features. Signed-off-by: jinyong.choi --- plugins/in_tail/tail_db.c | 98 +++++++++++++++++-- plugins/in_tail/tail_file.c | 137 +++++++++++++++++++++++++-- plugins/in_tail/tail_file_internal.h | 4 + plugins/in_tail/tail_fs_inotify.c | 4 + plugins/in_tail/tail_fs_stat.c | 4 + plugins/in_tail/tail_sql.h | 14 +-- 6 files changed, 235 insertions(+), 26 deletions(-) diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c index 2e926459c07..a95c4241cda 100644 --- a/plugins/in_tail/tail_db.c +++ b/plugins/in_tail/tail_db.c @@ -25,11 +25,13 @@ #include "tail_sql.h" #include "tail_file.h" -struct query_status { - int id; - int rows; - int64_t offset; -}; +/* Callback to detect if a query returned any rows */ +static int cb_column_exists(void *data, int argc, char **argv, char **cols) +{ + int *found = (int *)data; + *found = 1; + return 0; +} /* Open or create database required by tail plugin */ struct flb_sqldb *flb_tail_db_open(const char *path, @@ -38,6 +40,7 @@ struct flb_sqldb *flb_tail_db_open(const char *path, struct flb_config *config) { int ret; + int column_found; char tmp[64]; struct flb_sqldb *db; @@ -55,6 +58,54 @@ struct flb_sqldb *flb_tail_db_open(const char *path, return NULL; } + /* Check if 'skip' column exists (migration for older databases) */ + column_found = 0; + ret = flb_sqldb_query(db, + "SELECT 1 FROM pragma_table_info('in_tail_files') " + "WHERE name='skip';", + cb_column_exists, &column_found); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not query table info for 'skip' column"); + flb_sqldb_close(db); + return NULL; + } + if (column_found == 0) { + flb_plg_debug(ctx->ins, "db: migrating database, adding 'skip' column"); + ret = flb_sqldb_query(db, + "ALTER TABLE in_tail_files " + "ADD COLUMN skip INTEGER DEFAULT 0;", + NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not add 'skip' column"); + flb_sqldb_close(db); + return NULL; + } + } + + /* Check if 'anchor' column exists (migration for older databases) */ + column_found = 0; + ret = flb_sqldb_query(db, + "SELECT 1 FROM pragma_table_info('in_tail_files') " + "WHERE name='anchor';", + cb_column_exists, &column_found); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not query table info for 'anchor' column"); + flb_sqldb_close(db); + return NULL; + } + if (column_found == 0) { + flb_plg_debug(ctx->ins, "db: migrating database, adding 'anchor' column"); + ret = flb_sqldb_query(db, + "ALTER TABLE in_tail_files " + "ADD COLUMN anchor INTEGER DEFAULT 0;", + NULL, NULL); + if (ret != FLB_OK) { + flb_plg_error(ctx->ins, "db: could not add 'anchor' column"); + flb_sqldb_close(db); + return NULL; + } + } + if (ctx->db_sync >= 0) { snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC, ctx->db_sync); @@ -130,7 +181,8 @@ static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx, */ static int db_file_exists(struct flb_tail_file *file, struct flb_tail_config *ctx, - uint64_t *id, uint64_t *inode, off_t *offset) + uint64_t *id, uint64_t *inode, + int64_t *offset, uint64_t *skip, int64_t *anchor) { int ret; int exists = FLB_FALSE; @@ -149,6 +201,8 @@ static int db_file_exists(struct flb_tail_file *file, /* name: column 1 */ name = sqlite3_column_text(ctx->stmt_get_file, 1); if (ctx->compare_filename && name == NULL) { + sqlite3_clear_bindings(ctx->stmt_get_file); + sqlite3_reset(ctx->stmt_get_file); flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id); return -1; } @@ -159,12 +213,18 @@ static int db_file_exists(struct flb_tail_file *file, /* inode: column 3 */ *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); + /* skip: column 6 */ + *skip = sqlite3_column_int64(ctx->stmt_get_file, 6); + + /* anchor: column 7 */ + *anchor = sqlite3_column_int64(ctx->stmt_get_file, 7); + /* Checking if the file's name and inode match exactly */ if (ctx->compare_filename) { if (flb_tail_target_file_name_cmp((char *) name, file) != 0) { exists = FLB_FALSE; flb_plg_debug(ctx->ins, "db: exists stale file from database:" - " id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64 + " id=%"PRIu64" inode=%"PRIu64" offset=%"PRId64 " name=%s file_inode=%"PRIu64" file_name=%s", *id, *inode, *offset, name, file->inode, file->name); @@ -199,6 +259,8 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset); sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode); sqlite3_bind_int64(ctx->stmt_insert_file, 4, created); + sqlite3_bind_int64(ctx->stmt_insert_file, 5, file->skip_bytes); + sqlite3_bind_int64(ctx->stmt_insert_file, 6, file->anchor_offset); /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_file); @@ -258,11 +320,13 @@ int flb_tail_db_file_set(struct flb_tail_file *file, { int ret; uint64_t id = 0; - off_t offset = 0; + int64_t offset = 0; + uint64_t skip = 0; + int64_t anchor = 0; uint64_t inode = 0; /* Check if the file exists */ - ret = db_file_exists(file, ctx, &id, &inode, &offset); + ret = db_file_exists(file, ctx, &id, &inode, &offset, &skip, &anchor); if (ret == -1) { flb_plg_error(ctx->ins, "cannot execute query to check inode: %" PRIu64, file->inode); @@ -281,6 +345,18 @@ int flb_tail_db_file_set(struct flb_tail_file *file, else { file->db_id = id; file->offset = offset; + file->skip_bytes = skip; + file->anchor_offset = anchor; + + /* Initialize skipping mode if needed */ + if (file->skip_bytes > 0) { + file->exclude_bytes = file->skip_bytes; + file->skipping_mode = FLB_TRUE; + } + else { + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; + } } return 0; @@ -294,7 +370,9 @@ int flb_tail_db_file_offset(struct flb_tail_file *file, /* Bind parameters */ sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset); - sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id); + sqlite3_bind_int64(ctx->stmt_offset, 2, file->skip_bytes); + sqlite3_bind_int64(ctx->stmt_offset, 3, file->anchor_offset); + sqlite3_bind_int64(ctx->stmt_offset, 4, file->db_id); ret = sqlite3_step(ctx->stmt_offset); diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 18e2ec92c16..b834945473f 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -44,6 +44,7 @@ #include "tail_dockermode.h" #include "tail_multiline.h" #include "tail_scan.h" +#include #ifdef FLB_SYSTEM_WINDOWS #include "win32.h" @@ -1027,6 +1028,8 @@ static int set_file_position(struct flb_tail_config *ctx, struct flb_tail_file *file) { int64_t ret; + int64_t seek_pos; + int has_db_position; #ifdef FLB_HAVE_SQLDB /* @@ -1036,12 +1039,37 @@ static int set_file_position(struct flb_tail_config *ctx, if (ctx->db) { ret = flb_tail_db_file_set(file, ctx); if (ret == 0) { - if (file->offset > 0) { - ret = lseek(file->fd, file->offset, SEEK_SET); + + /* + * Determine seek position based on file type and DB state: + * + * - Gzip files with anchor/skip info: use anchor_offset + * - Normal files or gzip migration fallback: use offset + * - No DB position + read_from_head=off: seek to EOF + */ + seek_pos = file->offset; + has_db_position = (file->offset > 0); + + /* Override for gzip files with proper anchor tracking */ + if (file->decompression_context != NULL && + (file->anchor_offset > 0 || file->skip_bytes > 0)) { + seek_pos = file->anchor_offset; + has_db_position = FLB_TRUE; + } + + if (has_db_position) { + ret = lseek(file->fd, seek_pos, SEEK_SET); if (ret == -1) { flb_errno(); return -1; } + file->offset = ret; + + /* Initialize skip state for gzip resume */ + if (file->decompression_context != NULL && file->skip_bytes > 0) { + file->exclude_bytes = file->skip_bytes; + file->skipping_mode = FLB_TRUE; + } } else if (ctx->read_from_head == FLB_FALSE) { ret = lseek(file->fd, 0, SEEK_END); @@ -1050,8 +1078,23 @@ static int set_file_position(struct flb_tail_config *ctx, return -1; } file->offset = ret; + file->anchor_offset = ret; flb_tail_db_file_offset(file, ctx); } + + if (file->decompression_context == NULL) { + file->stream_offset = file->offset; + } + else { + /* + * For single-member gzip, stream_offset = skip_bytes is correct. + * For multi-member gzip, skip_bytes only tracks bytes within the + * current member (reset at member boundaries), so stream_offset + * may not reflect the total decompressed bytes from prior members. + */ + file->stream_offset = file->skip_bytes; + } + return 0; } } @@ -1084,6 +1127,15 @@ static int set_file_position(struct flb_tail_config *ctx, if (file->decompression_context == NULL) { file->stream_offset = ret; } + else { + /* + * Compressed file without DB: no persistent state available. + * Initialize skip-related fields to 0 for code consistency. + */ + file->anchor_offset = file->offset; + file->exclude_bytes = 0; + file->stream_offset = 0; + } return 0; } @@ -1279,6 +1331,12 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, file->skip_next = FLB_FALSE; file->skip_warn = FLB_FALSE; + /* Initialize gzip resume fields */ + file->anchor_offset = 0; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; + /* Multiline core mode */ if (ctx->ml_ctx) { /* @@ -1454,18 +1512,20 @@ void flb_tail_file_remove(struct flb_tail_file *file) * If there is data in the buffer, it means it was not processed. * We must rewind the offset to ensure this data is re-read on restart. */ - off_t old_offset = file->offset; + int64_t old_offset = file->offset; if (file->offset > file->buf_len) { file->offset -= file->buf_len; - } else { + } + else { file->offset = 0; } flb_plg_debug(ctx->ins, "inode=%"PRIu64" rewind offset for %s: " - "old=%"PRId64" new=%"PRId64" (buf_len=%lu)", - file->inode, file->name, old_offset, file->offset, - (unsigned long)file->buf_len); + "old=%jd new=%jd (buf_len=%zu)", + file->inode, file->name, + (intmax_t)old_offset, (intmax_t)file->offset, + file->buf_len); #ifdef FLB_HAVE_SQLDB if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) { @@ -1474,9 +1534,16 @@ void flb_tail_file_remove(struct flb_tail_file *file) #endif } else { - flb_plg_warn(ctx->ins, "inode=%"PRIu64" cannot rewind compressed file %s; " - "%lu decompressed bytes in buffer may be lost on restart", - file->inode, file->name, (unsigned long)file->buf_len); + flb_plg_debug(ctx->ins, + "inode=%"PRIu64" file=%s: buffered data (%zu bytes) " + "remains on exit for gzip input; cannot rewind raw " + "offset safely with streaming decompression", + file->inode, file->name, file->buf_len); +#ifdef FLB_HAVE_SQLDB + if (ctx->db && file->db_id > FLB_TAIL_DB_ID_NONE) { + flb_tail_db_file_offset(file, ctx); + } +#endif } } @@ -1598,6 +1665,10 @@ static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *fi file->inode, file->name, size_delta); file->offset = offset; file->buf_len = 0; + file->anchor_offset = offset; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; /* Update offset in the database file */ #ifdef FLB_HAVE_SQLDB @@ -1625,6 +1696,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) uint8_t *read_buffer; size_t read_size; size_t size; + size_t remain; char *tmp; int ret; int lines; @@ -1797,6 +1869,28 @@ int flb_tail_file_chunk(struct flb_tail_file *file) return FLB_TAIL_ERROR; } + if (file->skipping_mode == FLB_TRUE && decompressed_data_length > 0) { + flb_plg_debug(ctx->ins, + "Skipping: anchor=%jd offset=%jd " + "exclude=%zu decompressed=%zu", + (intmax_t)file->anchor_offset, + (intmax_t)file->offset, + file->exclude_bytes, decompressed_data_length); + if (file->exclude_bytes >= decompressed_data_length) { + file->exclude_bytes -= decompressed_data_length; + decompressed_data_length = 0; + } + else { + remain = decompressed_data_length - file->exclude_bytes; + memmove(&file->buf_data[file->buf_len], + &file->buf_data[file->buf_len + file->exclude_bytes], + remain); + decompressed_data_length = remain; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; + } + } + stream_data_length = decompressed_data_length; } } @@ -1836,6 +1930,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file) file->buf_len -= processed_bytes; file->buf_data[file->buf_len] = '\0'; + if (file->decompression_context) { + file->skip_bytes += processed_bytes; + + /* + * Gzip member boundary: update anchor when we complete a member + * and all decompressed data is consumed. + */ + if (file->decompression_context->state == + FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER && + file->decompression_context->input_buffer_length == 0 && + file->buf_len == 0) { + flb_plg_debug(file->config->ins, + "Gzip member completed: updating anchor " + "from %jd to %jd, resetting skip from " + "%"PRIu64" to 0", + (intmax_t)file->anchor_offset, + (intmax_t)file->offset, + file->skip_bytes); + file->anchor_offset = file->offset; + file->skip_bytes = 0; + } + } + #ifdef FLB_HAVE_SQLDB if (file->config->db) { flb_tail_db_file_offset(file, file->config); diff --git a/plugins/in_tail/tail_file_internal.h b/plugins/in_tail/tail_file_internal.h index bca35ed67cf..2d9dfc770c2 100644 --- a/plugins/in_tail/tail_file_internal.h +++ b/plugins/in_tail/tail_file_internal.h @@ -43,6 +43,10 @@ struct flb_tail_file { int64_t size; int64_t offset; /* this represents the raw file offset, not the input data offset (see stream_offset) */ + int64_t anchor_offset; /* compressed: file offset at member start */ + uint64_t skip_bytes; /* compressed: decompressed bytes to skip */ + uint64_t exclude_bytes; /* compressed: runtime countdown during skip */ + int skipping_mode; /* compressed: skipping previously read data */ int64_t last_line; uint64_t dev_id; uint64_t inode; diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c index eee9babd343..7552b562ae8 100644 --- a/plugins/in_tail/tail_fs_inotify.c +++ b/plugins/in_tail/tail_fs_inotify.c @@ -267,6 +267,10 @@ static int tail_fs_event(struct flb_input_instance *ins, file->inode, file->name, size_delta); file->offset = offset; file->buf_len = 0; + file->anchor_offset = offset; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; /* Update offset in the database file */ #ifdef FLB_HAVE_SQLDB diff --git a/plugins/in_tail/tail_fs_stat.c b/plugins/in_tail/tail_fs_stat.c index 36eead51464..8f662d6f735 100644 --- a/plugins/in_tail/tail_fs_stat.c +++ b/plugins/in_tail/tail_fs_stat.c @@ -136,6 +136,10 @@ static int tail_fs_check(struct flb_input_instance *ins, file->name, size_delta); file->offset = offset; file->buf_len = 0; + file->anchor_offset = offset; + file->skip_bytes = 0; + file->exclude_bytes = 0; + file->skipping_mode = FLB_FALSE; memcpy(&fst->st, &st, sizeof(struct stat)); #ifdef FLB_HAVE_SQLDB diff --git a/plugins/in_tail/tail_sql.h b/plugins/in_tail/tail_sql.h index e157450e353..dca5952df14 100644 --- a/plugins/in_tail/tail_sql.h +++ b/plugins/in_tail/tail_sql.h @@ -34,21 +34,23 @@ " offset INTEGER," \ " inode INTEGER," \ " created INTEGER," \ - " rotated INTEGER DEFAULT 0" \ + " rotated INTEGER DEFAULT 0," \ + " skip INTEGER DEFAULT 0," \ + " anchor INTEGER DEFAULT 0" \ ");" #define SQL_GET_FILE \ "SELECT * from in_tail_files WHERE inode=@inode order by id desc;" -#define SQL_INSERT_FILE \ - "INSERT INTO in_tail_files (name, offset, inode, created)" \ - " VALUES (@name, @offset, @inode, @created);" +#define SQL_INSERT_FILE \ + "INSERT INTO in_tail_files (name, offset, inode, created, skip, anchor)" \ + " VALUES (@name, @offset, @inode, @created, @skip, @anchor);" #define SQL_ROTATE_FILE \ "UPDATE in_tail_files set name=@name,rotated=1 WHERE id=@id;" -#define SQL_UPDATE_OFFSET \ - "UPDATE in_tail_files set offset=@offset WHERE id=@id;" +#define SQL_UPDATE_OFFSET \ + "UPDATE in_tail_files set offset=@offset, skip=@skip, anchor=@anchor WHERE id=@id;" #define SQL_DELETE_FILE \ "DELETE FROM in_tail_files WHERE id=@id;"