diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/AggregationAppModule.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/AggregationAppModule.java index 773032f677..2cbe833923 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/AggregationAppModule.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/AggregationAppModule.java @@ -10,7 +10,6 @@ import com.typesafe.config.Config; import gov.cms.dpc.aggregation.engine.AggregationEngine; import gov.cms.dpc.aggregation.engine.JobBatchProcessor; -import gov.cms.dpc.aggregation.engine.JobBatchProcessorV2; import gov.cms.dpc.aggregation.engine.OperationsConfig; import gov.cms.dpc.aggregation.health.AggregationEngineHealthCheck; import gov.cms.dpc.aggregation.service.*; @@ -40,7 +39,6 @@ public void configure() { binder.bind(AggregationEngine.class); binder.bind(AggregationManager.class).asEagerSingleton(); binder.bind(JobBatchProcessor.class); - binder.bind(JobBatchProcessorV2.class); binder.bind(AggregationEngineHealthCheck.class); // Healthchecks diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/AggregationEngine.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/AggregationEngine.java index 703b39be6c..f59376f635 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/AggregationEngine.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/AggregationEngine.java @@ -39,7 +39,6 @@ public class AggregationEngine implements Runnable { private final IJobQueue queue; private final OperationsConfig operationsConfig; private final JobBatchProcessor jobBatchProcessor; - private final JobBatchProcessorV2 jobBatchProcessorV2; private Disposable subscribe; /** @@ -57,12 +56,11 @@ public class AggregationEngine implements Runnable { * @param jobBatchProcessor - {@link JobBatchProcessor} contains all the job processing logic */ @Inject - public AggregationEngine(@AggregatorID UUID aggregatorID, IJobQueue queue, OperationsConfig operationsConfig, JobBatchProcessor jobBatchProcessor, JobBatchProcessorV2 jobBatchProcessorV2) { + public AggregationEngine(@AggregatorID UUID aggregatorID, IJobQueue queue, OperationsConfig operationsConfig, JobBatchProcessor jobBatchProcessor) { this.aggregatorID = aggregatorID; this.queue = queue; this.operationsConfig = operationsConfig; this.jobBatchProcessor = jobBatchProcessor; - this.jobBatchProcessorV2 = jobBatchProcessorV2; } /** @@ -196,12 +194,7 @@ protected void processJobBatch(JobQueueBatch job) { } private Optional processPatient(JobQueueBatch job, String patientId) { - if (job.isV2()) { - jobBatchProcessorV2.processJobBatchPartial(aggregatorID, queue, job, patientId); - } - else { - jobBatchProcessor.processJobBatchPartial(aggregatorID, queue, job, patientId); - } + jobBatchProcessor.processJobBatchPartial(aggregatorID, queue, job, patientId); // Stop processing when no patients or early shutdown return this.isRunning() ? job.fetchNextPatient(aggregatorID) : Optional.empty(); diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessor.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessor.java index b3cb796bd1..037f500cff 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessor.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessor.java @@ -1,6 +1,7 @@ package gov.cms.dpc.aggregation.engine; import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import gov.cms.dpc.aggregation.service.*; @@ -9,16 +10,14 @@ import gov.cms.dpc.common.MDCConstants; import gov.cms.dpc.common.utils.MetricMaker; import gov.cms.dpc.fhir.DPCResourceType; +import gov.cms.dpc.fhir.FHIRExtractors; import gov.cms.dpc.queue.IJobQueue; import gov.cms.dpc.queue.models.JobQueueBatch; import gov.cms.dpc.queue.models.JobQueueBatchFile; import io.reactivex.Flowable; import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.tuple.Pair; -import org.hl7.fhir.dstu3.model.ExplanationOfBenefit; -import org.hl7.fhir.dstu3.model.OperationOutcome; -import org.hl7.fhir.dstu3.model.Patient; -import org.hl7.fhir.dstu3.model.Resource; +import org.hl7.fhir.dstu3.model.*; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static gov.cms.dpc.fhir.FHIRExtractors.getPatientMBI; import static gov.cms.dpc.fhir.FHIRExtractors.getPatientMBIs; public class JobBatchProcessor { @@ -63,49 +63,118 @@ public JobBatchProcessor(BlueButtonClient bbclient, FhirContext fhirContext, Met * @param aggregatorID the current aggregatorID * @param queue the queue * @param job the job to process - * @param patientID the current patient id to process + * @param mbi the current patient mbi to process * @return A list of batch files {@link JobQueueBatchFile} */ - public List processJobBatchPartial(UUID aggregatorID, IJobQueue queue, JobQueueBatch job, String patientID) { + public List processJobBatchPartial(UUID aggregatorID, IJobQueue queue, JobQueueBatch job, String mbi) { StopWatch stopWatch = StopWatch.createStarted(); - OutcomeReason failReason = null; - final Pair>, Optional> consentResult = getConsent(patientID); + Optional failReason = Optional.empty(); + Optional> flowable = Optional.empty(); - Flowable flowable; - if (consentResult.getRight().isPresent()) { - flowable = Flowable.just(consentResult.getRight().get()); - failReason = OutcomeReason.INTERNAL_ERROR; - } else if (isOptedOut(consentResult.getLeft())) { - failReason = OutcomeReason.CONSENT_OPTED_OUT; - flowable = Flowable.just(AggregationUtils.toOperationOutcome(OutcomeReason.CONSENT_OPTED_OUT, patientID)); - } else if (isLookBackExempt(job.getOrgID())) { - logger.info("Skipping lookBack for org: {}", job.getOrgID().toString()); - MDC.put(MDCConstants.IS_SMOKE_TEST_ORG, "true"); - flowable = Flowable.fromIterable(job.getResourceTypes()) - .flatMap(r -> fetchResource(job, patientID, r, job.getSince().orElse(null))); - } else { - List answers = getLookBackAnswers(job, patientID); - if (passesLookBack(answers)) { - flowable = Flowable.fromIterable(job.getResourceTypes()) - .flatMap(r -> fetchResource(job, patientID, r, job.getSince().orElse(null))); - } else { - failReason = LookBackAnalyzer.analyze(answers); - flowable = Flowable.just(AggregationUtils.toOperationOutcome(failReason, patientID)); + // Load the Patient resource from BFD. + final Optional optPatient = fetchPatient(job, mbi); + if(optPatient.isEmpty()) { + // Failed to load patient + failReason = Optional.of(OutcomeReason.INTERNAL_ERROR); + flowable = Optional.of(Flowable.just(AggregationUtils.toOperationOutcome(failReason.get(), mbi))); + } + + // Check if the patient has opted out + if(flowable.isEmpty()) { + Optional, OutcomeReason>> consentResult = checkForOptOut(optPatient.get()); + if(consentResult.isPresent()) { + flowable = Optional.of(consentResult.get().getLeft()); + failReason = Optional.of(consentResult.get().getRight()); } } - final var results = writeResource(job, flowable) + // Check if the patient passes look back + if(flowable.isEmpty()) { + Optional, OutcomeReason>> lookBackResult = checkLookBack(optPatient.get(), job); + if(lookBackResult.isPresent()) { + flowable = Optional.of(lookBackResult.get().getLeft()); + failReason = Optional.of(lookBackResult.get().getRight()); + } + } + + // All checks passed, load resources + if(flowable.isEmpty()) { + flowable = Optional.of( + Flowable.fromIterable(job.getResourceTypes()).flatMap(r -> fetchResource(job, optPatient.get(), r, job.getSince().orElse(null))) + ); + } + + final var results = writeResource(job, flowable.get()) .toList() .blockingGet(); queue.completePartialBatch(job, aggregatorID); final String resourcesRequested = job.getResourceTypes().stream().map(DPCResourceType::getPath).filter(Objects::nonNull).collect(Collectors.joining(";")); - final String failReasonLabel = failReason == null ? "NA" : failReason.name(); + final String failReasonLabel = failReason.isEmpty() ? "NA" : failReason.get().name(); stopWatch.stop(); - logger.info("dpcMetric=DataExportResult,dataRetrieved={},failReason={},resourcesRequested={},duration={}", failReason == null, failReasonLabel, resourcesRequested, stopWatch.getTime()); + logger.info("dpcMetric=DataExportResult,dataRetrieved={},failReason={},resourcesRequested={},duration={}", failReason.isEmpty(), failReasonLabel, resourcesRequested, stopWatch.getTime()); return results; } + /** + * Checks the given patient against the consent service and returns any issues if the check doesn't pass. + * @param patient {@link Patient} resource we're checking consent for. + * @return If there's a problem, it returns a pair of a {@link Flowable} {@link OperationOutcome} and an {@link OutcomeReason}. + * If the Patient passes the consent check, it returns an empty {@link Optional}s. + */ + private Optional, OutcomeReason>> checkForOptOut(Patient patient) { + final Pair>, Optional> consentResult = getConsent(patient); + + if (consentResult.getRight().isPresent()) { + // Consent check returned an error + return Optional.of( + Pair.of( + Flowable.just(consentResult.getRight().get()), + OutcomeReason.INTERNAL_ERROR + ) + ); + } else if (isOptedOut(consentResult.getLeft())) { + // Enrollee is opted out + return Optional.of( + Pair.of( + Flowable.just(AggregationUtils.toOperationOutcome(OutcomeReason.CONSENT_OPTED_OUT, FHIRExtractors.getPatientMBI(patient))), + OutcomeReason.CONSENT_OPTED_OUT + ) + ); + } + + // Passes consent check + return Optional.empty(); + } + + /** + * Does the patient look back check and returns any issues if it doesn't pass. + * @param patient {@link Patient} resource we're looking for a relationship for. + * @param job {@link JobQueueBatch} currently running. + * @return If there's a problem, it returns a pair of a {@link Flowable} {@link OperationOutcome} and an {@link OutcomeReason}. + * If the look back check passes, an empty {@link Optional}. + */ + private Optional, OutcomeReason>> checkLookBack(Patient patient, JobQueueBatch job) { + if (isLookBackExempt(job.getOrgID())) { + logger.info("Skipping lookBack for org: {}", job.getOrgID().toString()); + MDC.put(MDCConstants.IS_SMOKE_TEST_ORG, "true"); + } else { + List answers = getLookBackAnswers(job, patient); + if (!passesLookBack(answers)) { + OutcomeReason failReason = LookBackAnalyzer.analyze(answers); + return Optional.of( + Pair.of( + Flowable.just(AggregationUtils.toOperationOutcome(failReason, FHIRExtractors.getPatientMBI(patient))), + failReason + ) + ); + } + } + + // Passes lookback check + return Optional.empty(); + } + private boolean isLookBackExempt(UUID orgId) { List exemptOrgs = operationsConfig.getLookBackExemptOrgs(); if (exemptOrgs != null) { @@ -117,13 +186,11 @@ private boolean isLookBackExempt(UUID orgId) { /** * Fetch and write a specific resource type * - * @param job the job to associate the fetch - * @param patientID the patientID to fetch data - * @param resourceType the resourceType to fetch data - * @param since the since date + * @param job the job to associate the fetch + * @param patient the {@link Patient} we're fetching data for * @return A flowable and resourceType the user requested */ - private Flowable fetchResource(JobQueueBatch job, String patientID, DPCResourceType resourceType, OffsetDateTime since) { + private Flowable fetchResource(JobQueueBatch job, Patient patient, DPCResourceType resourceType, OffsetDateTime since) { // Make this flow hot (ie. only called once) when multiple subscribers attach final var fetcher = new ResourceFetcher(bbclient, job.getJobID(), @@ -131,18 +198,50 @@ private Flowable fetchResource(JobQueueBatch job, String patientID, DP resourceType, since, job.getTransactionTime()); - return fetcher.fetchResources(patientID, new JobHeaders(job.getRequestingIP(),job.getJobID().toString(), + return fetcher.fetchResources(patient, new JobHeaders(job.getRequestingIP(),job.getJobID().toString(), job.getProviderNPI(),job.getTransactionTime().toString(),job.isBulk()).buildHeaders()) .flatMap(Flowable::fromIterable); } - private List getLookBackAnswers(JobQueueBatch job, String patientId) { + /** + * Fetches the {@link Patient} referenced by the given mbi. Throws a {@link ResourceNotFoundException} if no + * {@link Patient} can be found. + * @param job The job associated to the fetch + * @param mbi The mbi of the {@link Patient} + * @return The {@link Patient} + */ + private Optional fetchPatient(JobQueueBatch job, String mbi) { + JobHeaders headers = new JobHeaders( + job.getRequestingIP(), + job.getJobID().toString(), + job.getProviderNPI(), + job.getTransactionTime().toString(), + job.isBulk()); + + Bundle patients; + try { + patients = bbclient.requestPatientFromServerByMbi(mbi, headers.buildHeaders()); + } catch (Exception e) { + logger.error("Failed to retrieve Patient", e); + return Optional.empty(); + } + + // If we get more than one unique Patient for an MBI then we've got some upstream problems. + if (patients.getTotal() == 1) { + return Optional.of((Patient) patients.getEntryFirstRep().getResource()); + } + + logger.error("Expected 1 Patient to match MBI but found {}", patients.getTotal()); + return Optional.empty(); + } + + private List getLookBackAnswers(JobQueueBatch job, Patient patient) { List result = new ArrayList<>(); final String practitionerNPI = job.getProviderNPI(); final String organizationNPI = job.getOrgNPI(); if (practitionerNPI != null && organizationNPI != null) { MDC.put(MDCConstants.PROVIDER_NPI, practitionerNPI); - Flowable flowable = fetchResource(job, patientId, DPCResourceType.ExplanationOfBenefit, null); + Flowable flowable = fetchResource(job, patient, DPCResourceType.ExplanationOfBenefit, null); result = flowable .filter(resource -> Objects.requireNonNull(DPCResourceType.ExplanationOfBenefit.getPath()).equals(resource.getResourceType().getPath())) .map(ExplanationOfBenefit.class::cast) @@ -213,12 +312,19 @@ private Meter getMeter(DPCResourceType resourceType) { return DPCResourceType.OperationOutcome == resourceType ? operationalOutcomeMeter : resourceMeter; } - private Pair>, Optional> getConsent(String patientId) { + /** + * Returns a {@link List} of {@link ConsentResult}s if successful. An {@link OperationOutcome} if not. Only one of + * the two {@link Optional}s returned will be filled in. + * + * @param patient A {@link Patient} that we want to get {@link ConsentResult}s for + * @return A {@link Pair} + */ + private Pair>, Optional> getConsent(Patient patient) { try { - return Pair.of(consentService.getConsent(patientId), Optional.empty()); + return Pair.of(consentService.getConsent(getPatientMBIs(patient)), Optional.empty()); } catch (Exception e) { logger.error("Unable to retrieve consent from consent service.", e); - OperationOutcome operationOutcome = AggregationUtils.toOperationOutcome(OutcomeReason.INTERNAL_ERROR, patientId); + OperationOutcome operationOutcome = AggregationUtils.toOperationOutcome(OutcomeReason.INTERNAL_ERROR, getPatientMBI(patient)); return Pair.of(Optional.empty(), Optional.of(operationOutcome)); } } @@ -245,20 +351,4 @@ private boolean passesLookBack(List answers) { return answers.stream() .anyMatch(a -> a.matchDateCriteria() && (a.orgNPIMatchAnyEobNPIs() || a.practitionerNPIMatchAnyEobNPIs())); } - - /** - * Takes a list of resources, finds all of the {@link Patient}s and returns a list of their valid - * MBIs. If there is more than one {@link Patient} all of their MBIs will be returned, and if there are no - * {@link Patient}s an empty list will be returned. - * @param resources A {@link Flowable} of FHIR {@link Resource}s - * @return A {@link List} of MBIs - */ - private List getMBIs(Flowable resources) { - return resources - .filter(r -> DPCResourceType.Patient.getPath().equals(r.getResourceType().getPath())) - .map(r -> (Patient)r) - .flatMap(p -> Flowable.fromIterable(getPatientMBIs(p))) - .toList() - .blockingGet(); - } } diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorV2.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorV2.java deleted file mode 100644 index 0e59a12ea3..0000000000 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorV2.java +++ /dev/null @@ -1,198 +0,0 @@ -package gov.cms.dpc.aggregation.engine; - -import ca.uhn.fhir.context.FhirContext; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.google.inject.name.Named; -import gov.cms.dpc.aggregation.service.ConsentResult; -import gov.cms.dpc.aggregation.service.ConsentService; -import gov.cms.dpc.aggregation.util.AggregationUtils; -import gov.cms.dpc.bluebutton.clientV2.BlueButtonClientV2; -import gov.cms.dpc.common.utils.MetricMaker; -import gov.cms.dpc.fhir.DPCResourceType; -import gov.cms.dpc.queue.IJobQueue; -import gov.cms.dpc.queue.models.JobQueueBatch; -import gov.cms.dpc.queue.models.JobQueueBatchFile; -import io.reactivex.Flowable; -import org.apache.commons.lang3.time.StopWatch; -import org.apache.commons.lang3.tuple.Pair; -import org.hl7.fhir.r4.model.OperationOutcome; -import org.hl7.fhir.r4.model.Resource; -import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import java.time.OffsetDateTime; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -public class JobBatchProcessorV2 { - private static final Logger logger = LoggerFactory.getLogger(JobBatchProcessorV2.class); - - private final BlueButtonClientV2 bbclientV2; - private final OperationsConfig operationsConfig; - private final FhirContext fhirContext; - private final Meter resourceMeter; - private final Meter operationalOutcomeMeter; - private final ConsentService consentService; - - @Inject - public JobBatchProcessorV2(BlueButtonClientV2 bbclientV2, @Named("fhirContextR4") FhirContext fhirContext, MetricRegistry metricRegistry, OperationsConfig operationsConfig, ConsentService consentService) { - this.bbclientV2 = bbclientV2; - this.fhirContext = fhirContext; - this.operationsConfig = operationsConfig; - this.consentService = consentService; - - // Metrics - final var metricFactory = new MetricMaker(metricRegistry, JobBatchProcessorV2.class); - resourceMeter = metricFactory.registerMeter("resourceFetched"); - operationalOutcomeMeter = metricFactory.registerMeter("operationalOutcomes"); - } - - /** - * Processes a partial of a job batch. Marks the partial as completed upon processing - * - * @param aggregatorID the current aggregatorID - * @param queue the queue - * @param job the job to process - * @param patientID the current patient id to process - * @return A list of batch files {@link JobQueueBatchFile} - */ - public List processJobBatchPartial(UUID aggregatorID, IJobQueue queue, JobQueueBatch job, String patientID) { - StopWatch stopWatch = StopWatch.createStarted(); - OutcomeReason failReason = null; - final Pair>, Optional> consentResult = getConsent(patientID); - - Flowable flowable; - if (consentResult.getRight().isPresent()) { - flowable = Flowable.just(consentResult.getRight().get()); - failReason = OutcomeReason.INTERNAL_ERROR; - } else if (isOptedOut(consentResult.getLeft())) { - failReason = OutcomeReason.CONSENT_OPTED_OUT; - flowable = Flowable.just(AggregationUtils.toOperationOutcomeV2(OutcomeReason.CONSENT_OPTED_OUT, patientID)); - } else { - logger.info("Skipping lookBack for V2 job: {}", job.getOrgID().toString()); - flowable = Flowable.fromIterable(job.getResourceTypes()) - .flatMap(r -> fetchResource(job, patientID, r, job.getSince().orElse(null))); - } - - final var results = writeResource(job, flowable) - .toList() - .blockingGet(); - queue.completePartialBatch(job, aggregatorID); - - final String resourcesRequested = job.getResourceTypes().stream().map(DPCResourceType::getPath).filter(Objects::nonNull).collect(Collectors.joining(";")); - final String failReasonLabel = failReason == null ? "NA" : failReason.name(); - stopWatch.stop(); - logger.info("dpcMetric=DataExportResult,dataRetrieved={},failReason={},resourcesRequested={},duration={}", failReason == null, failReasonLabel, resourcesRequested, stopWatch.getTime()); - return results; - } - - /** - * Fetch and write a specific resource type - * - * @param job the job to associate the fetch - * @param patientID the patientID to fetch data - * @param resourceType the resourceType to fetch data - * @param since the since date - * @return A flowable and resourceType the user requested - */ - private Flowable fetchResource(JobQueueBatch job, String patientID, DPCResourceType resourceType, OffsetDateTime since) { - // Make this flow hot (ie. only called once) when multiple subscribers attach - final var fetcher = new ResourceFetcherV2(bbclientV2, - job.getJobID(), - job.getBatchID(), - resourceType, - since, - job.getTransactionTime()); - return fetcher.fetchResources(patientID, new JobHeaders(job.getRequestingIP(),job.getJobID().toString(), - job.getProviderNPI(),job.getTransactionTime().toString(),job.isBulk()).buildHeaders()). - flatMap(Flowable::fromIterable); - } - - private Flowable writeResource(JobQueueBatch job, Flowable flow) { - return flow.groupBy(Resource::getResourceType) - .flatMap(groupedByResourceFlow -> { - final var resourceCount = new AtomicInteger(); - final var sequenceCount = new AtomicInteger(); - final var resourceType = groupedByResourceFlow.getKey(); - final var dpcResourceType = DPCResourceType.valueOf(resourceType != null ? resourceType.toString() : null); - job.getJobQueueFileLatest(dpcResourceType).ifPresent(file -> { - resourceCount.set(file.getCount()); - sequenceCount.set(file.getSequence()); - }); - final var writer = new ResourceWriterV2(fhirContext, job, dpcResourceType, operationsConfig); - return groupedByResourceFlow.compose((upstream) -> bufferAndWrite(upstream, writer, resourceCount, sequenceCount)); - }); - } - - - /** - * This part of the flow chain buffers resources and writes them in batches to a file - * - * @param writer - the writer to use - * @param resourceCount - the number of resources in the current file - * @param sequenceCount - the sequence counter - * @return a transformed flow - */ - private Publisher bufferAndWrite(Flowable upstream, ResourceWriterV2 writer, AtomicInteger resourceCount, AtomicInteger sequenceCount) { - final Flowable filteredUpstream = upstream.filter(r -> r.getResourceType().getPath().equals(writer.getResourceType().getPath())); - final var connectableMixedFlow = filteredUpstream.publish().autoConnect(2); - - var resourcesInCurrentFileCount = resourceCount.getAndSet(0); - var resourcesPerFile = operationsConfig.getResourcesPerFileCount(); - var firstResourceBatchCount = resourcesInCurrentFileCount < resourcesPerFile ? resourcesPerFile - resourcesInCurrentFileCount : resourcesPerFile; - - if (resourcesInCurrentFileCount == resourcesPerFile) { - // Start a new file since the file has been filled up - sequenceCount.incrementAndGet(); - } - Meter meter = getMeter(writer.getResourceType()); - // Handle the scenario where a previous file was already written by breaking up the flow into the first batch and the buffered batch - final Flowable partialBatch = connectableMixedFlow - .compose(stream -> writeResources(stream.take(firstResourceBatchCount), writer, sequenceCount, meter)); - final Flowable bufferedBatch = connectableMixedFlow - .compose(stream -> writeResources(stream.skip(firstResourceBatchCount), writer, sequenceCount, meter)); - - return partialBatch.mergeWith(bufferedBatch); - } - - private Flowable writeResources(Flowable upstream, ResourceWriterV2 writer, AtomicInteger sequenceCount, Meter meter) { - return upstream - .buffer(operationsConfig.getResourcesPerFileCount()) - .doOnNext(outcomes -> meter.mark(outcomes.size())) - .map(batch -> writer.writeBatch(sequenceCount, batch)); - } - - private Meter getMeter(DPCResourceType resourceType) { - return DPCResourceType.OperationOutcome == resourceType ? operationalOutcomeMeter : resourceMeter; - } - - private Pair>, Optional> getConsent(String patientId) { - try { - return Pair.of(consentService.getConsent(patientId), Optional.empty()); - } catch (Exception e) { - logger.error("Unable to retrieve consent from consent service.", e); - OperationOutcome operationOutcome = AggregationUtils.toOperationOutcomeV2(OutcomeReason.INTERNAL_ERROR, patientId); - return Pair.of(Optional.empty(), Optional.of(operationOutcome)); - } - } - - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private boolean isOptedOut(Optional> consentResultsOptional) { - if (consentResultsOptional.isPresent()) { - final List consentResults = consentResultsOptional.get(); - long optOutCount = consentResults.stream().filter(consentResult -> { - final boolean isActive = consentResult.isActive(); - final boolean isOptOut = ConsentResult.PolicyType.OPT_OUT.equals(consentResult.getPolicyType()); - final boolean isFutureConsent = consentResult.getConsentDate().after(new Date()); - return isActive && isOptOut && !isFutureConsent; - }).count(); - return optOutCount > 0; - } - return true; - } - -} diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcher.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcher.java index cd0bbde1d3..aae6c05e76 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcher.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcher.java @@ -16,11 +16,12 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import java.security.GeneralSecurityException; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.*; +import static gov.cms.dpc.fhir.FHIRExtractors.getPatientMBI; + /** * A resource fetcher will fetch resources of particular type from passed {@link BlueButtonClient} */ @@ -60,18 +61,18 @@ class ResourceFetcher { * Fetch all the resources for a specific patient. If errors are encountered from BlueButton, * a OperationOutcome resource is used. * - * @param mbi to use + * @param patient {@link Patient} we're fetching resources for * @param headers headers * @return a flow with all the resources for specific patient */ - Flowable> fetchResources(String mbi, Map headers) { + Flowable> fetchResources(Patient patient, Map headers) { return Flowable.fromCallable(() -> { String fetchId = UUID.randomUUID().toString(); logger.debug("Fetching first {} from BlueButton for {}", resourceType.toString(), fetchId); - final Bundle firstFetched = fetchFirst(mbi, headers); + final Bundle firstFetched = fetchFirst(patient, headers); return fetchAllBundles(firstFetched, fetchId, headers); }) - .onErrorResumeNext((Throwable error) -> handleError(mbi, error)); + .onErrorResumeNext((Throwable error) -> handleError(patient, error)); } /** @@ -101,11 +102,11 @@ private List fetchAllBundles(Bundle firstBundle, String fetchId, Map< /** * Turn an error into a flow. - * @param mbi MBI + * @param patient The {@link Patient} we're loading resources for * @param error the error * @return a Flowable of list of resources */ - private Publisher> handleError(String mbi, Throwable error) { + private Publisher> handleError(Patient patient, Throwable error) { if (error instanceof JobQueueFailure) { // JobQueueFailure is an internal error. Just pass it along as an error. return Flowable.error(error); @@ -113,65 +114,41 @@ private Publisher> handleError(String mbi, Throwable error) { // Other errors should be turned into OperationOutcome and just recorded. logger.error("Turning error into OperationOutcome.", error); - final var operationOutcome = formOperationOutcome(mbi, error); + final var operationOutcome = formOperationOutcome(getPatientMBI(patient), error); return Flowable.just(List.of(operationOutcome)); } /** * Based on resourceType, fetch a resource or a bundle of resources. * - * @param mbi of the resource to fetch + * @param patient {@link Patient} we're fetching resrouc * @return the first bundle of resources */ - private Bundle fetchFirst(String mbi, Map headers) { - Patient patient = fetchPatient(mbi, headers); + private Bundle fetchFirst(Patient patient, Map headers) { patient.getIdentifier().stream() .filter(i -> i.getSystem().equals(DPCIdentifierSystem.MBI_HASH.getSystem())) .findFirst() .ifPresent(i -> MDC.put(MDCConstants.PATIENT_ID, i.getValue())); - var beneId = getBeneIdFromPatient(patient); + String patientId = patient.getIdElement().getIdPart(); + final var lastUpdated = formLastUpdatedParam(); switch (resourceType) { + // Why are we reloading the Patient, because on reload we're passing a lastUpdated parameter. If the user + // only wants resources from the last month and our patient hasn't been updated in a year, this method will + // return an empty bundle. + // TODO: Implement the since parameter ourselves using meta.lastUpdated and avoid pulling Patient and EoB twice. case Patient: - return blueButtonClient.requestPatientFromServer(beneId, lastUpdated, headers); + return blueButtonClient.requestPatientFromServer(patientId, lastUpdated, headers); case ExplanationOfBenefit: - return blueButtonClient.requestEOBFromServer(beneId, lastUpdated, headers); + return blueButtonClient.requestEOBFromServer(patientId, lastUpdated, headers); case Coverage: - return blueButtonClient.requestCoverageFromServer(beneId, lastUpdated, headers); + return blueButtonClient.requestCoverageFromServer(patientId, lastUpdated, headers); default: throw new JobQueueFailure(jobID, batchID, "Unexpected resource type: " + resourceType.toString()); } } - private Patient fetchPatient(String mbi, Map headers) { - Bundle patients; - try { - patients = blueButtonClient.requestPatientFromServerByMbi(mbi, headers); - } catch (GeneralSecurityException e) { - logger.error("Failed to retrieve Patient", e); - throw new ResourceNotFoundException("Failed to retrieve Patient"); - } - - if (patients.getTotal() == 1) { - return (Patient) patients.getEntryFirstRep().getResource(); - } - - logger.error("Expected 1 Patient to match MBI but found {}", patients.getTotal()); - throw new ResourceNotFoundException(String.format("Expected 1 Patient to match MBI but found %d", patients.getTotal())); - } - - private String getBeneIdFromPatient(Patient patient) { - return patient.getIdentifier().stream() - .filter(id -> DPCIdentifierSystem.BENE_ID.getSystem().equals(id.getSystem())) - .findFirst() - .map(Identifier::getValue) - .orElseThrow(() -> { - logger.error("No bene_id found in Patient resource"); - return new ResourceNotFoundException("No bene_id found in Patient resource"); - }); - } - /** * Add resources in a bundle to a list * diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcherV2.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcherV2.java deleted file mode 100644 index 8b89d8d763..0000000000 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceFetcherV2.java +++ /dev/null @@ -1,254 +0,0 @@ -package gov.cms.dpc.aggregation.engine; - -import ca.uhn.fhir.parser.DataFormatException; -import ca.uhn.fhir.rest.param.DateRangeParam; -import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; -import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; -import gov.cms.dpc.bluebutton.clientV2.BlueButtonClientV2; -import gov.cms.dpc.common.MDCConstants; -import gov.cms.dpc.fhir.DPCIdentifierSystem; -import gov.cms.dpc.fhir.DPCResourceType; -import gov.cms.dpc.queue.exceptions.JobQueueFailure; -import io.reactivex.Flowable; -import org.hl7.fhir.r4.model.*; -import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; - -import java.security.GeneralSecurityException; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.*; - -/** - * A resource fetcher will fetch resources of particular type from passed {@link BlueButtonClientV2} - */ -class ResourceFetcherV2 { - private static final Logger logger = LoggerFactory.getLogger(ResourceFetcherV2.class); - private final BlueButtonClientV2 blueButtonClientV2; - private final UUID jobID; - private final UUID batchID; - private final DPCResourceType resourceType; - private final OffsetDateTime since; - private final OffsetDateTime transactionTime; - - /** - * Create a context for fetching FHIR resources - * @param blueButtonClientV2 - client to BlueButton to use - * @param jobID - the jobID for logging and reporting - * @param batchID - the batchID for logging and reporting - * @param resourceType - the resource type to fetch - * @param since - the since parameter for the job - * @param transactionTime - the start time of this job - */ - ResourceFetcherV2(BlueButtonClientV2 blueButtonClientV2, - UUID jobID, - UUID batchID, - DPCResourceType resourceType, - OffsetDateTime since, - OffsetDateTime transactionTime) { - this.blueButtonClientV2 = blueButtonClientV2; - this.jobID = jobID; - this.batchID = batchID; - this.resourceType = resourceType; - this.since = since; - this.transactionTime = transactionTime; - } - - /** - * Fetch all the resources for a specific patient. If errors are encountered from BlueButton, - * a OperationOutcome resource is used. - * - * @param mbi to use - * @param headers headers - * @return a flow with all the resources for specific patient - */ - Flowable> fetchResources(String mbi, Map headers) { - return Flowable.fromCallable(() -> { - String fetchId = UUID.randomUUID().toString(); - logger.debug("Fetching first {} from BlueButton for {}", resourceType.toString(), fetchId); - final Bundle firstFetched = fetchFirst(mbi, headers); - return fetchAllBundles(firstFetched, fetchId, headers); - }) - .onErrorResumeNext((Throwable error) -> handleError(mbi, error)); - } - - /** - * Given a bundle, return a list of resources in the passed in bundle and all - * the resources from the next bundles. - * - * @param firstBundle of resources. Included in the result list - * @return a list of all the resources in the first bundle and all next bundles - */ - private List fetchAllBundles(Bundle firstBundle, String fetchId, Map headers) { - final var resources = new ArrayList(); - checkBundleTransactionTime(firstBundle); - addResources(resources, firstBundle); - - // Loop until no more next bundles - var bundle = firstBundle; - while (bundle.getLink(Bundle.LINK_NEXT) != null) { - logger.debug("Fetching next bundle {} from BlueButton for {}", resourceType.toString(), fetchId); - bundle = blueButtonClientV2.requestNextBundleFromServer(bundle, headers); - checkBundleTransactionTime(bundle); - addResources(resources, bundle); - } - - logger.debug("Done fetching bundles {} for {}", resourceType.toString(), fetchId); - return resources; - } - - /** - * Turn an error into a flow. - * @param mbi MBI - * @param error the error - * @return a Flowable of list of resources - */ - private Publisher> handleError(String mbi, Throwable error) { - if (error instanceof JobQueueFailure) { - // JobQueueFailure is an internal error. Just pass it along as an error. - return Flowable.error(error); - } - - // Other errors should be turned into OperationOutcome and just recorded. - logger.error("Turning error into OperationOutcome.", error); - final var operationOutcome = formOperationOutcome(mbi, error); - return Flowable.just(List.of(operationOutcome)); - } - - /** - * Based on resourceType, fetch a resource or a bundle of resources. - * - * @param mbi of the resource to fetch - * @return the first bundle of resources - */ - private Bundle fetchFirst(String mbi, Map headers) { - Patient patient = fetchPatient(mbi, headers); - patient.getIdentifier().stream() - .filter(i -> i.getSystem().equals(DPCIdentifierSystem.MBI_HASH.getSystem())) - .findFirst() - .ifPresent(i -> MDC.put(MDCConstants.PATIENT_ID, i.getValue())); - - var beneId = getBeneIdFromPatient(patient); - final var lastUpdated = formLastUpdatedParam(); - switch (resourceType) { - case Patient: - return blueButtonClientV2.requestPatientFromServer(beneId, lastUpdated, headers); - case ExplanationOfBenefit: - return blueButtonClientV2.requestEOBFromServer(beneId, lastUpdated, headers); - case Coverage: - return blueButtonClientV2.requestCoverageFromServer(beneId, lastUpdated, headers); - default: - throw new JobQueueFailure(jobID, batchID, "Unexpected resource type: " + resourceType); - } - } - - private Patient fetchPatient(String mbi, Map headers) { - Bundle patients; - try { - patients = blueButtonClientV2.requestPatientFromServerByMbi(mbi, headers); - } catch (GeneralSecurityException e) { - logger.error("Failed to retrieve Patient", e); - throw new ResourceNotFoundException("Failed to retrieve Patient"); - } - - if (patients.getTotal() == 1) { - return (Patient) patients.getEntryFirstRep().getResource(); - } - - logger.error("Expected 1 Patient to match MBI but found {}", patients.getTotal()); - throw new ResourceNotFoundException(String.format("Expected 1 Patient to match MBI but found %d", patients.getTotal())); - } - - private String getBeneIdFromPatient(Patient patient) { - return patient.getIdentifier().stream() - .filter(id -> DPCIdentifierSystem.BENE_ID.getSystem().equals(id.getSystem())) - .findFirst() - .map(Identifier::getValue) - .orElseThrow(() -> { - logger.error("No bene_id found in Patient resource"); - return new ResourceNotFoundException("No bene_id found in Patient resource"); - }); - } - - /** - * Add resources in a bundle to a list - * - * @param resources - the list to add resources to - * @param bundle - the bundle to extract resources from - */ - private void addResources(ArrayList resources, Bundle bundle) { - bundle.getEntry().forEach((entry) -> { - final var resource = entry.getResource(); - if (!resource.getResourceType().getPath().equals(resourceType.getPath())) { - throw new DataFormatException(String.format("Unexpected resource type: got %s expected: %s", resource.getResourceType().toString(), resourceType)); - } - resources.add(resource); - }); - } - - /** - * Create a {@link OperationOutcome} resource from an exception with a patient - * - * @param ex - the exception to turn into a Operation Outcome - * @return an operation outcome - */ - private OperationOutcome formOperationOutcome(String patientID, Throwable ex) { - String details; - if (ex instanceof ResourceNotFoundException) { - details = String.format("%s resource not found in Blue Button for id: %s", resourceType.toString(), patientID); - } else if (ex instanceof BaseServerResponseException) { - final var serverException = (BaseServerResponseException) ex; - details = String.format("Blue Button error fetching %s resource. HTTP return code: %s", resourceType.toString(), serverException.getStatusCode()); - } else { - details = String.format("Internal error: %s", ex.getMessage()); - } - - final var patientLocation = List.of(new StringType("Patient"), new StringType("id"), new StringType(patientID)); - final var outcome = new OperationOutcome(); - outcome.addIssue() - .setSeverity(OperationOutcome.IssueSeverity.ERROR) - .setCode(OperationOutcome.IssueType.EXCEPTION) - .setDetails(new CodeableConcept().setText(details)) - .setLocation(patientLocation); - return outcome; - } - - /** - * Form a date range for the lastUpdated parameter for this export job - * - * @return a date range for this job - */ - @SuppressWarnings("JdkObsolete") // Date class is used by HAPI FHIR DateRangeParam - private DateRangeParam formLastUpdatedParam() { - // Note: FHIR bulk spec says that since is exclusive and transactionTime is inclusive - // It is also says that all resources should not have lastUpdated after the transactionTime. - // This is true for the both the since and the non-since cases. - // BFD will include resources that do not have a lastUpdated if there isn't a complete range. - return since != null ? - new DateRangeParam() - .setUpperBoundInclusive(Date.from(transactionTime.toInstant())) - .setLowerBoundExclusive(Date.from(since.toInstant())) : - new DateRangeParam() - .setUpperBoundInclusive(Date.from(transactionTime.toInstant())); - } - - /** - * Check the transaction time of the BFD against the transaction time of the export job - * - * @param bundle to check - */ - @SuppressWarnings("JdkObsolete") // Date class is used by FHIR stu3 Meta model - private void checkBundleTransactionTime(Bundle bundle) { - if (bundle.getMeta() == null || bundle.getMeta().getLastUpdated() == null) return; - final var bfdTransactionTime = bundle.getMeta().getLastUpdated().toInstant().atOffset(ZoneOffset.UTC); - if (bfdTransactionTime.isBefore(transactionTime)) { - // See BFD's RFC0004 for a discussion on why this type error may occur. - // Note: Retrying the job after a delay may fix this problem. - logger.warn("Warning: BFD transaction time regression: BFD time {}, Job time {}", - bfdTransactionTime, - transactionTime); - } - } -} diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceWriterV2.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceWriterV2.java deleted file mode 100644 index 4bad7bccc0..0000000000 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/engine/ResourceWriterV2.java +++ /dev/null @@ -1,126 +0,0 @@ -package gov.cms.dpc.aggregation.engine; - -import ca.uhn.fhir.context.FhirContext; -import com.google.inject.name.Named; -import gov.cms.dpc.fhir.DPCResourceType; -import gov.cms.dpc.queue.exceptions.JobQueueFailure; -import gov.cms.dpc.queue.models.JobQueueBatch; -import gov.cms.dpc.queue.models.JobQueueBatchFile; -import org.hl7.fhir.r4.model.Resource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Writes files from batches of FHIR Resources - */ -class ResourceWriterV2 { - private static final Logger logger = LoggerFactory.getLogger(ResourceWriterV2.class); - private static final char DELIM = '\n'; - - private final @Named("fhirContextR4") FhirContext fhirContext; - private final OperationsConfig config; - private final JobQueueBatch job; - private final DPCResourceType resourceType; - - /** - * Form the full file name of an output file - * @param batchID - {@link UUID} ID of the batch job - * @param resourceType - {@link DPCResourceType} to append to filename - * @param sequence - batch sequence number - * @return return the path - */ - static String formOutputFilePath(String exportPath, UUID batchID, DPCResourceType resourceType, int sequence) { - return String.format("%s/%s.ndjson", exportPath, JobQueueBatchFile.formOutputFileName(batchID, resourceType, sequence)); - } - - /** - * Create a context for fetching FHIR resources - * @param fhirContext - the single context for the engine - * @param job - the context for logging and reporting - * @param resourceType - the resource type to fetch - * @param config - config to use for the engine - */ - ResourceWriterV2(@Named("fhirContextR4") FhirContext fhirContext, - JobQueueBatch job, - DPCResourceType resourceType, - OperationsConfig config) { - this.fhirContext = fhirContext; - this.config = config; - this.job = job; - this.resourceType = resourceType; - } - - /** - * @return The resourceType of this resource - */ - DPCResourceType getResourceType() { - return resourceType; - } - - /** - * Write a batch of resources to a file. - * - * @param batch is the list of resources to write - * @param counter is general counter for batch number - * @return The JobQueueBatchFile associated with this file - */ - JobQueueBatchFile writeBatch(AtomicInteger counter, List batch) { - try { - final var byteStream = new ByteArrayOutputStream(); - final var sequence = counter.getAndIncrement(); - final var jsonParser = fhirContext.newJsonParser(); - OutputStream writer = byteStream; - String outputPath = formOutputFilePath(config.getExportPath(), job.getBatchID(), resourceType, sequence); - JobQueueBatchFile file = job.addJobQueueFile(resourceType, sequence, batch.size()); - - boolean isStartOfFile = batch.size() == file.getCount(); - Boolean shouldAppendToFile = !isStartOfFile; - - logger.debug("Start writing to {}", outputPath); - for (var resource: batch) { - final String str = jsonParser.encodeResourceToString(resource); - writer.write(str.getBytes(StandardCharsets.UTF_8)); - writer.write(DELIM); - } - writer.flush(); - writer.close(); - writeToFile(byteStream.toByteArray(), outputPath, shouldAppendToFile); - logger.debug("Finished writing to '{}'", outputPath); - - return file; - } catch(IOException ex) { - throw new JobQueueFailure(job.getJobID(), job.getBatchID(), "IO error writing a resource", ex); - } catch(SecurityException ex) { - throw new JobQueueFailure(job.getJobID(), job.getBatchID(), "Error encrypting a resource", ex); - } catch(Exception ex) { - throw new JobQueueFailure(job.getJobID(), job.getBatchID(), "General failure consuming a resource", ex); - } - } - - /** - * Write a array of bytes to a file. Name the file according to the supplied name - * - * @param bytes - Bytes to write - * @param fileName - The fileName to write too - * @param append - If the - * @throws IOException - If the write fails - */ - private void writeToFile(byte[] bytes, String fileName, Boolean append) throws IOException { - if (bytes.length == 0) { - return; - } - try (final var outputFile = new FileOutputStream(fileName, append)) { - outputFile.write(bytes); - outputFile.flush(); - } - } -} diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentResult.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentResult.java index 9481018fab..7786496b30 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentResult.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentResult.java @@ -61,5 +61,7 @@ public static PolicyType fromPolicyUrl(String url) { } return null; } + + public String getValue() {return policyUrl;} } } diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentService.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentService.java index 09cf70f3ad..d7f0270d1e 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentService.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentService.java @@ -5,4 +5,5 @@ public interface ConsentService { Optional> getConsent(String mbi); + Optional> getConsent(List mbis); } diff --git a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentServiceImpl.java b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentServiceImpl.java index b3571f69ab..3824a4eaa3 100644 --- a/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentServiceImpl.java +++ b/dpc-aggregation/src/main/java/gov/cms/dpc/aggregation/service/ConsentServiceImpl.java @@ -20,29 +20,38 @@ public ConsentServiceImpl(@Named("consentClient") IGenericClient consentClient){ @Override public Optional> getConsent(String mbi) { - final Bundle bundle = doConsentSearch(mbi); - - List results = bundle.getEntry().stream().map(entryComponent -> { - Consent consent = (Consent) entryComponent.getResource(); - ConsentResult consentResult = new ConsentResult(); - consentResult.setActive(Consent.ConsentState.ACTIVE.equals(consent.getStatus())); - consentResult.setConsentDate(consent.getDateTime()); - consentResult.setConsentId(consent.getId()); - consentResult.setPolicyType(ConsentResult.PolicyType.fromPolicyUrl(consent.getPolicyRule())); - return consentResult; - }).collect(Collectors.toList()); - - return Optional.of(results); + return getConsent(List.of(mbi)); } - private Bundle doConsentSearch(String mbi){ - final String mbiIdentifier = String.format("%s|%s", DPCIdentifierSystem.MBI.getSystem(), mbi); + @Override + public Optional> getConsent(List mbis) { + final Bundle bundle = doConsentSearch(mbis); + + return Optional.of( + bundle.getEntry().stream().map( entry -> { + Consent consent = (Consent) entry.getResource(); + + ConsentResult consentResult = new ConsentResult(); + consentResult.setActive(Consent.ConsentState.ACTIVE.equals(consent.getStatus())); + consentResult.setConsentDate(consent.getDateTime()); + consentResult.setConsentId(consent.getId()); + consentResult.setPolicyType(ConsentResult.PolicyType.fromPolicyUrl(consent.getPolicyRule())); + return consentResult; + }).collect(Collectors.toList()) + ); + } + + private Bundle doConsentSearch(List mbis){ + List fullMbis = mbis.stream() + .map( mbi -> String.format("%s|%s", DPCIdentifierSystem.MBI.getSystem(), mbi) ) + .collect(Collectors.toList()); + return consentClient .search() .forResource(Consent.class) .encodedJson() .returnBundle(Bundle.class) - .where(Consent.PATIENT.hasId(mbiIdentifier)) + .where(Consent.PATIENT.hasAnyOfIds(fullMbis)) .execute(); } } diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineBFDClientTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineBFDClientTest.java index 6226ab5df9..2d90218e58 100644 --- a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineBFDClientTest.java +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineBFDClientTest.java @@ -71,9 +71,8 @@ public void setup() throws GeneralSecurityException { BlueButtonClientV2 blueButtonClientV2 = Mockito.spy(new BlueButtonClientV2Impl(bbClientV2, new BBClientConfiguration(), metricRegistry)); OperationsConfig config = new OperationsConfig(1000, tempDir.toString(), 1, 1, 1, YearMonth.now(), List.of(orgID.toString())); JobBatchProcessor processor = new JobBatchProcessor(blueButtonClient, fhirContext, metricRegistry, config, lookBackService, mockConsentService); - JobBatchProcessorV2 processorV2 = new JobBatchProcessorV2(blueButtonClientV2, fhirContextR4, metricRegistry, config, mockConsentService); queue = new MemoryBatchQueue(100); - engine = new AggregationEngine(UUID.randomUUID(), queue, config, processor, processorV2); + engine = new AggregationEngine(UUID.randomUUID(), queue, config, processor); engine.queueRunning.set(true); Mockito.when(blueButtonClient.hashMbi(Mockito.anyString())).thenReturn(MockBlueButtonClient.MBI_HASH_MAP.get(MockBlueButtonClient.TEST_PATIENT_MBIS.get(0))); diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineTest.java index 4305327f37..0ac391d6b1 100644 --- a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineTest.java +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/AggregationEngineTest.java @@ -12,8 +12,6 @@ import gov.cms.dpc.aggregation.service.LookBackService; import gov.cms.dpc.bluebutton.client.BlueButtonClient; import gov.cms.dpc.bluebutton.client.MockBlueButtonClient; -import gov.cms.dpc.bluebutton.clientV2.BlueButtonClientV2; -import gov.cms.dpc.bluebutton.clientV2.MockBlueButtonClientV2; import gov.cms.dpc.common.utils.NPIUtil; import gov.cms.dpc.fhir.DPCResourceType; import gov.cms.dpc.fhir.hapi.ContextUtils; @@ -52,7 +50,6 @@ class AggregationEngineTest { private static final String TEST_ORG_NPI = NPIUtil.generateNPI(); private static final String TEST_PROVIDER_NPI = NPIUtil.generateNPI(); private BlueButtonClient bbclient; - private BlueButtonClientV2 bbclientV2; private IJobQueue queue; private AggregationEngine engine; private Disposable subscribe; @@ -60,7 +57,6 @@ class AggregationEngineTest { private ConsentService mockConsentService; static private final FhirContext fhirContext = FhirContext.forDstu3(); - static private final FhirContext fhirContextR4 = FhirContext.forR4(); static private final MetricRegistry metricRegistry = new MetricRegistry(); static private String exportPath; @@ -80,17 +76,15 @@ void setupEach() { consentResult.setActive(true); consentResult.setPolicyType(ConsentResult.PolicyType.OPT_IN); consentResult.setConsentId(UUID.randomUUID().toString()); - Mockito.when(mockConsentService.getConsent(MockBlueButtonClient.TEST_PATIENT_MBIS.get(0))).thenReturn(Optional.of(Lists.list(consentResult))); - Mockito.when(mockConsentService.getConsent(MockBlueButtonClient.TEST_PATIENT_MBIS.get(1))).thenReturn(Optional.of(Lists.list(consentResult))); + Mockito.when(mockConsentService.getConsent(List.of(MockBlueButtonClient.TEST_PATIENT_MBIS.get(0)))).thenReturn(Optional.of(Lists.list(consentResult))); + Mockito.when(mockConsentService.getConsent(List.of(MockBlueButtonClient.TEST_PATIENT_MBIS.get(1)))).thenReturn(Optional.of(Lists.list(consentResult))); queue = Mockito.spy(new MemoryBatchQueue(10)); bbclient = Mockito.spy(new MockBlueButtonClient(fhirContext)); - bbclientV2 = Mockito.spy(new MockBlueButtonClientV2(fhirContextR4)); var operationalConfig = new OperationsConfig(1000, exportPath, 500, YearMonth.of(2014, 3)); lookBackService = Mockito.spy(EveryoneGetsDataLookBackServiceImpl.class); JobBatchProcessor jobBatchProcessor = Mockito.spy(new JobBatchProcessor(bbclient, fhirContext, metricRegistry, operationalConfig, lookBackService, mockConsentService)); - JobBatchProcessorV2 jobBatchProcessorV2 = Mockito.spy(new JobBatchProcessorV2(bbclientV2, fhirContextR4, metricRegistry, operationalConfig, mockConsentService)); - engine = Mockito.spy(new AggregationEngine(aggregatorID, queue, operationalConfig, jobBatchProcessor, jobBatchProcessorV2)); + engine = Mockito.spy(new AggregationEngine(aggregatorID, queue, operationalConfig, jobBatchProcessor)); engine.queueRunning.set(true); AggregationEngine.setGlobalErrorHandler(); subscribe = Mockito.mock(Disposable.class); @@ -106,9 +100,6 @@ void mockBlueButtonClientTest() { var patientMBI = MockBlueButtonClient.MBI_BENE_ID_MAP.get(MockBlueButtonClient.TEST_PATIENT_MBIS.get(0)); Bundle patient = bbclient.requestPatientFromServer(patientMBI, null, null); assertNotNull(patient); - // add test for v2 mock client - org.hl7.fhir.r4.model.Bundle patientV2 = bbclientV2.requestPatientFromServer(patientMBI, null, null); - assertNotNull(patientV2); } /** @@ -236,38 +227,6 @@ void simpleJobTest() { assertFalse(Files.exists(Path.of(errorFilePath)), "expect no error file"); } - /** - * Test if a engine can handle a simple V2 job with one resource type, one test provider, and one patient. - */ - @Test - void simpleV2JobTest() { - final var orgID = UUID.randomUUID(); - - // Make a simple job with one resource type - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - Collections.singletonList(MockBlueButtonClientV2.TEST_PATIENT_MBIS.get(0)), - Collections.singletonList(DPCResourceType.Patient), - null, - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "https://example.org/V2/Group/id/$export", true, false); - - // Work the batch - queue.claimBatch(engine.getAggregatorID()) - .ifPresent(engine::processJobBatch); - - // Look at the result - final var completeJob = queue.getJobBatches(jobID).stream().findFirst().orElseThrow(); - assertEquals(JobStatus.COMPLETED, completeJob.getStatus()); - assertEquals(1000, completeJob.getPriority()); - final var outputFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.Patient, 0); - assertTrue(Files.exists(Path.of(outputFilePath))); - final var errorFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.OperationOutcome, 0); - assertFalse(Files.exists(Path.of(errorFilePath)), "expect no error file"); - } - /** * Test if a engine can handle a simple job with one resource type, one test provider, one patient and since. */ @@ -299,37 +258,6 @@ void sinceJobTest() { assertFalse(Files.exists(Path.of(errorFilePath)), "expect no error file"); } - /** - * Test if a engine can handle a simple V2 job with one resource type, one test provider, one patient and since. - */ - @Test - void sinceV2JobTest() { - final var orgID = UUID.randomUUID(); - - // Make a simple job with one resource type - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - Collections.singletonList(MockBlueButtonClientV2.TEST_PATIENT_MBIS.get(0)), - Collections.singletonList(DPCResourceType.Patient), - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "https://example.org/v2/Group/id/$export", true, false); - - // Work the batch - queue.claimBatch(engine.getAggregatorID()) - .ifPresent(engine::processJobBatch); - - // Look at the result. Should be not have any output file. - final var completeJob = queue.getJobBatches(jobID).stream().findFirst().orElseThrow(); - assertEquals(JobStatus.COMPLETED, completeJob.getStatus()); - final var outputFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.Patient, 0); - assertFalse(Files.exists(Path.of(outputFilePath))); - final var errorFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.OperationOutcome, 0); - assertFalse(Files.exists(Path.of(errorFilePath)), "expect no error file"); - } - /** * Test if the engine can handle a job with multiple output files and patients */ @@ -362,38 +290,6 @@ void multipleFileJobTest() { }); } - /** - * Test if the engine can handle a V2 job with multiple output files and patients - */ - @Test - void multipleFileV2JobTest() { - final var orgID = UUID.randomUUID(); - final List mbis = List.of(MockBlueButtonClient.TEST_PATIENT_MBIS.get(0), MockBlueButtonClient.TEST_PATIENT_MBIS.get(1)); - - // build a job with multiple resource types - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - mbis, - JobQueueBatch.validResourceTypes, - null, - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "https://example.org/v2/Group/id/$export", true, false); - - // Work the batch - queue.claimBatch(engine.getAggregatorID()) - .ifPresent(engine::processJobBatch); - - // Look at the result - assertAll(() -> assertTrue(queue.getJobBatches(jobID).stream().findFirst().isPresent()), - () -> assertEquals(JobStatus.COMPLETED, queue.getJobBatches(jobID).stream().findFirst().get().getStatus())); - JobQueueBatch.validResourceTypes.forEach(resourceType -> { - var outputFilePath = ResourceWriterV2.formOutputFilePath(exportPath, queue.getJobBatches(jobID).stream().findFirst().get().getBatchID(), resourceType, 0); - assertTrue(Files.exists(Path.of(outputFilePath))); - }); - } - /** * Test if the engine can split a job into multiple batches */ @@ -417,29 +313,6 @@ void multipleBatchJobTest() { assertEquals(5000, queue.getJobBatches(jobID).get(0).getPriority()); } - /** - * Test if the engine can split a V2 job into multiple batches - */ - @Test - void multipleBatchV2JobTest() { - final var orgID = UUID.randomUUID(); - - // build a job with multiple resource types - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11"), - JobQueueBatch.validResourceTypes, - null, - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "https://example.org/v2/Group/id/$export", true, false); - - // Assert the queue size - assertEquals(2, queue.queueSize()); - assertEquals(5000, queue.getJobBatches(jobID).get(0).getPriority()); - } - /** * Test if the engine can handle a pausing a job on shutdown */ @@ -518,44 +391,6 @@ void appendBatchFileTest() { } } - /** - * Test if the engine can handle appending to a V2 batch file with multiple patients - */ - @Test - void appendV2BatchFileTest() { - final var orgID = UUID.randomUUID(); - final List mbis = List.of(MockBlueButtonClientV2.TEST_PATIENT_MBIS.get(0), MockBlueButtonClientV2.TEST_PATIENT_MBIS.get(1)); - - // build a job with multiple resource types - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - mbis, - Collections.singletonList(DPCResourceType.Patient), - null, - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "https://example.org/v2/Group/id/$export", true, false); - - // Work the batch - queue.claimBatch(engine.getAggregatorID()) - .ifPresent(engine::processJobBatch); - - // Look at the result - assertAll( - () -> assertTrue(queue.getJobBatches(jobID).stream().findFirst().isPresent()), - () -> assertEquals(JobStatus.COMPLETED, queue.getJobBatches(jobID).stream().findFirst().get().getStatus()) - ); - var outputFilePath = ResourceWriterV2.formOutputFilePath(exportPath, queue.getJobBatches(jobID).stream().findFirst().get().getBatchID(), DPCResourceType.Patient, 0); - assertTrue(Files.exists(Path.of(outputFilePath))); - try { - final String fileContents = Files.readString(Path.of(outputFilePath)); - assertEquals(mbis.size(), Arrays.stream(fileContents.split("\n")).count(), "Contains multiple patients in file output"); - } catch (Exception e) { - fail("Failed to read output file"); - } - } - /** * Test if the engine can handle a job with no attributions */ @@ -685,9 +520,9 @@ void badPatientIDTest() throws GeneralSecurityException { Mockito.verify(bbclient, atLeastOnce()).requestPatientFromServerByMbi(idCaptor.capture(), anyMap()); Mockito.verify(bbclient, atLeastOnce()).requestEOBFromServer(idCaptor.capture(), lastUpdatedCaptor.capture(), anyMap()); var values = idCaptor.getAllValues(); - assertEquals(0, + assertEquals(1, values.stream().filter(value -> value.equals("-1")).count(), - "Should be 0 call, never makes it past lookback"); + "Should be 1 call when loading the patient for consent check, then doesn't go any further"); // Look at the result. It should have one error, but be successful otherwise. assertTrue(queue.getJobBatches(jobID).stream().findFirst().isPresent()); @@ -701,7 +536,7 @@ void badPatientIDTest() throws GeneralSecurityException { @Test void multiplePatientsMatchTest() { - final List mbis = Collections.singletonList(MockBlueButtonClient.MULTIPLE_RESULTS_MBI); + final List mbis = Collections.singletonList(MockBlueButtonClient.TEST_PATIENT_MULTIPLE_MBIS.get(0)); final var orgID = UUID.randomUUID(); diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/BatchAggregationEngineTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/BatchAggregationEngineTest.java index d8f578417c..6561d3fe1f 100644 --- a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/BatchAggregationEngineTest.java +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/BatchAggregationEngineTest.java @@ -7,7 +7,6 @@ import gov.cms.dpc.aggregation.service.*; import gov.cms.dpc.aggregation.util.AggregationUtils; import gov.cms.dpc.bluebutton.client.MockBlueButtonClient; -import gov.cms.dpc.bluebutton.clientV2.MockBlueButtonClientV2; import gov.cms.dpc.common.utils.NPIUtil; import gov.cms.dpc.fhir.DPCResourceType; import gov.cms.dpc.fhir.hapi.ContextUtils; @@ -77,11 +76,9 @@ void setupEach() { consentService = Mockito.mock(ConsentService.class); queue = new MemoryBatchQueue(100); final var bbclient = Mockito.spy(new MockBlueButtonClient(fhirContext)); - final var bbclientV2 = Mockito.spy(new MockBlueButtonClientV2(fhirContextR4)); lookBackService = Mockito.spy(EveryoneGetsDataLookBackServiceImpl.class); JobBatchProcessor jobBatchProcessor = Mockito.spy(new JobBatchProcessor(bbclient, fhirContext, metricRegistry, operationsConfig, lookBackService, consentService)); - JobBatchProcessorV2 jobBatchProcessorV2 = Mockito.spy(new JobBatchProcessorV2(bbclientV2, fhirContextR4, metricRegistry, operationsConfig, consentService)); - engine = Mockito.spy(new AggregationEngine(aggregatorID, queue, operationsConfig, jobBatchProcessor, jobBatchProcessorV2)); + engine = Mockito.spy(new AggregationEngine(aggregatorID, queue, operationsConfig, jobBatchProcessor)); engine.queueRunning.set(true); Disposable subscribe = Mockito.mock(Disposable.class); doReturn(false).when(subscribe).isDisposed(); @@ -92,8 +89,10 @@ void setupEach() { consentResult.setActive(true); consentResult.setPolicyType(ConsentResult.PolicyType.OPT_IN); consentResult.setConsentId(UUID.randomUUID().toString()); - MockBlueButtonClient.TEST_PATIENT_MBIS.forEach(mbi -> Mockito.when(consentService.getConsent(mbi)).thenReturn(Optional.of(Lists.list(consentResult)))); - MockBlueButtonClientV2.TEST_PATIENT_MBIS.forEach(mbi -> Mockito.when(consentService.getConsent(mbi)).thenReturn(Optional.of(Lists.list(consentResult)))); + + MockBlueButtonClient.TEST_PATIENT_MBIS.forEach(mbi -> Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(Lists.list(consentResult)))); + // Special case where patient has multiple MBIs + Mockito.when(consentService.getConsent(MockBlueButtonClient.TEST_PATIENT_MULTIPLE_MBIS)).thenReturn(Optional.of(Lists.list(consentResult))); } /** @@ -208,80 +207,6 @@ void largeJobWithBadPatientTest() { final var errorFilePath = ResourceWriter.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.OperationOutcome, 0); assertTrue(Files.exists(Path.of(errorFilePath)), "expect error file for failed patient"); } - - /** - * Test if a engine can handle a simple V2 job with one resource type, one test provider, and one patient. - */ - @Test - void largeV2JobTestSingleResource() { - // Make a simple job with one resource type - final var orgID = UUID.randomUUID(); - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - Collections.singletonList(MockBlueButtonClient.TEST_PATIENT_MBIS.get(0)), - Collections.singletonList(DPCResourceType.ExplanationOfBenefit), - MockBlueButtonClientV2.TEST_LAST_UPDATED.minusSeconds(1), - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "http:example.org/v2/Group/id/$export", true, false); - - // Do the job - queue.claimBatch(engine.getAggregatorID()) - .ifPresent(engine::processJobBatch); - - // Look at the result - final var completeJob = queue.getJobBatches(jobID).stream().findFirst().orElseThrow(); - assertEquals(JobStatus.COMPLETED, completeJob.getStatus()); - final List sorted = completeJob.getJobQueueBatchFiles().stream().sorted(Comparator.comparingInt(JobQueueBatchFile::getSequence)).collect(Collectors.toList()); - assertAll(() -> assertEquals(4, sorted.size()), - () -> assertEquals(10, sorted.get(0).getCount()), - () -> assertEquals(8, sorted.get(3).getCount())); - - // Look at the output files - final var outputFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.ExplanationOfBenefit, 0); - assertTrue(Files.exists(Path.of(outputFilePath))); - final var errorFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.OperationOutcome, 0); - assertFalse(Files.exists(Path.of(errorFilePath)), "expect no error file"); - } - - /** - * Test if a engine can handle a V2 simple job with one resource type, one test provider, and one patient. - */ - @Test - void largeV2JobWithBadPatientTest() { - final var orgID = UUID.randomUUID(); - - // Make a simple job with one resource type - final var jobID = queue.createJob( - orgID, - TEST_ORG_NPI, - TEST_PROVIDER_NPI, - MockBlueButtonClientV2.TEST_PATIENT_WITH_BAD_IDS, - Collections.singletonList(DPCResourceType.ExplanationOfBenefit), - MockBlueButtonClientV2.TEST_LAST_UPDATED.minusSeconds(1), - MockBlueButtonClientV2.BFD_TRANSACTION_TIME, - null, "http:example.org/v2/Group/id/$export", true, false); - - // Do the job - queue.claimBatch(engine.getAggregatorID()) - .ifPresent(engine::processJobBatch); - - // Look at the result - final var completeJob = queue.getJobBatches(jobID).stream().findFirst().orElseThrow(); - assertEquals(JobStatus.COMPLETED, completeJob.getStatus()); - assertAll( - () -> assertEquals(5, completeJob.getJobQueueBatchFiles().size(), String.format("Unexpected JobModel: %s", completeJob.toString())), - () -> assertTrue(completeJob.getJobQueueFile(DPCResourceType.ExplanationOfBenefit).isPresent(), "Expect a EOB"), - () -> assertFalse(completeJob.getJobQueueFile(DPCResourceType.OperationOutcome).isEmpty(), "Expect an error")); - - // Look at the output files - final var outputFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.ExplanationOfBenefit, 0); - assertTrue(Files.exists(Path.of(outputFilePath))); - final var errorFilePath = ResourceWriterV2.formOutputFilePath(exportPath, completeJob.getBatchID(), DPCResourceType.OperationOutcome, 0); - assertTrue(Files.exists(Path.of(errorFilePath)), "expect error file for failed patient"); - } - @Test void lookBackDateCriteriaMismatch() throws IOException { final var orgID = UUID.randomUUID(); diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorUnitTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorUnitTest.java index fcdb473002..380542ce44 100644 --- a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorUnitTest.java +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/JobBatchProcessorUnitTest.java @@ -2,125 +2,500 @@ import ca.uhn.fhir.context.FhirContext; import com.codahale.metrics.MetricRegistry; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import gov.cms.dpc.aggregation.service.ConsentResult; import gov.cms.dpc.aggregation.service.ConsentService; +import gov.cms.dpc.aggregation.service.EveryoneGetsDataLookBackServiceImpl; import gov.cms.dpc.aggregation.service.LookBackService; import gov.cms.dpc.bluebutton.client.BlueButtonClient; -import gov.cms.dpc.fhir.DPCIdentifierSystem; -import io.reactivex.Flowable; -import org.hl7.fhir.dstu3.model.Coverage; -import org.hl7.fhir.dstu3.model.ExplanationOfBenefit; -import org.hl7.fhir.dstu3.model.Identifier; -import org.hl7.fhir.dstu3.model.Patient; +import gov.cms.dpc.bluebutton.client.MockBlueButtonClient; +import gov.cms.dpc.common.utils.NPIUtil; +import gov.cms.dpc.fhir.DPCResourceType; +import gov.cms.dpc.queue.IJobQueue; +import gov.cms.dpc.queue.MemoryBatchQueue; +import gov.cms.dpc.queue.models.JobQueueBatch; +import gov.cms.dpc.queue.models.JobQueueBatchFile; +import org.hl7.fhir.dstu3.model.Bundle; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.GeneralSecurityException; +import java.time.YearMonth; +import java.util.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; @ExtendWith(MockitoExtension.class) class JobBatchProcessorUnitTest { - @Mock private BlueButtonClient blueButtonClient; - @Mock private FhirContext fhirContext; - @Mock private OperationsConfig operationsConfig; - @Mock private LookBackService lookBackService; - @Mock private ConsentService consentService; + private static final Config config = ConfigFactory.load("testing.conf").getConfig("dpc.aggregation"); + private static final String TEST_ORG_NPI = NPIUtil.generateNPI(); + private static final String TEST_PROVIDER_NPI = NPIUtil.generateNPI(); + private static ConsentResult optIn; + private static ConsentResult optOut; - private MetricRegistry metricRegistry = new MetricRegistry(); + private final MetricRegistry metricRegistry = new MetricRegistry(); - private JobBatchProcessor jobBatchProcessor = new JobBatchProcessor( - blueButtonClient, fhirContext, metricRegistry, operationsConfig, lookBackService, consentService); + @Spy + private BlueButtonClient bbClient = new MockBlueButtonClient(FhirContext.forDstu3()); + @Mock + private ConsentService consentService; - /* - As a rule, we shouldn't be writing tests for private methods, but this method isn't in use yet so this seemed - like the best way to verify it works. - TODO: Remove these tests after getMBIs is put into use and we have unit tests for the JobBatchProcessor class (DPC-3562). - */ + @BeforeAll + static void setup() { + optIn = new ConsentResult(); + optIn.setConsentDate(new Date()); + optIn.setActive(true); + optIn.setPolicyType(ConsentResult.PolicyType.OPT_IN); + optIn.setConsentId(UUID.randomUUID().toString()); + + optOut = new ConsentResult(); + optOut.setConsentDate(new Date()); + optOut.setActive(true); + optOut.setPolicyType(ConsentResult.PolicyType.OPT_OUT); + optOut.setConsentId(UUID.randomUUID().toString()); + } @Test - public void testGetMBIs_onePatient() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - Identifier identifier = new Identifier(); - identifier.setSystem(DPCIdentifierSystem.MBI.getSystem()); - identifier.setValue("4S41C00AA00"); - - Patient patient = new Patient(); - patient.setIdentifier(List.of(identifier)); - - Method getMBIs = JobBatchProcessor.class.getDeclaredMethod("getMBIs", Flowable.class); - getMBIs.setAccessible(true); - - List mbis = (List) getMBIs.invoke(jobBatchProcessor, Flowable.just( - patient, - new ExplanationOfBenefit(), - new ExplanationOfBenefit(), - new Coverage() - )); - - assertEquals(1, mbis.size()); - assertEquals(identifier.getValue(), mbis.get(0)); + public void testHappyPath() { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + + Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(List.of(optIn))); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertNoError(completedJob.getBatchID(), DPCResourceType.Patient); } @Test - public void testGetMBIs_manyPatients() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - Identifier mbi1 = new Identifier(); - mbi1.setSystem(DPCIdentifierSystem.MBI.getSystem()); - mbi1.setValue("4S41C00AA00"); + public void testHappyPath_lookBackExempt() { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + JobQueueBatch job = jobs.get(0); + + // Create a config with our org look back exempt + OperationsConfig operationsConfig = new OperationsConfig( + 1000, + config.getString("exportPath"), + 1, + 500, + 120, + YearMonth.of(2014, 3), + List.of(job.getOrgID().toString()) + ); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(List.of(optIn))); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + job, + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertNoError(completedJob.getBatchID(), DPCResourceType.Patient); + } + + @Test + public void testHappyPath_MoreThanOnePatientInJob() { + List mbis = List.of( + MockBlueButtonClient.TEST_PATIENT_MBIS.get(0), + MockBlueButtonClient.TEST_PATIENT_MBIS.get(1) + ); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + mbis, + List.of(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + JobQueueBatch job = jobs.get(0); + + // Create a config with our org look back exempt + OperationsConfig operationsConfig = new OperationsConfig( + 1, + config.getString("exportPath"), + 1, + 500, + 120, + YearMonth.of(2014, 3), + List.of(job.getOrgID().toString()) + ); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + Mockito.when(consentService.getConsent(List.of(mbis.get(0)))).thenReturn(Optional.of(List.of(optIn))); + Mockito.when(consentService.getConsent(List.of(mbis.get(1)))).thenReturn(Optional.of(List.of(optIn))); - Identifier mbi2 = new Identifier(); - mbi2.setSystem(DPCIdentifierSystem.MBI.getSystem()); - mbi2.setValue("4S41C00AA01"); + List results1 = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + job, + mbis.get(0) + ); + + List results2 = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + job, + mbis.get(1) + ); + + + assertEquals(1, results1.size()); + JobQueueBatchFile completedJob1 = results1.get(0); + assertNoError(completedJob1.getBatchID(), DPCResourceType.Patient); + + assertEquals(1, results2.size()); + JobQueueBatchFile completedJob2 = results1.get(0); + assertNoError(completedJob2.getBatchID(), DPCResourceType.Patient); + } - Identifier badMbi = new Identifier(); - badMbi.setSystem(DPCIdentifierSystem.MBI.getSystem()); - badMbi.setValue("bad_mbi"); + @Test + public void testHappyPath_NoConsent() { + // No consent records gets treated like an opt in by the JobBatchProcessor + + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); - Identifier beneId = new Identifier(); - beneId.setSystem(DPCIdentifierSystem.BENE_ID.getSystem()); - beneId.setValue("bene_id"); + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); - Patient patient = new Patient(); - patient.setIdentifier(List.of(mbi1, mbi2, beneId, badMbi)); + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); - Method getMBIs = JobBatchProcessor.class.getDeclaredMethod("getMBIs", Flowable.class); - getMBIs.setAccessible(true); + Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(List.of())); - List mbis = (List) getMBIs.invoke(jobBatchProcessor, Flowable.just( - patient, - new ExplanationOfBenefit(), - new Coverage()) + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi ); - assertEquals(2, mbis.size()); - assertTrue(mbis.contains(mbi1.getValue())); - assertTrue(mbis.contains(mbi2.getValue())); + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertNoError(completedJob.getBatchID(), DPCResourceType.Patient); } @Test - public void testGetMBIs_noResources() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - Method getMBIs = JobBatchProcessor.class.getDeclaredMethod("getMBIs", Flowable.class); - getMBIs.setAccessible(true); + public void testError_LoadingPatientByMbi() throws GeneralSecurityException { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); - List mbis = (List) getMBIs.invoke(jobBatchProcessor, Flowable.empty()); + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + Mockito.doThrow(new IllegalStateException("bad mbi test")).when(bbClient).requestPatientFromServerByMbi(eq(mbi), any()); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); - assertEquals(0, mbis.size()); + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertError(completedJob.getBatchID(), DPCResourceType.Patient); } @Test - public void testGetMBIs_noPatients() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - Method getMBIs = JobBatchProcessor.class.getDeclaredMethod("getMBIs", Flowable.class); - getMBIs.setAccessible(true); + public void testError_MultiplePatientsForMbi() throws GeneralSecurityException { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); - List mbis = (List) getMBIs.invoke(jobBatchProcessor, Flowable.just( - new ExplanationOfBenefit(), - new Coverage(), - new ExplanationOfBenefit()) + // Bundle with multiple patients + Bundle bundle = new Bundle(); + bundle.setTotal(2); + + Mockito.doReturn(bundle).when(bbClient).requestPatientFromServerByMbi(eq(mbi), any()); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false ); + List jobs = queue.getJobBatches(jobID); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertError(completedJob.getBatchID(), DPCResourceType.Patient); + } + + @Test + public void testError_ConsentServiceException() { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + + Mockito.when(consentService.getConsent(List.of(mbi))).thenThrow(new IllegalStateException("consent error")); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertError(completedJob.getBatchID(), DPCResourceType.Patient); + } + + @Test + public void testPatientOptOut() { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + + Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(List.of(optOut))); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertError(completedJob.getBatchID(), DPCResourceType.Patient); + } + + @Test + public void testFailsLookBackCheck() { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + String id = MockBlueButtonClient.MBI_BENE_ID_MAP.get(mbi); + + Bundle emptyBundle = new Bundle(); + Mockito.doReturn(emptyBundle).when(bbClient).requestEOBFromServer(eq(id), any(), any()); + + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + TEST_ORG_NPI, + TEST_PROVIDER_NPI, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + + Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(List.of(optIn))); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertError(completedJob.getBatchID(), DPCResourceType.Patient); + } + + @Test + public void testError_NoPractitionerAndOrgLookBack() { + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + OperationsConfig operationsConfig = getOperationsConfig(); + JobBatchProcessor jobBatchProcessor = getJobBatchProcessor(bbClient, operationsConfig, new EveryoneGetsDataLookBackServiceImpl(), consentService); + + IJobQueue queue = new MemoryBatchQueue(); + final var jobID = queue.createJob( + UUID.randomUUID(), + null, + null, + Collections.singletonList(mbi), + Collections.singletonList(DPCResourceType.Patient), + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME, + null, null, true, false + ); + List jobs = queue.getJobBatches(jobID); + + Mockito.when(consentService.getConsent(List.of(mbi))).thenReturn(Optional.of(List.of(optIn))); + + List results = jobBatchProcessor.processJobBatchPartial( + UUID.randomUUID(), + queue, + jobs.get(0), + mbi + ); + + assertEquals(1, results.size()); + JobQueueBatchFile completedJob = results.get(0); + + assertError(completedJob.getBatchID(), DPCResourceType.Patient); + } + + private JobBatchProcessor getJobBatchProcessor(BlueButtonClient bbClient, OperationsConfig config, LookBackService lookBackSrvc, ConsentService consentSrvc) { + return new JobBatchProcessor( + bbClient, + FhirContext.forDstu3(), + metricRegistry, + config, + lookBackSrvc, + consentSrvc + ); + } + + // Creates a generic config + private OperationsConfig getOperationsConfig() { + return new OperationsConfig( + 1000, + config.getString("exportPath"), + 500, + YearMonth.of(2014, 3) + ); + } + + private void assertError(UUID batchId, DPCResourceType resourceType) { + final var outputFilePath = ResourceWriter.formOutputFilePath(config.getString("exportPath"), batchId, resourceType, 0); + final var errorFilePath = ResourceWriter.formOutputFilePath(config.getString("exportPath"), batchId, DPCResourceType.OperationOutcome, 0); + + // Error file exists, but not output file + assertFalse(Files.exists(Path.of(outputFilePath))); + assertTrue(Files.exists(Path.of(errorFilePath))); + } + + private void assertNoError(UUID batchId, DPCResourceType resourceType) { + final var outputFilePath = ResourceWriter.formOutputFilePath(config.getString("exportPath"), batchId, resourceType, 0); + final var errorFilePath = ResourceWriter.formOutputFilePath(config.getString("exportPath"), batchId, DPCResourceType.OperationOutcome, 0); - assertEquals(0, mbis.size()); + // Output file exists, but no error file + assertTrue(Files.exists(Path.of(outputFilePath))); + assertFalse(Files.exists(Path.of(errorFilePath))); } } diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/ResourceFetcherUnitTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/ResourceFetcherUnitTest.java new file mode 100644 index 0000000000..bea936300a --- /dev/null +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/engine/ResourceFetcherUnitTest.java @@ -0,0 +1,241 @@ +package gov.cms.dpc.aggregation.engine; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.rest.client.exceptions.FhirClientConnectionException; +import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; +import gov.cms.dpc.bluebutton.client.MockBlueButtonClient; +import gov.cms.dpc.fhir.DPCResourceType; +import gov.cms.dpc.queue.exceptions.JobQueueFailure; +import io.reactivex.Flowable; +import org.hl7.fhir.dstu3.model.*; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; + +class ResourceFetcherUnitTest { + private static Patient testPatient; + private static String testPatientMbi; + private static String testPatientFhirId; + + @BeforeAll + public static void setup() { + MockBlueButtonClient bbClient = new MockBlueButtonClient(FhirContext.forDstu3()); + + // Use an existing test patient from the MockBlueButtonClient + testPatientMbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(0); + + Bundle bundle = bbClient.requestPatientFromServerByMbi(testPatientMbi, Map.of()); + testPatient = (Patient) bundle.getEntry().get(0).getResource(); + + testPatientFhirId = testPatient.getIdElement().getIdPart(); + } + + @Test + public void testHappyPath_Patient() { + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List resources = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(1, resources.size()); + assertEquals(testPatient.getId(), resources.get(0).getId()); + } + + @Test + public void testHappyPath_Eob() { + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.ExplanationOfBenefit, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List resources = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(32, resources.size()); + assertTrue(resources.get(0).getId().contains("carrier-20587716665")); + } + + @Test + public void testHappyPath_coverage() { + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Coverage, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List resources = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(3, resources.size()); + assertTrue(resources.get(0).getId().contains("part-a-20140000008325")); + } + + @Test + public void testHappyPath_NoSince() { + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + null, + MockBlueButtonClient.BFD_TRANSACTION_TIME + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List resources = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(1, resources.size()); + assertEquals(testPatient.getId(), resources.get(0).getId()); + } + + @Test + public void testBadTransactionTime() { + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME.plus(1, ChronoUnit.DAYS) + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + JobQueueFailure exception = assertThrows(JobQueueFailure.class, () -> results.flatMap(Flowable::fromIterable).toList().blockingGet()); + + assertEquals("BFD's transaction time regression", exception.getMessage()); + } + + @Test + public void testResourceNotFound() { + MockBlueButtonClient bbClient = Mockito.spy(new MockBlueButtonClient(FhirContext.forDstu3())); + + Mockito.doThrow(new ResourceNotFoundException("test exception")) + .when(bbClient).requestPatientFromServer(eq(testPatientFhirId), any(), any()); + + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME, + bbClient + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List errors = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(1, errors.size()); + OperationOutcome outcome = (OperationOutcome) errors.get(0); + assertEquals(String.format("Patient resource not found in Blue Button for id: %s", testPatientMbi), outcome.getIssueFirstRep().getDetails().getText()); + } + + @Test + public void testBaseServerErrorFetchingResources() { + MockBlueButtonClient bbClient = Mockito.spy(new MockBlueButtonClient(FhirContext.forDstu3())); + + Mockito.doThrow(new FhirClientConnectionException("fhir client exception")) + .when(bbClient).requestPatientFromServer(eq(testPatientFhirId), any(), any()); + + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME, + bbClient + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List errors = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(1, errors.size()); + OperationOutcome outcome = (OperationOutcome) errors.get(0); + assertEquals("Blue Button error fetching Patient resource. HTTP return code: 0", outcome.getIssueFirstRep().getDetails().getText()); + } + + @Test + public void testUnknownErrorFetchingResources() { + MockBlueButtonClient bbClient = Mockito.spy(new MockBlueButtonClient(FhirContext.forDstu3())); + + String exceptionMsg = "Unknown Exception"; + Mockito.doThrow(new IllegalStateException(exceptionMsg)) + .when(bbClient).requestPatientFromServer(eq(testPatientFhirId), any(), any()); + + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME, + bbClient + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List errors = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(1, errors.size()); + OperationOutcome outcome = (OperationOutcome) errors.get(0); + assertEquals(String.format("Internal error: %s", exceptionMsg), outcome.getIssueFirstRep().getDetails().getText()); + } + + @Test + public void testWrongResourceTypeReturned() { + MockBlueButtonClient bbClient = Mockito.spy(new MockBlueButtonClient(FhirContext.forDstu3())); + + // Build EoB bundle + ExplanationOfBenefit eob = new ExplanationOfBenefit(); + Bundle.BundleEntryComponent component = new Bundle.BundleEntryComponent(); + Bundle bundle = new Bundle(); + component.setResource(eob); + bundle.addEntry(component); + + Mockito.doReturn(bundle).when(bbClient).requestPatientFromServer(eq(testPatientFhirId), any(), any()); + + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Patient, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME, + bbClient + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + List errors = results.flatMap(Flowable::fromIterable).toList().blockingGet(); + + assertEquals(1, errors.size()); + OperationOutcome outcome = (OperationOutcome) errors.get(0); + assertEquals("Internal error: Unexpected resource type: got ExplanationOfBenefit expected: Patient", outcome.getIssueFirstRep().getDetails().getText()); + } + + @Test + public void testSearchForUnsupportedResourceType() { + ResourceFetcher fetcher = getResourceFetcher( + DPCResourceType.Practitioner, + MockBlueButtonClient.TEST_LAST_UPDATED.minus(1, ChronoUnit.DAYS), + MockBlueButtonClient.BFD_TRANSACTION_TIME + ); + + Flowable> results = fetcher.fetchResources(testPatient, Map.of()); + + JobQueueFailure exception = assertThrows(JobQueueFailure.class, () -> results.flatMap(Flowable::fromIterable).toList().blockingGet()); + + assertTrue(exception.getMessage().contains("Unexpected resource type: Practitioner")); + } + + private ResourceFetcher getResourceFetcher(DPCResourceType resourceType, OffsetDateTime since, OffsetDateTime transactionTime) { + return getResourceFetcher(resourceType, since, transactionTime, new MockBlueButtonClient(FhirContext.forDstu3())); + } + + private ResourceFetcher getResourceFetcher(DPCResourceType resourceType, OffsetDateTime since, OffsetDateTime transactionTime, MockBlueButtonClient bbClient) { + return new ResourceFetcher( + bbClient, + UUID.randomUUID(), + UUID.randomUUID(), + resourceType, + since, + transactionTime + ); + } +} \ No newline at end of file diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/health/AggregationEngineHealthCheckTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/health/AggregationEngineHealthCheckTest.java index 8db2e4e310..1fa115f235 100644 --- a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/health/AggregationEngineHealthCheckTest.java +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/health/AggregationEngineHealthCheckTest.java @@ -6,7 +6,6 @@ import com.typesafe.config.ConfigFactory; import gov.cms.dpc.aggregation.engine.AggregationEngine; import gov.cms.dpc.aggregation.engine.JobBatchProcessor; -import gov.cms.dpc.aggregation.engine.JobBatchProcessorV2; import gov.cms.dpc.aggregation.engine.OperationsConfig; import gov.cms.dpc.aggregation.service.ConsentService; import gov.cms.dpc.aggregation.service.LookBackServiceImpl; @@ -76,8 +75,7 @@ void setupEach() { var operationalConfig = new OperationsConfig(1000, exportPath, 500, YearMonth.of(2015, 3)); LookBackServiceImpl lookBackService = Mockito.spy(new LookBackServiceImpl(operationalConfig)); JobBatchProcessor jobBatchProcessor = Mockito.spy(new JobBatchProcessor(bbclient, fhirContext, metricRegistry, operationalConfig, lookBackService, consentService)); - JobBatchProcessorV2 jobBatchProcessorV2 = Mockito.spy(new JobBatchProcessorV2(bbclientV2, fhirContextR4, metricRegistry, operationalConfig, consentService)); - engine = Mockito.spy(new AggregationEngine(aggregatorID, queue, operationalConfig, jobBatchProcessor, jobBatchProcessorV2)); + engine = Mockito.spy(new AggregationEngine(aggregatorID, queue, operationalConfig, jobBatchProcessor)); AggregationEngine.setGlobalErrorHandler(); } diff --git a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/service/ConsentServiceImplUnitTest.java b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/service/ConsentServiceImplUnitTest.java index b9444d6678..7f75f61e88 100644 --- a/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/service/ConsentServiceImplUnitTest.java +++ b/dpc-aggregation/src/test/java/gov/cms/dpc/aggregation/service/ConsentServiceImplUnitTest.java @@ -18,7 +18,8 @@ import java.util.Optional; import java.util.UUID; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; public class ConsentServiceImplUnitTest { @@ -36,7 +37,7 @@ public void setUp() { @Test @SuppressWarnings("unchecked") - public void getConsent() { + public void getSingleConsent() { final String testMbi = "0OO0OO0OO00"; Bundle bundle = new Bundle(); @@ -46,20 +47,54 @@ public void getConsent() { Mockito.when(queryExec.returnBundle(any(Class.class)).where(any(ICriterion.class))).thenReturn(mockQuery); Mockito.when(mockQuery.execute()).thenReturn(bundle); - Optional> results = consentService.getConsent(testMbi); - assertTrue(results.isPresent(), "Expected optional to have a value."); - assertEquals(0, results.get().size(), "Expected consent results to be an empty list"); - bundle.addEntry(new Bundle.BundleEntryComponent().setResource(createTestConsent(testMbi))); bundle.addEntry(new Bundle.BundleEntryComponent().setResource(createTestConsent(testMbi))); - - - results = consentService.getConsent(testMbi); + Optional> results = consentService.getConsent(testMbi); assertTrue(results.isPresent(), "Expected optional to have a value"); assertEquals(2, results.get().size(), "Expected 2 consent results"); } + @Test + public void getMultipleConsent() { + String mbi1 = "0OO0OO0OO00"; + String mbi2 = "0OO0OO0OO01"; + + Bundle returnBundle = new Bundle(); + returnBundle.addEntry(new Bundle.BundleEntryComponent().setResource(createTestConsent(mbi1))); + returnBundle.addEntry(new Bundle.BundleEntryComponent().setResource(createTestConsent(mbi1))); + returnBundle.addEntry(new Bundle.BundleEntryComponent().setResource(createTestConsent(mbi2))); + + IQuery queryExec = Mockito.mock(IQuery.class, Answers.RETURNS_DEEP_STUBS); + IQuery mockQuery = Mockito.mock(IQuery.class); + + Mockito.when(mockConsentClient.search().forResource(Consent.class).encodedJson()).thenReturn(queryExec); + Mockito.when(queryExec.returnBundle(Bundle.class).where(any(ICriterion.class))).thenReturn(mockQuery); + Mockito.when(mockQuery.execute()).thenReturn(returnBundle); + + Optional> optionalResults = consentService.getConsent(List.of(mbi1, mbi2)); + + assertTrue(optionalResults.isPresent()); + + List results = optionalResults.get(); + assertEquals(3, results.size()); + } + + @Test + public void testNoConsent() { + final String testMbi = "0OO0OO0OO00"; + + Bundle bundle = new Bundle(); + IQuery queryExec = Mockito.mock(IQuery.class, Answers.RETURNS_DEEP_STUBS); + Mockito.when(mockConsentClient.search().forResource(Consent.class).encodedJson()).thenReturn(queryExec); + IQuery mockQuery = Mockito.mock(IQuery.class); + Mockito.when(queryExec.returnBundle(any(Class.class)).where(any(ICriterion.class))).thenReturn(mockQuery); + Mockito.when(mockQuery.execute()).thenReturn(bundle); + + Optional> results = consentService.getConsent(testMbi); + assertTrue(results.isPresent(), "Expected optional to have a value."); + assertEquals(0, results.get().size(), "Expected consent results to be an empty list"); + } private Consent createTestConsent(String mbi){ Consent consent = new Consent(); @@ -82,6 +117,8 @@ private Consent createTestConsent(String mbi){ String policyUrl = "http://hl7.org/fhir/ConsentPolicy/opt-out"; consent.setPolicyRule(policyUrl); + + consent.setId(UUID.randomUUID().toString()); return consent; } } \ No newline at end of file diff --git a/dpc-api/pom.xml b/dpc-api/pom.xml index 1e727c207f..144aa372cc 100644 --- a/dpc-api/pom.xml +++ b/dpc-api/pom.xml @@ -223,6 +223,12 @@ + + gov.cms.dpc + dpc-aggregation + 0.4.0-SNAPSHOT + test + diff --git a/dpc-api/src/test/java/gov/cms/dpc/api/resources/v1/PatientResourceTest.java b/dpc-api/src/test/java/gov/cms/dpc/api/resources/v1/PatientResourceTest.java index 91b06d3539..9287af407d 100644 --- a/dpc-api/src/test/java/gov/cms/dpc/api/resources/v1/PatientResourceTest.java +++ b/dpc-api/src/test/java/gov/cms/dpc/api/resources/v1/PatientResourceTest.java @@ -11,6 +11,7 @@ import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; +import gov.cms.dpc.aggregation.service.ConsentResult; import gov.cms.dpc.api.APITestHelpers; import gov.cms.dpc.api.AbstractSecureApplicationTest; import gov.cms.dpc.api.TestOrganizationContext; @@ -22,6 +23,7 @@ import gov.cms.dpc.fhir.helpers.FHIRHelpers; import gov.cms.dpc.testing.APIAuthHelpers; import gov.cms.dpc.testing.factories.FHIRPractitionerBuilder; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.http.HttpHeaders; import org.eclipse.jetty.http.HttpStatus; @@ -45,6 +47,7 @@ import java.time.OffsetDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; @@ -53,8 +56,14 @@ import static gov.cms.dpc.api.APITestHelpers.ORGANIZATION_NPI; import static org.junit.jupiter.api.Assertions.*; +/* + If you're running this locally, you'll need to wipe out the rows in the consent table in between runs. If not, the + opt outs from the previous run will interfere with the current one. + */ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class PatientResourceTest extends AbstractSecureApplicationTest { + final java.util.Date dateYesterday = Date.from(Instant.now().minus(1, ChronoUnit.DAYS)); + final java.util.Date dateToday = Date.from(Instant.now()); public static final String PROVENANCE_FMT = "{ \"resourceType\": \"Provenance\", \"recorded\": \"" + DateTimeType.now().getValueAsString() + "\"," + " \"reason\": [ { \"system\": \"http://hl7.org/fhir/v3/ActReason\", \"code\": \"TREAT\" } ], \"agent\": [ { \"role\": " + @@ -267,7 +276,7 @@ void testCreateInvalidPatient() throws IOException, URISyntaxException { @Test @Order(6) void testPatientEverythingWithoutGroupFetchesData() throws IOException, URISyntaxException, GeneralSecurityException { - IGenericClient client = generateClient(ORGANIZATION_NPI, "patient-everything-key"); + IGenericClient client = generateClient(ORGANIZATION_NPI, RandomStringUtils.randomAlphabetic(25)); APITestHelpers.setupPractitionerTest(client, parser); String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(2); @@ -342,8 +351,7 @@ void testPatientEverythingWithoutGroupFetchesData() throws IOException, URISynta @Test @Order(7) void testPatientEverythingWithGroupFetchesData() throws IOException, URISyntaxException, GeneralSecurityException { - IGenericClient client = generateClient(ORGANIZATION_NPI, "patient-everything-key-2"); - APITestHelpers.setupPractitionerTest(client, parser); + IGenericClient client = generateClient(ORGANIZATION_NPI, RandomStringUtils.randomAlphabetic(25)); String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(2); Patient patient = fetchPatient(client, mbi); @@ -428,16 +436,48 @@ void testPatientEverythingWithGroupFetchesData() throws IOException, URISyntaxEx @Test @Order(8) + void testPatientEverything_CanHandlePatientWithMultipleMBIs() throws IOException, URISyntaxException, GeneralSecurityException { + IGenericClient client = generateClient(ORGANIZATION_NPI, RandomStringUtils.randomAlphabetic(25)); + + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(6); + Patient patient = fetchPatient(client, mbi); + Practitioner practitioner = fetchPractitionerByNPI(client); + final String patientId = FHIRExtractors.getEntityUUID(patient.getId()).toString(); + + Bundle everythingBundle = client + .operation() + .onInstance(new IdType("Patient", patientId)) + .named("$everything") + .withNoParameters(Parameters.class) + .returnResourceType(Bundle.class) + .useHttpGet() + .withAdditionalHeader("X-Provenance", generateProvenance(ORGANIZATION_ID, practitioner.getId())) + .execute(); + + Patient patientResource = (Patient) everythingBundle.getEntry().stream() + .filter(entry -> entry.getResource().getResourceType().getPath() == "patient") + .findFirst().get().getResource(); + + // Patient should have multiple MBIs + List mbis = FHIRExtractors.getPatientMBIs(patientResource); + assertEquals(2, mbis.size()); + assertTrue(mbis.containsAll(List.of("9V99EU8XY91", "1S00EU8FE91"))); + + // Current MBI + assertEquals("9V99EU8XY91", FHIRExtractors.getPatientMBI(patientResource)); + } + + @Test + @Order(9) void testPatientEverythingForOptedOutPatient() throws IOException, URISyntaxException, GeneralSecurityException { - IGenericClient client = generateClient(ORGANIZATION_NPI, "patient-everything-key-3"); - APITestHelpers.setupPractitionerTest(client, parser); + IGenericClient client = generateClient(ORGANIZATION_NPI, RandomStringUtils.randomAlphabetic(25)); - String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(3); + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(2); Patient patient = fetchPatient(client, mbi); Practitioner practitioner = fetchPractitionerByNPI(client); final String patientId = FHIRExtractors.getEntityUUID(patient.getId()).toString(); - optOutPatient(mbi); + optOutPatient(mbi, dateYesterday); IOperationUntypedWithInput getEverythingOperation = client .operation() @@ -448,23 +488,49 @@ void testPatientEverythingForOptedOutPatient() throws IOException, URISyntaxExce .useHttpGet() .withAdditionalHeader("X-Provenance", generateProvenance(ORGANIZATION_ID, practitioner.getId())); - InternalErrorException exception = assertThrows(InternalErrorException.class, getEverythingOperation::execute, "Expected Internal server error when retrieving opted out patient."); assertTrue(exception.getResponseBody().contains("\"text\":\"Data not available for opted out patient\""), "Incorrect or missing operation outcome in response body."); } @Test - @Order(9) - void testPatientEverything_CanHandlePatientWithMultipleMBIs() throws IOException, URISyntaxException, GeneralSecurityException { - IGenericClient client = generateClient(ORGANIZATION_NPI, "patient-everything-key-4"); - APITestHelpers.setupPractitionerTest(client, parser); + @Order(10) + void testPatientEverythingForOptedOutPatientOnMultipleMbis() throws IOException, URISyntaxException, GeneralSecurityException { + IGenericClient client = generateClient(ORGANIZATION_NPI, RandomStringUtils.randomAlphabetic(25)); String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(6); + String historicMbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(7); + Patient patient = fetchPatient(client, mbi); Practitioner practitioner = fetchPractitionerByNPI(client); final String patientId = FHIRExtractors.getEntityUUID(patient.getId()).toString(); - Bundle everythingBundle = client + optOutPatient(historicMbi, dateYesterday); + + IOperationUntypedWithInput getEverythingOperation = client + .operation() + .onInstance(new IdType("Patient", patientId)) + .named("$everything") + .withNoParameters(Parameters.class) + .returnResourceType(Bundle.class) + .useHttpGet() + .withAdditionalHeader("X-Provenance", generateProvenance(ORGANIZATION_ID, practitioner.getId())); + + InternalErrorException exception = assertThrows(InternalErrorException.class, getEverythingOperation::execute, "Expected Internal server error when retrieving opted out patient."); + assertTrue(exception.getResponseBody().contains("\"text\":\"Data not available for opted out patient\""), "Incorrect or missing operation outcome in response body."); + } + + @Test + public void testOptInPatient() throws GeneralSecurityException, IOException, URISyntaxException { + IGenericClient client = generateClient(ORGANIZATION_NPI, RandomStringUtils.randomAlphabetic(25)); + + String mbi = MockBlueButtonClient.TEST_PATIENT_MBIS.get(2); + Patient patient = fetchPatient(client, mbi); + Practitioner practitioner = fetchPractitionerByNPI(client); + final String patientId = FHIRExtractors.getEntityUUID(patient.getId()).toString(); + + optInPatient(mbi, dateToday); + + Bundle bundle = client .operation() .onInstance(new IdType("Patient", patientId)) .named("$everything") @@ -474,17 +540,13 @@ void testPatientEverything_CanHandlePatientWithMultipleMBIs() throws IOException .withAdditionalHeader("X-Provenance", generateProvenance(ORGANIZATION_ID, practitioner.getId())) .execute(); - Patient patientResource = (Patient) everythingBundle.getEntry().stream() + Patient patientResource = (Patient) bundle.getEntry().stream() .filter(entry -> entry.getResource().getResourceType().getPath() == "patient") .findFirst().get().getResource(); // Patient should have multiple MBIs - List mbis = FHIRExtractors.getPatientMBIs(patientResource); - assertEquals(2, mbis.size()); - assertTrue(mbis.containsAll(List.of("9V99EU8XY91", "1S00EU8FE91"))); - - // Current MBI - assertEquals("9V99EU8XY91", FHIRExtractors.getPatientMBI(patientResource)); + String resultMbi = FHIRExtractors.getPatientMBI(patientResource); + assertEquals(MockBlueButtonClient.TEST_PATIENT_MBIS.get(2), resultMbi ); } @Test @@ -601,7 +663,15 @@ private Practitioner fetchPractitionerByNPI(IGenericClient client) { return (Practitioner) practSearch.getEntry().get(0).getResource(); } - private void optOutPatient(String mbi){ + private void optOutPatient(String mbi, java.util.Date date){ + sendConsent(mbi, ConsentResult.PolicyType.OPT_OUT.getValue(), date); + } + + private void optInPatient(String mbi, java.util.Date date){ + sendConsent(mbi, ConsentResult.PolicyType.OPT_IN.getValue(), date); + } + + private void sendConsent(String mbi, String policyUrl, java.util.Date date) { Consent consent = new Consent(); consent.setStatus(Consent.ConsentState.ACTIVE); @@ -613,20 +683,17 @@ private void optOutPatient(String mbi){ String patientRefPath = "/Patient?identity=|"+mbi; consent.setPatient(new Reference("http://api.url" + patientRefPath)); - java.util.Date date = java.util.Date.from(Instant.now()); consent.setDateTime(date); Reference orgRef = new Reference("Organization/" + UUID.randomUUID().toString()); consent.setOrganization(List.of(orgRef)); - String policyUrl = "http://hl7.org/fhir/ConsentPolicy/opt-out"; consent.setPolicyRule(policyUrl); - consentClient + consentClient .create() .resource(consent) .encodedJson() .execute(); } - } diff --git a/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/BlueButtonClientImpl.java b/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/BlueButtonClientImpl.java index bb43364b57..439a2fcab4 100644 --- a/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/BlueButtonClientImpl.java +++ b/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/BlueButtonClientImpl.java @@ -72,17 +72,17 @@ public BlueButtonClientImpl(@Named("bbclient") IGenericClient client, BBClientCo /** * Queries Blue Button server for patient data * - * @param beneId The requested patient's ID + * @param patientId The {@link Patient} resource's id * @param headers * @return {@link Patient} A FHIR Patient resource * @throws ResourceNotFoundException when no such patient with the provided ID exists */ @Override - public Bundle requestPatientFromServer(String beneId, DateRangeParam lastUpdated, Map headers) throws ResourceNotFoundException { - logger.debug("Attempting to fetch patient ID {} from baseURL: {}", beneId, client.getServerBase()); - ICriterion criterion = new ReferenceClientParam(Patient.SP_RES_ID).hasId(beneId); - return instrumentCall(REQUEST_EOB_METRIC, () -> - fetchBundle(Patient.class, Collections.singletonList(criterion), beneId, lastUpdated, headers)); + public Bundle requestPatientFromServer(String patientId, DateRangeParam lastUpdated, Map headers) throws ResourceNotFoundException { + logger.debug("Attempting to fetch patient ID {} from baseURL: {}", patientId, client.getServerBase()); + ICriterion criterion = new ReferenceClientParam(Patient.SP_RES_ID).hasId(patientId); + return instrumentCall(REQUEST_PATIENT_METRIC, () -> + fetchBundle(Patient.class, Collections.singletonList(criterion), patientId, lastUpdated, headers)); } /** @@ -134,23 +134,23 @@ public Bundle requestPatientFromServerByMbiHash(String mbiHash, Map headers) { - logger.debug("Attempting to fetch EOBs for patient ID {} from baseURL: {}", beneId, client.getServerBase()); + public Bundle requestEOBFromServer(String patientId, DateRangeParam lastUpdated, Map headers) { + logger.debug("Attempting to fetch EOBs for patient ID {} from baseURL: {}", patientId, client.getServerBase()); List> criteria = new ArrayList>(); - criteria.add(ExplanationOfBenefit.PATIENT.hasId(beneId)); + criteria.add(ExplanationOfBenefit.PATIENT.hasId(patientId)); criteria.add(new TokenClientParam("excludeSAMHSA").exactly().code("true")); return instrumentCall(REQUEST_EOB_METRIC, () -> fetchBundle(ExplanationOfBenefit.class, criteria, - beneId, + patientId, lastUpdated, headers)); } @@ -168,20 +168,20 @@ public Bundle requestEOBFromServer(String beneId, DateRangeParam lastUpdated, Ma * returns the Bundle it received from BlueButton to the caller, and the caller is responsible for handling Bundles * that contain no coverage records. * - * @param beneId The requested patient's ID + * @param patientId The requested {@link Patient} resource's ID * @param headers * @return {@link Bundle} Containing a number (possibly 0) of {@link ExplanationOfBenefit} objects * @throws ResourceNotFoundException when the requested patient does not exist */ @Override - public Bundle requestCoverageFromServer(String beneId, DateRangeParam lastUpdated, Map headers) throws ResourceNotFoundException { - logger.debug("Attempting to fetch Coverage for patient ID {} from baseURL: {}", beneId, client.getServerBase()); + public Bundle requestCoverageFromServer(String patientId, DateRangeParam lastUpdated, Map headers) throws ResourceNotFoundException { + logger.debug("Attempting to fetch Coverage for patient ID {} from baseURL: {}", patientId, client.getServerBase()); List> criteria = new ArrayList>(); - criteria.add(Coverage.BENEFICIARY.hasId(formBeneficiaryID(beneId))); + criteria.add(Coverage.BENEFICIARY.hasId(formBeneficiaryID(patientId))); return instrumentCall(REQUEST_COVERAGE_METRIC, () -> - fetchBundle(Coverage.class, criteria, beneId, lastUpdated, headers)); + fetchBundle(Coverage.class, criteria, patientId, lastUpdated, headers)); } @Override diff --git a/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/MockBlueButtonClient.java b/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/MockBlueButtonClient.java index ca17d26357..51ca0fc739 100644 --- a/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/MockBlueButtonClient.java +++ b/dpc-bluebutton/src/main/java/gov/cms/dpc/bluebutton/client/MockBlueButtonClient.java @@ -30,11 +30,11 @@ public class MockBlueButtonClient implements BlueButtonClient { private static final String SAMPLE_METADATA_PATH_PREFIX = "bb-test-data/"; private static final String SAMPLE_EMPTY_BUNDLE = "bb-test-data/empty"; - public static final String MULTIPLE_RESULTS_MBI = "9V99EU8XY91"; - + public static final List TEST_PATIENT_MULTIPLE_MBIS = List.of("9V99EU8XY91", "1S00EU8FE91"); public static final List TEST_PATIENT_MBIS = List.of( - "2SW4N00AA00", "4SP0P00AA00", "3S58A00AA00", "4S58A00AA00", "5S58A00AA00", "1SQ3F00AA00", MULTIPLE_RESULTS_MBI, "1S00EU8FE91" + "2SW4N00AA00", "4SP0P00AA00", "3S58A00AA00", "4S58A00AA00", "5S58A00AA00", "1SQ3F00AA00", TEST_PATIENT_MULTIPLE_MBIS.get(0), TEST_PATIENT_MULTIPLE_MBIS.get(1) ); + public static final Map MBI_BENE_ID_MAP = Map.of( TEST_PATIENT_MBIS.get(0), "-20140000008325", TEST_PATIENT_MBIS.get(1), "-20140000009893", diff --git a/src/main/resources/bb-test-data/patient/-20000000001809.xml b/src/main/resources/bb-test-data/patient/-20000000001809.xml index 7162724a7d..c603976062 100644 --- a/src/main/resources/bb-test-data/patient/-20000000001809.xml +++ b/src/main/resources/bb-test-data/patient/-20000000001809.xml @@ -114,6 +114,10 @@ + + + + diff --git a/src/test/EndToEndRequestTest.postman_collection.json b/src/test/EndToEndRequestTest.postman_collection.json index 7b897a21a6..315e1741cf 100644 --- a/src/test/EndToEndRequestTest.postman_collection.json +++ b/src/test/EndToEndRequestTest.postman_collection.json @@ -1,9 +1,10 @@ { "info": { - "_postman_id": "5f37b58d-b843-4110-a54d-ed39f9d52460", + "_postman_id": "fa460e5e-c6fa-4028-b873-52c3cf08e8ee", "name": "EndToEndRequestTest", "description": "This test is an example of an end to end set of API requests to submit a roster and export patient data. Each element in the 'item' list is a single request.", - "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" + "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", + "_exporter_id": "27475832" }, "item": [ { @@ -15,7 +16,6 @@ { "listen": "test", "script": { - "id": "d8fdc0a8-8995-49fb-99ef-ede01a59a9bb", "exec": [ "// Status should be 201", "pm.test(\"Status is 201\", function () {", @@ -81,7 +81,6 @@ { "listen": "test", "script": { - "id": "babe61be-8f50-4537-b89b-ce1264e6116a", "exec": [ "pm.test(\"Status is 201 Created\", function () {", " pm.expect(pm.response.code).equals(201);", @@ -128,7 +127,6 @@ { "listen": "test", "script": { - "id": "9c0ae7d2-9bda-4fb0-941e-eb7672f29527", "exec": [ "pm.test(\"Status is OK\", function () {", " pm.response.to.be.ok;", @@ -172,7 +170,6 @@ { "listen": "test", "script": { - "id": "8da99989-edf5-454a-b652-41190250e54a", "exec": [ "pm.test(\"Status is 201\", function () {", " pm.response.to.have.status(201);", @@ -229,8 +226,7 @@ }, "response": [] } - ], - "protocolProfileBehavior": {} + ] }, { "name": "Roster tests", @@ -241,7 +237,6 @@ { "listen": "test", "script": { - "id": "af3eadcb-8d41-42f4-8c35-5085f5f36107", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -308,7 +303,6 @@ { "listen": "test", "script": { - "id": "2ba5d71e-9d01-49ad-a8da-b025559876ee", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -377,7 +371,6 @@ { "listen": "test", "script": { - "id": "dbf32e16-501c-403b-9f10-20279261ec7e", "exec": [ "// Status should be 201", "pm.test(\"Status is 201\", function () {", @@ -412,7 +405,6 @@ { "listen": "prerequest", "script": { - "id": "6bd6440e-6358-4cc0-be87-7b4efdde1229", "exec": [ "var group = {", " \"resourceType\": \"Group\",", @@ -535,7 +527,6 @@ { "listen": "test", "script": { - "id": "6e41af2f-41ae-4ab4-b9e4-f427ac77e138", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -556,11 +547,11 @@ "", " pm.globals.set(\"single_patient_id\", \"Patient/\" + response.entry[0].resource.id);", "})", - "pm.test(\"Result should have MBI as uppercase\", function() {", - " var response = pm.response.json();", - " var mbiIdentifier = response.entry[0].resource.identifier.filter(identifier => identifier.system == \"http://hl7.org/fhir/sid/us-mbi\")[0]", - " pm.expect(mbiIdentifier.value).to.equal(\"1SQ3F00AA00\");", - "})" + "pm.test(\"Result should have MBI as uppercase\", function() {", + " var response = pm.response.json();", + " var mbiIdentifier = response.entry[0].resource.identifier.filter(identifier => identifier.system == \"http://hl7.org/fhir/sid/us-mbi\")[0]", + " pm.expect(mbiIdentifier.value).to.equal(\"1SQ3F00AA00\");", + "})" ], "type": "text/javascript" } @@ -570,7 +561,7 @@ "method": "GET", "header": [], "url": { - "raw": "http://{{hostname}}:{{api_port}}/v1/Patient?identifier=1sQ3f00aa00", + "raw": "http://{{hostname}}:{{api_port}}/v1/Patient?identifier=1sQ3f00Aa00", "protocol": "http", "host": [ "{{hostname}}" @@ -597,7 +588,6 @@ { "listen": "test", "script": { - "id": "23bda94f-2589-4dcf-a8b3-044166cbaf7d", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -656,7 +646,6 @@ { "listen": "prerequest", "script": { - "id": "523fe412-b2e8-42c0-9313-cc273c0f2d78", "exec": [ "var group = JSON.parse(pm.globals.get(\"attribution_group\"));", "var patientID = pm.globals.get(\"single_patient_id\");", @@ -679,7 +668,6 @@ { "listen": "test", "script": { - "id": "6e8e1f2e-1d04-4a0f-a22d-5fd6927f418d", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -746,7 +734,6 @@ { "listen": "prerequest", "script": { - "id": "4699b19e-dca5-4d57-8a69-6b584fefd5e6", "exec": [ "// Build the attestation header", "var attestation = {", @@ -787,7 +774,6 @@ { "listen": "test", "script": { - "id": "d4682837-eefc-4626-a39f-45c7cc077e70", "exec": [ "// Status should be 400", "pm.test(\"Status is 400\", function () {", @@ -852,8 +838,7 @@ }, "response": [] } - ], - "protocolProfileBehavior": {} + ] }, { "name": "Bulk Export", @@ -864,7 +849,6 @@ { "listen": "test", "script": { - "id": "61dbf88e-b949-4874-9d7f-05bd0e3e6b8b", "exec": [ "// Status should be 202", "pm.test(\"Status is 202\", function () {", @@ -889,7 +873,6 @@ { "listen": "prerequest", "script": { - "id": "279ee4c2-d9ff-41c8-b268-ef16ca3f013f", "exec": [ "console.log(\"Group:\", pm.environment.get(\"attribution_group\"));" ], @@ -935,7 +918,6 @@ { "listen": "prerequest", "script": { - "id": "24d1aaf5-f6a5-489e-91e7-421d2f3fc168", "exec": [ "// Wait between pings", "setTimeout(function() {}, 1000);" @@ -946,7 +928,6 @@ { "listen": "test", "script": { - "id": "e4840a33-8d36-4d7a-9af4-bb562b62f451", "exec": [ "if (pm.response.code == 200) {", " // Response should have FHIR Content-Type", @@ -1048,7 +1029,6 @@ { "listen": "test", "script": { - "id": "3a2dbd63-1fc3-4880-81f0-5f218fad957e", "exec": [ "require('crypto-js')", "", @@ -1085,7 +1065,6 @@ { "listen": "prerequest", "script": { - "id": "58cd062d-719d-4a54-964a-4c971f464baf", "exec": [ "pm.environment.set(\"patient_url\", pm.environment.get(\"patient\").url)" ], @@ -1112,7 +1091,6 @@ { "listen": "test", "script": { - "id": "1ff19a39-1917-4d23-9fc0-1e80b624a267", "exec": [ "// Response should have FHIR Content-Type", "pm.test(\"Content-type is application/ndjson\", function() {", @@ -1137,7 +1115,6 @@ { "listen": "prerequest", "script": { - "id": "28686923-a654-4d56-a3a1-e4e1830eec70", "exec": [ "pm.environment.set(\"eob_url\", pm.environment.get(\"eob\").url)" ], @@ -1164,7 +1141,6 @@ { "listen": "test", "script": { - "id": "7a7b2436-1612-405a-98ba-592d6c125da6", "exec": [ "// Response should have FHIR Content-Type", "pm.test(\"Content-type is application/ndjson\", function() {", @@ -1198,7 +1174,6 @@ { "listen": "prerequest", "script": { - "id": "82a59df8-453e-4b12-9c53-e550917f7cfd", "exec": [ "pm.environment.set(\"coverage_url\", pm.environment.get(\"coverage\").url)" ], @@ -1225,7 +1200,6 @@ { "listen": "test", "script": { - "id": "25ec9c81-b59c-4524-adb0-05c5633b2033", "exec": [ "// Response should have FHIR Content-Type", "pm.test(\"Content-type is application/ndjson\", function() {", @@ -1241,7 +1215,7 @@ " pm.expect(operationOutcomeItem.resourceType).to.equal(\"OperationOutcome\");", " pm.expect(operationOutcomeItem.issue).to.have.lengthOf(1);", " var issue = operationOutcomeItem.issue[0];", - " pm.expect(issue.details.text).to.equal(\"DPC couldn't find any claims for this MBI; unable to demonstrate relationship with provider or organization\");", + " pm.expect(issue.details.text).to.equal(\"Unable to retrieve patient data due to internal error\");", " pm.expect(issue.location).to.include('0S80C00AA00');", "});", "", @@ -1261,7 +1235,6 @@ { "listen": "prerequest", "script": { - "id": "694e2fcf-c720-4e1c-90f6-10783dd62dcc", "exec": [ "pm.environment.set(\"operationOutcome_url\", pm.environment.get(\"operationOutcome\").url)" ], @@ -1288,7 +1261,6 @@ { "listen": "test", "script": { - "id": "b66bb3d3-b465-40f1-a0fa-507816ff7619", "exec": [ "// Response should have FHIR Content-Type", "pm.test(\"Content-type is application/ndjson\", function() {", @@ -1333,7 +1305,6 @@ { "listen": "test", "script": { - "id": "595d8c8f-6ae2-408d-a1eb-0001ac956230", "exec": [ "// Response should be 304", "pm.test('Response should be not-modified', function() {", @@ -1376,7 +1347,6 @@ { "listen": "test", "script": { - "id": "a3294ca5-5c4b-4fa3-ab92-d41efa02544a", "exec": [ "// Status should be 202", "pm.test(\"Status is 202\", function () {", @@ -1401,7 +1371,6 @@ { "listen": "prerequest", "script": { - "id": "5257c1e2-c5e5-4d41-8515-7d10fcc620a1", "exec": [ "console.log(\"Group:\", pm.environment.get(\"attribution_group\"));", "pm.environment.set(\"since_timestamp\", new Date().toISOString());" @@ -1461,7 +1430,6 @@ { "listen": "prerequest", "script": { - "id": "fe048bde-8074-4ea0-9342-a4070239da67", "exec": [ "// Wait between pings", "setTimeout(function() {}, 1000);" @@ -1472,7 +1440,6 @@ { "listen": "test", "script": { - "id": "5535235a-e239-48c2-b3b6-7339ef0662de", "exec": [ "if (pm.response.code == 200) {", " // Response should have FHIR Content-Type", @@ -1516,7 +1483,6 @@ { "listen": "test", "script": { - "id": "99ef66da-7460-4665-8ea2-8b9a1aa37a37", "exec": [ "pm.test(\"Status code should be 200\", function () {", " pm.response.to.have.status(200);", @@ -1540,7 +1506,6 @@ { "listen": "prerequest", "script": { - "id": "c08deedf-8b27-48b4-bc92-af78e031ccac", "exec": [ "" ], @@ -1575,8 +1540,7 @@ }, "response": [] } - ], - "protocolProfileBehavior": {} + ] }, { "name": "Updating", @@ -1587,7 +1551,6 @@ { "listen": "test", "script": { - "id": "b7dde0db-98b3-439e-91a7-e571952e2bd8", "exec": [ "pm.test(\"Status is 415\", function () {", " pm.response.to.have.status(415);", @@ -1640,7 +1603,6 @@ { "listen": "test", "script": { - "id": "3ae43670-a11e-4cac-9ebc-a8f1b39d3c04", "exec": [ "pm.test(\"Status is OK\", function () {", " pm.response.to.be.ok;", @@ -1697,7 +1659,6 @@ { "listen": "test", "script": { - "id": "2f4e8c5f-db7b-4525-b138-bd1ffa23e83a", "exec": [ "pm.test(\"Status is OK\", function () {", " pm.response.to.be.ok;", @@ -1752,7 +1713,6 @@ { "listen": "test", "script": { - "id": "4f043b01-671d-4778-9867-42cd5f0ac1c8", "exec": [ "pm.test(\"Status is 422\", function () {", " pm.response.to.have.status(422);", @@ -1803,7 +1763,6 @@ { "listen": "test", "script": { - "id": "e3bf1f3c-1426-4e9b-9d1e-c26ba2c54951", "exec": [ "pm.test(\"Status is 200\", function () {", " pm.response.to.have.status(200);", @@ -1859,8 +1818,7 @@ }, "response": [] } - ], - "protocolProfileBehavior": {} + ] }, { "name": "Deletion", @@ -1871,7 +1829,6 @@ { "listen": "test", "script": { - "id": "51fbe1c8-6368-4756-9426-338d5054412f", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -1948,7 +1905,6 @@ { "listen": "test", "script": { - "id": "0f1c1e51-dd2f-4132-bb79-78be81153ccc", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -2008,7 +1964,6 @@ { "listen": "test", "script": { - "id": "4abbb99c-f712-46cd-bbc6-2fb7cf35d0db", "exec": [ "// Status should be 200", "pm.test(\"Status is 200\", function () {", @@ -2044,7 +1999,6 @@ { "listen": "test", "script": { - "id": "64d427a6-f028-49de-a753-6c4de2a643e9", "exec": [ "pm.test(\"Bundle should be empty\", function() {", " var response = pm.response.json();", @@ -2089,7 +2043,6 @@ { "listen": "test", "script": { - "id": "723d3de9-12ef-49a0-8036-b31184d227b6", "exec": [ "pm.test(\"Status is OK\", function () {", " pm.response.to.be.ok;", @@ -2126,15 +2079,13 @@ "response": [] } ], - "description": "Test that we can remove various resource and that everything cleans up appropriately.", - "protocolProfileBehavior": {} + "description": "Test that we can remove various resource and that everything cleans up appropriately." } ], "event": [ { "listen": "prerequest", "script": { - "id": "bc44ce8b-485d-4072-9691-e17ad5417f7b", "type": "text/javascript", "exec": [ "" @@ -2144,7 +2095,6 @@ { "listen": "test", "script": { - "id": "60ad4542-25e7-4f05-95da-0ecbcde8a1aa", "type": "text/javascript", "exec": [ "" @@ -2154,35 +2104,29 @@ ], "variable": [ { - "id": "07881292-1de2-4918-86c0-5de1906dedb8", "key": "PROVIDER_ID", "value": "8D80925A-027E-43DD-8AED-9A501CC4CD91", "type": "string" }, { - "id": "7717ba57-46d8-486e-a5d7-da8237c900a2", "key": "hostname", "value": "localhost", "type": "string" }, { - "id": "beeb67a8-91ae-4b41-8ef6-6fb16399eee5", "key": "api_port", "value": "3002", "type": "string" }, { - "id": "770b174b-c2c4-439b-babf-d9ec7155dbdb", "key": "attribution_port", "value": "3500", "type": "string" }, { - "id": "6ef027ab-07ee-4c2a-8c8a-45d4ff6540d3", "key": "consent_port", "value": "3600", "type": "string" } - ], - "protocolProfileBehavior": {} + ] } \ No newline at end of file diff --git a/src/test/resources/bb-test-data/patient/-20140000008325.xml b/src/test/resources/bb-test-data/patient/-20140000008325.xml index 9406514cbc..0e6c56d360 100644 --- a/src/test/resources/bb-test-data/patient/-20140000008325.xml +++ b/src/test/resources/bb-test-data/patient/-20140000008325.xml @@ -31,6 +31,10 @@ + + + + diff --git a/src/test/resources/bb-test-data/patient/-20140000009893.xml b/src/test/resources/bb-test-data/patient/-20140000009893.xml index 0f5fb1650c..d489e26f15 100644 --- a/src/test/resources/bb-test-data/patient/-20140000009893.xml +++ b/src/test/resources/bb-test-data/patient/-20140000009893.xml @@ -31,6 +31,10 @@ + + + +