Skip to content

HADOOP-18891 hadoop distcp needs support to filter by file/directory attribute #6070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,33 @@ public void initialize() {}
*/
public abstract boolean shouldCopy(Path path);

/**
 * Predicate to determine whether a file can be excluded from the copy,
 * based on its full {@link CopyListingFileStatus}. The file status
 * carries many attributes (type, size, permissions, ...), so subclasses
 * can implement more complex filtering than a path alone allows.
 *
 * The dispatch contract for filters is:
 * if supportFileStatus() is true, shouldCopy(fileStatus) is called;
 * if supportFileStatus() is false, shouldCopy(path) is called.
 *
 * The default implementation delegates to {@link #shouldCopy(Path)}
 * using the status's path.
 *
 * @param fileStatus a FileStatus to be considered for copying
 * @return boolean, true to copy, false to exclude
 */
public boolean shouldCopy(CopyListingFileStatus fileStatus) {
  return shouldCopy(fileStatus.getPath());
}

/**
 * Indicates whether this filter evaluates shouldCopy(fileStatus)
 * (returns true) or shouldCopy(path) (returns false).
 * The default behaviour is to use shouldCopy(path).
 *
 * @return true if shouldCopy(fileStatus) should be called,
 *         false if shouldCopy(path) should be called.
 */
public boolean supportFileStatus() {
  return false;
}

/**
* Public factory method which returns the appropriate implementation of
* CopyFilter.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * A CopyFilter which checks whether an entry is a file or a directory:
 * directories are kept and files are filtered out.
 */
public class DirCopyFilter extends FileStatusCopyFilter {
  private static final Logger LOG = LoggerFactory.getLogger(DirCopyFilter.class);

  /** Configuration used to resolve the FileSystem in {@link #shouldCopy(Path)}. */
  private final Configuration conf;

