Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform/bill/16144 #16550

Merged
merged 17 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ enum class ReportStreamEventProperties {
BUNDLE_DIGEST,
INGESTION_TYPE,
POISON_QUEUE_MESSAGE_ID,
ENRICHMENTS,
ORIGINAL_FORMAT,
TARGET_FORMAT,
;

@JsonKey
Expand All @@ -92,6 +95,7 @@ enum class ReportStreamEventName {
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
PIPELINE_EXCEPTION,
ITEM_TRANSFORMED,
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ abstract class FHIREngine(
blobAccess ?: BlobAccess(),
azureEventService ?: AzureEventServiceImpl(),
reportService ?: ReportService(),
ReportStreamEventService(
reportEventService ?: ReportStreamEventService(
databaseAccess ?: databaseAccessSingleton,
azureEventService ?: AzureEventServiceImpl(),
reportService ?: ReportService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ca.uhn.hl7v2.model.Segment
import ca.uhn.hl7v2.util.Terser
import fhirengine.engine.CustomFhirPathFunctions
import fhirengine.engine.CustomTranslationFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.CustomerStatus
Expand All @@ -21,16 +22,21 @@ import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.Event
import gov.cdc.prime.router.azure.db.Tables
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.context.MDCUtils
import gov.cdc.prime.router.azure.observability.context.withLoggingContext
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Converter
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7Utils.defaultHl7EncodingFiveChars
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7Utils.defaultHl7EncodingFourChars
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
Expand Down Expand Up @@ -113,12 +119,13 @@ class FHIRTranslator(
): FHIREngineRunResult {
logger.trace("Preparing to send original message")
val originalReport = reportService.getRootReport(message.reportId)
val bodyBytes = BlobAccess.downloadBlobAsByteArray(originalReport.bodyUrl)
val bodyAsString =
BlobAccess.downloadBlob(originalReport.bodyUrl, BlobUtils.digestToString(originalReport.blobDigest))

// get a Report from the message
val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
Event.EventAction.SEND,
bodyBytes,
bodyAsString.toByteArray(),
listOf(message.reportId),
receiver,
this.metadata,
Expand Down Expand Up @@ -148,10 +155,9 @@ class FHIRTranslator(
actionHistory: ActionHistory,
): FHIREngineRunResult {
logger.trace("Preparing to send translated message")
val bodyBytes =
getByteArrayFromBundle(
receiver, FhirTranscoder.decode(BlobAccess.downloadBlob(message.blobURL, message.digest))
)
val originalReport = reportService.getRootReport(message.reportId)
val bundle = FhirTranscoder.decode(BlobAccess.downloadBlob(message.blobURL, message.digest))
val bodyBytes = getByteArrayFromBundle(receiver, bundle)

val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
Event.EventAction.BATCH,
Expand All @@ -163,6 +169,35 @@ class FHIRTranslator(
topic = message.topic
)

val bundleDigestExtractor = BundleDigestExtractor(
FhirPathBundleDigestLabResultExtractorStrategy(
CustomContext(
bundle,
bundle,
mutableMapOf(),
CustomFhirPathFunctions()
)
)
)
reportEventService.sendItemEvent(
eventName = ReportStreamEventName.ITEM_TRANSFORMED,
childReport = report,
pipelineStepName = TaskAction.translate
) {
parentReportId(message.reportId)
params(
mapOf(
ReportStreamEventProperties.RECEIVER_NAME to receiver.fullName,
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(bundle),
ReportStreamEventProperties.ORIGINAL_FORMAT to originalReport.bodyFormat,
ReportStreamEventProperties.TARGET_FORMAT to receiver.translation.format.name,
ReportStreamEventProperties.ENRICHMENTS to listOf(receiver.translation.schemaName)
)
)
trackingId(bundle)
}

return FHIREngineRunResult(
event,
report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gov.cdc.prime.router.common
import assertk.assertThat
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import gov.cdc.prime.reportstream.shared.BlobUtils.sha256Digest
import gov.cdc.prime.router.ClientSource
import gov.cdc.prime.router.CustomerStatus
import gov.cdc.prime.router.DeepOrganization
Expand Down Expand Up @@ -452,7 +453,8 @@ object UniversalPipelineTestUtils {
event,
Topic.FULL_ELR,
parentReport,
blobUrl
blobUrl,
reportContents,
)
}

Expand All @@ -464,6 +466,7 @@ object UniversalPipelineTestUtils {
topic: Topic,
parentReport: Report? = null,
bodyURL: String? = null,
reportContents: String,
): Report {
val report = Report(
fileFormat,
Expand Down Expand Up @@ -493,6 +496,7 @@ object UniversalPipelineTestUtils {
.setBodyUrl(report.bodyURL)
.setSendingOrg(universalPipelineOrganization.name)
.setSendingOrgClient("Test Sender")
.setBlobDigest(sha256Digest(reportContents.toByteArray(Charsets.UTF_8)))

ReportStreamTestDatabaseContainer.testDatabaseAccess.insertReportFile(
reportFile, txn, action
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gov.cdc.prime.router.fhirengine.azure

import assertk.assertThat
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import assertk.assertions.isInstanceOf
import assertk.assertions.isNotEqualTo
import assertk.assertions.isNotNull
import assertk.assertions.isNull
Expand All @@ -22,7 +24,10 @@ import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.Task
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEvent
import gov.cdc.prime.router.cli.tests.CompareData
import gov.cdc.prime.router.common.TestcontainersUtils
import gov.cdc.prime.router.common.UniversalPipelineTestUtils
Expand Down Expand Up @@ -191,6 +196,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("HL7")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
arnejduranovic marked this conversation as resolved.
Show resolved Hide resolved
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down Expand Up @@ -289,6 +309,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("HL7")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down Expand Up @@ -364,6 +399,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down Expand Up @@ -449,6 +499,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down
Loading