From 20c434188ff0715110d5f708b9e16ed8612059c4 Mon Sep 17 00:00:00 2001 From: abhishekmjain Date: Mon, 7 Oct 2024 23:24:39 +0530 Subject: [PATCH] [GOBBLIN-2148] Add temporal workflow cancel support (#4045) * Add temporal workflow cancel support --- .../launcher/ExecuteGobblinJobLauncher.java | 4 +- .../GenerateWorkUnitsJobLauncher.java | 3 +- .../launcher/ProcessWorkUnitsJobLauncher.java | 3 +- .../joblauncher/GobblinJobLauncher.java | 25 ++- .../GobblinTemporalJobLauncher.java | 65 +++++- .../GobblinTemporalJobScheduler.java | 8 + .../GobblinTemporalJobLauncherTest.java | 204 ++++++++++++++++++ .../org.mockito.plugins.MockMaker | 1 + 8 files changed, 303 insertions(+), 10 deletions(-) create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java create mode 100644 gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java index 4c2e44c5bdb..b2f65ccb410 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java @@ -83,10 +83,12 @@ public ExecuteGobblinJobLauncher( public void submitJob(List workunits) { try { Properties finalProps = adjustJobProperties(this.jobProps); + // Initialize workflowId. + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps))) .setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps)) + .setWorkflowId(this.workflowId) .build(); ExecuteGobblinWorkflow workflow = this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java index ce0d8b732bc..b058472c6b0 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java @@ -71,10 +71,11 @@ public GenerateWorkUnitsJobLauncher( @Override public void submitJob(List workunits) { try { + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) .setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps)) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps))) + .setWorkflowId(this.workflowId) .build(); GenerateWorkUnitsWorkflow workflow = this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java index d0ce87c05db..709a9cf935f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java @@ -102,10 +102,11 @@ public void submitJob(List workunits) { GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX)); + this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)); WorkflowOptions options = WorkflowOptions.newBuilder() .setTaskQueue(this.queueName) .setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps)) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps))) + .setWorkflowId(this.workflowId) .build(); Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec))); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index 12d8861ecdf..ea2c2ce7c2e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -22,6 +22,10 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -87,7 +91,7 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher { protected final StateStores stateStores; protected JobListener jobListener; protected volatile boolean jobSubmitted = false; - + private final ExecutorService executor; public GobblinJobLauncher(Properties jobProps, Path appWorkDir, List> metadataTags, ConcurrentHashMap runningMap, EventBus eventbus) @@ -122,6 +126,7 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir, this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter, this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository()); + this.executor = Executors.newSingleThreadExecutor(); } @Override @@ -150,17 +155,23 @@ protected void runWorkUnits(List workUnits) throws Exception { // Start the output TaskState collector service this.taskStateCollectorService.startAsync().awaitRunning(); + Future submitJobFuture = null; synchronized (this.cancellationRequest) { if (!this.cancellationRequested) { - submitJob(workUnits); + submitJobFuture = executor.submit(() -> { + try { + submitJob(workUnits); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); log.info(String.format("Submitted job %s", this.jobContext.getJobId())); this.jobSubmitted = true; } else { log.warn("Job {} not submitted as it was requested to be cancelled.", this.jobContext.getJobId()); } } - - waitJob(); + waitJob(submitJobFuture); log.info(String.format("Job %s completed", this.jobContext.getJobId())); } finally { // The last iteration of output TaskState collecting will run when the collector service gets stopped @@ -172,7 +183,11 @@ protected void runWorkUnits(List workUnits) throws Exception { protected void submitJob(List workUnits) throws Exception { } - protected void waitJob() throws InterruptedException { + protected void waitJob(Future submitJobFuture) + throws InterruptedException, ExecutionException { + if (submitJobFuture != null) { + submitJobFuture.get(); + } } @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 82aeb8b20f0..2d17fe20a30 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -20,12 +20,19 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowStub; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Workflow; @@ -65,10 +72,13 @@ @Alpha public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher { private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class); + private static final int TERMINATION_TIMEOUT_SECONDS = 3; protected WorkflowServiceStubs workflowServiceStubs; protected WorkflowClient client; protected String queueName; + protected String namespace; + protected String workflowId; public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, List> metadataTags, ConcurrentHashMap runningMap, EventBus eventBus) @@ -79,11 +89,13 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING); this.workflowServiceStubs = createServiceInstance(connectionUri); - String namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); + this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); this.client = createClientInstance(workflowServiceStubs, namespace); this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); + // non-null value indicates job has been submitted + this.workflowId = null; startCancellationExecutor(); } @@ -113,7 +125,56 @@ protected void handleLaunchFinalization() { @Override protected void executeCancellation() { - log.info("Cancel temporal workflow"); + if (this.workflowId == null) { + log.info("Cancellation of temporal workflow attempted without submitting it"); + return; + } + + log.info("Cancelling temporal workflow {}", this.workflowId); + try { + WorkflowStub workflowStub = this.client.newUntypedWorkflowStub(this.workflowId); + + // Describe the workflow execution to get its status + DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace(this.namespace) + .setExecution(workflowStub.getExecution()) + .build(); + DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request); + + WorkflowExecutionStatus status; + try { + status = response.getWorkflowExecutionInfo().getStatus(); + } catch (Exception e) { + log.warn("Exception occurred while getting status of the workflow " + this.workflowId + + ". We would still attempt the cancellation", e); + workflowStub.cancel(); + log.info("Temporal workflow {} cancelled successfully", this.workflowId); + return; + } + + // Check if the workflow is not finished + if (status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED && + status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED && + status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED && + status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) { + workflowStub.cancel(); + try { + // Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException + workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS, String.class, String.class); + } catch (TimeoutException te) { + // Workflow is still running, terminate it. + log.info("Workflow is still running, will attempt termination", te); + workflowStub.terminate("Job cancel invoked"); + } catch (WorkflowFailedException wfe) { + // Do nothing as exception is expected. + } + log.info("Temporal workflow {} cancelled successfully", this.workflowId); + } else { + log.info("Workflow {} is already finished with status {}", this.workflowId, status); + } + } catch (Exception e) { + log.error("Exception occurred while cancelling the workflow " + this.workflowId, e); + } } /** No-op: merely logs a warning, since not expected to be invoked */ diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index 76629aa68dc..34a1cec4dc2 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -210,6 +210,14 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { LOGGER.info("No job schedule found, so running job " + jobUri); GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics); JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig()); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + launcher.cancelJob(listener); + } catch (JobException e) { + LOGGER.error("Failed to cancel the job during shutdown", e); + throw new RuntimeException(e); + } + })); launcher.launchJob(listener); } } catch (Exception je) { diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java new file mode 100644 index 00000000000..98d0379b651 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.joblauncher; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.fs.Path; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.io.Files; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.example.simplejson.SimpleJsonSource; +import org.apache.gobblin.runtime.locks.FileBasedJobLock; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory; +import org.apache.gobblin.util.JobLauncherUtils; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class GobblinTemporalJobLauncherTest { + + private GobblinTemporalJobLauncher jobLauncher; + private MockedStatic mockWorkflowClientFactory; + private WorkflowServiceStubs mockServiceStubs; + private WorkflowClient mockClient; + private WorkflowStub mockStub; + private WorkflowExecutionInfo mockExecutionInfo; + private Properties jobProperties; + + class GobblinTemporalJobLauncherForTest extends GobblinTemporalJobLauncher { + public GobblinTemporalJobLauncherForTest(Properties jobProperties, Path appWorkDir) throws Exception { + super(jobProperties, appWorkDir, new ArrayList<>(), new ConcurrentHashMap<>(), null); + } + + @Override + protected void submitJob(List workUnits) + throws Exception { + this.workflowId = "someWorkflowId"; + } + } + + + @BeforeClass + public void setUp() throws Exception { + mockServiceStubs = mock(WorkflowServiceStubs.class); + mockClient = mock(WorkflowClient.class); + mockExecutionInfo = mock(WorkflowExecutionInfo.class); + DescribeWorkflowExecutionResponse mockResponse = mock(DescribeWorkflowExecutionResponse.class); + WorkflowServiceGrpc.WorkflowServiceBlockingStub mockBlockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(mockServiceStubs.blockingStub()).thenReturn(mockBlockingStub); + when(mockBlockingStub.describeWorkflowExecution(Mockito.any())).thenReturn(mockResponse); + when(mockResponse.getWorkflowExecutionInfo()).thenReturn(mockExecutionInfo); + + mockWorkflowClientFactory = Mockito.mockStatic(TemporalWorkflowClientFactory.class); + mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString())) + .thenReturn(mockServiceStubs); + mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), Mockito.anyString())) + .thenReturn(mockClient); + + jobProperties = new Properties(); + jobProperties.setProperty(ConfigurationKeys.FS_URI_KEY, "file:///"); + jobProperties.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING, "someConnString"); + jobProperties.setProperty(ConfigurationKeys.JOB_LOCK_TYPE, FileBasedJobLock.class.getName()); + jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, SimpleJsonSource.class.getName()); + } + + @BeforeMethod + public void methodSetUp() throws Exception { + mockStub = mock(WorkflowStub.class); + when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub); + when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance()); + + File tmpDir = Files.createTempDir(); + String basePath = tmpDir.getAbsolutePath(); + Path appWorkDir = new Path(basePath, "testAppWorkDir"); + String jobLockDir = new Path(basePath, "jobLockDir").toString(); + String stateStoreDir = new Path(basePath, "stateStoreDir").toString(); + String jobName = "testJob"; + String jobId = JobLauncherUtils.newJobId(jobName); + jobProperties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName); + jobProperties.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId); + jobProperties.setProperty(FileBasedJobLock.JOB_LOCK_DIR, jobLockDir); + jobProperties.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, stateStoreDir); + + jobLauncher = new GobblinTemporalJobLauncherForTest(jobProperties, appWorkDir); + } + + @Test + public void testCancelWorkflowIfFailed() throws Exception { + // For workflowId to be generated + jobLauncher.submitJob(null); + + // Mock the workflow status to be failed + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(0)).cancel(); + } + + @Test + public void testCancelWorkflowIfCompleted() throws Exception { + // For workflowId to be generated + jobLauncher.submitJob(null); + + // Mock the workflow status to be completed + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(0)).cancel(); + } + + @Test + public void testCancelWorkflowIfRunning() throws Exception { + // Mock the workflow status to be running + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING); + + jobLauncher.executeCancellation(); + + // Verify that the cancel method was not called without job submission + verify(mockStub, times(0)).cancel(); + + jobLauncher.submitJob(null); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(1)).cancel(); + } + + @Test + public void testCancelWorkflowFetchStatusThrowsException() throws Exception { + // Mock the get workflow status to throw an exception + Mockito.doThrow(new RuntimeException("Some exception occurred")).when(mockExecutionInfo).getStatus(); + + jobLauncher.submitJob(null); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(1)).cancel(); + + Mockito.reset(mockExecutionInfo); + } + + @Test + public void testTerminateWorkflow() throws Exception { + // Mock the workflow status to be running + when(mockExecutionInfo.getStatus()) + .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING); + + // Mock getResult to throw TimeoutException + Mockito.doThrow(new TimeoutException("Workflow still in running")) + .when(mockStub).getResult(3L, TimeUnit.SECONDS, String.class, String.class); + + jobLauncher.submitJob(null); + + jobLauncher.executeCancellation(); + + verify(mockStub, times(1)).terminate("Job cancel invoked"); + } +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000000..1f0955d450f --- /dev/null +++ b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline