From f6ba4731f6e47a5b0701341e5f6d7e9925946119 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Wed, 28 Aug 2024 21:00:13 -0700 Subject: [PATCH] [GOBBLIN-2145] fix bug in getting flow status (#4040) * fix bug in getting flow status --- .../monitoring/FlowStatusGenerator.java | 2 +- .../monitoring/FlowStatusGeneratorTest.java | 24 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java index b4ec8c3530..dea106c294 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java @@ -154,7 +154,7 @@ public List getFlowStatusesAcrossGroup(String flowGroup, int countPe * @return true, if any jobs of the flow are RUNNING. */ public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) { - List flowStatusList = jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup); + List flowStatusList = jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName); if (flowStatusList == null || flowStatusList.isEmpty()) { return false; diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java index 9e84a1fa03..00ad043df8 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java @@ -43,7 +43,7 @@ public void testIsFlowRunningFirstExecution() { String flowName = "testName"; String flowGroup = "testGroup"; long currFlowExecutionId = 1234L; - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(null); + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(null); FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId)); @@ -59,7 +59,7 @@ public void testIsFlowRunningCompiledPastExecution() { .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build(); Iterator jobStatusIterator = Lists.newArrayList(jobStatus).iterator(); FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn( + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn( Lists.newArrayList(flowStatus)); FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); // Block the next execution if the prior one is in compiled as it's considered still running @@ -78,15 +78,15 @@ public void skipFlowConcurrentCheckSameFlowExecutionId() { .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build(); Iterator jobStatusIterator = Lists.newArrayList(jobStatus).iterator(); FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn( + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn( Lists.newArrayList(flowStatus)); FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); // If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running. Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId)); } - @Test - public void testIsFlowRunningJobExecutionIgnored() { + @Test + public void testIsFlowRunningJobExecutionIgnored() { String flowName = "testName"; String flowGroup = "testGroup"; long flowExecutionId = 1234L; @@ -109,14 +109,14 @@ public void testIsFlowRunningJobExecutionIgnored() { FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator)); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(Lists.newArrayList(flowStatus)); + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(Lists.newArrayList(flowStatus)); Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId)); jobStatus4 = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId) .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build(); jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, jobStatus4).iterator(); flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator)); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(Collections.singletonList(flowStatus)); + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)).thenReturn(Collections.singletonList(flowStatus)); Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId+1)); } @@ -186,7 +186,7 @@ public void testIsFlowRunning_NoFlowStatuses_ReturnsFalse() { FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever); // Mocking the retrieval of empty flowStatusList - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)) + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)) .thenReturn(Collections.emptyList()); Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId)); @@ -207,7 +207,7 @@ public void testIsFlowRunning_AllFinishedFlowStatuses_ReturnsFalse() { createFlowStatus(flowName, flowGroup, flowExecutionId, "CANCELLED") ); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)) + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)) .thenReturn(flowStatusList); Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId)); @@ -229,9 +229,9 @@ public void testIsFlowRunning_FlowStatusNotMatchingFlowExecutionIdAndOneOfTheSta createFlowStatus(flowName, flowGroup, flowExecutionId+1, "FAILED"), createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE") - ); + ); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)) + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)) .thenReturn(flowStatusList); Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1)); @@ -250,7 +250,7 @@ public void testIsFlowRunning_FlowStatusMatchingFlowExecutionId_ReturnsFalse() { createFlowStatus(flowName, flowGroup, flowExecutionId, "RUNNING") ); - when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)) + when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName)) .thenReturn(flowStatusList); Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));