From 7c8d1ccc7a3cf61bbb7eccd6b5d086aae8961552 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Thu, 29 Aug 2024 15:28:53 -0700 Subject: [PATCH] Revert "remove flowconfigsv1" This reverts commit 1f73f538b8dd09a35b761b908e35bf08f730ba5a. --- ....gobblin.service.flowconfigs.restspec.json | 31 +++++ ....gobblin.service.flowconfigs.snapshot.json | 124 ++++++++++++++++++ .../gobblin/service/FlowConfigTest.java | 6 +- .../service/FlowConfigsV2Resource.java | 4 +- .../service/FlowExecutionResource.java | 1 + ...r.java => FlowConfigsResourceHandler.java} | 4 +- .../FlowConfigResourceLocalHandlerTest.java | 6 +- .../core/GobblinServiceGuiceModule.java | 4 +- .../modules/core/GobblinServiceManager.java | 4 +- .../orchestration/DagManagerMetrics.java | 1 + .../modules/orchestration/DagStateStore.java | 11 +- .../modules/orchestration/Orchestrator.java | 2 +- .../orchestration/UserQuotaManager.java | 6 +- .../proc/EnforceJobStartDeadlineDagProc.java | 2 +- .../task/EnforceJobStartDeadlineDagTask.java | 2 +- .../FlowCompilationValidationHelper.java | 14 +- .../utils/SharedFlowMetricsSingleton.java | 13 +- .../DagActionStoreChangeMonitor.java | 5 +- .../service/GobblinServiceManagerTest.java | 8 +- .../InMemoryUserQuotaManagerTest.java | 10 +- .../orchestration/OrchestratorTest.java | 99 +++++++++++++- .../modules/restli/FlowConfigUtilsTest.java | 4 +- .../FlowCompilationValidationHelperTest.java | 19 ++- 23 files changed, 313 insertions(+), 67 deletions(-) create mode 100644 gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigs.restspec.json create mode 100644 gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json rename gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/{FlowConfigsV2ResourceHandler.java => FlowConfigsResourceHandler.java} (99%) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigs.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigs.restspec.json new file mode 100644 index 00000000000..68c990c04ce --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigs.restspec.json @@ -0,0 +1,31 @@ +{ + "name" : "flowconfigs", + "namespace" : "org.apache.gobblin.service", + "path" : "/flowconfigs", + "schema" : "org.apache.gobblin.service.FlowConfig", + "doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsResource", + "collection" : { + "identifier" : { + "name" : "id", + "type" : "org.apache.gobblin.service.FlowId", + "params" : "com.linkedin.restli.common.EmptyRecord" + }, + "supports" : [ "create", "delete", "get", "update" ], + "methods" : [ { + "method" : "create", + "doc" : "Create a flow configuration that the service will forward to execution instances for execution" + }, { + "method" : "get", + "doc" : "Retrieve the flow configuration with the given key" + }, { + "method" : "update", + "doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist." + }, { + "method" : "delete", + "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows." + } ], + "entity" : { + "path" : "/flowconfigs/{id}" + } + } +} \ No newline at end of file diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json new file mode 100644 index 00000000000..9845e8dca26 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json @@ -0,0 +1,124 @@ +{ + "models" : [ { + "type" : "record", + "name" : "EmptyRecord", + "namespace" : "com.linkedin.restli.common", + "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request to indicate it has no response body. Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource.", + "fields" : [ ], + "validate" : { + "com.linkedin.restli.common.EmptyRecordValidator" : { } + } + }, { + "type" : "record", + "name" : "FlowConfig", + "namespace" : "org.apache.gobblin.service", + "doc" : "Defines a flow configuration that can be compiled into Gobblin jobs", + "fields" : [ { + "name" : "id", + "type" : { + "type" : "record", + "name" : "FlowId", + "doc" : "Identifier for a Gobblin as a Service flow", + "fields" : [ { + "name" : "flowName", + "type" : "string", + "doc" : "Name of the flow", + "validate" : { + "strlen" : { + "max" : 128, + "min" : 1 + } + } + }, { + "name" : "flowGroup", + "type" : "string", + "doc" : "Group of the flow. This defines the namespace for the flow.", + "validate" : { + "strlen" : { + "max" : 128, + "min" : 1 + } + } + } ] + }, + "doc" : "Identifier for the flow" + }, { + "name" : "schedule", + "type" : { + "type" : "record", + "name" : "Schedule", + "doc" : "Attributes for defining a job schedule", + "fields" : [ { + "name" : "cronSchedule", + "type" : "string", + "doc" : "Schedule for flow in cron format", + "validate" : { + "org.apache.gobblin.service.validator.CronValidator" : { } + } + }, { + "name" : "runImmediately", + "type" : "boolean", + "doc" : "Set to true to request that a job with a schedule be run immediately in addition to being scheduled", + "default" : false + } ] + }, + "doc" : "Optional schedule for when to execution the flow. If a schedule is not specified then the flow is executed immediately.", + "optional" : true + }, { + "name" : "templateUris", + "type" : "string", + "doc" : "Comma separated list of URIs for templates used in the flow. The template location is defined by the multiproduct that packages the template.", + "validate" : { + "org.apache.gobblin.service.validator.TemplateUriValidator" : { } + } + }, { + "name" : "explain", + "type" : "boolean", + "doc" : "Return the compiled flow as a string. If enabled, the flow is not added.", + "default" : false + }, { + "name" : "owningGroup", + "type" : "string", + "doc" : "Optional string name of group that the requester belongs to for group ownership of flows.", + "optional" : true + }, { + "name" : "properties", + "type" : { + "type" : "map", + "values" : "string" + }, + "doc" : "Properties for the flow. These properties are passed to the compiled Gobblin jobs." + } ] + }, "org.apache.gobblin.service.FlowId", "org.apache.gobblin.service.Schedule" ], + "schema" : { + "name" : "flowconfigs", + "namespace" : "org.apache.gobblin.service", + "path" : "/flowconfigs", + "schema" : "org.apache.gobblin.service.FlowConfig", + "doc" : "Resource for handling flow configuration requests\n\ngenerated from: org.apache.gobblin.service.FlowConfigsResource", + "collection" : { + "identifier" : { + "name" : "id", + "type" : "org.apache.gobblin.service.FlowId", + "params" : "com.linkedin.restli.common.EmptyRecord" + }, + "supports" : [ "create", "delete", "get", "update" ], + "methods" : [ { + "method" : "create", + "doc" : "Create a flow configuration that the service will forward to execution instances for execution" + }, { + "method" : "get", + "doc" : "Retrieve the flow configuration with the given key" + }, { + "method" : "update", + "doc" : "Update the flow configuration with the specified key. Running flows are not affected.\n An error is raised if the flow configuration does not exist." + }, { + "method" : "delete", + "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows." + } ], + "entity" : { + "path" : "/flowconfigs/{id}" + } + } + } +} \ No newline at end of file diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java index a67b5227e4b..050aa246594 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java @@ -58,7 +58,7 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_store.FSSpecStore; -import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler; +import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -131,7 +131,7 @@ public void setUp() throws Exception { groupOwnershipService = new LocalGroupOwnershipService(groupServiceConfig); Injector injector = Guice.createInjector(binder -> { - binder.bind(FlowConfigsV2ResourceHandler.class).toInstance(new FlowConfigsV2ResourceHandler("service_name", flowCatalog)); + binder.bind(FlowConfigsResourceHandler.class).toInstance(new FlowConfigsResourceHandler("service_name", flowCatalog)); binder.bind(RequesterService.class).toInstance(_requesterService); binder.bind(GroupOwnershipService.class).toInstance(groupOwnershipService); }); @@ -270,7 +270,7 @@ public void testUnschedule() throws Exception { FlowConfig persistedFlowConfig = _client.getFlowConfig(flowId); Assert.assertFalse(persistedFlowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_UNSCHEDULE_KEY)); - Assert.assertEquals(Objects.requireNonNull(persistedFlowConfig.getSchedule()).getCronSchedule(), FlowConfigsV2ResourceHandler.NEVER_RUN_CRON_SCHEDULE.getCronSchedule()); + Assert.assertEquals(Objects.requireNonNull(persistedFlowConfig.getSchedule()).getCronSchedule(), FlowConfigsResourceHandler.NEVER_RUN_CRON_SCHEDULE.getCronSchedule()); } @Test (dependsOnMethods = "testUnschedule") diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java index 66cac31be5f..c922cd0ae46 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java @@ -56,7 +56,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.runtime.api.FlowSpecSearchObject; -import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler; +import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler; /** @@ -69,7 +69,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); @Inject - private FlowConfigsV2ResourceHandler flowConfigsResourceHandler; + private FlowConfigsResourceHandler flowConfigsResourceHandler; // For getting who sends the request @Inject diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java index d630069f0b5..79fe85515e0 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java @@ -209,6 +209,7 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus, jobStatusArray.add(jobStatus); } + // If DagManager is not enabled, we have to determine flow end time by individual job's end times. flowEndTime = flowEndTime == 0L ? maxJobEndTime : flowEndTime; jobStatusArray.sort(Comparator.comparing((org.apache.gobblin.service.JobStatus js) -> js.getExecutionStatistics().getExecutionStartTime())); diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsResourceHandler.java similarity index 99% rename from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java rename to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsResourceHandler.java index 75560663b18..0b9637f40a1 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsResourceHandler.java @@ -75,7 +75,7 @@ @Slf4j -public class FlowConfigsV2ResourceHandler { +public class FlowConfigsResourceHandler { @Getter private String serviceName; @@ -87,7 +87,7 @@ public class FlowConfigsV2ResourceHandler { protected final ContextAwareMeter runImmediatelyFlow; @Inject - public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName, FlowCatalog flowCatalog) { + public FlowConfigsResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName, FlowCatalog flowCatalog) { this.serviceName = serviceName; this.flowCatalog = flowCatalog; MetricContext diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java index bfb24a2df6e..2f5d8656e5a 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java @@ -29,7 +29,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler; +import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler; public class FlowConfigResourceLocalHandlerTest { @@ -48,7 +48,7 @@ public void testCreateFlowSpecForConfig() throws URISyntaxException { setRunImmediately(true)) .setProperties(new StringMap(flowProperties)); - FlowSpec flowSpec = FlowConfigsV2ResourceHandler.createFlowSpecForConfig(flowConfig); + FlowSpec flowSpec = FlowConfigsResourceHandler.createFlowSpecForConfig(flowConfig); Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME); Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME); Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE); @@ -65,7 +65,7 @@ public void testCreateFlowSpecForConfig() throws URISyntaxException { .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE). setRunImmediately(true)) .setProperties(new StringMap(flowProperties)); - flowSpec = FlowConfigsV2ResourceHandler.createFlowSpecForConfig(flowConfig); + flowSpec = FlowConfigsResourceHandler.createFlowSpecForConfig(flowConfig); Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME); Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index 5c6d018014c..d1e40521c28 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -74,7 +74,7 @@ import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils; import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; -import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler; +import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler; import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; import org.apache.gobblin.service.modules.topology.TopologySpecFactory; @@ -139,7 +139,7 @@ public void configure(Binder binder) { binder.bind(FlowConfigsV2Resource.class); binder.bind(FlowStatusResource.class); binder.bind(FlowExecutionResource.class); - binder.bind(FlowConfigsV2ResourceHandler.class); + binder.bind(FlowConfigsResourceHandler.class); binder.bind(FlowExecutionResourceHandler.class); binder.bind(FlowExecutionResourceHandlerInterface.class).to(FlowExecutionResourceHandler.class); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 48b74824501..dfdde2a9ad1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -79,7 +79,7 @@ import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; -import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler; +import org.apache.gobblin.service.modules.restli.FlowConfigsResourceHandler; import org.apache.gobblin.service.modules.restli.FlowExecutionResourceHandler; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; import org.apache.gobblin.service.modules.topology.TopologySpecFactory; @@ -120,7 +120,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri @Inject @Getter - protected FlowConfigsV2ResourceHandler resourceHandler; + protected FlowConfigsResourceHandler resourceHandler; @Inject @Getter diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index 3d7880e93ab..45ef11a27dd 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -267,6 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() { } public void cleanup() { + // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager if (this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName())) { // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java index 9813785aa1e..e990bb0f2fb 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java @@ -27,7 +27,9 @@ /** - * An interface for storing and retrieving currently running {@link Dag}s. + * An interface for storing and retrieving currently running {@link Dag}s. In case of a leadership + * change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding {@link DagManager} + * loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution. */ @Alpha public interface DagStateStore { @@ -35,7 +37,7 @@ public interface DagStateStore { * Persist the {@link Dag} to the backing store. * This is not an actual checkpoint but more like a Write-ahead log, where uncommitted job will be persisted * and be picked up again when leader transition happens. - * @param dag The dag submitted to store + * @param dag The dag submitted to {@link DagManager} */ void writeCheckpoint(Dag dag) throws IOException; @@ -58,8 +60,9 @@ default boolean cleanUp(Dag.DagId dagId) throws IOException { void cleanUp(String dagId) throws IOException; /** - * Load all currently running {@link Dag}s from the underlying store. - * @deprecated because {@link DagProcessingEngine} does not need this API + * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager} + * takes over or on restart of service. + * @deprecated because {@link DagProcessingEngine} that will replace {@link DagManager} does not need this API * @return a {@link List} of currently running {@link Dag}s. */ @Deprecated diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index f9039a068db..0ad869a0fda 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -102,7 +102,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 2, "Both flow specs should be present"); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java index 7a6c5c7e34b..9685b6b4938 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java @@ -49,7 +49,7 @@ public void setUp() { @Test public void testExceedsQuotaOnStartup() throws Exception { List> dags = DagTestUtils.buildDagList(2, "user", ConfigFactory.empty()); - // Ensure that the current attempt is 1, normally done by DagProcs + // Ensure that the current attempt is 1, normally done by DagManager dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); @@ -61,7 +61,7 @@ public void testExceedsQuotaOnStartup() throws Exception { public void testExceedsUserQuotaThrowsException() throws Exception { List> dags = DagTestUtils.buildDagList(2, "user2", ConfigFactory.empty()); - // Ensure that the current attempt is 1, normally done by DagProcs + // Ensure that the current attempt is 1, normally done by DagManager dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); @@ -76,7 +76,7 @@ public void testMultipleRemoveQuotasIdempotent() throws Exception { // Test that multiple decrements cannot cause the number to decrease by more than 1 List> dags = DagTestUtils.buildDagList(2, "user3", ConfigFactory.empty()); - // Ensure that the current attempt is 1, normally done by DagProcs + // Ensure that the current attempt is 1, normally done by DagManager dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); @@ -91,7 +91,7 @@ public void testExceedsFlowGroupQuotaThrowsException() throws Exception { List> dags = DagTestUtils.buildDagList(2, "user4", ConfigFactory.empty().withValue( ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("group1"))); - // Ensure that the current attempt is 1, normally done by DagProcs + // Ensure that the current attempt is 1, normally done by DagManager dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); @@ -113,7 +113,7 @@ public void testUserAndFlowGroupQuotaMultipleUsersAdd() throws Exception { 1, "user6", ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("group3"))); Dag dag4 = DagTestUtils.buildDag("4", System.currentTimeMillis(), DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 1, "user5", ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("group2"))); - // Ensure that the current attempt is 1, normally done by DagProcs + // Ensure that the current attempt is 1, normally done by DagManager dag1.getNodes().get(0).getValue().setCurrentAttempts(1); dag2.getNodes().get(0).getValue().setCurrentAttempts(1); dag3.getNodes().get(0).getValue().setCurrentAttempts(1); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 9502e6b63a4..caef7fbbb8f 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.File; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; @@ -25,6 +26,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -57,6 +59,9 @@ import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.GobblinServiceManagerTest; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; +import org.apache.gobblin.service.modules.flow.SpecCompiler; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper; import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; @@ -64,8 +69,12 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class OrchestratorTest { @@ -88,6 +97,8 @@ public class OrchestratorTest { private FlowSpec flowSpec; private ITestMetastoreDatabase testMetastoreDatabase; + private FlowStatusGenerator mockFlowStatusGenerator; + private FlowCompilationValidationHelper mockedFlowCompilationValidationHelper; private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator; @BeforeClass @@ -119,7 +130,7 @@ public void setUp() throws Exception { this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true); this.serviceLauncher.addService(flowCatalog); - FlowStatusGenerator mockFlowStatusGenerator = mock(FlowStatusGenerator.class); + this.mockFlowStatusGenerator = mock(FlowStatusGenerator.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); @@ -132,6 +143,12 @@ public void setUp() throws Exception { this.topologyCatalog, Optional.of(logger), mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton, dagManagementStateStore, flowCompilationValidationHelper, mock(JobStatusRetriever.class)); + /* Initialize a second orchestrator with a mocked flowCompilationValidationHelper to use Mockito to spoof the dag + returned by a call to compile a flowSpec + */ + this.mockedFlowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class); + when(mockedFlowCompilationValidationHelper.getSpecCompiler()).thenReturn(mock(SpecCompiler.class)); + this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator); this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator); // Start application @@ -199,7 +216,9 @@ private FlowSpec initFlowSpec() { properties.put("gobblin.flow.destinationIdentifier", "destination"); Config config = ConfigUtils.propertiesToConfig(properties); - FlowSpec.Builder flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, FLOW_SPEC_GROUP_DIR)) + FlowSpec.Builder flowSpecBuilder = null; + flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, + FLOW_SPEC_GROUP_DIR)) .withConfig(config) .withDescription(SPEC_DESCRIPTION) .withVersion(SPEC_VERSION) @@ -216,9 +235,10 @@ private FlowSpec initBadFlowSpec() { properties.put("gobblin.flow.destinationIdentifier", "destination"); Config config = ConfigUtils.propertiesToConfig(properties); - FlowSpec.Builder flowSpecBuilder; + FlowSpec.Builder flowSpecBuilder = null; try { - flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, FLOW_SPEC_GROUP_DIR)) + flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, + FLOW_SPEC_GROUP_DIR)) .withConfig(config) .withDescription(SPEC_DESCRIPTION) .withVersion(SPEC_VERSION) @@ -231,7 +251,8 @@ private FlowSpec initBadFlowSpec() { public URI computeTopologySpecURI(String parent, String current) { // Make sure this is relative - return PathUtils.relativizePath(new Path(current), new Path(parent)).toUri(); + URI uri = PathUtils.relativizePath(new Path(current), new Path(parent)).toUri(); + return uri; } // TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere - move it, then rework into `@BeforeMethod` init (since others depend on this) @@ -379,6 +400,74 @@ public void doNotRegisterMetricsAdhocFlows() throws Throwable { Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName)); } + @Test(enabled = false) + // todo - re-write with dag proc engine + public void removeFlowSpecWhenDagAdded() throws Throwable { + // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` + createTopologySpec(); // for flow compilation to pass + + FlowId flowId = GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME); + FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId); + this.topologyCatalog.getInitComplete().countDown(); // unblock orchestration + + this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new Properties(), 0, false); + } + + @Test(enabled = false) + // todo - re-write with dag proc engine + public void removeFlowSpecEvenWhenDagNotAddedDueToCompilationFailure() throws Throwable { + // to cause flow compilation to fail, DO NOT EXECUTE: createTopologySpec(); + + FlowId flowId = GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME); + FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId); + this.topologyCatalog.getInitComplete().countDown(); // unblock orchestration + + this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new Properties(), 0, false); + + // (verifies that compilation failure precedes enforcement of concurrent flow executions) + Mockito.verify(this.mockFlowStatusGenerator, Mockito.never()).isFlowRunning(any(), any(), anyLong()); + } + + @Test(enabled = false) + // todo - re-write with dag proc engine + public void removeFlowSpecEvenWhenDagNotAddedDueToConcurrentExecution() throws Throwable { + // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod` + createTopologySpec(); // for flow compilation to pass + + FlowId flowId = GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME); + Properties noConcurrentExecsProps = new Properties(); + noConcurrentExecsProps.setProperty("flow.allowConcurrentExecution", "false"); + FlowSpec flowSpec = createBasicFlowSpecForFlowId(flowId, noConcurrentExecsProps); + this.topologyCatalog.getInitComplete().countDown(); // unblock orchestration + + Mockito.when(this.mockFlowStatusGenerator.isFlowRunning(eq(flowId.getFlowName()), eq(flowId.getFlowGroup()), anyLong())).thenReturn(true); + + this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(flowSpec, new Properties(), 0, false); + + // (verifies enforcement of concurrent flow executions) + Mockito.verify(this.mockFlowStatusGenerator, Mockito.times(1)).isFlowRunning(eq(flowId.getFlowName()), eq(flowId.getFlowGroup()), anyLong()); + } + + + /** + * Tests that when compiling and forwarding a dagAction from + * {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor#submitFlowToDagManagerHelper} to the + * DagManager that {@link DagManager#removeFlowSpecIfAdhoc(FlowSpec)} is called to ensure adhoc flowSpecs are deleted + * after compilation. + */ + @Test(enabled = false) + // todo - re-write with dag proc engine + public void testDeleteFlowSpecCalledForMultiActivePath() + throws IOException, URISyntaxException, InterruptedException { + FlowId flowId = GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME); + FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId); + FlowSpec flowSpec1 = initFlowSpec(); + + Optional> dag = Optional.of( + DagTestUtils.buildDag("0", 123L, "FINISH_RUNNING", false)); + Mockito.when(this.mockedFlowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec1)).thenReturn(dag); + } + public static FlowSpec createBasicFlowSpecForFlowId(FlowId flowId) throws URISyntaxException { return createBasicFlowSpecForFlowId(flowId, new Properties()); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java index 94ab199ccc5..f75fba17e43 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java @@ -37,7 +37,7 @@ public class FlowConfigUtilsTest { private void testFlowSpec(FlowConfig flowConfig) { try { - FlowConfigsV2ResourceHandler.createFlowSpecForConfig(flowConfig); + FlowConfigsResourceHandler.createFlowSpecForConfig(flowConfig); } catch (FlowConfigLoggedException e) { Assert.fail("Should not get to here"); } @@ -154,7 +154,7 @@ public void testFlowConfigWithoutTemplateUri() { flowConfig.setProperties(new StringMap(Maps.fromProperties(properties))); try { - FlowConfigsV2ResourceHandler.createFlowSpecForConfig(flowConfig); + FlowConfigsResourceHandler.createFlowSpecForConfig(flowConfig); Assert.fail("Should not get to here"); } catch (RequiredFieldNotPresentException e) { Assert.assertTrue(true, "templateUri cannot be empty"); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java index 349b161ad8a..37ab711f01b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java @@ -19,27 +19,26 @@ import java.net.URISyntaxException; import java.util.HashMap; - -import org.junit.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagTestUtils; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.junit.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; /** - * Test functionality provided by the helper class re-used between the DagProcs and Orchestrator for flow compilation. + * Test functionality provided by the helper class re-used between the DagManager and Orchestrator for flow compilation. */ public class FlowCompilationValidationHelperTest { - private final Long jobSpecFlowExecutionId = 1234L; + private String dagId = "testDag"; + private Long jobSpecFlowExecutionId = 1234L; + private String existingFlowExecutionId = "9999"; private Dag jobExecutionPlanDag; @BeforeClass public void setup() throws URISyntaxException { - String dagId = "testDag"; jobExecutionPlanDag = DagTestUtils.buildDag(dagId, jobSpecFlowExecutionId); } @@ -60,10 +59,8 @@ public void testAddFlowExecutionIdWhenAbsent() { @Test public void testSkipAddingFlowExecutionIdWhenPresent() { HashMap flowMetadata = new HashMap<>(); - String existingFlowExecutionId = "9999"; flowMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, existingFlowExecutionId); FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,jobExecutionPlanDag); - Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), - existingFlowExecutionId); + Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), existingFlowExecutionId); } }