Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@
import org.apache.gobblin.runtime.JobState.DatasetState;
import org.apache.gobblin.runtime.commit.FsCommitSequenceStore;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
Expand Down Expand Up @@ -192,28 +192,7 @@ public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker<Gobb

/**
 * Creates the {@link DatasetStateStore} for this job.
 *
 * <p>The state store type selection and factory instantiation live in
 * {@link DatasetStateStoreUtils#createStateStore(Config)}; this method only delegates to it so that
 * other launchers can share the same logic. It stays {@code protected} so subclasses may still override it.</p>
 *
 * @param jobConfig the job configuration holding the state store settings
 * @return the {@link DatasetStateStore} built from {@code jobConfig}
 * @throws IOException if the state store cannot be created
 */
protected DatasetStateStore createStateStore(Config jobConfig)
    throws IOException {
  return DatasetStateStoreUtils.createStateStore(jobConfig);
}

protected Optional<JobHistoryStore> createJobHistoryStore(Properties jobProps) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.runtime.util;

import java.io.IOException;

import lombok.extern.slf4j.Slf4j;

import com.typesafe.config.Config;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;


/**
* Utilities for creating and managing {@link DatasetStateStore} instances based on job configuration.
*/
@Slf4j
public class DatasetStateStoreUtils {

/**
* Private constructor to prevent instantiation of this utility class.
*/
private DatasetStateStoreUtils() {
// Utility class should not be instantiated
}

/**
* Creates a {@link DatasetStateStore} instance based on the provided job configuration.
*
* <p>This method performs the following operations:</p>
* <ol>
* <li>Checks if state store is enabled via {@link ConfigurationKeys#STATE_STORE_ENABLED}</li>
* <li>Determines the appropriate state store type from configuration hierarchy</li>
* <li>Uses {@link ClassAliasResolver} to resolve the state store factory class</li>
* <li>Creates and returns the configured {@link DatasetStateStore} instance</li>
* </ol>
*
* <p>If state store is disabled, a no-op state store type will be used. Otherwise, the method
* looks for state store type in the following priority order:</p>
* <ol>
* <li>{@link ConfigurationKeys#DATASET_STATE_STORE_TYPE_KEY}</li>
* <li>{@link ConfigurationKeys#STATE_STORE_TYPE_KEY}</li>
* <li>{@link ConfigurationKeys#DEFAULT_STATE_STORE_TYPE}</li>
* </ol>
*
* @param jobConfig the job configuration containing state store settings and type information.
* Must not be null.
* @return a configured {@link DatasetStateStore} instance ready for use
* @throws IOException if there's an error creating the state store, including:
* <ul>
* <li>Class resolution failures</li>
* <li>Factory instantiation errors</li>
* <li>State store creation failures</li>
* </ul>
* @throws RuntimeException if there's a runtime error during state store initialization,
* which will be logged and re-thrown
*/
public static DatasetStateStore createStateStore(Config jobConfig)
throws IOException {
boolean stateStoreEnabled = !jobConfig.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) || jobConfig
.getBoolean(ConfigurationKeys.STATE_STORE_ENABLED);

String stateStoreType;

if (!stateStoreEnabled) {
stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
} else {
stateStoreType = ConfigUtils.getString(jobConfig, ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils
.getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
}

ClassAliasResolver<DatasetStateStore.Factory> resolver = new ClassAliasResolver<>(DatasetStateStore.Factory.class);

try {
DatasetStateStore.Factory stateStoreFactory = resolver.resolveClass(stateStoreType).newInstance();
return stateStoreFactory.createStateStore(jobConfig);
} catch (RuntimeException e) {
log.error("Error in initializing DataStateStore of type {} ", stateStoreType, e);
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.tdunning.math.stats.TDigest;
import com.typesafe.config.ConfigFactory;
import io.temporal.failure.ApplicationFailure;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.initializer.ConverterInitializer;
Expand All @@ -53,7 +56,9 @@
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
Expand All @@ -76,6 +81,7 @@
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;



@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {

Expand Down Expand Up @@ -144,10 +150,16 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
log.info("Using work dir root path for job '{}' - '{}'", jobState.getJobId(), workDirRoot);

// TODO: determine whether these are actually necessary to do (as MR/AbstractJobLauncher did)!
// SharedResourcesBroker<GobblinScopeTypes> jobBroker = JobStateUtils.getSharedResourcesBroker(jobState);
// jobState.setBroker(jobBroker);
// jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
/* Set up the dataset state functional and the shared resources broker on the JobState so that work units
can access previous dataset states and watermarks during work discovery, following the MR launcher pattern. */
try {
addDatasetStateFunctionalAndSharedResourceBrokerToJobState(jobProps, jobState);
}
catch (IOException e){
String errMsg = "Failed to addDatasetStateFunctionalAndSharedResourceBrokerToJobState for job " + jobState.getJobId();
log.error(errMsg, e);
throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: creating SharedResourcesBroker", e);
}

AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobProps);
troubleshooter.start();
Expand Down Expand Up @@ -195,6 +207,13 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
}
}

/**
 * Attaches a shared resources broker and a {@link CombinedWorkUnitAndDatasetStateGenerator} to {@code jobState}.
 *
 * <p>The generator is backed by a dataset state store created from {@code jobProps} via
 * {@link DatasetStateStoreUtils#createStateStore}. That creation is the step declared to throw
 * {@link IOException}, so it runs before {@code jobState} is touched: if it fails, {@code jobState}
 * is left without a broker and without a functional rather than half-configured.</p>
 *
 * @param jobProps the job properties, parsed into a config for state store creation
 * @param jobState the job state to receive the broker and the work unit / dataset state functional
 * @throws IOException if the dataset state store cannot be created
 */
private void addDatasetStateFunctionalAndSharedResourceBrokerToJobState(Properties jobProps, JobState jobState) throws IOException {
  // Fallible work first, so a failure cannot leave jobState partially mutated.
  CombinedWorkUnitAndDatasetStateGenerator workUnitAndDatasetStateGenerator = new CombinedWorkUnitAndDatasetStateGenerator(
      DatasetStateStoreUtils.createStateStore(ConfigFactory.parseProperties(jobProps)), jobState.getJobName());

  SharedResourcesBroker<GobblinScopeTypes> jobBroker = JobStateUtils.getSharedResourcesBroker(jobState);
  jobState.setBroker(jobBroker);
  jobState.setWorkUnitAndDatasetStateFunctional(workUnitAndDatasetStateGenerator);
}

protected WorkUnitsWithInsights generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
throws ReflectiveOperationException {
// report (timer) metrics for "Work Discovery", *planning only* - NOT including WU prep, like serialization, `DestinationDatasetHandlerService`ing, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,35 @@

package org.apache.gobblin.temporal.ddm.activity.impl;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.mockito.MockedStatic;
import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import org.testng.Assert;
import org.testng.annotations.Test;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;


Expand Down Expand Up @@ -199,6 +216,119 @@ public void testDigestWorkUnitsSizeWithEmptyWorkUnits() {
Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsMedianSize(), 0.0);
}

/**
 * Happy path: with both static collaborators stubbed, the private helper must put the stubbed broker and a
 * {@link CombinedWorkUnitAndDatasetStateGenerator} on the {@link JobState}, calling each collaborator once.
 */
@Test
public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobState() throws Exception {
  // Minimal job identity: only the name and id are set
  Properties props = new Properties();
  props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
  props.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
  JobState state = new JobState(props);

  // Collaborator doubles handed back by the static mocks below
  DatasetStateStore stubbedStateStore = mock(DatasetStateStore.class);
  SharedResourcesBroker<GobblinScopeTypes> stubbedBroker = mock(SharedResourcesBroker.class);

  // The method under test is private, so it is reached reflectively
  Method methodUnderTest = GenerateWorkUnitsImpl.class.getDeclaredMethod(
      "addDatasetStateFunctionalAndSharedResourceBrokerToJobState", Properties.class, JobState.class);
  methodUnderTest.setAccessible(true);
  GenerateWorkUnitsImpl activity = new GenerateWorkUnitsImpl();

  try (MockedStatic<JobStateUtils> jobStateUtilsMock = Mockito.mockStatic(JobStateUtils.class);
      MockedStatic<DatasetStateStoreUtils> stateStoreUtilsMock = Mockito.mockStatic(DatasetStateStoreUtils.class)) {

    stateStoreUtilsMock.when(() -> DatasetStateStoreUtils.createStateStore(any()))
        .thenReturn(stubbedStateStore);
    jobStateUtilsMock.when(() -> JobStateUtils.getSharedResourcesBroker(state))
        .thenReturn(stubbedBroker);

    methodUnderTest.invoke(activity, props, state);

    // State must now carry the stubbed broker and a generator-backed functional
    Assert.assertEquals(state.getBroker(), stubbedBroker, "SharedResourcesBroker should be set on JobState");
    Object functional = state.getWorkUnitAndDatasetStateFunctional();
    Assert.assertNotNull(functional, "WorkUnitAndDatasetStateFunctional should be set");
    Assert.assertTrue(functional instanceof CombinedWorkUnitAndDatasetStateGenerator,
        "WorkUnitAndDatasetStateFunctional should be instance of CombinedWorkUnitAndDatasetStateGenerator");

    // Each static collaborator is consulted exactly once
    jobStateUtilsMock.verify(() -> JobStateUtils.getSharedResourcesBroker(state), times(1));
    stateStoreUtilsMock.verify(() -> DatasetStateStoreUtils.createStateStore(any()), times(1));
  }
}

/**
 * Failure path: when state store creation throws an {@link IOException}, the reflective call must surface it
 * as the cause of an {@link InvocationTargetException}, and the {@link JobState} must carry neither a broker
 * nor a work unit / dataset state functional.
 */
@Test
public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobStateWithIOException() throws Exception {
  // Minimal job identity: only the name and id are set
  Properties props = new Properties();
  props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
  props.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
  JobState state = new JobState(props);

  // The method under test is private, so it is reached reflectively
  Method methodUnderTest = GenerateWorkUnitsImpl.class.getDeclaredMethod(
      "addDatasetStateFunctionalAndSharedResourceBrokerToJobState", Properties.class, JobState.class);
  methodUnderTest.setAccessible(true);
  GenerateWorkUnitsImpl activity = new GenerateWorkUnitsImpl();

  // NOTE(review): JobStateUtils is mocked but deliberately left unstubbed here, so the broker lookup
  // presumably yields Mockito's default (null) - confirm the null-broker assertion below is meaningful.
  try (MockedStatic<JobStateUtils> jobStateUtilsMock = Mockito.mockStatic(JobStateUtils.class);
      MockedStatic<DatasetStateStoreUtils> stateStoreUtilsMock = Mockito.mockStatic(DatasetStateStoreUtils.class)) {

    stateStoreUtilsMock.when(() -> DatasetStateStoreUtils.createStateStore(any()))
        .thenThrow(new IOException("Failed to create state store"));

    InvocationTargetException wrapped = null;
    try {
      methodUnderTest.invoke(activity, props, state);
    } catch (InvocationTargetException e) {
      wrapped = e;
    }
    if (wrapped == null) {
      Assert.fail("Expected IOException to be thrown");
    }

    Throwable rootCause = wrapped.getCause();
    Assert.assertTrue(rootCause instanceof IOException, "Root cause should be IOException");
    Assert.assertEquals(rootCause.getMessage(), "Failed to create state store");

    // Nothing should have been attached to the job state
    Assert.assertNull(state.getBroker(), "Broker should not be set when exception occurs");
    Assert.assertNull(state.getWorkUnitAndDatasetStateFunctional(),
        "WorkUnitAndDatasetStateFunctional should not be set when exception occurs");
  }
}

/**
 * Null-broker path: when the broker lookup is stubbed to return {@code null}, the helper must still install
 * the work unit / dataset state functional, and the {@link JobState} broker stays {@code null}.
 */
@Test
public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobStateWithNullBroker() throws Exception {
  // Minimal job identity: only the name and id are set
  Properties props = new Properties();
  props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
  props.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
  JobState state = new JobState(props);

  DatasetStateStore stubbedStateStore = mock(DatasetStateStore.class);

  // The method under test is private, so it is reached reflectively
  Method methodUnderTest = GenerateWorkUnitsImpl.class.getDeclaredMethod(
      "addDatasetStateFunctionalAndSharedResourceBrokerToJobState", Properties.class, JobState.class);
  methodUnderTest.setAccessible(true);
  GenerateWorkUnitsImpl activity = new GenerateWorkUnitsImpl();

  try (MockedStatic<JobStateUtils> jobStateUtilsMock = Mockito.mockStatic(JobStateUtils.class);
      MockedStatic<DatasetStateStoreUtils> stateStoreUtilsMock = Mockito.mockStatic(DatasetStateStoreUtils.class)) {

    // State store creation succeeds, but the broker lookup explicitly yields null
    stateStoreUtilsMock.when(() -> DatasetStateStoreUtils.createStateStore(any()))
        .thenReturn(stubbedStateStore);
    jobStateUtilsMock.when(() -> JobStateUtils.getSharedResourcesBroker(state))
        .thenReturn(null);

    methodUnderTest.invoke(activity, props, state);

    Assert.assertNull(state.getBroker(), "Broker should be null when null broker is returned");
    Assert.assertNotNull(state.getWorkUnitAndDatasetStateFunctional(),
        "WorkUnitAndDatasetStateFunctional should still be set even with null broker");
  }
}

public static WorkUnit createWorkUnitOfSize(long size) {
WorkUnit workUnit = WorkUnit.createEmpty();
workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
Expand Down
Loading