From cf2d4e8af7b077482affc6282ee0a91475e60b76 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Mon, 9 Sep 2024 15:24:09 -0700 Subject: [PATCH] address review comments --- ...gobblin.service.flowstatuses.restspec.json | 2 +- ...gobblin.service.flowstatuses.snapshot.json | 2 +- ...FlowExecutionResourceHandlerInterface.java | 2 +- ... => FlowConfigsV2ResourceHandlerTest.java} | 2 +- ... => FlowExecutionResourceHandlerTest.java} | 2 +- .../modules/core/GobblinServiceManager.java | 2 +- .../restli/FlowExecutionResourceHandler.java | 3 + ...Test.java => FlowExecutionClientTest.java} | 2 +- .../modules/orchestration/DagFlowTest.java | 58 ------------------- .../modules/orchestration/DagUtilsTest.java | 19 ++++++ 10 files changed, 29 insertions(+), 65 deletions(-) rename gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/{FlowConfigResourceLocalHandlerTest.java => FlowConfigsV2ResourceHandlerTest.java} (98%) rename gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/{FlowExecutionResourceLocalHandlerTest.java => FlowExecutionResourceHandlerTest.java} (97%) rename gobblin-service/src/test/java/org/apache/gobblin/service/{FlowExecutionTest.java => FlowExecutionClientTest.java} (99%) delete mode 100644 gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json index b6a0ee0eef8..f84720ab487 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowstatuses.restspec.json @@ -6,7 +6,7 @@ "namespace" : "org.apache.gobblin.service", "path" : "/flowstatuses", "schema" : "org.apache.gobblin.service.FlowStatus", - "doc" : "Resource for handling flow status requests. Deprecated, use {@link FlowExecutionResource}\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource", + "doc" : "Deprecated, use {@link FlowExecutionResource}\n\nResource for handling flow status requests. generated from: org.apache.gobblin.service.FlowStatusResource", "collection" : { "identifier" : { "name" : "id", diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json index e85d93df793..df4d02067a5 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json @@ -271,7 +271,7 @@ "namespace" : "org.apache.gobblin.service", "path" : "/flowstatuses", "schema" : "org.apache.gobblin.service.FlowStatus", - "doc" : "Resource for handling flow status requests. Deprecated, use {@link FlowExecutionResource}\n\ngenerated from: org.apache.gobblin.service.FlowStatusResource", + "doc" : "Deprecated, use {@link FlowExecutionResource}\n\nResource for handling flow status requests. generated from: org.apache.gobblin.service.FlowStatusResource", "collection" : { "identifier" : { "name" : "id", diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandlerInterface.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandlerInterface.java index 48e3ac149d7..cc9b0e34b5f 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandlerInterface.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandlerInterface.java @@ -26,7 +26,7 @@ // Unlike FlowConfigsV2ResourceHandler, this is an interface rather than a class because it's implementation needs -// classes from gobblin-service module, and adding gobblin-service as a dependency will cause circular dependency, +// classes from gobblin-service module, and adding gobblin-service as a dependency will cause circular dependency public interface FlowExecutionResourceHandlerInterface { /** * Get {@link FlowExecution} diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandlerTest.java similarity index 98% rename from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java rename to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandlerTest.java index bfb24a2df6e..8675c6c9dc3 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandlerTest.java @@ -32,7 +32,7 @@ import org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler; -public class FlowConfigResourceLocalHandlerTest { +public class FlowConfigsV2ResourceHandlerTest { private static final String TEST_GROUP_NAME = "testGroup1"; private static final String TEST_FLOW_NAME = "testFlow1"; private static final String TEST_SCHEDULE = "0 1/0 * ? * *"; diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java similarity index 97% rename from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java rename to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java index 8215135bf4d..0333a167b73 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceHandlerTest.java @@ -21,7 +21,7 @@ import org.testng.annotations.Test; -public class FlowExecutionResourceLocalHandlerTest { +public class FlowExecutionResourceHandlerTest { @Test public void testEstimateCopyTimeLeftSanityCheck() { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index d9141a01344..d36afdeeec4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -280,7 +280,7 @@ private void registerServicesInLauncher(){ private void configureServices(){ if (configuration.isRestLIServerEnabled()) { this.restliServer = EmbeddedRestliServer.builder() - .resources(Lists.newArrayList(FlowConfigsV2Resource.class, FlowConfigsV2Resource.class)) + .resources(Lists.newArrayList(FlowConfigsV2Resource.class)) .injector(injector) .build(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowExecutionResourceHandler.java index 85bb3851e84..8dc2802dc55 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowExecutionResourceHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowExecutionResourceHandler.java @@ -45,6 +45,9 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator; +/** + * This is associated with {@link FlowExecutionResource} and handles all the requests FlowExecutionResource get. + */ @Slf4j public class FlowExecutionResourceHandler implements FlowExecutionResourceHandlerInterface { private final DagManagementStateStore dagManagementStateStore; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionClientTest.java similarity index 99% rename from gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionTest.java rename to gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionClientTest.java index 15380be091c..c6d88591bba 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/FlowExecutionClientTest.java @@ -46,7 +46,7 @@ @Test(groups = { "gobblin.service" }, singleThreaded = true) -public class FlowExecutionTest { +public class FlowExecutionClientTest { private FlowExecutionClient client; private EmbeddedRestliServer _server; private List> _listOfJobStatusLists; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java deleted file mode 100644 index b07ebe8f033..00000000000 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.orchestration; - -import java.util.concurrent.TimeUnit; - -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.spec.JobExecutionPlan; - - -/** - * Tests the state updates (including updating in-memory state and MysqlDagActionStore) after performing add or cancel - * operations by calling addDag, stopDag, kill, and resume. It also tests flows with and without sla configs. - */ -public class DagFlowTest { - - @Test - void slaConfigCheck() throws Exception { - Dag dag = DagTestUtils.buildDag("5", 123456783L, "FINISH_RUNNING", 1); - Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), ServiceConfigKeys.DEFAULT_FLOW_FINISH_DEADLINE_MILLIS); - - Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig(); - jobConfig = jobConfig - .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef("7")) - .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name())); - dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig); - Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), TimeUnit.SECONDS.toMillis(7L)); - - jobConfig = jobConfig - .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef("8")) - .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.name())); - dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig); - Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), TimeUnit.MINUTES.toMillis(8L)); - } -} diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java index 4d710fb79f2..fe307c35b4d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java @@ -63,6 +63,25 @@ public class DagUtilsTest { .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)); + @Test + void slaConfigCheck() throws Exception { + Dag dag = DagTestUtils.buildDag("5", 123456783L, "FINISH_RUNNING", 1); + Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), ServiceConfigKeys.DEFAULT_FLOW_FINISH_DEADLINE_MILLIS); + + Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig(); + jobConfig = jobConfig + .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef("7")) + .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name())); + dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig); + Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), TimeUnit.SECONDS.toMillis(7L)); + + Config jobConfig2 = jobConfig + .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef("8")) + .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.name())); + dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig2); + Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), TimeUnit.MINUTES.toMillis(8L)); + } + @Test void deadlineConfigCheck() throws Exception { Dag dag = DagTestUtils.buildDag("5", 123456783L, "FINISH_RUNNING", 1);