Skip to content

Commit

Permalink
Refactor complex conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
okumin committed Dec 23, 2024
1 parent 7f0b3b8 commit c8c18fc
Showing 1 changed file with 37 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2750,7 +2750,7 @@ private VertexState setupVertex() {
: rootInputDescriptors.values()) {
if (input.getControllerDescriptor() != null &&
input.getControllerDescriptor().getClassName() != null) {
if (inputsWithInitializers == null) {
if (!hasInputInitializers()) {
inputsWithInitializers = Sets.newHashSet();
}
inputsWithInitializers.add(input.getName());
Expand All @@ -2771,7 +2771,7 @@ private VertexState setupVertex() {
}
}

if (hasBipartite && inputsWithInitializers != null) {
if (hasBipartite && hasInputInitializers()) {
LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
return finished(VertexState.FAILED);
}
Expand Down Expand Up @@ -2819,15 +2819,19 @@ private VertexState setupVertex() {
return VertexState.INITED;
}

private boolean canSkipInitializingParents() {
// Both cases use RootInputVertexManager. RootInputVertexManager can start tasks even though
// any parents are not fully initialized.
private boolean hasInputInitializers() {
return inputsWithInitializers != null;
}

private boolean usesRootInputVertexManager() {
// RootInputVertexManager can start tasks even though any parents are not fully initialized.
if (vertexPlan.hasVertexManagerPlugin()) {
final VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
.convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan.getVertexManagerPlugin());
return pluginDesc.getClassName().equals(RootInputVertexManager.class.getName());
} else {
return inputsWithInitializers != null;
// This case implicitly uses RootInputVertexManager. See VertexImpl#assignVertexManager
return hasInputInitializers();
}
}

Expand All @@ -2840,24 +2844,35 @@ private boolean isVertexInitSkippedInParentVertices() {
return true;
}

private void assignVertexManager() throws TezException {
private boolean canSkipInitialization() {
// condition for skip initializing stage
// - VertexInputInitializerEvent is seen
// - VertexReconfigureDoneEvent is seen
// - Reason to check whether VertexManager has complete its responsibility
// - VertexInitializedEvent is seen
// - VertexConfigurationDoneEvent is seen
// - Reason to check whether VertexManager has completed its responsibility
// VertexManager actually is involved in the InputInitializer (InputInitializer generate events
// and send them to VertexManager which do some processing and send back to Vertex), so that means
// Input initializer will affect on the VertexManager and we couldn't skip the initializing step if
// Input initializer will affect on the VertexManager and we couldn't skip the initializing step if
// VertexManager has not completed its responsibility.
// - Why using VertexReconfigureDoneEvent
// - VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex
// VertexReconfigureDoneEvent will be logged
// - TaskStartEvent is seen in that vertex or setVertexParallelism is called
// - Why using VertexConfigurationDoneEvent
// - VertexConfigurationDoneEvent represent the case that user use API reconfigureVertex
// VertexConfigurationDoneEvent will be logged
// - VertexStartedEvent is seen in that vertex or setVertexParallelism is called
// - All the parent vertices have skipped initializing stage while recovering
if (recoveryData != null && recoveryData.shouldSkipInit()
&& (recoveryData.isVertexTasksStarted() ||
recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled())
&& (canSkipInitializingParents() || isVertexInitSkippedInParentVertices())) {
// - Or RootInputVertexManager is used, which can start without waiting for parent vertices
if (recoveryData == null) {
return false;
}
if (!recoveryData.shouldSkipInit()) {
return false;
}
if (!recoveryData.isVertexStarted() && !recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) {
return false;
}
return isVertexInitSkippedInParentVertices() || usesRootInputVertexManager();
}

private void assignVertexManager() throws TezException {
if (canSkipInitialization()) {
// Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent();
if (LOG.isInfoEnabled()) {
Expand Down Expand Up @@ -2921,7 +2936,7 @@ private void assignVertexManager() throws TezException {
// If there is a one to one edge then we use the InputReadyVertexManager
// If there is a scatter-gather edge then we use the ShuffleVertexManager
// Else we use the default ImmediateStartVertexManager
if (inputsWithInitializers != null) {
if (hasInputInitializers()) {
LOG.info("Setting vertexManager to RootInputVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(RootInputVertexManager
Expand Down Expand Up @@ -3096,7 +3111,7 @@ private VertexState handleInitEvent(VertexImpl vertex) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
+ " to set #tasks for the vertex " + vertex.getLogIdentifier());

if (vertex.inputsWithInitializers != null) {
if (vertex.hasInputInitializers()) {
if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
try {
Expand Down Expand Up @@ -3135,8 +3150,7 @@ private VertexState handleInitEvent(VertexImpl vertex) {
LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
vertex.createTasks();
// this block may return VertexState.INITIALIZING
if (vertex.inputsWithInitializers != null &&
(vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
if (vertex.hasInputInitializers() && (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
try {
vertex.setupInputInitializerManager();
Expand Down

0 comments on commit c8c18fc

Please sign in to comment.