Skip to content

Commit

Permalink
EPA-282
Browse files Browse the repository at this point in the history
* CetpEventMapper implementation is added
* CETPServerHandler is extended from AbstractCETPEventHandler
  • Loading branch information
alex-kontcur committed Oct 17, 2024
1 parent e64f8e4 commit 7779f97
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 177 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#NOTE: This is a Maven Resolver internal implementation file, its format can be changed without prior notice.
#Wed Oct 16 22:40:04 AMT 2024
#Thu Oct 17 14:12:52 AMT 2024
lib-cetp-1.0.0-SNAPSHOT-sources.jar>=
lib-cetp-1.0.0-SNAPSHOT.jar>=
lib-cetp-1.0.0-SNAPSHOT.pom>=
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
<groupId>de.servicehealth</groupId>
<artifactId>lib-cetp</artifactId>
<versioning>
<lastUpdated>20241016183958</lastUpdated>
<lastUpdated>20241017101246</lastUpdated>
<snapshot>
<localCopy>true</localCopy>
</snapshot>
<snapshotVersions>
<snapshotVersion>
<extension>pom</extension>
<value>1.0.0-SNAPSHOT</value>
<updated>20241016183958</updated>
<updated>20241017101246</updated>
</snapshotVersion>
<snapshotVersion>
<extension>jar</extension>
<value>1.0.0-SNAPSHOT</value>
<updated>20241016183958</updated>
<updated>20241017101246</updated>
</snapshotVersion>
<snapshotVersion>
<classifier>sources</classifier>
<extension>jar</extension>
<value>1.0.0-SNAPSHOT</value>
<updated>20241016183958</updated>
<updated>20241017101246</updated>
</snapshotVersion>
</snapshotVersions>
</versioning>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
<versions>
<version>1.0.0-SNAPSHOT</version>
</versions>
<lastUpdated>20241016183958</lastUpdated>
<lastUpdated>20241017101246</lastUpdated>
</versioning>
</metadata>
190 changes: 68 additions & 122 deletions src/main/java/health/ere/ps/service/cetp/CETPServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import de.gematik.ws.conn.eventservice.v7.Event;
import de.health.service.cetp.AbstractCETPEventHandler;
import de.health.service.cetp.cardlink.CardlinkWebsocketClient;
import de.servicehealth.config.api.IUserConfigurations;
import health.ere.ps.config.RuntimeConfig;
import health.ere.ps.service.cetp.tracker.TrackerService;
import health.ere.ps.service.gematik.PharmacyService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import jakarta.json.Json;
import jakarta.json.JsonArrayBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Identifier;
import org.hl7.fhir.r4.model.Task;
import org.jboss.logging.MDC;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Level;
Expand All @@ -26,13 +25,12 @@

import static de.health.service.cetp.utils.Utils.printException;

