Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform/bill/16144 #16550

Merged
merged 17 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ enum class ReportStreamEventName {
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
PIPELINE_EXCEPTION,
ITEM_TRANSFORMED,
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import ca.uhn.hl7v2.model.Segment
import ca.uhn.hl7v2.util.Terser
import fhirengine.engine.CustomFhirPathFunctions
import fhirengine.engine.CustomTranslationFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.reportstream.shared.BlobUtils.sha256Digest
import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.CustomerStatus
Expand All @@ -26,6 +28,8 @@ import gov.cdc.prime.router.azure.observability.context.withLoggingContext
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEventBuilder
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context
Expand Down Expand Up @@ -114,6 +118,10 @@ class FHIRTranslator(
logger.trace("Preparing to send original message")
val originalReport = reportService.getRootReport(message.reportId)
val bodyBytes = BlobAccess.downloadBlobAsByteArray(originalReport.bodyUrl)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Have we considered updating this to use BlobAccess.downloadBlob(...) which will do the digest check for us?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change.

val localDigest = BlobUtils.digestToString(sha256Digest(bodyBytes))
check(message.digest == localDigest) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems wrong to me. The digest value in the message I believe would be the digest of the blob associated with the passed in report (so the report that is downloaded in sendTranslated). Here, you are downloading the ROOT report, which would have a different digest.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if I'm understanding correctly, we should be comparing message.digest with blobInfo.digest (after the file is uploaded)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message.digest is already used in sendTranslated and that's the only place it should be used. Here, you are downloading the root report, so you need to to compare the downloaded root report to the root report's digest (which is stored in the DB report_file table)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest changes should address this.

"Downloaded file does not match expected file\n${message.digest} | $localDigest"
}

// get a Report from the message
val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
Expand Down Expand Up @@ -163,6 +171,18 @@ class FHIRTranslator(
topic = message.topic
)

val builder = ReportStreamItemEventBuilder(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should prefer the style that is shown in FHIRDestinationFilter where we pass in the builder directly to sendItemEvent (so we remain consistent):

reportEventService.sendItemEvent(
            eventName = ReportStreamEventName.ITEM_ROUTED,
            childReport = report,
            pipelineStepName = TaskAction.destination_filter
        ) {
            parentReportId(queueMessage.reportId)
            params(
                mapOf(
                ReportStreamEventProperties.RECEIVER_NAME to receiver.fullName,
                ReportStreamEventProperties.BUNDLE_DIGEST
                    to bundleDigestExtractor.generateDigest(bundle)
            )
            )
            trackingId(bundle)
        }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be in there now.

reportEventService,
azureEventService,
ReportStreamEventName.ITEM_TRANSFORMED,
report.id,
report.bodyURL,
message.topic,
TaskAction.translate
)
builder.theParentReportId = message.reportId
azureEventService.trackEvent(builder.buildEvent())

return FHIREngineRunResult(
event,
report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.Action
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.unittest.UnitTestUtils
import io.mockk.clearAllMocks
import io.mockk.every
Expand Down Expand Up @@ -75,6 +78,8 @@ class FhirTranslatorTests {
)
)
)
val reportServiceMock = mockk<ReportService>()
val azureEventService = mockk<AzureEventService>()

