Commit 7120785

HADOOP-16848. Refactoring: initial layering
First PoC of my planned layering model of the S3A FS.

* There's a raw layer and a guarded layer, which are instantiated in sequence
  in a separate executor from S3AFileSystem.initialize, and whose accessors
  block until startup has completed or rethrow failures.

The layers are currently handed all their dependencies from FS.initialize(),
and we block until they have started. What I plan to do, in a future
iteration, is:

* have each layer extract its own settings from the config and store them
  locally (list version, upload size, etc.)
* have each layer instantiate its internal classes (AWS S3 client, transfer
  manager) internally
* also async-create the metastore and the DT binding
* and run all startup actions (check bucket, init multipart, ...) there too

Then:

* move ops to the layers: raw* -> rawStore; inner -> S3AStore
* move WriteOperationHelper, SelectBinding, etc. to all work against S3AStore
  rather than the FS.

S3AStore will become where most of the code moves to; S3AFileSystem becomes
more of the init and binding to the Hadoop FS API. RawS3A will be the accessor
through which all AWS client access goes.

Not going to change: the accessors on S3AFileSystem. Not just the tests use
them; some external code (cloudstore) needs them to get at the low-level S3A
internals, etc.

Change-Id: I998c0d61cce2ee7fd0be804bf21da6b68fd69a6f

HADOOP-16583. Refactoring: RequestFactory and RawS3A

This moves most of the S3 client interaction into RawS3AImpl, mainly just by
moving the methods from S3AFileSystem. One key finding was that we can put all
the code which creates request classes for the AWS SDK into its own factory,
ensuring everything is set up consistently and keeping what is mainly
housekeeping out of the way of everything else. We can do that rework
immediately, as it doesn't require the rest of the layering.

Also, the rawS3A() accessor no longer raises IOEs; it was used in many places
where that wasn't allowed, which I had sort of expected. There's an
awaitQuietly() method (todo: change name) which returns all failures as RTEs
rather than IOEs. For the S3A FS API entry points we still need to raise the
normal startup exceptions, so we will need an extra block/validate step there;
after that, maybe, we can change the logic in these new getters to just raise
an exception if their refs are null, rather than block.

Change-Id: I3d15a46dd3034fc5d34dbaf01aaddba462d63a9d

HADOOP-16583. Refactoring: fix regression

I'd pulled an import which wasn't used, but a recent change reinstated the
need for it.

Change-Id: I12b0725a14a6e7bae1ecf93fcbd4d05e1eacb03e

HADOOP-16848. Building against trunk

Change-Id: Id1af0763998dd285bb2476298cdb94edf809b67c
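As an illustration of the accessor behaviour described above (layers built
asynchronously during initialize(), with getters that either block until
startup completes or rethrow the failure as a runtime exception), here is a
minimal, self-contained sketch. All names in it (LayeredStoreExample, RawStore,
rawStore()) are hypothetical; it shows the pattern, not the patch's actual
classes.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: async layer construction with a blocking accessor. */
public class LayeredStoreExample {

  /** Placeholder for the "raw" layer which would own the AWS client. */
  static class RawStore {
    void checkBucketExists() throws IOException {
      // startup action, e.g. verify the bucket exists; may fail with an IOE
    }
  }

  private final CompletableFuture<RawStore> rawStoreFuture;

  LayeredStoreExample(ExecutorService executor) {
    // build the layer in a separate executor, as initialize() does in the PoC
    rawStoreFuture = CompletableFuture.supplyAsync(() -> {
      RawStore store = new RawStore();
      try {
        store.checkBucketExists();
      } catch (IOException e) {
        // complete the future exceptionally with an unchecked wrapper
        throw new UncheckedIOException(e);
      }
      return store;
    }, executor);
  }

  /**
   * Accessor in the spirit of awaitQuietly(): block until the layer is up,
   * rethrowing any startup failure as a RuntimeException rather than an IOE.
   */
  RawStore rawStore() {
    try {
      return rawStoreFuture.join();
    } catch (CompletionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new RuntimeException(cause);
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      LayeredStoreExample fs = new LayeredStoreExample(executor);
      fs.rawStore();   // blocks until the layer has started, or rethrows
    } finally {
      executor.shutdown();
    }
  }
}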
1 parent fa15594 commit 7120785

23 files changed (+1866, -273 lines)

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ private MultipartUtils() { }
    * bucket will be searched.
    * @return an iterator of matching uploads
    */
-  static MultipartUtils.UploadIterator listMultipartUploads(AmazonS3 s3,
+  public static MultipartUtils.UploadIterator listMultipartUploads(AmazonS3 s3,
       Invoker invoker, String bucketName, int maxKeys, @Nullable String prefix)
       throws IOException {
     return new MultipartUtils.UploadIterator(s3, invoker, bucketName, maxKeys,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 183 additions & 202 deletions
Large diffs are not rendered by default.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 11 additions & 54 deletions
@@ -51,6 +51,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@@ -116,6 +117,8 @@ public class WriteOperationHelper implements WriteOperations {
    */
   private final S3AStatisticsContext statisticsContext;

+  private final StoreContext context;
+
   /**
    * Constructor.
    * @param owner owner FS creating the helper
@@ -131,7 +134,8 @@ protected WriteOperationHelper(S3AFileSystem owner,
         this::operationRetried);
     this.conf = conf;
     this.statisticsContext = statisticsContext;
-    bucket = owner.getBucket();
+    this.context = owner.createStoreContext();
+    this.bucket = owner.getBucket();
   }

   /**
@@ -241,11 +245,8 @@ public ObjectMetadata newObjectMetadata(long length) {
   public String initiateMultiPartUpload(String destKey) throws IOException {
     LOG.debug("Initiating Multipart upload to {}", destKey);
     final InitiateMultipartUploadRequest initiateMPURequest =
-        new InitiateMultipartUploadRequest(bucket,
-            destKey,
-            newObjectMetadata(-1));
-    initiateMPURequest.setCannedACL(owner.getCannedACL());
-    owner.setOptionalMultipartUploadRequestParameters(initiateMPURequest);
+        context.getRequestFactory().newMultipartUploadRequest(
+            destKey);

     return retry("initiate MultiPartUpload", destKey, true,
         () -> owner.initiateMultipartUpload(initiateMPURequest).getUploadId());
@@ -432,50 +433,8 @@ public UploadPartRequest newUploadPartRequest(
       File sourceFile,
       Long offset) throws PathIOException {
     checkNotNull(uploadId);
-    // exactly one source must be set; xor verifies this
-    checkArgument((uploadStream != null) ^ (sourceFile != null),
-        "Data source");
-    checkArgument(size >= 0, "Invalid partition size %s", size);
-    checkArgument(partNumber > 0,
-        "partNumber must be between 1 and %s inclusive, but is %s",
-        DEFAULT_UPLOAD_PART_COUNT_LIMIT, partNumber);
-
-    LOG.debug("Creating part upload request for {} #{} size {}",
-        uploadId, partNumber, size);
-    long partCountLimit = longOption(conf,
-        UPLOAD_PART_COUNT_LIMIT,
-        DEFAULT_UPLOAD_PART_COUNT_LIMIT,
-        1);
-    if (partCountLimit != DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
-      LOG.warn("Configuration property {} shouldn't be overridden by client",
-          UPLOAD_PART_COUNT_LIMIT);
-    }
-    final String pathErrorMsg = "Number of parts in multipart upload exceeded."
-        + " Current part count = %s, Part count limit = %s ";
-    if (partNumber > partCountLimit) {
-      throw new PathIOException(destKey,
-          String.format(pathErrorMsg, partNumber, partCountLimit));
-    }
-    UploadPartRequest request = new UploadPartRequest()
-        .withBucketName(bucket)
-        .withKey(destKey)
-        .withUploadId(uploadId)
-        .withPartNumber(partNumber)
-        .withPartSize(size);
-    if (uploadStream != null) {
-      // there's an upload stream. Bind to it.
-      request.setInputStream(uploadStream);
-    } else {
-      checkArgument(sourceFile.exists(),
-          "Source file does not exist: %s", sourceFile);
-      checkArgument(offset >= 0, "Invalid offset %s", offset);
-      long length = sourceFile.length();
-      checkArgument(offset == 0 || offset < length,
-          "Offset %s beyond length of file %s", offset, length);
-      request.setFile(sourceFile);
-      request.setFileOffset(offset);
-    }
-    return request;
+
+    return context.getRequestFactory().newUploadPartRequest(destKey, uploadId, partNumber, size, uploadStream, sourceFile, offset);
   }

   /**
@@ -631,10 +590,8 @@ public Configuration getConf() {
    * @return the request
    */
   public SelectObjectContentRequest newSelectRequest(Path path) {
-    SelectObjectContentRequest request = new SelectObjectContentRequest();
-    request.setBucketName(bucket);
-    request.setKey(owner.pathToKey(path));
-    return request;
+    return context.getRequestFactory().newSelectRequest(
+        context.pathToKey(path));
   }

   /**
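The rewritten methods above delegate request construction to
context.getRequestFactory(); the factory itself is not shown on this page. A
rough sketch of the shape implied by the three visible call sites follows.
Parameter types and anything beyond those three methods are assumptions, not
the interface shipped in the patch.

import java.io.File;
import java.io.InputStream;

import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;

import org.apache.hadoop.fs.PathIOException;

/**
 * Sketch of a request factory as implied by the WriteOperationHelper call
 * sites; illustrative only, not the actual RequestFactory from the patch.
 */
public interface RequestFactorySketch {

  /**
   * Initiate a multipart upload to a key, with bucket, canned ACL and
   * encryption settings applied centrally by the factory.
   */
  InitiateMultipartUploadRequest newMultipartUploadRequest(String destKey);

  /**
   * Build one part of a multipart upload from either a stream or a file,
   * performing the argument validation previously done in the helper.
   */
  UploadPartRequest newUploadPartRequest(
      String destKey,
      String uploadId,
      int partNumber,
      int size,
      InputStream uploadStream,
      File sourceFile,
      Long offset) throws PathIOException;

  /** Build an S3 Select request for the given object key. */
  SelectObjectContentRequest newSelectRequest(String key);
}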

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java

Lines changed: 2 additions & 3 deletions
@@ -24,7 +24,6 @@
 import org.slf4j.LoggerFactory;

 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
@@ -48,7 +47,7 @@
 public class MagicCommitIntegration {
   private static final Logger LOG =
       LoggerFactory.getLogger(MagicCommitIntegration.class);
-  private final S3AFileSystem owner;
+  private final StoreContext owner;
   private final boolean magicCommitEnabled;

   private final StoreContext storeContext;
@@ -58,7 +57,7 @@ public class MagicCommitIntegration {
    * @param owner owner class
    * @param magicCommitEnabled is magic commit enabled.
    */
-  public MagicCommitIntegration(S3AFileSystem owner,
+  public MagicCommitIntegration(StoreContext owner,
       boolean magicCommitEnabled) {
     this.owner = owner;
     this.magicCommitEnabled = magicCommitEnabled;
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AbstractS3AService.java

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Objects;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.service.AbstractService;
+
+public abstract class AbstractS3AService
+    extends AbstractService
+    implements S3AService {
+
+  private StoreContext storeContext;
+
+  protected AbstractS3AService(final String name) {
+    super(name);
+  }
+
+  protected void bind(final StoreContext storeContext) {
+    this.storeContext = storeContext;
+  }
+
+  @Override
+  public StoreContext getStoreContext() {
+    return storeContext;
+  }
+
+  /**
+   * Validate the state of the service, then start the service.
+   * Service start may be async.
+   * @throws Exception if initialization failed.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    Objects.requireNonNull(storeContext, () ->
+        "not initialized with store context: " + getName());
+  }
+
+  /**
+   * Increment a statistic by 1.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   */
+  protected void incrementStatistic(Statistic statistic) {
+    incrementStatistic(statistic, 1);
+  }
+
+  /**
+   * Increment a statistic by a specific value.
+   * This increments both the instrumentation and storage statistics.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementStatistic(Statistic statistic, long count) {
+    // todo
+  }
+
+  /**
+   * Increment read operations.
+   */
+  public void incrementReadOperations() {
+    // statistics.incrementReadOps(1);
+  }
+
+  /**
+   * Increment the write operation counter.
+   * This is somewhat inaccurate, as it appears to be invoked more
+   * often than needed in progress callbacks.
+   */
+  public void incrementWriteOperations() {
+    // statistics.incrementWriteOps(1);
+  }
+
+  protected String getBucket() {
+    return getStoreContext().getBucket();
+  }
+
+  protected Invoker getInvoker() {
+    return getStoreContext().getInvoker();
+  }
+}
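As a usage illustration of the new base class: a concrete layer would be
handed its StoreContext via bind() before the Hadoop service lifecycle starts,
so the validation in serviceStart() above passes. The subclass below
(RawS3AExample, bindAndStart) is a hypothetical sketch, not code from this
commit.

package org.apache.hadoop.fs.s3a.impl;

import org.apache.hadoop.conf.Configuration;

/** Hypothetical concrete layer on AbstractS3AService; illustrative only. */
public class RawS3AExample extends AbstractS3AService {

  public RawS3AExample() {
    super("RawS3AExample");
  }

  /**
   * Hand the service its store context, then run the normal
   * AbstractService lifecycle (init then start).
   * @param storeContext context built by S3AFileSystem.initialize()
   * @param conf configuration for the service lifecycle
   */
  public void bindAndStart(StoreContext storeContext, Configuration conf) {
    bind(storeContext);   // made available through getStoreContext()
    init(conf);           // AbstractService: serviceInit()
    start();              // AbstractService: serviceStart() validates the bind
  }

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart(); // fails fast if bind() was never called
    // startup actions for this layer (e.g. checking the bucket) would go here
  }
}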

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java

Lines changed: 3 additions & 1 deletion
@@ -21,6 +21,7 @@
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -118,7 +119,8 @@ public static <T> void waitForCompletion(
         new DurationInfo(LOG, false, "Waiting for task completion")) {
       future.join();
     } catch (CancellationException e) {
-      throw new IOException(e);
+      throw (InterruptedIOException)
+          new InterruptedIOException(e.toString()).initCause(e);
     } catch (CompletionException e) {
       raiseInnerCause(e);
     }
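With the change above, a cancelled future now surfaces as an
InterruptedIOException instead of a generic IOException, so callers can tell
cancellation apart from ordinary I/O failures. A standalone illustration of
the translation idiom (not the CallableSupplier code itself):

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustration only: map future outcomes onto IOException subclasses. */
public final class FutureJoinExample {

  private FutureJoinExample() {
  }

  static <T> T joinAsIOE(CompletableFuture<T> future) throws IOException {
    try {
      return future.join();
    } catch (CancellationException e) {
      // report cancellation like an interrupt, preserving the original cause
      throw (InterruptedIOException)
          new InterruptedIOException(e.toString()).initCause(e);
    } catch (CompletionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException(cause);
    }
  }
}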

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java

Lines changed: 7 additions & 0 deletions
@@ -26,6 +26,7 @@

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;

 /**
  * An interface to implement for providing accessors to
@@ -94,4 +95,10 @@ public interface ContextAccessors {
   @Retries.RetryTranslated
   ObjectMetadata getObjectMetadata(String key) throws IOException;

+
+  /**
+   * Get a write operation helper.
+   * @return a write operation helper instance.
+   */
+  WriteOperationHelper getWriteOperationHelper();
 }

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java

Lines changed: 10 additions & 0 deletions
@@ -37,6 +37,16 @@
  */
 public final class InternalConstants {

+  /**
+   * This declares delete to be idempotent.
+   * This is an "interesting" topic in past Hadoop FS work.
+   * Essentially: with a single caller, DELETE is idempotent,
+   * but in a shared filesystem it is very much not so.
+   * Here, on the basis that this isn't a filesystem with consistency
+   * guarantees, treating delete as retryable simply deletes files.
+   */
+  public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true;
+
   private InternalConstants() {
   }
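To make the idempotency trade-off concrete, here is a toy retry helper showing
how a flag like DELETE_CONSIDERED_IDEMPOTENT gates whether a failed operation
may be replayed. It is illustrative only and deliberately does not mirror the
real S3A Invoker API.

import java.io.IOException;
import java.util.concurrent.Callable;

/** Toy retry helper; not the S3A Invoker. */
public final class IdempotentRetryExample {

  private IdempotentRetryExample() {
  }

  static <T> T retry(boolean idempotent, int attempts, Callable<T> op)
      throws Exception {
    IOException last = null;
    for (int i = 0; i < attempts; i++) {
      try {
        return op.call();
      } catch (IOException e) {
        last = e;
        if (!idempotent) {
          // a non-idempotent operation must not be blindly replayed
          throw e;
        }
        // declared idempotent (as DELETE is here): safe to try again
      }
    }
    // attempts exhausted: surface the last failure
    throw last != null ? last : new IOException("no attempts made");
  }
}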
