Skip to content

Commit

Permalink
[DPC-3562] Me/dpc 3562 new consent workflow (#2027)
Browse files Browse the repository at this point in the history
## 🎫 Ticket

https://jira.cms.gov/browse/DPC-3562

## 🛠 Changes

- New consent workflow in `JobBatchProcessor`.
- Eliminates duplicate Patient resource calls to BFD.
- Fixes bug in `dpc-consent` when an enrollee has multiple MBIs and some
have opt outs and others don't.
- Adds unit tests for `JobBatchProcessor` and `ResourceFetcher` in
`dpc-aggregation`.
- Updates integration tests in `dpc-api` to test enrollees with multiple
MBIs.
- Removed V2 code.

## ℹ️ Context for reviewers

The spec for this called for doing a full resource load at the start of
processing a job, but that was written under the assumption that we were
only making one call to BFD to load a patient's resources, similar to
how `dpc-api` provides a `Patient/$everything` operation. In reality,
for a full resource load (Patient, EoB and Coverage) we were making 8(!)
calls to BFD. The details were abstracted away in the `ResourceFetcher`
class. The process went like this:

### Old Process
**Lookback check**
1.Patient load by MBI get the Patient resource and extract its resource
id.
2. EoB load by Patient resource id.

**Consent check**
No calls to BFD.

**EoB resource load**
4. Patient load by MBI get the Patient resource and extract its resource
id.
5. EoB load by Patient resource id with a `since` date filter.

**Coverage resource load**
6. Patient load by MBI get the Patient resource and extract its resource
id.
7. Coverage load by Patient resource id with a `since` filter.

**Patient resource load**
8. Patient load by MBI get the Patient resource and extract its resource
id.
9. Patient load by Patient resource id with a `since` filter.


### New Process
The new process only makes 5 calls per patient, and looks like this:

1.Patient load by MBI get the Patient resource and save it for later.
(This also allows us to extract the Patient's MBIs for the consent
check)

**Consent check**
Uses MBIs from Patient resource to call dpc-consent.  No calls to BFD.

**Look back check**
2. EoB load by Patient resource id.

**EoB Resource load**
3. EoB load by Patient resource id with a `since` date filter.

**Coverage resource load**
4. Coverage load by Patient resource id with a `since` filter.

**Patient resource load**
5. Patient load by Patient resource id with a `since` filter.

### Future Improvements
We should be able to bring this down to three calls, one for each
resource type. We would just have to verify that all resources we get
from BFD have their `meta` element and `lastUpdated` value filled in,
and then we could do the date filtering on our side instead of making a
separate call to do it.

## ✅ Acceptance Validation

New unit and integration tests added and coverage is >98% for
`JobBatchProcessor` and `ResourceFetcher`.

## 🔒 Security Implications

- [ ] This PR adds a new software dependency or dependencies.
- [ ] This PR modifies or invalidates one or more of our security
controls.
- [ ] This PR stores or transmits data that was not stored or
transmitted before.
- [ ] This PR requires additional review of its security implications
for other reasons.

If any security implications apply, add Jason Ashbaugh (GitHub username:
StewGoin) as a reviewer and do not merge this PR without his approval.
  • Loading branch information
MEspositoE14s authored Jan 31, 2024
1 parent c30a527 commit 3fed283
Show file tree
Hide file tree
Showing 25 changed files with 1,099 additions and 1,168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.typesafe.config.Config;
import gov.cms.dpc.aggregation.engine.AggregationEngine;
import gov.cms.dpc.aggregation.engine.JobBatchProcessor;
import gov.cms.dpc.aggregation.engine.JobBatchProcessorV2;
import gov.cms.dpc.aggregation.engine.OperationsConfig;
import gov.cms.dpc.aggregation.health.AggregationEngineHealthCheck;
import gov.cms.dpc.aggregation.service.*;
Expand Down Expand Up @@ -40,7 +39,6 @@ public void configure() {
binder.bind(AggregationEngine.class);
binder.bind(AggregationManager.class).asEagerSingleton();
binder.bind(JobBatchProcessor.class);
binder.bind(JobBatchProcessorV2.class);
binder.bind(AggregationEngineHealthCheck.class);

// Healthchecks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class AggregationEngine implements Runnable {
private final IJobQueue queue;
private final OperationsConfig operationsConfig;
private final JobBatchProcessor jobBatchProcessor;
private final JobBatchProcessorV2 jobBatchProcessorV2;
private Disposable subscribe;

/**
Expand All @@ -57,12 +56,11 @@ public class AggregationEngine implements Runnable {
* @param jobBatchProcessor - {@link JobBatchProcessor} contains all the job processing logic
*/
@Inject
public AggregationEngine(@AggregatorID UUID aggregatorID, IJobQueue queue, OperationsConfig operationsConfig, JobBatchProcessor jobBatchProcessor, JobBatchProcessorV2 jobBatchProcessorV2) {
public AggregationEngine(@AggregatorID UUID aggregatorID, IJobQueue queue, OperationsConfig operationsConfig, JobBatchProcessor jobBatchProcessor) {
this.aggregatorID = aggregatorID;
this.queue = queue;
this.operationsConfig = operationsConfig;
this.jobBatchProcessor = jobBatchProcessor;
this.jobBatchProcessorV2 = jobBatchProcessorV2;
}

/**
Expand Down Expand Up @@ -196,12 +194,7 @@ protected void processJobBatch(JobQueueBatch job) {
}

private Optional<String> processPatient(JobQueueBatch job, String patientId) {
if (job.isV2()) {
jobBatchProcessorV2.processJobBatchPartial(aggregatorID, queue, job, patientId);
}
else {
jobBatchProcessor.processJobBatchPartial(aggregatorID, queue, job, patientId);
}
jobBatchProcessor.processJobBatchPartial(aggregatorID, queue, job, patientId);

// Stop processing when no patients or early shutdown
return this.isRunning() ? job.fetchNextPatient(aggregatorID) : Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gov.cms.dpc.aggregation.engine;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import gov.cms.dpc.aggregation.service.*;
Expand All @@ -9,16 +10,14 @@
import gov.cms.dpc.common.MDCConstants;
import gov.cms.dpc.common.utils.MetricMaker;
import gov.cms.dpc.fhir.DPCResourceType;
import gov.cms.dpc.fhir.FHIRExtractors;
import gov.cms.dpc.queue.IJobQueue;
import gov.cms.dpc.queue.models.JobQueueBatch;
import gov.cms.dpc.queue.models.JobQueueBatchFile;
import io.reactivex.Flowable;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.dstu3.model.ExplanationOfBenefit;
import org.hl7.fhir.dstu3.model.OperationOutcome;
import org.hl7.fhir.dstu3.model.Patient;
import org.hl7.fhir.dstu3.model.Resource;
import org.hl7.fhir.dstu3.model.*;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,6 +29,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static gov.cms.dpc.fhir.FHIRExtractors.getPatientMBI;
import static gov.cms.dpc.fhir.FHIRExtractors.getPatientMBIs;

public class JobBatchProcessor {
Expand Down Expand Up @@ -63,49 +63,118 @@ public JobBatchProcessor(BlueButtonClient bbclient, FhirContext fhirContext, Met
* @param aggregatorID the current aggregatorID
* @param queue the queue
* @param job the job to process
* @param patientID the current patient id to process
* @param mbi the current patient mbi to process
* @return A list of batch files {@link JobQueueBatchFile}
*/
public List<JobQueueBatchFile> processJobBatchPartial(UUID aggregatorID, IJobQueue queue, JobQueueBatch job, String patientID) {
public List<JobQueueBatchFile> processJobBatchPartial(UUID aggregatorID, IJobQueue queue, JobQueueBatch job, String mbi) {
StopWatch stopWatch = StopWatch.createStarted();
OutcomeReason failReason = null;
final Pair<Optional<List<ConsentResult>>, Optional<OperationOutcome>> consentResult = getConsent(patientID);
Optional<OutcomeReason> failReason = Optional.empty();
Optional<Flowable<Resource>> flowable = Optional.empty();

Flowable<Resource> flowable;
if (consentResult.getRight().isPresent()) {
flowable = Flowable.just(consentResult.getRight().get());
failReason = OutcomeReason.INTERNAL_ERROR;
} else if (isOptedOut(consentResult.getLeft())) {
failReason = OutcomeReason.CONSENT_OPTED_OUT;
flowable = Flowable.just(AggregationUtils.toOperationOutcome(OutcomeReason.CONSENT_OPTED_OUT, patientID));
} else if (isLookBackExempt(job.getOrgID())) {
logger.info("Skipping lookBack for org: {}", job.getOrgID().toString());
MDC.put(MDCConstants.IS_SMOKE_TEST_ORG, "true");
flowable = Flowable.fromIterable(job.getResourceTypes())
.flatMap(r -> fetchResource(job, patientID, r, job.getSince().orElse(null)));
} else {
List<LookBackAnswer> answers = getLookBackAnswers(job, patientID);
if (passesLookBack(answers)) {
flowable = Flowable.fromIterable(job.getResourceTypes())
.flatMap(r -> fetchResource(job, patientID, r, job.getSince().orElse(null)));
} else {
failReason = LookBackAnalyzer.analyze(answers);
flowable = Flowable.just(AggregationUtils.toOperationOutcome(failReason, patientID));
// Load the Patient resource from BFD.
final Optional<Patient> optPatient = fetchPatient(job, mbi);
if(optPatient.isEmpty()) {
// Failed to load patient
failReason = Optional.of(OutcomeReason.INTERNAL_ERROR);
flowable = Optional.of(Flowable.just(AggregationUtils.toOperationOutcome(failReason.get(), mbi)));
}

// Check if the patient has opted out
if(flowable.isEmpty()) {
Optional<Pair<Flowable<Resource>, OutcomeReason>> consentResult = checkForOptOut(optPatient.get());
if(consentResult.isPresent()) {
flowable = Optional.of(consentResult.get().getLeft());
failReason = Optional.of(consentResult.get().getRight());
}
}

final var results = writeResource(job, flowable)
// Check if the patient passes look back
if(flowable.isEmpty()) {
Optional<Pair<Flowable<Resource>, OutcomeReason>> lookBackResult = checkLookBack(optPatient.get(), job);
if(lookBackResult.isPresent()) {
flowable = Optional.of(lookBackResult.get().getLeft());
failReason = Optional.of(lookBackResult.get().getRight());
}
}

// All checks passed, load resources
if(flowable.isEmpty()) {
flowable = Optional.of(
Flowable.fromIterable(job.getResourceTypes()).flatMap(r -> fetchResource(job, optPatient.get(), r, job.getSince().orElse(null)))
);
}

final var results = writeResource(job, flowable.get())
.toList()
.blockingGet();
queue.completePartialBatch(job, aggregatorID);

final String resourcesRequested = job.getResourceTypes().stream().map(DPCResourceType::getPath).filter(Objects::nonNull).collect(Collectors.joining(";"));
final String failReasonLabel = failReason == null ? "NA" : failReason.name();
final String failReasonLabel = failReason.isEmpty() ? "NA" : failReason.get().name();
stopWatch.stop();
logger.info("dpcMetric=DataExportResult,dataRetrieved={},failReason={},resourcesRequested={},duration={}", failReason == null, failReasonLabel, resourcesRequested, stopWatch.getTime());
logger.info("dpcMetric=DataExportResult,dataRetrieved={},failReason={},resourcesRequested={},duration={}", failReason.isEmpty(), failReasonLabel, resourcesRequested, stopWatch.getTime());
return results;
}

/**
* Checks the given patient against the consent service and returns any issues if the check doesn't pass.
* @param patient {@link Patient} resource we're checking consent for.
* @return If there's a problem, it returns a pair of a {@link Flowable} {@link OperationOutcome} and an {@link OutcomeReason}.
* If the Patient passes the consent check, it returns an empty {@link Optional}s.
*/
private Optional<Pair<Flowable<Resource>, OutcomeReason>> checkForOptOut(Patient patient) {
final Pair<Optional<List<ConsentResult>>, Optional<OperationOutcome>> consentResult = getConsent(patient);

if (consentResult.getRight().isPresent()) {
// Consent check returned an error
return Optional.of(
Pair.of(
Flowable.just(consentResult.getRight().get()),
OutcomeReason.INTERNAL_ERROR
)
);
} else if (isOptedOut(consentResult.getLeft())) {
// Enrollee is opted out
return Optional.of(
Pair.of(
Flowable.just(AggregationUtils.toOperationOutcome(OutcomeReason.CONSENT_OPTED_OUT, FHIRExtractors.getPatientMBI(patient))),
OutcomeReason.CONSENT_OPTED_OUT
)
);
}

// Passes consent check
return Optional.empty();
}

/**
* Does the patient look back check and returns any issues if it doesn't pass.
* @param patient {@link Patient} resource we're looking for a relationship for.
* @param job {@link JobQueueBatch} currently running.
* @return If there's a problem, it returns a pair of a {@link Flowable} {@link OperationOutcome} and an {@link OutcomeReason}.
* If the look back check passes, an empty {@link Optional}.
*/
private Optional<Pair<Flowable<Resource>, OutcomeReason>> checkLookBack(Patient patient, JobQueueBatch job) {
if (isLookBackExempt(job.getOrgID())) {
logger.info("Skipping lookBack for org: {}", job.getOrgID().toString());
MDC.put(MDCConstants.IS_SMOKE_TEST_ORG, "true");
} else {
List<LookBackAnswer> answers = getLookBackAnswers(job, patient);
if (!passesLookBack(answers)) {
OutcomeReason failReason = LookBackAnalyzer.analyze(answers);
return Optional.of(
Pair.of(
Flowable.just(AggregationUtils.toOperationOutcome(failReason, FHIRExtractors.getPatientMBI(patient))),
failReason
)
);
}
}

// Passes lookback check
return Optional.empty();
}

private boolean isLookBackExempt(UUID orgId) {
List<String> exemptOrgs = operationsConfig.getLookBackExemptOrgs();
if (exemptOrgs != null) {
Expand All @@ -117,32 +186,62 @@ private boolean isLookBackExempt(UUID orgId) {
/**
* Fetch and write a specific resource type
*
* @param job the job to associate the fetch
* @param patientID the patientID to fetch data
* @param resourceType the resourceType to fetch data
* @param since the since date
* @param job the job to associate the fetch
* @param patient the {@link Patient} we're fetching data for
* @return A flowable and resourceType the user requested
*/
private Flowable<Resource> fetchResource(JobQueueBatch job, String patientID, DPCResourceType resourceType, OffsetDateTime since) {
private Flowable<Resource> fetchResource(JobQueueBatch job, Patient patient, DPCResourceType resourceType, OffsetDateTime since) {
// Make this flow hot (ie. only called once) when multiple subscribers attach
final var fetcher = new ResourceFetcher(bbclient,
job.getJobID(),
job.getBatchID(),
resourceType,
since,
job.getTransactionTime());
return fetcher.fetchResources(patientID, new JobHeaders(job.getRequestingIP(),job.getJobID().toString(),
return fetcher.fetchResources(patient, new JobHeaders(job.getRequestingIP(),job.getJobID().toString(),
job.getProviderNPI(),job.getTransactionTime().toString(),job.isBulk()).buildHeaders())
.flatMap(Flowable::fromIterable);
}

private List<LookBackAnswer> getLookBackAnswers(JobQueueBatch job, String patientId) {
/**
* Fetches the {@link Patient} referenced by the given mbi. Throws a {@link ResourceNotFoundException} if no
* {@link Patient} can be found.
* @param job The job associated to the fetch
* @param mbi The mbi of the {@link Patient}
* @return The {@link Patient}
*/
private Optional<Patient> fetchPatient(JobQueueBatch job, String mbi) {
JobHeaders headers = new JobHeaders(
job.getRequestingIP(),
job.getJobID().toString(),
job.getProviderNPI(),
job.getTransactionTime().toString(),
job.isBulk());

Bundle patients;
try {
patients = bbclient.requestPatientFromServerByMbi(mbi, headers.buildHeaders());
} catch (Exception e) {
logger.error("Failed to retrieve Patient", e);
return Optional.empty();
}

// If we get more than one unique Patient for an MBI then we've got some upstream problems.
if (patients.getTotal() == 1) {
return Optional.of((Patient) patients.getEntryFirstRep().getResource());
}

logger.error("Expected 1 Patient to match MBI but found {}", patients.getTotal());
return Optional.empty();
}

private List<LookBackAnswer> getLookBackAnswers(JobQueueBatch job, Patient patient) {
List<LookBackAnswer> result = new ArrayList<>();
final String practitionerNPI = job.getProviderNPI();
final String organizationNPI = job.getOrgNPI();
if (practitionerNPI != null && organizationNPI != null) {
MDC.put(MDCConstants.PROVIDER_NPI, practitionerNPI);
Flowable<Resource> flowable = fetchResource(job, patientId, DPCResourceType.ExplanationOfBenefit, null);
Flowable<Resource> flowable = fetchResource(job, patient, DPCResourceType.ExplanationOfBenefit, null);
result = flowable
.filter(resource -> Objects.requireNonNull(DPCResourceType.ExplanationOfBenefit.getPath()).equals(resource.getResourceType().getPath()))
.map(ExplanationOfBenefit.class::cast)
Expand Down Expand Up @@ -213,12 +312,19 @@ private Meter getMeter(DPCResourceType resourceType) {
return DPCResourceType.OperationOutcome == resourceType ? operationalOutcomeMeter : resourceMeter;
}

private Pair<Optional<List<ConsentResult>>, Optional<OperationOutcome>> getConsent(String patientId) {
/**
* Returns a {@link List} of {@link ConsentResult}s if successful. An {@link OperationOutcome} if not. Only one of
* the two {@link Optional}s returned will be filled in.
*
* @param patient A {@link Patient} that we want to get {@link ConsentResult}s for
* @return A {@link Pair}
*/
private Pair<Optional<List<ConsentResult>>, Optional<OperationOutcome>> getConsent(Patient patient) {
try {
return Pair.of(consentService.getConsent(patientId), Optional.empty());
return Pair.of(consentService.getConsent(getPatientMBIs(patient)), Optional.empty());
} catch (Exception e) {
logger.error("Unable to retrieve consent from consent service.", e);
OperationOutcome operationOutcome = AggregationUtils.toOperationOutcome(OutcomeReason.INTERNAL_ERROR, patientId);
OperationOutcome operationOutcome = AggregationUtils.toOperationOutcome(OutcomeReason.INTERNAL_ERROR, getPatientMBI(patient));
return Pair.of(Optional.empty(), Optional.of(operationOutcome));
}
}
Expand All @@ -245,20 +351,4 @@ private boolean passesLookBack(List<LookBackAnswer> answers) {
return answers.stream()
.anyMatch(a -> a.matchDateCriteria() && (a.orgNPIMatchAnyEobNPIs() || a.practitionerNPIMatchAnyEobNPIs()));
}

/**
* Takes a list of resources, finds all of the {@link Patient}s and returns a list of their valid
* MBIs. If there is more than one {@link Patient} all of their MBIs will be returned, and if there are no
* {@link Patient}s an empty list will be returned.
* @param resources A {@link Flowable} of FHIR {@link Resource}s
* @return A {@link List} of MBIs
*/
private List<String> getMBIs(Flowable<Resource> resources) {
return resources
.filter(r -> DPCResourceType.Patient.getPath().equals(r.getResourceType().getPath()))
.map(r -> (Patient)r)
.flatMap(p -> Flowable.fromIterable(getPatientMBIs(p)))
.toList()
.blockingGet();
}
}
Loading

0 comments on commit 3fed283

Please sign in to comment.