-
Notifications
You must be signed in to change notification settings - Fork 744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2135] Cache Gobblin YARN application jars #4030
base: master
Are you sure you want to change the base?
Conversation
…epeatedly upload jars and files to hdfs
while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) { | ||
try { | ||
if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) { | ||
Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This key seems unused now in this class. And might as well fix the typo in HdfsJarUploadUtils
// Then add the jar file on HDFS to the classpath | ||
LOG.info(String.format("Adding %s to classpath", destJarFile)); | ||
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs); | ||
} else { | ||
LOG.error("Failed to upload jar file: " + status.getPath()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't find the prior code throwing an error...
nonetheless, should everything continue on w/ just some error logs?
shouldn't we instead fail the overall job because presumably necessary jars won't be there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly we should throw an error, but I think given that another job could be uploading the same jars though it might be better to let the job attempt to try and run, if that job fails it should be emitting the failed event anyways.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
Show resolved
Hide resolved
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
Outdated
Show resolved
Hide resolved
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
Outdated
Show resolved
Hide resolved
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
Outdated
Show resolved
Hide resolved
* @return | ||
* @throws IOException | ||
*/ | ||
public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jarFileMaximumRetry
=> simply maxAttempts
?
} | ||
if (this.jarCacheEnabled) { | ||
addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap); | ||
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite understand this key. you're checking it here in two different conditionals, but in neither one do you actually use (or even check to see) what value it holds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops should have used gobblin.yarn.container.jars
key
this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); | ||
|
||
this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); | ||
this.jarCacheEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NBD, but you just updated the two above to be this.config
, but only use config
here :)
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
Outdated
Show resolved
Hide resolved
} | ||
if (resourceMap.isPresent()) { | ||
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); | ||
LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, should this be an actual failure, not merely logging?
...or do we believe there are times when it's actually OK to continue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the concern is valid but currently trying to get parity with MR job launcher, I think since there could always be concurrent executions adding the jars instead so it can be worthwhile to just attempt the job, it will fail loudly if the jars weren't uploaded properly anyways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that's fine to start. how about a
// TODO: decide whether to fail-fast here, given the job may be unable to run w/o it
if (this.jarCacheEnabled) { | ||
Path jarCachePath = YarnHelixUtils.calculateJarCachePath(this.config); | ||
// Retain at least the current and last month's jars to handle executions running for ~30 days max | ||
boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this run before or after caching jars? e.g. do we save only two prior AND THEN potentially add one more or we've added any new one already prior to retention paring it down to two?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It runs after caching the jars. But it uses a consistent YARN_APPLICATION_LAUNCHER_START_TIME_KEY
in the job so no matter how many times we look at the cache path it's only creating one path at most, and that path would be the ones where the jars are being uploaded.
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
Outdated
Show resolved
Hide resolved
if (jarDirs.size() > k) { | ||
return fs.delete(jarDirs.get(0).getPath(), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is not a loop, it seems it would delete at most one dir even if there are more than 1 more than k. is that ok? if so, document in javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it to use a loop, for consistency with naming convention.
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4030 +/- ##
============================================
+ Coverage 45.86% 55.31% +9.45%
+ Complexity 3257 1582 -1675
============================================
Files 707 307 -400
Lines 27865 10580 -17285
Branches 2796 1069 -1727
============================================
- Hits 12779 5852 -6927
+ Misses 14008 4223 -9785
+ Partials 1078 505 -573 ☔ View full report in Codecov by Sentry. |
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Gobblin YARN Application Launcher lacks some functionality used in MRJobLauncher. One of the biggest gaps in feature parity is the absence of jar caching, where MRJobLauncher creates a monthly cache that is automatically cleaned up by subsequent executions performed 2 months in advance.
YARN/MR requires uploading jars to HDFS, this step can be quite slow (~15 mins for a sizeable job to get all the jars), and given that many jobs do share the same jars, it makes sense to cache them together and only provide YARN the shared path.
We also want to ensure that SNAPSHOT jars are other files are not uploaded to a cache, since they are not immutable unlike jar versions on Artifactory.
This PR implements jar caching through 2 configurations:
Where if
gobblin.yarn.jar.cache.enabled=true
, then it will look for the directory defined ingobblin.yarn.jar.cache.dir
. It is expected that snapshot jars and other files are stored in some directory that is unique to the execution so that those jars will not be shared across other concurrent executions, only jars stored in the jar cache will be.Tests
It is tested that this saves approximately 10 minutes of bootstrap time per job.
Commits