From 474c8c651d42a83ba62993c4041bc08c9959e019 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 16 Jan 2025 15:25:16 +0800 Subject: [PATCH] HDFS-17709. [ARR] Add async responder performance metrics. --- .../protocolPB/AsyncRpcProtocolPBUtil.java | 4 ++++ .../metrics/FederationRPCMetrics.java | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java index 43bbe0373906c..31ed63f88f620 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocolPB; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext; import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil; @@ -28,6 +29,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngineCallback2; import org.apache.hadoop.ipc.internal.ShadedProtobufHelper; import org.apache.hadoop.thirdparty.protobuf.Message; +import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.AsyncGet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +89,7 @@ public static R asyncIpcClient( // transfer thread local context to worker threads of executor. ThreadLocalContext threadLocalContext = new ThreadLocalContext(); asyncCompleteWith(responseFuture.handleAsync((result, e) -> { + FederationRPCMetrics.ASYNC_RESPONDER_START_TIME.set(Time.monotonicNow()); threadLocalContext.transfer(); if (e != null) { throw warpCompletionException(e); @@ -136,6 +139,7 @@ public static void asyncRouterServer(ServerReq req, ServerRes res) { } else { callback.error(e.getCause()); } + FederationRPCMetrics.addAsyncResponderThreadTime(); return null; }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 5d5f9fb8aa12a..d3a572e987e7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.util.Time; /** * Implementation of the RPC metrics collector. @@ -41,9 +42,15 @@ public class FederationRPCMetrics implements FederationRPCMBean { private final MetricsRegistry registry = new MetricsRegistry("router"); private RouterRpcServer rpcServer; + public static final ThreadLocal ASYNC_RESPONDER_START_TIME = + ThreadLocal.withInitial(() -> -1L); + public static final ThreadLocal ASYNC_RESPONDER_END_TIME = + ThreadLocal.withInitial(() -> -1L); @Metric("Time for the router to process an operation internally") private MutableRate processing; + @Metric("Time for the router async responder to process an operation internally") + private static MutableRate asyncResponderProcessing; @Metric("Number of operations the Router processed internally") private MutableCounterLong processingOp; @Metric("Time for the Router to proxy an operation to the Namenodes") @@ -302,6 +309,20 @@ public void addProcessingTime(long time) { processingOp.incr(); } + public static void addAsyncResponderThreadTime() { + ASYNC_RESPONDER_END_TIME.set(Time.monotonicNow()); + long duration = getAsyncResponderProcessingTime(); + asyncResponderProcessing.add(duration); + } + + public static long getAsyncResponderProcessingTime() { + if (ASYNC_RESPONDER_START_TIME.get() != null && ASYNC_RESPONDER_START_TIME.get() > 0 && + ASYNC_RESPONDER_END_TIME.get() != null && ASYNC_RESPONDER_END_TIME.get() > 0) { + return ASYNC_RESPONDER_END_TIME.get() - ASYNC_RESPONDER_START_TIME.get(); + } + return -1; + } + @Override public double getProcessingAvg() { return processing.lastStat().mean();