From bb0f34ba4e3455155548576069d4715284803bac Mon Sep 17 00:00:00 2001 From: pinguo-liguo Date: Fri, 3 Jul 2015 16:52:08 +0800 Subject: [PATCH] 1.add s3a region conf 2.fixed s3 china region HTTP Status Code: 403 3.update aws skd to 1.10.1 --- hadoop-tools/hadoop-aws/pom.xml | 9 +- .../org/apache/hadoop/fs/s3a/Constants.java | 3 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 1997 ++++++++--------- 3 files changed, 1004 insertions(+), 1005 deletions(-) diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index c37ab85b69f8..31d8e9529cf8 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -96,7 +96,7 @@ false - com.amazonaws:aws-java-sdk:jar:* + com.amazonaws:*:jar:* com.fasterxml.jackson.*:* joda-time:joda-time:jar:* org.apache.httpcomponents:*:jar:* @@ -161,6 +161,7 @@ com.amazonaws aws-java-sdk + 1.10.1 compile @@ -170,6 +171,12 @@ + + joda-time + joda-time + 2.8.1 + + junit junit diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index bbf3c1da83f6..896f9b371a00 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -25,6 +25,9 @@ public class Constants { // s3 secret key public static final String SECRET_KEY = "fs.s3a.secret.key"; + //s3 region key + public static final String REGION_KEY = "fs.s3a.region.key"; + // number of simultaneous connections to s3 public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 439ea27eff3b..8570ec73bea8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,1162 +18,1151 @@ package org.apache.hadoop.fs.s3a; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentialsProviderChain; - import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.CopyObjectRequest; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.*; import com.amazonaws.services.s3.transfer.Copy; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; import com.amazonaws.services.s3.transfer.Upload; -import com.amazonaws.event.ProgressListener; -import com.amazonaws.event.ProgressEvent; - import org.apache.commons.lang.StringUtils; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; - -import static org.apache.hadoop.fs.s3a.Constants.*; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.fs.s3a.Constants.*; + public class S3AFileSystem extends FileSystem { - /** - * Default blocksize as used in blocksize and FS status queries - */ - public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; - private URI uri; - private Path workingDir; - private AmazonS3Client s3; - private String bucket; - private int maxKeys; - private long partSize; - private TransferManager transfers; - private ThreadPoolExecutor threadPoolExecutor; - private int multiPartThreshold; - public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); - private CannedAccessControlList cannedACL; - private String serverSideEncryptionAlgorithm; - - // The maximum number of entries that can be deleted in any call to s3 - private static final int MAX_ENTRIES_TO_DELETE = 1000; - - private static final AtomicInteger poolNumber = new AtomicInteger(1); - - // CLOUDERA-BUILD: deprecate access key and secret key introduced in CDH 5.3 - private static final String DEPRECATED_ACCESS_KEY = "fs.s3a.awsAccessKeyId"; - private static final String DEPRECATED_SECRET_KEY = "fs.s3a.awsSecretAccessKey"; - - static { - Configuration.addDeprecation(DEPRECATED_ACCESS_KEY, ACCESS_KEY, - String.format("%s is deprecated, use %s instead.", - DEPRECATED_ACCESS_KEY, ACCESS_KEY)); - Configuration.addDeprecation(DEPRECATED_SECRET_KEY, SECRET_KEY, - String.format("%s is deprecated, use %s instead.", - DEPRECATED_SECRET_KEY, SECRET_KEY)); - } - - /** - * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, - * with a common prefix. - * @param prefix The prefix of every created Thread's name - * @return a {@link java.util.concurrent.ThreadFactory} that names threads - */ - public static ThreadFactory getNamedThreadFactory(final String prefix) { - SecurityManager s = System.getSecurityManager(); - final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() - .getThreadGroup(); - - return new ThreadFactory() { - final AtomicInteger threadNumber = new AtomicInteger(1); - private final int poolNum = poolNumber.getAndIncrement(); - final ThreadGroup group = threadGroup; - - @Override - public Thread newThread(Runnable r) { - final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); - return new Thread(group, r, name); - } - }; - } - - /** - * Get a named {@link ThreadFactory} that just builds daemon threads. - * @param prefix name prefix for all threads created from the factory - * @return a thread factory that creates named, daemon threads with - * the supplied exception handler and normal priority - */ - private static ThreadFactory newDaemonThreadFactory(final String prefix) { - final ThreadFactory namedFactory = getNamedThreadFactory(prefix); - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = namedFactory.newThread(r); - if (!t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - - }; - } - - /** Called after a new FileSystem instance is constructed. - * @param name a uri whose authority section names the host, port, etc. - * for this FileSystem - * @param conf the configuration - */ - public void initialize(URI name, Configuration conf) throws IOException { - super.initialize(name, conf); - - uri = URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, - this.getWorkingDirectory()); - - // Try to get our credentials or just connect anonymously - // CLOUDERA-BUILD: deprecated keys are alias of supported keys. - String accessKey = conf.get( - ACCESS_KEY, conf.get(DEPRECATED_ACCESS_KEY, null)); - String secretKey = conf.get( - SECRET_KEY, conf.get(DEPRECATED_SECRET_KEY, null)); - - String userInfo = name.getUserInfo(); - if (userInfo != null) { - int index = userInfo.indexOf(':'); - if (index != -1) { - accessKey = userInfo.substring(0, index); - secretKey = userInfo.substring(index + 1); - } else { - accessKey = userInfo; - } + /** + * Default blocksize as used in blocksize and FS status queries + */ + public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; + private URI uri; + private Path workingDir; + private AmazonS3Client s3; + private String bucket; + private int maxKeys; + private long partSize; + private TransferManager transfers; + private ThreadPoolExecutor threadPoolExecutor; + private int multiPartThreshold; + public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); + private CannedAccessControlList cannedACL; + private String serverSideEncryptionAlgorithm; + + // The maximum number of entries that can be deleted in any call to s3 + private static final int MAX_ENTRIES_TO_DELETE = 1000; + + private static final AtomicInteger poolNumber = new AtomicInteger(1); + + // CLOUDERA-BUILD: deprecate access key and secret key introduced in CDH 5.3 + private static final String DEPRECATED_ACCESS_KEY = "fs.s3a.awsAccessKeyId"; + private static final String DEPRECATED_SECRET_KEY = "fs.s3a.awsSecretAccessKey"; + + static { + Configuration.addDeprecation(DEPRECATED_ACCESS_KEY, ACCESS_KEY, + String.format("%s is deprecated, use %s instead.", + DEPRECATED_ACCESS_KEY, ACCESS_KEY)); + Configuration.addDeprecation(DEPRECATED_SECRET_KEY, SECRET_KEY, + String.format("%s is deprecated, use %s instead.", + DEPRECATED_SECRET_KEY, SECRET_KEY)); } - AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain( - new BasicAWSCredentialsProvider(accessKey, secretKey), - new InstanceProfileCredentialsProvider(), - new AnonymousAWSCredentialsProvider() - ); - - bucket = name.getHost(); - - ClientConfiguration awsConf = new ClientConfiguration(); - awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS)); - awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES)); - awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT)); - awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT)); - - s3 = new AmazonS3Client(credentials, awsConf); - - maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); - partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); - - if (partSize < 5 * 1024 * 1024) { - LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); - partSize = 5 * 1024 * 1024; + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, + * with a common prefix. + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + public static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() + .getThreadGroup(); + + return new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + private final int poolNum = poolNumber.getAndIncrement(); + final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; } - if (multiPartThreshold < 5 * 1024 * 1024) { - LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); - multiPartThreshold = 5 * 1024 * 1024; - } + /** + * Get a named {@link ThreadFactory} that just builds daemon threads. + * @param prefix name prefix for all threads created from the factory + * @return a thread factory that creates named, daemon threads with + * the supplied exception handler and normal priority + */ + private static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } - int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); - int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS); - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; - } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } - long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); - LinkedBlockingQueue workQueue = - new LinkedBlockingQueue<>(maxThreads * - conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); - threadPoolExecutor = new ThreadPoolExecutor( - coreThreads, - maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - newDaemonThreadFactory("s3a-transfer-shared-")); - threadPoolExecutor.allowCoreThreadTimeOut(true); - - TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); - transferConfiguration.setMinimumUploadPartSize(partSize); - transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); - - transfers = new TransferManager(s3, threadPoolExecutor); - transfers.setConfiguration(transferConfiguration); - - String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); - if (!cannedACLName.isEmpty()) { - cannedACL = CannedAccessControlList.valueOf(cannedACLName); - } else { - cannedACL = null; + }; } - if (!s3.doesBucketExist(bucket)) { - throw new IOException("Bucket " + bucket + " does not exist"); - } + /** Called after a new FileSystem instance is constructed. + * @param name a uri whose authority section names the host, port, etc. + * for this FileSystem + * @param conf the configuration + */ + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + + uri = URI.create(name.getScheme() + "://" + name.getAuthority()); + workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, + this.getWorkingDirectory()); + + // Try to get our credentials or just connect anonymously + // CLOUDERA-BUILD: deprecated keys are alias of supported keys. + String accessKey = conf.get( + ACCESS_KEY, conf.get(DEPRECATED_ACCESS_KEY, null)); + String secretKey = conf.get( + SECRET_KEY, conf.get(DEPRECATED_SECRET_KEY, null)); + String regionKey = conf.get(REGION_KEY, null); + + String userInfo = name.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } - boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, - DEFAULT_PURGE_EXISTING_MULTIPART); - long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, - DEFAULT_PURGE_EXISTING_MULTIPART_AGE); + AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain( + new BasicAWSCredentialsProvider(accessKey, secretKey), + new InstanceProfileCredentialsProvider(), + new AnonymousAWSCredentialsProvider() + ); + + bucket = name.getHost(); + + ClientConfiguration awsConf = new ClientConfiguration(); + awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS)); + awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES)); + awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT)); + awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT)); + + s3 = new AmazonS3Client(credentials, awsConf); + if (null != regionKey) { + LOG.info("set region : " + regionKey); + com.amazonaws.regions.Region usWest2 = com.amazonaws.regions.Region.getRegion(Regions.fromName(regionKey)); + s3.setRegion(usWest2); + } - if (purgeExistingMultipart) { - Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); + maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); + partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); - transfers.abortMultipartUploads(bucket, purgeBefore); - } + if (partSize < 5 * 1024 * 1024) { + LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); + partSize = 5 * 1024 * 1024; + } + + if (multiPartThreshold < 5 * 1024 * 1024) { + LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); + multiPartThreshold = 5 * 1024 * 1024; + } + + int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); + int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue<>(maxThreads * + conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); + threadPoolExecutor = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + newDaemonThreadFactory("s3a-transfer-shared-")); + threadPoolExecutor.allowCoreThreadTimeOut(true); + + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMinimumUploadPartSize(partSize); + transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); + + transfers = new TransferManager(s3, threadPoolExecutor); + transfers.setConfiguration(transferConfiguration); + + String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); + if (!cannedACLName.isEmpty()) { + cannedACL = CannedAccessControlList.valueOf(cannedACLName); + } else { + cannedACL = null; + } - serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); + if (!s3.doesBucketExist(bucket)) { + throw new IOException("Bucket " + bucket + " does not exist"); + } - setConf(conf); - } + boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, + DEFAULT_PURGE_EXISTING_MULTIPART); + long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, + DEFAULT_PURGE_EXISTING_MULTIPART_AGE); - /** - * Return the protocol scheme for the FileSystem. - * - * @return "s3a" - */ - public String getScheme() { - return "s3a"; - } + if (purgeExistingMultipart) { + Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge * 1000); - /** Returns a URI whose scheme and authority identify this FileSystem.*/ - public URI getUri() { - return uri; - } + transfers.abortMultipartUploads(bucket, purgeBefore); + } + serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); - public S3AFileSystem() { - super(); - } + setConf(conf); + } - /* Turns a path (relative or otherwise) into an S3 key - */ - private String pathToKey(Path path) { - if (!path.isAbsolute()) { - path = new Path(workingDir, path); + /** + * Return the protocol scheme for the FileSystem. + * + * @return "s3a" + */ + public String getScheme() { + return "s3a"; } - if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { - return ""; + /** Returns a URI whose scheme and authority identify this FileSystem.*/ + public URI getUri() { + return uri; } - return path.toUri().getPath().substring(1); - } - private Path keyToPath(String key) { - return new Path("/" + key); - } + public S3AFileSystem() { + super(); + } - /** - * Opens an FSDataInputStream at the indicated Path. - * @param f the file name to open - * @param bufferSize the size of the buffer to be used. - */ - public FSDataInputStream open(Path f, int bufferSize) - throws IOException { + /* Turns a path (relative or otherwise) into an S3 key + */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Opening '{}' for reading.", f); - } - final FileStatus fileStatus = getFileStatus(f); - if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + f + " because it is a directory"); - } + if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { + return ""; + } - return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), - fileStatus.getLen(), s3, statistics)); - } - - /** - * Create an FSDataOutputStream at the indicated Path with write-progress - * reporting. - * @param f the file name to open - * @param permission - * @param overwrite if a file with this name already exists, then if true, - * the file will be overwritten, and if false an error will be thrown. - * @param bufferSize the size of the buffer to be used. - * @param replication required block replication for the file. - * @param blockSize - * @param progress - * @throws IOException - * @see #setPermission(Path, FsPermission) - */ - public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, - int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - String key = pathToKey(f); - - if (!overwrite && exists(f)) { - throw new FileAlreadyExistsException(f + " already exists"); - } - if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { - return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, - key, progress, statistics, cannedACL, - serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold, - threadPoolExecutor), statistics); + return path.toUri().getPath().substring(1); } - // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file - return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, - bucket, key, progress, cannedACL, statistics, - serverSideEncryptionAlgorithm), null); - } - - /** - * Append to an existing file (optional operation). - * @param f the existing file to be appended. - * @param bufferSize the size of the buffer to be used. - * @param progress for reporting progress if it is not null. - * @throws IOException - */ - public FSDataOutputStream append(Path f, int bufferSize, - Progressable progress) throws IOException { - throw new IOException("Not supported"); - } - - - /** - * Renames Path src to Path dst. Can take place on local fs - * or remote DFS. - * - * Warning: S3 does not support renames. This method does a copy which can - * take S3 some time to execute with large files and directories. Since - * there is no Progressable passed in, this can time out jobs. - * - * Note: This implementation differs with other S3 drivers. Specifically: - * Fails if src is a file and dst is a directory. - * Fails if src is a directory and dst is a file. - * Fails if the parent of dst does not exist or is a file. - * Fails if dst is a directory that is not empty. - * - * @param src path to be renamed - * @param dst new path after rename - * @throws IOException on failure - * @return true if rename is successful - */ - public boolean rename(Path src, Path dst) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Rename path {} to {}", src, dst); + + private Path keyToPath(String key) { + return new Path("/" + key); } - String srcKey = pathToKey(src); - String dstKey = pathToKey(dst); + /** + * Opens an FSDataInputStream at the indicated Path. + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ + public FSDataInputStream open(Path f, int bufferSize) + throws IOException { - if (srcKey.isEmpty() || dstKey.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src or dst are empty"); - } - return false; + if (LOG.isDebugEnabled()) { + LOG.debug("Opening '{}' for reading.", f); + } + final FileStatus fileStatus = getFileStatus(f); + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + f + " because it is a directory"); + } + + return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), + fileStatus.getLen(), s3, statistics)); } - S3AFileStatus srcStatus; - try { - srcStatus = getFileStatus(src); - } catch (FileNotFoundException e) { - LOG.error("rename: src not found {}", src); - return false; + /** + * Create an FSDataOutputStream at the indicated Path with write-progress + * reporting. + * @param f the file name to open + * @param permission + * @param overwrite if a file with this name already exists, then if true, + * the file will be overwritten, and if false an error will be thrown. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + String key = pathToKey(f); + + if (!overwrite && exists(f)) { + throw new FileAlreadyExistsException(f + " already exists"); + } + if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { + return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, + key, progress, statistics, cannedACL, + serverSideEncryptionAlgorithm, partSize, (long) multiPartThreshold, + threadPoolExecutor), statistics); + } + // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file + return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, + bucket, key, progress, cannedACL, statistics, + serverSideEncryptionAlgorithm), null); } - if (srcKey.equals(dstKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src and dst refer to the same file or directory"); - } - return srcStatus.isFile(); + /** + * Append to an existing file (optional operation). + * @param f the existing file to be appended. + * @param bufferSize the size of the buffer to be used. + * @param progress for reporting progress if it is not null. + * @throws IOException + */ + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Not supported"); } - S3AFileStatus dstStatus = null; - try { - dstStatus = getFileStatus(dst); - if (srcStatus.isDirectory() && dstStatus.isFile()) { + /** + * Renames Path src to Path dst. Can take place on local fs + * or remote DFS. + * + * Warning: S3 does not support renames. This method does a copy which can + * take S3 some time to execute with large files and directories. Since + * there is no Progressable passed in, this can time out jobs. + * + * Note: This implementation differs with other S3 drivers. Specifically: + * Fails if src is a file and dst is a directory. + * Fails if src is a directory and dst is a file. + * Fails if the parent of dst does not exist or is a file. + * Fails if dst is a directory that is not empty. + * + * @param src path to be renamed + * @param dst new path after rename + * @throws IOException on failure + * @return true if rename is successful + */ + public boolean rename(Path src, Path dst) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("rename: src is a directory and dst is a file"); + LOG.debug("Rename path {} to {}", src, dst); } - return false; - } - - if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) { - return false; - } - } catch (FileNotFoundException e) { - // Parent must exist - Path parent = dst.getParent(); - if (!pathToKey(parent).isEmpty()) { - try { - S3AFileStatus dstParentStatus = getFileStatus(dst.getParent()); - if (!dstParentStatus.isDirectory()) { + + String srcKey = pathToKey(src); + String dstKey = pathToKey(dst); + + if (srcKey.isEmpty() || dstKey.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("rename: src or dst are empty"); + } return false; - } - } catch (FileNotFoundException e2) { - return false; } - } - } - // Ok! Time to start - if (srcStatus.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: renaming file " + src + " to " + dst); - } - if (dstStatus != null && dstStatus.isDirectory()) { - String newDstKey = dstKey; - if (!newDstKey.endsWith("/")) { - newDstKey = newDstKey + "/"; + S3AFileStatus srcStatus; + try { + srcStatus = getFileStatus(src); + } catch (FileNotFoundException e) { + LOG.error("rename: src not found {}", src); + return false; } - String filename = - srcKey.substring(pathToKey(src.getParent()).length()+1); - newDstKey = newDstKey + filename; - copyFile(srcKey, newDstKey); - } else { - copyFile(srcKey, dstKey); - } - delete(src, false); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: renaming directory " + src + " to " + dst); - } - - // This is a directory to directory copy - if (!dstKey.endsWith("/")) { - dstKey = dstKey + "/"; - } - - if (!srcKey.endsWith("/")) { - srcKey = srcKey + "/"; - } - - //Verify dest is not a child of the source directory - if (dstKey.startsWith(srcKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot rename a directory to a subdirectory of self"); + + if (srcKey.equals(dstKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("rename: src and dst refer to the same file or directory"); + } + return srcStatus.isFile(); } - return false; - } - - List keysToDelete = - new ArrayList<>(); - if (dstStatus != null && dstStatus.isEmptyDirectory()) { - // delete unnecessary fake directory. - keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); - } - - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(srcKey); - request.setMaxKeys(maxKeys); - - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); - - while (true) { - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); - String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); - copyFile(summary.getKey(), newDstKey); - - if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { - DeleteObjectsRequest deleteRequest = - new DeleteObjectsRequest(bucket).withKeys(keysToDelete); - s3.deleteObjects(deleteRequest); - statistics.incrementWriteOps(1); - keysToDelete.clear(); - } + + S3AFileStatus dstStatus = null; + try { + dstStatus = getFileStatus(dst); + + if (srcStatus.isDirectory() && dstStatus.isFile()) { + if (LOG.isDebugEnabled()) { + LOG.debug("rename: src is a directory and dst is a file"); + } + return false; + } + + if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) { + return false; + } + } catch (FileNotFoundException e) { + // Parent must exist + Path parent = dst.getParent(); + if (!pathToKey(parent).isEmpty()) { + try { + S3AFileStatus dstParentStatus = getFileStatus(dst.getParent()); + if (!dstParentStatus.isDirectory()) { + return false; + } + } catch (FileNotFoundException e2) { + return false; + } + } } - if (objects.isTruncated()) { - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + // Ok! Time to start + if (srcStatus.isFile()) { + if (LOG.isDebugEnabled()) { + LOG.debug("rename: renaming file " + src + " to " + dst); + } + if (dstStatus != null && dstStatus.isDirectory()) { + String newDstKey = dstKey; + if (!newDstKey.endsWith("/")) { + newDstKey = newDstKey + "/"; + } + String filename = + srcKey.substring(pathToKey(src.getParent()).length() + 1); + newDstKey = newDstKey + filename; + copyFile(srcKey, newDstKey); + } else { + copyFile(srcKey, dstKey); + } + delete(src, false); } else { - if (keysToDelete.size() > 0) { - DeleteObjectsRequest deleteRequest = - new DeleteObjectsRequest(bucket).withKeys(keysToDelete); - s3.deleteObjects(deleteRequest); - statistics.incrementWriteOps(1); - } - break; - } - } - } + if (LOG.isDebugEnabled()) { + LOG.debug("rename: renaming directory " + src + " to " + dst); + } - if (src.getParent() != dst.getParent()) { - deleteUnnecessaryFakeDirectories(dst.getParent()); - createFakeDirectoryIfNecessary(src.getParent()); - } - return true; - } - - /** Delete a file. - * - * @param f the path to delete. - * @param recursive if path is a directory and set to - * true, the directory is deleted else throws an exception. In - * case of a file the recursive can be set to either true or false. - * @return true if delete is successful else false. - * @throws IOException - */ - public boolean delete(Path f, boolean recursive) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete path " + f + " - recursive " + recursive); - } - S3AFileStatus status; - try { - status = getFileStatus(f); - } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Couldn't delete " + f + " - does not exist"); - } - return false; - } + // This is a directory to directory copy + if (!dstKey.endsWith("/")) { + dstKey = dstKey + "/"; + } - String key = pathToKey(f); + if (!srcKey.endsWith("/")) { + srcKey = srcKey + "/"; + } - if (status.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("delete: Path is a directory"); - } + //Verify dest is not a child of the source directory + if (dstKey.startsWith(srcKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("cannot rename a directory to a subdirectory of self"); + } + return false; + } - if (!recursive && !status.isEmptyDirectory()) { - throw new IOException("Path is a folder: " + f + - " and it is not an empty directory"); - } + List keysToDelete = + new ArrayList<>(); + if (dstStatus != null && dstStatus.isEmptyDirectory()) { + // delete unnecessary fake directory. + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); + } - if (!key.endsWith("/")) { - key = key + "/"; - } + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(srcKey); + request.setMaxKeys(maxKeys); - if (key.equals("/")) { - LOG.info("s3a cannot delete the root directory"); - return false; - } + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); - if (status.isEmptyDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting fake empty directory"); + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); + copyFile(summary.getKey(), newDstKey); + + if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { + DeleteObjectsRequest deleteRequest = + new DeleteObjectsRequest(bucket).withKeys(keysToDelete); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + keysToDelete.clear(); + } + } + + if (objects.isTruncated()) { + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + if (keysToDelete.size() > 0) { + DeleteObjectsRequest deleteRequest = + new DeleteObjectsRequest(bucket).withKeys(keysToDelete); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + } + break; + } + } } - s3.deleteObject(bucket, key); - statistics.incrementWriteOps(1); - } else { + + if (src.getParent() != dst.getParent()) { + deleteUnnecessaryFakeDirectories(dst.getParent()); + createFakeDirectoryIfNecessary(src.getParent()); + } + return true; + } + + /** Delete a file. + * + * @param f the path to delete. + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException + */ + public boolean delete(Path f, boolean recursive) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Getting objects for directory prefix " + key + " to delete"); + LOG.debug("Delete path " + f + " - recursive " + recursive); + } + S3AFileStatus status; + try { + status = getFileStatus(f); + } catch (FileNotFoundException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Couldn't delete " + f + " - does not exist"); + } + return false; } - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(key); - // Hopefully not setting a delimiter will cause this to find everything - //request.setDelimiter("/"); - request.setMaxKeys(maxKeys); - - List keys = - new ArrayList<>(); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); - while (true) { - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + String key = pathToKey(f); + + if (status.isDirectory()) { if (LOG.isDebugEnabled()) { - LOG.debug("Got object to delete " + summary.getKey()); + LOG.debug("delete: Path is a directory"); } - if (keys.size() == MAX_ENTRIES_TO_DELETE) { - DeleteObjectsRequest deleteRequest = - new DeleteObjectsRequest(bucket).withKeys(keys); - s3.deleteObjects(deleteRequest); - statistics.incrementWriteOps(1); - keys.clear(); + if (!recursive && !status.isEmptyDirectory()) { + throw new IOException("Path is a folder: " + f + + " and it is not an empty directory"); } - } - if (objects.isTruncated()) { - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); - } else { - if (!keys.isEmpty()) { - DeleteObjectsRequest deleteRequest = - new DeleteObjectsRequest(bucket).withKeys(keys); - s3.deleteObjects(deleteRequest); - statistics.incrementWriteOps(1); + if (!key.endsWith("/")) { + key = key + "/"; } - break; - } - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("delete: Path is a file"); - } - s3.deleteObject(bucket, key); - statistics.incrementWriteOps(1); - } - createFakeDirectoryIfNecessary(f.getParent()); + if (key.equals("/")) { + LOG.info("s3a cannot delete the root directory"); + return false; + } + + if (status.isEmptyDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting fake empty directory"); + } + s3.deleteObject(bucket, key); + statistics.incrementWriteOps(1); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Getting objects for directory prefix " + key + " to delete"); + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + // Hopefully not setting a delimiter will cause this to find everything + //request.setDelimiter("/"); + request.setMaxKeys(maxKeys); + + List keys = + new ArrayList<>(); + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + if (LOG.isDebugEnabled()) { + LOG.debug("Got object to delete " + summary.getKey()); + } + + if (keys.size() == MAX_ENTRIES_TO_DELETE) { + DeleteObjectsRequest deleteRequest = + new DeleteObjectsRequest(bucket).withKeys(keys); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + keys.clear(); + } + } + + if (objects.isTruncated()) { + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + if (!keys.isEmpty()) { + DeleteObjectsRequest deleteRequest = + new DeleteObjectsRequest(bucket).withKeys(keys); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + } + break; + } + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("delete: Path is a file"); + } + s3.deleteObject(bucket, key); + statistics.incrementWriteOps(1); + } - return true; - } + createFakeDirectoryIfNecessary(f.getParent()); - private void createFakeDirectoryIfNecessary(Path f) throws IOException { - String key = pathToKey(f); - if (!key.isEmpty() && !exists(f)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new fake directory at " + f); - } - createFakeDirectory(bucket, key); - } - } - - /** - * List the statuses of the files/directories in the given path if the path is - * a directory. - * - * @param f given path - * @return the statuses of the files/directories in the given patch - * @throws FileNotFoundException when the path does not exist; - * IOException see specific implementation - */ - public FileStatus[] listStatus(Path f) throws FileNotFoundException, - IOException { - String key = pathToKey(f); - if (LOG.isDebugEnabled()) { - LOG.debug("List status for path: " + f); + return true; } - final List result = new ArrayList(); - final FileStatus fileStatus = getFileStatus(f); + private void createFakeDirectoryIfNecessary(Path f) throws IOException { + String key = pathToKey(f); + if (!key.isEmpty() && !exists(f)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new fake directory at " + f); + } + createFakeDirectory(bucket, key); + } + } - if (fileStatus.isDirectory()) { - if (!key.isEmpty()) { - key = key + "/"; - } + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. + * + * @param f given path + * @return the statuses of the files/directories in the given patch + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ + public FileStatus[] listStatus(Path f) throws FileNotFoundException, + IOException { + String key = pathToKey(f); + if (LOG.isDebugEnabled()) { + LOG.debug("List status for path: " + f); + } - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(key); - request.setDelimiter("/"); - request.setMaxKeys(maxKeys); + final List result = new ArrayList(); + final FileStatus fileStatus = getFileStatus(f); - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: doing listObjects for directory " + key); - } + if (fileStatus.isDirectory()) { + if (!key.isEmpty()) { + key = key + "/"; + } - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + request.setDelimiter("/"); + request.setMaxKeys(maxKeys); - while (true) { - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir); - // Skip over keys that are ourselves and old S3N _$folder$ files - if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + keyPath); + LOG.debug("listStatus: doing listObjects for directory " + key); } - continue; - } - if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { - result.add(new S3AFileStatus(true, true, keyPath)); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fd: " + keyPath); + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir); + // Skip over keys that are ourselves and old S3N _$folder$ files + if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + keyPath); + } + continue; + } + + if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { + result.add(new S3AFileStatus(true, true, keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fd: " + keyPath); + } + } else { + result.add(new S3AFileStatus(summary.getSize(), + dateToLong(summary.getLastModified()), keyPath, + getDefaultBlockSize(f.makeQualified(uri, workingDir)))); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fi: " + keyPath); + } + } + } + + for (String prefix : objects.getCommonPrefixes()) { + Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); + if (keyPath.equals(f)) { + continue; + } + result.add(new S3AFileStatus(true, false, keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd: " + keyPath); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + break; + } } - } else { - result.add(new S3AFileStatus(summary.getSize(), - dateToLong(summary.getLastModified()), keyPath, - getDefaultBlockSize(f.makeQualified(uri, workingDir)))); + } else { if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fi: " + keyPath); + LOG.debug("Adding: rd (not a dir): " + f); } - } + result.add(fileStatus); } - for (String prefix : objects.getCommonPrefixes()) { - Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); - if (keyPath.equals(f)) { - continue; - } - result.add(new S3AFileStatus(true, false, keyPath)); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd: " + keyPath); - } - } + return result.toArray(new FileStatus[result.size()]); + } - if (objects.isTruncated()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: list truncated - getting next batch"); - } - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); - } else { - break; - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd (not a dir): " + f); - } - result.add(fileStatus); + /** + * Set the current working directory for the given file system. All relative + * paths will be resolved relative to it. + * + * @param new_dir + */ + public void setWorkingDirectory(Path new_dir) { + workingDir = new_dir; } - return result.toArray(new FileStatus[result.size()]); - } - - - - /** - * Set the current working directory for the given file system. All relative - * paths will be resolved relative to it. - * - * @param new_dir - */ - public void setWorkingDirectory(Path new_dir) { - workingDir = new_dir; - } - - /** - * Get the current working directory for the given file system - * @return the directory pathname - */ - public Path getWorkingDirectory() { - return workingDir; - } - - /** - * Make the given file and all non-existent parents into - * directories. Has the semantics of Unix 'mkdir -p'. - * Existence of the directory hierarchy is not an error. - * @param f path to create - * @param permission to apply to f - */ - // TODO: If we have created an empty file at /foo/bar and we then call - // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? - public boolean mkdirs(Path f, FsPermission permission) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Making directory: " + f); + /** + * Get the current working directory for the given file system + * @return the directory pathname + */ + public Path getWorkingDirectory() { + return workingDir; } + /** + * Make the given file and all non-existent parents into + * directories. Has the semantics of Unix 'mkdir -p'. + * Existence of the directory hierarchy is not an error. + * @param f path to create + * @param permission to apply to f + */ + // TODO: If we have created an empty file at /foo/bar and we then call + // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Making directory: " + f); + } - try { - FileStatus fileStatus = getFileStatus(f); - if (fileStatus.isDirectory()) { - return true; - } else { - throw new FileAlreadyExistsException("Path is a file: " + f); - } - } catch (FileNotFoundException e) { - Path fPart = f; - do { try { - FileStatus fileStatus = getFileStatus(fPart); - if (fileStatus.isFile()) { - throw new FileAlreadyExistsException(String.format( - "Can't make directory for path '%s' since it is a file.", - fPart)); - } - } catch (FileNotFoundException fnfe) { - } - fPart = fPart.getParent(); - } while (fPart != null); + FileStatus fileStatus = getFileStatus(f); - String key = pathToKey(f); - createFakeDirectory(bucket, key); - return true; - } - } - - /** - * Return a file status object that represents the path. - * @param f The path we want information from - * @return a FileStatus object - * @throws java.io.FileNotFoundException when the path does not exist; - * IOException see specific implementation - */ - public S3AFileStatus getFileStatus(Path f) throws IOException { - String key = pathToKey(f); - if (LOG.isDebugEnabled()) { - LOG.debug("Getting path status for " + f + " (" + key + ")"); + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + f); + } + } catch (FileNotFoundException e) { + Path fPart = f; + do { + try { + FileStatus fileStatus = getFileStatus(fPart); + if (fileStatus.isFile()) { + throw new FileAlreadyExistsException(String.format( + "Can't make directory for path '%s' since it is a file.", + fPart)); + } + } catch (FileNotFoundException fnfe) { + } + fPart = fPart.getParent(); + } while (fPart != null); + + String key = pathToKey(f); + createFakeDirectory(bucket, key); + return true; + } } + /** + * Return a file status object that represents the path. + * @param f The path we want information from + * @return a FileStatus object + * @throws java.io.FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ + public S3AFileStatus getFileStatus(Path f) throws IOException { + String key = pathToKey(f); + if (LOG.isDebugEnabled()) { + LOG.debug("Getting path status for " + f + " (" + key + ")"); + } - if (!key.isEmpty()) { - try { - ObjectMetadata meta = s3.getObjectMetadata(bucket, key); - statistics.incrementReadOps(1); - if (objectRepresentsDirectory(key, meta.getContentLength())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found exact file: fake directory"); - } - return new S3AFileStatus(true, true, - f.makeQualified(uri, workingDir)); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Found exact file: normal file"); - } - return new S3AFileStatus(meta.getContentLength(), - dateToLong(meta.getLastModified()), - f.makeQualified(uri, workingDir), - getDefaultBlockSize(f.makeQualified(uri, workingDir))); - } - } catch (AmazonServiceException e) { - if (e.getStatusCode() != 404) { - printAmazonServiceException(e); - throw e; + if (!key.isEmpty()) { + try { + ObjectMetadata meta = s3.getObjectMetadata(bucket, key); + statistics.incrementReadOps(1); + + if (objectRepresentsDirectory(key, meta.getContentLength())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found exact file: fake directory"); + } + return new S3AFileStatus(true, true, + f.makeQualified(uri, workingDir)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Found exact file: normal file"); + } + return new S3AFileStatus(meta.getContentLength(), + dateToLong(meta.getLastModified()), + f.makeQualified(uri, workingDir), + getDefaultBlockSize(f.makeQualified(uri, workingDir))); + } + } catch (AmazonServiceException e) { + if (e.getStatusCode() != 404) { + printAmazonServiceException(e); + throw e; + } + } catch (AmazonClientException e) { + printAmazonClientException(e); + throw e; + } + + // Necessary? + if (!key.endsWith("/")) { + try { + String newKey = key + "/"; + ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); + statistics.incrementReadOps(1); + + if (objectRepresentsDirectory(newKey, meta.getContentLength())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found file (with /): fake directory"); + } + return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); + } else { + LOG.warn("Found file (with /): real file? should not happen: {}", key); + + return new S3AFileStatus(meta.getContentLength(), + dateToLong(meta.getLastModified()), + f.makeQualified(uri, workingDir), + getDefaultBlockSize(f.makeQualified(uri, workingDir))); + } + } catch (AmazonServiceException e) { + if (e.getStatusCode() != 404) { + printAmazonServiceException(e); + throw e; + } + } catch (AmazonClientException e) { + printAmazonClientException(e); + throw e; + } + } } - } catch (AmazonClientException e) { - printAmazonClientException(e); - throw e; - } - // Necessary? - if (!key.endsWith("/")) { try { - String newKey = key + "/"; - ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); - statistics.incrementReadOps(1); + if (!key.isEmpty() && !key.endsWith("/")) { + key = key + "/"; + } + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + request.setDelimiter("/"); + request.setMaxKeys(1); - if (objectRepresentsDirectory(newKey, meta.getContentLength())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found file (with /): fake directory"); + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + + if (!objects.getCommonPrefixes().isEmpty() + || objects.getObjectSummaries().size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found path as directory (with /): " + + objects.getCommonPrefixes().size() + "/" + + objects.getObjectSummaries().size()); + + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize()); + } + for (String prefix : objects.getCommonPrefixes()) { + LOG.debug("Prefix: " + prefix); + } + } + + return new S3AFileStatus(true, false, + f.makeQualified(uri, workingDir)); } - return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); - } else { - LOG.warn("Found file (with /): real file? should not happen: {}", key); - - return new S3AFileStatus(meta.getContentLength(), - dateToLong(meta.getLastModified()), - f.makeQualified(uri, workingDir), - getDefaultBlockSize(f.makeQualified(uri, workingDir))); - } } catch (AmazonServiceException e) { - if (e.getStatusCode() != 404) { - printAmazonServiceException(e); - throw e; - } + if (e.getStatusCode() != 404) { + printAmazonServiceException(e); + throw e; + } } catch (AmazonClientException e) { - printAmazonClientException(e); - throw e; + printAmazonClientException(e); + throw e; } - } - } - try { - if (!key.isEmpty() && !key.endsWith("/")) { - key = key + "/"; - } - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(key); - request.setDelimiter("/"); - request.setMaxKeys(1); - - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); - - if (!objects.getCommonPrefixes().isEmpty() - || objects.getObjectSummaries().size() > 0) { if (LOG.isDebugEnabled()) { - LOG.debug("Found path as directory (with /): " + - objects.getCommonPrefixes().size() + "/" + - objects.getObjectSummaries().size()); - - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize()); - } - for (String prefix : objects.getCommonPrefixes()) { - LOG.debug("Prefix: " + prefix); - } + LOG.debug("Not Found: " + f); } - - return new S3AFileStatus(true, false, - f.makeQualified(uri, workingDir)); - } - } catch (AmazonServiceException e) { - if (e.getStatusCode() != 404) { - printAmazonServiceException(e); - throw e; - } - } catch (AmazonClientException e) { - printAmazonClientException(e); - throw e; + throw new FileNotFoundException("No such file or directory: " + f); } - if (LOG.isDebugEnabled()) { - LOG.debug("Not Found: " + f); - } - throw new FileNotFoundException("No such file or directory: " + f); - } - - /** - * The src file is on the local disk. Add it to FS at - * the given dst name. - * - * This version doesn't need to create a temporary file to calculate the md5. - * Sadly this doesn't seem to be used by the shell cp :( - * - * delSrc indicates if the source should be removed - * @param delSrc whether to delete the src - * @param overwrite whether to overwrite an existing file - * @param src path - * @param dst path - */ - @Override - public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, - Path dst) throws IOException { - String key = pathToKey(dst); - - if (!overwrite && exists(dst)) { - throw new IOException(dst + " already exists"); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Copying local file from " + src + " to " + dst); - } + /** + * The src file is on the local disk. Add it to FS at + * the given dst name. + * + * This version doesn't need to create a temporary file to calculate the md5. + * Sadly this doesn't seem to be used by the shell cp :( + * + * delSrc indicates if the source should be removed + * @param delSrc whether to delete the src + * @param overwrite whether to overwrite an existing file + * @param src path + * @param dst path + */ + @Override + public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, + Path dst) throws IOException { + String key = pathToKey(dst); + + if (!overwrite && exists(dst)) { + throw new IOException(dst + " already exists"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Copying local file from " + src + " to " + dst); + } - // Since we have a local file, we don't need to stream into a temporary file - LocalFileSystem local = getLocal(getConf()); - File srcfile = local.pathToFile(src); + // Since we have a local file, we don't need to stream into a temporary file + LocalFileSystem local = getLocal(getConf()); + File srcfile = local.pathToFile(src); - final ObjectMetadata om = new ObjectMetadata(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); - putObjectRequest.setCannedAcl(cannedACL); - putObjectRequest.setMetadata(om); - - ProgressListener progressListener = new ProgressListener() { - public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventCode()) { - case ProgressEvent.PART_COMPLETED_EVENT_CODE: + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(om); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventCode()) { + case ProgressEvent.PART_COMPLETED_EVENT_CODE: + statistics.incrementWriteOps(1); + break; + } + } + }; + + Upload up = transfers.upload(putObjectRequest); + up.addProgressListener(progressListener); + try { + up.waitForUploadResult(); statistics.incrementWriteOps(1); - break; + } catch (InterruptedException e) { + throw new IOException("Got interrupted, cancelling"); } - } - }; - - Upload up = transfers.upload(putObjectRequest); - up.addProgressListener(progressListener); - try { - up.waitForUploadResult(); - statistics.incrementWriteOps(1); - } catch (InterruptedException e) { - throw new IOException("Got interrupted, cancelling"); - } - // This will delete unnecessary fake parent directories - finishedWrite(key); + // This will delete unnecessary fake parent directories + finishedWrite(key); - if (delSrc) { - local.delete(src, false); - } - } - - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - if (transfers != null) { - transfers.shutdownNow(true); - transfers = null; - } + if (delSrc) { + local.delete(src, false); + } } - } - - /** - * Override getCononicalServiceName because we don't support token in S3A - */ - @Override - public String getCanonicalServiceName() { - // Does not support Token - return null; - } - - private void copyFile(String srcKey, String dstKey) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("copyFile " + srcKey + " -> " + dstKey); + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + if (transfers != null) { + transfers.shutdownNow(true); + transfers = null; + } + } } - ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); - final ObjectMetadata dstom = srcom.clone(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - dstom.setServerSideEncryption(serverSideEncryptionAlgorithm); + /** + * Override getCononicalServiceName because we don't support token in S3A + */ + @Override + public String getCanonicalServiceName() { + // Does not support Token + return null; } - CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey); - copyObjectRequest.setCannedAccessControlList(cannedACL); - copyObjectRequest.setNewObjectMetadata(dstom); - - ProgressListener progressListener = new ProgressListener() { - public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventCode()) { - case ProgressEvent.PART_COMPLETED_EVENT_CODE: + + private void copyFile(String srcKey, String dstKey) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("copyFile " + srcKey + " -> " + dstKey); + } + + ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); + final ObjectMetadata dstom = srcom.clone(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + dstom.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey); + copyObjectRequest.setCannedAccessControlList(cannedACL); + copyObjectRequest.setNewObjectMetadata(dstom); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventCode()) { + case ProgressEvent.PART_COMPLETED_EVENT_CODE: + statistics.incrementWriteOps(1); + break; + } + } + }; + + Copy copy = transfers.copy(copyObjectRequest); + copy.addProgressListener(progressListener); + try { + copy.waitForCopyResult(); statistics.incrementWriteOps(1); - break; + } catch (InterruptedException e) { + throw new IOException("Got interrupted, cancelling"); } - } - }; - - Copy copy = transfers.copy(copyObjectRequest); - copy.addProgressListener(progressListener); - try { - copy.waitForCopyResult(); - statistics.incrementWriteOps(1); - } catch (InterruptedException e) { - throw new IOException("Got interrupted, cancelling"); } - } - private boolean objectRepresentsDirectory(final String name, final long size) { - return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L; - } + private boolean objectRepresentsDirectory(final String name, final long size) { + return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L; + } + + // Handles null Dates that can be returned by AWS + private static long dateToLong(final Date date) { + if (date == null) { + return 0L; + } + + return date.getTime(); + } - // Handles null Dates that can be returned by AWS - private static long dateToLong(final Date date) { - if (date == null) { - return 0L; + public void finishedWrite(String key) throws IOException { + deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); } - return date.getTime(); - } + private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { + while (true) { + try { + String key = pathToKey(f); + if (key.isEmpty()) { + break; + } + + S3AFileStatus status = getFileStatus(f); + + if (status.isDirectory() && status.isEmptyDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting fake directory " + key + "/"); + } + s3.deleteObject(bucket, key + "/"); + statistics.incrementWriteOps(1); + } + } catch (FileNotFoundException | AmazonServiceException e) { + } - public void finishedWrite(String key) throws IOException { - deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); - } + if (f.isRoot()) { + break; + } - private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { - while (true) { - try { - String key = pathToKey(f); - if (key.isEmpty()) { - break; + f = f.getParent(); } + } - S3AFileStatus status = getFileStatus(f); - if (status.isDirectory() && status.isEmptyDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting fake directory " + key + "/"); - } - s3.deleteObject(bucket, key + "/"); - statistics.incrementWriteOps(1); + private void createFakeDirectory(final String bucketName, final String objectName) + throws AmazonClientException, AmazonServiceException { + if (!objectName.endsWith("/")) { + createEmptyObject(bucketName, objectName + "/"); + } else { + createEmptyObject(bucketName, objectName); } - } catch (FileNotFoundException | AmazonServiceException e) { - } + } - if (f.isRoot()) { - break; - } + // Used to create an empty file that represents an empty directory + private void createEmptyObject(final String bucketName, final String objectName) + throws AmazonClientException, AmazonServiceException { + final InputStream im = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; - f = f.getParent(); + final ObjectMetadata om = new ObjectMetadata(); + om.setContentLength(0L); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om); + putObjectRequest.setCannedAcl(cannedACL); + s3.putObject(putObjectRequest); + statistics.incrementWriteOps(1); } - } + /** + * Return the number of bytes that large input files should be optimally + * be split into to minimize i/o time. + * @deprecated use {@link #getDefaultBlockSize(Path)} instead + */ + @Deprecated + public long getDefaultBlockSize() { + // default to 32MB: large enough to minimize the impact of seeks + return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); + } - private void createFakeDirectory(final String bucketName, final String objectName) - throws AmazonClientException, AmazonServiceException { - if (!objectName.endsWith("/")) { - createEmptyObject(bucketName, objectName + "/"); - } else { - createEmptyObject(bucketName, objectName); + private void printAmazonServiceException(AmazonServiceException ase) { + LOG.info("Caught an AmazonServiceException, which means your request made it " + + "to Amazon S3, but was rejected with an error response for some reason."); + LOG.info("Error Message: " + ase.getMessage()); + LOG.info("HTTP Status Code: " + ase.getStatusCode()); + LOG.info("AWS Error Code: " + ase.getErrorCode()); + LOG.info("Error Type: " + ase.getErrorType()); + LOG.info("Request ID: " + ase.getRequestId()); + LOG.info("Class Name: " + ase.getClass().getName()); } - } - - // Used to create an empty file that represents an empty directory - private void createEmptyObject(final String bucketName, final String objectName) - throws AmazonClientException, AmazonServiceException { - final InputStream im = new InputStream() { - @Override - public int read() throws IOException { - return -1; - } - }; - - final ObjectMetadata om = new ObjectMetadata(); - om.setContentLength(0L); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setServerSideEncryption(serverSideEncryptionAlgorithm); + + private void printAmazonClientException(AmazonClientException ace) { + LOG.info("Caught an AmazonClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with S3, " + + "such as not being able to access the network."); + LOG.info("Error Message: {}" + ace, ace); } - PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om); - putObjectRequest.setCannedAcl(cannedACL); - s3.putObject(putObjectRequest); - statistics.incrementWriteOps(1); - } - - /** - * Return the number of bytes that large input files should be optimally - * be split into to minimize i/o time. - * @deprecated use {@link #getDefaultBlockSize(Path)} instead - */ - @Deprecated - public long getDefaultBlockSize() { - // default to 32MB: large enough to minimize the impact of seeks - return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); - } - - private void printAmazonServiceException(AmazonServiceException ase) { - LOG.info("Caught an AmazonServiceException, which means your request made it " + - "to Amazon S3, but was rejected with an error response for some reason."); - LOG.info("Error Message: " + ase.getMessage()); - LOG.info("HTTP Status Code: " + ase.getStatusCode()); - LOG.info("AWS Error Code: " + ase.getErrorCode()); - LOG.info("Error Type: " + ase.getErrorType()); - LOG.info("Request ID: " + ase.getRequestId()); - LOG.info("Class Name: " + ase.getClass().getName()); - } - - private void printAmazonClientException(AmazonClientException ace) { - LOG.info("Caught an AmazonClientException, which means the client encountered " + - "a serious internal problem while trying to communicate with S3, " + - "such as not being able to access the network."); - LOG.info("Error Message: {}" + ace, ace); - } }