From 1116ebcb5caac844bdacba06bd6f8b412e2f2bdf Mon Sep 17 00:00:00 2001 From: Tomasz Wilk Date: Fri, 11 Jul 2025 14:20:57 -0400 Subject: [PATCH] plugins: out_azure_blob: add support for logging a specific key Signed-off-by: Tomasz Wilk --- plugins/out_azure_blob/azure_blob.c | 136 +++++++++++++++++++++-- plugins/out_azure_blob/azure_blob.h | 1 + plugins/out_azure_blob/azure_blob_conf.c | 4 + 3 files changed, 134 insertions(+), 7 deletions(-) diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index a650809c4d4..cabc3dfcf17 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include @@ -53,6 +55,113 @@ struct worker_info { FLB_TLS_DEFINE(struct worker_info, worker_info); +static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *data, + uint64_t bytes) +{ + struct flb_azure_blob *ctx = out_context; + flb_sds_t out_buf = NULL; + msgpack_unpacked result; + msgpack_object root; + msgpack_object map; + struct flb_record_accessor *ra = NULL; + struct flb_ra_value *rval = NULL; + size_t off = 0; + + ra = flb_ra_create(ctx->log_key, FLB_FALSE); + if (!ra) { + flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'", ctx->log_key); + flb_errno(); + return NULL; + } + + /* Unpack the data */ + msgpack_unpacked_init(&result); + while (1) { + msgpack_unpack_return ret = msgpack_unpack_next(&result, data, bytes, &off); + if (ret == MSGPACK_UNPACK_SUCCESS) { + root = result.data; + if (root.type != MSGPACK_OBJECT_ARRAY) { + continue; + } + + if (root.via.array.size < 2) { + flb_plg_debug(ctx->ins, "msgpack array has insufficient elements"); + continue; + } + + map = root.via.array.ptr[1]; + + /* Get value using record accessor */ + rval = flb_ra_get_value_object(ra, map); + if (!rval) { + flb_plg_error(ctx->ins, "could not find field '%s'", ctx->log_key); + continue; + } + + /* Convert value based on its type */ + if (rval->type == FLB_RA_STRING) { + out_buf = flb_sds_create_size(rval->o.via.str.size + 1); + if (out_buf) { + flb_sds_copy(out_buf, rval->o.via.str.ptr, rval->o.via.str.size); + flb_sds_cat(out_buf, "\n", 1); + } + } + else if (rval->type == FLB_RA_FLOAT) { + out_buf = flb_sds_create_size(64); + if (out_buf) { + flb_sds_printf(&out_buf, "%f\n", rval->val.f64); + } + } + else if (rval->type == FLB_RA_INT) { + out_buf = flb_sds_create_size(64); + if (out_buf) { + flb_sds_printf(&out_buf, "%" PRId64 "\n", rval->val.i64); + } + } + else { + flb_errno(); + flb_plg_error(ctx->ins, "cannot convert given value for field '%s'", ctx->log_key); + flb_ra_key_value_destroy(rval); + rval = NULL; + break; + } + + /* Check if buffer allocation succeeded */ + if (!out_buf) { + flb_errno(); + flb_plg_error(ctx->ins, "could not allocate output buffer"); + } + + flb_ra_key_value_destroy(rval); + rval = NULL; + + /* Successfully found and processed log_key, exit loop */ + break; + } + else if (ret == MSGPACK_UNPACK_CONTINUE) { + /* Buffer exhausted or truncated data, stop processing */ + flb_plg_debug(ctx->ins, "msgpack unpack needs more data or data truncated"); + break; + } + else if (ret == MSGPACK_UNPACK_PARSE_ERROR) { + flb_errno(); + flb_plg_error(ctx->ins, "msgpack parse error"); + break; + } + else { + flb_errno(); + flb_plg_error(ctx->ins, "unexpected msgpack unpack return code %d", ret); + break; + } + } + + /* Clean up */ + msgpack_unpacked_destroy(&result); + flb_ra_destroy(ra); + + return out_buf; +} + static int azure_blob_format(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, @@ -65,11 +174,16 @@ static int azure_blob_format(struct flb_config *config, flb_sds_t out_buf; struct flb_azure_blob *ctx = plugin_context; - out_buf = flb_pack_msgpack_to_json_format(data, bytes, - FLB_PACK_JSON_FORMAT_LINES, - FLB_PACK_JSON_DATE_ISO8601, - ctx->date_key, - config->json_escape_unicode); + if (ctx->log_key) { + out_buf = cb_azb_msgpack_extract_log_key(ctx, data, bytes); + } + else { + out_buf = flb_pack_msgpack_to_json_format(data, bytes, + FLB_PACK_JSON_FORMAT_LINES, + FLB_PACK_JSON_DATE_ISO8601, + ctx->date_key, + config->json_escape_unicode); + } if (!out_buf) { return -1; } @@ -713,7 +827,7 @@ static int ensure_container(struct flb_azure_blob *ctx) ctx->container_name); return FLB_FALSE; } - + flb_plg_error(ctx->ins, "get container request failed, status=%i", status); @@ -1780,6 +1894,14 @@ static struct flb_config_map config_map[] = { "Set the block type: appendblob or blockblob" }, + { + FLB_CONFIG_MAP_STR, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure_blob, log_key), + "By default, the whole log record will be sent to blob storage. " + "If you specify a key name with this option, then only the value of " + "that key will be sent" + }, + { FLB_CONFIG_MAP_STR, "compress", NULL, 0, FLB_FALSE, 0, @@ -1939,7 +2061,7 @@ static struct flb_config_map config_map[] = { "Whether to delete the buffered file early after successful blob creation. Default is false" }, - { + { FLB_CONFIG_MAP_INT, "blob_uri_length", "64", 0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_uri_length), "Set the length of generated blob uri before ingesting to Azure Kusto. Default is 64" diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 8699dda54f8..d83ccedf252 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -59,6 +59,7 @@ struct flb_azure_blob { flb_sds_t account_name; flb_sds_t container_name; flb_sds_t blob_type; + flb_sds_t log_key; flb_sds_t shared_key; flb_sds_t endpoint; flb_sds_t path; diff --git a/plugins/out_azure_blob/azure_blob_conf.c b/plugins/out_azure_blob/azure_blob_conf.c index ea883a01852..4ba199c3ca6 100644 --- a/plugins/out_azure_blob/azure_blob_conf.c +++ b/plugins/out_azure_blob/azure_blob_conf.c @@ -801,6 +801,10 @@ void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx) flb_sds_destroy(ctx->container_name); ctx->container_name = NULL; } + if (ctx->log_key) { + flb_sds_destroy(ctx->log_key); + ctx->log_key = NULL; + } if (ctx->path_overriden_flag == FLB_TRUE) { flb_sds_destroy(ctx->path); ctx->path = NULL;