Skip to content

Commit

Permalink
[GOBBLIN-2145] fix bug in getting flow status (#4040)
Browse files Browse the repository at this point in the history
* fix bug in getting flow status
  • Loading branch information
arjun4084346 committed Aug 29, 2024
1 parent 444f266 commit f6ba473
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public List<FlowStatus> getFlowStatusesAcrossGroup(String flowGroup, int countPe
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) {
List<FlowStatus> flowStatusList = jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup);
List<FlowStatus> flowStatusList = jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName);

if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testIsFlowRunningFirstExecution() {
String flowName = "testName";
String flowGroup = "testGroup";
long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(null);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(null);

FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
Expand All @@ -59,7 +59,7 @@ public void testIsFlowRunningCompiledPastExecution() {
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// Block the next execution if the prior one is in compiled as it's considered still running
Expand All @@ -78,15 +78,15 @@ public void skipFlowConcurrentCheckSameFlowExecutionId() {
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running.
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
}

@Test
public void testIsFlowRunningJobExecutionIgnored() {
@Test
public void testIsFlowRunningJobExecutionIgnored() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
Expand All @@ -109,14 +109,14 @@ public void testIsFlowRunningJobExecutionIgnored() {
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(Lists.newArrayList(flowStatus));
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(Lists.newArrayList(flowStatus));
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));

jobStatus4 = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, jobStatus4).iterator();
flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(Collections.singletonList(flowStatus));
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(Collections.singletonList(flowStatus));
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId+1));
}

Expand Down Expand Up @@ -186,7 +186,7 @@ public void testIsFlowRunning_NoFlowStatuses_ReturnsFalse() {
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

// Mocking the retrieval of empty flowStatusList
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName))
.thenReturn(Collections.emptyList());

Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
Expand All @@ -207,7 +207,7 @@ public void testIsFlowRunning_AllFinishedFlowStatuses_ReturnsFalse() {
createFlowStatus(flowName, flowGroup, flowExecutionId, "CANCELLED")
);

when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName))
.thenReturn(flowStatusList);

Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
Expand All @@ -229,9 +229,9 @@ public void testIsFlowRunning_FlowStatusNotMatchingFlowExecutionIdAndOneOfTheSta
createFlowStatus(flowName, flowGroup, flowExecutionId+1, "FAILED"),
createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE")

);
);

when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName))
.thenReturn(flowStatusList);

Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
Expand All @@ -250,7 +250,7 @@ public void testIsFlowRunning_FlowStatusMatchingFlowExecutionId_ReturnsFalse() {
createFlowStatus(flowName, flowGroup, flowExecutionId, "RUNNING")
);

when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName))
.thenReturn(flowStatusList);

Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
Expand Down

0 comments on commit f6ba473

Please sign in to comment.