
synchronize hadoop configuration
Kanishk Karanawat committed Sep 1, 2023
1 parent 6456bce commit 2e84a2c
Showing 1 changed file with 145 additions and 136 deletions.
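
The change wraps every access to the shared Hadoop Configuration in a synchronized (configuration) block, presumably to guard the non-thread-safe Configuration against concurrent use by parallel filesystem calls. Below is a minimal sketch of the pattern applied throughout the file; the class and method names are illustrative and not part of the commit.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class ConfigurationGuardSketch {
  // Shared across all operations of the filesystem wrapper.
  private final Configuration configuration = new Configuration();

  // Before this commit: resolve the Hadoop FileSystem without any locking.
  FileSystem resolveUnguarded(Path path) throws IOException {
    return path.getFileSystem(configuration);
  }

  // After this commit: every read of the shared Configuration happens while
  // holding its monitor, mirroring the synchronized (configuration) blocks in the diff.
  FileSystem resolveGuarded(Path path) throws IOException {
    synchronized (configuration) {
      return path.getFileSystem(configuration);
    }
  }
}
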
@@ -26,12 +26,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -92,78 +87,82 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {

@Override
protected List<MatchResult> match(List<String> specs) {
ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
for (String spec : specs) {
try {
final Set<Metadata> metadata = new HashSet<>();
if (spec.contains("**")) {
// recursive glob
int index = spec.indexOf("**");
metadata.addAll(
matchRecursiveGlob(spec.substring(0, index + 1), spec.substring(index + 1)));
} else {
// normal glob
final Path path = new Path(spec);
final FileStatus[] fileStatuses = path.getFileSystem(configuration).globStatus(path);
if (fileStatuses != null) {
for (FileStatus fileStatus : fileStatuses) {
metadata.add(toMetadata(fileStatus));
synchronized (configuration) {
ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
for (String spec : specs) {
try {
final Set<Metadata> metadata = new HashSet<>();
if (spec.contains("**")) {
// recursive glob
int index = spec.indexOf("**");
metadata.addAll(
matchRecursiveGlob(spec.substring(0, index + 1), spec.substring(index + 1)));
} else {
// normal glob
final Path path = new Path(spec);
final FileStatus[] fileStatuses = path.getFileSystem(configuration).globStatus(path);
if (fileStatuses != null) {
for (FileStatus fileStatus : fileStatuses) {
metadata.add(toMetadata(fileStatus));
}
}
}
if (metadata.isEmpty()) {
resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
} else {
resultsBuilder.add(MatchResult.create(Status.OK, new ArrayList<>(metadata)));
}
} catch (IOException e) {
resultsBuilder.add(MatchResult.create(Status.ERROR, e));
}
if (metadata.isEmpty()) {
resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
} else {
resultsBuilder.add(MatchResult.create(Status.OK, new ArrayList<>(metadata)));
}
} catch (IOException e) {
resultsBuilder.add(MatchResult.create(Status.ERROR, e));
}
return resultsBuilder.build();
}
return resultsBuilder.build();
}

private Set<Metadata> matchRecursiveGlob(String directorySpec, String fileSpec)
throws IOException {
final org.apache.hadoop.fs.FileSystem fs = new Path(directorySpec).getFileSystem(configuration);
Set<Metadata> metadata = new HashSet<>();
if (directorySpec.contains("*")) {
// An abstract directory with a wildcard is converted to concrete directories to search.
FileStatus[] directoryStatuses = fs.globStatus(new Path(directorySpec));
for (FileStatus directoryStatus : directoryStatuses) {
if (directoryStatus.isDirectory()) {
metadata.addAll(
matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
synchronized (configuration) {
final org.apache.hadoop.fs.FileSystem fs = new Path(directorySpec).getFileSystem(configuration);
Set<Metadata> metadata = new HashSet<>();
if (directorySpec.contains("*")) {
// An abstract directory with a wildcard is converted to concrete directories to search.
FileStatus[] directoryStatuses = fs.globStatus(new Path(directorySpec));
for (FileStatus directoryStatus : directoryStatuses) {
if (directoryStatus.isDirectory()) {
metadata.addAll(
matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
}
}
}
} else {
// A concrete directory is searched.
FileStatus[] fileStatuses = fs.globStatus(new Path(directorySpec + "/" + fileSpec));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
metadata.add(toMetadata(fileStatus));
} else {
// A concrete directory is searched.
FileStatus[] fileStatuses = fs.globStatus(new Path(directorySpec + "/" + fileSpec));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
metadata.add(toMetadata(fileStatus));
}
}
}

// All sub-directories of a concrete directory are searched.
FileStatus[] directoryStatuses = fs.globStatus(new Path(directorySpec + "/*"));
for (FileStatus directoryStatus : directoryStatuses) {
if (directoryStatus.isDirectory()) {
metadata.addAll(
matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
// All sub-directories of a concrete directory are searched.
FileStatus[] directoryStatuses = fs.globStatus(new Path(directorySpec + "/*"));
for (FileStatus directoryStatus : directoryStatuses) {
if (directoryStatus.isDirectory()) {
metadata.addAll(
matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
}
}
}

// Handle additional instances of recursive globs.
if (fileSpec.contains("**")) {
int index = fileSpec.indexOf("**");
metadata.addAll(
matchRecursiveGlob(
directorySpec + "/" + fileSpec.substring(0, index + 1),
fileSpec.substring(index + 1)));
// Handle additional instances of recursive globs.
if (fileSpec.contains("**")) {
int index = fileSpec.indexOf("**");
metadata.addAll(
matchRecursiveGlob(
directorySpec + "/" + fileSpec.substring(0, index + 1),
fileSpec.substring(index + 1)));
}
}
return metadata;
}
return metadata;
}
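
For reference, the recursive-glob branch of match splits the spec at the first "**", keeping one "*" on each side: the prefix becomes the directory glob and the remainder the file glob that matchRecursiveGlob expands. A standalone sketch of that split follows; the spec value is illustrative.

public class RecursiveGlobSplitSketch {
  public static void main(String[] args) {
    String spec = "hdfs://namenode/data/**/part-*.avro"; // illustrative spec
    int index = spec.indexOf("**");
    // Directory portion keeps the first '*': "hdfs://namenode/data/*"
    String directorySpec = spec.substring(0, index + 1);
    // File portion keeps the second '*': "*/part-*.avro"
    String fileSpec = spec.substring(index + 1);
    System.out.println(directorySpec + " -> " + fileSpec);
  }
}
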

private Metadata toMetadata(FileStatus fileStatus) {
@@ -179,47 +178,53 @@ private Metadata toMetadata(FileStatus fileStatus) {
@Override
protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
throws IOException {
return Channels.newChannel(
resourceId.toPath().getFileSystem(configuration).create(resourceId.toPath()));
synchronized (configuration) {
return Channels.newChannel(
resourceId.toPath().getFileSystem(configuration).create(resourceId.toPath()));
}
}

@Override
protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
final org.apache.hadoop.fs.FileSystem fs = resourceId.toPath().getFileSystem(configuration);
final FileStatus fileStatus = fs.getFileStatus(resourceId.toPath());
return new HadoopSeekableByteChannel(fileStatus, fs.open(resourceId.toPath()));
synchronized (configuration) {
final org.apache.hadoop.fs.FileSystem fs = resourceId.toPath().getFileSystem(configuration);
final FileStatus fileStatus = fs.getFileStatus(resourceId.toPath());
return new HadoopSeekableByteChannel(fileStatus, fs.open(resourceId.toPath()));
}
}

@Override
protected void copy(List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
throws IOException {
for (int i = 0; i < srcResourceIds.size(); ++i) {
// this enforces src and dest file systems to match
final org.apache.hadoop.fs.FileSystem fs =
srcResourceIds.get(i).toPath().getFileSystem(configuration);
// Unfortunately HDFS FileSystems don't support a native copy operation so we are forced
// to use the inefficient implementation found in FileUtil which copies all the bytes through
// the local machine.
//
// HDFS FileSystem does define a concat method but could only find the DFSFileSystem
// implementing it. The DFSFileSystem implemented concat by deleting the srcs after which
// is not what we want. Also, all the other FileSystem implementations I saw threw
// UnsupportedOperationException within concat.
final boolean success =
FileUtil.copy(
fs,
srcResourceIds.get(i).toPath(),
fs,
destResourceIds.get(i).toPath(),
false,
true,
fs.getConf());
if (!success) {
// Defensive coding as this should not happen in practice
throw new IOException(
String.format(
"Unable to copy resource %s to %s. No further information provided by underlying filesystem.",
srcResourceIds.get(i).toPath(), destResourceIds.get(i).toPath()));
synchronized (configuration) {
for (int i = 0; i < srcResourceIds.size(); ++i) {
// this enforces src and dest file systems to match
final org.apache.hadoop.fs.FileSystem fs =
srcResourceIds.get(i).toPath().getFileSystem(configuration);
// Unfortunately HDFS FileSystems don't support a native copy operation so we are forced
// to use the inefficient implementation found in FileUtil which copies all the bytes through
// the local machine.
//
// HDFS FileSystem does define a concat method but could only find the DFSFileSystem
// implementing it. The DFSFileSystem implemented concat by deleting the srcs after which
// is not what we want. Also, all the other FileSystem implementations I saw threw
// UnsupportedOperationException within concat.
final boolean success =
FileUtil.copy(
fs,
srcResourceIds.get(i).toPath(),
fs,
destResourceIds.get(i).toPath(),
false,
true,
fs.getConf());
if (!success) {
// Defensive coding as this should not happen in practice
throw new IOException(
String.format(
"Unable to copy resource %s to %s. No further information provided by underlying filesystem.",
srcResourceIds.get(i).toPath(), destResourceIds.get(i).toPath()));
}
}
}
}
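
The copy path relies on Hadoop's FileUtil.copy, which streams the bytes through the local process rather than performing a server-side copy, as the comment above explains. A minimal sketch of that call outside the Beam wrapper; the cluster URI and paths are illustrative.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class FileUtilCopySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path src = new Path("hdfs://namenode/tmp/input.txt");  // illustrative source
    Path dst = new Path("hdfs://namenode/tmp/output.txt"); // illustrative destination
    FileSystem fs = src.getFileSystem(conf);
    // deleteSource = false, overwrite = true, matching the options used in the diff.
    boolean success = FileUtil.copy(fs, src, fs, dst, false, true, conf);
    if (!success) {
      throw new IOException("copy reported failure for " + src + " -> " + dst);
    }
  }
}
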
@@ -252,61 +257,65 @@ protected void rename(
if (moveOptions.length > 0) {
throw new UnsupportedOperationException("Support for move options is not yet implemented.");
}
for (int i = 0; i < srcResourceIds.size(); ++i) {

final Path srcPath = srcResourceIds.get(i).toPath();
final Path destPath = destResourceIds.get(i).toPath();
synchronized (configuration) {
for (int i = 0; i < srcResourceIds.size(); ++i) {

// this enforces src and dest file systems to match
final org.apache.hadoop.fs.FileSystem fs = srcPath.getFileSystem(configuration);
final Path srcPath = srcResourceIds.get(i).toPath();
final Path destPath = destResourceIds.get(i).toPath();

// rename in HDFS requires the target directory to exist or silently fails (BEAM-4861)
mkdirs(destPath);
// this enforces src and dest file systems to match
final org.apache.hadoop.fs.FileSystem fs = srcPath.getFileSystem(configuration);

boolean success = fs.rename(srcPath, destPath);
// rename in HDFS requires the target directory to exist or silently fails (BEAM-4861)
mkdirs(destPath);

// If the failure was due to the file already existing, delete and retry (BEAM-5036).
// This should be the exceptional case, so handle here rather than incur the overhead of
// testing first
if (!success && fs.exists(srcPath) && fs.exists(destPath)) {
LOG.debug(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(destPath));
fs.delete(destPath, false); // not recursive
success = fs.rename(srcPath, destPath);
}
boolean success = fs.rename(srcPath, destPath);

if (!success) {
if (!fs.exists(srcPath)) {
throw new FileNotFoundException(
String.format(
"Unable to rename resource %s to %s as source not found.", srcPath, destPath));

} else if (fs.exists(destPath)) {
throw new FileAlreadyExistsException(
String.format(
"Unable to rename resource %s to %s as destination already exists and couldn't be deleted.",
srcPath, destPath));
// If the failure was due to the file already existing, delete and retry (BEAM-5036).
// This should be the exceptional case, so handle here rather than incur the overhead of
// testing first
if (!success && fs.exists(srcPath) && fs.exists(destPath)) {
LOG.debug(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(destPath));
fs.delete(destPath, false); // not recursive
success = fs.rename(srcPath, destPath);
}

} else {
throw new IOException(
String.format(
"Unable to rename resource %s to %s. No further information provided by underlying filesystem.",
srcPath, destPath));
if (!success) {
if (!fs.exists(srcPath)) {
throw new FileNotFoundException(
String.format(
"Unable to rename resource %s to %s as source not found.", srcPath, destPath));

} else if (fs.exists(destPath)) {
throw new FileAlreadyExistsException(
String.format(
"Unable to rename resource %s to %s as destination already exists and couldn't be deleted.",
srcPath, destPath));

} else {
throw new IOException(
String.format(
"Unable to rename resource %s to %s. No further information provided by underlying filesystem.",
srcPath, destPath));
}
}
}
}
}

/** Ensures that the target directory exists for the given filePath. */
private void mkdirs(Path filePath) throws IOException {
final org.apache.hadoop.fs.FileSystem fs = filePath.getFileSystem(configuration);
final Path targetDirectory = filePath.getParent();
if (!fs.exists(targetDirectory)) {
LOG.debug(LOG_CREATE_DIRECTORY, Path.getPathWithoutSchemeAndAuthority(targetDirectory));
if (!fs.mkdirs(targetDirectory)) {
throw new IOException(
String.format(
"Unable to create target directory %s. No further information provided by underlying filesystem.",
targetDirectory));
synchronized (configuration) {
final org.apache.hadoop.fs.FileSystem fs = filePath.getFileSystem(configuration);
final Path targetDirectory = filePath.getParent();
if (!fs.exists(targetDirectory)) {
LOG.debug(LOG_CREATE_DIRECTORY, Path.getPathWithoutSchemeAndAuthority(targetDirectory));
if (!fs.mkdirs(targetDirectory)) {
throw new IOException(
String.format(
"Unable to create target directory %s. No further information provided by underlying filesystem.",
targetDirectory));
}
}
}
}
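
The rename path works around two HDFS quirks noted in the comments above: rename silently fails when the target directory does not exist (BEAM-4861) and when the destination file already exists (BEAM-5036). A condensed sketch of that flow, separate from the Beam wrapper; the URI and paths are illustrative.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameWithRetrySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path src = new Path("hdfs://namenode/tmp/part-0000.tmp"); // illustrative paths
    Path dst = new Path("hdfs://namenode/output/part-0000");
    FileSystem fs = src.getFileSystem(conf);

    // Rename silently fails if the target directory is missing, so create it first.
    fs.mkdirs(dst.getParent());

    boolean success = fs.rename(src, dst);
    if (!success && fs.exists(src) && fs.exists(dst)) {
      // Destination already exists: delete it (non-recursively) and retry once.
      fs.delete(dst, false);
      success = fs.rename(src, dst);
    }
    if (!success) {
      throw new IOException("Unable to rename " + src + " to " + dst);
    }
  }
}
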
