Skip to content

Commit

Permalink
fix: handle more events for transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
schoenenberg committed Feb 14, 2024
1 parent 0f63780 commit 5258ccb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ subprojects {
maven {
name = "GitHubPackages"
url = uri("https://maven.pkg.github.com/truzzt/mds-ap3")
version = "0.2.4"
version = "0.2.5"
credentials {
username = System.getenv("USERNAME")
password = System.getenv("TOKEN")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.spi.EdcException;
Expand All @@ -28,6 +28,9 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.Hostname;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
Expand Down Expand Up @@ -73,7 +76,28 @@ public void createProcess(ContractAgreement contractAgreement, URL clearingHouse

monitor.info("Creating Process in LoggingHouse");
var logMessage = new CreateProcessMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement.getId(), processOwners);
dispatcherRegistry.dispatch(Object.class, logMessage);

try {
dispatcherRegistry.dispatch(Object.class, logMessage);
} catch (EdcException e) {
if (e.getMessage().startsWith("No provider dispatcher registered for protocol")) {
throw e;
} else {
monitor.severe("Unhandled exception while creating process in LoggingHouse. " + e.getMessage());
// Print stack trace
String errorStr;
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {

e.printStackTrace(pw);
errorStr = sw.toString();

} catch (IOException ex) {
throw new RuntimeException("Error while converting the stacktrace");
}
monitor.severe(errorStr);
}
}
}

public void logContractAgreement(ContractAgreement contractAgreement, URL clearingHouseLogUrl) {
Expand Down Expand Up @@ -102,8 +126,10 @@ public <E extends Event> void on(EventEnvelope<E> event) {
// Log Contract Agreement
var extendedLogUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid);
logContractAgreement(contractAgreement, extendedLogUrl);
} else if (event.getPayload() instanceof TransferProcessTerminated transferProcessTerminated) {
var transferProcess = resolveTransferProcess(transferProcessTerminated);
} else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) {
monitor.debug("Logging transfer event with id " + event.getId());

var transferProcess = resolveTransferProcess(transferProcessEvent);
var pid = transferProcess.getContractId();
var extendedUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid);
logTransferProcess(transferProcess, extendedUrl);
Expand All @@ -119,8 +145,8 @@ private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized
return Objects.requireNonNull(contractNegotiation).getContractAgreement();
}

private TransferProcess resolveTransferProcess(TransferProcessTerminated transferProcessTerminated) {
var transferProcessId = transferProcessTerminated.getTransferProcessId();
private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) {
var transferProcessId = transferProcessEvent.getTransferProcessId();
return transferProcessStore.findById(transferProcessId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@
import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender;
import de.fraunhofer.iais.eis.LogMessage;
import de.fraunhofer.iais.eis.RequestMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAgreed;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.event.*;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
Expand Down Expand Up @@ -139,9 +135,10 @@ private void registerEventSubscriber(ServiceExtensionContext context) {
eventRouter.registerSync(ContractNegotiationAgreed.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationAccepted.class, eventSubscriber);
eventRouter.registerSync(TransferProcessCompleted.class, eventSubscriber);
eventRouter.registerSync(TransferProcessTerminated.class, eventSubscriber);
eventRouter.registerSync(TransferProcessInitiated.class, eventSubscriber);
eventRouter.registerSync(TransferProcessStarted.class, eventSubscriber);
eventRouter.registerSync(TransferProcessFailed.class, eventSubscriber);
eventRouter.registerSync(TransferProcessTerminated.class, eventSubscriber);
context.registerService(IdsClearingHouseServiceImpl.class, eventSubscriber);

monitor.debug("Registered event subscriber for LoggingHouseClientExtension");
Expand All @@ -161,8 +158,8 @@ private void registerCommonTypes(TypeManager typeManager) {

typeManager.registerSerializer(TYPE_MANAGER_SERIALIZER_KEY, LogMessage.class,
new MultiContextJsonLdSerializer<>(LogMessage.class, CONTEXT_MAP));
typeManager.registerSerializer(TYPE_MANAGER_SERIALIZER_KEY, RequestMessage.class,
new MultiContextJsonLdSerializer<>(RequestMessage.class, CONTEXT_MAP));
// typeManager.registerSerializer(TYPE_MANAGER_SERIALIZER_KEY, RequestMessage.class,
// new MultiContextJsonLdSerializer<>(RequestMessage.class, CONTEXT_MAP));

monitor.debug("Registered serializers for LoggingHouseClientExtension");
}
Expand Down

0 comments on commit 5258ccb

Please sign in to comment.