Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 87 additions & 13 deletions plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_compression.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_input_blob.h>
Expand Down Expand Up @@ -136,7 +138,7 @@ static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_da
}
body = buffered_data = tmp;
memcpy(body + buffer_size, new_data, flb_sds_len(new_data));
if (ctx->compress_gzip == FLB_FALSE){
if (ctx->compression == FLB_COMPRESSION_ALGORITHM_NONE) {
body[body_size] = '\0';
}
}
Expand All @@ -149,6 +151,38 @@ static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_da
return 0;
}

/*
* Compress a payload using the configured algorithm. Returns 0 on success and
* negative on failure so callers can gracefully fall back to sending the raw
* payload.
*/
static int azure_blob_compress_payload(int algorithm,
void *in_data, size_t in_len,
void **out_data, size_t *out_len)
{
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
return flb_gzip_compress(in_data, in_len, out_data, out_len);
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
return flb_zstd_compress(in_data, in_len, out_data, out_len);
}

return -1;
}

/* Map a compression algorithm to its human-friendly label for logs. */
static const char *azure_blob_compression_name(int algorithm)
{
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
return "gzip";
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
return "zstd";
}

return "unknown";
}

void generate_random_string_blob(char *str, size_t length)
{
const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Expand Down Expand Up @@ -332,8 +366,12 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
{
int ret;
int compressed = FLB_FALSE;
int content_encoding = FLB_FALSE;
int content_type = FLB_FALSE;
int content_encoding = AZURE_BLOB_CE_NONE;
int content_type = AZURE_BLOB_CT_NONE;
int compression_algorithm = FLB_COMPRESSION_ALGORITHM_NONE;
int network_compression_algorithm = ctx->compression;
int network_compression_applied = FLB_FALSE;
int blob_compression_applied = FLB_FALSE;
size_t b_sent;
void *payload_buf;
size_t payload_size;
Expand All @@ -358,27 +396,62 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
payload_buf = data;
payload_size = bytes;

/* Determine compression algorithm */
if (network_compression_algorithm != FLB_COMPRESSION_ALGORITHM_NONE) {
compression_algorithm = network_compression_algorithm;
}

if (ctx->compress_blob == FLB_TRUE) {
if (compression_algorithm == FLB_COMPRESSION_ALGORITHM_NONE) {
compression_algorithm = FLB_COMPRESSION_ALGORITHM_GZIP;
}
}

/* Handle compression requests */
if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) {
ret = flb_gzip_compress((void *) data, bytes, &payload_buf, &payload_size);
if (compression_algorithm != FLB_COMPRESSION_ALGORITHM_NONE) {
ret = azure_blob_compress_payload(compression_algorithm,
(void *) data, bytes,
&payload_buf, &payload_size);
if (ret == 0) {
compressed = FLB_TRUE;
if (network_compression_algorithm != FLB_COMPRESSION_ALGORITHM_NONE) {
network_compression_applied = FLB_TRUE;
}
if (ctx->compress_blob == FLB_TRUE) {
blob_compression_applied = FLB_TRUE;
}
}
else {
const char *alg_name;

alg_name = azure_blob_compression_name(compression_algorithm);
flb_plg_warn(ctx->ins,
"cannot gzip payload, disabling compression");
"cannot %s payload, disabling compression",
alg_name);
payload_buf = data;
payload_size = bytes;
compression_algorithm = FLB_COMPRESSION_ALGORITHM_NONE;
}
}

/* set http header flags */
if (ctx->compress_blob == FLB_TRUE) {
if (blob_compression_applied == FLB_TRUE) {
content_encoding = AZURE_BLOB_CE_NONE;
content_type = AZURE_BLOB_CT_GZIP;

if (compression_algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
content_type = AZURE_BLOB_CT_ZSTD;
}
else {
content_type = AZURE_BLOB_CT_GZIP;
}
}
else if (compressed == FLB_TRUE) {
content_encoding = AZURE_BLOB_CE_GZIP;
else if (network_compression_applied == FLB_TRUE) {
if (network_compression_algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
content_encoding = AZURE_BLOB_CE_GZIP;
}
else if (network_compression_algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
content_encoding = AZURE_BLOB_CE_ZSTD;
}
content_type = AZURE_BLOB_CT_JSON;
}

Expand Down Expand Up @@ -1783,14 +1856,15 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "compress", NULL,
0, FLB_FALSE, 0,
"Set payload compression in network transfer. Option available is 'gzip'"
"Set payload compression in network transfer. Options: 'gzip', 'zstd'"
},

{
FLB_CONFIG_MAP_BOOL, "compress_blob", "false",
0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob),
"Enable block blob GZIP compression in the final blob file. This option is "
"not compatible with 'appendblob' block type"
"Enable block blob compression in the final blob file (defaults to gzip, "
"uses the 'compress' codec when set). This option is not compatible with "
"'appendblob' block type"
},

{
Expand Down
4 changes: 3 additions & 1 deletion plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
#define AZURE_BLOB_CT_NONE 0
#define AZURE_BLOB_CT_JSON 1 /* application/json */
#define AZURE_BLOB_CT_GZIP 2 /* application/gzip */
#define AZURE_BLOB_CT_ZSTD 3 /* application/zstd */

/* Content-Encoding */
#define AZURE_BLOB_CE "Content-Encoding"
#define AZURE_BLOB_CE_NONE 0
#define AZURE_BLOB_CE_GZIP 1 /* gzip */
#define AZURE_BLOB_CE_ZSTD 2 /* zstd */

/* service endpoint */
#define AZURE_ENDPOINT_PREFIX ".blob.core.windows.net"
Expand All @@ -54,7 +56,7 @@
struct flb_azure_blob {
int auto_create_container;
int emulator_mode;
int compress_gzip;
int compression;
int compress_blob;
flb_sds_t account_name;
flb_sds_t container_name;
Expand Down
32 changes: 18 additions & 14 deletions plugins/out_azure_blob/azure_blob_blockblob.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_hash.h>
#include <fluent-bit/flb_crypto_constants.h>
#include <fluent-bit/flb_compression.h>

#include <math.h>

Expand All @@ -31,6 +32,19 @@
#include "azure_blob_uri.h"
#include "azure_blob_http.h"

static const char *azb_blob_extension(struct flb_azure_blob *ctx)
{
if (ctx->compress_blob != FLB_TRUE) {
return "";
}

if (ctx->compression == FLB_COMPRESSION_ALGORITHM_ZSTD) {
return ".zst";
}

return ".gz";
}

flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name)
{
flb_sds_t uri;
Expand Down Expand Up @@ -60,7 +74,7 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
{
int len;
flb_sds_t uri;
char *ext;
const char *ext;
char *encoded_blockid;

len = strlen(blockid);
Expand All @@ -75,12 +89,7 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
return NULL;
}

if (ctx->compress_blob == FLB_TRUE) {
ext = ".gz";
}
else {
ext = "";
}
ext = azb_blob_extension(ctx);

if (ctx->path) {
if (ms > 0) {
Expand Down Expand Up @@ -114,20 +123,15 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx,
char *tag, uint64_t ms, char *str)
{
char *ext;
const char *ext;
flb_sds_t uri;

uri = azb_uri_container(ctx);
if (!uri) {
return NULL;
}

if (ctx->compress_blob == FLB_TRUE) {
ext = ".gz";
}
else {
ext = "";
}
ext = azb_blob_extension(ctx);

if (ctx->path) {
flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, str,
Expand Down
14 changes: 11 additions & 3 deletions plugins/out_azure_blob/azure_blob_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_compression.h>

#include "azure_blob.h"
#include "azure_blob_conf.h"
Expand Down Expand Up @@ -655,12 +656,19 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
return NULL;
}

/* Compress (gzip) */
/* Compress payload over the wire */
tmp = (char *) flb_output_get_property("compress", ins);
ctx->compress_gzip = FLB_FALSE;
ctx->compression = FLB_COMPRESSION_ALGORITHM_NONE;
if (tmp) {
if (strcasecmp(tmp, "gzip") == 0) {
ctx->compress_gzip = FLB_TRUE;
ctx->compression = FLB_COMPRESSION_ALGORITHM_GZIP;
}
else if (strcasecmp(tmp, "zstd") == 0) {
ctx->compression = FLB_COMPRESSION_ALGORITHM_ZSTD;
}
else {
flb_plg_error(ctx->ins, "invalid compress value '%s' (supported: gzip, zstd)", tmp);
return NULL;
}
}

Expand Down
16 changes: 16 additions & 0 deletions plugins/out_azure_blob/azure_blob_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx,
if (content_encoding == AZURE_BLOB_CE_GZIP) {
encoding = "gzip";
}
else if (content_encoding == AZURE_BLOB_CE_ZSTD) {
encoding = "zstd";
}
else {
encoding = "";
}
Expand Down Expand Up @@ -223,6 +226,9 @@ flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx,
else if (content_type == AZURE_BLOB_CT_GZIP) {
ctype = "application/gzip";
}
else if (content_type == AZURE_BLOB_CT_ZSTD) {
ctype = "application/zstd";
}

flb_sds_printf(&can_req,
"\n" /* Content-MD5 */
Expand Down Expand Up @@ -315,12 +321,22 @@ int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c,
AZURE_BLOB_CT, sizeof(AZURE_BLOB_CT) - 1,
"application/gzip", 16);
}
else if (content_type == AZURE_BLOB_CT_ZSTD) {
flb_http_add_header(c,
AZURE_BLOB_CT, sizeof(AZURE_BLOB_CT) - 1,
"application/zstd", 16);
}

if (content_encoding == AZURE_BLOB_CE_GZIP) {
flb_http_add_header(c,
AZURE_BLOB_CE, sizeof(AZURE_BLOB_CE) - 1,
"gzip", 4);
}
else if (content_encoding == AZURE_BLOB_CE_ZSTD) {
flb_http_add_header(c,
AZURE_BLOB_CE, sizeof(AZURE_BLOB_CE) - 1,
"zstd", 4);
}

/* Azure header: x-ms-blob-type */
if (blob_type == FLB_TRUE) {
Expand Down
3 changes: 2 additions & 1 deletion plugins/out_azure_blob/azure_blob_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c,
flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx,
struct flb_http_client *c,
ssize_t content_length,
int content_type);
int content_type,
int content_encoding);

#endif
1 change: 1 addition & 0 deletions tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(UNIT_TESTS_FILES
unit_sizes.c
hashtable.c
http_client.c
azure_blob.c
utils.c
gzip.c
zstd.c
Expand Down
Loading