Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2056] initialize topology specs directly without waitging for listener call… #3937

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

arjun4084346
Copy link
Contributor

@arjun4084346 arjun4084346 commented Apr 29, 2024

…backs

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    topologies are static and fixed. they can be initialized through configs. but right now we initialize them in spec compiler making spec compiler as a topology spec catalog listener and then wait for topology spec catalog to populate topologies.
    this sometimes take longer time because it happens when some of the services' setActive is called. some newly created classes like DagManagementStateStore initialization may fail if topologies are not present by that time.
    so in this pr i am populating topologies sooner in spec compiler

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    updated existing test cases

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@arjun4084346 arjun4084346 force-pushed the initTopology branch 3 times, most recently from 1dccd35 to 6d61f02 Compare April 29, 2024 12:42
@arjun4084346 arjun4084346 changed the title initialize topology specs directly without waitging for listener call… [GOBBLIN-2056] initialize topology specs directly without waitging for listener call… Apr 29, 2024
Copy link
Contributor

@Will-Lo Will-Lo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to load the topology spec store earlier in the GobblinServiceManager? Otherwise the topology map being passed to the flow compilation validation helper can be empty right?


public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
if (instrumentationEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the intention here to remove the use of this variable for checking if instrumentation is enabled? It's still used widely in GaaS modules so there should be justification

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. yes, in https://github.com/apache/gobblin/pull/3855/files I made some optional fields mandatory. instrumentationEnabled was not one of them, but eventSubmitter was which is related.
  2. instrumentationEnabled is usually true in prod, these variables are set false only for doing easier tests.
  3. Also, in this particular case, instrumentationEnabled was hard coded to true in the above above constructor anyway.

@@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper {

@Inject
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we know the topologySpecFactory has been initialized by now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopologySpecFactory is guice based initialized

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I see that but what is the waiting you refer to in description "making spec compiler as a topology spec catalog listener and then wait for topology spec catalog to populate topologies." How do you know the topologySpecCatalog is populated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking GobblinServiceManager start to see how TopologySpecCatalog is populated and it may not have the topologies loaded if it uses the listener method rather than TopologySpecFactory.getTopologies()

One safe guard we can add is both Orchestrator and SpecCompiler'sonAddSpec method should check if specCompiler.isActive is true before accepting specs to pass to compiler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SpecCompiler gets information about topology specs through orchestrator's onAddSpec. And orchestrator gets info about topology specs because it is a listener on topology spec catalog.
So compiler has to "wait for topology spec catalog to populate topologies". topology spec catalog is populated here https://github.com/apache/gobblin/blob/master/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java#L508 but we may want compiler to have topology information before this code reaches.


public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
if (instrumentationEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems the intent is to remove the loose coupling between topo spec init and compiler init. are there any potential downsides? e.g. would it disallow the dynamic addition of additional topos during the course of system operation? this or any other limitations should be clearly captured in a comment

when you say:

topologies are static and fixed

is this essential to their nature or merely common practice we've adopted?

I'm not against a pragmatic change to preclude something we truly don't anticipate needing, but let's characterize for maintainers whether this is pure expedience (aka. easier than adding concurrency coordination to sub-service init)... or we actually believe it to be an essential part to modeling the solution (that the current impl mispercieved).

also nit: "waiting" in the title.

this.config = config;

/***
/*
* ETL-5996
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not an apache gobblin jira ticket

Comment on lines -222 to -223
} else if (addedSpec instanceof TopologySpec) {
return onAddTopologySpec( (TopologySpec) addedSpec);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it wise to remove the ability to be a TS listener?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

being a TS listener feels of no use to me. There should be no TS listener in gaas.

Copy link
Contributor

@phet phet May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I equate topology specs w/ executors in the flow graph. since the FG can change dynamically w/o requiring a system restart, it doesn't seem out of the question to change the set of topos w/o a system restart either, given whatever new FG edges could indicate newly defined flow.edge.specExecutors

I agree that's not what we've done thus far, but it's arguably inconvenient to require two separate changes to define a new executor - one to the FG and one to the gaas configs. if it were possible to do both together, I would personally find that appealing

@@ -114,6 +119,8 @@ public void setup() throws Exception {
FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class);
DagManager mockDagManager = mock(DagManager.class);
doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
TopologySpecFactory mockedTopologySpecFactory = mock(TopologySpecFactory.class);
doReturn(Collections.singleton(this.topologySpec)).when(mockedTopologySpecFactory).getTopologies();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer the when().thenReturn() form, which is typesafe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer this way, because in when(obj.method()).thenReturn(obj) it actually calls obj.method() which is rarely useful (and often generate exceptions) because obj is a mocked dummy object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up to you. I do agree with a spy that doReturn is better for the reason you give, but with a mock, the type safety is helpful.

// Make sure TopologyCatalog Listener is empty
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 0, "SpecCompiler should not know about any Topology "
+ "before addition");
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should know about any Topology "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

desc here makes it sound like the test should be > 1. to that end, I didn't notice any Preconditions check or similar to insist on a non-empty collection. do we want one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"SpecCompiler should know about any Topology irrespective of what is there in the topology catalog" ? how does that sound like test should be >1 ? What description do you suggest?

There is no precondition. SpecCompiler should just know about the topologies. Number of topologies known to spec compiler should just always be 1 (equals to total number of topologies).

Copy link
Contributor

@phet phet May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"know about any" suggested it would apply even if there were > 1 topo specs. couldn't there be more than one?

also, if non-empty Set<TopologySpec> is critical, I was suggesting a guava Precondition in the ctor

Comment on lines +255 to +256
// Make sure TopologyCatalog empty
Assert.assertTrue(this.topologyCatalog.getSize() == 0, "Topology catalog should contain 0 Spec before addition");
Copy link
Contributor

@phet phet May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems this asserts that the spec compiler's topos may now potentially deviate from the topo catalog's, which might make the system harder to reason about... is it really a good thing to drop such an invariant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a deviation only till everything is initialized. after that, they both should be same (1 in this test).

Copy link
Contributor

@phet phet May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prior to this change, there's an "inconsistent timespan", where the compiler doesn't have topo specs initialized. you suggested this might compromise correctness of DagManagementStateStore.

after this change, sounds like there will be an "inconsistent timespan", where the TopologyCatalog may be out of sync w/ the topo specs known to the compiler. could there be any negative consequence to that?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants