Skip to content

Commit 88fdddc

Browse files
committed
Cleanup
1 parent 72eb403 commit 88fdddc

File tree

3 files changed

+10
-11
lines changed

3 files changed

+10
-11
lines changed

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
572572
Path srcJarFile = new Path(jarFile);
573573
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
574574
for (FileStatus status : fileStatusList) {
575-
Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
575+
Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs, status.getPath().getName(), this.unsharedJarsDir, jarFileDir);
576576
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
577577
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
578578
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.

gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ public class HdfsJarUploadUtils {
3939
* given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
4040
* Snapshot dirs should not be shared, as different jobs may be using different versions of it.
4141
* @param fs
42-
* @param localJar
42+
* @param localJarPath
4343
* @param unsharedJarsDir
4444
* @param jarCacheDir
4545
* @return
4646
* @throws IOException
4747
*/
48-
public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
49-
Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
50-
Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName());
48+
public static Path calculateDestJarFilePath(FileSystem fs, String localJarPath, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
49+
Path uploadDir = localJarPath.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
50+
Path destJarFile = new Path(fs.makeQualified(uploadDir), localJarPath);
5151
return destJarFile;
5252
}
5353
/**
@@ -67,10 +67,9 @@ public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int ma
6767
Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS);
6868
throw new IOException("Waiting for file to complete on uploading ... ");
6969
}
70-
// Set the first parameter as false for not deleting sourceFile
71-
// Set the second parameter as false for not overwriting existing file on the target, by default it is true.
72-
// If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
73-
fs.copyFromLocalFile(false, false, localJar.getPath(), destJarFile);
70+
boolean deleteSourceFile = false;
71+
boolean overwriteAnyExistingDestFile = false; // IOException will be thrown if does already exist
72+
fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile);
7473
} catch (IOException | InterruptedException e) {
7574
log.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
7675
retryCount += 1;

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>>
740740
}
741741

742742
for (FileStatus libJarFile : libJarFiles) {
743-
Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir);
743+
Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs, libJarFile.getPath().getName(), unsharedDir, destCacheDir);
744744
if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) {
745745
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
746746
} else {
@@ -753,7 +753,7 @@ private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResour
753753
for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
754754
Path srcFilePath = new Path(jarFilePath);
755755
FileStatus localJar = localFs.getFileStatus(srcFilePath);
756-
Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir);
756+
Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs, localJar.getPath().getName(), unsharedDir, destCacheDir);
757757
if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
758758
if (resourceMap.isPresent()) {
759759
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());

0 commit comments

Comments
 (0)