Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2135] Cache Gobblin YARN application jars #4030

Merged
merged 10 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job as a Hadoop MR job.
Expand Down Expand Up @@ -154,8 +155,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
// Configuration that make uploading of jar files more reliable,
// since multiple Gobblin Jobs are sharing the same jar directory.
private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;

public static final String MR_TYPE_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
public static final String MAPPER_TASK_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.attempt.num";
Expand Down Expand Up @@ -572,56 +571,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);

for (FileStatus status : fileStatusList) {
Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs, status.getPath().getName(), this.unsharedJarsDir, jarFileDir);
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
int retryCount = 0;
boolean shouldFileBeAddedIntoDC = true;
Path destJarFile = calculateDestJarFile(status, jarFileDir);
// Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
try {
if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
Will-Lo marked this conversation as resolved.
Show resolved Hide resolved
throw new IOException("Waiting for file to complete on uploading ... ");
}
// Set the first parameter as false for not deleting sourceFile
// Set the second parameter as false for not overwriting existing file on the target, by default it is true.
// If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
} catch (IOException | InterruptedException e) {
LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
retryCount += 1;
if (retryCount >= this.jarFileMaximumRetry) {
LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
// If retry reaches upper limit, skip copying this file.
shouldFileBeAddedIntoDC = false;
break;
}
}
}
if (shouldFileBeAddedIntoDC) {
if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
} else {
LOG.error("Failed to upload jar file: " + status.getPath());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't find the prior code throwing an error...

nonetheless, should everything continue on w/ just some error logs?

shouldn't we instead fail the overall job because presumably necessary jars won't be there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly we should throw an error, but given that another job could be uploading the same jars, it might be better to let this job attempt to run; if it fails, it should emit the failed event anyway.

}
}
}
}

/**
* Calculate the target filePath of the jar file to be copied on HDFS,
* given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
*/
private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
  String jarName = status.getPath().getName();
  // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it
  Path parentDir = jarName.contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir;
  // DistributedCache requires absolute path, so we need to use makeQualified.
  return new Path(this.fs.makeQualified(parentDir), jarName);
}

/**
* Add local non-jar files the job depends on to DistributedCache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class YarnService extends AbstractIdleService {

private volatile boolean shutdownInProgress = false;

private final boolean jarCacheEnabled;

public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
this.applicationName = applicationName;
Expand Down Expand Up @@ -270,6 +272,7 @@ public YarnService(Config config, String applicationName, String applicationId,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -484,12 +487,30 @@ private void requestContainer(Optional<String> preferredNode, Resource resource)
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
// Used for -SNAPSHOT versions of jars
Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
phet marked this conversation as resolved.
Show resolved Hide resolved
Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir);
Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);

Map<String, LocalResource> resourceMap = Maps.newHashMap();

Map<String, LocalResource> resourceMap = Maps.newHashMap();
// Always fetch any jars from the appWorkDir for any potential snapshot jars
addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
addContainerLocalResources(new Path(containerJarsUnsharedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
resourceMap);
}
if (this.jarCacheEnabled) {
addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
addContainerLocalResources(new Path(containerJarsCachedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
resourceMap);
}
}

addContainerLocalResources(
new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);

Expand Down Expand Up @@ -579,8 +600,6 @@ protected String buildContainerCommand(Container container, String helixParticip
containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
.append(" ").append(helixInstanceTag);
}

LOGGER.info("Building " + containerProcessName);
return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
Expand Down Expand Up @@ -797,7 +816,6 @@ public void onContainersAllocated(List<Container> containers) {
public void run() {
try {
LOGGER.info("Starting container " + containerId);

nmClientAsync.startContainerAsync(container, newContainerLaunchContext(containerInfo));
} catch (IOException ioe) {
LOGGER.error("Failed to start container " + containerId, ioe);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.util.filesystem;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import lombok.extern.slf4j.Slf4j;


/**
* Utility class for uploading jar files to HDFS with retries to handle concurrency
*/
@Slf4j
public class HdfsJarUploadUtils {

private static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000;

/**
* Calculate the target filePath of the jar file to be copied on HDFS,
* given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
* Snapshot dirs should not be shared, as different jobs may be using different versions of it.
* @param fs
* @param jarName
* @param unsharedJarsDir
* @param jarCacheDir
* @return
* @throws IOException
*/
public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
Path destJarFile = new Path(fs.makeQualified(uploadDir), jarName);
return destJarFile;
}
/**
* Upload a jar file to HDFS with retries to handle already existing jars
* @param fs
* @param localJar
* @param destJarFile
* @param maxAttempts
* @return
* @throws IOException
*/
public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int maxAttempts, Path destJarFile) throws IOException {
int retryCount = 0;
while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
try {
if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS);
throw new IOException("Waiting for file to complete on uploading ... ");
}
boolean deleteSourceFile = false;
boolean overwriteAnyExistingDestFile = false; // IOException will be thrown if does already exist
fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile);
} catch (IOException | InterruptedException e) {
log.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
retryCount += 1;
if (retryCount >= maxAttempts) {
log.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
// If retry reaches upper limit, skip copying this file.
return false;
}
}
}
return true;
}
}
Loading
Loading