diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java index 2ee6d6df43168..38c9c7f623f6b 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; import org.apache.flink.util.Preconditions; import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory; @@ -32,6 +33,7 @@ import java.io.IOException; import java.net.URL; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -39,6 +41,8 @@ */ @PublicEvolving public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + public static final String REPORTER_ID_GROUPING_KEY = + "flink_prometheus_push_gateway_reporter_id"; private final PushGateway pushGateway; private final String jobName; @@ -50,6 +54,7 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im PrometheusPushGatewayReporter( URL hostUrl, String jobName, + boolean metricsGroupingByReporter, Map groupingKey, final boolean deleteOnShutdown, @Nullable String username, @@ -64,7 +69,10 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im this.basicAuthEnabled = false; } this.jobName = Preconditions.checkNotNull(jobName); - this.groupingKey = Preconditions.checkNotNull(groupingKey); + this.groupingKey = + metricsGroupingByReporter + ? 
groupWithReporterId(groupingKey) + : new LinkedHashMap<>(groupingKey); this.deleteOnShutdown = deleteOnShutdown; } @@ -96,4 +104,20 @@ public void close() { } super.close(); } + + /** + * (FLINK-21309) Puts "flink_prometheus_push_gateway_reporter_id" as the last entry of the + * grouping keys, so that metrics of each taskmanager instance and jobmanager do not collide + * in the PushGateway. + */ + static Map groupWithReporterId(Map origin) { + Preconditions.checkNotNull(origin); + Map groupingKey = new LinkedHashMap<>(origin); + if (origin.containsKey(REPORTER_ID_GROUPING_KEY)) { + throw new IllegalArgumentException( + "Grouping keys must not contain the reserved key: " + REPORTER_ID_GROUPING_KEY); + } + groupingKey.put(REPORTER_ID_GROUPING_KEY, new AbstractID().toString()); + return groupingKey; + } } diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java index b08f5c0fd5d23..3cc90e03a3171 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java @@ -87,6 +87,7 @@ public PrometheusPushGatewayReporter createMetricReporter(Properties properties) new PrometheusPushGatewayReporter( new URL(hostUrl), jobName, + !randomSuffix, groupingKey, deleteOnShutdown, username, diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java index bdb48e9c611b8..cdf8f531a83b7 100644 --- 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java @@ -45,12 +45,16 @@ public class PrometheusPushGatewayReporterOptions { .defaultValue("") .withDescription("The job name under which metrics will be pushed"); + @Deprecated public static final ConfigOption RANDOM_JOB_NAME_SUFFIX = ConfigOptions.key("randomJobNameSuffix") .booleanType() .defaultValue(true) .withDescription( - "Specifies whether a random suffix should be appended to the job name."); + "Specifies whether to suffix `job` label with random unique ids to avoid metric collision among reporters from taskmanagers / jobmanager." + + " When enabled (default / old way to avoid collision), each taskmanager / jobmanager will append a UUID to the `job` label when reporting to pushgateway." + + " When disabled, all taskmanagers / jobmanager share the same `job` label as configured, but they group metrics under a random reporter id key so as to avoid collision." 
+ + " This option is deprecated and it is recommended to disable it in order to rely on the reporter id grouping key."); public static final ConfigOption DELETE_ON_SHUTDOWN = ConfigOptions.key("deleteOnShutdown") diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterTest.java index 81be12eaf843a..2beb2f3b5c8de 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterTest.java @@ -25,10 +25,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.event.Level; +import java.util.Iterator; import java.util.Map; +import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.REPORTER_ID_GROUPING_KEY; +import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY; import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST_URL; import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PASSWORD; +import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX; import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.USERNAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -61,6 +65,35 @@ void testParseIncompleteGroupingKey() { assertThat(groupingKey).isEmpty(); } + @Test + void testGroupingKeysIteratorEndsWithReporterId() { + Map groupingKeyWithReporterId = + PrometheusPushGatewayReporter.groupWithReporterId( + 
PrometheusPushGatewayReporterFactory.parseGroupingKey("k1=v1;k2=v2")); + assertThat(groupingKeyWithReporterId.size()).isEqualTo(3); + Iterator it = groupingKeyWithReporterId.keySet().iterator(); + for (int i = 0; i < 2; i++) { + it.next(); + } + assertThat(it.next()).isEqualTo(REPORTER_ID_GROUPING_KEY); + } + + @Test + void testRejectReporterIdInUserGroupingKey() { + MetricConfig metricConfig = new MetricConfig(); + metricConfig.setProperty(HOST_URL.key(), "http://localhost:8080"); + metricConfig.setProperty(GROUPING_KEY.key(), "k1=v1;" + REPORTER_ID_GROUPING_KEY + "=123"); + metricConfig.setProperty(RANDOM_JOB_NAME_SUFFIX.key(), "false"); + assertThatThrownBy( + () -> + new PrometheusPushGatewayReporterFactory() + .createMetricReporter(metricConfig)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Grouping keys must not contain the reserved key: " + + REPORTER_ID_GROUPING_KEY); + } + @Test void testConnectToPushGatewayUsingHostUrl() { PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();