Skip to content

Commit

Permalink
feat: mds requested properties
Browse files Browse the repository at this point in the history
  • Loading branch information
schoenenberg committed Mar 14, 2024
1 parent e8d2e06 commit a7594c3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,9 @@ public void createProcess(ContractAgreement contractAgreement, URL clearingHouse
monitor.severe("Unhandled exception while creating process in LoggingHouse. " + e.getMessage());
// Print stack trace
String errorStr;
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {

try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
e.printStackTrace(pw);
errorStr = sw.toString();

} catch (IOException ex) {
throw new RuntimeException("Error while converting the stacktrace");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,22 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.monitor.Monitor;
import org.json.JSONObject;

import java.util.List;

public class LogMessageSender implements MultipartSenderDelegate<LogMessage, String> {

public LogMessageSender() {
Monitor monitor;
String connectorId;
AssetIndex assetIndex;

public LogMessageSender(Monitor monitor, AssetIndex assetIndex, String connectorId) {
this.monitor = monitor;
this.assetIndex = assetIndex;
this.connectorId = connectorId;
}

@Override
Expand Down Expand Up @@ -76,24 +85,51 @@ public Class<LogMessage> getMessageType() {
}

private String buildContractAgreementPayload(ContractAgreement contractAgreement) {
assert contractAgreement != null;

var jo = new JSONObject();
jo.put("AgreementId", contractAgreement.getId());
jo.put("ProviderId", contractAgreement.getProviderId());
jo.put("ConsumerId", contractAgreement.getConsumerId());

jo.put("Timestamp", CalendarUtil.gregorianNow().toString());
jo.put("ConnectorId", connectorId);

// Check if connector is the provider
if (contractAgreement.getProviderId().equals(connectorId)) {
// In case of the provider, log asset information
var asset = assetIndex.findById(contractAgreement.getAssetId());

if (asset == null) {
monitor.warning("Asset with id " + contractAgreement.getAssetId() + " not found in asset index.");
} else {
jo.put("AssetId", asset.getId());
jo.put("AssetName", asset.getName());
jo.put("AssetDescription", asset.getDescription());
jo.put("AssetVersion", asset.getVersion());
jo.put("AssetContentType", asset.getContentType());
jo.put("AssetProperties", asset.getProperties());
}
}

jo.put("ContractAgreementId", contractAgreement.getId());
jo.put("ContractProviderId", contractAgreement.getProviderId());
jo.put("ContractConsumerId", contractAgreement.getConsumerId());
jo.put("ContractSigningDate", contractAgreement.getContractSigningDate());
jo.put("Policy", contractAgreement.getPolicy());
jo.put("AssetId", contractAgreement.getAssetId());
jo.put("ContractPolicy", contractAgreement.getPolicy());
return jo.toString();
}

private String buildTransferProcessPayload(TransferProcess transferProcess) {
var jo = new JSONObject();
jo.put("transferProcessId", transferProcess.getId());
jo.put("transferState", transferProcess.stateAsString());

var dataRequest = transferProcess.getDataRequest();
jo.put("contractId", dataRequest.getContractId());
jo.put("connectorId", dataRequest.getConnectorId());
jo.put("Timestamp", CalendarUtil.gregorianNow().toString());
jo.put("ConnectorId", connectorId);

jo.put("TransferProcessId", transferProcess.getId());
jo.put("TransferState", transferProcess.stateAsString());
jo.put("TransferProtocol", transferProcess.getProtocol());
jo.put("TransferContractId", transferProcess.getContractId());
jo.put("TransferConnectorId", transferProcess.getConnectorId());
jo.put("TransferAssetId", transferProcess.getAssetId());

return jo.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAgreed;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationTerminated;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.transfer.spi.event.*;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessFailed;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessRequested;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.iam.IdentityService;
Expand Down Expand Up @@ -62,6 +69,8 @@ public class LoggingHouseClientExtension implements ServiceExtension {
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessStore transferProcessStore;
@Inject
private AssetIndex assetIndex;

private static final Map<String, String> CONTEXT_MAP = Map.of(
"cat", "http://w3id.org/mds/data-categories#",
Expand All @@ -75,7 +84,6 @@ public class LoggingHouseClientExtension implements ServiceExtension {

private URL loggingHouseLogUrl;
public Monitor monitor;

private boolean enabled;


Expand All @@ -86,18 +94,18 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
monitor = context.getMonitor();
var extensionEnabled = context.getSetting(LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED, true);
this.monitor = context.getMonitor();

var extensionEnabled = context.getSetting(LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED, true);
if (!extensionEnabled) {
enabled = false;
monitor.info("Logginghouse client extension is disabled.");
this.enabled = false;
this.monitor.info("Logginghouse client extension is disabled.");
return;
}
enabled = true;
monitor.info("Logginghouse client extension is enabled.");
this.enabled = true;
this.monitor.info("Logginghouse client extension is enabled.");

loggingHouseLogUrl = readUrlFromSettings(context);
this.loggingHouseLogUrl = readUrlFromSettings(context);

registerSerializerClearingHouseMessages(context);
registerClearingHouseMessageSenders(context);
Expand Down Expand Up @@ -135,6 +143,7 @@ private void registerEventSubscriber(ServiceExtensionContext context) {
eventRouter.registerSync(ContractNegotiationFinalized.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationAgreed.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationAccepted.class, eventSubscriber);
eventRouter.registerSync(ContractNegotiationTerminated.class, eventSubscriber); // TODO: check pid
eventRouter.registerSync(TransferProcessRequested.class, eventSubscriber);
eventRouter.registerSync(TransferProcessInitiated.class, eventSubscriber);
eventRouter.registerSync(TransferProcessStarted.class, eventSubscriber);
Expand Down Expand Up @@ -170,10 +179,9 @@ private void registerClearingHouseMessageSenders(ServiceExtensionContext context
monitor.debug("Registering message senders for LoggingHouseClientExtension");

var httpClient = context.getService(EdcHttpClient.class);
var monitor = context.getMonitor();
var objectMapper = typeManager.getMapper(TYPE_MANAGER_SERIALIZER_KEY);

var logMessageSender = new LogMessageSender();
var logMessageSender = new LogMessageSender(monitor, assetIndex, context.getConnectorId());
var createProcessMessageSender = new CreateProcessMessageSender();

var idsMultipartSender = new IdsMultipartSender(monitor, httpClient, identityService, objectMapper);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
import com.truzzt.extension.logginghouse.client.LoggingHouseClientExtension;
/*
* Copyright (c) 2024 truzzt GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
*
*/

package com.truzzt.extension.logginghouse.client;

import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down

0 comments on commit a7594c3

Please sign in to comment.