Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public interface GobblinTemporalConfigurationKeys {
/**
* Activities timeout configs
*/
String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED = PREFIX + "activity.heartbeat.enabled";
String DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED = "true";
String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + "activity.heartbeat.timeout.minutes";
int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + "activity.heartbeat.interval.minutes";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public enum ActivityType {
}

public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout) {
if (!setHeartbeatTimeout) {
if (!setHeartbeatTimeout || !PropertiesUtils.getPropAsBoolean(props,
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED)) {
return buildActivityOptionsWithoutHeartBeatTimeout(props);
}
return ActivityOptions.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public CommitStats commit(WUProcessingSpec workSpec) {
JobState jobState = Help.loadJobState(workSpec, fs);

int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running Commit Activity"),
heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running Commit Activity")),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);

optJobName = Optional.ofNullable(jobState.getJobName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
log.info("Created jobState: {}", jobState.toJsonString(true));

int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits")),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);

Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) {
log.info("{} - loaded; found {} workUnits", correlator, workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"),
heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity")),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,11 @@ public void testBuildActivityOptions(ActivityType activityType, int expectedTime
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
}

/**
 * With the heartbeat-timeout-enabled property set to {@code "false"}, every activity type must
 * build options whose heartbeat timeout is {@code null}, even when the caller requests one.
 */
@Test
public void testBuildActivityOptionsWithHeartBeatTimeoutDisabled() {
  // Disable the heartbeat timeout through configuration only.
  props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_ENABLED, "false");

  // The caller still passes setHeartbeatTimeout = true; the property is expected to win.
  activityTypes.stream().forEach(type ->
      Assert.assertNull(
          type.buildActivityOptions(props, true).getHeartbeatTimeout(),
          "Heartbeat timeout should be null"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,30 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.gobblin.util.executors.MDCPropagatingCallable;
import org.apache.gobblin.util.executors.MDCPropagatingRunnable;
import org.apache.gobblin.util.executors.MDCPropagatingScheduledExecutorService;
import org.slf4j.Logger;
import lombok.extern.slf4j.Slf4j;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.gobblin.util.executors.MDCPropagatingCallable;
import org.apache.gobblin.util.executors.MDCPropagatingExecutorService;
import org.apache.gobblin.util.executors.MDCPropagatingRunnable;
import org.apache.gobblin.util.executors.MDCPropagatingScheduledExecutorService;


/**
* A utility class to use with {@link java.util.concurrent.Executors} in cases such as when creating new thread pools.
*
* @author Yinan Li
*/
@Slf4j
public class ExecutorsUtils {

private static final ThreadFactory DEFAULT_THREAD_FACTORY = newThreadFactory(Optional.<Logger>absent());
Expand Down Expand Up @@ -162,6 +165,28 @@ public static Runnable loggingDecorator(Runnable runnable) {
return new MDCPropagatingRunnable(runnable);
}


/**
 * Wraps a {@link Runnable} so that any {@link Exception} it throws is logged and suppressed
 * instead of propagating to the caller.
 *
 * <p>Intended for recurring tasks handed to a scheduled executor (for example via
 * {@code scheduleAtFixedRate}), where an exception escaping one run suppresses all later runs.
 * Only {@link Exception}s are caught; {@link Error}s still propagate.
 *
 * @param runnable the task to wrap; must not be {@code null}
 * @return a {@link Runnable} that delegates to {@code runnable}, logging and swallowing exceptions
 * @throws NullPointerException if {@code runnable} is {@code null}
 */
public static Runnable safeRunnable(Runnable runnable) {
  // Fail fast: a null task would otherwise raise an NPE on every run, which the wrapper
  // below would then swallow, leaving a scheduled task that silently never does anything.
  java.util.Objects.requireNonNull(runnable, "runnable");
  return () -> {
    try {
      runnable.run();
    } catch (Exception exception) {
      // Log class + message at WARN (getMessage() alone is null for many exceptions);
      // the stack trace stays at DEBUG so a repeatedly failing task does not flood the log.
      log.warn("Caught exception in runnable {}", exception.toString());
      log.debug("Caught exception in runnable ", exception);
    }
  };
}

/**
* Creates an {@link Callable<T>} which propagates the MDC
* information across thread boundaries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,26 @@ public String apply(Integer input) {

ExecutorsUtils.parallelize(nums, sleepAndMultiply, 2, 1, Optional.<Logger> absent());
}

/**
 * A task that completes normally must be invoked exactly once when its safe wrapper is run once.
 */
@Test
public void testSafeRunnableRunsSuccessfully() {
  Runnable delegate = Mockito.mock(Runnable.class);
  Runnable wrapped = ExecutorsUtils.safeRunnable(delegate);

  wrapped.run();

  Mockito.verify(delegate, Mockito.times(1)).run();
}

/**
 * A task that always throws must not leak its exception through the safe wrapper, and the
 * wrapper must stay usable: two runs of the wrapper mean two invocations of the task.
 */
@Test
public void testSafeRunnableHandlesException() {
  Runnable throwingTask = Mockito.mock(Runnable.class);
  Mockito.doThrow(new RuntimeException("Test exception")).when(throwingTask).run();
  Runnable wrapped = ExecutorsUtils.safeRunnable(throwingTask);

  try {
    // Run twice to show the first failure does not disable the wrapper.
    for (int attempt = 0; attempt < 2; attempt++) {
      wrapped.run();
    }
    Mockito.verify(throwingTask, Mockito.times(2)).run();
  } catch (Exception e) {
    Assert.fail("Exception should not be thrown from safeRunnable");
  }
}

}
Loading