Skip to content

Commit 666423a

Browse files
committed
[out_oracle_log_analytics] refactor OCI_logan plugin code for clarity and consistency
Signed-off-by: rghouzra <[email protected]>
1 parent c052891 commit 666423a

File tree

3 files changed

+273
-198
lines changed

3 files changed

+273
-198
lines changed

plugins/out_oracle_log_analytics/oci_logan.c

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,8 @@ static int build_headers(struct flb_http_client *c, struct flb_oci_logan *ctx,
197197
flb_sds_t encoded_uri = NULL;
198198
flb_sds_t signature = NULL;
199199
flb_sds_t auth_header_str = NULL;
200-
201200
flb_sds_t tmp_ref = NULL;
202-
203201
size_t tmp_len = 0;
204-
205202
unsigned char sha256_buf[32] = { 0 };
206203

207204
tmp_sds = flb_sds_create_size(512);
@@ -576,6 +573,7 @@ static int cb_oci_logan_init(struct flb_output_instance *ins,
576573
struct flb_config *config, void *data)
577574
{
578575
struct flb_oci_logan *ctx;
576+
579577
ctx = flb_oci_logan_conf_create(ins, config);
580578
if (!ctx) {
581579
flb_plg_error(ins, "cannot initialize plugin");
@@ -663,29 +661,33 @@ static flb_sds_t compose_uri(struct flb_oci_logan *ctx,
663661
return full_uri;
664662
}
665663

666-
// Checks if the current security token needs refresh(expires within 5min)
664+
/* Checks if the current security token needs refresh(expires within 5min) */
667665
static int token_needs_refresh(struct flb_oci_logan *ctx)
668666
{
669-
time_t now = time(NULL);
667+
time_t now;
670668

669+
now = time(NULL);
671670
return (ctx->security_token.expires_at - now) < 300;
672671
}
673672

674-
// Refreshes the security token if it's close to expiration
673+
/* Refreshes the security token if it's close to expiration */
675674
static int refresh_security_token_if_needed(struct flb_oci_logan *ctx)
676675
{
676+
flb_sds_t json_payload;
677+
flb_sds_t response;
678+
677679
if (!token_needs_refresh(ctx)) {
678680
return 0;
679681
}
680682

681-
flb_sds_t json_payload = create_federation_payload(ctx);
683+
json_payload = create_federation_payload(ctx);
682684
if (!json_payload) {
683685
flb_plg_error(ctx->ins,
684686
"failed to create federation payload for token refresh");
685687
return -1;
686688
}
687689

688-
flb_sds_t response = sign_and_send_federation_request(ctx, json_payload);
690+
response = sign_and_send_federation_request(ctx, json_payload);
689691
flb_sds_destroy(json_payload);
690692

691693
if (!response) {
@@ -722,9 +724,10 @@ static flb_sds_t sign_oci_request_with_security_token_for_logging(struct
722724
unsigned char *b64_out = NULL;
723725
size_t sig_len = 0;
724726
BIO *bio = NULL;
727+
size_t b64_size, olen;
725728
EVP_PKEY *pkey = NULL;
726729
EVP_MD_CTX *md_ctx = NULL;
727-
730+
const char *headers_list;
728731

729732
string_to_sign = flb_sds_create_size(2048);
730733
if (!string_to_sign) {
@@ -784,8 +787,8 @@ static flb_sds_t sign_oci_request_with_security_token_for_logging(struct
784787
goto cleanup;
785788
}
786789

787-
size_t b64_size = ((sig_len + 2) / 3) * 4 + 1;
788-
size_t olen = 0;
790+
b64_size = ((sig_len + 2) / 3) * 4 + 1;
791+
olen = 0;
789792
b64_out = flb_malloc(b64_size);
790793
if (!b64_out) {
791794
goto cleanup;
@@ -801,7 +804,7 @@ static flb_sds_t sign_oci_request_with_security_token_for_logging(struct
801804
goto cleanup;
802805
}
803806

804-
const char *headers_list = payload && payload_size > 0 ?
807+
headers_list = payload && payload_size > 0 ?
805808
"date (request-target) host x-content-sha256 content-type content-length"
806809
: "date (request-target) host x-content-sha256 content-length";
807810

@@ -878,7 +881,8 @@ struct flb_http_client *create_oci_signed_request_for_logging(struct
878881
char date_buf[128];
879882
time_t now;
880883
struct tm *tm_info;
881-
884+
size_t date_len;
885+
const char *user_agent;
882886

883887
if (refresh_security_token_if_needed(ctx) < 0) {
884888
flb_plg_error(ctx->ins, "refresh_security_token_if_needed failed ");
@@ -897,7 +901,7 @@ struct flb_http_client *create_oci_signed_request_for_logging(struct
897901
return NULL;
898902
}
899903

900-
size_t date_len =
904+
date_len =
901905
strftime(date_buf, sizeof(date_buf) - 1, "%a, %d %b %Y %H:%M:%S GMT",
902906
tm_info);
903907
if (date_len == 0) {
@@ -941,7 +945,7 @@ struct flb_http_client *create_oci_signed_request_for_logging(struct
941945

942946
flb_http_add_header(client, "opc-retry-token", 15, content_sha256,
943947
strlen(content_sha256));
944-
const char *user_agent = "fluent-bit-oci-plugin/1.0";
948+
user_agent = "fluent-bit-oci-plugin/1.0";
945949
flb_http_add_header(client, "User-Agent", 10, user_agent,
946950
strlen(user_agent));
947951

@@ -981,19 +985,22 @@ static void dump_payload_to_file(struct flb_oci_logan *ctx, flb_sds_t payload,
981985
{
982986
char hash_in_hex[66];
983987
char filename[1024];
988+
char *content_sha256;
989+
int i;
990+
size_t payload_size;
991+
FILE *fp;
984992

985993
if (!ctx->payload_files_location) {
986994
flb_plg_error(ctx->ins,
987995
"directory path for dumping should be specified");
988996
return;
989997
}
990-
size_t payload_size = flb_sds_len(payload);
991-
char *content_sha256 =
992-
calculate_content_sha256_b64(payload, payload_size);
998+
payload_size = flb_sds_len(payload);
999+
content_sha256 = calculate_content_sha256_b64(payload, payload_size);
9931000
if (!content_sha256) {
9941001
return;
9951002
}
996-
int i;
1003+
9971004
for (i = 0; i < SHA256_DIGEST_LENGTH; i++) {
9981005
sprintf(hash_in_hex + (i * 2), "%02x", content_sha256[i]);
9991006
}
@@ -1007,7 +1014,7 @@ static void dump_payload_to_file(struct flb_oci_logan *ctx, flb_sds_t payload,
10071014
return;
10081015
}
10091016

1010-
FILE *fp = fopen(filename, "w");
1017+
fp = fopen(filename, "w");
10111018
if (!fp) {
10121019
flb_plg_error(ctx->ins, "cant open file -> %s", filename);
10131020
flb_free(content_sha256);
@@ -1031,6 +1038,7 @@ static int flush_to_endpoint(struct flb_oci_logan *ctx,
10311038
flb_sds_t full_uri;
10321039
struct flb_http_client *c = NULL;
10331040
struct flb_connection *u_conn;
1041+
bool should_retry;
10341042

10351043
if (!payload) {
10361044
return FLB_ERROR;
@@ -1053,7 +1061,7 @@ static int flush_to_endpoint(struct flb_oci_logan *ctx,
10531061
flb_sds_destroy(full_uri);
10541062
return FLB_ERROR;
10551063
}
1056-
bool should_retry = 0;
1064+
should_retry = 0;
10571065
if (strcmp(ctx->auth_type, "instance_principal") == 0) {
10581066
c = create_oci_signed_request_for_logging(ctx, u_conn,
10591067
full_uri,
@@ -1346,6 +1354,8 @@ static void pack_oci_fields(msgpack_packer *packer, struct flb_oci_logan *ctx,
13461354
static int check_metadata_dot_notation(msgpack_object key,
13471355
char **metadata_key)
13481356
{
1357+
int meta_key_start, meta_key_len;
1358+
13491359
if (key.type != MSGPACK_OBJECT_STR) {
13501360
return FLB_FALSE;
13511361
}
@@ -1363,8 +1373,8 @@ static int check_metadata_dot_notation(msgpack_object key,
13631373
if (key.via.str.ptr[FLB_OCI_METADATA_KEY_SIZE] != '.') {
13641374
return FLB_FALSE;
13651375
}
1366-
int meta_key_start = FLB_OCI_METADATA_KEY_SIZE + 1;
1367-
int meta_key_len = key.via.str.size - meta_key_start;
1376+
meta_key_start = FLB_OCI_METADATA_KEY_SIZE + 1;
1377+
meta_key_len = key.via.str.size - meta_key_start;
13681378
if (meta_key_len <= 0) {
13691379
return FLB_FALSE;
13701380
}
@@ -1406,11 +1416,16 @@ static int get_and_pack_oci_fields_from_record(msgpack_packer *packer,
14061416
const char *log_path_value = NULL;
14071417
int log_path_len = 0;
14081418

1409-
//handling metadata
1419+
char tz_str[256];
1420+
int tz_len;
14101421
struct mk_list dot_metadata_list;
14111422
struct metadata_obj *dot_meta_obj;
14121423
char *dot_meta_key;
14131424
int has_dot_metadata = 0;
1425+
int total_meta_fields;
1426+
struct mk_list *head;
1427+
struct mk_list *tmp;
1428+
int j;
14141429

14151430
mk_list_init(&dot_metadata_list);
14161431

@@ -1620,8 +1635,7 @@ static int get_and_pack_oci_fields_from_record(msgpack_packer *packer,
16201635
}
16211636

16221637
if (log_timezone) {
1623-
char tz_str[256];
1624-
int tz_len = log_timezone->via.str.size;
1638+
tz_len = log_timezone->via.str.size;
16251639
if (tz_len >= sizeof(tz_str)) {
16261640
tz_len = sizeof(tz_str) - 1;
16271641
}
@@ -1649,7 +1663,7 @@ static int get_and_pack_oci_fields_from_record(msgpack_packer *packer,
16491663
msgpack_pack_str_body(packer, FLB_OCI_LOG_METADATA,
16501664
FLB_OCI_LOG_METADATA_SIZE);
16511665

1652-
int total_meta_fields = 0;
1666+
total_meta_fields = 0;
16531667
if (metadata) {
16541668
total_meta_fields += metadata->via.map.size;
16551669
}
@@ -1660,15 +1674,13 @@ static int get_and_pack_oci_fields_from_record(msgpack_packer *packer,
16601674
msgpack_pack_map(packer, total_meta_fields);
16611675

16621676
if (metadata) {
1663-
int j;
16641677
for (j = 0; j < metadata->via.map.size; j++) {
16651678
msgpack_pack_object(packer, metadata->via.map.ptr[j].key);
16661679
msgpack_pack_object(packer, metadata->via.map.ptr[j].val);
16671680
}
16681681
}
16691682

16701683
if (has_dot_metadata) {
1671-
struct mk_list *head;
16721684
mk_list_foreach(head, &dot_metadata_list) {
16731685
dot_meta_obj =
16741686
mk_list_entry(head, struct metadata_obj, _head);
@@ -1698,8 +1710,6 @@ static int get_and_pack_oci_fields_from_record(msgpack_packer *packer,
16981710
}
16991711
}
17001712
if (has_dot_metadata) {
1701-
struct mk_list *tmp;
1702-
struct mk_list *head;
17031713
mk_list_foreach_safe(head, tmp, &dot_metadata_list) {
17041714
dot_meta_obj = mk_list_entry(head, struct metadata_obj, _head);
17051715
if (dot_meta_obj->key)
@@ -1729,6 +1739,9 @@ static int send_batch_with_count(struct flb_oci_logan *ctx,
17291739
msgpack_object map;
17301740
char *tag;
17311741
int tag_len;
1742+
int skip;
1743+
int packed_records;
1744+
int map_size;
17321745

17331746
tag = event_chunk->tag;
17341747
tag_len = ((event_chunk->tag) ? flb_sds_len(event_chunk->tag) : 0);
@@ -1753,7 +1766,6 @@ static int send_batch_with_count(struct flb_oci_logan *ctx,
17531766
log_set_id = ctx->oci_la_log_set_id;
17541767
}
17551768
else {
1756-
int skip;
17571769
for (skip = 0; skip < start_record; skip++) {
17581770
if (flb_log_event_decoder_next(&log_decoder, &log_event) !=
17591771
FLB_EVENT_DECODER_SUCCESS) {
@@ -1803,14 +1815,14 @@ static int send_batch_with_count(struct flb_oci_logan *ctx,
18031815
current_record++;
18041816
}
18051817

1806-
int packed_records = 0;
1818+
packed_records = 0;
18071819
while (packed_records < record_count &&
18081820
flb_log_event_decoder_next(&log_decoder,
18091821
&log_event) ==
18101822
FLB_EVENT_DECODER_SUCCESS) {
18111823

18121824
map = *log_event.body;
1813-
int map_size = map.via.map.size;
1825+
map_size = map.via.map.size;
18141826

18151827
/* lookupfor message or log field */
18161828
msg = -1;
@@ -1851,9 +1863,8 @@ static int send_batch_with_count(struct flb_oci_logan *ctx,
18511863

18521864
flb_log_event_decoder_destroy(&log_decoder);
18531865

1854-
out_buf =
1855-
flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size,
1856-
config->json_escape_unicode);
1866+
out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size,
1867+
config->json_escape_unicode);
18571868
flb_plg_debug(ctx->ins, "out->buf [%s]", out_buf);
18581869
msgpack_sbuffer_destroy(&mp_sbuf);
18591870

@@ -1887,6 +1898,9 @@ static int total_flush(struct flb_event_chunk *event_chunk,
18871898
size_t estimated_total_size = 0;
18881899
flb_sds_t log_group_id = NULL;
18891900
flb_sds_t log_set_id = NULL;
1901+
int batches_needed, records_per_batch, start_record, remaining_records,
1902+
batch_size;
1903+
msgpack_object map;
18901904

18911905
/* intialize decoder for counting record and estimate size */
18921906
ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data,
@@ -1899,7 +1913,7 @@ static int total_flush(struct flb_event_chunk *event_chunk,
18991913

19001914
while (flb_log_event_decoder_next(&log_decoder, &log_event) ==
19011915
FLB_EVENT_DECODER_SUCCESS) {
1902-
msgpack_object map = *log_event.body;
1916+
map = *log_event.body;
19031917

19041918
for (i = 0; i < map.via.map.size; i++) {
19051919
if (map.via.map.ptr[i].val.type == MSGPACK_OBJECT_STR) {
@@ -1926,12 +1940,13 @@ static int total_flush(struct flb_event_chunk *event_chunk,
19261940
}
19271941

19281942
/* calculate batching parameters */
1929-
int batches_needed = (estimated_total_size + MAX_PAYLOAD_SIZE_BYTES - 1) / MAX_PAYLOAD_SIZE_BYTES;
1943+
batches_needed = (estimated_total_size + MAX_PAYLOAD_SIZE_BYTES -
1944+
1) / MAX_PAYLOAD_SIZE_BYTES;
19301945
if (batches_needed < 1) {
19311946
batches_needed = 1;
19321947
}
1933-
1934-
int records_per_batch = (total_records + batches_needed - 1) / batches_needed;
1948+
1949+
records_per_batch = (total_records + batches_needed - 1) / batches_needed;
19351950
if (records_per_batch < 1) {
19361951
records_per_batch = 1;
19371952
}
@@ -1941,13 +1956,13 @@ static int total_flush(struct flb_event_chunk *event_chunk,
19411956
batches_needed, records_per_batch);
19421957

19431958
/* send data in batches */
1944-
int start_record = 0;
1945-
int remaining_records = total_records;
1959+
start_record = 0;
1960+
remaining_records = total_records;
19461961

19471962
while (remaining_records > 0) {
1948-
int batch_size =
1949-
(remaining_records >
1950-
records_per_batch) ? records_per_batch : remaining_records;
1963+
batch_size = (remaining_records >
1964+
records_per_batch) ? records_per_batch :
1965+
remaining_records;
19511966

19521967
ret =
19531968
send_batch_with_count(ctx, event_chunk, start_record, batch_size,

0 commit comments

Comments
 (0)