Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts #324

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2297,12 +2297,14 @@ static Set<String> getPropertySet() {
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";

/**
* Frequency at which thread dump should be captured. Supports TimeUnits.
* Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only
* when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or
* org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we introduce pluggable hooks, I think we can change the default value. We may remove NOOP_TEZ_THREAD_DUMP_HELPER, too.


/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
Expand All @@ -2312,4 +2314,19 @@ static Set<String> getPropertySet() {
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook.
* e.g. org.apache.tez.dag.app.ThreadDumpDAGHook
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks";

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook.
* e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;

/**
* A hook which is instantiated and triggered before and after a DAG is exeucted.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezDAGHook {
/**
* Invoked before the DAG starts.
*
* @param id the DAG id
* @param conf the conf
*/
void start(TezDAGID id, Configuration conf);

/**
* Invoked after the DAG finishes.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;

/**
* A hook which is instantiated and triggered before and after a task attempt is executed.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezTaskAttemptHook {
/**
* Invoked before the task attempt starts.
*
* @param id the task attempt id
* @param conf the conf
*/
void start(TezTaskAttemptID id, Configuration conf);

/**
* Invoked after the task attempt finishes.
*/
void stop();
}
21 changes: 16 additions & 5 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.tez.Utils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
Expand Down Expand Up @@ -186,7 +187,7 @@
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
Expand Down Expand Up @@ -342,7 +343,7 @@ public class DAGAppMaster extends AbstractService {
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
private TezDAGHook[] hooks = {};

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Expand Down Expand Up @@ -769,7 +770,9 @@ protected synchronized void handle(DAGAppMasterEvent event) {
"DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
tezThreadDumpHelper.stop();
for (TezDAGHook hook : hooks) {
hook.stop();
}
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
Expand Down Expand Up @@ -2207,7 +2210,9 @@ public Void run() throws Exception {
}

// Check if the thread dump service is up in any case, if yes attempt a shutdown
tezThreadDumpHelper.stop();
for (TezDAGHook hook : hooks) {
hook.stop();
}

super.serviceStop();
}
Expand Down Expand Up @@ -2579,7 +2584,13 @@ private void countHeldContainers(DAG newDAG) {
private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
throws TezException {
currentDAG = dag;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
final Configuration conf = dag.getConf();
final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]);
final TezDAGHook[] hooks = new TezDAGHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(dag.getID(), conf);
}

// Try localizing the actual resources.
List<URL> additionalUrlsForClasspath;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;

/**
* A DAG hook which dumps thread information periodically.
*/
public class ThreadDumpDAGHook implements TezDAGHook {
private TezThreadDumpHelper helper;

@Override
public void start(TezDAGID id, Configuration conf) {
helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
}

@Override
public void stop() {
helper.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezLocalResource;
Expand All @@ -69,10 +70,10 @@
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.util.LoggingUtils;

Expand Down Expand Up @@ -120,7 +121,6 @@ public class TezChild {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
private final boolean updateSysCounters;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;

private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
Expand Down Expand Up @@ -296,7 +296,13 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
hadoopShim, sharedExecutor);

boolean shouldDie;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
final String[] hookClasses = taskConf
.getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]);
final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(attemptId, taskConf);
}
try {
TaskRunner2Result result = taskRunner.run();
LOG.info("TaskRunner2Result: {}", result);
Expand All @@ -315,7 +321,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
e, "TaskExecutionFailure: " + e.getMessage());
}
} finally {
tezThreadDumpHelper.stop();
for (TezTaskAttemptHook hook : hooks) {
hook.stop();
}
FileSystem.closeAllForUGI(childUGI);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;

/**
* A task attempt hook which dumps thread information periodically.
*/
public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook {
private TezThreadDumpHelper helper;

@Override
public void start(TezTaskAttemptID id, Configuration conf) {
helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
}

@Override
public void stop() {
helper.stop();
}
}