diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index fe24f3937d5..16123e634d6 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -199,12 +199,153 @@ static inline int try_to_write(char *buf, int *off, size_t left, return FLB_TRUE; } +static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { + char ts[KEY_ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"keyAttributes\":{",0)) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"Type\":\"Service\"",0)) { + goto error; + } + if(stream->entity->key_attributes->name != NULL && strlen(stream->entity->key_attributes->name) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Name\":\"",stream->entity->key_attributes->name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->environment != NULL && strlen(stream->entity->key_attributes->environment) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Environment\":\"",stream->entity->key_attributes->environment,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->account_id != NULL && strlen(stream->entity->key_attributes->account_id) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "},", 2)) { + goto error; + } + return 0; +error: + return -1; +} + +static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream,int *offset) { + char ts[ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"attributes\":{", + 0)) { + goto error; + } + if (stream->entity->attributes->platform_type != NULL && strlen(stream->entity->attributes->platform_type) != 0) { + if (strcmp(stream->entity->attributes->platform_type, "eks") == 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","AWS::EKS","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } else if (strcmp(stream->entity->attributes->platform_type, "k8s") == 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","K8s","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + if(stream->entity->attributes->cluster_name != NULL && strlen(stream->entity->attributes->cluster_name) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } + } else { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s","\"PlatformType\":\"","Generic","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->namespace != NULL && strlen(stream->entity->attributes->namespace) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->node != NULL && strlen(stream->entity->attributes->node) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Node\":\"",buf->current_stream->entity->attributes->node,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->name_source != NULL && strlen(stream->entity->attributes->name_source) != 0) { + if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AWS.ServiceNameSource\":\"",buf->current_stream->entity->attributes->name_source,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "}", 1)) { + goto error; + } + return 0; +error: + return -1; +} + /* * Writes the "header" for a put log events payload */ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { + int ret; if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "{\"logGroupName\":\"", 17)) { goto error; @@ -229,6 +370,35 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, "\",", 2)) { goto error; } + // If we are missing the service name, the entity will get rejected by the frontend anyway + // so do not emit entity unless service name is filled. If we are missing account ID + // it is considered not having sufficient information for entity therefore we should drop the entity. + if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL && stream->entity->key_attributes->account_id != NULL) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"entity\":{", 10)) { + goto error; + } + + if(stream->entity->key_attributes != NULL) { + ret = entity_add_key_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity KeyAttributes"); + goto error; + } + } + if(stream->entity->attributes != NULL) { + ret = entity_add_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity Attributes"); + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "},", 2)) { + goto error; + } + + } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"logEvents\":[", 13)) { @@ -366,6 +536,69 @@ static int truncate_log(const struct flb_cloudwatch *ctx, const char *log_buffer return FLB_FALSE; } +/* + * Helper function to remove keys prefixed with aws_entity + * from a message pack map + */ +void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, int filtered_fields) { + const int remaining_kv_pairs = nested_map->size - filtered_fields; + uint32_t j; + + // Pack the updated nested map into the packer, skipping keys in the remove list + msgpack_pack_map(pk, remaining_kv_pairs); // Initial size will adjust in the next loop + + for (j = 0; j < nested_map->size; j++) { + msgpack_object_kv nested_kv = nested_map->ptr[j]; + + // Check if the current key is in the removal list + if (nested_kv.key.type == MSGPACK_OBJECT_STR && nested_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(nested_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + // Skip the key in the remove list + continue; + } + + // Pack the remaining key-value pairs into the packer + msgpack_pack_object(pk, nested_kv.key); + msgpack_pack_object(pk, nested_kv.val); + } +} + +/* + * Main function to remove keys prefixed with aws_entity + * from the root and nested message pack map + */ +void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, msgpack_packer *pk,int root_filtered_fields, int filtered_fields) { + uint32_t i; + + if (root_map->type == MSGPACK_OBJECT_MAP) { + msgpack_object_map root = root_map->via.map; + + // Prepare to pack the modified root map (size may be unchanged or reduced) + msgpack_pack_map(pk, root.size-root_filtered_fields); + + for (i = 0; i < root.size; i++) { + msgpack_object_kv root_kv = root.ptr[i]; + + // Check if this key matches the nested map key (e.g., "kubernetes") + if (filtered_fields > 0 && + root_kv.key.type == MSGPACK_OBJECT_STR && + strncmp(root_kv.key.via.str.ptr, nested_map_key, root_kv.key.via.str.size) == 0 && + root_kv.val.type == MSGPACK_OBJECT_MAP) { + + // Pack the nested map key + msgpack_pack_object(pk, root_kv.key); + + // Remove the unneeded key from the nested map + remove_key_from_nested_map(&root_kv.val.via.map, pk,filtered_fields); + } else if (root_filtered_fields > 0 && root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + } else { + // Pack other key-value pairs unchanged + msgpack_pack_object(pk, root_kv.key); + msgpack_pack_object(pk, root_kv.val); + } + } + } +} + /* * Processes the msgpack object @@ -786,6 +1019,175 @@ int pack_emf_payload(struct flb_cloudwatch *ctx, return 0; } +static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entity) { + if(!ctx->add_entity || entity == NULL) { + return NULL; + } + char *fallback_env = NULL; + int ret; + /* + * Possible fallback environments: + * 1. eks:cluster-name/namespace + * 2. k8s:cluster-name/namespace + */ + if (entity->attributes->platform_type != NULL && entity->attributes->cluster_name != NULL && entity->attributes->namespace != NULL) { + /* Calculate required length */ + /* Add 3 for ':' '/' and null terminator */ + size_t len = strlen(entity->attributes->platform_type) + + strlen(entity->attributes->cluster_name) + + strlen(entity->attributes->namespace) + 3; + + fallback_env = flb_malloc(len); + if (!fallback_env) { + return NULL; + } + + /* Use snprintf for cross-platform compatibility */ + ret = snprintf(fallback_env, len, "%s:%s/%s", entity->attributes->platform_type, entity->attributes->cluster_name, entity->attributes->namespace); + if (ret < 0 || ret >= len) { + flb_free(fallback_env); + return NULL; + } + + return fallback_env; + } + return NULL; +} + +void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map, int map_size) { + int i,j; + msgpack_object key, kube_key; + msgpack_object val, kube_val; + + int val_map_size; + for(i=0; i < map_size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + if(strncmp(key.via.str.ptr, "kubernetes",10 ) == 0 ) { + if (val.type == MSGPACK_OBJECT_MAP) { + val_map_size = val.via.map.size; + for (j=0; j < val_map_size; j++) { + kube_key = val.via.map.ptr[j].key; + kube_val = val.via.map.ptr[j].val; + if(strncmp(kube_key.via.str.ptr, "aws_entity_service_name", kube_key.via.str.size) == 0) { + if(!entity->service_name_found) { + entity->filter_count++; + entity->service_name_found++; + } + if(entity->key_attributes->name != NULL) { + flb_free(entity->key_attributes->name); + } + entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", kube_key.via.str.size) == 0) { + if(!entity->environment_found) { + entity->filter_count++; + entity->environment_found++; + } + if(entity->key_attributes->environment != NULL) { + flb_free(entity->key_attributes->environment); + } + entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "namespace_name", kube_key.via.str.size) == 0) { + if(entity->attributes->namespace != NULL) { + flb_free(entity->attributes->namespace); + } + entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "host", kube_key.via.str.size) == 0) { + if(entity->attributes->node != NULL) { + flb_free(entity->attributes->node); + } + entity->attributes->node = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_cluster", kube_key.via.str.size) == 0) { + if(entity->attributes->cluster_name == NULL) { + entity->filter_count++; + } else { + flb_free(entity->attributes->cluster_name); + } + entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_workload", kube_key.via.str.size) == 0) { + if(entity->attributes->workload == NULL) { + entity->filter_count++; + } else { + flb_free(entity->attributes->workload); + } + entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", kube_key.via.str.size) == 0) { + if(!entity->name_source_found) { + entity->filter_count++; + entity->name_source_found++; + } + if(entity->attributes->name_source != NULL) { + flb_free(entity->attributes->name_source); + } + entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } else if(strncmp(kube_key.via.str.ptr, "aws_entity_platform", kube_key.via.str.size) == 0) { + if(entity->attributes->platform_type == NULL) { + entity->filter_count++; + } else { + flb_free(entity->attributes->platform_type); + } + entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); + } + } + } + } + if(strncmp(key.via.str.ptr, "aws_entity_ec2_instance_id",key.via.str.size ) == 0 ) { + if(entity->attributes->instance_id == NULL) { + entity->root_filter_count++; + } else { + flb_free(entity->attributes->instance_id); + } + entity->attributes->instance_id = flb_strndup(val.via.str.ptr, val.via.str.size); + } + if(strncmp(key.via.str.ptr, "aws_entity_account_id",key.via.str.size ) == 0 ) { + if(entity->key_attributes->account_id == NULL) { + entity->root_filter_count++; + } else { + flb_free(entity->key_attributes->account_id); + } + entity->key_attributes->account_id = flb_strndup(val.via.str.ptr, val.via.str.size); + } + } + if(entity->key_attributes->name == NULL && entity->attributes->name_source == NULL &&entity->attributes->workload != NULL) { + entity->key_attributes->name = flb_strndup(entity->attributes->workload, strlen(entity->attributes->workload)); + entity->attributes->name_source = flb_strndup("K8sWorkload", 11); + } + if(entity->key_attributes->environment == NULL) { + entity->key_attributes->environment = find_fallback_environment(ctx, entity); + } +} + +void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stream, const msgpack_object map) { + if(stream->entity == NULL) { + stream->entity = flb_malloc(sizeof(entity)); + if (stream->entity == NULL) { + return; + } + memset(stream->entity, 0, sizeof(entity)); + + stream->entity->key_attributes = flb_malloc(sizeof(entity_key_attributes)); + if (stream->entity->key_attributes == NULL) { + return; + } + memset(stream->entity->key_attributes, 0, sizeof(entity_key_attributes)); + + stream->entity->attributes = flb_malloc(sizeof(entity_attributes)); + if (stream->entity->attributes == NULL) { + return; + } + memset(stream->entity->attributes, 0, sizeof(entity_attributes)); + stream->entity->filter_count = 0; + stream->entity->root_filter_count = 0; + stream->entity->service_name_found = 0; + stream->entity->environment_found = 0; + stream->entity->name_source_found = 0; + } + parse_entity(ctx,stream->entity,map, map.via.map.size); + if (!stream->entity) { + flb_plg_warn(ctx->ins, "Failed to generate entity"); + } +} + static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plugin, struct cw_flush *buf, flb_sds_t tag, const char *data, size_t bytes) @@ -800,6 +1202,12 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug msgpack_object emf_payload; /* msgpack::sbuffer is a simple buffer implementation. */ msgpack_sbuffer mp_sbuf; + /* + * Msgpack objects used to store msgpack after filtering out fields + * with aws entity prefix + */ + msgpack_sbuffer filtered_sbuf; + msgpack_unpacked modified_unpacked; struct log_stream *stream; @@ -848,11 +1256,30 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug map = *log_event.body; map_size = map.via.map.size; + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_init(&filtered_sbuf); + msgpack_unpacked_init(&modified_unpacked); + } stream = get_log_stream(ctx, tag, map); if (!stream) { flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); goto error; } + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + update_or_create_entity(ctx,stream,map); + // Prepare a buffer to pack the modified map + if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) { + msgpack_packer pk; + msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write); + remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count); + + // Now, unpack the modified data into a new msgpack_object + size_t modified_offset = 0; + if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, filtered_sbuf.size, &modified_offset)) { + map = modified_unpacked.data; + } + } + } if (ctx->log_key) { key_str = NULL; @@ -974,6 +1401,10 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug if (ret == 0) { i++; } + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } } flb_log_event_decoder_destroy(&log_decoder); @@ -981,7 +1412,10 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug error: flb_log_event_decoder_destroy(&log_decoder); - + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } return -1; } @@ -1537,6 +1971,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, if (c) { flb_plg_debug(ctx->ins, "PutLogEvents http status=%d", c->resp.status); + flb_plg_debug(ctx->ins, "PutLogEvents http data=%s", c->resp.data); + flb_plg_debug(ctx->ins, "PutLogEvents http payload=%s", c->resp.payload); if (c->resp.status == 200) { if (c->resp.data == NULL || c->resp.data_len == 0 || strcasestr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) { diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index 05abfff30a1..280b60bd485 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -35,9 +35,19 @@ /* number of characters needed to 'end' a PutLogEvents payload */ #define PUT_LOG_EVENTS_FOOTER_LEN 4 +// https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html +/* Maxinum number of character limits including both the KeyAttributes key and its value */ +#define KEY_ATTRIBUTES_MAX_LEN 1100 +/* Maxinum number of character limits including both the Attributes key and its value */ +#define ATTRIBUTES_MAX_LEN 300 + /* 256KiB minus 26 bytes for the event */ #define MAX_EVENT_LEN 262118 +/* Prefix used for entity fields only */ +#define AWS_ENTITY_PREFIX "aws_entity" +#define AWS_ENTITY_PREFIX_LEN 10 + #include "cloudwatch_logs.h" void cw_flush_destroy(struct cw_flush *buf); diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index c5e808ae141..02bcfa681e2 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -378,6 +378,15 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, flb_output_upstream_set(upstream, ctx->ins); ctx->cw_client->host = ctx->endpoint; + struct mk_list *head; + struct flb_filter_instance *f_ins; + mk_list_foreach(head, &config->filters) { + f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (strstr(f_ins->p->name, "kubernetes")) { + ctx->kubernete_metadata_enabled = true; + } + } + /* Export context */ flb_output_set_context(ins, ctx); @@ -530,6 +539,27 @@ static int cb_cloudwatch_exit(void *data, struct flb_config *config) return 0; } +void entity_destroy(entity *entity) { + if(entity->attributes) { + flb_free(entity->attributes->cluster_name); + flb_free(entity->attributes->instance_id); + flb_free(entity->attributes->namespace); + flb_free(entity->attributes->node); + flb_free(entity->attributes->platform_type); + flb_free(entity->attributes->workload); + flb_free(entity->attributes->name_source); + flb_free(entity->attributes); + } + if(entity->key_attributes) { + flb_free(entity->key_attributes->environment); + flb_free(entity->key_attributes->name); + flb_free(entity->key_attributes->type); + flb_free(entity->key_attributes->account_id); + flb_free(entity->key_attributes); + } + flb_free(entity); +} + void log_stream_destroy(struct log_stream *stream) { if (stream) { @@ -539,6 +569,9 @@ void log_stream_destroy(struct log_stream *stream) if (stream->group) { flb_sds_destroy(stream->group); } + if (stream->entity) { + entity_destroy(stream->entity); + } flb_free(stream); } } @@ -689,6 +722,12 @@ static struct flb_config_map config_map[] = { "Specify the log storage class. Valid values are STANDARD (default) and INFREQUENT_ACCESS." }, + { + FLB_CONFIG_MAP_BOOL, "add_entity", "false", + 0, FLB_TRUE, offsetof(struct flb_cloudwatch, add_entity), + "add entity to PutLogEvent calls" + }, + /* EOF */ {0} }; diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 3724863426a..1971cffa2b4 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -30,6 +30,39 @@ #include #include +/* Entity object used for associating the telemetry + * in the PutLogEvent call*/ +typedef struct entity { + struct entity_key_attributes *key_attributes; + struct entity_attributes *attributes; + int filter_count; + int service_name_found; + int environment_found; + int name_source_found; + int root_filter_count; +}entity; + +/* KeyAttributes used for CloudWatch Entity object + * in the PutLogEvent call*/ +typedef struct entity_key_attributes { + char *type; + char *name; + char *environment; + char *account_id; +}entity_key_attributes; + +/* Attributes used for CloudWatch Entity object + * in the PutLogEvent call*/ +typedef struct entity_attributes { + char *platform_type; + char *cluster_name; + char *namespace; + char *workload; + char *node; + char *instance_id; + char *name_source; +}entity_attributes; + #define LOG_CLASS_STANDARD "STANDARD" #define LOG_CLASS_STANDARD_LEN 8 #define LOG_CLASS_INFREQUENT_ACCESS "INFREQUENT_ACCESS" @@ -94,6 +127,13 @@ struct log_stream { unsigned long long oldest_event; unsigned long long newest_event; + /* + * PutLogEvents entity object + * variable that store service or infrastructure + * information + */ + struct entity *entity; + struct mk_list _head; }; @@ -159,6 +199,14 @@ struct flb_cloudwatch { /* Plugin output instance reference */ struct flb_output_instance *ins; + + /* Checks if kubernete filter is enabled + * So the plugin knows when to scrape for Entity + */ + + int kubernete_metadata_enabled; + + int add_entity; }; void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);