From 478833af96895f8765dcb639c0fdd971779b89b9 Mon Sep 17 00:00:00 2001 From: leixin <1403342953@qq.com> Date: Sun, 7 Jan 2024 16:58:28 +0800 Subject: [PATCH] [HUDI-7266] Add clustering metric for flink (#10420) --- .../hudi/metrics/FlinkClusteringMetrics.java | 105 ++++++++++++++++++ .../sink/clustering/ClusteringCommitSink.java | 12 ++ .../sink/clustering/ClusteringOperator.java | 14 +++ .../clustering/ClusteringPlanOperator.java | 22 +++- .../sink/utils/ClusteringFunctionWrapper.java | 6 + 5 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java new file mode 100644 index 000000000000..081c8f79a73f --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkClusteringMetrics.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metrics; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.sink.clustering.ClusteringOperator; +import org.apache.hudi.sink.clustering.ClusteringPlanOperator; + +import org.apache.flink.metrics.MetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.time.Duration; +import java.time.Instant; + +/** + * Metrics for flink clustering. + */ +public class FlinkClusteringMetrics extends FlinkWriteMetrics { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkClusteringMetrics.class); + + /** + * Key for clustering timer. + */ + private static final String CLUSTERING_KEY = "clustering"; + + /** + * Number of pending clustering instants. + * + * @see ClusteringPlanOperator + */ + private long pendingClusteringCount; + + /** + * Duration between the earliest pending clustering instant time and now in seconds. + * + * @see ClusteringPlanOperator + */ + private long clusteringDelay; + + /** + * Cost for consuming a clustering operation in milliseconds. 
+ * + * @see ClusteringOperator + */ + private long clusteringCost; + + public FlinkClusteringMetrics(MetricGroup metricGroup) { + super(metricGroup, CLUSTERING_KEY); + } + + @Override + public void registerMetrics() { + super.registerMetrics(); + metricGroup.gauge(getMetricsName(actionType, "pendingClusteringCount"), () -> pendingClusteringCount); + metricGroup.gauge(getMetricsName(actionType, "clusteringDelay"), () -> clusteringDelay); + metricGroup.gauge(getMetricsName(actionType, "clusteringCost"), () -> clusteringCost); + } + + public void setPendingClusteringCount(long pendingClusteringCount) { + this.pendingClusteringCount = pendingClusteringCount; + } + + public void setFirstPendingClusteringInstant(Option<HoodieInstant> firstPendingClusteringInstant) { + try { + if (!firstPendingClusteringInstant.isPresent()) { + this.clusteringDelay = 0L; + } else { + Instant start = HoodieInstantTimeGenerator.parseDateFromInstantTime((firstPendingClusteringInstant.get()).getTimestamp()).toInstant(); + this.clusteringDelay = Duration.between(start, Instant.now()).getSeconds(); + } + } catch (ParseException e) { + LOG.warn("Invalid input clustering instant " + firstPendingClusteringInstant); + } + } + + public void startClustering() { + startTimer(CLUSTERING_KEY); + } + + public void endClustering() { + this.clusteringCost = stopTimer(CLUSTERING_KEY); + } + +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java index 93b6d4fbf951..75f025687e47 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java @@ -35,6 +35,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieClusteringException; import 
org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metrics.FlinkClusteringMetrics; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -42,6 +43,7 @@ import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,8 @@ public class ClusteringCommitSink extends CleanFunction { */ private transient Map clusteringPlanCache; + private transient FlinkClusteringMetrics clusteringMetrics; + public ClusteringCommitSink(Configuration conf) { super(conf); this.conf = conf; @@ -102,6 +106,7 @@ public void open(Configuration parameters) throws Exception { this.commitBuffer = new HashMap<>(); this.clusteringPlanCache = new HashMap<>(); this.table = writeClient.getHoodieTable(); + registerMetrics(); } @Override @@ -194,6 +199,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, Colle this.writeClient.completeTableService( TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant, Option.of(HoodieListData.lazy(writeMetadata.getWriteStatuses()))); + clusteringMetrics.updateCommitMetrics(instant, writeMetadata.getCommitMetadata().get()); // whether to clean up the input base parquet files used for clustering if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) { LOG.info("Running inline clean"); @@ -229,4 +235,10 @@ private static Map> getPartitionToReplacedFileIds( .filter(fg -> !newFilesWritten.contains(fg)) .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList()))); } + + private void registerMetrics() { + MetricGroup metrics = getRuntimeContext().getMetricGroup(); + clusteringMetrics = new FlinkClusteringMetrics(metrics); + clusteringMetrics.registerMetrics(); + } } 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 415b1024cfdc..6aa5dd9acbac 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -44,6 +44,7 @@ import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.metrics.FlinkClusteringMetrics; import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; import org.apache.hudi.sink.utils.NonThrownExecutor; @@ -58,6 +59,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.BoundedOneInput; @@ -127,6 +129,8 @@ public class ClusteringOperator extends TableStreamOperator(output); + + registerMetrics(); } @Override @@ -213,6 +219,7 @@ public void endInput() { // ------------------------------------------------------------------------- private void doClustering(String instantTime, List clusteringOperations) throws Exception { + clusteringMetrics.startClustering(); BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig, instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), this.rowType, true); @@ -247,6 +254,7 @@ instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), get } 
List writeStatuses = writerHelper.getWriteStatuses(this.taskID); + clusteringMetrics.endClustering(); collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID)); writerHelper.close(); } @@ -388,4 +396,10 @@ public void setExecutor(NonThrownExecutor executor) { public void setOutput(Output> output) { this.output = output; } + + private void registerMetrics() { + MetricGroup metrics = getRuntimeContext().getMetricGroup(); + clusteringMetrics = new FlinkClusteringMetrics(metrics); + clusteringMetrics.registerMetrics(); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java index 48b2a9becd43..c16f8ed70801 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.metrics.FlinkClusteringMetrics; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.FlinkTables; @@ -33,11 +34,14 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import java.util.List; + /** * Operator that generates the clustering plan with pluggable strategies on finished 
checkpoints. * @@ -57,6 +61,8 @@ public class ClusteringPlanOperator extends AbstractStreamOperator table, long checkpointId) { + List pendingClusteringInstantTimes = + ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()); // the first instant takes the highest priority. Option firstRequested = Option.fromJavaOptional( - ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream() + pendingClusteringInstantTimes.stream() .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst()); + + // record metrics + clusteringMetrics.setFirstPendingClusteringInstant(firstRequested); + clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size()); + if (!firstRequested.isPresent()) { // do nothing. LOG.info("No clustering plan for checkpoint " + checkpointId); @@ -136,4 +150,10 @@ private void scheduleClustering(HoodieFlinkTable table, long checkpointId) { public void setOutput(Output> output) { this.output = output; } + + private void registerMetrics() { + MetricGroup metrics = getRuntimeContext().getMetricGroup(); + clusteringMetrics = new FlinkClusteringMetrics(metrics); + clusteringMetrics.registerMetrics(); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java index e3b75cbf6379..252a48350699 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java @@ -55,6 +55,10 @@ public class ClusteringFunctionWrapper { * Function that generates the {@code HoodieClusteringPlan}. */ private ClusteringPlanOperator clusteringPlanOperator; + /** + * Output to collect the clustering plan events. 
+ */ + private CollectorOutput planEventOutput; /** * Output to collect the clustering commit events. */ @@ -83,6 +87,8 @@ public ClusteringFunctionWrapper(Configuration conf, StreamTask streamTask public void openFunction() throws Exception { clusteringPlanOperator = new ClusteringPlanOperator(conf); + planEventOutput = new CollectorOutput<>(); + clusteringPlanOperator.setup(streamTask, streamConfig, planEventOutput); clusteringPlanOperator.open(); clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE);