diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java index 86be42f725..c47bd03cce 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java @@ -32,8 +32,12 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Field; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashSet; import java.util.Locale; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.function.Function; @@ -365,10 +369,65 @@ public static T firstOrSuppressed(T newException, @Nullabl if (previous == null || previous == newException) { return newException; - } else { - previous.addSuppressed(newException); + } + + // If the exceptions already reference each other through the suppression or cause chains, + // return the previous exception to avoid introducing cycles. + if (existsInExceptionChain(newException, previous) + || existsInExceptionChain(previous, newException)) { return previous; } + + previous.addSuppressed(newException); + return previous; + } + + /** + * Checks whether the given {@code exception} throwable exception exists anywhere within the + * exception chain of {@code previous}. This includes both the cause chain and all suppressed + * exceptions. A visited set is used to avoid cycles and redundant traversal. + * + * @param exception The throwable exception to search for. + * @param previous The previous throwable exception chain to search in. + * @return True, if the exception is found within the suppressed chain, false otherwise. + */ + private static boolean existsInExceptionChain(Throwable exception, Throwable previous) { + if (exception == null || previous == null) { + return false; + } + if (exception == previous) { + return true; + } + + // Apply cycle prevention through a graph-like traversal of existing + // suppressed or cause chain exceptions + Set previousExceptions = new HashSet<>(); + Deque exceptionStack = new ArrayDeque<>(); + exceptionStack.push(previous); + + while (!exceptionStack.isEmpty()) { + Throwable currentException = exceptionStack.pop(); + if (!previousExceptions.add(currentException)) { + continue; + } + + if (currentException == exception) { + return true; + } + + // Traverse suppression chain + for (Throwable suppressed : currentException.getSuppressed()) { + exceptionStack.push(suppressed); + } + + // Traverse cause-chain + Throwable cause = currentException.getCause(); + if (cause != null) { + exceptionStack.push(cause); + } + } + + return false; } /** diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java index 04b4d951fc..469a62e782 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.utils; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.concurrent.CompletionException; @@ -108,7 +109,7 @@ public void testFindThrowableByType() { @Test void testFirstOrSuppressed() { - // tet first exception + // test first exception Exception exception = new Exception("exception"); assertThat(ExceptionUtils.firstOrSuppressed(exception, null)).isEqualTo(exception); @@ -121,6 +122,35 @@ void testFirstOrSuppressed() { assertThat(suppressedException.getSuppressed()).isEqualTo(new Throwable[] {newException}); } + @Test + void testFirstOrSuppressedCyclePrevention() { + // create two test exceptions (assuming thrown during shutdown, etc.) + Exception exceptionA = new Exception("Exception A"); + Exception exceptionB = new Exception("Exception B"); + + // associate the suppressions (creating a suppression chain) + ExceptionUtils.firstOrSuppressed(exceptionB, exceptionA); + assertThat(exceptionA.getSuppressed()).contains(exceptionB); + + // attempt to create a suppression cycle (A -> B; B -> A) + Exception result = ExceptionUtils.firstOrSuppressed(exceptionA, exceptionB); + assertThat(result).isEqualTo(exceptionB); + + // verify the exception cycle was prevented (no bidirectional reference) + assertThat(exceptionA.getSuppressed()).contains(exceptionB); + assertThat(exceptionB.getSuppressed()).doesNotContain(exceptionA); + assertThat(exceptionB.getSuppressed()).isEmpty(); + + // verify that processing suppressed exceptions no longer causes StackOverflowError + Assertions.assertDoesNotThrow(() -> recursivelyProcessSuppressedExceptions(exceptionA)); + } + + private void recursivelyProcessSuppressedExceptions(Throwable throwable) { + for (Throwable suppressed : throwable.getSuppressed()) { + recursivelyProcessSuppressedExceptions(suppressed); + } + } + @Test public void testExceptionStripping() { final Exception expectedException = new Exception("test exception");