[GOBBLIN-2135] Cache Gobblin YARN application jars #4030

Open
wants to merge 8 commits into
base: master

@Will-Lo Will-Lo commented Aug 16, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Gobblin YARN Application Launcher lacks some functionality available in MRJobLauncher. One of the biggest gaps in feature parity is the absence of jar caching: MRJobLauncher creates a monthly cache that is automatically cleaned up by executions that run two months later.

YARN/MR requires uploading jars to HDFS. This step can be quite slow (~15 mins for a sizeable job to upload all of its jars), and given that many jobs share the same jars, it makes sense to cache them together and provide YARN only the shared path.

We also want to ensure that SNAPSHOT jars and other files are not uploaded to the cache, since, unlike versioned jars on Artifactory, they are not immutable.

This PR implements jar caching through 2 configurations:

gobblin.yarn.jar.cache.enabled
gobblin.yarn.jar.cache.dir

If gobblin.yarn.jar.cache.enabled=true, the launcher looks for the directory defined in gobblin.yarn.jar.cache.dir. SNAPSHOT jars and other files are expected to be stored in a directory that is unique to the execution, so that they are not shared with other concurrent executions; only jars stored in the jar cache are shared.
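
As a rough illustration of the routing described above, the sketch below picks an upload directory for each file. It is not the code in this PR: the class name, the method names, and the rule of detecting snapshots by looking for "SNAPSHOT" in the file name are all assumptions made for the example, and plain java.nio paths stand in for HDFS paths.

```java
import java.nio.file.Path;

/** Illustrative only: every name in this class is hypothetical, not taken from the PR. */
final class JarDestinationSketch {

  /** Assumption: a file is cacheable when it is a jar whose name does not contain "SNAPSHOT". */
  static boolean isCacheable(String fileName) {
    return fileName.endsWith(".jar") && !fileName.contains("SNAPSHOT");
  }

  /**
   * @param cacheEnabled value of gobblin.yarn.jar.cache.enabled
   * @param cacheDir     value of gobblin.yarn.jar.cache.dir
   * @param executionDir a directory unique to this execution (assumed to be provided by the caller)
   */
  static Path destinationFor(String fileName, boolean cacheEnabled, Path cacheDir, Path executionDir) {
    // Only immutable jars go to the shared cache; everything else stays private to the execution.
    Path base = (cacheEnabled && isCacheable(fileName)) ? cacheDir : executionDir;
    return base.resolve(fileName);
  }
}
```

With caching enabled, a release jar resolves under the cache directory, while a SNAPSHOT jar or a non-jar file resolves under the per-execution directory.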

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Testing showed that this saves approximately 10 minutes of bootstrap time per job.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@Will-Lo Will-Lo changed the title from "Cache temporal jars" to "[GOBBLIN-2135] Cache Gobblin YARN application jars" on Aug 19, 2024
while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
  try {
    if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
      Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
Contributor

This key seems unused now in this class. And might as well fix the typo in HdfsJarUploadUtils

  // Then add the jar file on HDFS to the classpath
  LOG.info(String.format("Adding %s to classpath", destJarFile));
  DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
} else {
  LOG.error("Failed to upload jar file: " + status.getPath());
Contributor

I don't find the prior code throwing an error...

nonetheless, should everything continue on w/ just some error logs?

shouldn't we instead fail the overall job because presumably necessary jars won't be there?

Contributor Author

Possibly we should throw an error, but given that another job could be uploading the same jars, I think it might be better to let the job attempt to run; if that job fails, it should emit the failed event anyway.

* @return
* @throws IOException
*/
public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException {
Contributor

jarFileMaximumRetry => simply maxAttempts?
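
To make the pattern under discussion concrete, here is a minimal sketch of an upload with a bounded number of attempts and a length check, using the reviewer's suggested maxAttempts name. It is an assumption-laden stand-in, not the PR's uploadJarToHdfs: java.nio.file replaces the Hadoop FileSystem API so the example is self-contained, and the class name, the waitMillis parameter, and the decision to swallow copy failures are all hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative only: local files stand in for HDFS, and all names are hypothetical. */
final class UploadRetrySketch {

  /** A destination counts as complete when it exists and has the same length as the source. */
  private static boolean isComplete(Path dest, long expectedLength) throws IOException {
    return Files.exists(dest) && Files.size(dest) == expectedLength;
  }

  /** Returns true once dest matches src in length, or false after maxAttempts copy attempts. */
  static boolean uploadWithRetry(Path src, Path dest, int maxAttempts, long waitMillis)
      throws IOException, InterruptedException {
    long expected = Files.size(src);
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (isComplete(dest, expected)) {
        return true; // possibly uploaded by a concurrent execution
      }
      if (Files.exists(dest)) {
        // Length mismatch: another writer may still be uploading, so wait and re-check.
        Thread.sleep(waitMillis);
        if (isComplete(dest, expected)) {
          return true;
        }
      }
      try {
        Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING);
      } catch (IOException e) {
        // Assumption for this sketch: a failed copy is simply retried on the next attempt.
      }
    }
    return isComplete(dest, expected);
  }
}
```

The boolean result leaves the log-versus-fail decision raised in the review to the caller.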

}
if (this.jarCacheEnabled) {
  addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
  if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
Contributor

I don't quite understand this key. you're checking it here in two different conditionals, but in neither one do you actually use (or even check to see) what value it holds

Contributor Author

Oops should have used gobblin.yarn.container.jars key

this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);

this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
this.jarCacheEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
Contributor

NBD, but you just updated the two above to be this.config, but only use config here :)

}
if (resourceMap.isPresent()) {
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
Contributor

again, should this be an actual failure, not merely logging?

...or do we believe there are times when it's actually OK to continue?

Contributor Author

I think the concern is valid, but we are currently trying to get parity with the MR job launcher. Since there could always be concurrent executions adding the jars instead, it can be worthwhile to just attempt the job; it will fail loudly if the jars weren't uploaded properly anyway.

Contributor

@phet phet Sep 12, 2024

I guess that's fine to start. how about a

// TODO: decide whether to fail-fast here, given the job may be unable to run w/o it

if (this.jarCacheEnabled) {
  Path jarCachePath = YarnHelixUtils.calculateJarCachePath(this.config);
  // Retain at least the current and last month's jars to handle executions running for ~30 days max
  boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
Contributor

Does this run before or after caching jars? E.g., do we keep only the two prior and THEN potentially add one more, or have we already added any new one before retention pares it down to two?

Contributor Author

It runs after caching the jars. But it uses a consistent YARN_APPLICATION_LAUNCHER_START_TIME_KEY in the job, so no matter how many times we look at the cache path, it creates at most one path, and that path is the one where the jars are being uploaded.
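
The point about a consistent start time can be sketched as follows: if the cache path is derived only from the cache root and a start timestamp that is fixed once per launch, every lookup during that launch resolves to the same monthly directory. The yyyy-MM directory naming, the UTC zone, and the class and method names below are assumptions for illustration; the PR's calculateJarCachePath may differ.

```java
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative only: the yyyy-MM layout and all names here are assumptions, not the PR's code. */
final class MonthlyCachePathSketch {

  private static final DateTimeFormatter MONTH =
      DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

  /** Derives the cache directory from a start time that is fixed once per launch. */
  static Path cachePathFor(Path cacheRoot, long launcherStartMillis) {
    return cacheRoot.resolve(MONTH.format(Instant.ofEpochMilli(launcherStartMillis)));
  }
}
```

Because the result depends only on its two inputs, calling it repeatedly with the same start time cannot create a second directory, even if the launch crosses a month boundary.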

Comment on lines 232 to 233
if (jarDirs.size() > k) {
  return fs.delete(jarDirs.get(0).getPath(), true);
Contributor

Since this is not a loop, it seems it would delete at most one dir even if there are more than k + 1. Is that OK? If so, document it in the javadoc.

Contributor Author

Changed it to use a loop, for consistency with naming convention.
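
For readers following the thread, a loop-based retention step might look roughly like the sketch below. It is not the updated retainKLatestJarCachePaths: local directories stand in for HDFS, the method returns a count instead of a boolean, and it assumes directory names sort lexically in chronological order (as yyyy-MM names would). All names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Illustrative only: local directories stand in for HDFS, and all names are hypothetical. */
final class RetainLatestSketch {

  /** Deletes the oldest subdirectories of parent until at most k remain; returns how many were removed. */
  static int retainKLatest(Path parent, int k) throws IOException {
    List<Path> dirs = new ArrayList<>();
    try (Stream<Path> children = Files.list(parent)) {
      children.filter(Files::isDirectory).forEach(dirs::add);
    }
    Collections.sort(dirs); // assumption: lexical order matches chronological order
    int excess = Math.max(0, dirs.size() - Math.max(0, k));
    // Loop over every excess directory, so more than one stale month can be cleaned in one pass.
    for (int i = 0; i < excess; i++) {
      deleteRecursively(dirs.get(i));
    }
    return excess;
  }

  private static void deleteRecursively(Path dir) throws IOException {
    List<Path> all = new ArrayList<>();
    try (Stream<Path> walk = Files.walk(dir)) {
      walk.forEach(all::add);
    }
    Collections.reverse(all); // children come before their parents after reversing
    for (Path p : all) {
      Files.delete(p);
    }
  }
}
```

With four monthly directories and k = 2, the two oldest are removed in a single call, which is the case the single-delete version would have missed.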

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 55.31%. Comparing base (e501b62) to head (72eb403).
Report is 23 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4030      +/-   ##
============================================
+ Coverage     45.86%   55.31%   +9.45%     
+ Complexity     3257     1582    -1675     
============================================
  Files           707      307     -400     
  Lines         27865    10580   -17285     
  Branches       2796     1069    -1727     
============================================
- Hits          12779     5852    -6927     
+ Misses        14008     4223    -9785     
+ Partials       1078      505     -573     

