diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 1e9144640be..03f1a2629e9 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -191,7 +191,7 @@ public class ConfigurationKeys { public static final String JOB_GROUP_KEY = "job.group"; public static final String JOB_TAG_KEY = "job.tag"; public static final String JOB_DESCRIPTION_KEY = "job.description"; - public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts"; + public static final String JOB_ATTEMPT_ID = "job.attemptId"; public static final String JOB_CURRENT_GENERATION = "job.currentGeneration"; // Job launcher type public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type"; diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 45c3685d17a..cd0d2d0e7cb 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -183,9 +183,9 @@ public static List> initBaseEventTags(Properties jobProps, jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobExecutionId))); } - if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) { + if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) { metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, - jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1"))); + jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1"))); metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "1"))); metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index 9ced348ac4b..e1e3dcfbc73 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -399,9 +399,9 @@ private static List> addAdditionalMetadataTags(Properties jobPr metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY))); - if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) { + if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) { metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, - jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1"))); + jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1"))); metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "1"))); metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java index a2f5e1062d3..a5a0850217f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java @@ -128,7 +128,7 @@ public static JobExecutionPlan getJobExecutionPlan(DagNode dag public static JobSpec getJobSpec(DagNode dagNode) { JobSpec jobSpec = dagNode.getValue().getJobSpec(); - Map configWithCurrentAttempts = ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, String.valueOf(dagNode.getValue().getCurrentAttempts()), + Map configWithCurrentAttempts = ImmutableMap.of(ConfigurationKeys.JOB_ATTEMPT_ID, String.valueOf(dagNode.getValue().getCurrentAttempts()), ConfigurationKeys.JOB_CURRENT_GENERATION, String.valueOf(dagNode.getValue().getCurrentGeneration())); Properties configAsProperties = (Properties) jobSpec.getConfigAsProperties().clone(); configAsProperties.putAll(configWithCurrentAttempts); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java index 2a2cf170494..8ee9a690911 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java @@ -62,6 +62,7 @@ public class Help { public static final String USER_TO_PROXY_KEY = "user.to.proxy"; public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy"; public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey"; + public static final String DEFAULT_GAAS_ATTEMPT_ID = "1"; // treat `JobState` as immutable and cache, for reuse among activities executed by the same worker private static final transient Cache jobStateByPath = CacheBuilder.newBuilder().recordStats().build(); @@ -105,7 +106,9 @@ public static String calcPerExecQualifier(Config workerConfig) { ? workerConfig.getString(USER_TO_PROXY_KEY) : ""; String gaasFlowExecId = workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID) ? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : UUID.randomUUID().toString(); - return userToProxy + "_" + gaasFlowExecId; + String gaasAttemptId = workerConfig.hasPath(ConfigurationKeys.JOB_ATTEMPT_ID) + ? workerConfig.getString(ConfigurationKeys.JOB_ATTEMPT_ID) : DEFAULT_GAAS_ATTEMPT_ID; + return String.join("_", userToProxy, gaasFlowExecId, gaasAttemptId); } public static FileSystem loadFileSystem(FileSystemApt a) throws IOException { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java index 6ef80f00d86..e23b5dab7ea 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java @@ -109,9 +109,9 @@ public Builder withGaaSJobProps(Properties jobProps) { this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))); } - if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) { + if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) { this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, - jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1"))); + jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1"))); this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, "1"))); this.tags.add(new Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,