-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant #3715
Changes from 2 commits
9b7f505
48134fb
1feddee
8b8de03
84c2941
fdbf7a8
7c7bb2c
b43a587
a048d84
b82d45a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,8 +100,9 @@ protected interface CheckedFunction<T, R> { | |
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))"; | ||
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s " | ||
+ "(epsilon INT, linger INT, PRIMARY KEY (epsilon, linger))"; | ||
private static final String GET_ROW_COUNT_STATEMENT = "SELECT COUNT(*) FROM %s"; | ||
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) VALUES (?,?)"; | ||
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already. | ||
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) SELECT ?, ? " | ||
+ "WHERE NOT EXISTS (SELECT 1 FROM %s)"; | ||
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=?" | ||
+ " AND flow_action=?"; | ||
protected static final String WHERE_CLAUSE_TO_MATCH_ROW = WHERE_CLAUSE_TO_MATCH_KEY | ||
|
@@ -173,30 +174,14 @@ private void initializeConstantsTable() throws IOException { | |
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); | ||
umustafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true); | ||
|
||
Optional<Integer> count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, this.constantsTableName), getStatement -> { | ||
ResultSet resultSet = getStatement.executeQuery(); | ||
try { | ||
if (resultSet.next()) { | ||
return Optional.of(resultSet.getInt(1)); | ||
} | ||
return Optional.absent(); | ||
} finally { | ||
if (resultSet != null) { | ||
resultSet.close(); | ||
} | ||
} | ||
String insertConstantsStatement = String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName, | ||
this.constantsTableName); | ||
withPreparedStatement(insertConstantsStatement, insertStatement -> { | ||
int i = 0; | ||
insertStatement.setInt(++i, epsilon); | ||
insertStatement.setInt(++i, linger); | ||
return insertStatement.executeUpdate(); | ||
}, true); | ||
|
||
// Only insert epsilon and linger values from config if this table does not contain pre-existing values. | ||
if (count.isPresent() && count.get() == 0) { | ||
String insertConstantsStatement = String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName); | ||
withPreparedStatement(insertConstantsStatement, insertStatement -> { | ||
int i = 0; | ||
insertStatement.setInt(++i, epsilon); | ||
insertStatement.setInt(++i, linger); | ||
return insertStatement.executeUpdate(); | ||
}, true); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -234,7 +219,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l | |
completeInsertPreparedStatement(insertStatement, flowAction); | ||
return insertStatement.executeUpdate(); | ||
}, true); | ||
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction); | ||
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent()); | ||
} | ||
|
||
// Extract values from result set | ||
|
@@ -255,13 +240,13 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l | |
dbCurrentTimestamp.getTime()); | ||
// Utilize db timestamp for reminder | ||
return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(), | ||
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - System.currentTimeMillis()); | ||
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); | ||
} | ||
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", flowAction, | ||
dbCurrentTimestamp.getTime()); | ||
// Utilize db lease acquisition timestamp for wait time | ||
return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(), | ||
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - System.currentTimeMillis()); | ||
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); | ||
} | ||
else if (leaseValidityStatus == 2) { | ||
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of whether " | ||
|
@@ -280,7 +265,7 @@ else if (leaseValidityStatus == 2) { | |
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp); | ||
return insertStatement.executeUpdate(); | ||
}, true); | ||
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction); | ||
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp)); | ||
} // No longer leasing this event | ||
if (isWithinEpsilon) { | ||
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event in db: " | ||
|
@@ -298,13 +283,13 @@ else if (leaseValidityStatus == 2) { | |
false, dbEventTimestamp, null); | ||
return insertStatement.executeUpdate(); | ||
}, true); | ||
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction); | ||
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp)); | ||
} catch (SQLException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws SQLException { | ||
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException { | ||
try { | ||
// Extract values from result set | ||
Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp"); | ||
|
@@ -316,29 +301,36 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws SQL | |
return new GetEventInfoResult(dbEventTimestamp, dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus, | ||
dbLinger, dbCurrentTimestamp); | ||
} catch (SQLException e) { | ||
throw e; | ||
throw new IOException(e); | ||
} finally { | ||
if (resultSet != null) { | ||
resultSet.close(); | ||
try { | ||
resultSet.close(); | ||
} catch (SQLException e) { | ||
throw new IOException(e); | ||
} | ||
} | ||
} | ||
} | ||
|
||
protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws SQLException { | ||
protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException { | ||
try { | ||
if (!resultSet.next()) { | ||
resultSet.close(); | ||
log.error("Expected num rows and lease_acquisition_timestamp returned from query but received nothing"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused here, so if there's no item after in the result set we log and error but still try to parse the current result set results? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh good catch, I want this code path to terminate so instead will through an IO Error. |
||
} | ||
long eventTimeMillis = resultSet.getTimestamp(1).getTime(); | ||
long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime(); | ||
int dbLinger = resultSet.getInt(3); | ||
return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger); | ||
} catch (SQLException e) { | ||
throw e; | ||
throw new IOException(e); | ||
} finally { | ||
if (resultSet != null) { | ||
resultSet.close(); | ||
try { | ||
resultSet.close(); | ||
} catch (SQLException e) { | ||
throw new IOException(e); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -351,7 +343,7 @@ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws SQ | |
* @throws IOException | ||
*/ | ||
protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, | ||
DagActionStore.DagAction flowAction) | ||
DagActionStore.DagAction flowAction, Optional<Timestamp> dbCurrentTimestamp) | ||
throws SQLException, IOException { | ||
// Fetch values in row after attempted insert | ||
SelectInfoResult selectInfoResult = withPreparedStatement(thisTableSelectAfterInsertStatement, | ||
|
@@ -375,7 +367,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, | |
// Another participant acquired lease in between | ||
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(), | ||
selectInfoResult.getLeaseAcquisitionTimeMillis() + selectInfoResult.getDbLinger() | ||
- System.currentTimeMillis()); | ||
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis())); | ||
} | ||
|
||
/** | ||
|
@@ -496,38 +488,21 @@ protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatem | |
*/ | ||
@Data | ||
static class GetEventInfoResult { | ||
private Timestamp dbEventTimestamp; | ||
private Timestamp dbLeaseAcquisitionTimestamp; | ||
private boolean withinEpsilon; | ||
private int leaseValidityStatus; | ||
private int dbLinger; | ||
private Timestamp dbCurrentTimestamp; | ||
|
||
GetEventInfoResult(Timestamp eventTimestamp, Timestamp leaseAcquisitionTimestamp, boolean isWithinEpsilon, | ||
int validityStatus, int linger, Timestamp currentTimestamp) { | ||
// Extract values from result set | ||
dbEventTimestamp = eventTimestamp; | ||
dbLeaseAcquisitionTimestamp = leaseAcquisitionTimestamp; | ||
withinEpsilon = isWithinEpsilon; | ||
leaseValidityStatus = validityStatus; | ||
dbLinger = linger; | ||
dbCurrentTimestamp = currentTimestamp; | ||
} | ||
private final Timestamp dbEventTimestamp; | ||
private final Timestamp dbLeaseAcquisitionTimestamp; | ||
private final boolean withinEpsilon; | ||
private final int leaseValidityStatus; | ||
private final int dbLinger; | ||
private final Timestamp dbCurrentTimestamp; | ||
} | ||
|
||
/** | ||
DTO for result of SELECT query used to determine status of lease acquisition attempt | ||
*/ | ||
@Data | ||
static class SelectInfoResult { | ||
private long eventTimeMillis; | ||
private long leaseAcquisitionTimeMillis; | ||
private int dbLinger; | ||
|
||
SelectInfoResult(long eventTimeMillis, long leaseAcquisitionTimeMillis, int linger) { | ||
this.eventTimeMillis = eventTimeMillis; | ||
this.leaseAcquisitionTimeMillis = leaseAcquisitionTimeMillis; | ||
this.dbLinger = linger; | ||
} | ||
private final long eventTimeMillis; | ||
private final long leaseAcquisitionTimeMillis; | ||
private final int dbLinger; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need to overwrite in case the pre-existing values are out-of-date? that's why I suggested
INSERT INTO... ON DUPLICATE KEY UPDATE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, had a misunderstanding about whether or not we want to update the values upon each redeploy if new config values are provided.