public class CETPServerHandler extends ChannelInboundHandlerAdapter {
public class CETPServerHandler extends AbstractCETPEventHandler {

private static final Logger log = Logger.getLogger(CETPServerHandler.class.getName());

TrackerService trackerService;
PharmacyService pharmacyService;
CardlinkWebsocketClient cardlinkWebsocketClient;

IParser parser = FhirContext.forR4().newXmlParser();

Expand All @@ -41,130 +39,85 @@ public CETPServerHandler(
PharmacyService pharmacyService,
CardlinkWebsocketClient cardlinkWebsocketClient
) {
super(cardlinkWebsocketClient);
this.trackerService = trackerService;
this.pharmacyService = pharmacyService;
this.cardlinkWebsocketClient = cardlinkWebsocketClient;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
protected String getTopicName() {
return "CARD/INSERTED";
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
String correlationId = UUID.randomUUID().toString();
MDC.put("requestCorrelationId", correlationId); // Keep MDC name in snyc with virtual-nfc-cardlink
cardlinkWebsocketClient.connect();

@SuppressWarnings("unchecked")
Pair<Event, IUserConfigurations> input = (Pair<Event, IUserConfigurations>) msg;
Event event = input.getKey();

if (event.getTopic().equals("CARD/INSERTED")) {
final Map<String, String> eventMap = event.getMessage().getParameter().stream()
.collect(Collectors.toMap(Event.Message.Parameter::getKey, Event.Message.Parameter::getValue));

// Keep MDC names in sync with virtual-nfc-cardlink
MDC.put("iccsn", eventMap.getOrDefault("ICCSN", "NoICCSNProvided"));
MDC.put("ctid", eventMap.getOrDefault("CtID", "NoCtIDProvided"));
MDC.put("slot", eventMap.getOrDefault("SlotID", "NoSlotIDProvided"));
log.fine("CARD/INSERTED event received with the following payload: %s".formatted(eventMap));

if ("EGK".equalsIgnoreCase(eventMap.get("CardType")) && eventMap.containsKey("CardHandle") && eventMap.containsKey("SlotID") && eventMap.containsKey("CtID")) {
String cardHandle = eventMap.get("CardHandle");
Integer slotId = Integer.parseInt(eventMap.get("SlotID"));
String ctId = eventMap.get("CtID");
String iccsn = eventMap.get("ICCSN");
Long endTime = System.currentTimeMillis();


String paramsStr = event.getMessage().getParameter().stream()
.filter(p -> !p.getKey().equals("CardHolderName"))
.map(p -> String.format("key=%s value=%s", p.getKey(), p.getValue())).collect(Collectors.joining(", "));

log.fine(String.format("[%s] Card inserted: params: %s", correlationId, paramsStr));
try {
IUserConfigurations uc = input.getValue();
RuntimeConfig runtimeConfig = new RuntimeConfig(uc);
Pair<Bundle, String> pair = pharmacyService.getEPrescriptionsForCardHandle(
correlationId, cardHandle, null, runtimeConfig
);
Bundle bundle = pair.getKey();
String eventId = pair.getValue();
String xml = parser.encodeToString(bundle);
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "eRezeptTokensFromAVS", Map.of("slotId", slotId, "ctId", ctId, "tokens", xml));

JsonArrayBuilder bundles = prepareBundles(correlationId, bundle, runtimeConfig);
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "eRezeptBundlesFromAVS", Map.of("slotId", slotId, "ctId", ctId, "bundles", bundles));

cardlinkWebsocketClient.sendJson(correlationId, iccsn, "vsdmSensorData", Map.of("slotId", slotId, "ctId", ctId, "endTime", endTime, "eventId", eventId));

trackerService.submit(ctId, uc.getMandantId(), uc.getWorkplaceId(), uc.getClientSystemId());
} catch (Exception e ) {
log.log(Level.WARNING, String.format("[%s] Could not get prescription for Bundle", correlationId), e);

if (e instanceof de.gematik.ws.conn.vsds.vsdservice.v5.FaultMessage faultMessage) {
String code = faultMessage.getFaultInfo().getTrace().get(0).getCode().toString();
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "vsdmSensorData", Map.of("slotId", slotId, "ctId", ctId, "endTime", endTime, "err", code));
}
if (e instanceof de.gematik.ws.conn.eventservice.wsdl.v7.FaultMessage faultMessage) {
String code = faultMessage.getFaultInfo().getTrace().get(0).getCode().toString();
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "vsdmSensorData", Map.of("slotId", slotId, "ctId", ctId, "endTime", endTime, "err", code));
}

String error = printException(e);
cardlinkWebsocketClient.sendJson(
correlationId,
iccsn,
"receiveTasklistError",
Map.of("slotId", slotId, "cardSessionId", "null", "status", 500, "tistatus", "500", "errormessage", error)
);
}
} else {
String msgFormat = "Ignored \"CARD/INSERTED\" event=%s: values=%s";
log.log(Level.INFO, String.format(msgFormat, event.getMessage(), eventMap));
protected void processEvent(IUserConfigurations uc, Map<String, String> paramsMap) {
// Keep MDC names in sync with virtual-nfc-cardlink
String correlationId = UUID.randomUUID().toString();
MDC.put("requestCorrelationId", correlationId);
MDC.put("iccsn", paramsMap.getOrDefault("ICCSN", "NoICCSNProvided"));
MDC.put("ctid", paramsMap.getOrDefault("CtID", "NoCtIDProvided"));
MDC.put("slot", paramsMap.getOrDefault("SlotID", "NoSlotIDProvided"));
log.fine("CARD/INSERTED event received with the following payload: %s".formatted(paramsMap));

if ("EGK".equalsIgnoreCase(paramsMap.get("CardType")) && paramsMap.containsKey("CardHandle") && paramsMap.containsKey("SlotID") && paramsMap.containsKey("CtID")) {
String cardHandle = paramsMap.get("CardHandle");
Integer slotId = Integer.parseInt(paramsMap.get("SlotID"));
String ctId = paramsMap.get("CtID");
String iccsn = paramsMap.get("ICCSN");
Long endTime = System.currentTimeMillis();


String paramsStr = paramsMap.entrySet().stream()
.filter(p -> !p.getKey().equals("CardHolderName"))
.map(p -> String.format("key=%s value=%s", p.getKey(), p.getValue())).collect(Collectors.joining(", "));

log.fine(String.format("[%s] Card inserted: params: %s", correlationId, paramsStr));
try {
RuntimeConfig runtimeConfig = new RuntimeConfig(uc);
Pair<Bundle, String> pair = pharmacyService.getEPrescriptionsForCardHandle(
correlationId, cardHandle, null, runtimeConfig
);
Bundle bundle = pair.getKey();
String eventId = pair.getValue();
String xml = parser.encodeToString(bundle);
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "eRezeptTokensFromAVS", Map.of("slotId", slotId, "ctId", ctId, "tokens", xml));

JsonArrayBuilder bundles = prepareBundles(correlationId, bundle, runtimeConfig);
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "eRezeptBundlesFromAVS", Map.of("slotId", slotId, "ctId", ctId, "bundles", bundles));

cardlinkWebsocketClient.sendJson(correlationId, iccsn, "vsdmSensorData", Map.of("slotId", slotId, "ctId", ctId, "endTime", endTime, "eventId", eventId));

trackerService.submit(ctId, uc.getMandantId(), uc.getWorkplaceId(), uc.getClientSystemId());
} catch (Exception e ) {
log.log(Level.WARNING, String.format("[%s] Could not get prescription for Bundle", correlationId), e);

if (e instanceof de.gematik.ws.conn.vsds.vsdservice.v5.FaultMessage faultMessage) {
String code = faultMessage.getFaultInfo().getTrace().get(0).getCode().toString();
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "vsdmSensorData", Map.of("slotId", slotId, "ctId", ctId, "endTime", endTime, "err", code));
}
if (e instanceof de.gematik.ws.conn.eventservice.wsdl.v7.FaultMessage faultMessage) {
String code = faultMessage.getFaultInfo().getTrace().get(0).getCode().toString();
cardlinkWebsocketClient.sendJson(correlationId, iccsn, "vsdmSensorData", Map.of("slotId", slotId, "ctId", ctId, "endTime", endTime, "err", code));
}
}

} finally {
cardlinkWebsocketClient.close();
MDC.clear();
}
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (log.isLoggable(Level.FINE)) {
String port = "unknown";
if (ctx.channel().localAddress() instanceof InetSocketAddress inetSocketAddress) {
port = String.valueOf(inetSocketAddress.getPort());
}
log.fine(String.format("New CETP connection established (on port %s)", port));
}
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (log.isLoggable(Level.FINE)) {
String port = "unknown";
if (ctx.channel().localAddress() instanceof InetSocketAddress inetSocketAddress) {
port = String.valueOf(inetSocketAddress.getPort());
String error = printException(e);
cardlinkWebsocketClient.sendJson(
correlationId,
iccsn,
"receiveTasklistError",
Map.of("slotId", slotId, "cardSessionId", "null", "status", 500, "tistatus", "500", "errormessage", error)
);
}
log.fine(String.format("CETP connection was closed (on port %s)", port));
} else {
String msgFormat = "Ignored \"CARD/INSERTED\" values=%s";
log.log(Level.INFO, String.format(msgFormat, paramsMap));
}
super.channelUnregistered(ctx);
}

