@@ -2817,6 +2817,18 @@ private VertexState setupVertex() {
2817
2817
return VertexState .INITED ;
2818
2818
}
2819
2819
2820
+ private boolean canSkipInitializingParents () {
2821
+ // Both cases use RootInputVertexManager. RootInputVertexManager can start tasks even though
2822
+ // any parents are not fully initialized.
2823
+ if (vertexPlan .hasVertexManagerPlugin ()) {
2824
+ final VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
2825
+ .convertVertexManagerPluginDescriptorFromDAGPlan (vertexPlan .getVertexManagerPlugin ());
2826
+ return pluginDesc .getClassName ().equals (RootInputVertexManager .class .getName ());
2827
+ } else {
2828
+ return inputsWithInitializers != null ;
2829
+ }
2830
+ }
2831
+
2820
2832
private boolean isVertexInitSkippedInParentVertices () {
2821
2833
for (Map .Entry <Vertex , Edge > entry : sourceVertices .entrySet ()) {
2822
2834
if (!(((VertexImpl ) entry .getKey ()).isVertexInitSkipped ())) {
@@ -2843,7 +2855,7 @@ private void assignVertexManager() throws TezException {
2843
2855
if (recoveryData != null && recoveryData .shouldSkipInit ()
2844
2856
&& (recoveryData .isVertexTasksStarted () ||
2845
2857
recoveryData .getVertexConfigurationDoneEvent ().isSetParallelismCalled ())
2846
- && isVertexInitSkippedInParentVertices ()) {
2858
+ && ( canSkipInitializingParents () || isVertexInitSkippedInParentVertices () )) {
2847
2859
// Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
2848
2860
VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData .getVertexConfigurationDoneEvent ();
2849
2861
if (LOG .isInfoEnabled ()) {
0 commit comments