Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jalbinson committed Nov 21, 2024
1 parent a91a0f7 commit f99f5f8
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@
"HL7_BATCH"
]
},
"hl7AcknowledgementEnabled": {
"type": "boolean"
},
"name": {
"type": "string"
},
Expand Down
8 changes: 7 additions & 1 deletion prime-router/src/main/kotlin/Sender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import java.time.OffsetDateTime
* @property allowDuplicates if false a duplicate submission will be rejected
* @property senderType one of four broad sender categories
* @property primarySubmissionMethod Sender preference for submission - manual or automatic
* @property hl7AcknowledgementEnabled should we return an HL7 ACK response if MSH.15 == "AL"?
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
Expand Down Expand Up @@ -59,6 +60,7 @@ abstract class Sender(
val allowDuplicates: Boolean = true,
val senderType: SenderType? = null,
val primarySubmissionMethod: PrimarySubmissionMethod? = null,
val hl7AcknowledgementEnabled: Boolean = false,
override var version: Int? = null,
override var createdBy: String? = null,
override var createdAt: OffsetDateTime? = null,
Expand Down Expand Up @@ -194,6 +196,7 @@ class UniversalPipelineSender : Sender {
allowDuplicates: Boolean = true,
senderType: SenderType? = null,
primarySubmissionMethod: PrimarySubmissionMethod? = null,
hl7AcknowledgementEnabled: Boolean = false,
topic: Topic,
) : super(
topic,
Expand All @@ -205,7 +208,8 @@ class UniversalPipelineSender : Sender {
processingType,
allowDuplicates,
senderType,
primarySubmissionMethod
primarySubmissionMethod,
hl7AcknowledgementEnabled
)

constructor(copy: UniversalPipelineSender) : this(
Expand All @@ -214,6 +218,7 @@ class UniversalPipelineSender : Sender {
copy.format,
copy.customerStatus,
copy.schemaName,
hl7AcknowledgementEnabled = copy.hl7AcknowledgementEnabled,
topic = copy.topic,
)

Expand Down Expand Up @@ -355,6 +360,7 @@ class MonkeypoxSender : LegacyPipelineSender {
allowDuplicates,
senderType,
primarySubmissionMethod

)

constructor(copy: MonkeypoxSender) : this(
Expand Down
6 changes: 6 additions & 0 deletions prime-router/src/main/kotlin/azure/HttpUtilities.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class HttpUtilities {
companion object : Logging {
const val jsonMediaType = "application/json"
const val fhirMediaType = "application/fhir+json"
const val hl7V2MediaType = "application/hl7-v2"
const val oldApi = "/api/reports"
const val watersApi = "/api/waters"
const val tokenApi = "/api/token"
Expand Down Expand Up @@ -434,6 +435,11 @@ class HttpUtilities {
return responseCode to response
}
}

fun HttpStatus.isSuccessful(): Boolean {
val status = this.value()
return status in 200..299
}
}
}

Expand Down
90 changes: 47 additions & 43 deletions prime-router/src/main/kotlin/azure/ReportFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import com.microsoft.azure.functions.annotation.StorageAccount
import gov.cdc.prime.router.ActionError
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogLevel
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.CustomerStatus
import gov.cdc.prime.router.InvalidParamMessage
import gov.cdc.prime.router.InvalidReportMessage
Expand All @@ -31,6 +30,7 @@ import gov.cdc.prime.router.Sender.ProcessingType
import gov.cdc.prime.router.SubmissionReceiver
import gov.cdc.prime.router.UniversalPipelineReceiver
import gov.cdc.prime.router.azure.BlobAccess.Companion.getBlobContainer
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
Expand All @@ -44,7 +44,7 @@ import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.history.DetailedSubmissionHistory
import gov.cdc.prime.router.history.azure.SubmissionsFacade
import gov.cdc.prime.router.tokens.AuthenticatedClaims
import gov.cdc.prime.router.tokens.Scope
Expand Down Expand Up @@ -75,10 +75,10 @@ class ReportFunction(
) : RequestFunction(workflowEngine),
Logging {

enum class IngestionMethod {
SFTP,
REST,
}
enum class IngestionMethod {
SFTP,
REST,
}

/**
* POST a report to the router
Expand Down Expand Up @@ -499,12 +499,6 @@ class ReportFunction(
request: HttpRequestMessage<String?>,
sender: Sender,
): HttpResponseMessage {
// check for ACK request
val maybeACKResponse = handleAckRequest(request)
if (maybeACKResponse != null) {
return maybeACKResponse
}

// determine if we should be following the sync or async workflow
val isAsync = processingType(request, sender) == ProcessingType.async
// allow duplicates 'override' param
Expand Down Expand Up @@ -611,19 +605,7 @@ class ReportFunction(
SubmissionsFacade.instance.findDetailedSubmissionHistory(txn, null, actionHistory.action)
}

val response = request.createResponseBuilder(httpStatus)
.header(HttpHeaders.CONTENT_TYPE, "application/json")
.body(
JacksonMapperUtilities.allowUnknownsMapper
.writeValueAsString(submission)
)
.header(
HttpHeaders.LOCATION,
request.uri.resolve(
"/api/waters/report/${submission?.reportId}/history"
).toString()
)
.build()
val response = buildResponse(request, httpStatus, submission, sender)

// queue messages here after all task / action records are in
actionHistory.queueMessages(workflowEngine)
Expand All @@ -644,34 +626,56 @@ class ReportFunction(
}
}

private fun handleAckRequest(request: HttpRequestMessage<String?>): HttpResponseMessage? {
// why does Azure handle Headers case-sensitive???
private fun buildResponse(
request: HttpRequestMessage<String?>,
responseStatus: HttpStatus,
submission: DetailedSubmissionHistory?,
sender: Sender,
): HttpResponseMessage {
return handleAckRequest(request, responseStatus, sender) ?: run {
request.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, "application/json")
.body(
JacksonMapperUtilities.allowUnknownsMapper
.writeValueAsString(submission)
)
.header(
HttpHeaders.LOCATION,
request.uri.resolve(
"/api/waters/report/${submission?.reportId}/history"
).toString()
)
.build()
}
}

private fun handleAckRequest(
request: HttpRequestMessage<String?>,
responseStatus: HttpStatus,
sender: Sender,
): HttpResponseMessage? {
// Azure handles all headers as lowercase
val contentType = request.headers[HttpHeaders.CONTENT_TYPE.lowercase()]
val requestBody = request.body
return if (contentType == "application/hl7-v2" && requestBody != null) {
return if (
sender.hl7AcknowledgementEnabled &&
responseStatus.isSuccessful() &&
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
try {
// Parse HL7 message
val maybeMessage = HL7Reader(ActionLogger())
.getMessages(requestBody)
.firstOrNull()

// Is the message an ACK?
if (maybeMessage != null && HL7Reader.isAckMessage(maybeMessage)) {
hl7ACKUtils.generateOutgoingACKMessageIfRequired(requestBody)?.let { responseBody ->
logger.info("Creating HL7 ACK response")
request.createResponseBuilder(HttpStatus.OK)
.header(HttpHeaders.CONTENT_TYPE, "application/hl7-v2")
.body(hl7ACKUtils.generateOutgoingACKMessage(maybeMessage))
request.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, HttpUtilities.hl7V2MediaType)
.body(responseBody)
.build()
} else {
logger.trace("Not an HL7 ACK message. Continuing.")
null
}
} catch (ex: Exception) {
logger.warn("Error checking for HL7 ACK. Continuing normal pipeline execution.", ex)
logger.warn("Error checking for HL7 ACK.", ex)
null
}
} else {
logger.trace("Not an HL7 ACK message. Continuing.")
null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.prime.router.fhirengine.translation.hl7.utils

import ca.uhn.hl7v2.model.Message
import ca.uhn.hl7v2.model.v251.message.ACK
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import java.time.Clock
Expand All @@ -17,7 +18,19 @@ class HL7ACKUtils(
private val clock: Clock = Clock.systemUTC(),
) {

fun generateOutgoingACKMessage(incomingACKMessage: Message): String {
fun generateOutgoingACKMessageIfRequired(rawHL7: String): String? {
val maybeMessage = HL7Reader(ActionLogger())
.getMessages(rawHL7)
.firstOrNull()

return if (maybeMessage != null && HL7Reader.isAckMessage(maybeMessage)) {
generateOutgoingACKMessage(maybeMessage)
} else {
null
}
}

private fun generateOutgoingACKMessage(incomingACKMessage: Message): String {
val outgoingAck = ACK()

val ackMsh = outgoingAck.msh
Expand Down
Loading

0 comments on commit f99f5f8

Please sign in to comment.