Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1851] Unit tests for MysqlMultiActiveLeaseArbiter with Single Participant #3715

Merged
merged 10 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ protected interface CheckedFunction<T, R> {
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(epsilon INT, linger INT, PRIMARY KEY (epsilon, linger))";
private static final String GET_ROW_COUNT_STATEMENT = "SELECT COUNT(*) FROM %s";
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) VALUES (?,?)";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (epsilon, linger) SELECT ?, ? "
+ "WHERE NOT EXISTS (SELECT 1 FROM %s)";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to overwrite in case the pre-existing values are out-of-date? that's why I suggested INSERT INTO... ON DUPLICATE KEY UPDATE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, had a misunderstanding about whether or not we want to update the values upon each redeploy if new config values are provided.

protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=?"
+ " AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW = WHERE_CLAUSE_TO_MATCH_KEY
Expand Down Expand Up @@ -173,30 +174,14 @@ private void initializeConstantsTable() throws IOException {
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
umustafi marked this conversation as resolved.
Show resolved Hide resolved
withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);

Optional<Integer> count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, this.constantsTableName), getStatement -> {
ResultSet resultSet = getStatement.executeQuery();
try {
if (resultSet.next()) {
return Optional.of(resultSet.getInt(1));
}
return Optional.absent();
} finally {
if (resultSet != null) {
resultSet.close();
}
}
String insertConstantsStatement = String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName,
this.constantsTableName);
withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
insertStatement.setInt(++i, epsilon);
insertStatement.setInt(++i, linger);
return insertStatement.executeUpdate();
}, true);

// Only insert epsilon and linger values from config if this table does not contain pre-existing values.
if (count.isPresent() && count.get() == 0) {
String insertConstantsStatement = String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
insertStatement.setInt(++i, epsilon);
insertStatement.setInt(++i, linger);
return insertStatement.executeUpdate();
}, true);
}
}

@Override
Expand Down Expand Up @@ -234,7 +219,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
completeInsertPreparedStatement(insertStatement, flowAction);
return insertStatement.executeUpdate();
}, true);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent());
}

// Extract values from result set
Expand All @@ -255,13 +240,13 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(flowAction, dbEventTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - System.currentTimeMillis());
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", flowAction,
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - System.currentTimeMillis());
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of whether "
Expand All @@ -280,7 +265,7 @@ else if (leaseValidityStatus == 2) {
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
if (isWithinEpsilon) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: Same event, no longer leasing event in db: "
Expand All @@ -298,13 +283,13 @@ else if (leaseValidityStatus == 2) {
false, dbEventTimestamp, null);
return insertStatement.executeUpdate();
}, true);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws SQLException {
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException {
try {
// Extract values from result set
Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
Expand All @@ -316,29 +301,36 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws SQL
return new GetEventInfoResult(dbEventTimestamp, dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
dbLinger, dbCurrentTimestamp);
} catch (SQLException e) {
throw e;
throw new IOException(e);
} finally {
if (resultSet != null) {
resultSet.close();
try {
resultSet.close();
} catch (SQLException e) {
throw new IOException(e);
}
}
}
}

protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws SQLException {
protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException {
try {
if (!resultSet.next()) {
resultSet.close();
log.error("Expected num rows and lease_acquisition_timestamp returned from query but received nothing");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused here, so if there's no item after in the result set we log and error but still try to parse the current result set results?

Copy link
Contributor Author

@umustafi umustafi Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good catch, I want this code path to terminate so instead will through an IO Error.

}
long eventTimeMillis = resultSet.getTimestamp(1).getTime();
long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
int dbLinger = resultSet.getInt(3);
return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
throw e;
throw new IOException(e);
} finally {
if (resultSet != null) {
resultSet.close();
try {
resultSet.close();
} catch (SQLException e) {
throw new IOException(e);
}
}
}
}
Expand All @@ -351,7 +343,7 @@ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws SQ
* @throws IOException
*/
protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
DagActionStore.DagAction flowAction)
DagActionStore.DagAction flowAction, Optional<Timestamp> dbCurrentTimestamp)
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = withPreparedStatement(thisTableSelectAfterInsertStatement,
Expand All @@ -375,7 +367,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis() + selectInfoResult.getDbLinger()
- System.currentTimeMillis());
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}

/**
Expand Down Expand Up @@ -496,38 +488,21 @@ protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatem
*/
@Data
static class GetEventInfoResult {
private Timestamp dbEventTimestamp;
private Timestamp dbLeaseAcquisitionTimestamp;
private boolean withinEpsilon;
private int leaseValidityStatus;
private int dbLinger;
private Timestamp dbCurrentTimestamp;

GetEventInfoResult(Timestamp eventTimestamp, Timestamp leaseAcquisitionTimestamp, boolean isWithinEpsilon,
int validityStatus, int linger, Timestamp currentTimestamp) {
// Extract values from result set
dbEventTimestamp = eventTimestamp;
dbLeaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
withinEpsilon = isWithinEpsilon;
leaseValidityStatus = validityStatus;
dbLinger = linger;
dbCurrentTimestamp = currentTimestamp;
}
private final Timestamp dbEventTimestamp;
private final Timestamp dbLeaseAcquisitionTimestamp;
private final boolean withinEpsilon;
private final int leaseValidityStatus;
private final int dbLinger;
private final Timestamp dbCurrentTimestamp;
}

/**
DTO for result of SELECT query used to determine status of lease acquisition attempt
*/
@Data
static class SelectInfoResult {
private long eventTimeMillis;
private long leaseAcquisitionTimeMillis;
private int dbLinger;

SelectInfoResult(long eventTimeMillis, long leaseAcquisitionTimeMillis, int linger) {
this.eventTimeMillis = eventTimeMillis;
this.leaseAcquisitionTimeMillis = leaseAcquisitionTimeMillis;
this.dbLinger = linger;
}
private final long eventTimeMillis;
private final long leaseAcquisitionTimeMillis;
private final int dbLinger;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertTrue(secondLeasedToAnotherStatus.getEventTimeMillis() == firstObtainedStatus.getEventTimestamp());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() > LINGER);
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() >= LINGER);

// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
// valid
Expand All @@ -110,7 +110,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() > firstObtainedStatus.getEventTimestamp());
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() > LINGER);
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() < LINGER);

// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
Expand Down