Skip to content

Commit c0ce8f3

Browse files
committed
TEZ-4589: Counter for the overall duration of succeeded/failed/killed task attempts
1 parent 7cd6480 commit c0ce8f3

File tree

4 files changed

+121
-6
lines changed

4 files changed

+121
-6
lines changed

tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,28 @@ public enum DAGCounter {
3030
NUM_KILLED_TASKS,
3131
NUM_SUCCEEDED_TASKS,
3232
TOTAL_LAUNCHED_TASKS,
33+
34+
/* The durations of task attempts are categorized based on their final states. The duration of successful tasks
35+
can serve as a reference when analyzing the durations of failed or killed tasks. This is because solely examining
36+
failed or killed task durations may be misleading, as these durations are measured from the submission time,
37+
which does not always correspond to the actual start time of the task attempt on executor nodes
38+
(e.g., in scenarios involving Hive LLAP).
39+
These counters align with the duration metrics used for WALL_CLOCK_MILLIS.
40+
As such, the following relationship applies:
41+
WALL_CLOCK_MILLIS = DURATION_FAILED_TASKS_MILLIS + DURATION_KILLED_TASKS_MILLIS + DURATION_SUCCEEDED_TASKS_MILLIS
42+
*/
43+
44+
// Total amount of time spent on running FAILED task attempts. This can be blamed for performance degradation, as a
45+
// DAG can still finish successfully in the presence of failed attempts.
46+
DURATION_FAILED_TASKS_MILLIS,
47+
48+
// Total amount of time spent on running KILLED task attempts.
49+
DURATION_KILLED_TASKS_MILLIS,
50+
51+
// Total amount of time spent on running SUCCEEDED task attempts, which serves as a baseline when interpreting the
52+
// corresponding durations of FAILED and KILLED attempts.
53+
DURATION_SUCCEEDED_TASKS_MILLIS,
54+
3355
OTHER_LOCAL_TASKS,
3456
DATA_LOCAL_TASKS,
3557
RACK_LOCAL_TASKS,

tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent {
2929

3030
// Creates a DAG_COUNTER_UPDATE event for the given DAG, starting with an empty list of counter updates.
public DAGEventCounterUpdate(TezDAGID dagId) {
3131
super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
32-
counterUpdates = new ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>();
32+
counterUpdates = new ArrayList<>();
3333
}
3434

3535
public void addCounterUpdate(Enum<?> key, long incrValue) {
@@ -56,5 +56,9 @@ public Enum<?> getCounterKey() {
5656
public long getIncrementValue() {
5757
return incrValue;
5858
}
59+
60+
public String toString(){
61+
return String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, incrValue=%d)", key, incrValue);
62+
}
5963
}
6064
}

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -967,23 +967,26 @@ private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
967967
return dagCounterEvent;
968968
}
969969

970-
// Builds the DAG counter update event for a task attempt that finished in the given state: the matching
// NUM_*_TASKS counter is incremented by 1 for FAILED, KILLED or SUCCEEDED; any other state adds no
// state-specific counter.
private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
970+
@VisibleForTesting
971+
static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
971972
TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
972973
DAGEventCounterUpdate jce =
973974
new DAGEventCounterUpdate(taskAttempt.getDAGID());
974975

976+
// The attempt duration (ns converted to ms) is computed before the state branches so that each branch can
// add the same value to its DURATION_*_TASKS_MILLIS counter; WALL_CLOCK_MILLIS is updated for every state.
long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs());
977+
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);
978+
975979
if (taState == TaskAttemptState.FAILED) {
976980
jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1);
981+
jce.addCounterUpdate(DAGCounter.DURATION_FAILED_TASKS_MILLIS, amSideWallClockTimeMs);
977982
} else if (taState == TaskAttemptState.KILLED) {
978983
jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
984+
jce.addCounterUpdate(DAGCounter.DURATION_KILLED_TASKS_MILLIS, amSideWallClockTimeMs);
979985
} else if (taState == TaskAttemptState.SUCCEEDED ) {
980986
jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
987+
jce.addCounterUpdate(DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, amSideWallClockTimeMs);
981988
}
982989

983-
long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(
984-
taskAttempt.getDurationNs());
985-
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);
986-
987990
return jce;
988991
}
989992

tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.tez.dag.app.dag.impl;
2020

