Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encode_opentelemetry: add cut off for otel payloads for prometheus mimir #223

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions include/cmetrics/cmt_encode_opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,32 @@
#define CMT_ENCODE_OPENTELEMETRY_INVALID_ARGUMENT_ERROR 2
#define CMT_ENCODE_OPENTELEMETRY_UNEXPECTED_METRIC_TYPE 3
#define CMT_ENCODE_OPENTELEMETRY_DATA_POINT_INIT_ERROR 4
#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR 5

#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD 300000000000L /* 5 minutes in nanoseconds */
#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_DISABLED -1L /* disabled */


struct cmt_opentelemetry_context
{
size_t resource_index;
Opentelemetry__Proto__Metrics__V1__MetricsData *metrics_data;
struct cmt *cmt;
int use_cutoff;
int64_t cutoff_threshold;
};

struct cmt_opentelemetry_context_opts
{
int use_cutoff;
int64_t cutoff_threshold;
};

cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt);
cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff(struct cmt *cmt, int use_cutoff);
cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff_opts(struct cmt *cmt,
struct cmt_opentelemetry_context_opts *opts);

void cmt_encode_opentelemetry_destroy(cfl_sds_t text);

#endif
89 changes: 84 additions & 5 deletions src/cmt_encode_opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ static void destroy_opentelemetry_context(
struct cmt_opentelemetry_context *context);

static struct cmt_opentelemetry_context *initialize_opentelemetry_context(
struct cmt *cmt);
struct cmt *cmt, struct cmt_opentelemetry_context_opts *opts);

static inline Opentelemetry__Proto__Common__V1__AnyValue *cfl_variant_to_otlp_any_value(struct cfl_variant *value);
static inline Opentelemetry__Proto__Common__V1__KeyValue *cfl_variant_kvpair_to_otlp_kvpair(struct cfl_kvpair *input_pair);
Expand Down Expand Up @@ -2138,7 +2138,7 @@ static Opentelemetry__Proto__Resource__V1__Resource *
}

static struct cmt_opentelemetry_context *initialize_opentelemetry_context(
struct cmt *cmt)
struct cmt *cmt, struct cmt_opentelemetry_context_opts *opts)
{
struct cfl_kvlist *resource_metrics_root;
struct cfl_kvlist *scope_metrics_root;
Expand Down Expand Up @@ -2166,6 +2166,8 @@ static struct cmt_opentelemetry_context *initialize_opentelemetry_context(
memset(context, 0, sizeof(struct cmt_opentelemetry_context));

context->cmt = cmt;
context->use_cutoff = opts->use_cutoff;
context->cutoff_threshold = opts->cutoff_threshold;

resource = initialize_resource(resource_root, &result);

Expand Down Expand Up @@ -2369,6 +2371,17 @@ int append_sample_to_metric(struct cmt_opentelemetry_context *context,
return result;
}

static int check_staled_timestamp(struct cmt_metric *metric, uint64_t now, uint64_t cutoff)
{
uint64_t ts;
uint64_t diff;

ts = cmt_metric_get_timestamp(metric);
diff = now - ts;

return diff > cutoff;
}

int pack_basic_type(struct cmt_opentelemetry_context *context,
struct cmt_map *map,
size_t *metric_index)
Expand All @@ -2382,8 +2395,11 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,
Opentelemetry__Proto__Metrics__V1__Metric *metric;
int result;
struct cfl_list *head;
uint64_t now;
int cutoff = CMT_FALSE;

sample_count = 0;
now = cfl_time_now();

if (map->metric_static_set) {
sample_count++;
Expand Down Expand Up @@ -2434,6 +2450,15 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,
&map->metric,
sample_index++);

if (context->use_cutoff == CMT_TRUE &&
check_staled_timestamp(&map->metric, now,
context->cutoff_threshold)) {
destroy_metric(metric);

/* Skip processing metrics which are staled over the threshold */
return CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR;
}

if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
destroy_metric(metric);

Expand All @@ -2444,6 +2469,15 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,
cfl_list_foreach(head, &map->metrics) {
sample = cfl_list_entry(head, struct cmt_metric, _head);

if (context->use_cutoff == CMT_TRUE &&
check_staled_timestamp(&map->metric, now,
context->cutoff_threshold)) {

/* Skip processing metrics which are staled over the threshold */
cutoff = CMT_TRUE;
continue;
}

result = append_sample_to_metric(context,
metric,
map,
Expand Down Expand Up @@ -2473,6 +2507,10 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,

(*metric_index)++;

if (cutoff == CMT_TRUE) {
return CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR;
}

return result;
}

Expand All @@ -2498,7 +2536,8 @@ static cfl_sds_t render_opentelemetry_context_to_sds(
return result_buffer;
}

cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff_opts(struct cmt *cmt,
struct cmt_opentelemetry_context_opts *opts)
{
size_t metric_index;
struct cmt_opentelemetry_context *context;
Expand All @@ -2514,7 +2553,7 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
buf = NULL;
result = 0;

context = initialize_opentelemetry_context(cmt);
context = initialize_opentelemetry_context(cmt, opts);

if (context == NULL) {
return NULL;
Expand All @@ -2527,6 +2566,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
counter = cfl_list_entry(head, struct cmt_counter, _head);
result = pack_basic_type(context, counter->map, &metric_index);

if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
break;
}
Expand All @@ -2538,6 +2581,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
gauge = cfl_list_entry(head, struct cmt_gauge, _head);
result = pack_basic_type(context, gauge->map, &metric_index);

if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
break;
}
Expand All @@ -2549,6 +2596,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
untyped = cfl_list_entry(head, struct cmt_untyped, _head);
result = pack_basic_type(context, untyped->map, &metric_index);

if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
break;
}
Expand All @@ -2560,6 +2611,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
summary = cfl_list_entry(head, struct cmt_summary, _head);
result = pack_basic_type(context, summary->map, &metric_index);

if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
break;
}
Expand All @@ -2571,21 +2626,45 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
histogram = cfl_list_entry(head, struct cmt_histogram, _head);
result = pack_basic_type(context, histogram->map, &metric_index);

if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
continue;
}

