Skip to content

Commit

Permalink
fix async batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Stevens committed Dec 24, 2024
1 parent fead7c3 commit 4a35fa1
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import ca.uhn.fhir.jpa.interceptor.validation.RepositoryValidatingRuleBuilder;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.packages.IHapiPackageCacheManager;
import ca.uhn.fhir.jpa.packages.IPackageInstallerSvc;
Expand Down Expand Up @@ -941,21 +940,21 @@ public Batch2TaskHelper batch2TaskHelper() {

@Bean
public IReplaceReferencesSvc replaceReferencesSvc(
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundle,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundle,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {
return new ReplaceReferencesSvcImpl(
theDaoRegistry,
theHapiTransactionService,
theResourceLinkDao,
theJobCoordinator,
theReplaceReferencesPatchBundle,
theBatch2TaskHelper,
theStorageSettings);
theStorageSettings);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.IDaoRegistry;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.config.GeneratedDaoAndResourceProviderConfigR4;
Expand Down Expand Up @@ -107,13 +108,16 @@ public ITermLoaderSvc termLoaderService(

@Bean
public ResourceMergeService resourceMergeService(
DaoRegistry theDaoRegistry,
IReplaceReferencesSvc theReplaceReferencesSvc,
HapiTransactionService theHapiTransactionService,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobCoordinator theJobCoordinator,
Batch2TaskHelper theBatch2TaskHelper) {
DaoRegistry theDaoRegistry,
IReplaceReferencesSvc theReplaceReferencesSvc,
HapiTransactionService theHapiTransactionService,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobCoordinator theJobCoordinator,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {

return new ResourceMergeService(
theStorageSettings,
theDaoRegistry,
theReplaceReferencesSvc,
theHapiTransactionService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.data.IResourceLinkDao;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.replacereferences.ReplaceReferencesPatchBundleSvc;
import ca.uhn.fhir.replacereferences.ReplaceReferencesRequest;
Expand Down Expand Up @@ -60,13 +59,13 @@ public class ReplaceReferencesSvcImpl implements IReplaceReferencesSvc {
private final JpaStorageSettings myStorageSettings;

public ReplaceReferencesSvcImpl(
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundleSvc,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService,
IResourceLinkDao theResourceLinkDao,
IJobCoordinator theJobCoordinator,
ReplaceReferencesPatchBundleSvc theReplaceReferencesPatchBundleSvc,
Batch2TaskHelper theBatch2TaskHelper,
JpaStorageSettings theStorageSettings) {
myDaoRegistry = theDaoRegistry;
myHapiTransactionService = theHapiTransactionService;
myResourceLinkDao = theResourceLinkDao;
Expand Down Expand Up @@ -104,7 +103,8 @@ private IBaseParameters replaceReferencesPreferAsync(
theRequestDetails,
myJobCoordinator,
JOB_REPLACE_REFERENCES,
new ReplaceReferencesJobParameters(theReplaceReferencesRequest, myStorageSettings.getDefaultTransactionEntriesForWrite()));
new ReplaceReferencesJobParameters(
theReplaceReferencesRequest, myStorageSettings.getDefaultTransactionEntriesForWrite()));

Parameters retval = new Parameters();
task.setIdElement(task.getIdElement().toUnqualifiedVersionless());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import ca.uhn.fhir.batch2.jobs.merge.MergeJobParameters;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.util.CanonicalIdentifier;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
Expand Down Expand Up @@ -128,6 +129,7 @@ public int getResourceLimit() {

public MergeJobParameters asMergeJobParameters(
FhirContext theFhirContext,
JpaStorageSettings theStorageSettings,
Patient theSourceResource,
Patient theTargetResource,
RequestPartitionId thePartitionId) {
Expand All @@ -136,7 +138,7 @@ public MergeJobParameters asMergeJobParameters(
retval.setResultResource(theFhirContext.newJsonParser().encodeResourceToString(getResultResource()));
}
retval.setDeleteSource(getDeleteSource());
retval.setBatchSize(getResourceLimit());
retval.setBatchSize(theStorageSettings.getDefaultTransactionEntriesForWrite());
retval.setSourceId(new FhirIdJson(theSourceResource.getIdElement().toVersionless()));
retval.setTargetId(new FhirIdJson(theTargetResource.getIdElement().toVersionless()));
retval.setPartitionId(thePartitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
Expand Down Expand Up @@ -53,10 +54,11 @@
public class ResourceMergeService {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceMergeService.class);

private final FhirContext myFhirContext;
private final JpaStorageSettings myStorageSettings;
private final IFhirResourceDao<Patient> myPatientDao;
private final IReplaceReferencesSvc myReplaceReferencesSvc;
private final IHapiTransactionService myHapiTransactionService;
private final FhirContext myFhirContext;
private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private final IFhirResourceDao<Task> myTaskDao;
private final IJobCoordinator myJobCoordinator;
Expand All @@ -65,12 +67,14 @@ public class ResourceMergeService {
private final MergeValidationService myMergeValidationService;

public ResourceMergeService(
DaoRegistry theDaoRegistry,
IReplaceReferencesSvc theReplaceReferencesSvc,
IHapiTransactionService theHapiTransactionService,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobCoordinator theJobCoordinator,
Batch2TaskHelper theBatch2TaskHelper) {
JpaStorageSettings theStorageSettings,
DaoRegistry theDaoRegistry,
IReplaceReferencesSvc theReplaceReferencesSvc,
IHapiTransactionService theHapiTransactionService,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
IJobCoordinator theJobCoordinator,
Batch2TaskHelper theBatch2TaskHelper) {
myStorageSettings = theStorageSettings;

myPatientDao = theDaoRegistry.getResourceDao(Patient.class);
myTaskDao = theDaoRegistry.getResourceDao(Task.class);
Expand Down Expand Up @@ -237,7 +241,7 @@ private void doMergeAsync(
RequestPartitionId thePartitionId) {

MergeJobParameters mergeJobParameters = theMergeOperationParameters.asMergeJobParameters(
myFhirContext, theSourceResource, theTargetResource, thePartitionId);
myFhirContext, myStorageSettings, theSourceResource, theTargetResource, thePartitionId);

Task task = myBatch2TaskHelper.startJobAndCreateAssociatedTask(
myTaskDao, theRequestDetails, myJobCoordinator, JOB_MERGE, mergeJobParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ca.uhn.fhir.batch2.util.Batch2TaskHelper;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
Expand Down Expand Up @@ -109,6 +110,8 @@ public class ResourceMergeServiceTest {
@Mock
RequestPartitionId myRequestPartitionIdMock;

@Mock
private JpaStorageSettings myStorageSettingsMock;

private ResourceMergeService myResourceMergeService;

Expand All @@ -123,7 +126,9 @@ void setup() {
when(myDaoRegistryMock.getResourceDao(eq(Patient.class))).thenReturn(myPatientDaoMock);
when(myDaoRegistryMock.getResourceDao(eq(Task.class))).thenReturn(myTaskDaoMock);
when(myPatientDaoMock.getContext()).thenReturn(myFhirContext);
myResourceMergeService = new ResourceMergeService(myDaoRegistryMock,
myResourceMergeService = new ResourceMergeService(
myStorageSettingsMock,
myDaoRegistryMock,
myReplaceReferencesSvcMock,
myTransactionServiceMock,
myRequestPartitionHelperSvcMock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import ca.uhn.fhir.batch2.jobs.chunk.FhirIdJson;
import ca.uhn.fhir.batch2.jobs.parameters.BatchJobParametersWithTaskId;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.replacereferences.ReplaceReferencesRequest;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -48,7 +49,8 @@ public ReplaceReferencesJobParameters() {}
public ReplaceReferencesJobParameters(ReplaceReferencesRequest theReplaceReferencesRequest, int theBatchSize) {
mySourceId = new FhirIdJson(theReplaceReferencesRequest.sourceId);
myTargetId = new FhirIdJson(theReplaceReferencesRequest.targetId);
// Note theReplaceReferencesRequest.resourceLimit is only used for the synchronous case. It is ignored in this async case.
// Note theReplaceReferencesRequest.resourceLimit is only used for the synchronous case. It is ignored in this
// async case.
myBatchSize = theBatchSize;
myPartitionId = theReplaceReferencesRequest.partitionId;
}
Expand All @@ -71,7 +73,7 @@ public void setTargetId(FhirIdJson theTargetId) {

public int getBatchSize() {
if (myBatchSize <= 0) {
myBatchSize = ProviderConstants.OPERATION_REPLACE_REFERENCES_RESOURCE_LIMIT_DEFAULT;
myBatchSize = JpaStorageSettings.DEFAULT_TRANSACTION_ENTRIES_FOR_WRITE;
}
return myBatchSize;
}
Expand Down

0 comments on commit 4a35fa1

Please sign in to comment.