private JsonArrayBuilder prepareBundles(String correlationId, Bundle bundle, RuntimeConfig runtimeConfig) {
JsonArrayBuilder bundles = Json.createArrayBuilder();
for (BundleEntryComponent entry : bundle.getEntry()) {
if (entry.getResource() instanceof org.hl7.fhir.r4.model.Task) {
if (entry.getResource() instanceof Task task) {
/*
* <identifier>
* <use value="official"/>
Expand All @@ -178,9 +131,8 @@ private JsonArrayBuilder prepareBundles(String correlationId, Bundle bundle, Run
* </identifier>
*/

org.hl7.fhir.r4.model.Task task = (org.hl7.fhir.r4.model.Task) entry.getResource();
String taskId = task.getIdentifier().stream().filter(t -> "https://gematik.de/fhir/erp/NamingSystem/GEM_ERP_NS_PrescriptionId".equals(t.getSystem())).map(t -> t.getValue()).findAny().orElse(null);
String accessCode = task.getIdentifier().stream().filter(t -> "https://gematik.de/fhir/erp/NamingSystem/GEM_ERP_NS_AccessCode".equals(t.getSystem())).map(t -> t.getValue()).findAny().orElse(null);
String taskId = task.getIdentifier().stream().filter(t -> "https://gematik.de/fhir/erp/NamingSystem/GEM_ERP_NS_PrescriptionId".equals(t.getSystem())).map(Identifier::getValue).findAny().orElse(null);
String accessCode = task.getIdentifier().stream().filter(t -> "https://gematik.de/fhir/erp/NamingSystem/GEM_ERP_NS_AccessCode".equals(t.getSystem())).map(Identifier::getValue).findAny().orElse(null);
log.fine("TaskId: " + taskId + " AccessCode: " + accessCode);
String token = "/Task/" + taskId + "/$accept?ac=" + accessCode;
try {
Expand All @@ -193,10 +145,4 @@ private JsonArrayBuilder prepareBundles(String correlationId, Bundle bundle, Run
}
return bundles;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.log(Level.SEVERE, "Caught an exception handling CETP input", cause);
ctx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public CETPServerHandlerFactory(
}

@Override
public ChannelInboundHandler build(KonnektorConfig kc) {
public ChannelInboundHandler[] build(KonnektorConfig kc) {
CardlinkWebsocketClient cardlinkWebsocketClient = new CardlinkWebsocketClient(
kc.getCardlinkEndpoint(),
new EreJwtConfigurator(
Expand All @@ -56,6 +56,8 @@ public ChannelInboundHandler build(KonnektorConfig kc) {
kc.getCardlinkEndpoint(),
cardlinkWebsocketClient.connected()
);
return new CETPServerHandler(trackerService, pharmacyService, cardlinkWebsocketClient);
return new CETPServerHandler[] {
new CETPServerHandler(trackerService, pharmacyService, cardlinkWebsocketClient)
};
}
}
30 changes: 14 additions & 16 deletions src/main/java/health/ere/ps/service/cetp/codec/CETPDecoder.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package health.ere.ps.service.cetp.codec;

import de.gematik.ws.conn.eventservice.v7.Event;
import de.health.service.cetp.domain.eventservice.event.DecodeResult;
import de.health.service.cetp.domain.eventservice.event.mapper.CetpEventMapper;
import de.servicehealth.config.api.IUserConfigurations;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import jakarta.xml.bind.JAXBContext;
import jakarta.xml.bind.JAXBException;
import org.apache.commons.lang3.tuple.Pair;

import java.io.StringReader;
import java.nio.charset.StandardCharsets;
Expand All @@ -20,45 +21,42 @@ public class CETPDecoder extends ByteToMessageDecoder {
private static final Logger log = Logger.getLogger(CETPDecoder.class.getName());

static JAXBContext jaxbContext;

static {
try {
jaxbContext = JAXBContext.newInstance(Event.class);
} catch (JAXBException e) {
log.log(Level.SEVERE, "Failed to create JAXB context", e);
}
}
IUserConfigurations userConfigurations;

public CETPDecoder() {
IUserConfigurations configurations;
CetpEventMapper eventMapper;

public CETPDecoder() {
}

public CETPDecoder(IUserConfigurations userConfigurations) {
this.userConfigurations = userConfigurations;
public CETPDecoder(IUserConfigurations configurations, CetpEventMapper eventMapper) {
this.configurations = configurations;
this.eventMapper = eventMapper;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if(!in.isReadable(4)) {
return;
}
byte[] header = new byte[4];
if (!in.isReadable(4)) {
return;
}
byte[] header = new byte[4];
in.readBytes(header);

if(header[0] != 'C' || header[1] != 'E' || header[2] != 'T' || header[3] != 'P') {
if (header[0] != 'C' || header[1] != 'E' || header[2] != 'T' || header[3] != 'P') {
throw new IllegalArgumentException("Invalid CETP header");
}

int lengthOfMessage = in.readInt();

String message = in.readCharSequence(lengthOfMessage, StandardCharsets.UTF_8).toString();

log.info(message);

try {
Event eventType = (Event) jaxbContext.createUnmarshaller().unmarshal(new StringReader(message));
out.add(Pair.of(eventType, userConfigurations));
out.add(new DecodeResult(eventMapper.toDomain(eventType), configurations));
} catch (JAXBException e) {
log.log(Level.SEVERE, "Failed to unmarshal CETP message", e);
}
Expand Down
Loading

0 comments on commit 7779f97

Please sign in to comment.