Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts #324

Merged
merged 8 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2297,12 +2297,14 @@ static Set<String> getPropertySet() {
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";

/**
* Frequency at which thread dump should be captured. Supports TimeUnits.
* Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only
* when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or
* org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
public static final String TEZ_HOOK_THREAD_DUMP_INTERVAL = "tez.hook.thread.dump.interval";
Copy link
Contributor

@abstractdog abstractdog Dec 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@okumin : I'm terribly sorry, I just realized that changing this causes more problems than benefits (changing config opts from one release to another), the class name also doesn't have "hook" in it, so it's fine to have this as "tez.thread.dump.interval", are you fine with changing back? TEZ_THREAD_DUMP_INTERVAL was also fine from this point of view

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np. I renamed them back
293ce63

public static final String TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";

/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
Expand All @@ -2312,4 +2314,19 @@ static Set<String> getPropertySet() {
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook.
* e.g. org.apache.tez.dag.app.ThreadDumpDAGHook
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks";

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook.
* e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;

/**
* A hook which is instantiated and triggered before and after a DAG is executed.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezDAGHook {
/**
* Invoked before the DAG starts.
*
* @param id the DAG id
* @param conf the conf
*/
void start(TezDAGID id, Configuration conf);

/**
* Invoked after the DAG finishes.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;

/**
* A hook which is instantiated and triggered before and after a task attempt is executed.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezTaskAttemptHook {
/**
* Invoked before the task attempt starts.
*
* @param id the task attempt id
* @param conf the conf
*/
void start(TezTaskAttemptID id, Configuration conf);

/**
* Invoked after the task attempt finishes.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@Private
package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience.Private;
23 changes: 17 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.tez.Utils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
Expand Down Expand Up @@ -187,7 +188,7 @@
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
Expand Down Expand Up @@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService {
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
private TezDAGHook[] hooks = {};

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Expand Down Expand Up @@ -770,7 +771,9 @@ protected synchronized void handle(DAGAppMasterEvent event) {
"DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
tezThreadDumpHelper.stop();
for (TezDAGHook hook : hooks) {
hook.stop();
}
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
Expand Down Expand Up @@ -2226,8 +2229,10 @@ public Void run() throws Exception {
execService.shutdownNow();
}

// Check if the thread dump service is up in any case, if yes attempt a shutdown
tezThreadDumpHelper.stop();
// Try to shut down any hooks that are still active
for (TezDAGHook hook : hooks) {
hook.stop();
}

super.serviceStop();
}
Expand Down Expand Up @@ -2599,7 +2604,13 @@ private void countHeldContainers(DAG newDAG) {
private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
throws TezException {
currentDAG = dag;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
final Configuration conf = dag.getConf();
final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]);
hooks = new TezDAGHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(dag.getID(), conf);
}

// Try localizing the actual resources.
List<URL> additionalUrlsForClasspath;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;

/**
* A DAG hook which dumps thread information periodically.
*/
public class ThreadDumpDAGHook implements TezDAGHook {
private TezThreadDumpHelper helper;

@Override
public void start(TezDAGID id, Configuration conf) {
helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
}

@Override
public void stop() {
helper.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezContainerLogAppender;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,12 +42,11 @@

import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT;

public class TezThreadDumpHelper {

public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper();
private long duration = 0L;
private Path basePath = null;
private FileSystem fs = null;
Expand All @@ -70,21 +71,17 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio
"path: {}", duration, basePath);
}

public TezThreadDumpHelper() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, cannot recall what the purpose was of this constructor, does reflection work without this explicitly defined?
I'm afraid that as there is private parameterized constructor, class.newInstance() throws an InstantiationException, doesn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is originally needed to instantiate

private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {
@Override
public TezThreadDumpHelper start(String name) {
// Do Nothing
return this;
}
@Override
public void stop() {
// Do Nothing
}
}
with zero arguments.

I think the class is not constructed in a reflective way, or it doesn't assume it's reflectively operated. I slightly updated the modifiers to make sure it
61d8249

Copy link
Contributor

@abstractdog abstractdog Dec 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I was wrong, the hooks are created by reflection, but the TezThreadDumpHelper is not

    helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());

}

public static TezThreadDumpHelper getInstance(Configuration conf) {
long periodicThreadDumpFrequency =
conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);

if (periodicThreadDumpFrequency > 0) {
try {
return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
} catch (IOException e) {
LOG.warn("Can not initialize periodic thread dump service", e);
}
long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_HOOK_THREAD_DUMP_INTERVAL,
TEZ_HOOK_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be positive duration",
TEZ_HOOK_THREAD_DUMP_INTERVAL);

try {
return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
} catch (IOException e) {
throw new TezUncheckedException("Can not initialize periodic thread dump service", e);
}
return NOOP_TEZ_THREAD_DUMP_HELPER;
}

public TezThreadDumpHelper start(String name) {
Expand Down Expand Up @@ -178,18 +175,4 @@ private String getTaskName(long id, String taskName) {
return id + " (" + taskName + ")";
}
}

private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {

@Override
public TezThreadDumpHelper start(String name) {
// Do Nothing
return this;
}

@Override
public void stop() {
// Do Nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezLocalResource;
Expand All @@ -69,10 +70,10 @@
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.util.LoggingUtils;

Expand Down Expand Up @@ -120,7 +121,6 @@ public class TezChild {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
private final boolean updateSysCounters;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;

private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
Expand Down Expand Up @@ -295,7 +295,13 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
hadoopShim, sharedExecutor);

boolean shouldDie;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
final String[] hookClasses = taskConf
.getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]);
final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(attemptId, taskConf);
}
try {
TaskRunner2Result result = taskRunner.run();
LOG.info("TaskRunner2Result: {}", result);
Expand All @@ -314,7 +320,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
e, "TaskExecutionFailure: " + e.getMessage());
}
} finally {
tezThreadDumpHelper.stop();
for (TezTaskAttemptHook hook : hooks) {
hook.stop();
}
FileSystem.closeAllForUGI(childUGI);
}
}
Expand Down
Loading
Loading