Skip to content

Commit ba49cb7

Browse files
committed
HADOOP-16848. S3A refactoring
RequestFactory adds factory construction and a full set of operations. Moves all S3 IO into the RawS3A class, which takes the factory. The S3 Select API call goes into the store. Removes the attempt at async init — too complex. Change-Id: I36bb2884ebe0d7183f99f25860f79e3796a112fb
1 parent 7120785 commit ba49cb7

17 files changed

+607
-348
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 89 additions & 174 deletions
Large diffs are not rendered by default.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 18 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
import java.io.FileNotFoundException;
2424
import java.io.IOException;
2525
import java.io.InputStream;
26-
import java.util.ArrayList;
2726
import java.util.List;
2827
import java.util.Map;
2928
import java.util.concurrent.atomic.AtomicInteger;
3029

31-
import com.amazonaws.services.s3.model.AmazonS3Exception;
3230
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
3331
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
3432
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@@ -51,20 +49,15 @@
5149
import org.apache.hadoop.conf.Configuration;
5250
import org.apache.hadoop.fs.Path;
5351
import org.apache.hadoop.fs.PathIOException;
52+
import org.apache.hadoop.fs.s3a.impl.RawS3A;
5453
import org.apache.hadoop.fs.s3a.impl.StoreContext;
5554
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
5655
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
5756
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
58-
import org.apache.hadoop.fs.s3a.select.SelectBinding;
59-
import org.apache.hadoop.util.DurationInfo;
6057
import org.apache.hadoop.util.functional.CallableRaisingIOE;
6158

62-
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
6359
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
6460
import static org.apache.hadoop.fs.s3a.Invoker.*;
65-
import static org.apache.hadoop.fs.s3a.S3AUtils.longOption;
66-
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
67-
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
6861

6962
/**
7063
* Helper for low-level operations against an S3 Bucket for writing data,
@@ -112,6 +105,9 @@ public class WriteOperationHelper implements WriteOperations {
112105
/** Bucket of the owner FS. */
113106
private final String bucket;
114107