if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
break;
}
}
}

if (result == CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
if (metric_index > 0) {
buf = render_opentelemetry_context_to_sds(context);
} else {
buf = NULL;
}

destroy_opentelemetry_context(context);

return buf;
}

cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff(struct cmt *cmt, int use_cutoff)
{
struct cmt_opentelemetry_context_opts opts;
opts.use_cutoff = use_cutoff;
opts.cutoff_threshold = CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD;

return cmt_encode_opentelemetry_create_with_cutoff_opts(cmt, &opts);
}

cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
{
struct cmt_opentelemetry_context_opts opts;
opts.use_cutoff = CMT_FALSE;
opts.cutoff_threshold = CMT_ENCODE_OPENTELEMETRY_CUTOFF_DISABLED;

return cmt_encode_opentelemetry_create_with_cutoff_opts(cmt, &opts);
}

void cmt_encode_opentelemetry_destroy(cfl_sds_t text)
{
cfl_sds_destroy(text);
Expand Down
14 changes: 10 additions & 4 deletions tests/decoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,17 @@

#include "cmt_tests.h"

static struct cmt *generate_encoder_test_data()
static struct cmt *generate_encoder_test_data_with_timestamp(uint64_t ts)
{
double quantiles[5];
struct cmt_histogram_buckets *buckets;
double val;
struct cmt *cmt;
uint64_t ts;
struct cmt_gauge *g1;
struct cmt_counter *c1;
struct cmt_summary *s1;
struct cmt_histogram *h1;

ts = 0;
cmt = cmt_create();

c1 = cmt_counter_create(cmt, "kubernetes", "network", "load_counter", "Network load counter",
Expand Down Expand Up @@ -124,6 +122,14 @@ static struct cmt *generate_encoder_test_data()
return cmt;
}

static struct cmt *generate_encoder_test_data_now()
{
uint64_t ts = 0;
ts = cfl_time_now();

return generate_encoder_test_data_with_timestamp(ts);
}

void test_opentelemetry()
{
cfl_sds_t reference_prometheus_context;
Expand All @@ -139,7 +145,7 @@ void test_opentelemetry()

cmt_initialize();

cmt = generate_encoder_test_data();
cmt = generate_encoder_test_data_now();
TEST_CHECK(cmt != NULL);

reference_prometheus_context = cmt_encode_prometheus_create(cmt, CMT_TRUE);
Expand Down
50 changes: 48 additions & 2 deletions tests/encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,12 @@ void test_opentelemetry()
cfl_sds_t payload;
struct cmt *cmt;
FILE *sample_file;
uint64_t ts;

cmt_initialize();
ts = cfl_time_now();

cmt = generate_encoder_test_data();
cmt = generate_encoder_test_data_with_timestamp(ts);

payload = cmt_encode_opentelemetry_create(cmt);
TEST_CHECK(NULL != payload);
Expand All @@ -609,7 +611,49 @@ curl -v 'http://localhost:9090/v1/metrics' -H 'Content-Type: application/x-proto

fclose(sample_file);

cmt_encode_prometheus_remote_write_destroy(payload);
cmt_encode_opentelemetry_destroy(payload);

cmt_destroy(cmt);
}

void test_opentelemetry_outdated()
{
cfl_sds_t payload;
struct cmt *cmt;
uint64_t ts;

cmt_initialize();
ts = cfl_time_now() - CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD * 1.5;

cmt = generate_encoder_test_data_with_timestamp(ts);

payload = cmt_encode_opentelemetry_create_with_cutoff(cmt, CMT_TRUE);
TEST_CHECK(NULL == payload);

cmt_encode_opentelemetry_destroy(payload);

cmt_destroy(cmt);
}

void test_opentelemetry_outdated_with_cutoff_opts()
{
cfl_sds_t payload;
struct cmt *cmt;
uint64_t ts;
struct cmt_opentelemetry_context_opts opts;

opts.use_cutoff = CMT_TRUE;
opts.cutoff_threshold = CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD;

cmt_initialize();
ts = cfl_time_now() - CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD * 1.5;

cmt = generate_encoder_test_data_with_timestamp(ts);

payload = cmt_encode_opentelemetry_create_with_cutoff_opts(cmt, &opts);
TEST_CHECK(NULL == payload);

cmt_encode_opentelemetry_destroy(payload);

cmt_destroy(cmt);
}
Expand Down Expand Up @@ -1173,6 +1217,8 @@ TEST_LIST = {
{"cmt_msgpack_labels", test_cmt_to_msgpack_labels},
{"cmt_msgpack", test_cmt_to_msgpack},
{"opentelemetry", test_opentelemetry},
{"opentelemetry_old_context", test_opentelemetry_outdated},
{"opentelemetry_cutoff_opts", test_opentelemetry_outdated_with_cutoff_opts},
{"cloudwatch_emf", test_cloudwatch_emf},
{"prometheus", test_prometheus},
{"text", test_text},
Expand Down
Loading