Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;

import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
Expand All @@ -32,13 +33,20 @@

import java.io.IOException;
import java.net.URL;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

/**
* {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus {@link PushGateway}.
*/
@PublicEvolving
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
public static final String REPORTER_ID_GROUPING_KEY =
"flink_prometheus_push_gateway_reporter_id";

private final PushGateway pushGateway;
private final String jobName;
Expand All @@ -50,6 +58,7 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im
PrometheusPushGatewayReporter(
URL hostUrl,
String jobName,
boolean metricsGroupingByReporter,
Map<String, String> groupingKey,
final boolean deleteOnShutdown,
@Nullable String username,
Expand All @@ -64,7 +73,10 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im
this.basicAuthEnabled = false;
}
this.jobName = Preconditions.checkNotNull(jobName);
this.groupingKey = Preconditions.checkNotNull(groupingKey);
this.groupingKey =
metricsGroupingByReporter
? new GroupingKeyMap(Preconditions.checkNotNull(groupingKey))
: Preconditions.checkNotNull(groupingKey);
this.deleteOnShutdown = deleteOnShutdown;
}

Expand Down Expand Up @@ -96,4 +108,68 @@ public void close() {
}
super.close();
}

/**
* (FLINK-21309) Put "flink_prometheus_push_gateway_reporter_id" as the last entry of the
* grouping keys, so that each taskmanger instance and jobmanager will not collide their metrics
* in the PushGateway.
*/
static class GroupingKeyMap extends AbstractMap<String, String> {
private final Map<String, String> customGroupingKeys;
private final String reporterId = new AbstractID().toString();
private final Set<Entry<String, String>> entrySet = new GroupingKeySet();

GroupingKeyMap(Map<String, String> customGroupingKeys) {
if (customGroupingKeys.containsKey(REPORTER_ID_GROUPPING_KEY)) {
throw new IllegalArgumentException(
Copy link
Contributor

@rionmonster rionmonster Mar 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll want to add a test here to cover the assertion against the user supplying "flink_prometheus_push_gateway_reporter_id" as an explicit grouping (asserting the expected exception).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"Grouping keys must not contain the reserved key: "
+ REPORTER_ID_GROUPPING_KEY);
}
this.customGroupingKeys = customGroupingKeys;
}

@Override
public Set<Entry<String, String>> entrySet() {
return entrySet;
}

private class GroupingKeySet extends AbstractSet<Map.Entry<String, String>> {
@Override
public Iterator<Entry<String, String>> iterator() {

return new Iterator<>() {
private Iterator<Entry<String, String>> customEntryIterator =
customGroupingKeys.entrySet().iterator();
private boolean consumedLast = false;

@Override
public boolean hasNext() {
return customEntryIterator.hasNext() || !consumedLast;
}

@Override
public Entry<String, String> next() {
if (customEntryIterator.hasNext()) {
return customEntryIterator.next();
}
if (!consumedLast) {
consumedLast = true;
return new AbstractMap.SimpleEntry<>(
REPORTER_ID_GROUPPING_KEY, reporterId);
}
throw new NoSuchElementException();
}
};
}

@Override
public int size() {
return customGroupingKeys.size() + 1;
}
}

String reporterId() {
return reporterId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public PrometheusPushGatewayReporter createMetricReporter(Properties properties)
new PrometheusPushGatewayReporter(
new URL(hostUrl),
jobName,
!randomSuffix,
groupingKey,
deleteOnShutdown,
username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ public class PrometheusPushGatewayReporterOptions {
.defaultValue("")
.withDescription("The job name under which metrics will be pushed");

@Deprecated
public static final ConfigOption<Boolean> RANDOM_JOB_NAME_SUFFIX =
Copy link
Contributor

@rionmonster rionmonster Mar 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we want to remove this configuration entirely? I’d imagine there are existing jobs and/or users already relying on it. This would also require updating the associated documentation and, at a minimum, feels like something that should be deprecated before being removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Let me deprecate it in the PR. I'm wondering should I also change the default value to false while keeping it. I'll use this flag as a switch: if job is not suffixed, a "flink_prometheus_push_gateway_reporter_id" grouping key will be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sent a followup commit in this sense.

ConfigOptions.key("randomJobNameSuffix")
.booleanType()
.defaultValue(true)
.defaultValue(false)
.withDescription(
"Specifies whether a random suffix should be appended to the job name.");
"Specifies whether random suffixing `job` label (the old way) to avoid metric collision among reporters from taskamangers."
+ " When disabled (now default) , metrics will be grouped under a random reporter id while job name is faithful to configuration."
+ " This option is deprecated and it is recommended to rely on the reporter id grouping key.");

public static final ConfigOption<Boolean> DELETE_ON_SHUTDOWN =
ConfigOptions.key("deleteOnShutdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;

import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;

import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.REPORTER_ID_GROUPPING_KEY;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST_URL;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PASSWORD;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.USERNAME;
Expand Down Expand Up @@ -61,6 +65,37 @@ void testParseIncompleteGroupingKey() {
assertThat(groupingKey).isEmpty();
}

@Test
void testGroupingKeysIteratorEndsWithReporterId() {
PrometheusPushGatewayReporter.GroupingKeyMap groupingKey =
new PrometheusPushGatewayReporter.GroupingKeyMap(
PrometheusPushGatewayReporterFactory.parseGroupingKey("k1=v1;k2=v2"));
assertThat(groupingKey.size()).isEqualTo(3);
Iterator<Map.Entry<String, String>> it = groupingKey.entrySet().iterator();
for (int i = 0; i < 2; i++) {
it.next();
}
assertThat(it.next())
.isEqualTo(
new AbstractMap.SimpleEntry(
REPORTER_ID_GROUPPING_KEY, groupingKey.reporterId()));
}

@Test
void testRejectReporterIdInUserGroupingKey() {
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "http://localhost:8080");
metricConfig.setProperty(GROUPING_KEY.key(), "k1=v1;" + REPORTER_ID_GROUPPING_KEY + "=123");
assertThatThrownBy(
() ->
new PrometheusPushGatewayReporterFactory()
.createMetricReporter(metricConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Grouping keys must not contain the reserved key: "
+ REPORTER_ID_GROUPPING_KEY);
}

@Test
void testConnectToPushGatewayUsingHostUrl() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
Expand Down