[GOBBLIN-2135] Cache Gobblin YARN application jars #4030
Changes from all commits: 7b5706e, cca5c0a, 1743bb3, ee790d1, 9a2f8fb, f290629, 614e4bc, 72eb403, 88fdddc, 3aa8a08
MRJobLauncher.java
@@ -114,6 +114,7 @@
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
/**
 * An implementation of {@link JobLauncher} that launches a Gobblin job as a Hadoop MR job.
@@ -154,8 +155,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
  // Configuration that make uploading of jar files more reliable,
  // since multiple Gobblin Jobs are sharing the same jar directory.
  private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
  private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;

  public static final String MR_TYPE_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
  public static final String MAPPER_TASK_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
  public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.attempt.num";
@@ -572,56 +571,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
    for (String jarFile : SPLITTER.split(jarFileList)) {
      Path srcJarFile = new Path(jarFile);
      FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);

      for (FileStatus status : fileStatusList) {
        Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs, status.getPath().getName(), this.unsharedJarsDir, jarFileDir);
        // For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
        // or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
        // the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
        int retryCount = 0;
        boolean shouldFileBeAddedIntoDC = true;
        Path destJarFile = calculateDestJarFile(status, jarFileDir);
        // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
        while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
          try {
            if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
              throw new IOException("Waiting for file to complete on uploading ... ");
            }
            // Set the first parameter as false for not deleting sourceFile
            // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
            // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
            this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
          } catch (IOException | InterruptedException e) {
            LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
            retryCount += 1;
            if (retryCount >= this.jarFileMaximumRetry) {
              LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
              // If retry reaches upper limit, skip copying this file.
              shouldFileBeAddedIntoDC = false;
              break;
            }
          }
        }
        if (shouldFileBeAddedIntoDC) {
        if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
          // Then add the jar file on HDFS to the classpath
          LOG.info(String.format("Adding %s to classpath", destJarFile));
          DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
        } else {
          LOG.error("Failed to upload jar file: " + status.getPath());

Review comment: I don't find the prior code throwing an error... nonetheless, should everything continue on with just some error logs? Shouldn't we instead fail the overall job, because presumably necessary jars won't be there?

Reply: Possibly we should throw an error, but given that another job could be uploading the same jars, it might be better to let this job attempt to run; if it then fails, it should emit the failed event anyway.

        }
      }
    }
  }

  /**
   * Calculate the target filePath of the jar file to be copied on HDFS,
   * given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
   */
  private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
    // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it
    Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir;
    // DistributedCache requires absolute path, so we need to use makeQualified.
    return new Path(this.fs.makeQualified(baseDir), status.getPath().getName());
  }

  /**
   * Add local non-jar files the job depends on to DistributedCache.
   */
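The review thread above asks whether a failed jar upload should abort the launch rather than only log an error; this PR keeps the log-and-continue behavior. The following is a minimal, illustrative sketch (not part of this change) of the fail-fast alternative, reusing the HdfsJarUploadUtils API introduced here. The class name FailFastJarUpload and its method are hypothetical, and the DistributedCache import is assumed to match the one MRJobLauncher already uses.

// Illustrative sketch only -- not part of this PR. Fail-fast variant of the upload step.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;

public class FailFastJarUpload {

  /** Upload one local jar and add it to the MR classpath, or fail the launch. */
  public static void addJarToClasspathOrFail(FileSystem fs, FileStatus localJar, Path unsharedJarsDir,
      Path jarCacheDir, int maxAttempts, Configuration conf) throws IOException {
    Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(
        fs, localJar.getPath().getName(), unsharedJarsDir, jarCacheDir);
    if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, maxAttempts, destJarFile)) {
      // Same classpath registration the PR performs on success.
      DistributedCache.addFileToClassPath(destJarFile, conf, fs);
    } else {
      // Instead of logging and continuing, surface the failure so the MR job
      // never starts without a jar it presumably needs.
      throw new IOException("Failed to upload jar to HDFS after " + maxAttempts + " attempts: " + localJar.getPath());
    }
  }
}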
HdfsJarUploadUtils.java (new file)
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.util.filesystem;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import lombok.extern.slf4j.Slf4j;

/**
 * Utility class for uploading jar files to HDFS with retries to handle concurrency.
 */
@Slf4j
public class HdfsJarUploadUtils {

  private static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000;

  /**
   * Calculate the target filePath of the jar file to be copied on HDFS,
   * given the name of the jar and the candidate destination directories.
   * SNAPSHOT jars should not be shared, as different jobs may be using different versions of them.
   * @param fs the {@link FileSystem} the jar is uploaded to
   * @param jarName file name of the jar
   * @param unsharedJarsDir directory for jars that should not be shared across jobs
   * @param jarCacheDir shared jar cache directory
   * @return the fully qualified destination path of the jar
   * @throws IOException
   */
  public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
    Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
    Path destJarFile = new Path(fs.makeQualified(uploadDir), jarName);
    return destJarFile;
  }

  /**
   * Upload a jar file to HDFS with retries to handle already existing jars.
   * @param fs the {@link FileSystem} to upload to
   * @param localJar {@link FileStatus} of the local jar file
   * @param maxAttempts maximum number of copy attempts before giving up
   * @param destJarFile destination path of the jar on HDFS
   * @return true if the jar exists on HDFS with the expected size, false if the upload failed after maxAttempts
   * @throws IOException
   */
  public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int maxAttempts, Path destJarFile) throws IOException {
    int retryCount = 0;
    while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
      try {
        if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
          Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS);
          throw new IOException("Waiting for file to complete on uploading ... ");
        }
        boolean deleteSourceFile = false;
        boolean overwriteAnyExistingDestFile = false; // an IOException is thrown if the destination already exists
        fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile);
      } catch (IOException | InterruptedException e) {
        log.warn("Path: " + destJarFile + " is not copied successfully. Will require retry.");
        retryCount += 1;
        if (retryCount >= maxAttempts) {
          log.error("The jar file: " + destJarFile + " failed to be copied into HDFS", e);
          // If retry reaches the upper limit, skip copying this file.
          return false;
        }
      }
    }
    return true;
  }
}
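For context on how the two utilities compose, here is a minimal, hypothetical usage sketch (not part of the PR) that runs against a local FileSystem standing in for HDFS; the /tmp paths and the jar name are invented for illustration. The caching behavior is visible in the second call: once a complete copy with a matching length exists at the destination, uploadJarToHdfs returns without copying again.

// Hypothetical usage sketch; paths and jar name are made up for illustration.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;

public class JarCacheExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf); // stands in for HDFS in this sketch

    Path unsharedJarsDir = new Path("/tmp/gobblin/_unshared-jars");
    Path jarCacheDir = new Path("/tmp/gobblin/_jar-cache");
    FileStatus localJar = fs.getFileStatus(new Path("/tmp/my-connector-1.2.3.jar")); // hypothetical jar

    // Released jars resolve to the shared cache; a SNAPSHOT jar would resolve to unsharedJarsDir instead.
    Path dest = HdfsJarUploadUtils.calculateDestJarFilePath(fs, localJar.getPath().getName(),
        unsharedJarsDir, jarCacheDir);

    boolean first = HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, 5, dest);  // copies the jar
    boolean second = HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, 5, dest); // finds a complete copy; no re-upload
    System.out.println("Jar cached at " + dest + ": " + (first && second));
  }
}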