diff --git a/logging-house-client/build.gradle.kts b/logging-house-client/build.gradle.kts index 07c9610..d7b41a7 100644 --- a/logging-house-client/build.gradle.kts +++ b/logging-house-client/build.gradle.kts @@ -14,6 +14,8 @@ val jsonVersion: String by project dependencies { implementation("${edcGroup}:control-plane-core:${edcVersion}") implementation("${edcGroup}:http-spi:${edcVersion}") + implementation("${edcGroup}:sql-core:${edcVersion}") + implementation("${edcGroup}:transaction-datasource-spi:${edcVersion}") implementation("com.squareup.okhttp3:okhttp:${okHttpVersion}") implementation("org.json:json:${jsonVersion}") @@ -22,6 +24,9 @@ dependencies { implementation("de.fraunhofer.iais.eis.ids.infomodel:infomodel-java:1.0.2-basecamp") implementation("de.fraunhofer.iais.eis.ids.infomodel:infomodel-util:1.0.2-basecamp") + implementation("org.postgresql:postgresql:42.4.5") + implementation("org.flywaydb:flyway-core:9.0.1") + testImplementation("org.assertj:assertj-core:${assertj}") testImplementation("org.junit.jupiter:junit-jupiter-api:${jupiterVersion}") testImplementation("org.mockito:mockito-core:${mockitoVersion}") diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java new file mode 100644 index 0000000..c2d4477 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client; + +public class ConfigConstants { + + static final String LOGGINGHOUSE_ENABLED = "edc.logginghouse.client.enabled"; + + static final String LOGGINGHOUSE_SERVER_URL_SETTING = "edc.logginghouse.client.server.url"; + + static final String LOGGINGHOUSE_FLYWAY_REPAIR_SETTING = "edc.logginghouse.client.flyway.repair"; + + static final String LOGGINGHOUSE_FLYWAY_CLEAN_SETTING = "edc.logginghouse.client.flyway.clean"; + + static final String LOGGINGHOUSE_EXTENSION_MAX_WORKERS = "edc.logginghouse.client.workers.max"; + + static final String LOGGINGHOUSE_EXTENSION_WORKERS_DELAY = "edc.logginghouse.client.workers.delay"; + + static final String LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD = "edc.logginghouse.client.workers.period"; +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java deleted file mode 100644 index 9b6ccf2..0000000 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright (c) 2022 sovity GmbH - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * sovity GmbH - initial API and implementation - * - */ - -package com.truzzt.extension.logginghouse.client; - -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; -import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; -import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; -import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.spi.EdcException; -import org.eclipse.edc.spi.event.Event; -import org.eclipse.edc.spi.event.EventEnvelope; -import org.eclipse.edc.spi.event.EventSubscriber; -import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.system.Hostname; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -public class IdsClearingHouseServiceImpl implements EventSubscriber { - - private final RemoteMessageDispatcherRegistry dispatcherRegistry; - private final URI connectorBaseUrl; - private final URL clearingHouseLogUrl; - private final ContractNegotiationStore contractNegotiationStore; - private final TransferProcessStore transferProcessStore; - private final Monitor monitor; - - public IdsClearingHouseServiceImpl( - RemoteMessageDispatcherRegistry dispatcherRegistry, - Hostname hostname, - URL clearingHouseLogUrl, - ContractNegotiationStore contractNegotiationStore, - TransferProcessStore transferProcessStore, - Monitor monitor) { - this.dispatcherRegistry = dispatcherRegistry; - this.clearingHouseLogUrl = clearingHouseLogUrl; - this.contractNegotiationStore = contractNegotiationStore; - this.transferProcessStore = transferProcessStore; - this.monitor = monitor; - - try { - connectorBaseUrl = getConnectorBaseUrl(hostname); - } catch (URISyntaxException e) { - throw new EdcException("Could not create connectorBaseUrl. Hostname can be set using:" + - " edc.hostname", e); - } - } - - public void createProcess(ContractAgreement contractAgreement, URL clearingHouseLogUrl) { - // Create PID - List processOwners = new ArrayList<>(); - processOwners.add(contractAgreement.getConsumerId()); - processOwners.add(contractAgreement.getProviderId()); - - monitor.info("Creating Process in LoggingHouse"); - var logMessage = new CreateProcessMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement.getId(), processOwners); - - try { - dispatcherRegistry.dispatch(Object.class, logMessage); - } catch (EdcException e) { - if (e.getMessage().startsWith("No provider dispatcher registered for protocol")) { - throw e; - } else { - monitor.severe("Unhandled exception while creating process in LoggingHouse. " + e.getMessage()); - // Print stack trace - String errorStr; - try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { - e.printStackTrace(pw); - errorStr = sw.toString(); - } catch (IOException ex) { - throw new RuntimeException("Error while converting the stacktrace"); - } - monitor.severe(errorStr); - } - } - } - - public void logContractAgreement(ContractAgreement contractAgreement, URL clearingHouseLogUrl) { - monitor.info("Logging ContractAgreement to LoggingHouse with contract id: " + contractAgreement.getId()); - var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement); - dispatcherRegistry.dispatch(Object.class, logMessage); - } - - public void logTransferProcess(TransferProcess transferProcess, URL clearingHouseLogUrl) { - monitor.info("Logging TransferProcess to LoggingHouse"); - var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, transferProcess); - dispatcherRegistry.dispatch(Object.class, logMessage); - } - - @Override - public void on(EventEnvelope event) { - try { - if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { - var contractAgreement = resolveContractAgreement(contractNegotiationFinalized); - var pid = contractAgreement.getId(); - - // Create Process - var extendedProcessUrl = new URL(clearingHouseLogUrl + "/process/" + pid); - try { - createProcess(contractAgreement, extendedProcessUrl); - } catch (Exception e) { - monitor.warning("Could not create process in LoggingHouse: " + e.getMessage()); - } - - // Log Contract Agreement - var extendedLogUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid); - try { - logContractAgreement(contractAgreement, extendedLogUrl); - } catch (Exception e) { - monitor.warning("Could not log ContractNegotiation to LoggingHouse: " + e.getMessage()); - } - } else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) { - monitor.debug("Logging transfer event with id " + event.getId()); - - var transferProcess = resolveTransferProcess(transferProcessEvent); - var pid = transferProcess.getContractId(); - var extendedUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid); - try { - logTransferProcess(transferProcess, extendedUrl); - } catch (Exception e) { - monitor.warning("Could not log TransferProcess to LoggingHouse: " + e.getMessage()); - } - } - } catch (MalformedURLException e) { - throw new EdcException("Could not create extended clearinghouse url."); - } - } - - private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException { - var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); - var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); - return Objects.requireNonNull(contractNegotiation).getContractAgreement(); - } - - private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) { - var transferProcessId = transferProcessEvent.getTransferProcessId(); - return transferProcessStore.findById(transferProcessId); - } - - private URI getConnectorBaseUrl(Hostname hostname) throws URISyntaxException { - return new URI(String.format("https://%s/", hostname.get())); - } -} \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index cd0b8ba..82efc7a 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -8,19 +8,30 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: + * truzzt GmbH - Initial implementation * */ package com.truzzt.extension.logginghouse.client; -import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender; +import com.truzzt.extension.logginghouse.client.events.LoggingHouseEventSubscriber; +import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessageSender; +import com.truzzt.extension.logginghouse.client.events.messages.LogMessageSender; +import com.truzzt.extension.logginghouse.client.flyway.FlywayService; +import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties; +import com.truzzt.extension.logginghouse.client.flyway.migration.DatabaseMigrationManager; +import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher; +import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer; +import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartSender; +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements; +import com.truzzt.extension.logginghouse.client.worker.LoggingHouseWorkersManager; +import com.truzzt.extension.logginghouse.client.worker.WorkersExecutor; import de.fraunhofer.iais.eis.LogMessage; import de.fraunhofer.iais.eis.RequestMessage; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAgreed; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationTerminated; import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessFailed; @@ -29,8 +40,9 @@ import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.runtime.metamodel.annotation.Requires; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.event.EventRouter; @@ -42,16 +54,51 @@ import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; import java.net.MalformedURLException; import java.net.URL; +import java.time.Duration; import java.util.Map; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_ENABLED; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_EXTENSION_MAX_WORKERS; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_EXTENSION_WORKERS_DELAY; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_CLEAN_SETTING; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_REPAIR_SETTING; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_SERVER_URL_SETTING; + +@Extension(value = LoggingHouseClientExtension.NAME) +@Requires(value = { + Hostname.class, + + TypeManager.class, + EventRouter.class, + IdentityService.class, + RemoteMessageDispatcherRegistry.class, + + DataSourceRegistry.class, + TransactionContext.class, + QueryExecutor.class, + + ContractNegotiationStore.class, + TransferProcessStore.class, + AssetIndex.class +}) public class LoggingHouseClientExtension implements ServiceExtension { - - public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension"; + public static final String NAME = "LoggingHouseClientExtension"; private static final String TYPE_MANAGER_SERIALIZER_KEY = "ids-clearinghouse"; + private static final Map CONTEXT_MAP = Map.of( + "cat", "http://w3id.org/mds/data-categories#", + "ids", "https://w3id.org/idsa/core/", + "idsc", "https://w3id.org/idsa/code/"); + + @Inject + private Hostname hostname; @Inject private TypeManager typeManager; @@ -63,7 +110,11 @@ public class LoggingHouseClientExtension implements ServiceExtension { private RemoteMessageDispatcherRegistry dispatcherRegistry; @Inject - private Hostname hostname; + private DataSourceRegistry dataSourceRegistry; + @Inject + private TransactionContext transactionContext; + @Inject + private QueryExecutor queryExecutor; @Inject private ContractNegotiationStore contractNegotiationStore; @@ -72,85 +123,96 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Inject private AssetIndex assetIndex; - private static final Map CONTEXT_MAP = Map.of( - "cat", "http://w3id.org/mds/data-categories#", - "ids", "https://w3id.org/idsa/core/", - "idsc", "https://w3id.org/idsa/code/"); - @Setting - public static final String LOGGINGHOUSE_LOG_URL_SETTING = "edc.logginghouse.extension.url"; - - @Setting - public static final String LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED = "edc.logginghouse.extension.enabled"; - - private URL loggingHouseLogUrl; public Monitor monitor; private boolean enabled; - + private URL loggingHouseLogUrl; + private LoggingHouseWorkersManager workersManager; @Override public String name() { - return LOGGINGHOUSE_CLIENT_EXTENSION; + return NAME; } @Override public void initialize(ServiceExtensionContext context) { - this.monitor = context.getMonitor(); + monitor = context.getMonitor(); - var extensionEnabled = context.getSetting(LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED, true); + var extensionEnabled = context.getSetting(LOGGINGHOUSE_ENABLED, true); if (!extensionEnabled) { - this.enabled = false; - this.monitor.info("Logginghouse client extension is disabled."); + enabled = false; + monitor.info("Logginghouse client extension is disabled."); return; } - this.enabled = true; - this.monitor.info("Logginghouse client extension is enabled."); + enabled = true; + monitor.info("Logginghouse client extension is enabled."); + + loggingHouseLogUrl = readUrlFromSettings(context); - this.loggingHouseLogUrl = readUrlFromSettings(context); + runFlywayMigrations(context); registerSerializerClearingHouseMessages(context); - registerClearingHouseMessageSenders(context); - registerEventSubscriber(context); + var store = initializeLoggingHouseMessageStore(typeManager); + registerEventSubscriber(context, store); + + registerDispatcher(context); + workersManager = initializeWorkersManager(context, store); } private URL readUrlFromSettings(ServiceExtensionContext context) { try { - var urlString = context.getSetting(LoggingHouseClientExtension.LOGGINGHOUSE_LOG_URL_SETTING, null); + var urlString = context.getSetting(LOGGINGHOUSE_SERVER_URL_SETTING, null); if (urlString == null) { - throw new EdcException(String.format("Could not initialize " + - "LoggingHouseClientExtension: " + - "No url specified using setting %s", LoggingHouseClientExtension.LOGGINGHOUSE_LOG_URL_SETTING)); + throw new EdcException(String.format("Could not initialize LoggingHouseClientExtension: " + + "No url specified using setting %s", LOGGINGHOUSE_SERVER_URL_SETTING)); } return new URL(urlString); } catch (MalformedURLException e) { throw new EdcException(String.format("Could not parse setting %s to Url", - LoggingHouseClientExtension.LOGGINGHOUSE_LOG_URL_SETTING), e); + LOGGINGHOUSE_SERVER_URL_SETTING), e); } } - private void registerEventSubscriber(ServiceExtensionContext context) { + private void runFlywayMigrations(ServiceExtensionContext context) { + var flywayService = new FlywayService( + context.getMonitor(), + context.getSetting(LOGGINGHOUSE_FLYWAY_REPAIR_SETTING, false), + context.getSetting(LOGGINGHOUSE_FLYWAY_CLEAN_SETTING, false) + ); + var migrationManager = new DatabaseMigrationManager(context.getConfig(), context.getMonitor(), flywayService); + migrationManager.migrate(); + } + + private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManager typeManager) { + return new SqlLoggingHouseMessageStore( + dataSourceRegistry, + DatasourceProperties.LOGGING_HOUSE_DATASOURCE, + transactionContext, + typeManager.getMapper(), + new PostgresDialectStatements(), + queryExecutor + ); + } + + private void registerEventSubscriber(ServiceExtensionContext context, LoggingHouseMessageStore loggingHouseMessageStore) { monitor.debug("Registering event subscriber for LoggingHouseClientExtension"); - var eventSubscriber = new IdsClearingHouseServiceImpl( - dispatcherRegistry, - hostname, - loggingHouseLogUrl, + var eventSubscriber = new LoggingHouseEventSubscriber( + loggingHouseMessageStore, contractNegotiationStore, transferProcessStore, monitor); eventRouter.registerSync(ContractNegotiationFinalized.class, eventSubscriber); - eventRouter.registerSync(ContractNegotiationAgreed.class, eventSubscriber); - eventRouter.registerSync(ContractNegotiationAccepted.class, eventSubscriber); - eventRouter.registerSync(ContractNegotiationTerminated.class, eventSubscriber); // TODO: check pid + eventRouter.registerSync(TransferProcessRequested.class, eventSubscriber); eventRouter.registerSync(TransferProcessInitiated.class, eventSubscriber); eventRouter.registerSync(TransferProcessStarted.class, eventSubscriber); eventRouter.registerSync(TransferProcessCompleted.class, eventSubscriber); eventRouter.registerSync(TransferProcessFailed.class, eventSubscriber); eventRouter.registerSync(TransferProcessTerminated.class, eventSubscriber); - context.registerService(IdsClearingHouseServiceImpl.class, eventSubscriber); + context.registerService(LoggingHouseEventSubscriber.class, eventSubscriber); monitor.debug("Registered event subscriber for LoggingHouseClientExtension"); } @@ -175,8 +237,23 @@ private void registerCommonTypes(TypeManager typeManager) { monitor.debug("Registered serializers for LoggingHouseClientExtension"); } - private void registerClearingHouseMessageSenders(ServiceExtensionContext context) { - monitor.debug("Registering message senders for LoggingHouseClientExtension"); + private LoggingHouseWorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) { + var periodSeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_DELAY, 30); + var initialDelaySeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD, 10); + var executor = new WorkersExecutor(Duration.ofSeconds(periodSeconds), Duration.ofSeconds(initialDelaySeconds), monitor); + + return new LoggingHouseWorkersManager(executor, + monitor, + context.getSetting(LOGGINGHOUSE_EXTENSION_MAX_WORKERS, 1), + store, + dispatcherRegistry, + hostname, + loggingHouseLogUrl + ); + } + + private void registerDispatcher(ServiceExtensionContext context) { + monitor.debug("Registering IDS dispatch sender for LoggingHouseClientExtension"); var httpClient = context.getService(EdcHttpClient.class); var objectMapper = typeManager.getMapper(TYPE_MANAGER_SERIALIZER_KEY); @@ -192,7 +269,6 @@ private void registerClearingHouseMessageSenders(ServiceExtensionContext context dispatcherRegistry.register(dispatcher); } - @Override public void start() { if (!enabled) { @@ -200,6 +276,8 @@ public void start() { } else { monitor.info("Starting Logginghouse client extension."); } + + workersManager.execute(); } @Override diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/LoggingHouseEventSubscriber.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/LoggingHouseEventSubscriber.java new file mode 100644 index 0000000..25fe50e --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/LoggingHouseEventSubscriber.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package com.truzzt.extension.logginghouse.client.events; + +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessageStatus; +import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; +import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; +import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.event.Event; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.monitor.Monitor; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class LoggingHouseEventSubscriber implements EventSubscriber { + + private final LoggingHouseMessageStore loggingHouseMessageStore; + private final ContractNegotiationStore contractNegotiationStore; + private final TransferProcessStore transferProcessStore; + private final Monitor monitor; + + public LoggingHouseEventSubscriber( + LoggingHouseMessageStore loggingHouseMessageStore, + ContractNegotiationStore contractNegotiationStore, + TransferProcessStore transferProcessStore, + Monitor monitor) { + this.loggingHouseMessageStore = loggingHouseMessageStore; + this.contractNegotiationStore = contractNegotiationStore; + this.transferProcessStore = transferProcessStore; + this.monitor = monitor; + } + + @Override + public void on(EventEnvelope event) { + if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { + monitor.debug("Storing ContractNegotiationFinalized event with id " + event.getId()); + + var contractAgreement = resolveContractAgreement(contractNegotiationFinalized); + try { + storeContractAgreement(contractAgreement); + } catch (Exception e) { + monitor.warning("Could not store ContractNegotiation: " + e.getMessage()); + } + } else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) { + monitor.debug("Storing TransferProcess event with id " + event.getId()); + + var transferProcess = resolveTransferProcess(transferProcessEvent); + try { + storeTransferProcess(transferProcess); + } catch (Exception e) { + monitor.warning("Could not store TransferProcess: " + e.getMessage()); + } + } + } + + private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) { + var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); + var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); + return Objects.requireNonNull(contractNegotiation).getContractAgreement(); + } + + private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) { + var transferProcessId = transferProcessEvent.getTransferProcessId(); + return transferProcessStore.findById(transferProcessId); + } + + public void storeContractAgreement(ContractAgreement contractAgreement) { + monitor.info("Storing ContractAgreement to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventType(contractAgreement.getClass()) + .eventId(contractAgreement.getId()) + .eventToLog(contractAgreement) + .createProcess(true) + .processId(contractAgreement.getId()) + .consumerId(contractAgreement.getConsumerId()) + .providerId(contractAgreement.getProviderId()) + .status(LoggingHouseMessageStatus.PENDING) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } + + public void storeTransferProcess(TransferProcess transferProcess) { + monitor.info("Storing TransferProcess to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventType(transferProcess.getClass()) + .eventId(transferProcess.getId()) + .eventToLog(transferProcess) + .createProcess(false) + .processId(transferProcess.getContractId()) + .status(LoggingHouseMessageStatus.PENDING) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } +} \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessage.java similarity index 85% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessage.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessage.java index 88854ba..0bbf02c 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessage.java @@ -12,8 +12,9 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.events.messages; +import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing; import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.net.URI; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessageSender.java similarity index 74% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessageSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessageSender.java index ee3c71d..c7040ad 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessageSender.java @@ -12,15 +12,15 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.events.messages; -import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; -import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsConstants; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartParts; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartResponse; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartSenderDelegate; -import com.truzzt.extension.logginghouse.client.ids.multipart.ResponseUtil; +import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.Message; import de.fraunhofer.iais.eis.MessageProcessedNotificationMessageImpl; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessage.java similarity index 85% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessage.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessage.java index e28d206..823597a 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessage.java @@ -13,8 +13,9 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.events.messages; +import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing; import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.net.URI; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java similarity index 87% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessageSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java index b5a9139..8371ab1 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java @@ -12,15 +12,15 @@ * */ -package com.truzzt.extension.logginghouse.client; - -import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; -import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsConstants; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartParts; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartResponse; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartSenderDelegate; -import com.truzzt.extension.logginghouse.client.ids.multipart.ResponseUtil; +package com.truzzt.extension.logginghouse.client.events.messages; + +import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.LogMessageBuilder; import de.fraunhofer.iais.eis.Message; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java new file mode 100644 index 0000000..4863dd9 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway; + +import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties; +import com.truzzt.extension.logginghouse.client.flyway.connection.DriverManagerConnectionFactory; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.datasource.ConnectionFactoryDataSource; +import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.FlywayException; +import org.flywaydb.core.api.MigrationVersion; +import org.flywaydb.core.api.output.MigrateResult; +import org.flywaydb.core.api.output.RepairResult; + +import java.util.List; +import javax.sql.DataSource; + +public class FlywayService { + + private static final String MIGRATION_LOCATION_BASE = "classpath:migration"; + private static final String MIGRATION_TABLE_NAME = "flyway_schema_history_logginghouse"; + + private final Monitor monitor; + private final boolean tryRepairOnFailedMigration; + private final boolean clean; + + public FlywayService(Monitor monitor, boolean tryRepairOnFailedMigration, boolean clean) { + this.monitor = monitor; + this.tryRepairOnFailedMigration = tryRepairOnFailedMigration; + this.clean = clean; + } + + public void cleanDatabase(DatasourceProperties datasourceProperties) { + if (clean) { + monitor.info("Running flyway clean."); + var flyway = setupFlyway(datasourceProperties); + flyway.clean(); + } + } + + public void migrateDatabase(DatasourceProperties datasourceProperties) { + var flyway = setupFlyway(datasourceProperties); + flyway.info().getInfoResult().migrations.stream() + .map(migration -> "Found migration: %s".formatted(migration.filepath)) + .forEach(monitor::info); + + try { + var migrateResult = flyway.migrate(); + handleFlywayMigrationResult(migrateResult); + } catch (FlywayException e) { + if (tryRepairOnFailedMigration) { + repairAndRetryMigration(flyway); + } else { + throw new EdcPersistenceException("Flyway migration failed", e); + } + } + } + + private void repairAndRetryMigration(Flyway flyway) { + try { + var repairResult = flyway.repair(); + handleFlywayRepairResult(repairResult); + var migrateResult = flyway.migrate(); + handleFlywayMigrationResult(migrateResult); + } catch (FlywayException e) { + throw new EdcPersistenceException("Flyway migration failed", e); + } + } + + private void handleFlywayRepairResult(RepairResult repairResult) { + if (!repairResult.repairActions.isEmpty()) { + var repairActions = String.join(", ", repairResult.repairActions); + monitor.info("Repair actions: %s".formatted(repairActions)); + } + + if (!repairResult.warnings.isEmpty()) { + var warnings = String.join(", ", repairResult.warnings); + throw new EdcPersistenceException("Repairing failed: %s".formatted(warnings)); + } + } + + private Flyway setupFlyway(DatasourceProperties datasourceProperties) { + var dataSource = getDataSource(datasourceProperties); + var migrationLocations = List.of(MIGRATION_LOCATION_BASE); + return Flyway.configure() + .baselineVersion(MigrationVersion.fromVersion("0.0.0")) + .baselineOnMigrate(true) + .failOnMissingLocations(true) + .dataSource(dataSource) + .table(MIGRATION_TABLE_NAME) + .locations(migrationLocations.toArray(new String[0])) + .cleanDisabled(!clean) + .load(); + } + + private DataSource getDataSource(DatasourceProperties datasourceProperties) { + var connectionFactory = new DriverManagerConnectionFactory(datasourceProperties); + return new ConnectionFactoryDataSource(connectionFactory); + } + + private void handleFlywayMigrationResult(MigrateResult migrateResult) { + if (migrateResult.migrationsExecuted > 0) { + monitor.info(String.format( + "Successfully migrated database from version %s to version %s", + migrateResult.initialSchemaVersion, + migrateResult.targetSchemaVersion)); + } else { + monitor.info(String.format( + "No migration necessary. Current version is %s", + migrateResult.initialSchemaVersion)); + } + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java new file mode 100644 index 0000000..543eac3 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway.connection; + +import org.eclipse.edc.spi.system.configuration.Config; + +public class DatasourceProperties { + public static final String LOGGING_HOUSE_DATASOURCE = "logginghouse"; + + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.logginghouse.name"; + private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.logginghouse.url"; + private static final String DATASOURCE_SETTING_USER = "edc.datasource.logginghouse.user"; + private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.logginghouse.password"; + + private final String name; + private final String jdbcUrl; + private final String user; + private final String password; + + public DatasourceProperties(Config config) { + name = config.getString(DATASOURCE_SETTING_NAME); + jdbcUrl = config.getString(DATASOURCE_SETTING_JDBC_URL); + user = config.getString(DATASOURCE_SETTING_USER); + password = config.getString(DATASOURCE_SETTING_PASSWORD); + } + + public String getName() { + return name; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java new file mode 100644 index 0000000..f1b1a53 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway.connection; + +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.ConnectionFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +public class DriverManagerConnectionFactory implements ConnectionFactory { + + private static final String CONNECTION_PROPERTY_USER = "user"; + private static final String CONNECTION_PROPERTY_PASSWORD = "password"; + + private final DatasourceProperties datasourceProperties; + + public DriverManagerConnectionFactory(DatasourceProperties datasourceProperties) { + this.datasourceProperties = datasourceProperties; + } + + @Override + public Connection create() { + try { + var properties = new Properties(); + properties.setProperty(CONNECTION_PROPERTY_USER, datasourceProperties.getUser()); + properties.setProperty(CONNECTION_PROPERTY_PASSWORD, datasourceProperties.getPassword()); + return DriverManager.getConnection(datasourceProperties.getJdbcUrl(), properties); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java new file mode 100644 index 0000000..81aab02 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway.migration; + +import com.truzzt.extension.logginghouse.client.flyway.FlywayService; +import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.system.configuration.Config; + +public class DatabaseMigrationManager { + private final Config config; + private final Monitor monitor; + private final FlywayService flywayService; + + public DatabaseMigrationManager(Config config, Monitor monitor, FlywayService flywayService) { + this.config = config; + this.monitor = monitor; + this.flywayService = flywayService; + } + + public void migrate() { + var datasourceProperties = new DatasourceProperties(config); + monitor.info("Using datasource %s to apply flyway migrations".formatted(datasourceProperties.getName())); + + flywayService.cleanDatabase(datasourceProperties); + flywayService.migrateDatabase(datasourceProperties); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ExtendedMessageProtocolClearing.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ExtendedMessageProtocolClearing.java similarity index 92% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ExtendedMessageProtocolClearing.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ExtendedMessageProtocolClearing.java index 71ba1fc..e32cec8 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ExtendedMessageProtocolClearing.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ExtendedMessageProtocolClearing.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.multipart; public final class ExtendedMessageProtocolClearing { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsMultipartClearingRemoteMessageDispatcher.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java similarity index 73% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsMultipartClearingRemoteMessageDispatcher.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java index 6f757cc..3e124ad 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsMultipartClearingRemoteMessageDispatcher.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java @@ -12,10 +12,10 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.multipart; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartRemoteMessageDispatcher; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartRemoteMessageDispatcher; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartSender; public class IdsMultipartClearingRemoteMessageDispatcher extends IdsMultipartRemoteMessageDispatcher { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/MultiContextJsonLdSerializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/MultiContextJsonLdSerializer.java similarity index 98% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/MultiContextJsonLdSerializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/MultiContextJsonLdSerializer.java index 985dfb2..721cf55 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/MultiContextJsonLdSerializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/MultiContextJsonLdSerializer.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.multipart; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.JsonGenerator; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLd.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLd.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLd.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLd.java index c542ae0..5693a3e 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLd.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLd.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLdModule.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLdModule.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLdModule.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLdModule.java index a5c4bfe..fa73508 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLdModule.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLdModule.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.databind.module.SimpleModule; import java.net.URI; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriDeserializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriDeserializer.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriDeserializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriDeserializer.java index cf684c0..e644d4f 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriDeserializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriDeserializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriSerializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriSerializer.java similarity index 94% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriSerializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriSerializer.java index a2e94dc..3858ccf 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriSerializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriSerializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarDeserializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarDeserializer.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarDeserializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarDeserializer.java index 116c11c..4279934 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarDeserializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarDeserializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarSerializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarSerializer.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarSerializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarSerializer.java index d42744f..9ad1cf6 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarSerializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarSerializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/CalendarUtil.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/CalendarUtil.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/CalendarUtil.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/CalendarUtil.java index 7a7b8bf..b338de9 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/CalendarUtil.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/CalendarUtil.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import java.time.ZonedDateTime; import java.util.GregorianCalendar; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsConstants.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsConstants.java similarity index 89% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsConstants.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsConstants.java index 7a927e8..3950c21 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsConstants.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsConstants.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; public final class IdsConstants { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartParts.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartParts.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartParts.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartParts.java index 72bcfdf..5be1da3 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartParts.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartParts.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartRemoteMessageDispatcher.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartRemoteMessageDispatcher.java similarity index 97% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartRemoteMessageDispatcher.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartRemoteMessageDispatcher.java index 7342e56..ea74ef6 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartRemoteMessageDispatcher.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartRemoteMessageDispatcher.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartSender.java similarity index 97% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartSender.java index 3677494..a3ec49f 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartSender.java @@ -14,7 +14,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import com.fasterxml.jackson.databind.ObjectMapper; import de.fraunhofer.iais.eis.DynamicAttributeToken; @@ -219,7 +219,7 @@ protected IdsMultipartParts extractResponseParts(ResponseBody body) throws Excep public void checkResponseType(MultipartResponse response, MultipartSenderDelegate senderDelegate) { var type = senderDelegate.getAllowedResponseTypes(); if (!type.contains(response.header().getClass())) { - throw new EdcException(String.format("Received %s but expected %s.", response.header().getClass(), type)); + throw new EdcException(format("Received %s but expected %s.", response.header().getClass(), type)); } } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartResponse.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartResponse.java similarity index 91% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartResponse.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartResponse.java index 83b2ce5..818c599 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartResponse.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartResponse.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import de.fraunhofer.iais.eis.Message; import org.jetbrains.annotations.NotNull; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartSenderDelegate.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartSenderDelegate.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartSenderDelegate.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartSenderDelegate.java index b5e8bdb..adfe1c3 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartSenderDelegate.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartSenderDelegate.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.Message; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/ResponseUtil.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/ResponseUtil.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/ResponseUtil.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/ResponseUtil.java index 493cc6e..63090ab 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/ResponseUtil.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/ResponseUtil.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import com.fasterxml.jackson.databind.ObjectMapper; import de.fraunhofer.iais.eis.Message; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java new file mode 100644 index 0000000..9351161 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.spi.store; + +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; + +import java.util.List; + +public interface LoggingHouseMessageStore { + + void save(LoggingHouseMessage message); + + List listPending(); + + void updateSent(long id); +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java new file mode 100644 index 0000000..e416e48 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.spi.types; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class LoggingHouseMessage { + private Long id; + private Class eventType; + private String eventId; + private Object eventToLog; + private boolean createProcess; + private String processId; + private String consumerId; + private String providerId; + private LoggingHouseMessageStatus status; + private ZonedDateTime createdAt; + + public Long getId() { + return id; + } + public Class getEventType() { + return eventType; + } + public String getEventId() { + return eventId; + } + public Object getEventToLog() { + return eventToLog; + } + public boolean getCreateProcess() { + return createProcess; + } + public String getProcessId() { + return processId; + } + public String getConsumerId() { + return consumerId; + } + public String getProviderId() { + return providerId; + } + public LoggingHouseMessageStatus getStatus() { + return status; + } + public ZonedDateTime getCreatedAt() { + return createdAt; + } + + public static final class Builder { + private final LoggingHouseMessage event = new LoggingHouseMessage(); + + private Builder() { + } + + public static LoggingHouseMessage.Builder newInstance() { + return new LoggingHouseMessage.Builder(); + } + + public LoggingHouseMessage.Builder id(Long id) { + this.event.id = id; + return this; + } + public LoggingHouseMessage.Builder eventType(Class eventType) { + this.event.eventType = eventType; + return this; + } + public LoggingHouseMessage.Builder eventId(String eventId) { + this.event.eventId = eventId; + return this; + } + public LoggingHouseMessage.Builder eventToLog(Object eventToLog) { + this.event.eventToLog = eventToLog; + return this; + } + public LoggingHouseMessage.Builder createProcess(boolean createProcess) { + this.event.createProcess = createProcess; + return this; + } + public LoggingHouseMessage.Builder processId(String processId) { + this.event.processId = processId; + return this; + } + public LoggingHouseMessage.Builder consumerId(String consumerId) { + this.event.consumerId = consumerId; + return this; + } + public LoggingHouseMessage.Builder providerId(String providerId) { + this.event.providerId = providerId; + return this; + } + public LoggingHouseMessage.Builder status(LoggingHouseMessageStatus status) { + this.event.status = status; + return this; + } + public LoggingHouseMessage.Builder createdAt(ZonedDateTime createdAt) { + this.event.createdAt = createdAt; + return this; + } + + public LoggingHouseMessage build() { + Objects.requireNonNull(this.event.eventType, "Message eventType must not be null"); + Objects.requireNonNull(this.event.eventId, "Message eventId must not be null"); + Objects.requireNonNull(this.event.eventToLog, "Message eventToLog must not be null"); + Objects.requireNonNull(this.event.createProcess, "Message createProcess must not be null"); + Objects.requireNonNull(this.event.processId, "Message processId must not be null"); + Objects.requireNonNull(this.event.status, "Message status must not be null"); + Objects.requireNonNull(this.event.createdAt, "Message createdAt must not be null"); + return this.event; + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessageStatus.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessageStatus.java new file mode 100644 index 0000000..5b3fdee --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessageStatus.java @@ -0,0 +1,28 @@ +package com.truzzt.extension.logginghouse.client.spi.types; + +import org.eclipse.edc.spi.EdcException; + +public enum LoggingHouseMessageStatus { + PENDING("P"), SENT("S"); + + private final String code; + + LoggingHouseMessageStatus(String code) { + this.code = code; + } + + public String getCode() { + return code; + } + + public static LoggingHouseMessageStatus codeOf(String code) { + switch(code) { + case "P": + return PENDING; + case "S": + return SENT; + default: + throw new EdcException("Invalid status code " + code); + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java new file mode 100644 index 0000000..be0bfcb --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessageStatus; +import com.truzzt.extension.logginghouse.client.store.sql.schema.LoggingHouseEventStatements; +import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class SqlLoggingHouseMessageStore extends AbstractSqlStore implements LoggingHouseMessageStore { + + private final LoggingHouseEventStatements statements; + + public SqlLoggingHouseMessageStore(DataSourceRegistry dataSourceRegistry, + String dataSourceName, + TransactionContext transactionContext, + ObjectMapper objectMapper, + LoggingHouseEventStatements statements, + QueryExecutor queryExecutor) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public void save(LoggingHouseMessage event) { + + Objects.requireNonNull(event); + Objects.requireNonNull(event.getEventType()); + Objects.requireNonNull(event.getEventId()); + Objects.requireNonNull(event.getEventToLog()); + Objects.requireNonNull(event.getCreateProcess()); + Objects.requireNonNull(event.getProcessId()); + Objects.requireNonNull(event.getStatus()); + Objects.requireNonNull(event.getCreatedAt()); + + transactionContext.execute(() -> { + try (var connection = getConnection()) { + queryExecutor.execute(connection, statements.getInsertTemplate(), + event.getEventType().getSimpleName(), + event.getEventId(), + toJson(event.getEventToLog()), + event.getCreateProcess(), + event.getProcessId(), + event.getConsumerId(), + event.getProviderId(), + event.getStatus().getCode(), + mapFromZonedDateTime(event.getCreatedAt()) + ); + + } catch (Exception e) { + throw new EdcPersistenceException("Error executing INSERT statement", e); + } + }); + } + + @Override + public List listPending() { + return transactionContext.execute(() -> { + try { + return queryExecutor.query(getConnection(), true, this::mapResultSet, + statements.getSelectPendingStatement(), + LoggingHouseMessageStatus.PENDING.getCode()) + .collect(Collectors.toList()); + } catch (SQLException e) { + throw new EdcPersistenceException("Error executing SELECT statement", e); + } + }); + } + + @Override + public void updateSent(long id) { + transactionContext.execute(() -> { + try { + queryExecutor.execute(getConnection(), + statements.getUpdateSentTemplate(), + LoggingHouseMessageStatus.SENT.getCode(), + mapFromZonedDateTime(ZonedDateTime.now()), + id); + } catch (SQLException e) { + throw new EdcPersistenceException("Error executing UPDATE statement", e); + } + }); + } + + private Long mapFromZonedDateTime(ZonedDateTime zonedDateTime) { + return zonedDateTime != null ? zonedDateTime.toEpochSecond() : null; + } + + private ZonedDateTime mapToZonedDateTime(ResultSet resultSet, String column) throws Exception { + return resultSet.getString(column) != null ? + Instant.ofEpochSecond(resultSet.getLong(column)).atZone(ZoneId.of("Z")) : + null; + } + + private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception { + + Class eventType = toClass(resultSet.getString(statements.getEventTypeColumn())); + + Object eventToLog; + try { + eventToLog = fromJson(resultSet.getString(statements.getEventToLogColumn()), eventType); + } catch (EdcPersistenceException e) { + throw new EdcPersistenceException("Error eventToLog JSON column", e); + } + + LoggingHouseMessageStatus status; + try { + status = LoggingHouseMessageStatus.codeOf(resultSet.getString(statements.getStatusColumn())); + } catch (EdcPersistenceException e) { + throw new EdcPersistenceException("Error eventToLog JSON column", e); + } + + return LoggingHouseMessage.Builder.newInstance() + .id(resultSet.getLong(statements.getIdColumn())) + .eventType(eventType) + .eventId(resultSet.getString(statements.getIdColumn())) + .eventToLog(eventToLog) + .createProcess(resultSet.getBoolean(statements.getCreateProcessColumn())) + .processId(resultSet.getString(statements.getIdColumn())) + .consumerId(resultSet.getString(statements.getIdColumn())) + .providerId(resultSet.getString(statements.getIdColumn())) + .status(status) + .createdAt(mapToZonedDateTime(resultSet, statements.getCreatedAtColumn())) + .build(); + } + + private Class toClass(String eventType) { + + switch (eventType) { + case "ContractAgreement": + return ContractAgreement.class; + case "TransferProcess": + return TransferProcess.class; + default: + throw new EdcException("Invalid eventType: " + eventType); + } + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java new file mode 100644 index 0000000..56a3010 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema; + +import static java.lang.String.format; + +public class BaseSqlDialectStatements implements LoggingHouseEventStatements { + + @Override + public String getInsertTemplate() { + return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?, ?)", + getLoggingHouseMessageTable(), + getEventTypeColumn(), + getEventIdColumn(), + getEventToLogColumn(), + getCreateProcessColumn(), + getProcessIdColumn(), + getConsumerIdColumn(), + getProviderIdColumn(), + getStatusColumn(), + getCreatedAtColumn(), + getFormatAsJsonOperator() + ); + } + + @Override + public String getSelectPendingStatement() { + return format("SELECT * FROM %s WHERE %s = ?", + getLoggingHouseMessageTable(), + getStatusColumn()); + } + + @Override + public String getUpdateSentTemplate() { + return format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?", + getLoggingHouseMessageTable(), + getStatusColumn(), + getSentAtColumn(), + getIdColumn()); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java new file mode 100644 index 0000000..5dd8eea --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema; + +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public interface LoggingHouseEventStatements { + + default String getLoggingHouseMessageTable() { + return "edc_logging_house_message"; + } + + default String getIdColumn() { + return "logging_house_message_id"; + } + + default String getEventTypeColumn() { + return "event_type"; + } + + default String getEventIdColumn() { + return "event_id"; + } + + default String getEventToLogColumn() { + return "event_to_log"; + } + + default String getCreateProcessColumn() { + return "create_process"; + } + + default String getProcessIdColumn() { + return "process_id"; + } + + default String getConsumerIdColumn() { + return "consumer_id"; + } + + default String getProviderIdColumn() { + return "provider_id"; + } + + default String getStatusColumn() { + return "status"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + default String getSentAtColumn() { + return "sent_at"; + } + + String getInsertTemplate(); + + String getSelectPendingStatement(); + + String getUpdateSentTemplate(); + + default String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java new file mode 100644 index 0000000..e48620d --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema.postgres; + +import com.truzzt.extension.logginghouse.client.store.sql.schema.BaseSqlDialectStatements; +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public class PostgresDialectStatements extends BaseSqlDialectStatements { + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/LoggingHouseWorkersManager.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/LoggingHouseWorkersManager.java new file mode 100644 index 0000000..d7265e8 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/LoggingHouseWorkersManager.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * + */ + +package com.truzzt.extension.logginghouse.client.worker; + +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.system.Hostname; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.lang.String.format; + +public class LoggingHouseWorkersManager { + + private final WorkersExecutor executor; + private final Monitor monitor; + private final int maxWorkers; + private final LoggingHouseMessageStore store; + private final RemoteMessageDispatcherRegistry dispatcherRegistry; + private final URI connectorBaseUrl; + private final URL loggingHouseUrl; + + public LoggingHouseWorkersManager(WorkersExecutor executor, + Monitor monitor, + int maxWorkers, + LoggingHouseMessageStore store, + RemoteMessageDispatcherRegistry dispatcherRegistry, + Hostname hostname, + URL loggingHouseUrl) { + this.executor = executor; + this.monitor = monitor; + this.maxWorkers = maxWorkers; + this.store = store; + this.dispatcherRegistry = dispatcherRegistry; + this.loggingHouseUrl = loggingHouseUrl; + + try { + connectorBaseUrl = getConnectorBaseUrl(hostname); + } catch (URISyntaxException e) { + throw new EdcException("Could not create connectorBaseUrl. Hostname can be set using:" + hostname, e); + } + } + + public void execute() { + executor.run(() -> { + processPending(); + }); + + } + + private void processPending() { + List messages = store.listPending(); + if (messages.isEmpty()) { + monitor.warning("No Messages to send, aborting execution"); + return; + } + monitor.debug(log("Loaded " + messages.size() + " not sent messages from store")); + var allItems = new ArrayBlockingQueue<>(messages.size(), true, messages); + + monitor.debug(log("Instantiate workers...")); + + var actualNumWorkers = Math.min(allItems.size(), maxWorkers); + monitor.debug(format(log("Worker parallelism is %s, based on config and number of not sent messages"), actualNumWorkers)); + var availableWorkers = createWorkers(actualNumWorkers); + + while (!allItems.isEmpty()) { + var worker = nextAvailableWorker(availableWorkers); + if (worker == null) { + monitor.debug(log("No worker available, will retry later")); + continue; + } + + var item = allItems.poll(); + if (item == null) { + monitor.warning(log("WorkItem queue empty, abort execution")); + break; + } + + worker.run(item) + .whenComplete((updateResponse, throwable) -> { + if (throwable != null) { + monitor.severe(log(format("Unexpected exception happened during in worker %s", worker.getId())), throwable); + } else { + monitor.info(log(format("Worker [%s] is done", worker.getId()))); + } + availableWorkers.add(worker); + }); + } + } + + @Nullable + private MessageWorker nextAvailableWorker(ArrayBlockingQueue availableWorkers) { + MessageWorker worker = null; + try { + monitor.debug(log("Getting next available worker")); + worker = availableWorkers.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + monitor.debug("interrupted while waiting for worker to become available"); + } + return worker; + } + + @NotNull + private ArrayBlockingQueue createWorkers(int numWorkers) { + + return new ArrayBlockingQueue<>(numWorkers, true, IntStream.range(0, numWorkers) + .mapToObj(i -> new MessageWorker(monitor, dispatcherRegistry, connectorBaseUrl, loggingHouseUrl, store)) + .collect(Collectors.toList())); + } + + private String log(String input) { + return "LoggingHouseWorkersManager: " + input; + } + + private URI getConnectorBaseUrl(Hostname hostname) throws URISyntaxException { + return new URI(String.format("https://%s/", hostname.get())); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java new file mode 100644 index 0000000..659381e --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * + */ + +package com.truzzt.extension.logginghouse.client.worker; + +import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessage; +import com.truzzt.extension.logginghouse.client.events.messages.LogMessage; +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.response.StatusResult; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class MessageWorker { + private final Monitor monitor; + private final RemoteMessageDispatcherRegistry dispatcherRegistry; + private final URI connectorBaseUrl; + private final URL loggingHouseUrl; + private final LoggingHouseMessageStore store; + private final String workerId; + + public MessageWorker(Monitor monitor, RemoteMessageDispatcherRegistry dispatcherRegistry, URI connectorBaseUrl, URL loggingHouseUrl, + LoggingHouseMessageStore store) { + this.monitor = monitor; + this.dispatcherRegistry = dispatcherRegistry; + this.connectorBaseUrl = connectorBaseUrl; + this.loggingHouseUrl = loggingHouseUrl; + this.store = store; + + workerId = "Worker-" + UUID.randomUUID(); + } + + public String getId() { + return workerId; + } + + public CompletableFuture run(LoggingHouseMessage message) { + try { + monitor.debug("Worker " + workerId + " processing message with event of type " + message.getEventType() + " and id " + message.getEventId()); + process(message); + + return CompletableFuture.completedFuture(true); + + } catch (Exception e) { + monitor.severe(e.getMessage()); + return CompletableFuture.failedFuture(new EdcException(e)); + } + } + + public void process(LoggingHouseMessage message) { + try { + var pid = message.getProcessId(); + + // Create Process + if (message.getCreateProcess()) { + var extendedProcessUrl = new URL(loggingHouseUrl + "/process/" + pid); + try { + createProcess(message, extendedProcessUrl).join(); + + } catch (Exception e) { + throw new EdcException("Could not create process in LoggingHouse", e); + } + } + + // Log Message + var extendedLogUrl = new URL(loggingHouseUrl + "/messages/log/" + pid); + try { + logMessage(message, extendedLogUrl).join(); + + } catch (Exception e) { + throw new EdcException("Could not log message to LoggingHouse", e); + } + + // Update Status + store.updateSent(message.getId()); + + } catch (MalformedURLException e) { + throw new EdcException("Could not create extended clearinghouse url."); + } + } + + public CompletableFuture> createProcess(LoggingHouseMessage message, URL loggingHouseUrl) { + + List processOwners = new ArrayList<>(); + processOwners.add(message.getConsumerId()); + processOwners.add(message.getProviderId()); + + monitor.info("Creating process in LoggingHouse with id: " + message.getProcessId()); + var logMessage = new CreateProcessMessage(loggingHouseUrl, connectorBaseUrl, message.getProcessId(), processOwners); + + return dispatcherRegistry.dispatch(Object.class, logMessage); + } + + public CompletableFuture> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) { + + monitor.info("Logging message to LoggingHouse with type " + message.getEventType() + " and id " + message.getEventId()); + var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, message.getEventToLog()); + + return dispatcherRegistry.dispatch(Object.class, logMessage); + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersExecutor.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersExecutor.java new file mode 100644 index 0000000..e917920 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersExecutor.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.worker; + +import org.eclipse.edc.spi.monitor.Monitor; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class WorkersExecutor { + private final Duration schedule; + private final Duration withInitialDelay; + private final Monitor monitor; + + public WorkersExecutor(Duration schedule, Duration initialDelay, Monitor monitor) { + this.schedule = schedule; + withInitialDelay = initialDelay; + this.monitor = monitor; + } + + public void run(Runnable task) { + var ses = Executors.newSingleThreadScheduledExecutor(); + ses.scheduleAtFixedRate(catchExceptions(task), withInitialDelay.toMillis(), schedule.toMillis(), TimeUnit.MILLISECONDS); + } + + private Runnable catchExceptions(Runnable original) { + return () -> { + try { + original.run(); + } catch (Throwable thr) { + monitor.severe("Unexpected error during plan execution", thr); + } + }; + } +} diff --git a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql new file mode 100644 index 0000000..7f4ec09 --- /dev/null +++ b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql @@ -0,0 +1,34 @@ +-- +-- Copyright (c) 2024 Daimler TSS GmbH +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Daimler TSS GmbH - Initial SQL Query +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + +-- table: edc_logging_house_event +CREATE TABLE IF NOT EXISTS edc_logging_house_message +( + logging_house_message_id BIGSERIAL NOT NULL, + event_type VARCHAR NOT NULL, + event_id VARCHAR NOT NULL, + event_to_log JSON NOT NULL, + create_process BOOLEAN NOT NULL, + process_id VARCHAR NOT NULL, + consumer_id VARCHAR NOT NULL, + provider_id VARCHAR NOT NULL, + status VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + sent_at BIGINT, + PRIMARY KEY (logging_house_message_id) +); +COMMENT ON COLUMN edc_logging_house_message.event_to_log IS 'Event to log serialized as JSON'; + +CREATE INDEX IF NOT EXISTS idx_edc_logging_house_message_status ON edc_logging_house_message (status); diff --git a/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java b/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java index 90a5582..e77a091 100644 --- a/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java +++ b/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java @@ -36,6 +36,6 @@ void setUp() { @Test void name_shouldReturnCorrectName() { - assertEquals(LoggingHouseClientExtension.LOGGINGHOUSE_CLIENT_EXTENSION, extension.name()); + assertEquals(LoggingHouseClientExtension.NAME, extension.name()); } }