Skip to content

Commit

Permalink
Revert "remove flowconfigsv1"
Browse files Browse the repository at this point in the history
This reverts commit 1f73f53.
  • Loading branch information
arjun4084346 committed Aug 29, 2024
1 parent 89fcf0a commit 7c8d1cc
Show file tree
Hide file tree
Showing 23 changed files with 313 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"name" : "flowconfigs",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowconfigs",
"schema" : "org.apache.gobblin.service.FlowConfig",
"doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsResource",
"collection" : {
"identifier" : {
"name" : "id",
"type" : "org.apache.gobblin.service.FlowId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
"supports" : [ "create", "delete", "get", "update" ],
"methods" : [ {
"method" : "create",
"doc" : "Create a flow configuration that the service will forward to execution instances for execution"
}, {
"method" : "get",
"doc" : "Retrieve the flow configuration with the given key"
}, {
"method" : "update",
"doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist."
}, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
} ],
"entity" : {
"path" : "/flowconfigs/{id}"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{
"models" : [ {
"type" : "record",
"name" : "EmptyRecord",
"namespace" : "com.linkedin.restli.common",
"doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request<EmptyRecord> to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource<XyzKey, EmptyRecord, Xyz>.",
"fields" : [ ],
"validate" : {
"com.linkedin.restli.common.EmptyRecordValidator" : { }
}
}, {
"type" : "record",
"name" : "FlowConfig",
"namespace" : "org.apache.gobblin.service",
"doc" : "Defines a flow configuration that can be compiled into Gobblin jobs",
"fields" : [ {
"name" : "id",
"type" : {
"type" : "record",
"name" : "FlowId",
"doc" : "Identifier for a Gobblin as a Service flow",
"fields" : [ {
"name" : "flowName",
"type" : "string",
"doc" : "Name of the flow",
"validate" : {
"strlen" : {
"max" : 128,
"min" : 1
}
}
}, {
"name" : "flowGroup",
"type" : "string",
"doc" : "Group of the flow. This defines the namespace for the flow.",
"validate" : {
"strlen" : {
"max" : 128,
"min" : 1
}
}
} ]
},
"doc" : "Identifier for the flow"
}, {
"name" : "schedule",
"type" : {
"type" : "record",
"name" : "Schedule",
"doc" : "Attributes for defining a job schedule",
"fields" : [ {
"name" : "cronSchedule",
"type" : "string",
"doc" : "Schedule for flow in cron format",
"validate" : {
"org.apache.gobblin.service.validator.CronValidator" : { }
}
}, {
"name" : "runImmediately",
"type" : "boolean",
"doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled",
"default" : false
} ]
},
"doc" : "Optional schedule for when to execute the flow. If a schedule is not specified then the flow is executed immediately.",
"optional" : true
}, {
"name" : "templateUris",
"type" : "string",
"doc" : "Comma separated list of URIs for templates used in the flow. The template location is defined by the multiproduct that packages the template.",
"validate" : {
"org.apache.gobblin.service.validator.TemplateUriValidator" : { }
}
}, {
"name" : "explain",
"type" : "boolean",
"doc" : "Return the compiled flow as a string. If enabled, the flow is not added.",
"default" : false
}, {
"name" : "owningGroup",
"type" : "string",
"doc" : "Optional string name of group that the requester belongs to for group ownership of flows.",
"optional" : true
}, {
"name" : "properties",
"type" : {
"type" : "map",
"values" : "string"
},
"doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs."
} ]
}, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.Schedule" ],
"schema" : {
"name" : "flowconfigs",
"namespace" : "org.apache.gobblin.service",
"path" : "/flowconfigs",
"schema" : "org.apache.gobblin.service.FlowConfig",
"doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsResource",
"collection" : {
"identifier" : {
"name" : "id",
"type" : "org.apache.gobblin.service.FlowId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
"supports" : [ "create", "delete", "get", "update" ],
"methods" : [ {
"method" : "create",
"doc" : "Create a flow configuration that the service will forward to execution instances for execution"
}, {
"method" : "get",
"doc" : "Retrieve the flow configuration with the given key"
}, {
"method" : "update",
"doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist."
}, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
} ],
"entity" : {
"path" : "/flowconfigs/{id}"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void setUp() throws Exception {
groupOwnershipService = new LocalGroupOwnershipService(groupServiceConfig);

Injector injector = Guice.createInjector(binder -> {
binder.bind(FlowConfigsV2ResourceHandler.class).toInstance(new FlowConfigsV2ResourceHandler("service_name", flowCatalog));
binder.bind(FlowConfigsResourceHandler.class).toInstance(new FlowConfigsResourceHandler("service_name", flowCatalog));
binder.bind(RequesterService.class).toInstance(_requesterService);
binder.bind(GroupOwnershipService.class).toInstance(groupOwnershipService);
});
Expand Down Expand Up @@ -270,7 +270,7 @@ public void testUnschedule() throws Exception {
FlowConfig persistedFlowConfig = _client.getFlowConfig(flowId);

Assert.assertFalse(persistedFlowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_UNSCHEDULE_KEY));
Assert.assertEquals(Objects.requireNonNull(persistedFlowConfig.getSchedule()).getCronSchedule(), FlowConfigsV2ResourceHandler.NEVER_RUN_CRON_SCHEDULE.getCronSchedule());
Assert.assertEquals(Objects.requireNonNull(persistedFlowConfig.getSchedule()).getCronSchedule(), FlowConfigsResourceHandler.NEVER_RUN_CRON_SCHEDULE.getCronSchedule());
}

@Test (dependsOnMethods = "testUnschedule")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler;


/**
Expand All @@ -69,7 +69,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");

@Inject
private FlowConfigsV2ResourceHandler flowConfigsResourceHandler;
private FlowConfigsResourceHandler flowConfigsResourceHandler;

// For getting who sends the request
@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus,
jobStatusArray.add(jobStatus);
}

// If DagManager is not enabled, we have to determine flow end time by individual job's end times.
flowEndTime = flowEndTime == 0L ? maxJobEndTime : flowEndTime;

jobStatusArray.sort(Comparator.comparing((org.apache.gobblin.service.JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@


@Slf4j
public class FlowConfigsV2ResourceHandler {
public class FlowConfigsResourceHandler {

@Getter
private String serviceName;
Expand All @@ -87,7 +87,7 @@ public class FlowConfigsV2ResourceHandler {
protected final ContextAwareMeter runImmediatelyFlow;

@Inject
public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName, FlowCatalog flowCatalog) {
public FlowConfigsResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName, FlowCatalog flowCatalog) {
this.serviceName = serviceName;
this.flowCatalog = flowCatalog;
MetricContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler;


public class FlowConfigResourceLocalHandlerTest {
Expand All @@ -48,7 +48,7 @@ public void testCreateFlowSpecForConfig() throws URISyntaxException {
setRunImmediately(true))
.setProperties(new StringMap(flowProperties));

FlowSpec flowSpec = FlowConfigsV2ResourceHandler.createFlowSpecForConfig(flowConfig);
FlowSpec flowSpec = FlowConfigsResourceHandler.createFlowSpecForConfig(flowConfig);
Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME);
Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE);
Expand All @@ -65,7 +65,7 @@ public void testCreateFlowSpecForConfig() throws URISyntaxException {
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).
setRunImmediately(true))
.setProperties(new StringMap(flowProperties));
flowSpec = FlowConfigsV2ResourceHandler.createFlowSpecForConfig(flowConfig);
flowSpec = FlowConfigsResourceHandler.createFlowSpecForConfig(flowConfig);

Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME);
Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
Expand Down Expand Up @@ -139,7 +139,7 @@ public void configure(Binder binder) {
binder.bind(FlowConfigsV2Resource.class);
binder.bind(FlowStatusResource.class);
binder.bind(FlowExecutionResource.class);
binder.bind(FlowConfigsV2ResourceHandler.class);
binder.bind(FlowConfigsResourceHandler.class);
binder.bind(FlowExecutionResourceHandler.class);
binder.bind(FlowExecutionResourceHandlerInterface.class).to(FlowExecutionResourceHandler.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler;
import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
Expand Down Expand Up @@ -120,7 +120,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri

@Inject
@Getter
protected FlowConfigsV2ResourceHandler resourceHandler;
protected FlowConfigsResourceHandler resourceHandler;

@Inject
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
}

public void cleanup() {
// Null check so that unit tests do not affect each other when we deactivate a non-instrumented DagManager
if (this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName())) {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@


/**
* An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s.
* An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s. In case of a leadership
* change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding {@link DagManager}
* loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution.
*/
@Alpha
public interface DagStateStore {
/**
* Persist the {@link Dag} to the backing store.
* This is not an actual checkpoint but more like a write-ahead log, where uncommitted jobs are persisted
* and picked up again when a leader transition happens.
* @param dag The dag submitted to store
* @param dag The dag submitted to {@link DagManager}
*/
void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;

Expand All @@ -58,8 +60,9 @@ default boolean cleanUp(Dag.DagId dagId) throws IOException {
void cleanUp(String dagId) throws IOException;

/**
* Load all currently running {@link Dag}s from the underlying store.
* @deprecated because {@link DagProcessingEngine} does not need this API
* Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
* takes over or on restart of service.
* @deprecated because {@link DagProcessingEngine}, which will replace {@link DagManager}, does not need this API
* @return a {@link List} of currently running {@link Dag}s.
*/
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional<Log
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.jobStatusRetriever = jobStatusRetriever;
this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
//At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManagementStateStore.
//At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager.
((MySqlDagManagementStateStore) dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());

this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

/**
* Manages the statically configured user quotas for the proxy user in user.to.proxy configuration, the API requester(s)
* and the flow group. It is used by the {@link org.apache.gobblin.service.modules.orchestration.proc.DagProc} to ensure
* that the number of currently running jobs do not exceed the quota, if the quota is exceeded, the execution will fail
* without running on the underlying executor.
* and the flow group.
* It is used by the {@link DagManager} to ensure that the number of currently running jobs does not exceed the quota. If
* the quota is exceeded, the execution will fail without running on the underlying executor.
*/
public interface UserQuotaManager {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

/**
* An implementation for {@link DagProc} that marks the {@link Dag} as failed and cancels the job if it does not start in
* {@link org.apache.gobblin.service.ServiceConfigKeys#JOB_START_SLA_TIME} time.
* {@link org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} time.
*/
@Slf4j
public class EnforceJobStartDeadlineDagProc extends DeadlineEnforcementDagProc {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* A {@link DagTask} responsible for killing jobs if they have not started in
* {@link org.apache.gobblin.service.ServiceConfigKeys#JOB_START_SLA_TIME}.
* {@link org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}.
*/

public class EnforceJobStartDeadlineDagTask extends DagTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@


/**
* Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
* Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
* executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
* validations, and creates DagActions. The DagProcessingEngine's responsibility is to
* process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on
* restart, the LaunchDagProc has to perform validations before executing any launch actions the previous LaunchDagProc
* process was unable to complete. Rather than duplicating the code or introducing a circular dependency between
* the LaunchDagProc and Orchestrator, this class is utilized to store the common functionality. It is stateful,
* validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
* carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
* restart or leadership change the DagManager has to perform validations before executing any launch actions the
* previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
* the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
* requiring all stateful pieces to be passed as input from the caller upon instantiating the helper.
* Note: We expect further refactoring to be done to the DagManager in a later stage of multi-active development, so we do
* not attempt major reorganization as abstractions may change.
*/
@Slf4j
@Data
Expand Down
Loading

0 comments on commit 7c8d1cc

Please sign in to comment.