From 2c2ffac77fed07bcc6486f9dee8cd60e41650ef7 Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 10 Jul 2023 17:44:53 -0400 Subject: [PATCH] [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712) * Add tags to dagmanager metrics for extensibility * Fix concurrency bug in test * Add job level metrics in dagmanager test * Test not cleaning dm threads * Only cleanup metrics if threads started by the dagmanager --- .../gobblin/metrics/MetricTagNames.java | 22 +++++++++++++++++++ .../modules/orchestration/DagManager.java | 2 +- .../orchestration/DagManagerMetrics.java | 15 ++++++++++++- .../modules/orchestration/DagManagerTest.java | 14 +++++++----- 4 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java new file mode 100644 index 00000000000..bce085d2b5e --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricTagNames.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics; + +public class MetricTagNames { + public static final String METRIC_BACKEND_REPRESENTATION = "metricBackendRepresentation"; +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 80da8a9e99d..acbf9f71da8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -230,7 +230,7 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean } else { this.eventSubmitter = Optional.absent(); } - this.dagManagerMetrics = new DagManagerMetrics(metricContext); + this.dagManagerMetrics = new DagManagerMetrics(); TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT)); this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME)); this.jobStatusRetriever = jobStatusRetriever; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index e9e605b0661..a5f34cff7f2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -28,16 +28,22 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.typesafe.config.Config; +import 
com.typesafe.config.ConfigFactory; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.GobblinMetricsKeys; +import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.MetricTagNames; import org.apache.gobblin.metrics.RootMetricContext; import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.RequesterService; @@ -75,6 +81,13 @@ public DagManagerMetrics(MetricContext metricContext) { this.metricContext = metricContext; } + public DagManagerMetrics() { + // Create a new metric context for the DagManagerMetrics tagged appropriately + List<Tag<?>> tags = new ArrayList<>(); + tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER)); + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags); + } + public void activate() { if (this.metricContext != null) { allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, @@ -249,7 +262,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() { public void cleanup() { // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager - if(this.metricContext != null) { + if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) { // The DMThread's metrics mappings follow the lifecycle 
of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java index 0a572cf26f3..2babd068311 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java @@ -709,6 +709,9 @@ public void testResumeCancelledDag() throws URISyntaxException, IOException { @Test (dependsOnMethods = "testResumeCancelledDag") public void testJobStartSLAKilledDag() throws URISyntaxException, IOException { + String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER); + long slaKilledMeterCount = metricContext.getParent().get().getMeters().get(slakilledMeterName) == null? 
0 : + metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(); long flowExecutionId = System.currentTimeMillis(); String flowGroupId = "0"; String flowGroup = "group" + flowGroupId; @@ -780,8 +783,7 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException { Assert.assertEquals(this.dagToJobs.size(), 1); Assert.assertTrue(this.dags.containsKey(dagId1)); - String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER); - Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1); + Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), slaKilledMeterCount + 1); // Cleanup this._dagManagerThread.run(); @@ -1162,10 +1164,13 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException { Config executorOneConfig = ConfigFactory.empty() .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne")) .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId)) - .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)); + .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)) + .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true)); Config executorTwoConfig = ConfigFactory.empty() .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo")) - .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)); + .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10)) + .withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true)); + List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig); dagList.add(buildDag("2", 
flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig)); @@ -1229,7 +1234,6 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException { String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER); String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS); - String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER); Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2); Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1); // Cleanup