diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index 7b01dc1a6b..b5e7f09740 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -114,6 +114,7 @@ import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.ParallelRunner; import org.apache.gobblin.util.SerializationUtils; +import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils; import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils; /** * An implementation of {@link JobLauncher} that launches a Gobblin job as a Hadoop MR job. @@ -154,8 +155,6 @@ public class MRJobLauncher extends AbstractJobLauncher { // Configuration that make uploading of jar files more reliable, // since multiple Gobblin Jobs are sharing the same jar directory. 
private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5; - private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; - public static final String MR_TYPE_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type"; public static final String MAPPER_TASK_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num"; public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.attempt.num"; @@ -572,56 +571,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th for (String jarFile : SPLITTER.split(jarFileList)) { Path srcJarFile = new Path(jarFile); FileStatus[] fileStatusList = lfs.globStatus(srcJarFile); - for (FileStatus status : fileStatusList) { + Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs, status.getPath().getName(), this.unsharedJarsDir, jarFileDir); // For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence // or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory. // the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging. - int retryCount = 0; - boolean shouldFileBeAddedIntoDC = true; - Path destJarFile = calculateDestJarFile(status, jarFileDir); - // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path. - while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) { - try { - if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) { - Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD); - throw new IOException("Waiting for file to complete on uploading ... 
"); - } - // Set the first parameter as false for not deleting sourceFile - // Set the second parameter as false for not overwriting existing file on the target, by default it is true. - // If the file is preExisted but overwrite flag set to false, then an IOException if thrown. - this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile); - } catch (IOException | InterruptedException e) { - LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry."); - retryCount += 1; - if (retryCount >= this.jarFileMaximumRetry) { - LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e); - // If retry reaches upper limit, skip copying this file. - shouldFileBeAddedIntoDC = false; - break; - } - } - } - if (shouldFileBeAddedIntoDC) { + if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) { // Then add the jar file on HDFS to the classpath LOG.info(String.format("Adding %s to classpath", destJarFile)); DistributedCache.addFileToClassPath(destJarFile, conf, this.fs); + } else { + LOG.error("Failed to upload jar file: " + status.getPath()); } } } } - /** - * Calculate the target filePath of the jar file to be copied on HDFS, - * given the {@link FileStatus} of a jarFile and the path of directory that contains jar. - */ - private Path calculateDestJarFile(FileStatus status, Path jarFileDir) { - // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it - Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir; - // DistributedCache requires absolute path, so we need to use makeQualified. - return new Path(this.fs.makeQualified(baseDir), status.getPath().getName()); - } - /** * Add local non-jar files the job depends on to DistributedCache. 
*/ diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index b124063bcd..426521b921 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -197,6 +197,8 @@ class YarnService extends AbstractIdleService { private volatile boolean shutdownInProgress = false; + private final boolean jarCacheEnabled; + public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { this.applicationName = applicationName; @@ -270,6 +272,7 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); + this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); } @SuppressWarnings("unused") @@ -484,12 +487,30 @@ private void requestContainer(Optional preferredNode, Resource resource) protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId); + // Used for -SNAPSHOT versions of jars + Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + Path jarCacheDir = this.jarCacheEnabled ? 
YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; + Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir); + LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir); Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); - Map resourceMap = Maps.newHashMap(); + Map resourceMap = Maps.newHashMap(); + // Always fetch any jars from the appWorkDir for any potential snapshot jars addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap); - addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap); + if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) { + addContainerLocalResources(new Path(containerJarsUnsharedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), + resourceMap); + } + if (this.jarCacheEnabled) { + addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap); + if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) { + addContainerLocalResources(new Path(containerJarsCachedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), + resourceMap); + } + } + addContainerLocalResources( new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap); @@ -579,8 +600,6 @@ protected String buildContainerCommand(Container container, String helixParticip containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME) .append(" ").append(helixInstanceTag); } - - LOGGER.info("Building " + containerProcessName); return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( 
containerProcessName).append(".").append(ApplicationConstants.STDOUT) .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( @@ -797,7 +816,6 @@ public void onContainersAllocated(List containers) { public void run() { try { LOGGER.info("Starting container " + containerId); - nmClientAsync.startContainerAsync(container, newContainerLaunchContext(containerInfo)); } catch (IOException ioe) { LOGGER.error("Failed to start container " + containerId, ioe); diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java new file mode 100644 index 0000000000..31128a1cf1 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.filesystem; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility class for uploading jar files to HDFS with retries to handle concurrency + */ +@Slf4j +public class HdfsJarUploadUtils { + + private static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000; + + /** + * Calculate the target filePath of the jar file to be copied on HDFS, + * given the name of the jar file, the directory for unshared jars, and the jar cache directory. + * SNAPSHOT jars should not be shared, as different jobs may be using different versions of them. + * @param fs + * @param jarName + * @param unsharedJarsDir + * @param jarCacheDir + * @return + * @throws IOException + */ + public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir) throws IOException { + Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir; + Path destJarFile = new Path(fs.makeQualified(uploadDir), jarName); + return destJarFile; + } + /** + * Upload a jar file to HDFS with retries to handle already existing jars + * @param fs + * @param localJar + * @param destJarFile + * @param maxAttempts + * @return + * @throws IOException + */ + public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int maxAttempts, Path destJarFile) throws IOException { + int retryCount = 0; + while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) { + try { + if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) { + Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS); + throw new IOException("Waiting for file to complete on uploading ... 
"); + } + boolean deleteSourceFile = false; + boolean overwriteAnyExistingDestFile = false; // IOException will be thrown if does already exist + fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile); + } catch (IOException | InterruptedException e) { + log.warn("Path:" + destJarFile + " is not copied successfully. Will require retry."); + retryCount += 1; + if (retryCount >= maxAttempts) { + log.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e); + // If retry reaches upper limit, skip copying this file. + return false; + } + } + } + return true; + } +} diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index c7dd6e85c1..4ce40f21df 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -108,6 +108,7 @@ import org.apache.gobblin.util.EmailUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.JvmUtils; +import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils; import org.apache.gobblin.util.hadoop.TokenUtils; import org.apache.gobblin.util.io.StreamUtils; import org.apache.gobblin.util.logs.LogCopier; @@ -162,7 +163,7 @@ public class GobblinYarnAppLauncher { public static final String GOBBLIN_YARN_CONFIG_OUTPUT_PATH = "gobblin.yarn.configOutputPath"; //Configuration key to signal the GobblinYarnAppLauncher mode - public static final String GOBBLIN_YARN_APP_LAUNCHER_MODE = "gobblin.yarn.appLauncherMode"; + public static final String GOBBLIN_YARN_APP_LAUNCHER_MODE = "gobblin.yarn.appLauncherMode"; public static final String DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE = ""; public static final String AZKABAN_APP_LAUNCHER_MODE_KEY = "azkaban"; @@ -238,9 +239,13 @@ public class GobblinYarnAppLauncher { protected final String 
originalYarnRMAddress; protected final Map potentialYarnClients = new HashMap<>(); private YarnClient yarnClient; + private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5; + + private final boolean jarCacheEnabled; public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException { - this.config = config; + this.config = config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, + ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); this.applicationName = config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY); this.appQueueName = config.getString(GobblinYarnConfigurationKeys.APP_QUEUE_KEY); @@ -254,7 +259,6 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration); this.closer.register(this.fs); - boolean isHelixEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.HELIX_ENABLED, GobblinYarnConfigurationKeys.DEFAULT_HELIX_ENABLED); this.helixClusterLifecycleManager = isHelixEnabled @@ -300,10 +304,10 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.detachOnExitEnabled = ConfigUtils - .getBoolean(config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED, + .getBoolean(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT); - this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); - + this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); + this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); 
try { config = addDynamicConfig(config); @@ -334,7 +338,7 @@ public void launch() throws IOException, YarnException, InterruptedException { this.eventBus.register(this); //Before connect with yarn client, we need to login to get the token - if(ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) { + if (ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) { this.tokenRefreshManager = Optional.of(buildTokenRefreshManager()); this.tokenRefreshManager.get().loginAndScheduleTokenRenewal(); } @@ -370,7 +374,7 @@ public boolean isApplicationRunning() { return this.applicationId.isPresent() && !this.applicationCompleted && !this.detachOnExitEnabled; } - private void addServices() throws IOException{ + private void addServices() throws IOException { List services = Lists.newArrayList(); if (this.tokenRefreshManager.isPresent()) { LOGGER.info("Adding KeyManagerService since key management is enabled"); @@ -396,7 +400,7 @@ private void addServices() throws IOException{ LOGGER.warn("NOT starting the admin UI because the job execution info server is NOT enabled"); } - if (services.size() > 0 ) { + if (services.size() > 0) { this.serviceManager = Optional.of(new ServiceManager(services)); this.serviceManager.get().startAsync(); } else { @@ -428,7 +432,6 @@ public synchronized void stop() throws IOException, TimeoutException { if (!this.detachOnExitEnabled) { LOGGER.info("Disabling all live Helix instances.."); } - } finally { try { if (this.applicationId.isPresent() && !this.detachOnExitEnabled) { @@ -561,7 +564,7 @@ Optional getReconnectableApplicationId() throws YarnException, IO /** * Setup and submit the Gobblin Yarn application. 
- * + * Retain at least the current and last month's jars in the cache (if configured) to handle executions running for ~30 days max * @throws IOException if there's anything wrong setting up and submitting the Yarn application * @throws YarnException if there's anything wrong setting up and submitting the Yarn application */ @@ -573,7 +576,7 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int appSubmissionContext.setApplicationType(GOBBLIN_YARN_APPLICATION_TYPE); appSubmissionContext.setMaxAppAttempts(ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.APP_MASTER_MAX_ATTEMPTS_KEY, GobblinYarnConfigurationKeys.DEFAULT_APP_MASTER_MAX_ATTEMPTS_KEY)); ApplicationId applicationId = appSubmissionContext.getApplicationId(); - LOGGER.info("created new yarn application: "+ applicationId.getId()); + LOGGER.info("created new yarn application: " + applicationId.getId()); GetNewApplicationResponse newApplicationResponse = gobblinYarnApp.getNewApplicationResponse(); // Set up resource type requirements for ApplicationMaster @@ -587,6 +590,15 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration)); amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory()))); + if (this.jarCacheEnabled) { + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config); + // Retain at least the current and last month's jars to handle executions running for ~30 days max + boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs); + if (!cleanedSuccessfully) { + LOGGER.warn("Failed to delete older jar cache directories"); + } + } + Map acls = new HashMap<>(1); acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl); amContainerLaunchContext.setApplicationACLs(acls); @@ -640,31 +652,36 @@ 
private Resource prepareContainerResource(GetNewApplicationResponse newApplicati private Map addAppMasterLocalResources(ApplicationId applicationId) throws IOException { Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); - LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", appMasterWorkDir.toString()); + Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME); + LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", appMasterWorkDir); + LOGGER.info("Configured GobblinApplicationMaster jars directory to: {}", appMasterJarsCacheDir); Map appMasterResources = Maps.newHashMap(); FileSystem localFs = FileSystem.getLocal(new Configuration()); - // NOTE: log after each step below for insight into what takes bulk of time if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) { - Path libJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); + // Lib jars are shared between all containers, store at the root level + Path libJarsDestDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); + Path unsharedJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); addLibJars(new Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)), - Optional.of(appMasterResources), libJarsDestDir, localFs); - LOGGER.info("Added lib jars to directory: {}", libJarsDestDir.toString()); + Optional.of(appMasterResources), libJarsDestDir, unsharedJarsDestDir, localFs); + LOGGER.info("Added lib jars to directory: {} and execution-private directory: {}", libJarsDestDir, unsharedJarsDestDir); } if 
(this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY)) { - Path appJarsDestDir = new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME); + Path appJarsDestDir = new Path(appMasterJarsCacheDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME); + Path unsharedJarsDestDir = new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME); addAppJars(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY), - Optional.of(appMasterResources), appJarsDestDir, localFs); - LOGGER.info("Added app jars to directory: {}", appJarsDestDir.toString()); + Optional.of(appMasterResources), appJarsDestDir, unsharedJarsDestDir, localFs); + LOGGER.info("Added app jars to directory: {} and execution-private directory: {}", appJarsDestDir, unsharedJarsDestDir); } if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY)) { - Path appFilesDestDir = new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME); + Path appFilesDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME); addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY), Optional.of(appMasterResources), appFilesDestDir, localFs); - LOGGER.info("Added app local files to directory: {}", appFilesDestDir.toString()); + LOGGER.info("Added app local files to directory: {}", appFilesDestDir); } if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY)) { YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY), @@ -677,27 +694,30 @@ private Map addAppMasterLocalResources(ApplicationId appl LOGGER.info("Added remote zips to local resources"); } if (this.config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) { - Path appFilesDestDir = new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME); + Path appFilesDestDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME); addJobConfPackage(this.config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY), appFilesDestDir, appMasterResources); - LOGGER.info("Added job conf package to directory: {}", appFilesDestDir.toString()); + LOGGER.info("Added job conf package to directory: {}", appFilesDestDir); } return appMasterResources; } private void addContainerLocalResources(ApplicationId applicationId) throws IOException { - Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString()); - + Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, + applicationId.toString()); + Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir; Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); - LOGGER.info("Configured Container work directory to: {}", containerWorkDir.toString()); - + Path containerJarsRootDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME); + LOGGER.info("Configured Container work directory to: {}", containerWorkDir); + LOGGER.info("Configured Container jars directory to: {}", containerJarsRootDir); FileSystem localFs = FileSystem.getLocal(new Configuration()); if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) { - Path appJarsDestDir = new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME); + Path appJarsDestDir = new Path(containerJarsRootDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME); + Path unsharedJarsDestDir = new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME); addAppJars(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY), - Optional.>absent(), appJarsDestDir, localFs); + Optional.>absent(), appJarsDestDir, unsharedJarsDestDir, localFs); } if 
(this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) { Path appFilesDestDir = new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME); @@ -706,7 +726,7 @@ private void addContainerLocalResources(ApplicationId applicationId) throws IOEx } } - private void addLibJars(Path srcLibJarDir, Optional> resourceMap, Path destDir, + private void addLibJars(Path srcLibJarDir, Optional> resourceMap, Path destCacheDir, Path unsharedDir, FileSystem localFs) throws IOException { // Missing classpath-jars will be a fatal error. @@ -720,26 +740,26 @@ private void addLibJars(Path srcLibJarDir, Optional> } for (FileStatus libJarFile : libJarFiles) { - Path destFilePath = new Path(destDir, libJarFile.getPath().getName()); - this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath); - if (resourceMap.isPresent()) { + Path destFilePath = HdfsJarUploadUtils.calculateDestJarFilePath(this.fs, libJarFile.getPath().getName(), unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) { YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } else { + LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath()); } } } - - private void addAppJars(String jarFilePathList, Optional> resourceMap, - Path destDir, FileSystem localFs) throws IOException { + private void addAppJars(String jarFilePathList, Optional> resourceMap, Path destCacheDir, Path unsharedDir, + FileSystem localFs) throws IOException { for (String jarFilePath : SPLITTER.split(jarFilePathList)) { Path srcFilePath = new Path(jarFilePath); - Path destFilePath = new Path(destDir, srcFilePath.getName()); - if (localFs.exists(srcFilePath)) { - this.fs.copyFromLocalFile(srcFilePath, destFilePath); + FileStatus localJar = localFs.getFileStatus(srcFilePath); + Path destFilePath = 
HdfsJarUploadUtils.calculateDestJarFilePath(this.fs, localJar.getPath().getName(), unsharedDir, destCacheDir); + if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) { + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } } else { - LOGGER.warn("The src destination " + srcFilePath + " doesn't exists"); - } - if (resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath); } } } @@ -751,7 +771,11 @@ private void addAppLocalFiles(String localFilePathList, Optional token: credentials.getAllTokens()) { + for (Token token : credentials.getAllTokens()) { if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) && !token.getService().equals(new Text(this.originalYarnRMAddress))) { continue; } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index 1a5bc96c86..93cc23dba6 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -49,6 +49,15 @@ public class GobblinYarnConfigurationKeys { public static final String YARN_RESOURCE_MANAGER_IDS = YARN_RESOURCE_MANAGER_PREFIX + "ids"; public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES= "other.yarn.resourcemanager.addresses"; + public static final String JAR_CACHE_ENABLED = GOBBLIN_YARN_PREFIX + "jar.cache.enabled"; + + public static final boolean JAR_CACHE_ENABLED_DEFAULT = false; + + public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; + + // Used to store the start time of the app launcher to propagate to workers and appmaster + public 
static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY = GOBBLIN_YARN_PREFIX + "application.start.time"; + // Gobblin Yarn ApplicationMaster configuration properties. public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + "app.master.memory.mbs"; public static final String APP_MASTER_CORES_KEY = GOBBLIN_YARN_PREFIX + "app.master.cores"; diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java index 72f9cc3363..4a74da44a5 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java @@ -20,10 +20,14 @@ import java.io.File; import java.io.IOException; import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -199,6 +203,40 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati } } + /** + * Calculate the path of a jar cache on HDFS, which is retained on a monthly basis. + * Should be used in conjunction with {@link #retainKLatestJarCachePaths(Path, int, FileSystem)} 
to clean up the cache on a periodic basis + * @param config + * @return + * @throws IOException + */ + public static Path calculatePerMonthJarCachePath(Config config) throws IOException { + Path jarsCacheDirMonthly = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)); + String monthSuffix = new SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)); + return new Path(jarsCacheDirMonthly, monthSuffix); + + } + + /** + * Retain the latest k jar cache paths that are children of the parent cache path. + * @param parentCachePath + * @param k the number of latest jar cache paths to retain + * @param fs + * @return true if every directory selected for removal was deleted, + * false if any delete call reported failure + * @throws IOException + */ + public static boolean retainKLatestJarCachePaths(Path parentCachePath, int k, FileSystem fs) throws IOException { + // Cleanup old cache if necessary + List jarDirs = + Arrays.stream(fs.exists(parentCachePath) ? fs.listStatus(parentCachePath) : new FileStatus[0]).sorted().collect(Collectors.toList()); + boolean deletesSuccessful = true; + for (int i = 0; i < jarDirs.size() - k; i++) { + deletesSuccessful &= fs.delete(jarDirs.get(i).getPath(), true); + } + return deletesSuccessful; + } + public static void addRemoteFilesToLocalResources(String hdfsFileList, Map resourceMap, Configuration yarnConfiguration) throws IOException { for (String hdfsFilePath : SPLITTER.split(hdfsFileList)) { Path srcFilePath = new Path(hdfsFilePath); diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java index 033a24aa90..c271258c1d 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnHelixUtilsTest.java @@ -18,14 +18,22 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.junit.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.Test; +import com.google.common.io.Files; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + public class YarnHelixUtilsTest { /** @@ -33,6 +41,14 @@ public class YarnHelixUtilsTest { * added to the resources folder. * @throws IOException */ + String tempDir = Files.createTempDir().getPath(); + + @AfterClass + public void tearDown() throws IOException{ + FileSystem fs = FileSystem.getLocal(new Configuration()); + fs.delete(new Path(this.tempDir), true); + } + @Test public void testUpdateToken() throws IOException { @@ -51,4 +67,36 @@ public void testUpdateToken() Token readToken = credentials.getToken(new Text("testService")); Assert.assertNotNull(readToken); } + + @Test + public void testGetJarCachePath() throws IOException { + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(1726074000013L)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef("/tmp")); + Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config); + + Assert.assertEquals(jarCachePath, new Path("/tmp/2024-09")); + } + + @Test + public void retainLatestKJarCachePaths() throws IOException { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Config config = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(1726074000013L)) + .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR, ConfigValueFactory.fromAnyRef(this.tempDir + "/tmp")); + 
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(config); + fs.mkdirs(jarCachePath); + fs.mkdirs(new Path(jarCachePath.getParent(), "2024-08")); + fs.mkdirs(new Path(jarCachePath.getParent(), "2024-07")); + fs.mkdirs(new Path(jarCachePath.getParent(), "2024-06")); + + YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, fs); + + Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-09"))); + Assert.assertTrue(fs.exists(new Path(this.tempDir, "tmp/2024-08"))); + // Should be cleaned up + Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-07"))); + Assert.assertFalse(fs.exists(new Path(this.tempDir, "tmp/2024-06"))); + + } } \ No newline at end of file