Skip to content

Commit

Permalink
Fix for job id tracking where configuration is unnecessary (apache#3729)
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Lo authored and phet committed Aug 15, 2023
1 parent 027156c commit 800ae71
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.PropertiesUtils;


/**
Expand Down Expand Up @@ -96,17 +95,17 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {
}

public static String createPlanningJobId (Properties jobPlanningProps) {
long planningJobId = PropertiesUtils.getPropAsBoolean(jobPlanningProps, GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
String flowExecIdSuffix = jobPlanningProps.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) ?
"_" + jobPlanningProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) : "";
return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
+ JobState.getJobNameFromProps(jobPlanningProps), planningJobId);
+ JobState.getJobNameFromProps(jobPlanningProps) + flowExecIdSuffix);
}

public static String createActualJobId (Properties jobProps) {
long actualJobId = PropertiesUtils.getPropAsBoolean(jobProps, GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
String flowExecIdSuffix = jobProps.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) ?
"_" + jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) : "";
return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+ JobState.getJobNameFromProps(jobProps), actualJobId);
+ JobState.getJobNameFromProps(jobProps) + flowExecIdSuffix);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,19 @@ void testMapJobNameWithFlowExecutionId() {
props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
String planningJobId = HelixJobsMapping.createPlanningJobId(props);
String actualJobId = HelixJobsMapping.createActualJobId(props);
Assert.assertEquals(planningJobId, "job_PlanningJobjob1_1234");
Assert.assertEquals(actualJobId, "job_ActualJobjob1_1234");
// The jobID contains the system timestamp that we need to parse out
Assert.assertEquals(planningJobId.substring(0, planningJobId.lastIndexOf("_")), "job_PlanningJobjob1_1234");
Assert.assertEquals(actualJobId.substring(0, actualJobId.lastIndexOf("_")), "job_ActualJobjob1_1234");
}

@Test
void testMapJobNameWithOverride() {
void testMapJobNameWithoutFlowExecutionId() {
Properties props = new Properties();
props.setProperty(GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "true");
props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234");
props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
String planningJobId = HelixJobsMapping.createPlanningJobId(props);
String actualJobId = HelixJobsMapping.createActualJobId(props);
// The jobID will be the system timestamp instead of the flow execution ID
Assert.assertNotEquals(planningJobId, "job_PlanningJobjob1_1234");
Assert.assertNotEquals(actualJobId, "job_ActualJobjob1_1234");
// The jobID contains the system timestamp that we need to parse out
Assert.assertEquals(planningJobId.substring(0, planningJobId.lastIndexOf("_")), "job_PlanningJobjob1");
Assert.assertEquals(actualJobId.substring(0, actualJobId.lastIndexOf("_")), "job_ActualJobjob1");
}
}

0 comments on commit 800ae71

Please sign in to comment.