private fun makeFhirEngine(
metadata: Metadata = Metadata(
Expand All @@ -87,7 +92,8 @@ class FhirTranslatorTests {
settings: SettingsProvider = FileSettings().loadOrganizations(oneOrganization),
): FHIRTranslator {
return FHIREngine.Builder().metadata(metadata).settingsProvider(settings).databaseAccess(accessSpy)
.blobAccess(blobMock).build(TaskAction.translate) as FHIRTranslator
.blobAccess(blobMock).reportService(reportServiceMock).azureEventService(azureEventService)
.build(TaskAction.translate) as FHIRTranslator
}

@BeforeEach
Expand All @@ -106,10 +112,11 @@ class FhirTranslatorTests {
val actionLogger = mockk<ActionLogger>()
val engine = makeFhirEngine()

val reportId = UUID.randomUUID()
val message =
spyk(
FhirTranslateQueueMessage(
UUID.randomUUID(),
reportId,
arnejduranovic marked this conversation as resolved.
Show resolved Hide resolved
BLOB_URL,
"test",
BLOB_SUB_FOLDER,
Expand All @@ -120,6 +127,7 @@ class FhirTranslatorTests {

val bodyFormat = MimeFormat.FHIR
val bodyUrl = BODY_URL
val rootReport = mockk<ReportFile>()

every { actionLogger.hasErrors() } returns false
every { BlobAccess.downloadBlob(any(), any()) }
Expand Down Expand Up @@ -154,6 +162,14 @@ class FhirTranslatorTests {
""
)
)
every { rootReport.reportId } returns reportId
every { rootReport.sendingOrg } returns oneOrganization.name
every { rootReport.sendingOrgClient } returns oneOrganization.receivers[0].fullName
every { reportServiceMock.getRootReport(any()) } returns rootReport
every { reportServiceMock.getRootReports(any()) } returns listOf(rootReport)
every { reportServiceMock.getRootItemIndex(any(), any()) } returns 1
every { BlobAccess.downloadBlobAsByteArray(any()) } returns "1".toByteArray(Charsets.UTF_8)
every { azureEventService.trackEvent(any()) } returns Unit

// act
accessSpy.transact { txn ->
Expand All @@ -167,6 +183,88 @@ class FhirTranslatorTests {
BlobAccess.Companion.uploadBlob(any(), any(), any())
accessSpy.insertTask(any(), any(), any(), any(), any())
actionHistory.trackActionReceiverInfo(any(), any())
azureEventService.trackEvent(any())
}
}

@Test
fun `test translation happy path with file digest exception`() {
mockkObject(BlobAccess)
mockkObject(BlobAccess.BlobContainerMetadata)

// set up
val actionHistory = mockk<ActionHistory>()
val actionLogger = mockk<ActionLogger>()
val engine = makeFhirEngine()

val reportId = UUID.randomUUID()
val message =
spyk(
FhirTranslateQueueMessage(
reportId,
BLOB_URL,
"test",
BLOB_SUB_FOLDER,
topic = Topic.ELR_ELIMS,
oneOrganization.receivers[0].fullName
)
)

val bodyFormat = MimeFormat.FHIR
val bodyUrl = BODY_URL
val rootReport = mockk<ReportFile>()

every { actionLogger.hasErrors() } returns false
every { actionLogger.error(any<ActionLogDetail>()) } returns Unit
every { BlobAccess.downloadBlob(any(), any()) }
.returns(File(VALID_DATA_URL).readText())
every { BlobAccess.Companion.uploadBlob(any(), any()) } returns "test"
every {
BlobAccess.BlobContainerMetadata.build(
"metadata",
any()
)
} returns mockk<BlobAccess.BlobContainerMetadata>()
every { accessSpy.insertTask(any(), bodyFormat.toString(), bodyUrl, any()) }.returns(Unit)
every { actionHistory.trackCreatedReport(any(), any(), blobInfo = any()) }.returns(Unit)
every { actionHistory.trackExistingInputReport(any()) }.returns(Unit)
every { actionHistory.trackActionReceiverInfo(any(), any()) }.returns(Unit)
every { actionHistory.action }.returns(
Action(
1,
TaskAction.receive,
"",
"",
OffsetDateTime.now(),
JSONB.valueOf(""),
1,
1,
"",
"",
"",
"",
"",
"",
""
)
)
every { rootReport.reportId } returns reportId
every { rootReport.sendingOrg } returns oneOrganization.name
every { rootReport.sendingOrgClient } returns oneOrganization.receivers[0].fullName
every { rootReport.bodyUrl } returns BLOB_URL
every { reportServiceMock.getRootReport(any()) } returns rootReport
every { reportServiceMock.getRootReports(any()) } returns listOf(rootReport)
every { reportServiceMock.getRootItemIndex(any(), any()) } returns 1
every { BlobAccess.downloadBlobAsByteArray(any()) } returns "1".toByteArray(Charsets.UTF_8)

// act
@Suppress("ktlint:standard:max-line-length")
accessSpy.transact { txn ->
assertFailsWith<IllegalStateException>(
message = "Downloaded file does not match expected file\n" +
"test | 6bffffff86ffffffb273ffffffff34fffffffcffffffe1ffffff9d6bffffff804effffffff5a3f5747ffffffadffffffa4ffffffeaffffffa22f1d49ffffffc01e52ffffffddffffffb7ffffff875b4b",
block = { engine.run(message, actionLogger, actionHistory, txn) }
)
}
}

Expand All @@ -182,9 +280,10 @@ class FhirTranslatorTests {
val actionLogger = mockk<ActionLogger>()

val engine = makeFhirEngine(settings = settings)
val reportId = UUID.randomUUID()
val message = spyk(
FhirTranslateQueueMessage(
UUID.randomUUID(),
reportId,
BLOB_URL,
"test",
BLOB_SUB_FOLDER,
Expand All @@ -195,6 +294,7 @@ class FhirTranslatorTests {

val bodyFormat = MimeFormat.FHIR
val bodyUrl = BODY_URL
val rootReport = mockk<ReportFile>()
every { actionLogger.hasErrors() } returns false
every { BlobAccess.downloadBlob(any(), any()) }
.returns(File(VALID_DATA_URL).readText())
Expand Down Expand Up @@ -229,6 +329,14 @@ class FhirTranslatorTests {
)
)

every { rootReport.reportId } returns reportId
every { rootReport.sendingOrg } returns oneOrganization.name
every { rootReport.sendingOrgClient } returns oneOrganization.receivers[0].fullName
every { reportServiceMock.getRootReport(any()) } returns rootReport
every { reportServiceMock.getRootReports(any()) } returns listOf(rootReport)
every { reportServiceMock.getRootItemIndex(any(), any()) } returns 1
every { azureEventService.trackEvent(any()) } returns Unit

// act
accessSpy.transact { txn ->
engine.run(message, actionLogger, actionHistory, txn)
Expand Down
Loading