Skip to content

Commit

Permalink
Add option to set group for distcp-ng
Browse files Browse the repository at this point in the history
  • Loading branch information
pcadabam-zz committed Feb 1, 2016
1 parent 351ae77 commit 8b5a374
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,68 @@

package gobblin.data.management.copy;

import java.util.Properties;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import org.apache.hadoop.fs.Path;

import com.google.common.base.Optional;


/**
* Configuration for Gobblin distcp jobs.
*/
@Data
@AllArgsConstructor
@Builder
public class CopyConfiguration {

public static final String COPY_PREFIX = "gobblin.copy";
public static final String PRESERVE_ATTRIBUTES_KEY = COPY_PREFIX + ".preserved.attributes";
public static final String DESTINATION_GROUP_KEY = COPY_PREFIX + ".dataset.destination.group";

/**
* Directory where dataset should be replicated.
* This directory corresponds to the {@link CopyableDataset#datasetRoot} in the new location.
* Directory where dataset should be replicated. This directory corresponds to the {@link CopyableDataset#datasetRoot}
* in the new location.
*/
private final Path targetRoot;

/**
* Preserve options passed by the user.
*/
private final PreserveAttributes preserve;

/**
* {@link CopyContext} for this job.
*/
private final CopyContext copyContext;

private final Optional<String> targetGroup;

public static class CopyConfigurationBuilder {

private PreserveAttributes preserve;
private Optional<String> targetGroup;
private CopyContext copyContext;

public CopyConfigurationBuilder(Properties properties) {
this.targetGroup =
properties.containsKey(DESTINATION_GROUP_KEY) ? Optional.of(properties.getProperty(DESTINATION_GROUP_KEY))
: Optional.<String> absent();
this.preserve = PreserveAttributes.fromMnemonicString(properties.getProperty(PRESERVE_ATTRIBUTES_KEY));
this.copyContext = new CopyContext();
}
}

public static CopyConfigurationBuilder builder() {
return new CopyConfigurationBuilder(new Properties());
}

public static CopyConfigurationBuilder builder(Properties properties) {
return new CopyConfigurationBuilder(properties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,9 @@
public class CopySource extends AbstractSource<String, FileAwareInputStream> {

public static final String DEFAULT_DATASET_PROFILE_CLASS_KEY = CopyableGlobDatasetFinder.class.getCanonicalName();
private static final String COPY_PREFIX = "gobblin.copy";
public static final String SERIALIZED_COPYABLE_FILE = COPY_PREFIX + ".serialized.copyable.file";
public static final String SERIALIZED_COPYABLE_DATASET = COPY_PREFIX + ".serialized.copyable.datasets";
public static final String PRESERVE_ATTRIBUTES_KEY = COPY_PREFIX + ".preserved.attributes";
public static final String WORK_UNIT_GUID = COPY_PREFIX + ".work.unit.guid";
public static final String SERIALIZED_COPYABLE_FILE = CopyConfiguration.COPY_PREFIX + ".serialized.copyable.file";
public static final String SERIALIZED_COPYABLE_DATASET = CopyConfiguration.COPY_PREFIX + ".serialized.copyable.datasets";
public static final String WORK_UNIT_GUID = CopyConfiguration.COPY_PREFIX + ".work.unit.guid";

/**
* <ul>
Expand Down Expand Up @@ -105,14 +103,16 @@ public List<WorkUnit> getWorkunits(SourceState state) {

Path targetRoot = getTargetRoot(state, datasetFinder, copyableDataset);

CopyConfiguration copyConfiguration = new CopyConfiguration(targetRoot,
PreserveAttributes.fromMnemonicString(state.getProp(PRESERVE_ATTRIBUTES_KEY)), copyContext);
CopyConfiguration copyConfiguration =
CopyConfiguration.builder(state.getProperties()).targetRoot(targetRoot).copyContext(copyContext).build();



Collection<CopyableFile> files = copyableDataset.getCopyableFiles(targetFs, copyConfiguration);
Collection<Partition<CopyableFile>> partitions = partitionCopyableFiles(files);

for (Partition<CopyableFile> partition : partitions) {
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, COPY_PREFIX, partition.getName());
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, CopyConfiguration.COPY_PREFIX, partition.getName());
for (CopyableFile copyableFile : partition.getFiles()) {
WorkUnit workUnit = new WorkUnit(extract);
workUnit.addAll(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ public CopyableFile build() throws IOException {

if (this.destinationOwnerAndPermission == null) {
String owner = this.preserve.preserve(Option.OWNER) ? this.origin.getOwner() : null;
String group = this.preserve.preserve(Option.GROUP) ? this.origin.getGroup() : null;

String group = null;
if (this.preserve.preserve(Option.GROUP)) {
group = this.origin.getGroup();
} else if (this.configuration.getTargetGroup().isPresent()) {
group = this.configuration.getTargetGroup().get();
}

FsPermission permission = this.preserve.preserve(Option.PERMISSION) ? this.origin.getPermission() : null;

this.destinationOwnerAndPermission =
Expand Down Expand Up @@ -223,9 +230,15 @@ private List<OwnerAndPermission> replicateOwnerAndPermission(final FileSystem or
return new OwnerAndPermission(fs.getOwner(), fs.getGroup(), fs.getPermission());
}
});

String group = null;
if (this.preserve.preserve(Option.GROUP)) {
group = ownerAndPermission.getGroup();
} else if (this.configuration.getTargetGroup().isPresent()) {
group = this.configuration.getTargetGroup().get();
}
ancestorOwnerAndPermissions.add(new OwnerAndPermission(
preserve.preserve(Option.OWNER) ? ownerAndPermission.getOwner() : null,
preserve.preserve(Option.GROUP) ? ownerAndPermission.getGroup() : null,
preserve.preserve(Option.OWNER) ? ownerAndPermission.getOwner() : null, group,
preserve.preserve(Option.PERMISSION) ? ownerAndPermission.getFsPermission() : null));
}
} catch (ExecutionException ee) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public void testCopyableFileBuilderMinimumConfiguration()
Path relativePath = PathUtils.relativizePath(originFile, datasetRoot);
Path targetPath = new Path(targetRoot, relativePath);

CopyConfiguration copyConfiguration = new CopyConfiguration(new Path(targetRoot), preserveAttributes,
new CopyContext());
CopyConfiguration copyConfiguration =
CopyConfiguration.builder().targetRoot(new Path(targetRoot)).preserve(preserveAttributes).build();

CopyableFile copyableFile = CopyableFile.builder(originFS, origin, datasetRoot, copyConfiguration)
.build();
Expand Down Expand Up @@ -135,8 +135,8 @@ public void testCopyableFileBuilderMaximumConfiguration()
Path relativePath = PathUtils.relativizePath(originFile, datasetRoot);
Path targetPath = new Path(targetRoot, relativePath);

CopyConfiguration copyConfiguration = new CopyConfiguration(new Path(targetRoot), preserveAttributes,
new CopyContext());
CopyConfiguration copyConfiguration =
CopyConfiguration.builder().targetRoot(new Path(targetRoot)).preserve(preserveAttributes).build();

// Other attributes
String fileSet = "fileset";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public void testGetCopyableFiles() throws Exception {

RecursiveCopyableDataset dataset = new RecursiveCopyableDataset(FileSystem.getLocal(new Configuration()), new Path(baseDir), properties);

CopyConfiguration copyConfiguration = new CopyConfiguration(new Path(destinationDir),
PreserveAttributes.fromMnemonicString("ugp"), new CopyContext());
CopyConfiguration copyConfiguration =
CopyConfiguration.builder().targetRoot(new Path(destinationDir))
.preserve(PreserveAttributes.fromMnemonicString("ugp")).build();

Collection<CopyableFile> files = dataset.getCopyableFiles(FileSystem.getLocal(new Configuration()), copyConfiguration);

Assert.assertEquals(files.size(), 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ private CopyableFile getTestCopyableFile(String resourcePath) throws IOException
String filePath = getClass().getClassLoader().getResource(resourcePath).getFile();
FileStatus status = new FileStatus(0l, false, 0, 0l, 0l, new Path(filePath));
return CopyableFile.builder(FileSystem.getLocal(new Configuration()), status, new Path("/"),
new CopyConfiguration(new Path("/"), PreserveAttributes.fromMnemonicString(""), new CopyContext())).build();
CopyConfiguration.builder().targetRoot(new Path("/")).preserve(PreserveAttributes.fromMnemonicString("")).build()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public TestDatasetManager(Path testMethodTempPath, State state, String datasetTa

FileStatus file = new FileStatus(0, false, 0, 0, 0, new Path("/file"));
this.copyableFile = CopyableFile.builder(FileSystem.getLocal(new Configuration()), file, new Path("/"),
new CopyConfiguration(new Path("/"), PreserveAttributes.fromMnemonicString(""), new CopyContext())).build();
CopyConfiguration.builder().targetRoot(new Path("/")).preserve(PreserveAttributes.fromMnemonicString("")).build()).build();

fs.mkdirs(testMethodTempPath);
log.info("Created a temp directory for test at " + testMethodTempPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

package gobblin.data.management.copy.recovery;

import junit.framework.Assert;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
Expand All @@ -24,6 +22,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -86,7 +85,7 @@ public class RecoveryHelperTest {

CopyableFile copyableFile = CopyableFile.builder(fs,
new FileStatus(0, false, 0, 0, 0, new Path("/file")), new Path("/dataset"),
new CopyConfiguration(new Path("/target"), PreserveAttributes.fromMnemonicString(""), new CopyContext())).build();
CopyConfiguration.builder().targetRoot(new Path("/target")).preserve(PreserveAttributes.fromMnemonicString("")).build()).build();

CopySource.setWorkUnitGuid(state, Guid.fromHasGuid(copyableFile));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testCommit() throws IOException {
ancestorOwnerAndPermissions.add(ownerAndPermission);
ancestorOwnerAndPermissions.add(ownerAndPermission);
CopyableFile cf = CopyableFile.builder(this.fs, status, new Path("/dataset"),
new CopyConfiguration(new Path("/target"), PreserveAttributes.fromMnemonicString(""), new CopyContext()))
CopyConfiguration.builder().targetRoot(new Path("/target")).preserve(PreserveAttributes.fromMnemonicString("")).build())
.destination(destination)
.destinationOwnerAndPermission(ownerAndPermission)
.ancestorsOwnerAndPermission(ancestorOwnerAndPermissions)
Expand Down

0 comments on commit 8b5a374

Please sign in to comment.