@@ -644,18 +644,38 @@ int pack_basic_metric_sample(struct cmt_prometheus_remote_write_context *context
644
644
return append_metric_to_timeseries (time_series , metric );
645
645
}
646
646
647
+ static int check_staled_timestamp (struct cmt_metric * metric , uint64_t now , uint64_t cutoff )
648
+ {
649
+ uint64_t ts ;
650
+ uint64_t diff ;
651
+
652
+ ts = cmt_metric_get_timestamp (metric );
653
+ diff = now - ts ;
654
+
655
+ return diff > cutoff ;
656
+ }
657
+
647
658
int pack_basic_type (struct cmt_prometheus_remote_write_context * context ,
648
659
struct cmt_map * map )
649
660
{
650
661
int add_metadata ;
651
662
struct cmt_metric * metric ;
652
663
int result ;
653
664
struct cfl_list * head ;
665
+ uint64_t now ;
654
666
655
667
context -> sequence_number ++ ;
656
668
add_metadata = CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_ADD_METADATA ;
657
669
670
+ now = cfl_time_now ();
671
+
658
672
if (map -> metric_static_set == CMT_TRUE ) {
673
+ if (check_staled_timestamp (& map -> metric , now ,
674
+ CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD )) {
675
+ /* Skip processing metrics which are staled over the threshold */
676
+ return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ;
677
+ }
678
+
659
679
result = pack_basic_metric_sample (context , map , & map -> metric , add_metadata );
660
680
661
681
if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
@@ -666,6 +686,12 @@ int pack_basic_type(struct cmt_prometheus_remote_write_context *context,
666
686
cfl_list_foreach (head , & map -> metrics ) {
667
687
metric = cfl_list_entry (head , struct cmt_metric , _head );
668
688
689
+ if (check_staled_timestamp (metric , now ,
690
+ CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD )) {
691
+ /* Skip processing metrics which are staled over over the threshold */
692
+ return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ;
693
+ }
694
+
669
695
result = pack_basic_metric_sample (context , map , metric , add_metadata );
670
696
671
697
if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
@@ -699,6 +725,15 @@ int pack_complex_metric_sample(struct cmt_prometheus_remote_write_context *conte
699
725
struct cmt_summary * summary ;
700
726
int result ;
701
727
size_t index ;
728
+ uint64_t now ;
729
+
730
+ now = cfl_time_now ();
731
+
732
+ if (check_staled_timestamp (metric , now ,
733
+ CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_THRESHOLD )) {
734
+ /* Skip processing metrics which are staled over the threshold */
735
+ return CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ;
736
+ }
702
737
703
738
additional_label_caption = cfl_sds_create_len (NULL , 128 );
704
739
@@ -1067,6 +1102,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
1067
1102
counter = cfl_list_entry (head , struct cmt_counter , _head );
1068
1103
result = pack_basic_type (& context , counter -> map );
1069
1104
1105
+ if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ) {
1106
+ continue ;
1107
+ }
1108
+
1070
1109
if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
1071
1110
break ;
1072
1111
}
@@ -1078,6 +1117,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
1078
1117
gauge = cfl_list_entry (head , struct cmt_gauge , _head );
1079
1118
result = pack_basic_type (& context , gauge -> map );
1080
1119
1120
+ if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ) {
1121
+ continue ;
1122
+ }
1123
+
1081
1124
if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
1082
1125
break ;
1083
1126
}
@@ -1089,6 +1132,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
1089
1132
cfl_list_foreach (head , & cmt -> untypeds ) {
1090
1133
untyped = cfl_list_entry (head , struct cmt_untyped , _head );
1091
1134
pack_basic_type (& context , untyped -> map );
1135
+
1136
+ if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ) {
1137
+ continue ;
1138
+ }
1092
1139
}
1093
1140
}
1094
1141
@@ -1098,6 +1145,10 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
1098
1145
summary = cfl_list_entry (head , struct cmt_summary , _head );
1099
1146
result = pack_complex_type (& context , summary -> map );
1100
1147
1148
+ if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ) {
1149
+ continue ;
1150
+ }
1151
+
1101
1152
if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
1102
1153
break ;
1103
1154
}
@@ -1110,13 +1161,18 @@ cfl_sds_t cmt_encode_prometheus_remote_write_create(struct cmt *cmt)
1110
1161
histogram = cfl_list_entry (head , struct cmt_histogram , _head );
1111
1162
result = pack_complex_type (& context , histogram -> map );
1112
1163
1164
+ if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ) {
1165
+ continue ;
1166
+ }
1167
+
1113
1168
if (result != CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
1114
1169
break ;
1115
1170
}
1116
1171
}
1117
1172
}
1118
1173
1119
- if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ) {
1174
+ if (result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_SUCCESS ||
1175
+ result == CMT_ENCODE_PROMETHEUS_REMOTE_WRITE_CUTOFF_ERROR ) {
1120
1176
buf = render_remote_write_context_to_sds (& context );
1121
1177
}
1122
1178
0 commit comments