Skip to content

Commit d7551d3

Browse files
authored
16148: send reports from the submissions directly to FHIR Convert (#16339)
* 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert * 16148: delete old FHIR receiver code * fixup! 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert * fixup! 16148: send reports from the submissions directly to FHIR Convert
1 parent 4ccf2a8 commit d7551d3

24 files changed

+913
-1729
lines changed

prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventService.kt

Lines changed: 108 additions & 63 deletions
Large diffs are not rendered by default.

prime-router/src/main/kotlin/cli/ProcessFhirCommands.kt

Lines changed: 134 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,22 @@ import gov.cdc.prime.router.Hl7Configuration
2121
import gov.cdc.prime.router.Metadata
2222
import gov.cdc.prime.router.MimeFormat
2323
import gov.cdc.prime.router.Receiver
24+
import gov.cdc.prime.router.Report
2425
import gov.cdc.prime.router.ReportStreamFilter
26+
import gov.cdc.prime.router.Topic
2527
import gov.cdc.prime.router.azure.BlobAccess
2628
import gov.cdc.prime.router.azure.ConditionStamper
2729
import gov.cdc.prime.router.azure.LookupTableConditionMapper
30+
import gov.cdc.prime.router.azure.db.enums.TaskAction
31+
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
32+
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
33+
import gov.cdc.prime.router.azure.observability.event.ItemEventData
34+
import gov.cdc.prime.router.azure.observability.event.ReportEventData
35+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
36+
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEventBuilder
37+
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemProcessingErrorEventBuilder
38+
import gov.cdc.prime.router.azure.observability.event.ReportStreamReportEventBuilder
39+
import gov.cdc.prime.router.azure.observability.event.ReportStreamReportProcessingErrorEventBuilder
2840
import gov.cdc.prime.router.cli.CommandUtilities.Companion.abort
2941
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
3042
import gov.cdc.prime.router.common.Environment
@@ -374,12 +386,16 @@ class ProcessFhirCommands : CliktCommand(
374386
// this is just for logging so it is fine to just make it up
375387
UUID.randomUUID().toString()
376388
}
377-
val result = FHIRReceiverFilter().evaluateObservationConditionFilters(
378-
receiver,
379-
bundle,
380-
ActionLogger(),
381-
trackingId
382-
)
389+
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
390+
val result =
391+
FHIRReceiverFilter(
392+
reportStreamEventService = NoopReportStreamEventService()
393+
).evaluateObservationConditionFilters(
394+
receiver,
395+
bundle,
396+
ActionLogger(),
397+
trackingId
398+
)
383399
if (result is ReceiverFilterEvaluationResult.Success) {
384400
return result.bundle
385401
} else {
@@ -848,4 +864,116 @@ class FhirPathCommand : CliktCommand(
848864
stringValue.append("\n}\n")
849865
return stringValue.toString()
850866
}
867+
}
868+
869+
// This exists only because ProcessFhirCommands instantiates a FHIRReceiverFilter to access a function that likely could be
870+
// made static
871+
// TODO: https://github.com/CDCgov/prime-reportstream/issues/16407
872+
class NoopReportStreamEventService : IReportStreamEventService {
873+
override fun sendQueuedEvents() {
874+
throw NotImplementedError()
875+
}
876+
877+
override fun sendReportEvent(
878+
eventName: ReportStreamEventName,
879+
childReport: Report,
880+
pipelineStepName: TaskAction,
881+
shouldQueue: Boolean,
882+
initializer: ReportStreamReportEventBuilder.() -> Unit,
883+
) {
884+
throw NotImplementedError()
885+
}
886+
887+
override fun sendReportEvent(
888+
eventName: ReportStreamEventName,
889+
childReport: ReportFile,
890+
pipelineStepName: TaskAction,
891+
shouldQueue: Boolean,
892+
initializer: ReportStreamReportEventBuilder.() -> Unit,
893+
) {
894+
throw NotImplementedError()
895+
}
896+
897+
override fun sendReportProcessingError(
898+
eventName: ReportStreamEventName,
899+
childReport: ReportFile,
900+
pipelineStepName: TaskAction,
901+
error: String,
902+
shouldQueue: Boolean,
903+
initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit,
904+
) {
905+
throw NotImplementedError()
906+
}
907+
908+
override fun sendReportProcessingError(
909+
eventName: ReportStreamEventName,
910+
childReport: Report,
911+
pipelineStepName: TaskAction,
912+
error: String,
913+
shouldQueue: Boolean,
914+
initializer: ReportStreamReportProcessingErrorEventBuilder.() -> Unit,
915+
) {
916+
throw NotImplementedError()
917+
}
918+
919+
override fun sendItemEvent(
920+
eventName: ReportStreamEventName,
921+
childReport: Report,
922+
pipelineStepName: TaskAction,
923+
shouldQueue: Boolean,
924+
initializer: ReportStreamItemEventBuilder.() -> Unit,
925+
) {
926+
throw NotImplementedError()
927+
}
928+
929+
override fun sendItemEvent(
930+
eventName: ReportStreamEventName,
931+
childReport: ReportFile,
932+
pipelineStepName: TaskAction,
933+
shouldQueue: Boolean,
934+
initializer: ReportStreamItemEventBuilder.() -> Unit,
935+
) {
936+
throw NotImplementedError()
937+
}
938+
939+
override fun sendItemProcessingError(
940+
eventName: ReportStreamEventName,
941+
childReport: ReportFile,
942+
pipelineStepName: TaskAction,
943+
error: String,
944+
shouldQueue: Boolean,
945+
initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit,
946+
) {
947+
throw NotImplementedError()
948+
}
949+
950+
override fun sendItemProcessingError(
951+
eventName: ReportStreamEventName,
952+
childReport: Report,
953+
pipelineStepName: TaskAction,
954+
error: String,
955+
shouldQueue: Boolean,
956+
initializer: ReportStreamItemProcessingErrorEventBuilder.() -> Unit,
957+
) {
958+
throw NotImplementedError()
959+
}
960+
961+
override fun getReportEventData(
962+
childReportId: UUID,
963+
childBodyUrl: String,
964+
parentReportId: UUID?,
965+
pipelineStepName: TaskAction,
966+
topic: Topic?,
967+
): ReportEventData {
968+
throw NotImplementedError()
969+
}
970+
971+
override fun getItemEventData(
972+
childItemIndex: Int,
973+
parentReportId: UUID,
974+
parentItemIndex: Int,
975+
trackingId: String?,
976+
): ItemEventData {
977+
throw NotImplementedError()
978+
}
851979
}

prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,32 @@ import com.microsoft.azure.functions.annotation.FunctionName
55
import com.microsoft.azure.functions.annotation.QueueTrigger
66
import com.microsoft.azure.functions.annotation.StorageAccount
77
import gov.cdc.prime.reportstream.shared.QueueMessage
8+
import gov.cdc.prime.reportstream.shared.Submission
89
import gov.cdc.prime.router.ActionLogger
910
import gov.cdc.prime.router.azure.ActionHistory
1011
import gov.cdc.prime.router.azure.DataAccessTransaction
1112
import gov.cdc.prime.router.azure.DatabaseAccess
1213
import gov.cdc.prime.router.azure.QueueAccess
14+
import gov.cdc.prime.router.azure.SubmissionTableService
1315
import gov.cdc.prime.router.azure.WorkflowEngine
1416
import gov.cdc.prime.router.azure.db.enums.TaskAction
17+
import gov.cdc.prime.router.azure.observability.event.AzureEventService
18+
import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
1519
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
1620
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
21+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
1722
import gov.cdc.prime.router.common.BaseEngine
1823
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter
1924
import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter
2025
import gov.cdc.prime.router.fhirengine.engine.FHIREngine
21-
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiver
2226
import gov.cdc.prime.router.fhirengine.engine.FHIRReceiverFilter
2327
import gov.cdc.prime.router.fhirengine.engine.FHIRTranslator
24-
import gov.cdc.prime.router.fhirengine.engine.FhirReceiveQueueMessage
28+
import gov.cdc.prime.router.fhirengine.engine.FhirConvertSubmissionQueueMessage
2529
import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage
2630
import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage
31+
import gov.cdc.prime.router.fhirengine.engine.SubmissionSenderNotFound
32+
import gov.cdc.prime.router.history.db.ReportGraph
33+
import gov.cdc.prime.router.report.ReportService
2734
import org.apache.commons.lang3.StringUtils
2835
import org.apache.logging.log4j.kotlin.Logging
2936
import org.jooq.exception.DataAccessException
@@ -33,23 +40,41 @@ class FHIRFunctions(
3340
private val actionLogger: ActionLogger = ActionLogger(),
3441
private val databaseAccess: DatabaseAccess = BaseEngine.databaseAccessSingleton,
3542
private val queueAccess: QueueAccess = QueueAccess,
43+
private val submissionTableService: SubmissionTableService = SubmissionTableService.getInstance(),
44+
val reportService: ReportService = ReportService(ReportGraph(databaseAccess), databaseAccess),
45+
val azureEventService: AzureEventService = AzureEventServiceImpl(),
46+
val reportStreamEventService: ReportStreamEventService =
47+
ReportStreamEventService(databaseAccess, azureEventService, reportService),
3648
) : Logging {
3749

3850
/**
3951
* An azure function for ingesting and recording submissions
4052
*/
41-
@FunctionName("receive-fhir")
53+
@FunctionName("convert-from-submissions-fhir")
4254
@StorageAccount("AzureWebJobsStorage")
43-
fun receive(
44-
@QueueTrigger(name = "message", queueName = QueueMessage.elrReceiveQueueName)
55+
fun convertFromSubmissions(
56+
@QueueTrigger(name = "message", queueName = QueueMessage.elrSubmissionConvertQueueName)
4557
message: String,
4658
// Number of times this message has been dequeued
4759
@BindingName("DequeueCount") dequeueCount: Int = 1,
4860
) {
4961
logger.info(
50-
"message consumed from elr-fhir-receive queue"
62+
"message consumed from ${QueueMessage.elrSubmissionConvertQueueName} queue"
5163
)
52-
process(message, dequeueCount, FHIRReceiver(), ActionHistory(TaskAction.receive))
64+
process(
65+
message,
66+
dequeueCount,
67+
FHIRConverter(reportStreamEventService = reportStreamEventService),
68+
ActionHistory(TaskAction.convert)
69+
)
70+
val messageContent = readMessage("convert", message, dequeueCount)
71+
val tableEntity = Submission(
72+
messageContent.reportId.toString(),
73+
"Accepted",
74+
messageContent.blobURL,
75+
actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString()
76+
)
77+
submissionTableService.insertSubmission(tableEntity)
5378
}
5479

5580
/**
@@ -63,7 +88,12 @@ class FHIRFunctions(
6388
// Number of times this message has been dequeued
6489
@BindingName("DequeueCount") dequeueCount: Int = 1,
6590
) {
66-
process(message, dequeueCount, FHIRConverter(), ActionHistory(TaskAction.convert))
91+
process(
92+
message,
93+
dequeueCount,
94+
FHIRConverter(reportStreamEventService = reportStreamEventService),
95+
ActionHistory(TaskAction.convert)
96+
)
6797
}
6898

6999
/**
@@ -77,7 +107,12 @@ class FHIRFunctions(
77107
// Number of times this message has been dequeued
78108
@BindingName("DequeueCount") dequeueCount: Int = 1,
79109
) {
80-
process(message, dequeueCount, FHIRDestinationFilter(), ActionHistory(TaskAction.destination_filter))
110+
process(
111+
message,
112+
dequeueCount,
113+
FHIRDestinationFilter(reportStreamEventService = reportStreamEventService),
114+
ActionHistory(TaskAction.destination_filter)
115+
)
81116
}
82117

83118
/**
@@ -91,7 +126,12 @@ class FHIRFunctions(
91126
// Number of times this message has been dequeued
92127
@BindingName("DequeueCount") dequeueCount: Int = 1,
93128
) {
94-
process(message, dequeueCount, FHIRReceiverFilter(), ActionHistory(TaskAction.receiver_filter))
129+
process(
130+
message,
131+
dequeueCount,
132+
FHIRReceiverFilter(reportStreamEventService = reportStreamEventService),
133+
ActionHistory(TaskAction.receiver_filter)
134+
)
95135
}
96136

97137
/**
@@ -105,7 +145,12 @@ class FHIRFunctions(
105145
// Number of times this message has been dequeued
106146
@BindingName("DequeueCount") dequeueCount: Int = 1,
107147
) {
108-
process(message, dequeueCount, FHIRTranslator(), ActionHistory(TaskAction.translate))
148+
process(
149+
message,
150+
dequeueCount,
151+
FHIRTranslator(reportStreamEventService = reportStreamEventService),
152+
ActionHistory(TaskAction.translate)
153+
)
109154
}
110155

111156
/**
@@ -149,13 +194,27 @@ class FHIRFunctions(
149194
recordResults(message, actionHistory, txn)
150195
results
151196
}
152-
197+
reportStreamEventService.sendQueuedEvents()
153198
return newMessages
154199
} catch (ex: DataAccessException) {
155200
// This is the one exception type that we currently will allow for retrying as there are occasional
156201
// DB connectivity issues that are resolved without intervention
157202
logger.error(ex)
158203
throw ex
204+
} catch (ex: SubmissionSenderNotFound) {
205+
// This is a specific error case that can occur while handling a report via the new Submission service
206+
// In a situation that the sender is not found there is not enough information to record a report event
207+
// and we want a poison queue message to be immediately added so that the configuration can be fixed
208+
logger.error(ex)
209+
val tableEntity = Submission(
210+
ex.reportId.toString(),
211+
"Rejected",
212+
ex.blobURL,
213+
actionLogger.errors.takeIf { it.isNotEmpty() }?.map { it.detail.message }?.toString()
214+
)
215+
submissionTableService.insertSubmission(tableEntity)
216+
queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message)
217+
return emptyList()
159218
} catch (ex: Exception) {
160219
// We're catching anything else that occurs because the most likely cause is a code or configuration error
161220
// that will not be resolved if the message is automatically retried
@@ -186,7 +245,7 @@ class FHIRFunctions(
186245

187246
return when (val queueMessage = QueueMessage.deserialize(message)) {
188247
is QueueMessage.ReceiveQueueMessage -> {
189-
FhirReceiveQueueMessage(
248+
FhirConvertSubmissionQueueMessage(
190249
queueMessage.reportId,
191250
queueMessage.blobURL,
192251
queueMessage.digest,

0 commit comments

Comments
 (0)