Skip to content

Unwrap FailsafeException in BulkConnectionRetryWrapper #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -59,30 +59,45 @@ public JobInfo createJob(JobInfo jobInfo) throws AsyncApiException {
if (!retryOnBackendError) {
return bulkConnection.createJob(jobInfo);
}
Object resultJobInfo = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while creating job."))
.get(() -> {
try {
return bulkConnection.createJob(jobInfo);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object resultJobInfo;
try {
resultJobInfo = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while creating job.")).get(() -> {
try {
return bulkConnection.createJob(jobInfo);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
Comment on lines +72 to +73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unable to understand if we are wrapping AsyncApiException with SalesforceQueryExecutionException, why do we require this?

Shouldn't we be catching SalesforceQueryExecutionException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Salesforce library throws AsyncApiException when there is an error, since we do not want to retry every error but only the bulk connection, we catch AsyncApiException when making a bulk connection call and throw our custom SalesforceQueryExecutionException.

Then the retry policy retries if SalesforceQueryExecutionException is seen, this helps us limit which AsyncApiException we need retry on.

unwrapFailsafeException function checks the stack trace and gets SalesforceQueryExecutionException

since SalesforceQueryExecutionException is also our own exception i unwrap that too and then throw AsyncApiException.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this particular code, I only see bulk connection calls, which means we are catching all AsyncApiException, am I missing something?

}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (JobInfo) resultJobInfo;
}

public JobInfo getJobStatus(String jobId) throws AsyncApiException {
if (!retryOnBackendError) {
return bulkConnection.getJobStatus(jobId);
}
Object resultJobInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting job status."))
.get(() -> {
try {
return bulkConnection.getJobStatus(jobId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object resultJobInfo;
try {
resultJobInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting job status."))
.get(() -> {
try {
return bulkConnection.getJobStatus(jobId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (JobInfo) resultJobInfo;
}

@@ -91,78 +106,113 @@ public void updateJob(JobInfo jobInfo) throws AsyncApiException {
bulkConnection.updateJob(jobInfo);
return;
}
Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while updating job."))
.get(() -> {
try {
return bulkConnection.updateJob(jobInfo);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
try {
Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while updating job."))
.get(() -> {
try {
return bulkConnection.updateJob(jobInfo);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
}

public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
if (!retryOnBackendError) {
return bulkConnection.getBatchInfoList(jobId);
}
Object batchInfoList = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting batch info list."))
.get(() -> {
try {
return bulkConnection.getBatchInfoList(jobId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object batchInfoList;
try {
batchInfoList = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while getting batch info list."))
.get(() -> {
try {
return bulkConnection.getBatchInfoList(jobId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (BatchInfoList) batchInfoList;
}

public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiException {
if (!retryOnBackendError) {
return bulkConnection.getBatchInfo(jobId, batchId);
}
Object batchInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting batch status."))
.get(() -> {
try {
return bulkConnection.getBatchInfo(jobId, batchId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object batchInfo;
try {
batchInfo = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while getting batch status."))
.get(() -> {
try {
return bulkConnection.getBatchInfo(jobId, batchId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (BatchInfo) batchInfo;
}

public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
if (!retryOnBackendError) {
return bulkConnection.getBatchResultStream(jobId, batchId);
}
Object inputStream = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting batch result stream."))
.get(() -> {
try {
return bulkConnection.getBatchResultStream(jobId, batchId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object inputStream;
try {
inputStream = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while getting batch result stream."))
.get(() -> {
try {
return bulkConnection.getBatchResultStream(jobId, batchId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (InputStream) inputStream;
}

public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException {
if (!retryOnBackendError) {
return bulkConnection.getQueryResultStream(jobId, batchId, resultId);
}
Object inputStream = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while getting query result stream."))
.get(() -> {
try {
return bulkConnection.getQueryResultStream(jobId, batchId, resultId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object inputStream;
try {
inputStream = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while getting query result stream."))
.get(() -> {
try {
return bulkConnection.getQueryResultStream(jobId, batchId, resultId);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (InputStream) inputStream;
}

@@ -171,18 +221,43 @@ public BatchInfo createBatchFromStream(String query, JobInfo job) throws AsyncAp
if (!retryOnBackendError) {
return createBatchFromStreamI(query, job);
}
Object batchInfo = Failsafe.with(retryPolicy)
.onFailure(event -> LOG.info("Failed while creating batch from stream."))
.get(() -> {
try {
return createBatchFromStreamI(query, job);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
Object batchInfo;
try {
batchInfo = Failsafe.with(retryPolicy).onFailure(event -> LOG.info("Failed while creating batch from stream."))
.get(() -> {
try {
return createBatchFromStreamI(query, job);
} catch (AsyncApiException e) {
throw new SalesforceQueryExecutionException(e);
}
});
} catch (FailsafeException fse) {
if (unwrapFailsafeException(fse) instanceof AsyncApiException) {
throw (AsyncApiException) unwrapFailsafeException(fse);
}
throw new RuntimeException(unwrapFailsafeException(fse)); // wrap as RuntimeException if not AsyncApiException
}
return (BatchInfo) batchInfo;
}

private static Exception unwrapFailsafeException(FailsafeException e) {
if (e.getCause() instanceof Exception) {
Exception innerException = (Exception) e.getCause();
if (innerException instanceof SalesforceQueryExecutionException) {
return unwrapSalesforceQueryExecutionException((SalesforceQueryExecutionException) innerException);
}
return innerException;
}
return e;
}

private static Exception unwrapSalesforceQueryExecutionException(SalesforceQueryExecutionException e) {
if (e.getCause() instanceof AsyncApiException) {
return (AsyncApiException) e.getCause();
}
return e;
}

private BatchInfo createBatchFromStreamI(String query, JobInfo job) throws
SalesforceQueryExecutionException, IOException, AsyncApiException {
BatchInfo batchInfo = null;
Original file line number Diff line number Diff line change
@@ -321,7 +321,7 @@ private void assertRecordReaderOutputRecords(String[] csvStrings, Schema schema,
* The FailsafeException is thrown as retry limit gets exceeded.
* The retry policy is correctly configured with the specified configurations.
*/
@Test(expected = FailsafeException.class)
@Test(expected = AsyncApiException.class)
public void testRetryMechanism() throws Exception {
Long initialRetryDuration = 5L;
Long maxRetryDuration = 10L;
@@ -370,7 +370,7 @@ private void assertRecordReaderOutputRecordsRetryMechanism(String[] csvStrings,
reader.setupParser();
}

@Test (expected = FailsafeException.class)
@Test (expected = AsyncApiException.class)
public void testSetupParserWithoutRetry() throws Exception {
String csvString1 = "\"Id\",\"IsDeleted\",\"ExpectedRevenue\",\"LastModifiedDate\",\"CloseDate\",\"Time\"\n" +
"\"0061i000003XNcBAAW\",\"false\",\"1500.0\",\"2019-02-22T07:03:21.000Z\",\"2019-01-01\",\"12:00:30.000Z\"\n";