TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality #326

Merged
merged 3 commits into from
Dec 25, 2024
Original file line number Diff line number Diff line change
@@ -50,6 +50,12 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) {
return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId());
}

public boolean include(int thatInputIdentifier, int thatAttemptNumber) {
Contributor

@abstractdog, Dec 21, 2024

A few notes:

  1. nit: this reads better as "includes", because the method answers a question (does this composite identifier include another one?) rather than taking an action

  2. this method would be simpler with a single param:

     includes(InputAttemptIdentifier inputAttemptIdentifier)

  3. can you please add a method javadoc explaining what "includes" means in this context?

Contributor Author

I agree that includes is more appropriate for the method. I modified the signature of the method as well as its name, and attached a javadoc on top of it.

return
super.getInputIdentifier() <= thatInputIdentifier && thatInputIdentifier < (super.getInputIdentifier() + inputIdentifierCount) &&
Contributor

can you reuse e.g. guava's Range for better readability?
https://www.geeksforgeeks.org/range-class-guava-java/

Contributor Author

Sure, we can. Modified to use guava's Range.
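
To make the two review points above concrete, here is a minimal sketch of a single-parameter `includes` with a javadoc, written against simplified stand-in classes. The class names `SimpleInputAttemptIdentifier`, `SimpleCompositeInputAttemptIdentifier`, and `IncludesDemo` are hypothetical and model only the fields the check needs; this is not the code that was merged. The sketch uses plain comparisons to stay dependency-free; with Guava, the same half-open check could presumably be written as `Range.closedOpen(lower, upper).contains(id)`.

```java
// Simplified stand-ins for the Tez identifier classes (hypothetical names).
class SimpleInputAttemptIdentifier {
  private final int inputIdentifier;
  private final int attemptNumber;

  SimpleInputAttemptIdentifier(int inputIdentifier, int attemptNumber) {
    this.inputIdentifier = inputIdentifier;
    this.attemptNumber = attemptNumber;
  }

  int getInputIdentifier() { return inputIdentifier; }

  int getAttemptNumber() { return attemptNumber; }

  /**
   * Returns true if this identifier covers {@code other}. For a plain identifier,
   * both the input identifier and the attempt number must match.
   */
  boolean includes(SimpleInputAttemptIdentifier other) {
    return inputIdentifier == other.getInputIdentifier()
        && attemptNumber == other.getAttemptNumber();
  }
}

class SimpleCompositeInputAttemptIdentifier extends SimpleInputAttemptIdentifier {
  private final int inputIdentifierCount;

  SimpleCompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber,
      int inputIdentifierCount) {
    super(inputIdentifier, attemptNumber);
    this.inputIdentifierCount = inputIdentifierCount;
  }

  /**
   * Returns true if {@code other} has the same attempt number and its input identifier
   * lies in the half-open range [inputIdentifier, inputIdentifier + inputIdentifierCount).
   */
  @Override
  boolean includes(SimpleInputAttemptIdentifier other) {
    int lower = getInputIdentifier();
    int upper = lower + inputIdentifierCount; // exclusive upper bound
    int id = other.getInputIdentifier();
    return lower <= id && id < upper && getAttemptNumber() == other.getAttemptNumber();
  }
}

class IncludesDemo {
  public static void main(String[] args) {
    SimpleCompositeInputAttemptIdentifier composite =
        new SimpleCompositeInputAttemptIdentifier(10, 0, 3);
    // 12 lies in [10, 13) and the attempt numbers match.
    System.out.println(composite.includes(new SimpleInputAttemptIdentifier(12, 0)));
    // 13 is outside the range because the upper bound is exclusive.
    System.out.println(composite.includes(new SimpleInputAttemptIdentifier(13, 0)));
    // Same range, but the attempt number differs.
    System.out.println(composite.includes(new SimpleInputAttemptIdentifier(11, 1)));
  }
}
```

Overriding `includes` in the composite class keeps call sites identical for both identifier kinds, which is what lets a caller treat a plain identifier as a range of size one.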

super.getAttemptNumber() == thatAttemptNumber;
}

// PathComponent & shared does not need to be part of the hashCode and equals computation.
@Override
public int hashCode() {
@@ -63,6 +69,6 @@ public boolean equals(Object obj) {

@Override
public String toString() {
- return super.toString();
+ return super.toString() + ", count=" + inputIdentifierCount;
}
}
Original file line number Diff line number Diff line change
@@ -108,6 +108,10 @@ public boolean canRetrieveInputInChunks() {
(fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal());
}

public boolean include(int thatInputIdentifier, int thatAttemptNumber) {
return this.inputIdentifier == thatInputIdentifier && this.attemptNumber == thatAttemptNumber;
}

// PathComponent & shared does not need to be part of the hashCode and equals computation.
@Override
public int hashCode() {
@@ -139,6 +143,6 @@ public boolean equals(Object obj) {
public String toString() {
  return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
  + ", attemptNumber=" + attemptNumber + ", pathComponent="
- + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]";
+ + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId + "]";
}
}
Original file line number Diff line number Diff line change
@@ -282,6 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload,

private void processInputFailedEvent(InputFailedEvent ife) {
InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
LOG.info("Marking obsolete input: " + inputContext.getSourceVertexName() + " " + srcAttemptIdentifier);
Contributor

please use {} formatting

Contributor Author

fixed

shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
}

Original file line number Diff line number Diff line change
@@ -583,8 +583,15 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
} else {
alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
}
// Avoid adding attempts which have already completed or have been marked as OBSOLETE
if (alreadyCompleted || obsoletedInputs.contains(input)) {

// Avoid adding attempts which have already completed
if (alreadyCompleted) {
inputIter.remove();
continue;
}
// Avoid adding attempts which have been marked as OBSOLETE
if (isObsoleteInputAttemptIdentifier(input)) {
LOG.info("Skipping obsolete input: " + input);
Contributor

The original behavior didn't log when skipping an obsolete input. Is this an intentional change?

ShuffleScheduler still doesn't log anything in the same situation, so this should either be a LOG.debug or go back to

if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) {

Which one do you prefer?

Contributor Author

I think it would be better to synchronize the behaviour of ShuffleManager and ShuffleScheduler. Modified to use a single if clause.
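
For context on why the condition calls `isObsoleteInputAttemptIdentifier` instead of `contains`, the following is a rough sketch under stated assumptions. `AttemptRange` and `ObsoleteCheckSketch` are hypothetical names, a plain identifier is modeled as a range of size one, and `equals` is assumed to compare all three fields. None of this is the actual Tez code; it only illustrates that a set lookup by equality misses a composite identifier that covers an obsolete input, while a scan over the set finds it.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in: firstInput plus count describes the covered input identifiers.
final class AttemptRange {
  final int firstInput;
  final int attempt;
  final int count;

  AttemptRange(int firstInput, int attempt, int count) {
    this.firstInput = firstInput;
    this.attempt = attempt;
    this.count = count;
  }

  // True if other's first input falls in [firstInput, firstInput + count)
  // and the attempt numbers match.
  boolean includes(AttemptRange other) {
    return firstInput <= other.firstInput
        && other.firstInput < firstInput + count
        && attempt == other.attempt;
  }

  // Assumption for this sketch: equality compares all three fields.
  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof AttemptRange)) {
      return false;
    }
    AttemptRange that = (AttemptRange) obj;
    return firstInput == that.firstInput && attempt == that.attempt && count == that.count;
  }

  @Override
  public int hashCode() {
    return Objects.hash(firstInput, attempt, count);
  }
}

class ObsoleteCheckSketch {
  // Linear scan: the input is obsolete if it covers any obsolete entry.
  static boolean isObsolete(AttemptRange input, Set<AttemptRange> obsoleteInputs) {
    if (input == null) {
      return false;
    }
    for (AttemptRange obsolete : obsoleteInputs) {
      if (input.includes(obsolete)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Set<AttemptRange> obsoleteInputs = new HashSet<>();
    obsoleteInputs.add(new AttemptRange(11, 0, 1)); // a single obsolete input
    AttemptRange composite = new AttemptRange(10, 0, 3); // covers inputs 10, 11, 12
    System.out.println(obsoleteInputs.contains(composite)); // equality lookup misses it
    System.out.println(isObsolete(composite, obsoleteInputs)); // the scan finds it
  }
}
```

The scan is linear in the number of obsolete inputs, which is the trade-off for not relying on `equals` between the two identifier kinds.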

inputIter.remove();
continue;
}
@@ -949,10 +956,14 @@ public void fetchFailed(String host,
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier();
if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) {
LOG.info("Do not report obsolete input: " + srcAttemptIdentifier);
return;
}
LOG.info(
- "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, "
+ "{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, "
  + "local fetch: {}, remote fetch failure reported as local failure: {})",
- sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
+ sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed,
abstractdog marked this conversation as resolved.
inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource());
failedShufflesCounter.increment(1);
inputContext.notifyProgress();
@@ -984,6 +995,22 @@ public void fetchFailed(String host,
}
}
}

private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) {
if (input == null) {
return false;
}
InputAttemptIdentifier obsoleteInput;
Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoletedInputs.iterator();
while (obsoleteInputsIter.hasNext()) {
obsoleteInput = obsoleteInputsIter.next();
if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) {
return true;
}
}
return false;
}

/////////////////// End of Methods from FetcherCallbackHandler

public void shutdown() throws InterruptedException {
Original file line number Diff line number Diff line change
@@ -1175,7 +1175,19 @@ private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
} else {
isInputFinished = isInputFinished(id.getInputIdentifier());
}
- return !obsoleteInputs.contains(id) && !isInputFinished;
+ return !isObsoleteInputAttemptIdentifier(id) && !isInputFinished;
}

private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) {
InputAttemptIdentifier obsoleteInput;
Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoleteInputs.iterator();
while (obsoleteInputsIter.hasNext()) {
obsoleteInput = obsoleteInputsIter.next();
if (input.include(obsoleteInput.getInputIdentifier(), obsoleteInput.getAttemptNumber())) {
return true;
}
}
return false;
}

public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {