Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public class ConfigurationKeys {
public static final String JOB_GROUP_KEY = "job.group";
public static final String JOB_TAG_KEY = "job.tag";
public static final String JOB_DESCRIPTION_KEY = "job.description";
public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
public static final String JOB_ATTEMPT_ID = "job.attemptId";
public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
// Job launcher type
public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ public static List<? extends Tag<?>> initBaseEventTags(Properties jobProps,
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId)));
}

if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "1")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobPr
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));

if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "1")));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> dag

public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
JobSpec jobSpec = dagNode.getValue().getJobSpec();
Map<String, String> configWithCurrentAttempts = ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, String.valueOf(dagNode.getValue().getCurrentAttempts()),
Map<String, String> configWithCurrentAttempts = ImmutableMap.of(ConfigurationKeys.JOB_ATTEMPT_ID, String.valueOf(dagNode.getValue().getCurrentAttempts()),
ConfigurationKeys.JOB_CURRENT_GENERATION, String.valueOf(dagNode.getValue().getCurrentGeneration()));
Properties configAsProperties = (Properties) jobSpec.getConfigAsProperties().clone();
configAsProperties.putAll(configWithCurrentAttempts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Help {
public static final String USER_TO_PROXY_KEY = "user.to.proxy";
public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
public static final String DEFAULT_GAAS_ATTEMPT_ID = "1";

// treat `JobState` as immutable and cache, for reuse among activities executed by the same worker
private static final transient Cache<Path, JobState> jobStateByPath = CacheBuilder.newBuilder().recordStats().build();
Expand Down Expand Up @@ -105,7 +106,9 @@ public static String calcPerExecQualifier(Config workerConfig) {
? workerConfig.getString(USER_TO_PROXY_KEY) : "";
String gaasFlowExecId = workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : UUID.randomUUID().toString();
return userToProxy + "_" + gaasFlowExecId;
String gaasAttemptId = workerConfig.hasPath(ConfigurationKeys.JOB_ATTEMPT_ID)
? workerConfig.getString(ConfigurationKeys.JOB_ATTEMPT_ID) : DEFAULT_GAAS_ATTEMPT_ID;
return String.join("_", userToProxy, gaasFlowExecId, gaasAttemptId);
}

public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ public Builder withGaaSJobProps(Properties jobProps) {
this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
}

if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "1")));
this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
Expand Down
Loading