diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index 1832ac909de..5e67f156030 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -67,11 +67,14 @@ dependencies { testCompile project(":gobblin-example") testCompile externalDependency.testng - testCompile externalDependency.mockito + testCompile externalDependency.mockitoInline + testCompile externalDependency.powerMockApi + testCompile externalDependency.powerMockModule testCompile externalDependency.hadoopYarnMiniCluster testCompile externalDependency.curatorFramework testCompile externalDependency.curatorTest + testCompile ('com.google.inject:guice:3.0') { force = true } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java index d568ea4e5b3..9ffd5a9883f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -66,7 +66,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { return commitGobblinStats; } - private List convertDatasetStatsToTaskSummaries(Map datasetStats) { + List convertDatasetStatsToTaskSummaries(Map datasetStats) { List datasetTaskSummaries = new ArrayList<>(); for (Map.Entry entry : datasetStats.entrySet()) { datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), entry.getValue().isSuccessfullyCommitted())); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 6bef7a609cb..dc600b0abcc 100644 --- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -104,7 +104,7 @@ public int performWorkload( /** Factory for invoking the specific activity by providing it args via {@link Async::function} */ protected abstract Promise launchAsyncActivity(WORK_ITEM task); - protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr childAddr) { + public NestingExecWorkflow createChildWorkflow(final WorkflowAddr childAddr) { // preserve the current workflow ID of this parent, but add the (hierarchical) address extension specific to each child String thisWorkflowId = Workflow.getInfo().getWorkflowId(); String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + childAddr; @@ -116,7 +116,7 @@ protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr } /** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */ - protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { + public Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { // (only pause when an appreciable number of leaves) // TODO: use a configuration value, for simpler adjustment, rather than hard-code return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT @@ -130,11 +130,8 @@ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChi * List naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren); * @return each sub-tree's desired size, in ascending sub-tree order */ - protected static List consolidateSubTreeGrandChildren( - final int numSubTreesPerSubTree, - final int numChildrenTotal, - final int numSubTreeChildren - ) { + public static List consolidateSubTreeGrandChildren(final int 
numSubTreesPerSubTree, + final int numChildrenTotal, final int numSubTreeChildren) { if (numSubTreesPerSubTree <= 0) { return Lists.newArrayList(); } else if (isSqrt(numSubTreeChildren, numChildrenTotal)) { diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java deleted file mode 100644 index 86c5ac12de1..00000000000 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.temporal.ddm.activity.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import org.testng.Assert; -import org.testng.annotations.Test; - -import org.apache.gobblin.source.workunit.BasicWorkUnitStream; -import org.apache.gobblin.source.workunit.MultiWorkUnit; -import org.apache.gobblin.source.workunit.WorkUnit; -import org.apache.gobblin.source.workunit.WorkUnitStream; - - -public class GenerateWorkUnitsImplTest { - - @Test - public void testFetchesWorkDirsFromWorkUnits() { - List workUnits = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - WorkUnit workUnit = new WorkUnit(); - workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i); - workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i); - workUnit.setProp("qualitychecker.row.err.file", "/tmp/jobId/row-err/file"); - workUnit.setProp("qualitychecker.clean.err.dir", "true"); - workUnits.add(workUnit); - } - WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits) - .setFiniteStream(true) - .build(); - Set output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream); - Assert.assertEquals(output.size(), 11); - } - - @Test - public void testFetchesWorkDirsFromMultiWorkUnits() { - List workUnits = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - MultiWorkUnit multiWorkUnit = new MultiWorkUnit(); - for (int j = 0; j < 3; j++) { - WorkUnit workUnit = new WorkUnit(); - workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/"); - workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/"); - workUnit.setProp("qualitychecker.row.err.file", "/tmp/jobId/row-err/file"); - workUnit.setProp("qualitychecker.clean.err.dir", "true"); - multiWorkUnit.addWorkUnit(workUnit); - } - workUnits.add(multiWorkUnit); - } - WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits) - .setFiniteStream(true) - .build(); - Set output = 
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream); - Assert.assertEquals(output.size(), 3); - } - - @Test - public void testFetchesUniqueWorkDirsFromMultiWorkUnits() { - List workUnits = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - MultiWorkUnit multiWorkUnit = new MultiWorkUnit(); - for (int j = 0; j < 3; j++) { - WorkUnit workUnit = new WorkUnit(); - // Each MWU will have its own staging and output dir - workUnit.setProp("writer.staging.dir", "/tmp/jobId/" + i + "/task-staging/"); - workUnit.setProp("writer.output.dir", "/tmp/jobId/" + i + "task-output/"); - workUnit.setProp("qualitychecker.row.err.file", "/tmp/jobId/row-err/file"); - workUnit.setProp("qualitychecker.clean.err.dir", "true"); - multiWorkUnit.addWorkUnit(workUnit); - } - workUnits.add(multiWorkUnit); - } - WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits) - .setFiniteStream(true) - .build(); - Set output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream); - Assert.assertEquals(output.size(), 11); - } -} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java new file mode 100644 index 00000000000..62e17bcea40 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/utils/JobStateUtilTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.utils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+
+
+/** Unit tests for the static helpers of {@link JobStateUtils}, each driven by a freshly mocked {@link JobState}. */
+public class JobStateUtilTest {
+
+  private JobState jobState;
+  private FileSystem fileSystem;
+
+  @BeforeMethod
+  public void setUp() {
+    jobState = Mockito.mock(JobState.class);
+    fileSystem = Mockito.mock(FileSystem.class);
+  }
+
+  @Test
+  public void testOpenFileSystem() throws IOException {
+    Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
+    Mockito.when(jobState.getProperties()).thenReturn(new Properties());
+
+    FileSystem fs = JobStateUtils.openFileSystem(jobState);
+    Assert.assertNotNull(fs);
+    Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString(), Mockito.anyString());
+  }
+
+  @Test
+  public void testCreateSource() throws ReflectiveOperationException {
+    Mockito.when(jobState.getProp(Mockito.anyString()))
+        .thenReturn("org.apache.gobblin.source.extractor.filebased.TextFileBasedSource");
+    Source<?, ?> source = JobStateUtils.createSource(jobState);
+    Assert.assertNotNull(source);
+  }
+
+  @Test
+  public void testOpenTaskStateStoreUncached() throws URISyntaxException {
+    Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("file:///test");
+    Mockito.when(jobState.getJobId()).thenReturn("testJobId");
+    Mockito.when(jobState.getJobName()).thenReturn("testJobName");
+    Mockito.when(fileSystem.makeQualified(Mockito.any()))
+        .thenReturn(new Path("file:///test/testJobName/testJobId/output"));
+    Mockito.when(fileSystem.getUri()).thenReturn(new URI("file:///test/testJobName/testJobId/output"));
+
+    StateStore<TaskState> stateStore = JobStateUtils.openTaskStateStoreUncached(jobState, fileSystem);
+
+    Assert.assertNotNull(stateStore);
+  }
+
+  @Test
+  public void testGetFileSystemUri() {
+    Mockito.when(jobState.getProp(Mockito.anyString(), Mockito.anyString())).thenReturn("file:///test");
+    URI fsUri = JobStateUtils.getFileSystemUri(jobState);
+    Assert.assertEquals(fsUri, URI.create("file:///test"));
+    Mockito.verify(jobState).getProp(Mockito.anyString(), Mockito.anyString());
+  }
+
+  @Test
+  public void testGetWorkDirRoot() {
+    Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
+    Mockito.when(jobState.getJobName()).thenReturn("testJob");
+    Mockito.when(jobState.getJobId()).thenReturn("jobId123");
+    Path rootPath = JobStateUtils.getWorkDirRoot(jobState);
+    Assert.assertEquals(rootPath, new Path("/tmp/testJob/jobId123"));
+    Mockito.verify(jobState, Mockito.times(1)).getProp(Mockito.anyString());
+  }
+
+  @Test
+  public void testGetWorkUnitsPath() {
+    Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
+    Mockito.when(jobState.getJobName()).thenReturn("testJob");
+    Mockito.when(jobState.getJobId()).thenReturn("jobId123");
+    Path workUnitsPath = JobStateUtils.getWorkUnitsPath(jobState);
+    Assert.assertEquals(workUnitsPath, new Path("/tmp/testJob/jobId123/input"));
+  }
+
+  @Test
+  public void testGetTaskStateStorePath() throws IOException {
+    Mockito.when(fileSystem.makeQualified(Mockito.any(Path.class))).thenReturn(new Path("/qualified/path"));
+    Mockito.when(jobState.getProp(Mockito.anyString())).thenReturn("/tmp");
+    Mockito.when(jobState.getJobName()).thenReturn("testJob");
+    Mockito.when(jobState.getJobId()).thenReturn("jobId123");
+    Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, fileSystem);
+    Assert.assertEquals(taskStateStorePath, new Path("/qualified/path"));
+  }
+
+  @Test
+  public void testWriteJobState() throws IOException {
+    Path workDirRootPath = new Path("/tmp");
+    FSDataOutputStream dos = Mockito.mock(FSDataOutputStream.class);
+    Mockito.when(fileSystem.create(Mockito.any(Path.class))).thenReturn(dos);
+
+    JobStateUtils.writeJobState(jobState, workDirRootPath, fileSystem);
+
+    Mockito.verify(fileSystem).create(Mockito.any(Path.class));
+    Mockito.verify(jobState).write(Mockito.any(DataOutputStream.class), Mockito.anyBoolean(), Mockito.anyBoolean());
+  }
+
+  @Test
+  public void testGetSharedResourcesBroker() {
+    // a private, empty Properties keeps the stubbed job state independent of the host JVM's system properties
+    Mockito.when(jobState.getProperties()).thenReturn(new Properties());
+    Mockito.when(jobState.getJobName()).thenReturn("testJob");
+    Mockito.when(jobState.getJobId()).thenReturn("jobId123");
+    Assert.assertNotNull(JobStateUtils.getSharedResourcesBroker(jobState));
+  }
+}
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java
new file mode 100644
index 00000000000..4d0ae66f0be
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/AbstractNestingExecWorkflowImplTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
+
+
+/** Tests {@link AbstractNestingExecWorkflowImpl}, stubbing Temporal's static entry points with Mockito static mocks. */
+public class AbstractNestingExecWorkflowImplTest {
+
+  private Workload mockWorkload;
+  private WorkflowAddr mockWorkflowAddr;
+  private Workload.WorkSpan mockWorkSpan;
+  private Promise mockPromise;
+  private AbstractNestingExecWorkflowImpl workflow;
+
+  // registrations of the static mocks, retained so that `tearDown` can release them
+  private List<MockedStatic<?>> staticMocks;
+
+  @BeforeClass
+  public void setup() {
+    // Temporal's static methods are stubbed with Mockito's own static mocking; no PowerMock runner is involved
+    staticMocks = Arrays.asList(
+        Mockito.mockStatic(Workflow.class), Mockito.mockStatic(Async.class), Mockito.mockStatic(Promise.class));
+    this.mockWorkload = Mockito.mock(Workload.class);
+    this.mockWorkflowAddr = Mockito.mock(WorkflowAddr.class);
+    this.mockWorkSpan = Mockito.mock(Workload.WorkSpan.class);
+    this.mockPromise = Mockito.mock(Promise.class);
+
+    workflow = new AbstractNestingExecWorkflowImpl() {
+      @Override
+      protected Promise launchAsyncActivity(String task) {
+        return mockPromise;
+      }
+    };
+  }
+
+  /** Releases the static mocks, which otherwise remain registered on this thread after the class completes. */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() {
+    staticMocks.forEach(MockedStatic::close);
+  }
+
+  @Test
+  public void testPerformWorkload_NoWorkSpan() {
+    // Arrange
+    Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.empty());
+
+    // Act
+    int result = workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());
+
+    // Assert
+    Assert.assertEquals(result, 0);
+    // NOTE(review): mocks are created once in `setup`, so this count also includes calls made by other tests
+    Mockito.verify(mockWorkload, Mockito.times(2)).getSpan(0, 5);
+  }
+
+  @Test
+  public void testCalcPauseDurationBeforeCreatingSubTree_NoPause() {
+    // Act
+    Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(50);
+
+    // Assert
+    Assert.assertEquals(result, Duration.ZERO);
+  }
+
+  @Test
+  public void testCalcPauseDurationBeforeCreatingSubTree_PauseRequired() {
+    // Act
+    Duration result = workflow.calcPauseDurationBeforeCreatingSubTree(150);
+
+    // Assert
+    Assert.assertEquals(result,
+        Duration.ofSeconds(AbstractNestingExecWorkflowImpl.NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT));
+  }
+
+  @Test
+  public void testConsolidateSubTreeGrandChildren() {
+    // Act
+    List<Integer> result = AbstractNestingExecWorkflowImpl.consolidateSubTreeGrandChildren(3, 10, 2);
+
+    // Assert
+    Assert.assertEquals(result.size(), 3);
+    Assert.assertEquals(result.get(0), Integer.valueOf(0));
+    Assert.assertEquals(result.get(1), Integer.valueOf(0));
+    Assert.assertEquals(result.get(2), Integer.valueOf(6));
+  }
+
+  // NOTE(review): passes only when an AssertionError is thrown; no child-workflow launch is actually verified
+  @Test(expectedExceptions = AssertionError.class)
+  public void testPerformWorkload_LaunchesChildWorkflows() {
+    // Arrange
+    Mockito.when(mockWorkload.getSpan(Mockito.anyInt(), Mockito.anyInt())).thenReturn(Optional.of(mockWorkSpan));
+    Mockito.when(mockWorkSpan.getNumElems()).thenReturn(5);
+    Mockito.when(mockWorkSpan.next()).thenReturn("task1");
+    Mockito.when(mockWorkload.isIndexKnownToExceed(Mockito.anyInt())).thenReturn(false);
+
+    // Mock the child workflow
+    Mockito.when(Async.function(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.anyInt(),
+        Mockito.anyInt(), Mockito.any())).thenReturn(mockPromise);
+    Mockito.when(mockPromise.get()).thenReturn(5);
+
+    // Act
+    workflow.performWorkload(mockWorkflowAddr, mockWorkload, 0, 10, 5, Optional.empty());
+  }
+}