Skip to content

Commit

Permalink
HL7 retry DMQ and sender updates
Browse files Browse the repository at this point in the history
  • Loading branch information
pmanko committed Apr 27, 2024
1 parent a389bab commit 5396adb
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 30 deletions.
49 changes: 42 additions & 7 deletions src/lib/hl7MllpSender.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,65 @@
import { MllpServer } from '@i-tech-uw/mllp-server'
import logger from './winston'
import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler'

export default class Hl7MllpSender {
targetIp: string
targetPort: number
mllpServer: MllpServer
retries: number
retryInterval: number

constructor(targetIp: string, targetPort: number) {
private static instance: Hl7MllpSender;

constructor(targetIp: string, targetPort: number, retries = 3, retryInterval = 10000) {
this.targetPort = targetPort
this.targetIp = targetIp
this.retries = retries
this.retryInterval = retryInterval
this.mllpServer = new MllpServer(targetIp, targetPort, logger)
}

public static getInstance(targetIp: string, targetPort: number): Hl7MllpSender {
if (!Hl7MllpSender.instance) {
Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort)
}
return Hl7MllpSender.instance
}


/**
*
* @returns Promise
*/
send(message: string, retries = 10): any {
send(message: string, targetIp?: string, port?: number, retries?: number): any {
if(!targetIp) {
targetIp = this.targetIp;
}

if(!port) {
port = this.targetPort;
}

if(!retries) {
retries = this.retries
}

message = message.replace(/[\n\r]/g, '\r')
const firstNewline = message.match(/\r/)
const header = firstNewline ? message.substring(0, firstNewline.index) : ''

return new Promise((resolve, reject) => {
this.mllpServer.send(this.targetIp, this.targetPort, message, (err: any, ackData: any) => {
this.mllpServer.send(targetIp, port, message, (err: any, ackData: any) => {

Check failure on line 52 in src/lib/hl7MllpSender.ts

View workflow job for this annotation

GitHub Actions / unit-test

Argument of type 'string | undefined' is not assignable to parameter of type 'string'.
logger.info(
`!! Sending HL7 message ${header}!\n err: ${err ? err : ''}\n ackData: ${
`Sending HL7 message ${header}!\n err: ${err ? err : ''}\n ackData: ${
ackData ? ackData : ''
}`,
)
if (err) {
reject({ error: err, retries: retries })
} else {
logger.info(
`!! Successfully sent HL7 message ${header} \n with ${retries} retries left!`,
`Successfully sent HL7 message ${header} \n with ${retries} retries left!`,
)
resolve(ackData)
}
Expand All @@ -44,11 +71,19 @@ export default class Hl7MllpSender {
.catch(e => {
if (e.retries > 0) {
logger.info(`Retrying... ${e.retries} retries left`)
return setTimeout(() => this.send(message, e.retries - 1), 2000)
return setTimeout(() => this.send(message, targetIp, port, retries - 1), this.retryInterval)

Check failure on line 74 in src/lib/hl7MllpSender.ts

View workflow job for this annotation

GitHub Actions / unit-test

'retries' is possibly 'undefined'.
} else {
logger.error(`!! Failed to send HL7 message ${header}!`)
logger.error(`Failed to send HL7 message ${header}!`)

// Send to DMQ
WorkflowHandler.sendPayload({message: message, targetIp: this.targetIp, port: port}, topicList.DMQ)

return e.error
}
})
}
}

const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000);

export { hl7Sender };
8 changes: 5 additions & 3 deletions src/lib/kafkaConsumerUtil.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs'
import logger from './winston'
import { WorkflowResult } from '../workflows/botswana/workflowHandler'
import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler'

export type EachMessageCallback = (
topic: string,
Expand Down Expand Up @@ -87,11 +87,13 @@ export class KafkaConsumerUtil {
retryCount++
if (retryCount >= maxRetries) {
logger.error(
`Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`,
`Max retries reached for message ${message.offset}, sending to dead message queue.`,
)
resolveOffset(message.offset)

// TODO: handle with DLQ
// Send to DMQ
WorkflowHandler.sendPayload({ topic: topic, partition: partition, message: message }, topicList.DMQ)

break
}
await new Promise(resolve => setTimeout(resolve, retryDelay))
Expand Down
6 changes: 2 additions & 4 deletions src/routes/hl7.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

import express, { Request, Response } from 'express'
import Hl7MllpSender from '../lib/hl7MllpSender'
import { hl7Sender } from '../lib/hl7MllpSender'

export const router = express.Router()

Expand All @@ -11,9 +11,7 @@ router.post('/forward/:targetIp/:targetPort', async (req: Request, res: Response
const targetIp: string = req.params.targetIp
const targetPort = Number(req.params.targetPort)

const sender = new Hl7MllpSender(targetIp, targetPort)

const ack = await sender.send(hl7Msg)
const ack = await hl7Sender.send(hl7Msg, targetIp, targetPort)

res.status(200)
res.send(ack)
Expand Down
24 changes: 10 additions & 14 deletions src/workflows/botswana/IpmsWorkflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { R4 } from '@ahryman40k/ts-fhir-types'
import config from '../../lib/config'
import logger from '../../lib/winston'
import { getTaskStatus, setTaskStatus } from './helpers'
import Hl7MllpSender from '../../lib/hl7MllpSender'
import { hl7Sender } from '../../lib/hl7MllpSender'
import Hl7WorkflowsBw from '../botswana/hl7Workflows'
import got from 'got'
import {
Expand Down Expand Up @@ -38,19 +38,17 @@ export async function sendAdtToIpms(labBundle: R4.IBundle): Promise<R4.IBundle>
if (status && status === R4.TaskStatusKind._requested) {
logger.info('Sending ADT message to IPMS!')

const sender = new Hl7MllpSender(
config.get('bwConfig:mllp:targetIp'),
config.get('bwConfig:mllp:targetAdtPort'),
)

const adtMessage = await Hl7WorkflowsBw.getFhirTranslationWithRetry(
labBundle,
config.get('bwConfig:toIpmsAdtTemplate'),
)

logger.info(`adt:\n${adtMessage}`)

const adtResult: string = <string>await sender.send(adtMessage)
const targetIp = config.get('bwConfig:mllp:targetIp')
const targetPort = config.get('bwConfig:mllp:targetAdtPort')

const adtResult: string = <string>await hl7Sender.send(adtMessage, targetIp, targetPort)

if (adtResult.includes && adtResult.includes('AA')) {
labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._received)
Expand Down Expand Up @@ -131,17 +129,15 @@ export async function sendOrmToIpms(bundles: any): Promise<R4.IBundle> {
config.get('bwConfig:toIpmsOrmTemplate'),
)

const sender = new Hl7MllpSender(
config.get('bwConfig:mllp:targetIp'),
config.get('bwConfig:mllp:targetOrmPort'),
)

const targetIp = config.get('bwConfig:mllp:targetIp')
const targetPort = config.get('bwConfig:mllp:targetOrmPort')

logger.info('Sending ORM message to IPMS!')

logger.info(`orm:\n${ormMessage}\n`)

if (ormMessage && ormMessage != '') {
const result: any = await sender.send(ormMessage)
const result: any = await hl7Sender.send(ormMessage, targetIp, targetPort)
if (result.includes('AA')) {
labBundle = setTaskStatus(labBundle, R4.TaskStatusKind._accepted)
}
Expand All @@ -163,7 +159,7 @@ export async function sendOrmToIpms(bundles: any): Promise<R4.IBundle> {
* @param registrationBundle - The registration bundle containing the patient information.
* @returns A Promise that resolves to the registration bundle.
*/
export async function handleAdtFromIpms(adtMessage: string): Promise<any> {
export async function handleAdtFromIpms(adtMessage: string, sender?: Hl7MllpSender): Promise<any> {

Check failure on line 162 in src/workflows/botswana/IpmsWorkflows.ts

View workflow job for this annotation

GitHub Actions / unit-test

Cannot find name 'Hl7MllpSender'.
try {
const registrationBundle: R4.IBundle = await Hl7WorkflowsBw.translateBundle(
adtMessage,
Expand Down
1 change: 0 additions & 1 deletion src/workflows/botswana/hl7Workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ export default class Hl7WorkflowsBw {
try {
WorkflowHandler.sendPayloadWithRetryDMQ({ message: hl7Msg }, topicList.HANDLE_ADT_FROM_IPMS)
} catch (error: any) {
// TODO: Major Error - send to DMQ or handle otherwise
logger.error(`Could not translate and save ADT message!\n${JSON.stringify(error)}`)
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/workflows/botswana/workflowHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export const topicList = {
SAVE_IPMS_PATIENT: 'save-ipms-patient',
HANDLE_ORU_FROM_IPMS: 'handle-oru-from-ipms',
HANDLE_ADT_FROM_IPMS: 'handle-adt-from-ipms',
DMQ: 'dmq'
}

/**
Expand Down Expand Up @@ -298,7 +299,7 @@ export class WorkflowHandler {
if (error && attempt === maxRetries) {
logger.error(`All retries failed. Sending payload to DMQ!`)
try {
logger.error('TODO: Implement DMQ!:\n' + JSON.stringify(payload))
WorkflowHandler.sendPayload({ payload: payload, topic: topic, error: error }, topicList.DMQ)
} catch (dmqError) {
logger.error(`Failed to send payload to DMQ: ${dmqError}`)
throw new Error(`Failed to send payload to DMQ: ${dmqError}`)
Expand Down

0 comments on commit 5396adb

Please sign in to comment.