21+
import org.apache.hadoop.yarn.util.MonotonicClock;
22+
import org.apache.tez.common.counters.DAGCounter;
2123
import org.apache.tez.dag.app.MockClock;
2224
import static org.junit.Assert.assertEquals;
2325
import static org.junit.Assert.assertFalse;
@@ -2261,6 +2263,85 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() {
22612263
Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2);
22622264
}
22632265

2266+
@Test
2267+
public void testDAGCounterUpdateEvent(){
2268+
TaskAttemptImpl taImpl = getMockTaskAttempt();
2269+
2270+
DAGEventCounterUpdate counterUpdateSucceeded = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
2271+
TaskAttemptState.SUCCEEDED);
2272+
List<DAGEventCounterUpdate.CounterIncrementalUpdate> succeededUpdates = counterUpdateSucceeded.getCounterUpdates();
2273+
// SUCCEEDED task related counters are updated (+ WALL_CLOCK_MILLIS)
2274+
assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.NUM_SUCCEEDED_TASKS, 1);
2275+
assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 1000);
2276+
assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
2277+
// other counters are not updated (no FAILED, no KILLED)
2278+
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_FAILED_TASKS);
2279+
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_KILLED_TASKS);
2280+
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS);
2281+
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS);
2282+
2283+
DAGEventCounterUpdate counterUpdateFailed = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
2284+
TaskAttemptState.FAILED);
2285+
List<DAGEventCounterUpdate.CounterIncrementalUpdate> failedUpdates = counterUpdateFailed.getCounterUpdates();
2286+
// FAILED task related counters are updated (+ WALL_CLOCK_MILLIS)
2287+
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 1);
2288+
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS, 1000);
2289+
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
2290+
// other counters are not updated (no SUCCEEDED, no KILLED)
2291+
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
2292+
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_KILLED_TASKS);
2293+
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS);
2294+
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);
2295+
2296+
DAGEventCounterUpdate counterUpdateKilled = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
2297+
TaskAttemptState.KILLED);
2298+
List<DAGEventCounterUpdate.CounterIncrementalUpdate> killedUpdates = counterUpdateKilled.getCounterUpdates();
2299+
// KILLED task related counters are updated (+ WALL_CLOCK_MILLIS)
2300+
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 1);
2301+
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS, 1000);
2302+
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
2303+
// other counters are not updated (no SUCCEEDED, no FAILED)
2304+
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
2305+
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_FAILED_TASKS);
2306+
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS);
2307+
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);
2308+
}
2309+
2310+
private TaskAttemptImpl getMockTaskAttempt() {
2311+
ApplicationId appId = ApplicationId.newInstance(1, 2);
2312+
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
2313+
appId, 0);
2314+
TezDAGID dagID = TezDAGID.getInstance(appId, 1);
2315+
TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
2316+
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
2317+
2318+
return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class),
2319+
mock(TaskCommunicatorManagerInterface.class), new Configuration(), new MonotonicClock(),
2320+
mock(TaskHeartbeatHandler.class), mock(AppContext.class), false,
2321+
mock(Resource.class), mock(ContainerContext.class), false);
2322+
}
2323+
2324+
private void assertCounterIncrementalUpdate(List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates,
2325+
DAGCounter counter, int expectedValue) {
2326+
for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) {
2327+
if (update.getCounterKey().equals(counter) && update.getIncrementValue() == expectedValue) {
2328+
return;
2329+
}
2330+
}
2331+
Assert.fail(
2332+
String.format("Haven't found counter update %s=%d, instead seen: %s", counter, expectedValue, counterUpdates));
2333+
}
2334+
2335+
private void assertCounterIncrementalUpdateNotFound(
2336+
List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, DAGCounter counter) {
2337+
for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) {
2338+
if (update.getCounterKey().equals(counter)) {
2339+
Assert.fail(
2340+
String.format("Found counter update %s=%d, which is not expected", counter, update.getIncrementValue()));
2341+
}
2342+
}
2343+
}
2344+
22642345
private Event verifyEventType(List<Event> events,
22652346
Class<? extends Event> eventClass, int expectedOccurences) {
22662347
int count = 0;
@@ -2344,6 +2425,11 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion(
23442425
protected void sendInputFailedToConsumers() {
23452426
inputFailedReported = true;
23462427
}
2428+
2429+
@Override
2430+
public long getDurationNs(){
2431+
return 1000000000L; // 1000000000ns = 1000ms
2432+
}
23472433
}
23482434

23492435
private static ContainerContext createFakeContainerContext() {

0 commit comments

Comments
 (0)