-
Notifications
You must be signed in to change notification settings - Fork 425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality #326
TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality #326
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,12 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) { | |
return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId()); | ||
} | ||
|
||
public boolean include(int thatInputIdentifier, int thatAttemptNumber) { | ||
return | ||
super.getInputIdentifier() <= thatInputIdentifier && thatInputIdentifier < (super.getInputIdentifier() + inputIdentifierCount) && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you reuse e.g. guava's Range for better readability? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, we can. Modified to use guava's Range. |
||
super.getAttemptNumber() == thatAttemptNumber; | ||
} | ||
|
||
// PathComponent & shared does not need to be part of the hashCode and equals computation. | ||
@Override | ||
public int hashCode() { | ||
|
@@ -63,6 +69,6 @@ public boolean equals(Object obj) { | |
|
||
@Override | ||
public String toString() { | ||
return super.toString(); | ||
return super.toString() + ", count=" + inputIdentifierCount; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -282,6 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload, | |
|
||
private void processInputFailedEvent(InputFailedEvent ife) { | ||
InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); | ||
LOG.info("Marking obsolete input: " + inputContext.getSourceVertexName() + " " + srcAttemptIdentifier); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use {} formatting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -583,8 +583,15 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { | |
} else { | ||
alreadyCompleted = completedInputSet.get(input.getInputIdentifier()); | ||
} | ||
// Avoid adding attempts which have already completed or have been marked as OBSOLETE | ||
if (alreadyCompleted || obsoletedInputs.contains(input)) { | ||
|
||
// Avoid adding attempts which have already completed | ||
if (alreadyCompleted) { | ||
inputIter.remove(); | ||
continue; | ||
} | ||
// Avoid adding attempts which have been marked as OBSOLETE | ||
if (isObsoleteInputAttemptIdentifier(input)) { | ||
LOG.info("Skipping obsolete input: " + input); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the original behavior didn't log when we skip an obsolete input, is this an intentional change? I can see that we still don't log anything in ShuffleScheduler when the same happens, so this is more like a LOG.debug, or back to
which one do you prefer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better to synchronize the behaviour of |
||
inputIter.remove(); | ||
continue; | ||
} | ||
|
@@ -949,10 +956,14 @@ public void fetchFailed(String host, | |
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold. | ||
// For now, reporting immediately. | ||
InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier(); | ||
if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) { | ||
LOG.info("Do not report obsolete input: " + srcAttemptIdentifier); | ||
return; | ||
} | ||
LOG.info( | ||
"{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, " | ||
"{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, " | ||
+ "local fetch: {}, remote fetch failure reported as local failure: {})", | ||
sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed, | ||
sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed, | ||
abstractdog marked this conversation as resolved.
Show resolved
Hide resolved
|
||
inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource()); | ||
failedShufflesCounter.increment(1); | ||
inputContext.notifyProgress(); | ||
|
@@ -984,6 +995,22 @@ public void fetchFailed(String host, | |
} | ||
} | ||
} | ||
|
||
private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { | ||
if (input == null) { | ||
return false; | ||
} | ||
InputAttemptIdentifier obsoleteInput; | ||
Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoletedInputs.iterator(); | ||
while (obsoleteInputsIter.hasNext()) { | ||
obsoleteInput = obsoleteInputsIter.next(); | ||
if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
/////////////////// End of Methods from FetcherCallbackHandler | ||
|
||
public void shutdown() throws InterruptedException { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few notes:
nit: this is more like "includes", because the method answers a question (whether this composite identifier includes another one) rather than taking an action
this method would be simpler with a single param:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that `includes` is more appropriate for the method. I modified the signature of the method as well as its name, and attached a javadoc on top of it.