add a new status SKIPPED for skipped jobs and flows
fix merge conflicts
add tests
arjun4084346 committed Sep 5, 2024
1 parent f40bb44 commit 8d5589d
Showing 20 changed files with 338 additions and 35 deletions.
@@ -279,6 +279,7 @@ private void runJobExecutionLauncher() throws JobException {

try {
if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
// TODO: it should emit a SKIPPED_JOB event that sets the job status to SKIPPED rather than CANCELLED
TimingEvent timer = new TimingEvent(eventSubmitter, TimingEvent.JOB_SKIPPED_TIME);
HashMap<String, String> metadata = new HashMap<>(Tag.toMap(Tag.tagValuesToString(
HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
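
The hunk ends before the timing event is completed. A minimal sketch of the presumed continuation, following how timing events are stopped elsewhere in this diff (the message string is illustrative, not from the source):

// Sketch: finish the skipped-job timing event with the tag-derived metadata so the
// job is recorded with a skipped event rather than a cancellation.
metadata.put(TimingEvent.METADATA_MESSAGE, "Duplicate planning job detected; skipping execution"); // illustrative
timer.stop(metadata);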
@@ -43,6 +43,7 @@ public static class LauncherTimings {
public static final String JOB_PENDING_RESUME = "JobPendingResume";
public static final String JOB_ORCHESTRATED = "JobOrchestrated";
public static final String JOB_PREPARE = "JobPrepareTimer";
public static final String JOB_SKIPPED = "JobSkipped";
public static final String JOB_START = "JobStartTimer";
public static final String JOB_RUN = "JobRunTimer";
public static final String JOB_COMMIT = "JobCommitTimer";
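
For illustration, emitting the new event follows the same pattern as the other launcher timings; this mirrors sendSkippedEventForDependentJobs later in this diff (dagNode and the DagProc.eventSubmitter reference are assumed to be in scope):

// Emit a JobSkipped timing event; KafkaAvroJobStatusMonitor maps it to ExecutionStatus.SKIPPED.
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dagNode.getValue());
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);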
@@ -310,7 +310,7 @@ public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOpe
ImmutableList.of(
createFlowCompiledEvent(),
createJobOrchestratedEvent(1, 2),
createJobSkippedEvent()
createJobSkippedTimeEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
@@ -838,6 +838,40 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe
jobStatusMonitor.shutDown();
}

@Test
public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");

//Submit GobblinTrackingEvents to Kafka
ImmutableList.of(
createJobSkippedEvent()
).forEach(event -> {
context.submitEvent(event);
kafkaReporter.report();
});

try {
Thread.sleep(1000);
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false),
ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer(), dagManagementStateStore);
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);

State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name());
Mockito.verify(dagManagementStateStore, Mockito.times(1)).addJobDagAction(
any(), any(), anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));

jobStatusMonitor.shutDown();
}

private State getNextJobStatusState(MockKafkaAvroJobStatusMonitor jobStatusMonitor, Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator,
String jobGroup, String jobName) throws IOException {
jobStatusMonitor.processMessage(recordIterator.next());
@@ -873,11 +907,15 @@ private GobblinTrackingEvent createJobOrchestratedEvent(int currentAttempt, int
return createGTE(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, metadata);
}

private GobblinTrackingEvent createJobSkippedEvent() {
return createGTE(TimingEvent.LauncherTimings.JOB_SKIPPED, Maps.newHashMap());
}

private GobblinTrackingEvent createJobStartEvent() {
return createGTE(TimingEvent.LauncherTimings.JOB_START, Maps.newHashMap());
}

private GobblinTrackingEvent createJobSkippedEvent() {
private GobblinTrackingEvent createJobSkippedTimeEvent() {
return createGTE(TimingEvent.JOB_SKIPPED_TIME, Maps.newHashMap());
}

@@ -49,4 +49,9 @@ enum ExecutionStatus {
* Flow cancelled.
*/
CANCELLED,

/**
* Flow or job is skipped.
*/
SKIPPED
}
@@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
@@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",
@@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", "ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED", "SKIPPED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
@@ -23,7 +23,8 @@
"PENDING" : "Flow or job is in pending state.",
"PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
"RUNNING" : "Flow or job is currently executing.",
"SKIPPED" : "Flow or job is skipped."
}
}, {
"type" : "record",
@@ -41,7 +41,7 @@
@Slf4j
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES = Lists.newArrayList(ExecutionStatus.FAILED.name(),
ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), ExecutionStatus.SKIPPED.name());
public static final int MAX_LOOKBACK = 100;

private final JobStatusRetriever jobStatusRetriever;
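
With SKIPPED added to FINISHED_STATUSES, skipped flows now count as terminal. A small sketch of the membership check a caller might perform (latestStatus is an assumed variable, e.g. read from the job status store):

// FINISHED_STATUSES holds status names, so compare against ExecutionStatus.X.name().
String latestStatus = ExecutionStatus.SKIPPED.name(); // assumed: the latest reported flow status
boolean flowIsFinished = FlowStatusGenerator.FINISHED_STATUSES.contains(latestStatus); // true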
@@ -99,7 +99,6 @@ public interface DagManagementStateStore {
* {@link DagManagementStateStore#addDag}. This call is just an additional identifier which may be used
* for DagNode level operations. In the future, it may be merged with checkpointDag.
* @param dagNode dag node to be added
* @param dagId dag id of the dag this dag node belongs to
*/
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

@@ -200,7 +200,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
executionStatus == SKIPPED) {
//Add a node to be executed next, only if all of its parent nodes are COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
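
The hunk is truncated inside the parent loop; per the inline comment, the remainder presumably completes the all-parents-COMPLETE check, roughly as follows (identifiers past the truncation point are assumptions):

if (getExecutionStatus(parentNode) != COMPLETE) {
  addFlag = false; // a non-COMPLETE parent blocks scheduling of this node
  break;
}
// ...and `node` is added to the returned set only when addFlag remains true.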
@@ -37,7 +37,6 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -157,7 +156,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), dagId);
}

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException {
Properties cancelJobArgs = new Properties();
String serializedFuture = null;

@@ -174,6 +173,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
} else {
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}

DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
sendJobCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
@@ -182,12 +182,34 @@
}
}

public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
/**
* Emits a JOB_SKIPPED GTE for each of the dependent jobs.
*/
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
findDependentJobs(dag, node, dependentJobs);
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
}
}

private static void findDependentJobs(Dag<JobExecutionPlan> dag,
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
if (!dependentJobs.contains(child)) {
dependentJobs.add(child);
findDependentJobs(dag, child, dependentJobs);
}
}
}

public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException {
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel);
}
}

@@ -201,7 +223,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
* flow event type.
*/
public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) {
if (!dag.isEmpty()) {
// Every dag node will contain the same flow metadata
Config config = DagManagerUtils.getDagJobConfig(dag);
@@ -212,7 +234,7 @@ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExe
flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
}

eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
DagProc.eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
}
}
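
For usage context: later in this diff, ReevaluateDagProc.onJobFinish calls the new helper whenever a node ends as FAILED, CANCELLED, or SKIPPED, so every downstream node receives a JOB_SKIPPED event instead of waiting on a parent that will never complete. Roughly (finishedDagNode is an assumed name for the node that just finished):

// On terminal failure of a node, mark everything downstream of it as skipped.
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
DagProcUtils.sendSkippedEventForDependentJobs(dag, finishedDagNode);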

@@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId());

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel);
}

dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
@@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}",
DagManagerUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart);
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNode);
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
}
@@ -64,18 +64,18 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
}

dag.get().setMessage("Flow killed by request");
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);

if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
if (dagNodeToCancel.isPresent()) {
DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel.get());
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId());
}
} else {
DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
DagProcUtils.cancelDag(dag.get());
}
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
@@ -85,7 +85,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagManagerUtils.getFlowId(dag.get()),
DagManager.FlowState.RUNNING);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, getDagTask().getDagAction());
@@ -99,6 +99,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
// This can also happen when a job is cancelled or failed and the dag has already been cleaned up, while we are
// still processing Reevaluate dag actions for its SKIPPED dependent jobs.
log.warn("Dag not found {}", getDagId());
return;
}
Expand All @@ -117,7 +119,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
} else if (DagProcUtils.isDagFinished(dag)) {
String flowEvent = DagProcUtils.calcFlowStatus(dag);
dag.setFlowEvent(flowEvent);
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
DagProcUtils.setAndEmitFlowEvent(dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
dagManagementStateStore.deleteDag(getDagId());
@@ -159,9 +161,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case CANCELLED:
case SKIPPED:
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
@@ -71,7 +71,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
long flowResumeTime = System.currentTimeMillis();

// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
DagProcUtils.setAndEmitFlowEvent(failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);

for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
@@ -141,6 +141,9 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven
case TimingEvent.LauncherTimings.JOB_PENDING:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING.name());
break;
case TimingEvent.LauncherTimings.JOB_SKIPPED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.SKIPPED.name());
break;
case TimingEvent.FlowTimings.FLOW_PENDING_RESUME:
case TimingEvent.LauncherTimings.JOB_PENDING_RESUME:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RESUME.name());
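
End to end, the new case means a GobblinTrackingEvent named JobSkipped is parsed into the SKIPPED status. A rough sketch using the test helpers shown earlier (jobStatusMonitor is an assumed monitor instance):

GobblinTrackingEvent skippedEvent = createJobSkippedEvent(); // helper defined in the test diff above
State jobStatus = jobStatusMonitor.parseJobStatus(skippedEvent);
Assert.assertEquals(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.SKIPPED.name());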
@@ -116,7 +116,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by

private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);

private final JobIssueEventHandler jobIssueEventHandler;
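
SKIPPED is slotted between RUNNING and COMPLETE in this ordering. Assuming the monitor uses list position to discard out-of-order status events (an inference from the field name, not shown in this hunk), the guard would look roughly like:

// Hypothetical guard: never move a job's status backwards in the ordering.
int currentIndex = ORDERED_EXECUTION_STATUSES.indexOf(currentStatus);
int incomingIndex = ORDERED_EXECUTION_STATUSES.indexOf(incomingStatus);
if (incomingIndex < currentIndex) {
  return; // stale event; keep the existing, more advanced status
}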