diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index cdd5728fdc8..74ce374f283 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -102,6 +102,8 @@ public interface GobblinTemporalConfigurationKeys { /** * Activities timeout configs */ + String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED = PREFIX + "activity.heartbeat.enabled"; + String DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED = "true"; String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + "activity.heartbeat.timeout.minutes"; int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5; String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + "activity.heartbeat.interval.minutes"; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java index 22126b2d0fc..5003e822cc8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java @@ -62,7 +62,9 @@ public enum ActivityType { } public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout) { - if (!setHeartbeatTimeout) { + if (!setHeartbeatTimeout || !PropertiesUtils.getPropAsBoolean(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED)) { return buildActivityOptionsWithoutHeartBeatTimeout(props); } return ActivityOptions.newBuilder() diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index b106cdc1532..7eb3c71edb4 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -103,7 +103,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { JobState jobState = Help.loadJobState(workSpec, fs); int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); - heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running Commit Activity"), + heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running Commit Activity")), heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES); optJobName = Optional.ofNullable(jobState.getJobName()); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index fdeaf424870..764e7cc568a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -138,7 +138,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi log.info("Created jobState: {}", jobState.toJsonString(true)); int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); - heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"), + heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits")), heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES); Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java index ffd84d0bc1a..e770c1a27bf 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java @@ -86,7 +86,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) { log.info("{} - loaded; found {} workUnits", correlator, workUnits.size()); JobState jobState = Help.loadJobState(wu, fs); int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); - heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"), + heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity")), heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES); troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties()); troubleshooter.start(); diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java index 8ce5c2991dd..4e8ba3639e4 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java @@ -127,4 +127,11 @@ public void testBuildActivityOptions(ActivityType activityType, int expectedTime Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21); } + @Test + public void testBuildActivityOptionsWithHeartBeatTimeoutDisabled() { + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED, "false"); + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props, true)) + .forEach(options -> Assert.assertNull(options.getHeartbeatTimeout(), "Heartbeat timeout should be null")); + } + } diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java index 09f2f00b49a..f3802f3ed32 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java @@ -28,20 +28,22 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import org.apache.gobblin.util.executors.MDCPropagatingCallable; -import org.apache.gobblin.util.executors.MDCPropagatingRunnable; -import org.apache.gobblin.util.executors.MDCPropagatingScheduledExecutorService; import org.slf4j.Logger; +import lombok.extern.slf4j.Slf4j; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.gobblin.util.executors.MDCPropagatingCallable; import org.apache.gobblin.util.executors.MDCPropagatingExecutorService; +import org.apache.gobblin.util.executors.MDCPropagatingRunnable; +import org.apache.gobblin.util.executors.MDCPropagatingScheduledExecutorService; /** @@ -49,6 +51,7 @@ * * @author Yinan Li */ +@Slf4j public class ExecutorsUtils { private static final ThreadFactory DEFAULT_THREAD_FACTORY = newThreadFactory(Optional.absent()); @@ -162,6 +165,28 @@ public static Runnable loggingDecorator(Runnable runnable) { return new MDCPropagatingRunnable(runnable); } + + /** + * Wraps a {@link Runnable} task with exception handling to ensure that + * any thrown exception does not terminate the thread or scheduled executor. + * This is useful for long-running or recurring tasks where resilience is critical. + * + * @param runnable the task to wrap + * @return a safe {@link Runnable} that logs and suppresses exceptions + */ + public static Runnable safeRunnable(Runnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Exception exception) { + // Catch all exceptions to prevent the thread from dying + // and log the exception + log.warn("Caught exception in runnable {}", exception.getMessage()); + log.debug("Caught exception in runnable ", exception); + } + }; + } + /** * Creates an {@link Callable} which propagates the MDC * information across thread boundaries. diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/ExecutorsUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/ExecutorsUtilsTest.java index 0d7bb073bf4..ef897d8d127 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/ExecutorsUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/ExecutorsUtilsTest.java @@ -129,4 +129,26 @@ public String apply(Integer input) { ExecutorsUtils.parallelize(nums, sleepAndMultiply, 2, 1, Optional. absent()); } + + @Test + public void testSafeRunnableRunsSuccessfully() { + Runnable mockRunnable = Mockito.mock(Runnable.class); + ExecutorsUtils.safeRunnable(mockRunnable).run(); + Mockito.verify(mockRunnable, Mockito.times(1)).run(); + } + + @Test + public void testSafeRunnableHandlesException() { + Runnable mockRunnable = Mockito.mock(Runnable.class); + Mockito.doThrow(new RuntimeException("Test exception")).when(mockRunnable).run(); + Runnable safeRunnable = ExecutorsUtils.safeRunnable(mockRunnable); + try { + safeRunnable.run(); + safeRunnable.run(); + Mockito.verify(mockRunnable, Mockito.times(2)).run(); + } catch (Exception e) { + Assert.fail("Exception should not be thrown from safeRunnable"); + } + } + }