diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 026bfda1379..994eedb6ea8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -547,13 +546,11 @@ private void addContainerLocalResources(Path destDir, Map FileStatus[] statuses = this.fs.listStatus(destDir); if (statuses != null) { - Set libJarNames = new HashSet<>(Arrays.asList(this.config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).split(","))); - String containerJars = this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY) ? - this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY) : ""; + Set appLibJars = YarnHelixUtils.getAppLibJarList(this.config); for (FileStatus status : statuses) { String fileName = status.getPath().getName(); // Ensure that we are only adding jars that were uploaded by the YarnAppLauncher for this application - if (fileName.contains(".jar") && !(libJarNames.contains(fileName) || containerJars.contains(fileName))) { + if (fileName.contains(".jar") && !appLibJars.contains(fileName)) { continue; } YarnHelixUtils.addFileAsLocalResource(this.fs, status.getPath(), LocalResourceType.FILE, resourceMap); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java index 4a74da44a53..d38ebe52ee1 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java @@ -23,9 +23,11 @@ import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -52,6 +54,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.typesafe.config.Config; import org.apache.gobblin.util.ConfigUtils; @@ -237,6 +240,16 @@ public static boolean retainKLatestJarCachePaths(Path parentCachePath, int k, Fi return deletesSuccessful; } + + public static Set getAppLibJarList(Config config) { + Set libAppJars = new HashSet<>(Arrays.asList( + ConfigUtils.getString(config, GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST, "").split(","))); + Set containerJars = new HashSet<>(Arrays.asList( + ConfigUtils.getString(config, GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY, "").split(","))); + libAppJars.addAll(containerJars); + return Sets.filter(libAppJars, s -> !s.isEmpty()); + } + public static void addRemoteFilesToLocalResources(String hdfsFileList, Map resourceMap, Configuration yarnConfiguration) throws IOException { for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) { Path srcFilePath = new Path(hdfsFilePath); diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java index c271258c1d4..009a403c00f 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.yarn; import java.io.IOException; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -97,6 +98,40 @@ public void retainLatestKJarCachePaths() throws IOException { // Should be cleaned up Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07"))); Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-06"))); + } + + @Test + public void testGetJarListFromConfigs() { + // Test when container jars is empty + Config emptyContainerJarsList = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST, ConfigValueFactory.fromAnyRef("a.jar,b.jar")) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY, ConfigValueFactory.fromAnyRef("")); + + Set jars = YarnHelixUtils.getAppLibJarList(emptyContainerJarsList); + Assert.assertEquals(2, jars.size()); + Assert.assertTrue(jars.contains("a.jar")); + Assert.assertTrue(jars.contains("b.jar")); + + // Test when yarn application lib jars is empty + Config emptyYarnAppLibJarsConfig = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST, ConfigValueFactory.fromAnyRef("")) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY, ConfigValueFactory.fromAnyRef("c.jar,d.jar")); + + jars = YarnHelixUtils.getAppLibJarList(emptyYarnAppLibJarsConfig); + Assert.assertEquals(2, jars.size()); + Assert.assertTrue(jars.contains("c.jar")); + Assert.assertTrue(jars.contains("d.jar")); + + // Test when both yarn application lib jars and container jars are not empty + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST, ConfigValueFactory.fromAnyRef("a.jar,b.jar")) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY, ConfigValueFactory.fromAnyRef("c.jar,d.jar")); + jars = YarnHelixUtils.getAppLibJarList(config); + Assert.assertEquals(4, jars.size()); + Assert.assertTrue(jars.contains("a.jar")); + Assert.assertTrue(jars.contains("b.jar")); + Assert.assertTrue(jars.contains("c.jar")); + Assert.assertTrue(jars.contains("d.jar")); } } \ No newline at end of file