Skip to content

Commit

Permalink
fix async batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Stevens committed Dec 24, 2024
1 parent e52a8f4 commit fead7c3
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import ca.uhn.fhir.jpa.interceptor.validation.RepositoryValidatingRuleBuilder;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.packages.IHapiPackageCacheManager;
import ca.uhn.fhir.jpa.packages.IPackageInstallerSvc;
Expand Down Expand Up @@ -940,19 +941,21 @@ public Batch2TaskHelper batch2TaskHelper() {

@Bean
public IReplaceReferencesSvc replaceReferencesSvc(
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundle,
Batch2TaskHelper theBatch2TaskHelper) {
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundle,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {
return new ReplaceReferencesSvcImpl(
theDaoRegistry,
theHapiTransactionService,
theResourceLinkDao,
theJobCoordinator,
theReplaceReferencesPatchBundle,
theBatch2TaskHelper);
theBatch2TaskHelper,
theStorageSettings);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.replacereferences.ReplaceReferencesJobParameters;
import ca.uhn.fhir.batch2.util.Batch2TaskHelper;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.data.IResourceLinkDao;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.replacereferences.ReplaceReferencesPatchBundleSvc;
import ca.uhn.fhir.replacereferences.ReplaceReferencesRequest;
Expand Down Expand Up @@ -55,20 +57,23 @@ public class ReplaceReferencesSvcImpl implements IReplaceReferencesSvc {
private final IJobCoordinator myJobCoordinator;
private final ReplaceReferencesPatchBundleSvc myReplaceReferencesPatchBundleSvc;
private final Batch2TaskHelper myBatch2TaskHelper;
private final JpaStorageSettings myStorageSettings;

public ReplaceReferencesSvcImpl(
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundleSvc,
Batch2TaskHelper theBatch2TaskHelper) {
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundleSvc,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {
myDaoRegistry = theDaoRegistry;
myHapiTransactionService = theHapiTransactionService;
myResourceLinkDao = theResourceLinkDao;
myJobCoordinator = theJobCoordinator;
myReplaceReferencesPatchBundleSvc = theReplaceReferencesPatchBundleSvc;
myBatch2TaskHelper = theBatch2TaskHelper;
myStorageSettings = theStorageSettings;
}

@Override
Expand Down Expand Up @@ -99,7 +104,7 @@ private IBaseParameters replaceReferencesPreferAsync(
theRequestDetails,
myJobCoordinator,
JOB_REPLACE_REFERENCES,
new ReplaceReferencesJobParameters(theReplaceReferencesRequest));
new ReplaceReferencesJobParameters(theReplaceReferencesRequest, myStorageSettings.getDefaultTransactionEntriesForWrite()));

Parameters retval = new Parameters();
task.setIdElement(task.getIdElement().toUnqualifiedVersionless());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public MergeJobParameters asMergeJobParameters(
retval.setResultResource(theFhirContext.newJsonParser().encodeResourceToString(getResultResource()));
}
retval.setDeleteSource(getDeleteSource());
retval.setResourceLimit(getResourceLimit());
retval.setBatchSize(getResourceLimit());
retval.setSourceId(new FhirIdJson(theSourceResource.getIdElement().toVersionless()));
retval.setTargetId(new FhirIdJson(theTargetResource.getIdElement().toVersionless()));
retval.setPartitionId(thePartitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1438,7 +1438,7 @@ private void verifyBatch2JobTaskHelperMockInvocation(@Nullable Patient theResult

assertThat(jobParametersCaptor.getValue()).isInstanceOf(MergeJobParameters.class);
MergeJobParameters capturedJobParams = (MergeJobParameters) jobParametersCaptor.getValue();
assertThat(capturedJobParams.getResourceLimit()).isEqualTo(PAGE_SIZE);
assertThat(capturedJobParams.getBatchSize()).isEqualTo(PAGE_SIZE);
assertThat(capturedJobParams.getSourceId().toString()).isEqualTo(SOURCE_PATIENT_TEST_ID);
assertThat(capturedJobParams.getTargetId().toString()).isEqualTo(TARGET_PATIENT_TEST_ID);
assertThat(capturedJobParams.getDeleteSource()).isEqualTo(theDeleteSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,22 @@ public class ReplaceReferencesJobParameters extends BatchJobParametersWithTaskId
private FhirIdJson myTargetId;

@JsonProperty(
value = "resourceLimit",
value = "batchSize",
defaultValue = ProviderConstants.OPERATION_REPLACE_REFERENCES_RESOURCE_LIMIT_DEFAULT_STRING,
required = false)
private int myResourceLimit;
private int myBatchSize;

@JsonProperty("partitionId")
private RequestPartitionId myPartitionId;

public ReplaceReferencesJobParameters() {}

public ReplaceReferencesJobParameters(ReplaceReferencesRequest theRequest) {
mySourceId = new FhirIdJson(theRequest.sourceId);
myTargetId = new FhirIdJson(theRequest.targetId);
myResourceLimit = theRequest.resourceLimit;
myPartitionId = theRequest.partitionId;
public ReplaceReferencesJobParameters(ReplaceReferencesRequest theReplaceReferencesRequest, int theBatchSize) {
mySourceId = new FhirIdJson(theReplaceReferencesRequest.sourceId);
myTargetId = new FhirIdJson(theReplaceReferencesRequest.targetId);
// Note theReplaceReferencesRequest.resourceLimit is only used for the synchronous case. It is ignored in this async case.
myBatchSize = theBatchSize;
myPartitionId = theReplaceReferencesRequest.partitionId;
}

public FhirIdJson getSourceId() {
Expand All @@ -68,15 +69,15 @@ public void setTargetId(FhirIdJson theTargetId) {
myTargetId = theTargetId;
}

public int getResourceLimit() {
if (myResourceLimit <= 0) {
myResourceLimit = ProviderConstants.OPERATION_REPLACE_REFERENCES_RESOURCE_LIMIT_DEFAULT;
public int getBatchSize() {
if (myBatchSize <= 0) {
myBatchSize = ProviderConstants.OPERATION_REPLACE_REFERENCES_RESOURCE_LIMIT_DEFAULT;
}
return myResourceLimit;
return myBatchSize;
}

public void setResourceLimit(int theResourceLimit) {
myResourceLimit = theResourceLimit;
public void setBatchSize(int theBatchSize) {
myBatchSize = theBatchSize;
}

public RequestPartitionId getPartitionId() {
Expand All @@ -88,6 +89,6 @@ public void setPartitionId(RequestPartitionId thePartitionId) {
}

public ReplaceReferencesRequest asReplaceReferencesRequest() {
return new ReplaceReferencesRequest(mySourceId.asIdDt(), myTargetId.asIdDt(), myResourceLimit, myPartitionId);
return new ReplaceReferencesRequest(mySourceId.asIdDt(), myTargetId.asIdDt(), myBatchSize, myPartitionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public RunOutcome run(
params.getSourceId().asIdDt())
.map(FhirIdJson::new);

StreamUtil.partition(stream, params.getResourceLimit())
StreamUtil.partition(stream, params.getBatchSize())
.forEach(chunk ->
totalCount.addAndGet(processChunk(theDataSink, chunk, params.getPartitionId())));
});
Expand Down

0 comments on commit fead7c3

Please sign in to comment.