Skip to content

Commit

Permalink
Merge branch 'main' into platform/david-navapbc/21nov24/dependabot
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-h-wang authored Nov 22, 2024
2 parents f7ac442 + ad28996 commit ed63fd1
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ enum class ReportStreamEventProperties {
BUNDLE_DIGEST,
INGESTION_TYPE,
POISON_QUEUE_MESSAGE_ID,
ENRICHMENTS,
ORIGINAL_FORMAT,
TARGET_FORMAT,
;

@JsonKey
Expand All @@ -92,6 +95,7 @@ enum class ReportStreamEventName {
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
PIPELINE_EXCEPTION,
ITEM_TRANSFORMED,
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ abstract class FHIREngine(
blobAccess ?: BlobAccess(),
azureEventService ?: AzureEventServiceImpl(),
reportService ?: ReportService(),
ReportStreamEventService(
reportEventService ?: ReportStreamEventService(
databaseAccess ?: databaseAccessSingleton,
azureEventService ?: AzureEventServiceImpl(),
reportService ?: ReportService()
Expand Down
47 changes: 41 additions & 6 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ca.uhn.hl7v2.model.Segment
import ca.uhn.hl7v2.util.Terser
import fhirengine.engine.CustomFhirPathFunctions
import fhirengine.engine.CustomTranslationFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.reportstream.shared.QueueMessage
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.CustomerStatus
Expand All @@ -21,16 +22,21 @@ import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.Event
import gov.cdc.prime.router.azure.db.Tables
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.context.MDCUtils
import gov.cdc.prime.router.azure.observability.context.withLoggingContext
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Converter
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7Utils.defaultHl7EncodingFiveChars
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7Utils.defaultHl7EncodingFourChars
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
Expand Down Expand Up @@ -113,12 +119,13 @@ class FHIRTranslator(
): FHIREngineRunResult {
logger.trace("Preparing to send original message")
val originalReport = reportService.getRootReport(message.reportId)
val bodyBytes = BlobAccess.downloadBlobAsByteArray(originalReport.bodyUrl)
val bodyAsString =
BlobAccess.downloadBlob(originalReport.bodyUrl, BlobUtils.digestToString(originalReport.blobDigest))

// get a Report from the message
val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
Event.EventAction.SEND,
bodyBytes,
bodyAsString.toByteArray(),
listOf(message.reportId),
receiver,
this.metadata,
Expand Down Expand Up @@ -148,10 +155,9 @@ class FHIRTranslator(
actionHistory: ActionHistory,
): FHIREngineRunResult {
logger.trace("Preparing to send translated message")
val bodyBytes =
getByteArrayFromBundle(
receiver, FhirTranscoder.decode(BlobAccess.downloadBlob(message.blobURL, message.digest))
)
val originalReport = reportService.getRootReport(message.reportId)
val bundle = FhirTranscoder.decode(BlobAccess.downloadBlob(message.blobURL, message.digest))
val bodyBytes = getByteArrayFromBundle(receiver, bundle)

val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
Event.EventAction.BATCH,
Expand All @@ -163,6 +169,35 @@ class FHIRTranslator(
topic = message.topic
)

val bundleDigestExtractor = BundleDigestExtractor(
FhirPathBundleDigestLabResultExtractorStrategy(
CustomContext(
bundle,
bundle,
mutableMapOf(),
CustomFhirPathFunctions()
)
)
)
reportEventService.sendItemEvent(
eventName = ReportStreamEventName.ITEM_TRANSFORMED,
childReport = report,
pipelineStepName = TaskAction.translate
) {
parentReportId(message.reportId)
params(
mapOf(
ReportStreamEventProperties.RECEIVER_NAME to receiver.fullName,
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(bundle),
ReportStreamEventProperties.ORIGINAL_FORMAT to originalReport.bodyFormat,
ReportStreamEventProperties.TARGET_FORMAT to receiver.translation.format.name,
ReportStreamEventProperties.ENRICHMENTS to listOf(receiver.translation.schemaName)
)
)
trackingId(bundle)
}

return FHIREngineRunResult(
event,
report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gov.cdc.prime.router.common
import assertk.assertThat
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import gov.cdc.prime.reportstream.shared.BlobUtils.sha256Digest
import gov.cdc.prime.router.ClientSource
import gov.cdc.prime.router.CustomerStatus
import gov.cdc.prime.router.DeepOrganization
Expand Down Expand Up @@ -452,7 +453,8 @@ object UniversalPipelineTestUtils {
event,
Topic.FULL_ELR,
parentReport,
blobUrl
blobUrl,
reportContents,
)
}

Expand All @@ -464,6 +466,7 @@ object UniversalPipelineTestUtils {
topic: Topic,
parentReport: Report? = null,
bodyURL: String? = null,
reportContents: String,
): Report {
val report = Report(
fileFormat,
Expand Down Expand Up @@ -493,6 +496,7 @@ object UniversalPipelineTestUtils {
.setBodyUrl(report.bodyURL)
.setSendingOrg(universalPipelineOrganization.name)
.setSendingOrgClient("Test Sender")
.setBlobDigest(sha256Digest(reportContents.toByteArray(Charsets.UTF_8)))

ReportStreamTestDatabaseContainer.testDatabaseAccess.insertReportFile(
reportFile, txn, action
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gov.cdc.prime.router.fhirengine.azure

import assertk.assertThat
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import assertk.assertions.isInstanceOf
import assertk.assertions.isNotEqualTo
import assertk.assertions.isNotNull
import assertk.assertions.isNull
Expand All @@ -22,7 +24,10 @@ import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.Task
import gov.cdc.prime.router.azure.observability.event.AzureEventService
import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEvent
import gov.cdc.prime.router.cli.tests.CompareData
import gov.cdc.prime.router.common.TestcontainersUtils
import gov.cdc.prime.router.common.UniversalPipelineTestUtils
Expand Down Expand Up @@ -191,6 +196,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("HL7")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down Expand Up @@ -289,6 +309,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("HL7")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down Expand Up @@ -364,6 +399,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down Expand Up @@ -449,6 +499,21 @@ class FHIRTranslatorIntegrationTests : Logging {
QueueAccess.sendMessage(any(), any())
}

// check events
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
assertThat(
azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
).isInstanceOf<ReportStreamItemEvent>()
val event = azureEventService
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("FHIR")
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
assertThat(enrichments).hasSize(1)
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))

Expand Down
Loading

0 comments on commit ed63fd1

Please sign in to comment.