Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
Expand Down Expand Up @@ -365,10 +369,65 @@ public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullabl

if (previous == null || previous == newException) {
return newException;
} else {
previous.addSuppressed(newException);
}

// If the exceptions already reference each other through the suppression or cause chains,
// return the previous exception to avoid introducing cycles.
if (existsInExceptionChain(newException, previous)
|| existsInExceptionChain(previous, newException)) {
return previous;
}

previous.addSuppressed(newException);
return previous;
}

/**
* Checks whether the given {@code exception} throwable exception exists anywhere within the
* exception chain of {@code previous}. This includes both the cause chain and all suppressed
* exceptions. A visited set is used to avoid cycles and redundant traversal.
*
* @param exception The throwable exception to search for.
* @param previous The previous throwable exception chain to search in.
* @return True, if the exception is found within the suppressed chain, false otherwise.
*/
private static boolean existsInExceptionChain(Throwable exception, Throwable previous) {
if (exception == null || previous == null) {
return false;
}
if (exception == previous) {
return true;
}

// Apply cycle prevention through a graph-like traversal of existing
// suppressed or cause chain exceptions
Set<Throwable> previousExceptions = new HashSet<>();
Deque<Throwable> exceptionStack = new ArrayDeque<>();
exceptionStack.push(previous);

while (!exceptionStack.isEmpty()) {
Throwable currentException = exceptionStack.pop();
if (!previousExceptions.add(currentException)) {
continue;
}

if (currentException == exception) {
return true;
}

// Traverse suppression chain
for (Throwable suppressed : currentException.getSuppressed()) {
exceptionStack.push(suppressed);
}

// Traverse cause-chain
Throwable cause = currentException.getCause();
if (cause != null) {
exceptionStack.push(cause);
}
}

return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.utils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void testFindThrowableByType() {

@Test
void testFirstOrSuppressed() {
// tet first exception
// test first exception
Exception exception = new Exception("exception");
assertThat(ExceptionUtils.firstOrSuppressed(exception, null)).isEqualTo(exception);

Expand All @@ -121,6 +122,35 @@ void testFirstOrSuppressed() {
assertThat(suppressedException.getSuppressed()).isEqualTo(new Throwable[] {newException});
}

@Test
void testFirstOrSuppressedCyclePrevention() {
// create two test exceptions (assuming thrown during shutdown, etc.)
Exception exceptionA = new Exception("Exception A");
Exception exceptionB = new Exception("Exception B");

// associate the suppressions (creating a suppression chain)
ExceptionUtils.firstOrSuppressed(exceptionB, exceptionA);
assertThat(exceptionA.getSuppressed()).contains(exceptionB);

// attempt to create a suppression cycle (A -> B; B -> A)
Exception result = ExceptionUtils.firstOrSuppressed(exceptionA, exceptionB);
assertThat(result).isEqualTo(exceptionB);

// verify the exception cycle was prevented (no bidirectional reference)
assertThat(exceptionA.getSuppressed()).contains(exceptionB);
assertThat(exceptionB.getSuppressed()).doesNotContain(exceptionA);
assertThat(exceptionB.getSuppressed()).isEmpty();

// verify that processing suppressed exceptions no longer causes StackOverflowError
Assertions.assertDoesNotThrow(() -> recursivelyProcessSuppressedExceptions(exceptionA));
}

private void recursivelyProcessSuppressedExceptions(Throwable throwable) {
for (Throwable suppressed : throwable.getSuppressed()) {
recursivelyProcessSuppressedExceptions(suppressed);
}
}

@Test
public void testExceptionStripping() {
final Exception expectedException = new Exception("test exception");
Expand Down
Loading