From af6a059f7357b27195e6762ad3f9168a3114edcf Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 18 Dec 2023 21:36:47 +0900 Subject: [PATCH 1/3] TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts --- .../apache/tez/dag/api/TezConfiguration.java | 21 +++++++++++++-- .../apache/tez/runtime/hook/TezDAGHook.java | 26 +++++++++++++++++++ .../tez/runtime/hook/TezTaskAttemptHook.java | 26 +++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 21 +++++++++++---- .../apache/tez/dag/app/ThreadDumpDAGHook.java | 23 ++++++++++++++++ .../org/apache/tez/runtime/task/TezChild.java | 16 +++++++++--- .../task/ThreadDumpTaskAttemptHook.java | 23 ++++++++++++++++ 7 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 3dc6fe4745..7c6cd2932e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2297,12 +2297,14 @@ static Set getPropertySet() { public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties"; /** - * Frequency at which thread dump should be captured. Supports TimeUnits. + * Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only + * when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or + * org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks. 
*/ @ConfigurationScope(Scope.DAG) @ConfigurationProperty public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; - public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms"; + public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; /** * Limits the amount of data that can be written to LocalFileSystem by a Task. @@ -2312,4 +2314,19 @@ static Set getPropertySet() { public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes"; public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1; + /** + * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook. + * e.g. org.apache.tez.dag.app.ThreadDumpDAGHook + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks"; + + /** + * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook. + * e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty + public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks"; } diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java new file mode 100644 index 0000000000..c9ce95810f --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java @@ -0,0 +1,26 @@ +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezDAGID; + +/** + * A hook which is instantiated and triggered before and after a DAG is executed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TezDAGHook { + /** + * Invoked before the DAG starts. 
+ * + * @param id the DAG id + * @param conf the conf + */ + void start(TezDAGID id, Configuration conf); + + /** + * Invoked after the DAG finishes. + */ + void stop(); +} diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java new file mode 100644 index 0000000000..a83cb326bd --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java @@ -0,0 +1,26 @@ +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezTaskAttemptID; + +/** + * A hook which is instantiated and triggered before and after a task attempt is executed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TezTaskAttemptHook { + /** + * Invoked before the task attempt starts. + * + * @param id the task attempt id + * @param conf the conf + */ + void start(TezTaskAttemptID id, Configuration conf); + + /** + * Invoked after the task attempt finishes. 
+ */ + void stop(); +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 263ac76b4c..6d7bd96535 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -70,6 +70,7 @@ import org.apache.tez.Utils; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; @@ -186,7 +187,7 @@ import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; -import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezDAGHook; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.codehaus.jettison.json.JSONException; @@ -342,7 +343,7 @@ public class DAGAppMaster extends AbstractService { Map services = new LinkedHashMap(); private ThreadLocalMap mdcContext; - private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; + private TezDAGHook[] hooks = {}; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, @@ -769,7 +770,9 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - tezThreadDumpHelper.stop(); + for (TezDAGHook hook : hooks) { + hook.stop(); + } DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -2207,7 +2210,9 @@ public Void run() throws Exception { } // Check if the thread 
dump service is up in any case, if yes attempt a shutdown - tezThreadDumpHelper.stop(); + for (TezDAGHook hook : hooks) { + hook.stop(); + } super.serviceStop(); } @@ -2579,7 +2584,13 @@ private void countHeldContainers(DAG newDAG) { private void startDAGExecution(DAG dag, final Map additionalAmResources) throws TezException { currentDAG = dag; - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString()); + final Configuration conf = dag.getConf(); + final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]); + final TezDAGHook[] hooks = new TezDAGHook[hookClasses.length]; + for (int i = 0; i < hooks.length; i++) { + hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); + hooks[i].start(dag.getID(), conf); + } // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java new file mode 100644 index 0000000000..0f0e697f4d --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java @@ -0,0 +1,23 @@ +package org.apache.tez.dag.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezDAGHook; + +/** + * A DAG hook which dumps thread information periodically. 
+ */ +public class ThreadDumpDAGHook implements TezDAGHook { + private TezThreadDumpHelper helper; + + @Override + public void start(TezDAGID id, Configuration conf) { + helper = TezThreadDumpHelper.getInstance(conf).start(id.toString()); + } + + @Override + public void stop() { + helper.stop(); + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 3145f21a58..7efb3cc327 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -52,6 +52,7 @@ import org.apache.log4j.helpers.ThreadLocalMap; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezLocalResource; @@ -69,10 +70,10 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; -import org.apache.tez.runtime.TezThreadDumpHelper; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.hook.TezTaskAttemptHook; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.util.LoggingUtils; @@ -120,7 +121,6 @@ public class TezChild { private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final String user; private final boolean updateSysCounters; - private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; private Multimap startedInputsMap = HashMultimap.create(); private final boolean ownUmbilical; @@ -296,7 +296,13 @@ public 
ContainerExecutionResult run() throws IOException, InterruptedException, hadoopShim, sharedExecutor); boolean shouldDie; - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString()); + final String[] hookClasses = taskConf + .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]); + final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length]; + for (int i = 0; i < hooks.length; i++) { + hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); + hooks[i].start(attemptId, taskConf); + } try { TaskRunner2Result result = taskRunner.run(); LOG.info("TaskRunner2Result: {}", result); @@ -315,7 +321,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, e, "TaskExecutionFailure: " + e.getMessage()); } } finally { - tezThreadDumpHelper.stop(); + for (TezTaskAttemptHook hook : hooks) { + hook.stop(); + } FileSystem.closeAllForUGI(childUGI); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java new file mode 100644 index 0000000000..ecb87a533d --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java @@ -0,0 +1,23 @@ +package org.apache.tez.runtime.task; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezTaskAttemptHook; + +/** + * A task attempt hook which dumps thread information periodically. 
+ */ +public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook { + private TezThreadDumpHelper helper; + + @Override + public void start(TezTaskAttemptID id, Configuration conf) { + helper = TezThreadDumpHelper.getInstance(conf).start(id.toString()); + } + + @Override + public void stop() { + helper.stop(); + } +} From 2ede93efafff13f380eb0865c7ff4ae1894d7bfc Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 18 Dec 2023 23:33:51 +0900 Subject: [PATCH 2/3] Add licenses --- .../apache/tez/runtime/hook/TezDAGHook.java | 18 ++++++++++++++++++ .../tez/runtime/hook/TezTaskAttemptHook.java | 18 ++++++++++++++++++ .../apache/tez/dag/app/ThreadDumpDAGHook.java | 18 ++++++++++++++++++ .../task/ThreadDumpTaskAttemptHook.java | 18 ++++++++++++++++++ 4 files changed, 72 insertions(+) diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java index c9ce95810f..cbc92817a9 100644 --- a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.tez.runtime.hook; import org.apache.hadoop.classification.InterfaceAudience; diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java index a83cb326bd..54931b64d5 100644 --- a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.runtime.hook; import org.apache.hadoop.classification.InterfaceAudience; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java index 0f0e697f4d..ff657e47f1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.dag.app; import org.apache.hadoop.conf.Configuration; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java index ecb87a533d..dd41cee9d2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.tez.runtime.task; import org.apache.hadoop.conf.Configuration; From e33316d2b3c4fa839dfb9c40160c5a6d34da29a3 Mon Sep 17 00:00:00 2001 From: okumin Date: Tue, 19 Dec 2023 00:36:55 +0900 Subject: [PATCH 3/3] Follow checkstyle --- .../apache/tez/runtime/hook/package-info.java | 22 +++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java new file mode 100644 index 0000000000..d977897d86 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +@Private +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience.Private; \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 6d7bd96535..6bb558cda0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2586,7 +2586,7 @@ private void startDAGExecution(DAG dag, final Map additio currentDAG = dag; final Configuration conf = dag.getConf(); final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]); - final TezDAGHook[] hooks = new TezDAGHook[hookClasses.length]; + hooks = new TezDAGHook[hookClasses.length]; for (int i = 0; i < hooks.length; i++) { hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); hooks[i].start(dag.getID(), conf);