  /**
   * Constructor of DirCopyFilter; it can be instantiated by the
   * CopyFilter#getCopyFilter factory method.
   * @param conf Configuration used to obtain the FileSystem of a path.
   */
  public DirCopyFilter(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Path-based check: resolves the path's FileSystem and fetches its
   * FileStatus (an extra RPC), keeping the path only if it is a directory.
   *
   * @param path path being considered for copy
   * @return true if the path is a directory, false otherwise
   */
  @Override
  public boolean shouldCopy(Path path) {
    try {
      FileSystem fs = path.getFileSystem(this.conf);
      if (fs.getFileStatus(path).isDirectory()) {
        return true;
      }
    } catch (IOException e) {
      // Include the path so the failure is diagnosable from the message.
      throw new RuntimeException(
          "Failed to get FileSystem or FileStatus for " + path, e);
    }

    LOG.debug("Skipping {} as it is not a directory", path);
    return false;
  }

  /**
   * Status-based check: the copy listing already carries the directory
   * flag, so no extra filesystem call is needed.
   *
   * @param fileStatus status being considered for copy
   * @return true if the status is a directory, false otherwise
   */
  @Override
  public boolean shouldCopy(CopyListingFileStatus fileStatus) {
    return fileStatus.isDirectory();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private DistCpConstants() {
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";

/* Total number of paths to copy, includes directories. Unfiltered count */
/* Total number of paths to be copied, includes directories. */
public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";

/* If input is based -f <<source listing>>, file containing the src paths */
Expand Down Expand Up @@ -185,7 +185,7 @@ private DistCpConstants() {
public static final int SPLIT_RATIO_DEFAULT = 2;

/**
* Constants for NONE file deletion
* Constants for NONE file deletion.
*/
public static final String NONE_PATH_NAME = "/NONE";
public static final Path NONE_PATH = new Path(NONE_PATH_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;

/**
 * Base class for CopyFilter implementations that filter on the full
 * CopyListingFileStatus rather than on the path alone.
 *
 * Any CopyFilter that wants shouldCopy(fileStatus) to be invoked
 * should be a subclass of this class.
 */
public abstract class FileStatusCopyFilter extends CopyFilter {

  /**
   * Always returns true for FileStatusCopyFilter and its subclasses,
   * so that shouldCopy(fileStatus) is used instead of shouldCopy(path).
   * @return always true (file-status scan mode).
   */
  @Override
  public boolean supportFileStatus() {
    return true;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ protected boolean shouldCopy(Path path) {
return copyFilter.shouldCopy(path);
}

/**
 * Provide another option to skip copy of a path; allows exclusion based
 * on the full file status, e.g. files such as
 * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}.
 *
 * shouldCopy(path) and shouldCopy(fileStatus) are mutually exclusive:
 * exactly one of them is consulted, as selected by the filter's
 * supportFileStatus() method.
 *
 * @param fileStatus FileStatus being considered for copy while building the file listing
 * @return true if the fileStatus should be considered for copy, false otherwise
 */
protected boolean shouldCopy(CopyListingFileStatus fileStatus) {
  return copyFilter.shouldCopy(fileStatus);
}

/** {@inheritDoc} */
@Override
protected long getBytesToCopy() {
Expand Down Expand Up @@ -662,7 +676,13 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()),
fileStatus.getPath());

if (!shouldCopy(fileStatus.getPath())) {
// check if copyFilter to use shouldCopy(fileStatus) or shouldCopy(path)
// if true, use shouldCopy(fileStatus) or else use shouldCopy(path)
if(copyFilter.supportFileStatus()){
if(!shouldCopy(fileStatus)){
return;
}
}else if (!shouldCopy(fileStatus.getPath())) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.Preconditions;

/**
 * UniformRecordInputFormat extends the InputFormat class, to produce
 * record-splits for DistCp.
 * It looks at the copy-listing and groups the contents into record-splits
 * such that the total records to be copied for each input split is uniform
 * (split sizes differ by at most one record).
 */
public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
  private static final Logger LOG =
      LoggerFactory.getLogger(UniformRecordInputFormat.class);

  /**
   * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
   * such that the number of records to be copied for all the splits are
   * approximately equal.
   * @param context JobContext for the job.
   * @return The list of uniformly-distributed input-splits.
   * @throws IOException Exception Reading split file
   * @throws InterruptedException Thread interrupted exception
   */
  @Override
  public List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    int numSplits = getNumSplits(conf);
    if (numSplits == 0) {
      return new ArrayList<>();
    }

    return createSplits(conf, numSplits, getNumberOfRecords(conf));
  }

  /**
   * Partitions the listing file into (at most) numSplits contiguous
   * FileSplits. With r = numRecords % numSplits, the first r splits hold
   * (numRecords / numSplits) + 1 records each and the remaining splits hold
   * numRecords / numSplits records each, so all records are covered.
   */
  private List<InputSplit> createSplits(Configuration configuration, int numSplits,
      long numRecords) throws IOException {
    List<InputSplit> splits = new ArrayList<>(numSplits);
    // Integer division is exact here; no need for a floating-point detour.
    long nRecordsPerSplit = numRecords / numSplits;
    LOG.debug("Average records per map: {}, Number of maps: {}, total records: {}",
        nRecordsPerSplit, numSplits, numRecords);

    Path listingFilePath = getListingFilePath(configuration);
    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    long lastPosition = 0L;
    long count = 0L;
    // Number of splits that must carry one extra record so totals add up.
    long remains = numRecords - nRecordsPerSplit * numSplits;

    SequenceFile.Reader reader = null;
    try {
      reader = getListingFileReader(configuration);
      while (reader.next(srcRelPath, srcFileStatus)) {
        count++;

        // A split's size is nRecordsPerSplit or (nRecordsPerSplit + 1):
        // the first 'remains' splits get the extra record. Since 'count'
        // resets after each split, comparing against the current target size
        // is equivalent to the modulo test, and also avoids a division by
        // zero when nRecordsPerSplit is 0 (listing longer than the
        // configured record count).
        long targetRecords = nRecordsPerSplit + (remains > 0 ? 1 : 0);
        if (count == targetRecords) {
          long currentPosition = reader.getPosition();
          FileSplit split = new FileSplit(listingFilePath, lastPosition,
              currentPosition - lastPosition, null);
          LOG.debug("Creating split: {}, records in split: {}", split, count);

          splits.add(split);
          lastPosition = currentPosition;
          if (remains > 0) {
            remains--;
          }
          count = 0L;
        }
      }

      return splits;
    } finally {
      IOUtils.closeStream(reader);
    }
  }

  /**
   * Implementation of InputFormat::createRecordReader().
   * @param split The split for which the RecordReader is sought.
   * @param context The context of the current task-attempt.
   * @return A SequenceFileRecordReader instance, (since the copy-listing is a
   * simple sequence-file.)
   * @throws IOException Exception Reading split file
   * @throws InterruptedException Thread interrupted exception
   */
  @Override
  public RecordReader<Text, CopyListingFileStatus> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new SequenceFileRecordReader<>();
  }

  /** Reads the listing-file path from configuration; fails fast if unset. */
  private static Path getListingFilePath(Configuration configuration) {
    String listingFilePathString =
        configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");

    Preconditions.checkState(!listingFilePathString.isEmpty(),
        "Listing file path is not set in configuration (%s)",
        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH);
    return new Path(listingFilePathString);
  }

  /** Opens a SequenceFile reader over the listing file. */
  private SequenceFile.Reader getListingFileReader(Configuration conf) {
    Path listingFilePath = getListingFilePath(conf);

    try {
      FileSystem fs = listingFilePath.getFileSystem(conf);
      Preconditions.checkState(fs.exists(listingFilePath),
          "Listing file doesn't exist at: %s", listingFilePath);

      return new SequenceFile.Reader(conf,
          SequenceFile.Reader.file(listingFilePath));
    } catch (IOException | IllegalStateException exception) {
      LOG.error("Couldn't read listing file at: {}", listingFilePath, exception);
      throw new IllegalStateException("Couldn't read listing file at: "
          + listingFilePath, exception);
    }
  }

  /** Total number of records in the listing, as recorded by the copy listing. */
  private static long getNumberOfRecords(Configuration configuration) {
    return DistCpUtils.getLong(configuration,
        DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
  }

  /** Requested number of map tasks for the job. */
  private static int getNumSplits(Configuration configuration) {
    return DistCpUtils.getInt(configuration, MRJobConfig.NUM_MAPS);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

<configuration>

<property>
<name>distcp.record.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.UniformRecordInputFormat</value>
<description>Implementation of the 'record' copy strategy: an InputFormat
that partitions the copy listing into splits containing a uniform number
of records per map task.</description>
</property>

<property>
<name>distcp.dynamic.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
Expand Down
Loading