HADOOP-19569. S3A: stream write/close fails badly once FS is closed #7700
base: trunk
Conversation
Force-pushed from cf32b54 to fea3e89.
💔 -1 overall
This message was automatically generated.
Pull Request Overview
This PR addresses HADOOP-19569 by enhancing the S3A filesystem's behavior when closed, ensuring that write/close operations fail gracefully, and improving internal service registration and executor shutdown behavior. Key changes include enforcing FS state checks via added checkRunning() calls, refactoring helper and callback methods (e.g. renaming getWriteOperationHelper to createWriteOperationHelperWithinActiveSpan and replacing direct FS calls with getStore() invocations), and cleaning up legacy utilities such as MultipartUtils.
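The checkRunning() guard described above can be sketched as follows. This is a simplified, hypothetical stand-in, not the actual S3AStoreImpl code: the class name, method names, and the plain AtomicBoolean flag are all assumptions; the point is only that every public API method verifies the service is still running and raises IllegalStateException otherwise.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedStore {
  private final AtomicBoolean running = new AtomicBoolean(true);

  /** Fail fast if the store has been stopped. */
  private void checkRunning() {
    if (!running.get()) {
      throw new IllegalStateException("Store is not running");
    }
  }

  public void putObject(String key) {
    checkRunning();              // guard at the top of every public API method
    // ... the actual upload work would happen here ...
  }

  public void stop() {
    running.set(false);
  }

  public static void main(String[] args) {
    GuardedStore store = new GuardedStore();
    store.putObject("a/b");      // succeeds while running
    store.stop();
    try {
      store.putObject("c/d");    // now rejected with a clear exception
    } catch (IllegalStateException expected) {
      System.out.println("rejected after stop: " + expected.getMessage());
    }
  }
}
```

Failing with IllegalStateException at the entry point keeps the error close to the misuse, rather than letting a half-shut-down store fail deep inside an upload.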
Reviewed Changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java | Adjusted WriteOperationHelper construction with updated callback parameters and method renaming. |
| hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java, hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java | Updated test calls to use getStore() for invoking putObjectDirect. |
| hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java, hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java | Inserted checkRunning() calls across API methods and updated service lookup/register methods. |
| hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/WriteOperationHelper.java | Refactored to use a new "callbacks" field and improved null validations with requireNonNull. |
| hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | Exposed getStore() publicly and updated multipart upload handling to leverage the new MultipartIOService. |
| hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestBlockingThreadPoolExecutorService.java, hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java, hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java | Enhanced executor service tests and added rejection-handling logic to ensure proper shutdown on task rejection. |
Comments suppressed due to low confidence (3)
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/WriteOperationHelper.java:127
- [nitpick] Update the Javadoc in WriteOperationHelper to reflect the renaming of 'writeOperationHelperCallbacks' to 'callbacks', ensuring that the documentation clearly explains its role and usage.
private final WriteOperationHelperCallbacks callbacks;
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java:290
- The completeMultipartUpload method has been removed from the S3AStore interface. Ensure that all external consumers have been updated accordingly or consider adding a deprecation warning to ease the transition.
/* Removed completeMultipartUpload method */
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:1357
- [nitpick] Making the getStore() method public exposes internal implementation details. Confirm that this exposure is intended, or consider providing a more restricted accessor if external access is not required.
public S3AStore getStore() {
```diff
@@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
        slower than enqueueing. */
     final BlockingQueue<Runnable> workQueue =
         new LinkedBlockingQueue<>(waitingTasks + activeTasks);
     final InnerExecutorRejection rejection = new InnerExecutorRejection();
```
[nitpick] The InnerExecutorRejection handler now shuts down the service upon rejection. Consider enhancing the error handling logic or adding more detailed documentation to explain the shutdown behavior in case of task rejection.
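The shutdown-on-rejection behavior the comment refers to can be illustrated with a self-contained sketch. This is a simplified stand-in, not the actual InnerExecutorRejection class: the class name, the tiny queue size, and the returned state array are all illustrative. A RejectedExecutionHandler shuts the pool down the moment one task is rejected, so no further work can be queued against a pool that is already failing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownOnReject implements RejectedExecutionHandler {
  private volatile boolean rejected;

  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    rejected = true;
    executor.shutdown();   // stop accepting further work once one task is rejected
    throw new RejectedExecutionException("task rejected; executor shut down");
  }

  public boolean wasRejected() { return rejected; }

  /** Force a rejection and return {handlerSawRejection, poolIsShutdown}. */
  static boolean[] demoRejection() throws Exception {
    ShutdownOnReject handler = new ShutdownOnReject();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(1),  // tiny queue so rejection is easy to force
        handler);
    CountDownLatch block = new CountDownLatch(1);
    pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) { } });
    pool.execute(() -> { });           // fills the one-slot queue
    try {
      pool.execute(() -> { });         // third task: rejected, pool shuts itself down
    } catch (RejectedExecutionException expected) {
      // expected: the handler rethrows after shutting the pool down
    }
    block.countDown();
    return new boolean[] { handler.wasRejected(), pool.isShutdown() };
  }

  public static void main(String[] args) throws Exception {
    boolean[] state = demoRejection();
    System.out.println("rejected=" + state[0] + " shutdown=" + state[1]);
  }
}
```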
```diff
   private static final int BLOCKING_THRESHOLD_MSEC = 50;

   private static final Integer SOME_VALUE = 1337;

-  private static BlockingThreadPoolExecutorService tpe;
+  private BlockingThreadPoolExecutorService tpe;
```
[nitpick] Since the thread pool executor is now an instance variable with setup/teardown methods, ensure that each test properly initializes and destroys the executor to avoid interference between tests.
Executors in hadoop-common:
- pick up shutdown of the inner executor and shut themselves down.
- semaphored executor decrements counters in this process so that queue state is updated.
- semaphored delegating executor unit test in hadoop-common.

This stops callers being able to submit work when the inner executor has shut down.

WriteOperationHelper:
* makes all calls through its callback interface, rather than being given a reference to S3AFileSystem.
* WriteOperationHelper callbacks moved to the S3AStore layer.

Multipart IO operations:
* nearly all multipart IO operations moved out of S3AFileSystem into a new multipart service interface and implementation.
* the multipart service is retrieved and invoked as appropriate.
* S3AStoreImpl stores a map of service name -> service; with a lookupService() method in the S3AStore interface, it's possible to retrieve services through the API just by knowing their name and type.
* all current services are registered this way.

S3AStoreImpl raises IllegalStateException on method invocation when the service isn't running. Some methods are kept open as they do seem needed.
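The name-to-service registry with typed lookup described in the commit message can be sketched like this. This is a minimal illustration using only the JDK; the real S3AStoreImpl/lookupService signatures and registered service names may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServiceRegistry {
  private final Map<String, Object> services = new ConcurrentHashMap<>();

  public void registerService(String name, Object service) {
    services.put(name, service);
  }

  /** Retrieve a service by name and expected type, failing clearly if absent. */
  public <T> T lookupService(String name, Class<T> type) {
    Object service = services.get(name);
    if (service == null) {
      throw new IllegalStateException("No service registered as " + name);
    }
    return type.cast(service);   // ClassCastException if the declared type is wrong
  }

  public static void main(String[] args) {
    ServiceRegistry registry = new ServiceRegistry();
    registry.registerService("multipart", new StringBuilder("multipart-io"));
    // Callers only need the name and the expected type to retrieve a service.
    StringBuilder svc = registry.lookupService("multipart", StringBuilder.class);
    System.out.println(svc);
  }
}
```

Keying on a string name plus a Class token keeps the store interface stable as services are added, at the cost of compile-time checking of which services exist.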
Force-pushed from c347def to c8cdbfa.
Continuing to pull out multipart IO so that there are no back references from it to S3AStore. The final change is to define a store statistics class which it and other components can use to update statistics. Yes, those statistics will still get back into the FS, but we don't want the recursive complexity of S3AFS utility classes. Once an inner class has been pulled out of S3AStoreImpl, it SHOULD have a restricted interface of operations it can call back on the store.
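The "restricted interface of operations it can call back on the store" idea can be sketched as below. All names here are illustrative, not the actual Hadoop classes: the extracted helper never holds the full store, only a narrow callback interface, so back references cannot grow beyond what the interface permits.

```java
public class RestrictedCallbacksSketch {

  /** The only operations the extracted helper may invoke on the store. */
  interface StoreCallbacks {
    void incrementStatistic(String name, long count);
  }

  /** Extracted helper: it never sees the full store, just the callbacks. */
  static class MultipartHelper {
    private final StoreCallbacks callbacks;
    MultipartHelper(StoreCallbacks callbacks) { this.callbacks = callbacks; }

    void uploadPart() {
      // ... do the upload work, then report through the narrow interface only
      callbacks.incrementStatistic("multipart_parts_uploaded", 1);
    }
  }

  /** The store implements the callbacks but exposes nothing else to the helper. */
  static class Store implements StoreCallbacks {
    long parts;
    @Override public void incrementStatistic(String name, long count) {
      parts += count;
    }
  }

  public static void main(String[] args) {
    Store store = new Store();
    MultipartHelper helper = new MultipartHelper(store);
    helper.uploadPart();
    System.out.println("parts uploaded: " + store.parts);
  }
}
```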
All upload operations are in the MultipartIO service, which has been renamed and moved to the package org.apache.hadoop.fs.s3a.impl.write to match. For completeness, deletion should also go into this class or an adjacent one.
HADOOP-19569.
Executors in hadoop-common to
S3A code
This is complex.
TODO:
How was this patch tested?
New ITests which close the FS while simple and multipart writes are in progress. Tested against S3 London.
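The pattern those ITests follow can be illustrated with a toy stand-in (this is not the real test code, which runs against S3 through S3AFileSystem; the ToyFs class and its behavior are assumptions): open a stream, close the filesystem underneath it, and assert that further writes fail with a clear IOException rather than an obscure failure.

```java
import java.io.IOException;
import java.io.OutputStream;

public class CloseDuringWriteSketch {

  /** Toy filesystem with a running flag, standing in for a real FS. */
  static class ToyFs {
    volatile boolean closed;

    OutputStream create() {
      return new OutputStream() {
        @Override public void write(int b) throws IOException {
          if (closed) {
            // fail cleanly with a meaningful message, not an NPE deep in the stack
            throw new IOException("Filesystem closed");
          }
        }
      };
    }

    void close() { closed = true; }
  }

  public static void main(String[] args) throws IOException {
    ToyFs fs = new ToyFs();
    OutputStream out = fs.create();
    out.write(1);          // fine while the FS is open
    fs.close();            // close the FS while the stream is still live
    try {
      out.write(2);
    } catch (IOException expected) {
      System.out.println("write after close failed cleanly: " + expected.getMessage());
    }
  }
}
```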
For code changes:
- If applicable, have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?