Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 27, 2024
1 parent 587418c commit 155ee29
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -217,7 +216,7 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,
StoragePath filePath,
int bufferSize,
boolean wrapStream) {
FSDataInputStream fsDataInputStream = null;
FSDataInputStream fsDataInputStream;
try {
fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
} catch (IOException e) {
Expand All @@ -230,54 +229,22 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,

if (isGCSFileSystem(fs)) {
// in GCS FS, we might need to intercept seek offsets as we might get EOF exception
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, filePath, bufferSize), true);
return new SchemeAwareFSDataInputStream(fsDataInputStream, true);
}

if (isCHDFileSystem(fs)) {
return new BoundedFsDataInputStream(fs, convertToHadoopPath(filePath), fsDataInputStream);
}

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(convertToHadoopPath(filePath), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
return new TimedFSDataInputStream(convertToHadoopPath(filePath), fsDataInputStream);
}

// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream to make bufferSize work?
return fsDataInputStream;
}

/**
 * Picks the {@link FSDataInputStream} to use for the GCS FileSystem, which needs special handling for seek.
 * When an underlying {@link FSInputStream} can be located, it is re-wrapped in a
 * {@link BufferedFSInputStream} of the requested size and then in a {@link TimedFSDataInputStream}.
 *
 * @param fsDataInputStream original instance of {@link FSDataInputStream}.
 * @param filePath          path of the file.
 * @param bufferSize        buffer size to be used.
 * @return the re-wrapped stream, or {@code fsDataInputStream} itself when no underlying
 *         {@link FSInputStream} is found.
 */
private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream,
                                                            StoragePath filePath,
                                                            int bufferSize) {
  // Two layouts are recognised:
  //   a. the wrapped stream is itself an FSInputStream;
  //   b. the wrapped stream is an FSDataInputStream whose own wrapped stream is an FSInputStream.
  // Anything else leaves `underlying` null and the original stream is returned untouched.
  FSInputStream underlying = null;
  if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
    underlying = (FSInputStream) fsDataInputStream.getWrappedStream();
  } else if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream) {
    FSDataInputStream nested = (FSDataInputStream) fsDataInputStream.getWrappedStream();
    if (nested.getWrappedStream() instanceof FSInputStream) {
      underlying = (FSInputStream) nested.getWrappedStream();
    }
  }

  if (underlying == null) {
    return fsDataInputStream;
  }

  return new TimedFSDataInputStream(
      convertToHadoopPath(filePath),
      new FSDataInputStream(new BufferedFSInputStream(underlying, bufferSize)));
}

/**
* This is due to HUDI-140 GCS has a different behavior for detecting EOF during seek().
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,10 @@ public void releaseBuffer(ByteBuffer buffer) {
public void unbuffer() {
outerStream.unbuffer();
}

/**
 * Closes this stream and then the outer stream it delegates to.
 *
 * <p>The outer stream is closed in a {@code finally} block so that it is still released when
 * {@code super.close()} throws. If both calls throw, the exception from closing the outer
 * stream is the one that propagates.
 *
 * @throws IOException if closing either stream fails.
 */
@Override
public void close() throws IOException {
  try {
    super.close();
  } finally {
    // Always release the outer stream, even if closing this stream failed.
    outerStream.close();
  }
}
}

0 comments on commit 155ee29

Please sign in to comment.