108+
/** Raw S3A implementation to invoke. */
109+
private final RawS3A rawS3A;
110+
115111
/**
116112
* statistics context.
117113
*/
@@ -124,18 +120,20 @@ public class WriteOperationHelper implements WriteOperations {
124120
* @param owner owner FS creating the helper
125121
* @param conf Configuration object
126122
* @param statisticsContext statistics context
127-
*
123+
* @param rawS3A raw S3A implementation.
128124
*/
129125
protected WriteOperationHelper(S3AFileSystem owner,
130126
Configuration conf,
131-
S3AStatisticsContext statisticsContext) {
127+
S3AStatisticsContext statisticsContext,
128+
RawS3A rawS3A) {
132129
this.owner = owner;
133130
this.invoker = new Invoker(new S3ARetryPolicy(conf),
134131
this::operationRetried);
135132
this.conf = conf;
136133
this.statisticsContext = statisticsContext;
137134
this.context = owner.createStoreContext();
138135
this.bucket = owner.getBucket();
136+
this.rawS3A = rawS3A;
139137
}
140138

141139
/**
@@ -231,7 +229,7 @@ public void writeFailed(Exception ex) {
231229
* @return a new metadata instance
232230
*/
233231
public ObjectMetadata newObjectMetadata(long length) {
234-
return owner.newObjectMetadata(length);
232+
return context.getRequestFactory().newObjectMetadata(length);
235233
}
236234

237235
/**
@@ -249,7 +247,7 @@ public String initiateMultiPartUpload(String destKey) throws IOException {
249247
destKey);
250248

251249
return retry("initiate MultiPartUpload", destKey, true,
252-
() -> owner.initiateMultipartUpload(initiateMPURequest).getUploadId());
250+
() -> rawS3A.initiateMultipartUpload(initiateMPURequest).getUploadId());
253251
}
254252

255253
/**
@@ -279,18 +277,16 @@ private CompleteMultipartUploadResult finalizeMultipartUpload(
279277
throw new PathIOException(destKey,
280278
"No upload parts in multipart upload");
281279
}
280+
281+
CompleteMultipartUploadRequest request =
282+
context.getRequestFactory().newCompleteMultipartUploadRequest(
283+
destKey, uploadId, partETags);
282284
CompleteMultipartUploadResult uploadResult =
283285
invoker.retry("Completing multipart upload", destKey,
284286
true,
285287
retrying,
286288
() -> {
287-
// a copy of the list is required, so that the AWS SDK doesn't
288-
// attempt to sort an unmodifiable list.
289-
return owner.getAmazonS3Client().completeMultipartUpload(
290-
new CompleteMultipartUploadRequest(bucket,
291-
destKey,
292-
uploadId,
293-
new ArrayList<>(partETags)));
289+
return rawS3A.completeMultipartUpload(request);
294290
}
295291
);
296292
owner.finishedWrite(destKey, length, uploadResult.getETag(),
@@ -432,9 +428,9 @@ public UploadPartRequest newUploadPartRequest(
432428
InputStream uploadStream,
433429
File sourceFile,
434430
Long offset) throws PathIOException {
435-
checkNotNull(uploadId);
436431

437-
return context.getRequestFactory().newUploadPartRequest(destKey, uploadId, partNumber, size, uploadStream, sourceFile, offset);
432+
return context.getRequestFactory().newUploadPartRequest(
433+
destKey, uploadId, partNumber, size, uploadStream, sourceFile, offset);
438434
}
439435

440436
/**
@@ -610,33 +606,6 @@ public SelectObjectContentResult select(
610606
final SelectObjectContentRequest request,
611607
final String action)
612608
throws IOException {
613-
String bucketName = request.getBucketName();
614-
Preconditions.checkArgument(bucket.equals(bucketName),
615-
"wrong bucket: %s", bucketName);
616-
if (LOG.isDebugEnabled()) {
617-
LOG.debug("Initiating select call {} {}",
618-
source, request.getExpression());
619-
LOG.debug(SelectBinding.toString(request));
620-
}
621-
return invoker.retry(
622-
action,
623-
source.toString(),
624-
true,
625-
() -> {
626-
try (DurationInfo ignored =
627-
new DurationInfo(LOG, "S3 Select operation")) {
628-
try {
629-
return owner.getAmazonS3Client().selectObjectContent(request);
630-
} catch (AmazonS3Exception e) {
631-
LOG.error("Failure of S3 Select request against {}",
632-
source);
633-
LOG.debug("S3 Select request against {}:\n{}",
634-
source,
635-
SelectBinding.toString(request),
636-
e);
637-
throw e;
638-
}
639-
}
640-
});
609+
return owner.store().select(source, request, action);
641610
}
642611
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.hadoop.fs.Path;
2727
import org.apache.hadoop.fs.s3a.Statistic;
28+
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
2829
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
2930
import org.apache.hadoop.fs.s3a.impl.StoreContext;
3031

@@ -47,21 +48,25 @@
4748
public class MagicCommitIntegration {
4849
private static final Logger LOG =
4950
LoggerFactory.getLogger(MagicCommitIntegration.class);
50-
private final StoreContext owner;
5151
private final boolean magicCommitEnabled;
5252

5353
private final StoreContext storeContext;
5454

55+
private final WriteOperationHelper writeOperationHelper;
56+
5557
/**
5658
* Instantiate.
57-
* @param owner owner class
59+
* @param storeContext store context
60+
* @param writeOperationHelper helper
5861
* @param magicCommitEnabled is magic commit enabled.
5962
*/
60-
public MagicCommitIntegration(StoreContext owner,
61-
boolean magicCommitEnabled) {
62-
this.owner = owner;
63+
public MagicCommitIntegration(
64+
final StoreContext storeContext,
65+
final WriteOperationHelper writeOperationHelper,
66+
final boolean magicCommitEnabled) {
6367
this.magicCommitEnabled = magicCommitEnabled;
64-
this.storeContext = owner.createStoreContext();
68+
this.storeContext = storeContext;
69+
this.writeOperationHelper = writeOperationHelper;
6570
}
6671

6772
/**
@@ -104,7 +109,7 @@ public PutTracker createTracker(Path path, String key) {
104109
key,
105110
destKey,
106111
pendingsetPath,
107-
owner.getWriteOperationHelper());
112+
writeOperationHelper);
108113
LOG.debug("Created {}", tracker);
109114
} else {
110115
LOG.warn("File being created has a \"magic\" path, but the filesystem"

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,4 @@ public interface ContextAccessors {
9595
@Retries.RetryTranslated
9696
ObjectMetadata getObjectMetadata(String key) throws IOException;
9797

98-
99-
/**
100-
* Get a write operation helper.
101-
* @return a write operation helper instance.
102-
*/
103-
WriteOperationHelper getWriteOperationHelper();
10498
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RawS3A.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,32 @@
2020

2121
import javax.annotation.Nullable;
2222
import java.io.IOException;
23+
import java.nio.file.AccessDeniedException;
2324
import java.util.List;
2425

2526
import com.amazonaws.AmazonClientException;
2627
import com.amazonaws.services.s3.AmazonS3;
28+
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
29+
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
2730
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
2831
import com.amazonaws.services.s3.model.DeleteObjectsResult;
32+
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
33+
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
2934
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
3035
import com.amazonaws.services.s3.model.MultipartUpload;
3136
import com.amazonaws.services.s3.model.ObjectMetadata;
3237
import com.amazonaws.services.s3.model.PutObjectRequest;
3338
import com.amazonaws.services.s3.model.PutObjectResult;
39+
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
40+
import com.amazonaws.services.s3.model.SelectObjectContentResult;
3441
import com.amazonaws.services.s3.model.UploadPartRequest;
3542
import com.amazonaws.services.s3.model.UploadPartResult;
3643
import com.amazonaws.services.s3.transfer.TransferManager;
37-
import com.google.common.annotations.VisibleForTesting;
44+
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
3845

3946
import org.apache.hadoop.classification.InterfaceAudience;
4047
import org.apache.hadoop.fs.InvalidRequestException;
48+
import org.apache.hadoop.fs.Path;
4149
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
4250
import org.apache.hadoop.fs.s3a.Invoker;
4351
import org.apache.hadoop.fs.s3a.MetadataPersistenceException;
@@ -48,22 +56,17 @@
4856
import org.apache.hadoop.fs.s3a.auth.SignerManager;
4957

5058
/**
51-
* The Guarded S3A Store.
52-
* This is where the core operations are implemented; the "APIs", both public
53-
* (FileSystem, AbstractFileSystem) and the internal ones (WriteOperationHelper)
54-
* call through here.
59+
* The Raw S3A Store.
5560
*/
5661
public interface RawS3A extends S3AService {
5762

5863
/**
5964
* Bind the Raw S3 services.
60-
* Ultimately the instation of some of these will be pushed down;
61-
* for now they are passed in.
6265
* @param credentials credential source
6366
* @param signerManager signing
6467
* @param transfers bulk transfers
6568
* @param s3client AWS S3 client
66-
* @param requestFactory
69+
* @param requestFactory AWS request factory
6770
*/
6871
void bind(
6972
StoreContext storeContext,
@@ -73,6 +76,12 @@ void bind(
7376
AmazonS3 s3client,
7477
RequestFactory requestFactory);
7578

79+
/**
80+
* Update the S3 client.
81+
* @param client new AWS S3 client.
82+
*/
83+
void setAmazonS3Client(AmazonS3 client);
84+
7685
/**
7786
* Request object metadata; increments counters in the process.
7887
* Retry policy: retry untranslated.
@@ -241,4 +250,56 @@ List<MultipartUpload> listMultipartUploads(String prefix)
241250
*/
242251
MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
243252
throws IOException;
253+
254+
/**
255+
* Execute an S3 Select operation.
256+
* On a failure, the request is only logged at debug to avoid the
257+
* select exception being printed.
258+
* @param request Select request to issue.
259+
* @return response
260+
* @throws IOException failure
261+
*/
262+
@Retries.OnceRaw
263+
SelectObjectContentResult select(
264+
SelectObjectContentRequest request)
265+
throws IOException;
266+
267+
/**
268+
* Initiate a multipart upload from the preconfigured request.
269+
* Retry policy: none + untranslated.
270+
* @param request request to initiate
271+
* @return the result of the call
272+
* @throws AmazonClientException on failures inside the AWS SDK
273+
* @throws IOException Other IO problems
274+
*/
275+
@Retries.OnceRaw
276+
InitiateMultipartUploadResult initiateMultipartUpload(
277+
InitiateMultipartUploadRequest request) throws IOException;
278+
279+
/**
280+
* Complete a multipart upload.
281+
* @param request request to complete
282+
* @return result
283+
* @throws AmazonClientException on failures inside the AWS SDK
284+
* @throws IOException Other IO problems
285+
*/
286+
@Retries.OnceRaw
287+
CompleteMultipartUploadResult completeMultipartUpload(
288+
CompleteMultipartUploadRequest request) throws IOException;
289+
290+
291+
292+
/**
293+
* Get the region of a bucket; fixing up the region so it can be used
294+
* in the builders of other AWS clients.
295+
* Requires the caller to have the AWS role permission
296+
* {@code s3:GetBucketLocation}.
297+
* Retry policy: retrying, translated.
298+
* @param bucketName the name of the bucket
299+
* @return the region in which a bucket is located
300+
* @throws AccessDeniedException if the caller lacks permission.
301+
* @throws IOException on any failure.
302+
*/
303+
@Retries.RetryTranslated
304+
String getBucketLocation(String bucketName) throws IOException;
244305
}

0 commit comments

Comments (0)