[GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant #3715
...rvice/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+ ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP, "
+ "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT '1970-01-02 00:00:00', "
this choice of default seems questionable... what advantage does it have over just using null?
I was actually able to get this working again with CURRENT_TIMESTAMP by ensuring this value is always set manually to previous value when updating other columns in the row
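The workaround described here leans on a MySQL behavior: a TIMESTAMP column that updates automatically is left alone when the UPDATE assigns it explicitly, including to its own current value. A minimal sketch of that pattern follows, assuming a hypothetical UPDATE keyed on the four flow columns from the CREATE statement above. The class name, method name, and statement text are illustrative and are not taken from the PR.

```java
final class PreserveTimestampSketch {
  // Hypothetical statement: refreshes event_timestamp while pinning
  // lease_acquisition_timestamp to the value it already holds, so MySQL
  // does not auto-update it as a side effect of changing another column.
  static final String UPDATE_EVENT_ONLY_TEMPLATE =
      "UPDATE %s SET event_timestamp = CURRENT_TIMESTAMP, "
          + "lease_acquisition_timestamp = lease_acquisition_timestamp "
          + "WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND flow_action = ?";

  private PreserveTimestampSketch() {}

  /** Binds the table name; the four key parameters are bound later via PreparedStatement. */
  static String updateEventOnly(String leaseTableName) {
    return String.format(UPDATE_EVENT_ONLY_TEMPLATE, leaseTableName);
  }
}
```

Whether the pin is needed depends on how the column is declared and on the server's timestamp defaults, so treat it as a defensive measure rather than a requirement.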
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000 <= epsilon as isWithinEpsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as leaseValidityStatus, linger, CURRENT_TIMESTAMP FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
I don't know enough about sql's evaluation rules to be certain whether the multiple invocations of CURRENT_TIMESTAMP all resolve to a common evaluation or would be invoked multiple times (and if so, whether in a defined order or not). do you know?
if this leads to multiple evaluations with no ordering guarantee, would all logic remain internally consistent and provide an atomic view across multiple fields?
if not, the alternative would be to evaluate CURRENT_TIMESTAMP only once in an inner (nested) query.
In MySQL, multiple invocations of the CURRENT_TIMESTAMP function within a single query will all resolve to the same timestamp value. This is because the function is evaluated only once per query execution and the same value is used for all occurrences of the function within the query. -> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html
nice research BTW!
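Because the query sees a single CURRENT_TIMESTAMP value, the two derived fields can be reasoned about as pure functions of one "now". The sketch below mirrors the SELECT's isWithinEpsilon and leaseValidityStatus expressions in plain Java, purely as an illustration. The class and method names are hypothetical, and epsilon and linger are assumed to be in milliseconds, as the `linger*1000 MICROSECOND` arithmetic suggests.

```java
import java.time.Duration;
import java.time.Instant;

final class LeaseStatusSketch {
  private LeaseStatusSketch() {}

  /** Mirrors: TIMESTAMPDIFF(microsecond, event_timestamp, now) / 1000 <= epsilon. */
  static boolean isWithinEpsilon(Instant eventTimestamp, Instant now, long epsilonMillis) {
    return Duration.between(eventTimestamp, now).compareTo(Duration.ofMillis(epsilonMillis)) <= 0;
  }

  /**
   * Mirrors the CASE expression: 1 while the lease is still valid, 2 once it has
   * expired, 3 when neither comparison holds (a NULL lease_acquisition_timestamp
   * in SQL, modeled here as a null reference).
   */
  static int leaseValidityStatus(Instant leaseAcquisitionTimestamp, Instant now, long lingerMillis) {
    if (leaseAcquisitionTimestamp == null) {
      return 3;
    }
    Instant leaseExpiry = leaseAcquisitionTimestamp.plusMillis(lingerMillis);
    return now.isBefore(leaseExpiry) ? 1 : 2;
  }
}
```

Passing the same `now` to both methods is what gives the consistent view across fields that the review comment asks about.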
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
// CASE 1: If no existing row for this flow action, then go ahead and insert
if (!resultSet.next()) {
if (getResult == null) {
log.debug("CASE 1: no existing row for this flow action, then go ahead and insert");
may be worth writing only in log.debug, not also as a comment just above. [update: I see other cases lack logging. I'd imagine tracing more helpful to debugging than a source comment.]
also, I'm forgetting on the numbering... did each case receive a number somewhere previously, which you're referencing here (e.g. in a javadoc comment)?
Let me move these all to debug level logs instead of comments. I am numbering these cases here explicitly actually and referencing these numbers in the unit tests. They are not defined elsewhere.
int dbLinger = resultSet.getInt(3);
// Fetch values in row after attempted insert
String formattedSelectAfterInsertStatement =
String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName, this.constantsTableName);
wondering: is it truly necessary to bind the table name every time this query is made? e.g. couldn't it be done once when the instance is constructed? or might the table names ever change throughout the instance's lifetime?
The table name shouldn't change over the instance's lifetime since it would need to be re-initialized through the config. For this particular statement I defined it globally and also did so for the other SELECT statement that is re-used. The CREATE statements aren't re-used so it doesn't make sense to make them class variables.
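The "bind once" idea from this thread can be sketched as follows: the table names are substituted into the template in the constructor, and every later call reuses the resulting string. The class name and the shortened template are hypothetical (the PR's SELECT_AFTER_INSERT_STATEMENT is longer); only the two `%s` placeholders and the field names leaseArbiterTableName and constantsTableName follow the snippet above.

```java
final class BoundStatementsSketch {
  // Hypothetical, shortened template with one placeholder per table name.
  private static final String SELECT_AFTER_INSERT_TEMPLATE =
      "SELECT lease_acquisition_timestamp, linger FROM %s, %s";

  // Formatted once; table names are fixed for the lifetime of the instance.
  private final String selectAfterInsertStatement;

  BoundStatementsSketch(String leaseArbiterTableName, String constantsTableName) {
    this.selectAfterInsertStatement =
        String.format(SELECT_AFTER_INSERT_TEMPLATE, leaseArbiterTableName, constantsTableName);
  }

  /** Returns the pre-formatted SQL, ready to hand to Connection.prepareStatement. */
  String selectAfterInsertStatement() {
    return selectAfterInsertStatement;
  }
}
```

Statements that run only once, such as the CREATE statements mentioned in the reply, gain nothing from this and can stay formatted at the call site.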
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
private int dbLinger;
private Timestamp dbCurrentTimestamp;

GetEventInfoResult(ResultSet resultSet) {
nit, but rather than defining a variant constructor, this would be better as a factory method:
public static GetEventInfoResult fromResultSet(ResultSet rs);
good idea, I created two factory methods for GetEventInfoResult and SelectInfoResult (naming perhaps isn't the best but can be addressed in later PR)
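A rough sketch of the factory-method shape suggested above, using a stand-in class rather than the PR's GetEventInfoResult. The fields dbLinger and dbCurrentTimestamp come from the diff; the column labels "linger" and "db_current_timestamp" are assumptions made for illustration.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

final class EventInfoSketch {
  private final int dbLinger;
  private final Timestamp dbCurrentTimestamp;

  // Private constructor takes plain values, so it never touches JDBC.
  private EventInfoSketch(int dbLinger, Timestamp dbCurrentTimestamp) {
    this.dbLinger = dbLinger;
    this.dbCurrentTimestamp = dbCurrentTimestamp;
  }

  /**
   * Static factory: reads the row the cursor currently points at.
   * The caller is expected to have advanced the cursor with next().
   * Column labels are assumptions, not the PR's.
   */
  static EventInfoSketch fromResultSet(ResultSet rs) throws SQLException {
    return new EventInfoSketch(rs.getInt("linger"), rs.getTimestamp("db_current_timestamp"));
  }

  int getDbLinger() {
    return dbLinger;
  }

  Timestamp getDbCurrentTimestamp() {
    return dbCurrentTimestamp;
  }
}
```

Keeping the parsing in a named factory leaves the constructor trivial and makes the class easy to instantiate in tests without a database.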
protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter;
protected SchedulerService schedulerService;
protected DagActionStore dagActionStore;
protected Optional<DagActionStore> dagActionStore;
great! I strongly prefer this to null ;)
even so, since the two must work together, the ultimate abstraction would be a Strategy interface w/ capability to persist the flow action and handle the trigger event. in that approach, this ctor would pass these two optionals to a factory to get an appropriate instance of the strategy. when both present, the factory would give a strategy using them both, but when they're both missing, a "null strategy" would be returned. the factory would throw if only one were given.
That's a good idea! I can think about adding that interface in the future. By null strategy do you mean doing nothing? I agree with throwing an error if only one is given.
yes, "Null Pattern", in essence, codes each operation as a no-op
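One way the Strategy-plus-factory idea from this thread could look is sketched below. Everything here is hypothetical: LeaseArbiterLike and ActionStoreLike are stand-ins for MultiActiveLeaseArbiter and DagActionStore, and java.util.Optional is used instead of the Guava Optional that appears in the diff.

```java
import java.util.Optional;

/** Hypothetical stand-in for the lease arbiter. */
interface LeaseArbiterLike {
  boolean tryAcquireLease(String flowAction);
}

/** Hypothetical stand-in for the action store. */
interface ActionStoreLike {
  void persist(String flowAction);
}

/** Strategy: what to do when a flow trigger event arrives. */
interface FlowTriggerStrategy {
  void handleTrigger(String flowAction);
}

final class FlowTriggerStrategies {
  private FlowTriggerStrategies() {}

  static FlowTriggerStrategy create(Optional<LeaseArbiterLike> arbiter, Optional<ActionStoreLike> store) {
    // The two collaborators only make sense together: reject a lone one.
    if (arbiter.isPresent() != store.isPresent()) {
      throw new IllegalArgumentException("lease arbiter and action store must be provided together");
    }
    if (!arbiter.isPresent()) {
      // Null strategy: every operation is a no-op.
      return flowAction -> { };
    }
    LeaseArbiterLike leaseArbiter = arbiter.get();
    ActionStoreLike actionStore = store.get();
    // Real strategy: persist the action only when the lease is obtained.
    return flowAction -> {
      if (leaseArbiter.tryAcquireLease(flowAction)) {
        actionStore.persist(flowAction);
      }
    };
  }
}
```

With this shape the constructor calls `FlowTriggerStrategies.create(...)` once and the rest of the class never checks whether either optional is present.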
overall, the PR title lacks context. perhaps "Add unit tests for MysqlMALeaseArbiter"?
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Optional<Integer> count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, this.constantsTableName), getStatement -> {
ResultSet resultSet = getStatement.executeQuery();
if (resultSet.next()) {
return resultSet.getInt(1);
return Optional.of(resultSet.getInt(1));
}
return -1;
return Optional.absent();
}, true);

// Only insert epsilon and linger values from config if this table does not contain pre-existing values.
if (count == 0) {
if (count.isPresent() && count.get() == 0) {
with multiple participants, use of "check, then set" is error-prone. e.g. two or more rows could be inserted.
the algo requires there to only ever be exactly one row. as this is essential, add a primary key field that should only ever have one value (== 1). every INSERT should hard-code that PK value. to seamlessly handle both first-time init and updates when the config settings have changed, utilize INSERT... ON DUPLICATE KEY UPDATE. see: https://stackoverflow.com/a/1361368
every participant would then "upsert" upon start up, where the most recent value wins, clobbering the former. even so, due to all participants having uniform config, in general, no actual overwriting happens with most updates.
Ah good catch, I updated the statement to a simpler one, INSERT ... WHERE NOT EXISTS (SELECT 1 FROM table), to insert if there are no rows in table.
Codecov Report
@@ Coverage Diff @@
## master #3715 +/- ##
============================================
- Coverage 46.82% 43.92% -2.90%
+ Complexity 10804 4985 -5819
============================================
Files 2141 1073 -1068
Lines 84429 43583 -40846
Branches 9383 4807 -4576
============================================
- Hits 39537 19146 -20391
+ Misses 41294 22601 -18693
+ Partials 3598 1836 -1762
... and 1083 files with indirect coverage changes
also, what are your thoughts on the potential race-condition I raised about the constants table init?
} catch (SQLException e) {
throw e;
- if this is all you want, it's the implicit behavior, which you need not write explicitly.
- even so, elsewhere, we wrap SQLException in an IOException. do we want that here too... or is there already a higher layer wrapping around this invocation where that will happen for us?
I meant to wrap in IOException as we do that in other places, updating to wrap it.
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
Addressed above, forgot to make the change in previous commit.
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) SELECT ?, ? "
+ "WHERE NOT EXISTS (SELECT 1 FROM %s)";
don't we need to overwrite in case the pre-existing values are out-of-date? that's why I suggested INSERT INTO... ON DUPLICATE KEY UPDATE
Fixed, had a misunderstanding about whether or not we want to update the values upon each redeploy if new config values are provided.
great job!
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) SELECT ?, ? "
+ "WHERE NOT EXISTS (SELECT 1 FROM %s)";
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
nit: I might call it UPSERT_CONSTANTS_TABLE_STATEMENT
good idea
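For reference, a single-row upsert along the lines discussed in this thread might look like the sketch below. The statement text is an assumption and not the PR's final statement (the diff line above is cut off); it takes the column names primary_key, epsilon and linger from the diff and assumes primary_key is declared as the table's PRIMARY KEY. The class and method names are hypothetical.

```java
final class ConstantsUpsertSketch {
  // Hypothetical statement: the hard-coded key 1 means every participant
  // targets the same row, so a second INSERT becomes an UPDATE instead of
  // adding another row.
  static final String UPSERT_CONSTANTS_TABLE_TEMPLATE =
      "INSERT INTO %s (primary_key, epsilon, linger) VALUES (1, ?, ?) "
          + "ON DUPLICATE KEY UPDATE epsilon = VALUES(epsilon), linger = VALUES(linger)";

  private ConstantsUpsertSketch() {}

  /** Binds the table name; epsilon and linger are bound later as parameters 1 and 2. */
  static String upsertStatement(String constantsTableName) {
    return String.format(UPSERT_CONSTANTS_TABLE_TEMPLATE, constantsTableName);
  }
}
```

Because the insert and the overwrite happen in one statement, there is no window between a row-count check and the write in which a second participant could slip in its own row.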
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException {
try {
if (!resultSet.next()) {
log.error("Expected num rows and lease_acquisition_timestamp returned from query but received nothing");
I'm confused here, so if there's no item in the result set we log an error but still try to parse the current result set results?
Oh good catch, I want this code path to terminate so instead will throw an IOException.
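The fix agreed on here, terminating instead of parsing an empty result, could be sketched as follows. SelectInfoSketch is a stand-in for the PR's SelectInfoResult, and the column positions 1 and 2 are assumptions. The sketch also wraps SQLException in IOException, matching the convention mentioned earlier in the review.

```java
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

final class SelectInfoSketch {
  final int rowCount;
  final Timestamp leaseAcquisitionTimestamp;

  private SelectInfoSketch(int rowCount, Timestamp leaseAcquisitionTimestamp) {
    this.rowCount = rowCount;
    this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
  }

  static SelectInfoSketch fromResultSet(ResultSet resultSet) throws IOException {
    try {
      // Fail fast: with no row there is nothing valid to read below.
      if (!resultSet.next()) {
        throw new IOException(
            "Expected num rows and lease_acquisition_timestamp returned from query but received nothing");
      }
      // Column positions are assumptions made for this sketch.
      return new SelectInfoSketch(resultSet.getInt(1), resultSet.getTimestamp(2));
    } catch (SQLException e) {
      // Surface JDBC failures as IOException, keeping the original as the cause.
      throw new IOException(e);
    }
  }
}
```

Throwing from the empty-result branch means callers see one checked exception type whether the query failed or simply returned nothing.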
Assert.assertTrue(
killObtainedStatus.getLeaseAcquisitionTimestamp() >= killObtainedStatus.getEventTimestamp());

// Tests CASE 2 of acquire lease for a flow action event that already has a valid lease for the same event in db
Does it make sense for us to have separate cases instead of combining these all into one super test? In case implementation details change in the future it will be easier to triage/isolate for certain cases.
I initially tried to make it separate tests but it was easier to write this way since a lot of them related to one another. For now my priority is to get this tested and merged. I'll add a TODO to refactor it in the future.
You can make tests have dependencies on each other so that they run synchronously, but I guess not urgent if you are still working on improving this
* upstream/master:
  Fix bug with total count watermark whitelist (apache#3724)
  [GOBBLIN-1858] Fix logs relating to multi-active lease arbiter (apache#3720)
  [GOBBLIN-1838] Introduce total count based completion watermark (apache#3701)
  Correct num of failures (apache#3722)
  [GOBBLIN- 1856] Add flow trigger handler leasing metrics (apache#3717)
  [GOBBLIN-1857] Add override flag to force generate a job execution id based on gobbl… (apache#3719)
  [GOBBLIN-1855] Metadata writer tests do not work in isolation after upgrading to Iceberg 1.2.0 (apache#3718)
  Remove unused ORC writer code (apache#3710)
  [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (apache#3716)
  [GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant (apache#3715)
  [GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (apache#3712)
  [GOBBLIN-1849] Add Flow Group & Name to Job Config for Job Scheduler (apache#3713)
  [GOBBLIN-1841] Move disabling of current live instances to the GobblinClusterManager startup (apache#3708)
  [GOBBLIN-1840] Helix Job scheduler should not try to replace running workflow if within configured time (apache#3704)
  [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the existing workflow if it is launched (apache#3711)
  [GOBBLIN-1842] Add timers to GobblinMCEWriter (apache#3703)
  [GOBBLIN-1844] Ignore workflows marked for deletion when calculating container count (apache#3709)
  [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (apache#3707)
  [GOBBLIN-1845] Changes parallelstream to stream in DatasetsFinderFilteringDecorator to avoid classloader issues in spark (apache#3706)
  [GOBBLIN-1843] Utility for detecting non optional unions should convert dataset urn to hive compatible format (apache#3705)
  [GOBBLIN-1837] Implement multi-active, non blocking for leader host (apache#3700)
  [GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0 (apache#3697)
  Update CHANGELOG to reflect changes in 0.17.0
  Reserving 0.18.0 version for next release
  [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (apache#3699)
  [GOBBLIN-1805] Check watermark for the most recent hour for quiet topics (apache#3698)
  [GOBBLIN-1825]Hive retention job should fail if deleting underlying files fail (apache#3687)
  [GOBBLIN-1823] Improving Container Calculation and Allocation Methodology (apache#3692)
  [GOBBLIN-1830] Improving Container Transition Tracking in Streaming Data Ingestion (apache#3693)
  [GOBBLIN-1833]Emit Completeness watermark information in snapshotCommitEvent (apache#3696)
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Here are some details about my PR, including screenshots (if applicable):
Tests all cases of trying to acquire a lease for a flow action event with one participant involved and makes corresponding fixes in the MultiActiveLeaseArbiter.
One key change in this PR is to stop using the participant's local event_timestamp in the database to identify a particular flow_action event. Instead, we use the database's CURRENT_TIMESTAMP to insert or keep track of the event. This avoids discrepancies between local time and database time in future comparisons.
A large number of the fixes relate to MySQL-specific bugs, for example NULL values not being permitted by default, syntax errors in table creation, inserting into a table with a create statement, etc.
Tests
Tests the following cases
Commits