diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyConfiguration.java index 96c3ffbae0f..9fafdf1dde9 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyConfiguration.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyConfiguration.java @@ -12,29 +12,68 @@ package gobblin.data.management.copy; +import java.util.Properties; + +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import org.apache.hadoop.fs.Path; +import com.google.common.base.Optional; + /** * Configuration for Gobblin distcp jobs. */ @Data +@AllArgsConstructor +@Builder public class CopyConfiguration { + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String PRESERVE_ATTRIBUTES_KEY = COPY_PREFIX + ".preserved.attributes"; + public static final String DESTINATION_GROUP_KEY = COPY_PREFIX + ".dataset.destination.group"; + /** - * Directory where dataset should be replicated. - * This directory corresponds to the {@link CopyableDataset#datasetRoot} in the new location. + * Directory where dataset should be replicated. This directory corresponds to the {@link CopyableDataset#datasetRoot} + * in the new location. */ private final Path targetRoot; + /** * Preserve options passed by the user. */ private final PreserveAttributes preserve; + /** * {@link CopyContext} for this job. */ private final CopyContext copyContext; + private final Optional targetGroup; + + public static class CopyConfigurationBuilder { + + private PreserveAttributes preserve; + private Optional targetGroup; + private CopyContext copyContext; + + public CopyConfigurationBuilder(Properties properties) { + this.targetGroup = + properties.containsKey(DESTINATION_GROUP_KEY) ? Optional.of(properties.getProperty(DESTINATION_GROUP_KEY)) + : Optional. absent(); + this.preserve = PreserveAttributes.fromMnemonicString(properties.getProperty(PRESERVE_ATTRIBUTES_KEY)); + this.copyContext = new CopyContext(); + } + } + + public static CopyConfigurationBuilder builder() { + return new CopyConfigurationBuilder(new Properties()); + } + + public static CopyConfigurationBuilder builder(Properties properties) { + return new CopyConfigurationBuilder(properties); + } + } diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopySource.java index 5cd02cce612..9f401da73bb 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopySource.java @@ -64,11 +64,9 @@ public class CopySource extends AbstractSource { public static final String DEFAULT_DATASET_PROFILE_CLASS_KEY = CopyableGlobDatasetFinder.class.getCanonicalName(); - private static final String COPY_PREFIX = "gobblin.copy"; - public static final String SERIALIZED_COPYABLE_FILE = COPY_PREFIX + ".serialized.copyable.file"; - public static final String SERIALIZED_COPYABLE_DATASET = COPY_PREFIX + ".serialized.copyable.datasets"; - public static final String PRESERVE_ATTRIBUTES_KEY = COPY_PREFIX + ".preserved.attributes"; - public static final String WORK_UNIT_GUID = COPY_PREFIX + ".work.unit.guid"; + public static final String SERIALIZED_COPYABLE_FILE = CopyConfiguration.COPY_PREFIX + ".serialized.copyable.file"; + public static final String SERIALIZED_COPYABLE_DATASET = CopyConfiguration.COPY_PREFIX + ".serialized.copyable.datasets"; + public static final String WORK_UNIT_GUID = CopyConfiguration.COPY_PREFIX + ".work.unit.guid"; /** *
    @@ -105,14 +103,16 @@ public List getWorkunits(SourceState state) { Path targetRoot = getTargetRoot(state, datasetFinder, copyableDataset); - CopyConfiguration copyConfiguration = new CopyConfiguration(targetRoot, - PreserveAttributes.fromMnemonicString(state.getProp(PRESERVE_ATTRIBUTES_KEY)), copyContext); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(state.getProperties()).targetRoot(targetRoot).copyContext(copyContext).build(); + + Collection files = copyableDataset.getCopyableFiles(targetFs, copyConfiguration); Collection> partitions = partitionCopyableFiles(files); for (Partition partition : partitions) { - Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, COPY_PREFIX, partition.getName()); + Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, CopyConfiguration.COPY_PREFIX, partition.getName()); for (CopyableFile copyableFile : partition.getFiles()) { CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(copyableDataset, targetRoot); diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyableFile.java index 019ff2b3781..e199ab62ffd 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/CopyableFile.java @@ -177,7 +177,14 @@ public CopyableFile build() throws IOException { if (this.destinationOwnerAndPermission == null) { String owner = this.preserve.preserve(Option.OWNER) ? this.origin.getOwner() : null; - String group = this.preserve.preserve(Option.GROUP) ? this.origin.getGroup() : null; + + String group = null; + if (this.preserve.preserve(Option.GROUP)) { + group = this.origin.getGroup(); + } else if (this.configuration.getTargetGroup().isPresent()) { + group = this.configuration.getTargetGroup().get(); + } + FsPermission permission = this.preserve.preserve(Option.PERMISSION) ? this.origin.getPermission() : null; this.destinationOwnerAndPermission = @@ -223,9 +230,15 @@ private List replicateOwnerAndPermission(final FileSystem or return new OwnerAndPermission(fs.getOwner(), fs.getGroup(), fs.getPermission()); } }); + + String group = null; + if (this.preserve.preserve(Option.GROUP)) { + group = ownerAndPermission.getGroup(); + } else if (this.configuration.getTargetGroup().isPresent()) { + group = this.configuration.getTargetGroup().get(); + } ancestorOwnerAndPermissions.add(new OwnerAndPermission( - preserve.preserve(Option.OWNER) ? ownerAndPermission.getOwner() : null, - preserve.preserve(Option.GROUP) ? ownerAndPermission.getGroup() : null, + preserve.preserve(Option.OWNER) ? ownerAndPermission.getOwner() : null, group, preserve.preserve(Option.PERMISSION) ? ownerAndPermission.getFsPermission() : null)); } } catch (ExecutionException ee) { diff --git a/gobblin-data-management/src/test/java/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/CopyableFileTest.java index 51af01cd13f..431ac33367a 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/CopyableFileTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/CopyableFileTest.java @@ -90,8 +90,8 @@ public void testCopyableFileBuilderMinimumConfiguration() Path relativePath = PathUtils.relativizePath(originFile, datasetRoot); Path targetPath = new Path(targetRoot, relativePath); - CopyConfiguration copyConfiguration = new CopyConfiguration(new Path(targetRoot), preserveAttributes, - new CopyContext()); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder().targetRoot(new Path(targetRoot)).preserve(preserveAttributes).build(); CopyableFile copyableFile = CopyableFile.builder(originFS, origin, datasetRoot, copyConfiguration) .build(); @@ -135,8 +135,8 @@ public void testCopyableFileBuilderMaximumConfiguration() Path relativePath = PathUtils.relativizePath(originFile, datasetRoot); Path targetPath = new Path(targetRoot, relativePath); - CopyConfiguration copyConfiguration = new CopyConfiguration(new Path(targetRoot), preserveAttributes, - new CopyContext()); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder().targetRoot(new Path(targetRoot)).preserve(preserveAttributes).build(); // Other attributes String fileSet = "fileset"; diff --git a/gobblin-data-management/src/test/java/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java index e49cc02f42f..17490feee5f 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java @@ -41,8 +41,10 @@ public void testGetCopyableFiles() throws Exception { RecursiveCopyableDataset dataset = new RecursiveCopyableDataset(FileSystem.getLocal(new Configuration()), new Path(baseDir), properties); - CopyConfiguration copyConfiguration = new CopyConfiguration(new Path(destinationDir), - PreserveAttributes.fromMnemonicString("ugp"), new CopyContext()); + CopyConfiguration copyConfiguration = + CopyConfiguration.builder().targetRoot(new Path(destinationDir)) + .preserve(PreserveAttributes.fromMnemonicString("ugp")).build(); + Collection files = dataset.getCopyableFiles(FileSystem.getLocal(new Configuration()), copyConfiguration); Assert.assertEquals(files.size(), 3); diff --git a/gobblin-data-management/src/test/java/gobblin/data/management/copy/extractor/InputStreamExtractorTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/extractor/InputStreamExtractorTest.java index 2ad7138a056..962c6c40044 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/extractor/InputStreamExtractorTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/extractor/InputStreamExtractorTest.java @@ -49,6 +49,6 @@ private CopyableFile getTestCopyableFile(String resourcePath) throws IOException String filePath = getClass().getClassLoader().getResource(resourcePath).getFile(); FileStatus status = new FileStatus(0l, false, 0, 0l, 0l, new Path(filePath)); return CopyableFile.builder(FileSystem.getLocal(new Configuration()), status, new Path("/"), - new CopyConfiguration(new Path("/"), PreserveAttributes.fromMnemonicString(""), new CopyContext())).build(); + CopyConfiguration.builder().targetRoot(new Path("/")).preserve(PreserveAttributes.fromMnemonicString("")).build()).build(); } } diff --git a/gobblin-data-management/src/test/java/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java index 93341e87af6..340c6b240b0 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java @@ -250,7 +250,7 @@ public TestDatasetManager(Path testMethodTempPath, State state, String datasetTa FileStatus file = new FileStatus(0, false, 0, 0, 0, new Path("/file")); this.copyableFile = CopyableFile.builder(FileSystem.getLocal(new Configuration()), file, new Path("/"), - new CopyConfiguration(new Path("/"), PreserveAttributes.fromMnemonicString(""), new CopyContext())).build(); + CopyConfiguration.builder().targetRoot(new Path("/")).preserve(PreserveAttributes.fromMnemonicString("")).build()).build(); fs.mkdirs(testMethodTempPath); log.info("Created a temp directory for test at " + testMethodTempPath); diff --git a/gobblin-data-management/src/test/java/gobblin/data/management/copy/recovery/RecoveryHelperTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/recovery/RecoveryHelperTest.java index abbcb1d4351..ba5b785c1ce 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/recovery/RecoveryHelperTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/recovery/RecoveryHelperTest.java @@ -12,8 +12,6 @@ package gobblin.data.management.copy.recovery; -import junit.framework.Assert; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -24,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -86,7 +85,7 @@ public class RecoveryHelperTest { CopyableFile copyableFile = CopyableFile.builder(fs, new FileStatus(0, false, 0, 0, 0, new Path("/file")), new Path("/dataset"), - new CopyConfiguration(new Path("/target"), PreserveAttributes.fromMnemonicString(""), new CopyContext())).build(); + CopyConfiguration.builder().targetRoot(new Path("/target")).preserve(PreserveAttributes.fromMnemonicString("")).build()).build(); CopySource.setWorkUnitGuid(state, Guid.fromHasGuid(copyableFile)); diff --git a/gobblin-data-management/src/test/java/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java index f2dbdcf12e7..5de4d75668e 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java @@ -128,7 +128,7 @@ public void testCommit() throws IOException { ancestorOwnerAndPermissions.add(ownerAndPermission); ancestorOwnerAndPermissions.add(ownerAndPermission); CopyableFile cf = CopyableFile.builder(this.fs, status, new Path("/dataset"), - new CopyConfiguration(new Path("/target"), PreserveAttributes.fromMnemonicString(""), new CopyContext())) + CopyConfiguration.builder().targetRoot(new Path("/target")).preserve(PreserveAttributes.fromMnemonicString("")).build()) .destination(destination) .destinationOwnerAndPermission(ownerAndPermission) .ancestorsOwnerAndPermission(ancestorOwnerAndPermissions)