diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java index b8fb436e4ce..935b18d0ccb 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java @@ -36,7 +36,6 @@ import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JobLauncherUtils; -import org.apache.gobblin.util.PropertiesUtils; /** @@ -96,17 +95,17 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) { } public static String createPlanningJobId (Properties jobPlanningProps) { - long planningJobId = PropertiesUtils.getPropAsBoolean(jobPlanningProps, GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ? - System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()); + String flowExecIdSuffix = jobPlanningProps.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) ? + "_" + jobPlanningProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) : ""; return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX - + JobState.getJobNameFromProps(jobPlanningProps), planningJobId); + + JobState.getJobNameFromProps(jobPlanningProps) + flowExecIdSuffix); } public static String createActualJobId (Properties jobProps) { - long actualJobId = PropertiesUtils.getPropAsBoolean(jobProps, GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ? - System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()); + String flowExecIdSuffix = jobProps.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) ? + "_" + jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) : ""; return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX - + JobState.getJobNameFromProps(jobProps), actualJobId); + + JobState.getJobNameFromProps(jobProps) + flowExecIdSuffix); } @Nullable diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java index 022a9fb8a66..2070422b019 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java @@ -34,20 +34,19 @@ void testMapJobNameWithFlowExecutionId() { props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1"); String planningJobId = HelixJobsMapping.createPlanningJobId(props); String actualJobId = HelixJobsMapping.createActualJobId(props); - Assert.assertEquals(planningJobId, "job_PlanningJobjob1_1234"); - Assert.assertEquals(actualJobId, "job_ActualJobjob1_1234"); + // The jobID contains the system timestamp that we need to parse out + Assert.assertEquals(planningJobId.substring(0, planningJobId.lastIndexOf("_")), "job_PlanningJobjob1_1234"); + Assert.assertEquals(actualJobId.substring(0, actualJobId.lastIndexOf("_")), "job_ActualJobjob1_1234"); } @Test - void testMapJobNameWithOverride() { + void testMapJobNameWithoutFlowExecutionId() { Properties props = new Properties(); - props.setProperty(GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "true"); - props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234"); props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1"); String planningJobId = HelixJobsMapping.createPlanningJobId(props); String actualJobId = HelixJobsMapping.createActualJobId(props); - // The jobID will be the system timestamp instead of the flow execution ID - Assert.assertNotEquals(planningJobId, "job_PlanningJobjob1_1234"); - Assert.assertNotEquals(actualJobId, "job_ActualJobjob1_1234"); + // The jobID contains the system timestamp that we need to parse out + Assert.assertEquals(planningJobId.substring(0, planningJobId.lastIndexOf("_")), "job_PlanningJobjob1"); + Assert.assertEquals(actualJobId.substring(0, actualJobId.lastIndexOf("_")), "job_ActualJobjob1"); } }