Skip to content

Commit

Permalink
ignore flows that are running beyond job start and flow finish deadli…
Browse files Browse the repository at this point in the history
…nes when doing the concurrency check
  • Loading branch information
arjun4084346 committed Sep 5, 2024
1 parent f40bb44 commit d52593a
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -143,6 +144,11 @@ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> getDagNodeWit
*/
Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);

/**
* @return list of {@link org.apache.gobblin.service.monitoring.FlowStatus} for the provided flow group and flow name.
*/
List<org.apache.gobblin.service.monitoring.FlowStatus> getAllFlowStatusesForFlow(String flowGroup, String flowName);

/**
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
* @param flowGroup flow group for the dag action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -43,6 +44,7 @@
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -211,6 +213,11 @@ public Optional<JobStatus> getJobStatus(DagNodeId dagNodeId) {
}
}

@Override
public List<FlowStatus> getAllFlowStatusesForFlow(String flowGroup, String flowName) {
return this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName);
}

@Override
public boolean existsJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.reflect.ConstructorUtils;
Expand All @@ -38,16 +39,23 @@
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.service.ExecutionStatus.*;


/**
* Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
Expand All @@ -70,12 +78,12 @@ public class FlowCompilationValidationHelper {
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
private final EventSubmitter eventSubmitter;
private final FlowStatusGenerator flowStatusGenerator;
private final DagManagementStateStore dagManagementStateStore;
private final boolean isFlowConcurrencyEnabled;

@Inject
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) {
UserQuotaManager userQuotaManager, DagManagementStateStore dagManagementStateStore) {
try {
String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
Expand All @@ -89,7 +97,7 @@ public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton
this.quotaManager = userQuotaManager;
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
this.flowStatusGenerator = flowStatusGenerator;
this.dagManagementStateStore = dagManagementStateStore;
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}
Expand Down Expand Up @@ -157,7 +165,7 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);

if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, allowConcurrentExecution,
if (isExecutionPermitted(flowGroup, flowName, allowConcurrentExecution,
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) {
return Optional.fromNullable(jobExecutionPlanDag);
} else {
Expand Down Expand Up @@ -188,9 +196,58 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
*/
private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowGroup, String flowName,
boolean allowConcurrentExecution, long flowExecutionId) {
return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
private boolean isExecutionPermitted(String flowGroup, String flowName, boolean allowConcurrentExecution, long flowExecutionId)
throws IOException {
return allowConcurrentExecution || !isFlowBeforeThisRunning(flowName, flowGroup, flowExecutionId);
}

/**
* Returns true if any previous execution for the flow determined by the provided flowGroup, flowName is running.
* We ignore the execution that has the provided flowExecutionId. We also ignore the flows that are running beyond
* the job start deadline and flow finish deadline.
*/
private boolean isFlowBeforeThisRunning(String flowName, String flowGroup, long flowExecutionId) throws IOException {
List<FlowStatus> flowStatusList = dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);

if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
}

for (FlowStatus flowStatus : flowStatusList) {
ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();

if (flowStatus.getFlowExecutionId() == flowExecutionId) {
continue;
}

log.info("Verifying if {} is running...", flowStatus);

if (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING
|| flowExecutionStatus == PENDING_RESUME || flowExecutionStatus == RUNNING) {
DagManager.DagId dagIdOfOldExecution = new DagManager.DagId(flowGroup, flowName, flowStatus.getFlowExecutionId());
java.util.Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagIdOfOldExecution);

if (!dag.isPresent()) {
// dag is finished and cleaned up, job status monitor somehow did not receive/update the flow status; just ignore it...
continue;
}

Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
long jobStartDeadline =
DagManagerUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
if (((flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING || flowExecutionStatus == PENDING_RESUME)
&& (flowStartTime + jobStartDeadline > System.currentTimeMillis())) ||
((flowExecutionStatus == RUNNING && flowStartTime + flowFinishDeadline > System.currentTimeMillis()))) {
log.info("{} is still running. Found a dag for this, flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
flowStatus, flowStartTime, jobStartDeadline, flowFinishDeadline);
return true;
}
}
}

return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup,
@Override
public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup, String flowName) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAllWithPrefix(storeName),
List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAll(storeName),
GET_LATEST_FLOW_GROUP_STATUS_METRIC);
return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, jobStatusStates,null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void setUp() throws Exception {

SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));

FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(ConfigFactory.empty(), sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(ConfigFactory.empty(), sharedFlowMetricsSingleton, mock(UserQuotaManager.class), dagManagementStateStore);
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger), mockFlowStatusGenerator,
Optional.absent(), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
Expand Down

0 comments on commit d52593a

Please sign in to comment.