diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java index dfd0ebc..56c5554 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ConfigConstants.java @@ -11,4 +11,8 @@ public class ConfigConstants { static final String LOGGINGHOUSE_FLYWAY_CLEAN_SETTING = "edc.logginghouse.client.flyway.clean"; static final String LOGGINGHOUSE_EXTENSION_MAX_WORKERS = "edc.logginghouse.client.workers.max"; + + static final String LOGGINGHOUSE_EXTENSION_WORKERS_DELAY = "edc.logginghouse.client.workers.delay"; + + static final String LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD = "edc.logginghouse.client.workers.period"; } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index 07260de..089c839 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -14,24 +14,24 @@ package com.truzzt.extension.logginghouse.client; +import com.truzzt.extension.logginghouse.client.events.LoggingHouseEventSubscriber; import com.truzzt.extension.logginghouse.client.flyway.FlywayService; +import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties; import com.truzzt.extension.logginghouse.client.flyway.migration.DatabaseMigrationManager; -import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender; -import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessageSender; -import com.truzzt.extension.logginghouse.client.messages.LogMessageSender; +import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartSender; +import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessageSender; +import com.truzzt.extension.logginghouse.client.events.messages.LogMessageSender; import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher; import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer; import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore; import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements; -import com.truzzt.extension.logginghouse.client.worker.WorkersManager; +import com.truzzt.extension.logginghouse.client.worker.LoggingHouseWorkersManager; +import com.truzzt.extension.logginghouse.client.worker.WorkersExecutor; import de.fraunhofer.iais.eis.LogMessage; import de.fraunhofer.iais.eis.RequestMessage; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAgreed; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationTerminated; import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessFailed; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated; @@ -41,7 +41,9 @@ import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Requires; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.event.EventRouter; @@ -59,18 +61,39 @@ import java.net.MalformedURLException; import java.net.URL; +import java.time.Duration; import java.util.Map; -import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_ENABLED; -import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_SERVER_URL_SETTING; -import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_REPAIR_SETTING; -import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_FLYWAY_CLEAN_SETTING; -import static com.truzzt.extension.logginghouse.client.ConfigConstants.LOGGINGHOUSE_EXTENSION_MAX_WORKERS; +import static com.truzzt.extension.logginghouse.client.ConfigConstants.*; +@Extension(value = LoggingHouseClientExtension.NAME) +@Requires(value = { + Hostname.class, + + TypeManager.class, + EventRouter.class, + IdentityService.class, + RemoteMessageDispatcherRegistry.class, + + DataSourceRegistry.class, + TransactionContext.class, + QueryExecutor.class, + + ContractNegotiationStore.class, + TransferProcessStore.class, + AssetIndex.class +}) public class LoggingHouseClientExtension implements ServiceExtension { - public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension"; + public static final String NAME = "LoggingHouseClientExtension"; private static final String TYPE_MANAGER_SERIALIZER_KEY = "ids-clearinghouse"; + private static final Map CONTEXT_MAP = Map.of( + "cat", "http://w3id.org/mds/data-categories#", + "ids", "https://w3id.org/idsa/core/", + "idsc", "https://w3id.org/idsa/code/"); + + @Inject + private Hostname hostname; @Inject private TypeManager typeManager; @@ -81,9 +104,6 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Inject private RemoteMessageDispatcherRegistry dispatcherRegistry; - @Inject - private Hostname hostname; - @Inject private DataSourceRegistry dataSourceRegistry; @Inject @@ -98,20 +118,14 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Inject private AssetIndex assetIndex; - private static final Map CONTEXT_MAP = Map.of( - "cat", "http://w3id.org/mds/data-categories#", - "ids", "https://w3id.org/idsa/core/", - "idsc", "https://w3id.org/idsa/code/"); - public Monitor monitor; private boolean enabled; private URL loggingHouseLogUrl; - - private WorkersManager workersManager; + private LoggingHouseWorkersManager workersManager; @Override public String name() { - return LOGGINGHOUSE_CLIENT_EXTENSION; + return NAME; } @Override @@ -168,7 +182,7 @@ private void runFlywayMigrations(ServiceExtensionContext context) { private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManager typeManager) { return new SqlLoggingHouseMessageStore( dataSourceRegistry, - DataSourceRegistry.DEFAULT_DATASOURCE, + DatasourceProperties.LOGGING_HOUSE_DATASOURCE, transactionContext, typeManager.getMapper(), new PostgresDialectStatements(), @@ -186,9 +200,7 @@ private void registerEventSubscriber(ServiceExtensionContext context, LoggingHou monitor); eventRouter.registerSync(ContractNegotiationFinalized.class, eventSubscriber); - eventRouter.registerSync(ContractNegotiationAgreed.class, eventSubscriber); - eventRouter.registerSync(ContractNegotiationAccepted.class, eventSubscriber); - eventRouter.registerSync(ContractNegotiationTerminated.class, eventSubscriber); // TODO: check pid + eventRouter.registerSync(TransferProcessRequested.class, eventSubscriber); eventRouter.registerSync(TransferProcessInitiated.class, eventSubscriber); eventRouter.registerSync(TransferProcessStarted.class, eventSubscriber); @@ -220,8 +232,13 @@ private void registerCommonTypes(TypeManager typeManager) { monitor.debug("Registered serializers for LoggingHouseClientExtension"); } - private WorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) { - return new WorkersManager(this.monitor, + private LoggingHouseWorkersManager initializeWorkersManager(ServiceExtensionContext context, LoggingHouseMessageStore store) { + var periodSeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_DELAY, 30); + var initialDelaySeconds = context.getSetting(LOGGINGHOUSE_EXTENSION_WORKERS_PERIOD, 10); + var executor = new WorkersExecutor(Duration.ofSeconds(periodSeconds), Duration.ofSeconds(initialDelaySeconds), monitor); + + return new LoggingHouseWorkersManager(executor, + monitor, context.getSetting(LOGGINGHOUSE_EXTENSION_MAX_WORKERS, 1), store, dispatcherRegistry, @@ -254,6 +271,8 @@ public void start() { } else { monitor.info("Starting Logginghouse client extension."); } + + workersManager.execute(); } @Override diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/LoggingHouseEventSubscriber.java similarity index 84% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/LoggingHouseEventSubscriber.java index 76d98ad..25fe50e 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/LoggingHouseEventSubscriber.java @@ -12,10 +12,11 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.events; import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessageStatus; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; @@ -48,26 +49,6 @@ public LoggingHouseEventSubscriber( this.monitor = monitor; } - public void storeContractAgreement(ContractAgreement contractAgreement) { - monitor.info("Storing ContractAgreement to send to LoggingHouse"); - - var message = LoggingHouseMessage.Builder.newInstance() - .eventToLog(contractAgreement) - .createdAt(ZonedDateTime.now()) - .build(); - loggingHouseMessageStore.save(message); - } - - public void storeTransferProcess(TransferProcess transferProcess) { - monitor.info("Storing TransferProcess to send to LoggingHouse"); - - var message = LoggingHouseMessage.Builder.newInstance() - .eventToLog(transferProcess) - .createdAt(ZonedDateTime.now()) - .build(); - loggingHouseMessageStore.save(message); - } - @Override public void on(EventEnvelope event) { if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { @@ -91,7 +72,7 @@ public void on(EventEnvelope event) { } } - private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException { + private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) { var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); return Objects.requireNonNull(contractNegotiation).getContractAgreement(); @@ -101,4 +82,36 @@ private TransferProcess resolveTransferProcess(TransferProcessEvent transferProc var transferProcessId = transferProcessEvent.getTransferProcessId(); return transferProcessStore.findById(transferProcessId); } + + public void storeContractAgreement(ContractAgreement contractAgreement) { + monitor.info("Storing ContractAgreement to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventType(contractAgreement.getClass()) + .eventId(contractAgreement.getId()) + .eventToLog(contractAgreement) + .createProcess(true) + .processId(contractAgreement.getId()) + .consumerId(contractAgreement.getConsumerId()) + .providerId(contractAgreement.getProviderId()) + .status(LoggingHouseMessageStatus.PENDING) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } + + public void storeTransferProcess(TransferProcess transferProcess) { + monitor.info("Storing TransferProcess to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventType(transferProcess.getClass()) + .eventId(transferProcess.getId()) + .eventToLog(transferProcess) + .createProcess(false) + .processId(transferProcess.getContractId()) + .status(LoggingHouseMessageStatus.PENDING) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } } \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessage.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessage.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessage.java index bc1b298..0bbf02c 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessage.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.messages; +package com.truzzt.extension.logginghouse.client.events.messages; import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing; import org.eclipse.edc.spi.types.domain.message.RemoteMessage; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessageSender.java similarity index 74% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessageSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessageSender.java index 35ab185..c7040ad 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/CreateProcessMessageSender.java @@ -12,15 +12,15 @@ * */ -package com.truzzt.extension.logginghouse.client.messages; +package com.truzzt.extension.logginghouse.client.events.messages; -import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; -import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsConstants; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartParts; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartResponse; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartSenderDelegate; -import com.truzzt.extension.logginghouse.client.ids.multipart.ResponseUtil; +import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.Message; import de.fraunhofer.iais.eis.MessageProcessedNotificationMessageImpl; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessage.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessage.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessage.java index 7633f2a..823597a 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessage.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client.messages; +package com.truzzt.extension.logginghouse.client.events.messages; import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing; import org.eclipse.edc.spi.types.domain.message.RemoteMessage; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java similarity index 87% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessageSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java index 9a014bf..8371ab1 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java @@ -12,15 +12,15 @@ * */ -package com.truzzt.extension.logginghouse.client.messages; - -import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; -import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsConstants; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartParts; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartResponse; -import com.truzzt.extension.logginghouse.client.ids.multipart.MultipartSenderDelegate; -import com.truzzt.extension.logginghouse.client.ids.multipart.ResponseUtil; +package com.truzzt.extension.logginghouse.client.events.messages; + +import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.LogMessageBuilder; import de.fraunhofer.iais.eis.Message; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java index c990604..543eac3 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java @@ -14,18 +14,15 @@ package com.truzzt.extension.logginghouse.client.flyway.connection; -import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.system.configuration.Config; -import static org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry.DEFAULT_DATASOURCE; - public class DatasourceProperties { - private static final String LOGGING_HOUSE_DATASOURCE = "logginghouse"; + public static final String LOGGING_HOUSE_DATASOURCE = "logginghouse"; - private static final String DATASOURCE_SETTING_NAME = "edc.datasource.%s.name"; - private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.%s.url"; - private static final String DATASOURCE_SETTING_USER = "edc.datasource.%s.user"; - private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.%s.password"; + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.logginghouse.name"; + private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.logginghouse.url"; + private static final String DATASOURCE_SETTING_USER = "edc.datasource.logginghouse.user"; + private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.logginghouse.password"; private final String name; private final String jdbcUrl; @@ -33,25 +30,10 @@ public class DatasourceProperties { private final String password; public DatasourceProperties(Config config) { - name = getSetting(config, String.format(DATASOURCE_SETTING_NAME, LOGGING_HOUSE_DATASOURCE), - String.format(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE)); - - jdbcUrl = getSetting(config, String.format(DATASOURCE_SETTING_JDBC_URL, LOGGING_HOUSE_DATASOURCE), - String.format(DATASOURCE_SETTING_JDBC_URL, DEFAULT_DATASOURCE)); - - user = getSetting(config, String.format(DATASOURCE_SETTING_USER, LOGGING_HOUSE_DATASOURCE), - String.format(DATASOURCE_SETTING_USER, DEFAULT_DATASOURCE)); - - password = getSetting(config, String.format(DATASOURCE_SETTING_PASSWORD, LOGGING_HOUSE_DATASOURCE), - String.format(DATASOURCE_SETTING_PASSWORD, DEFAULT_DATASOURCE)); - } - - private String getSetting(Config config, String setting, String defaultSetting) { - try { - return config.getString(setting); - } catch (EdcException e) { - return config.getString(defaultSetting); - } + name = config.getString(DATASOURCE_SETTING_NAME); + jdbcUrl = config.getString(DATASOURCE_SETTING_JDBC_URL); + user = config.getString(DATASOURCE_SETTING_USER); + password = config.getString(DATASOURCE_SETTING_PASSWORD); } public String getName() { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java index e08e965..3e124ad 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java @@ -14,8 +14,8 @@ package com.truzzt.extension.logginghouse.client.multipart; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartRemoteMessageDispatcher; -import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartRemoteMessageDispatcher; +import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartSender; public class IdsMultipartClearingRemoteMessageDispatcher extends IdsMultipartRemoteMessageDispatcher { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLd.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLd.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLd.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLd.java index c542ae0..5693a3e 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLd.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLd.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLdModule.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLdModule.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLdModule.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLdModule.java index a5c4bfe..fa73508 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/JsonLdModule.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/JsonLdModule.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.databind.module.SimpleModule; import java.net.URI; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriDeserializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriDeserializer.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriDeserializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriDeserializer.java index cf684c0..e644d4f 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriDeserializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriDeserializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriSerializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriSerializer.java similarity index 94% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriSerializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriSerializer.java index a2e94dc..3858ccf 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/UriSerializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/UriSerializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarDeserializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarDeserializer.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarDeserializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarDeserializer.java index 116c11c..4279934 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarDeserializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarDeserializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarSerializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarSerializer.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarSerializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarSerializer.java index d42744f..9ad1cf6 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/jsonld/XmlGregorianCalendarSerializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/jsonld/XmlGregorianCalendarSerializer.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.jsonld; +package com.truzzt.extension.logginghouse.client.multipart.ids.jsonld; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/CalendarUtil.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/CalendarUtil.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/CalendarUtil.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/CalendarUtil.java index 7a7b8bf..b338de9 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/CalendarUtil.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/CalendarUtil.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import java.time.ZonedDateTime; import java.util.GregorianCalendar; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsConstants.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsConstants.java similarity index 89% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsConstants.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsConstants.java index 7a927e8..3950c21 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsConstants.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsConstants.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; public final class IdsConstants { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartParts.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartParts.java similarity index 95% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartParts.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartParts.java index 72bcfdf..5be1da3 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartParts.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartParts.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartRemoteMessageDispatcher.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartRemoteMessageDispatcher.java similarity index 97% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartRemoteMessageDispatcher.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartRemoteMessageDispatcher.java index 7342e56..ea74ef6 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartRemoteMessageDispatcher.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartRemoteMessageDispatcher.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartSender.java similarity index 97% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartSender.java index 3677494..a3ec49f 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/IdsMultipartSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/IdsMultipartSender.java @@ -14,7 +14,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import com.fasterxml.jackson.databind.ObjectMapper; import de.fraunhofer.iais.eis.DynamicAttributeToken; @@ -219,7 +219,7 @@ protected IdsMultipartParts extractResponseParts(ResponseBody body) throws Excep public void checkResponseType(MultipartResponse response, MultipartSenderDelegate senderDelegate) { var type = senderDelegate.getAllowedResponseTypes(); if (!type.contains(response.header().getClass())) { - throw new EdcException(String.format("Received %s but expected %s.", response.header().getClass(), type)); + throw new EdcException(format("Received %s but expected %s.", response.header().getClass(), type)); } } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartResponse.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartResponse.java similarity index 91% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartResponse.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartResponse.java index 83b2ce5..818c599 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartResponse.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartResponse.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import de.fraunhofer.iais.eis.Message; import org.jetbrains.annotations.NotNull; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartSenderDelegate.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartSenderDelegate.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartSenderDelegate.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartSenderDelegate.java index b5e8bdb..adfe1c3 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/MultipartSenderDelegate.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/MultipartSenderDelegate.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.Message; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/ResponseUtil.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/ResponseUtil.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/ResponseUtil.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/ResponseUtil.java index 493cc6e..63090ab 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ids/multipart/ResponseUtil.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ids/multipart/ResponseUtil.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client.ids.multipart; +package com.truzzt.extension.logginghouse.client.multipart.ids.multipart; import com.fasterxml.jackson.databind.ObjectMapper; import de.fraunhofer.iais.eis.Message; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java index 6fbba22..9351161 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java @@ -22,5 +22,7 @@ public interface LoggingHouseMessageStore { void save(LoggingHouseMessage message); - List listNotSent(); + List listPending(); + + void updateSent(long id); } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java index 2272f1b..e416e48 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java @@ -19,19 +19,20 @@ public class LoggingHouseMessage { private Long id; - private String eventType; + private Class eventType; private String eventId; private Object eventToLog; + private boolean createProcess; private String processId; private String consumerId; private String providerId; + private LoggingHouseMessageStatus status; private ZonedDateTime createdAt; - private ZonedDateTime sentAt; public Long getId() { return id; } - public String getEventType() { + public Class getEventType() { return eventType; } public String getEventId() { @@ -40,6 +41,9 @@ public String getEventId() { public Object getEventToLog() { return eventToLog; } + public boolean getCreateProcess() { + return createProcess; + } public String getProcessId() { return processId; } @@ -49,12 +53,12 @@ public String getConsumerId() { public String getProviderId() { return providerId; } + public LoggingHouseMessageStatus getStatus() { + return status; + } public ZonedDateTime getCreatedAt() { return createdAt; } - public ZonedDateTime getSentAt() { - return sentAt; - } public static final class Builder { private final LoggingHouseMessage event = new LoggingHouseMessage(); @@ -70,7 +74,7 @@ public LoggingHouseMessage.Builder id(Long id) { this.event.id = id; return this; } - public LoggingHouseMessage.Builder eventType(String eventType) { + public LoggingHouseMessage.Builder eventType(Class eventType) { this.event.eventType = eventType; return this; } @@ -82,6 +86,10 @@ public LoggingHouseMessage.Builder eventToLog(Object eventToLog) { this.event.eventToLog = eventToLog; return this; } + public LoggingHouseMessage.Builder createProcess(boolean createProcess) { + this.event.createProcess = createProcess; + return this; + } public LoggingHouseMessage.Builder processId(String processId) { this.event.processId = processId; return this; @@ -94,12 +102,12 @@ public LoggingHouseMessage.Builder providerId(String providerId) { this.event.providerId = providerId; return this; } - public LoggingHouseMessage.Builder createdAt(ZonedDateTime createdAt) { - this.event.createdAt = createdAt; + public LoggingHouseMessage.Builder status(LoggingHouseMessageStatus status) { + this.event.status = status; return this; } - public LoggingHouseMessage.Builder sentAt(ZonedDateTime sentAt) { - this.event.sentAt = sentAt; + public LoggingHouseMessage.Builder createdAt(ZonedDateTime createdAt) { + this.event.createdAt = createdAt; return this; } @@ -107,9 +115,9 @@ public LoggingHouseMessage build() { Objects.requireNonNull(this.event.eventType, "Message eventType must not be null"); Objects.requireNonNull(this.event.eventId, "Message eventId must not be null"); Objects.requireNonNull(this.event.eventToLog, "Message eventToLog must not be null"); + Objects.requireNonNull(this.event.createProcess, "Message createProcess must not be null"); Objects.requireNonNull(this.event.processId, "Message processId must not be null"); - Objects.requireNonNull(this.event.consumerId, "Message consumerId must not be null"); - Objects.requireNonNull(this.event.providerId, "Message providerId must not be null"); + Objects.requireNonNull(this.event.status, "Message status must not be null"); Objects.requireNonNull(this.event.createdAt, "Message createdAt must not be null"); return this.event; } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessageStatus.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessageStatus.java new file mode 100644 index 0000000..5b3fdee --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessageStatus.java @@ -0,0 +1,28 @@ +package com.truzzt.extension.logginghouse.client.spi.types; + +import org.eclipse.edc.spi.EdcException; + +public enum LoggingHouseMessageStatus { + PENDING("P"), SENT("S"); + + private final String code; + + LoggingHouseMessageStatus(String code) { + this.code = code; + } + + public String getCode() { + return code; + } + + public static LoggingHouseMessageStatus codeOf(String code) { + switch(code) { + case "P": + return PENDING; + case "S": + return SENT; + default: + throw new EdcException("Invalid status code " + code); + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java index 7ff745d..be0bfcb 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java @@ -18,17 +18,25 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessageStatus; import com.truzzt.extension.logginghouse.client.store.sql.schema.LoggingHouseEventStatements; +import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.persistence.EdcPersistenceException; import org.eclipse.edc.sql.QueryExecutor; import org.eclipse.edc.sql.store.AbstractSqlStore; import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; import org.eclipse.edc.transaction.spi.TransactionContext; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; -import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; public class SqlLoggingHouseMessageStore extends AbstractSqlStore implements LoggingHouseMessageStore { @@ -48,29 +56,115 @@ public SqlLoggingHouseMessageStore(DataSourceRegistry dataSourceRegistry, public void save(LoggingHouseMessage event) { Objects.requireNonNull(event); + Objects.requireNonNull(event.getEventType()); + Objects.requireNonNull(event.getEventId()); Objects.requireNonNull(event.getEventToLog()); + Objects.requireNonNull(event.getCreateProcess()); + Objects.requireNonNull(event.getProcessId()); + Objects.requireNonNull(event.getStatus()); Objects.requireNonNull(event.getCreatedAt()); transactionContext.execute(() -> { try (var connection = getConnection()) { queryExecutor.execute(connection, statements.getInsertTemplate(), + event.getEventType().getSimpleName(), + event.getEventId(), toJson(event.getEventToLog()), + event.getCreateProcess(), + event.getProcessId(), + event.getConsumerId(), + event.getProviderId(), + event.getStatus().getCode(), mapFromZonedDateTime(event.getCreatedAt()) ); } catch (Exception e) { - throw new EdcPersistenceException(e.getMessage(), e); + throw new EdcPersistenceException("Error executing INSERT statement", e); } }); } @Override - public List listNotSent() { - return new ArrayList(); + public List listPending() { + return transactionContext.execute(() -> { + try { + return queryExecutor.query(getConnection(), true, this::mapResultSet, + statements.getSelectPendingStatement(), + LoggingHouseMessageStatus.PENDING.getCode()) + .collect(Collectors.toList()); + } catch (SQLException e) { + throw new EdcPersistenceException("Error executing SELECT statement", e); + } + }); + } + + @Override + public void updateSent(long id) { + transactionContext.execute(() -> { + try { + queryExecutor.execute(getConnection(), + statements.getUpdateSentTemplate(), + LoggingHouseMessageStatus.SENT.getCode(), + mapFromZonedDateTime(ZonedDateTime.now()), + id); + } catch (SQLException e) { + throw new EdcPersistenceException("Error executing UPDATE statement", e); + } + }); } private Long mapFromZonedDateTime(ZonedDateTime zonedDateTime) { return zonedDateTime != null ? zonedDateTime.toEpochSecond() : null; } + private ZonedDateTime mapToZonedDateTime(ResultSet resultSet, String column) throws Exception { + return resultSet.getString(column) != null ? + Instant.ofEpochSecond(resultSet.getLong(column)).atZone(ZoneId.of("Z")) : + null; + } + + private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception { + + Class eventType = toClass(resultSet.getString(statements.getEventTypeColumn())); + + Object eventToLog; + try { + eventToLog = fromJson(resultSet.getString(statements.getEventToLogColumn()), eventType); + } catch (EdcPersistenceException e) { + throw new EdcPersistenceException("Error eventToLog JSON column", e); + } + + LoggingHouseMessageStatus status; + try { + status = LoggingHouseMessageStatus.codeOf(resultSet.getString(statements.getStatusColumn())); + } catch (EdcPersistenceException e) { + throw new EdcPersistenceException("Error eventToLog JSON column", e); + } + + return LoggingHouseMessage.Builder.newInstance() + .id(resultSet.getLong(statements.getIdColumn())) + .eventType(eventType) + .eventId(resultSet.getString(statements.getIdColumn())) + .eventToLog(eventToLog) + .createProcess(resultSet.getBoolean(statements.getCreateProcessColumn())) + .processId(resultSet.getString(statements.getIdColumn())) + .consumerId(resultSet.getString(statements.getIdColumn())) + .providerId(resultSet.getString(statements.getIdColumn())) + .status(status) + .createdAt(mapToZonedDateTime(resultSet, statements.getCreatedAtColumn())) + .build(); + } + + private Class toClass(String eventType) { + + switch (eventType) { + case "ContractAgreement": + return ContractAgreement.class; + case "TransferProcess": + return TransferProcess.class; + default: + throw new EdcException("Invalid eventType: " + eventType); + } + } + } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java index 6017294..56a3010 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java @@ -21,11 +21,34 @@ public class BaseSqlDialectStatements implements LoggingHouseEventStatements { @Override public String getInsertTemplate() { - return format("INSERT INTO %s (%s, %s) VALUES (?%s, ?)", + return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?, ?)", getLoggingHouseMessageTable(), + getEventTypeColumn(), + getEventIdColumn(), getEventToLogColumn(), + getCreateProcessColumn(), + getProcessIdColumn(), + getConsumerIdColumn(), + getProviderIdColumn(), + getStatusColumn(), getCreatedAtColumn(), getFormatAsJsonOperator() ); } + + @Override + public String getSelectPendingStatement() { + return format("SELECT * FROM %s WHERE %s = ?", + getLoggingHouseMessageTable(), + getStatusColumn()); + } + + @Override + public String getUpdateSentTemplate() { + return format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?", + getLoggingHouseMessageTable(), + getStatusColumn(), + getSentAtColumn(), + getIdColumn()); + } } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java index 5854cdb..5dd8eea 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java @@ -22,17 +22,57 @@ public interface LoggingHouseEventStatements { default String getLoggingHouseMessageTable() { return "edc_logging_house_message"; } + + default String getIdColumn() { + return "logging_house_message_id"; + } + + default String getEventTypeColumn() { + return "event_type"; + } + + default String getEventIdColumn() { + return "event_id"; + } default String getEventToLogColumn() { return "event_to_log"; } + default String getCreateProcessColumn() { + return "create_process"; + } + + default String getProcessIdColumn() { + return "process_id"; + } + + default String getConsumerIdColumn() { + return "consumer_id"; + } + + default String getProviderIdColumn() { + return "provider_id"; + } + + default String getStatusColumn() { + return "status"; + } + default String getCreatedAtColumn() { return "created_at"; } + default String getSentAtColumn() { + return "sent_at"; + } + String getInsertTemplate(); + String getSelectPendingStatement(); + + String getUpdateSentTemplate(); + default String getFormatAsJsonOperator() { return PostgresDialect.getJsonCastOperator(); } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersManager.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/LoggingHouseWorkersManager.java similarity index 83% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersManager.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/LoggingHouseWorkersManager.java index 4b8b804..d7265e8 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersManager.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/LoggingHouseWorkersManager.java @@ -34,8 +34,9 @@ import static java.lang.String.format; -public class WorkersManager { +public class LoggingHouseWorkersManager { + private final WorkersExecutor executor; private final Monitor monitor; private final int maxWorkers; private final LoggingHouseMessageStore store; @@ -43,12 +44,14 @@ public class WorkersManager { private final URI connectorBaseUrl; private final URL loggingHouseUrl; - public WorkersManager(Monitor monitor, - int maxWorkers, - LoggingHouseMessageStore store, - RemoteMessageDispatcherRegistry dispatcherRegistry, - Hostname hostname, - URL loggingHouseUrl) { + public LoggingHouseWorkersManager(WorkersExecutor executor, + Monitor monitor, + int maxWorkers, + LoggingHouseMessageStore store, + RemoteMessageDispatcherRegistry dispatcherRegistry, + Hostname hostname, + URL loggingHouseUrl) { + this.executor = executor; this.monitor = monitor; this.maxWorkers = maxWorkers; this.store = store; @@ -62,8 +65,15 @@ public WorkersManager(Monitor monitor, } } - public void run() { - List messages = store.listNotSent(); + public void execute() { + executor.run(() -> { + processPending(); + }); + + } + + private void processPending() { + List messages = store.listPending(); if (messages.isEmpty()) { monitor.warning("No Messages to send, aborting execution"); return; @@ -118,12 +128,12 @@ private MessageWorker nextAvailableWorker(ArrayBlockingQueue avai private ArrayBlockingQueue createWorkers(int numWorkers) { return new ArrayBlockingQueue<>(numWorkers, true, IntStream.range(0, numWorkers) - .mapToObj(i -> new MessageWorker(monitor, dispatcherRegistry, connectorBaseUrl, loggingHouseUrl)) + .mapToObj(i -> new MessageWorker(monitor, dispatcherRegistry, connectorBaseUrl, loggingHouseUrl, store)) .collect(Collectors.toList())); } private String log(String input) { - return "WorkersManager: " + input; + return "LoggingHouseWorkersManager: " + input; } private URI getConnectorBaseUrl(Hostname hostname) throws URISyntaxException { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java index d74bd82..424a2d1 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java @@ -14,12 +14,14 @@ package com.truzzt.extension.logginghouse.client.worker; -import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessage; -import com.truzzt.extension.logginghouse.client.messages.LogMessage; +import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessage; +import com.truzzt.extension.logginghouse.client.events.messages.LogMessage; +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.response.StatusResult; import java.net.MalformedURLException; import java.net.URI; @@ -34,13 +36,16 @@ public class MessageWorker { private final RemoteMessageDispatcherRegistry dispatcherRegistry; private final URI connectorBaseUrl; private final URL loggingHouseUrl; + private final LoggingHouseMessageStore store; private final String workerId; - public MessageWorker(Monitor monitor, RemoteMessageDispatcherRegistry dispatcherRegistry, URI connectorBaseUrl, URL loggingHouseUrl) { + public MessageWorker(Monitor monitor, RemoteMessageDispatcherRegistry dispatcherRegistry, URI connectorBaseUrl, URL loggingHouseUrl, + LoggingHouseMessageStore store) { this.monitor = monitor; this.dispatcherRegistry = dispatcherRegistry; this.connectorBaseUrl = connectorBaseUrl; this.loggingHouseUrl = loggingHouseUrl; + this.store = store; workerId = "Worker-" + UUID.randomUUID(); } @@ -64,30 +69,37 @@ public CompletableFuture run(LoggingHouseMessage message) { public void process(LoggingHouseMessage message) { try { - var pid = message.getProcessId(); + var pid = message.getProcessId(); - // Create Process + // Create Process + if (message.getCreateProcess()) { var extendedProcessUrl = new URL(loggingHouseUrl + "/process/" + pid); try { - createProcess(message, extendedProcessUrl); - } catch (Exception e) { - monitor.warning("Could not create process in LoggingHouse: " + e.getMessage()); - } + createProcess(message, extendedProcessUrl).join(); - // Log Contract Agreement - var extendedLogUrl = new URL(loggingHouseUrl + "/messages/log/" + pid); - try { - logMessage(message, extendedLogUrl); } catch (Exception e) { - monitor.warning("Could not log message to LoggingHouse: " + e.getMessage()); + throw new EdcException("Could not create process in LoggingHouse", e); } + } + + // Log Message + var extendedLogUrl = new URL(loggingHouseUrl + "/messages/log/" + pid); + try { + logMessage(message, extendedLogUrl).join(); + + } catch (Exception e) { + throw new EdcException("Could not log message to LoggingHouse", e); + } + + // Update Status + store.updateSent(message.getId()); } catch (MalformedURLException e) { throw new EdcException("Could not create extended clearinghouse url."); } } - public void createProcess(LoggingHouseMessage message, URL loggingHouseUrl) { + public CompletableFuture> createProcess(LoggingHouseMessage message, URL loggingHouseUrl) { List processOwners = new ArrayList<>(); processOwners.add(message.getConsumerId()); @@ -96,15 +108,15 @@ public void createProcess(LoggingHouseMessage message, URL loggingHouseUrl) { monitor.info("Creating process in LoggingHouse with id: " + message.getProcessId()); var logMessage = new CreateProcessMessage(loggingHouseUrl, connectorBaseUrl, message.getProcessId(), processOwners); - dispatcherRegistry.dispatch(Object.class, logMessage); + return dispatcherRegistry.dispatch(Object.class, logMessage); } - public void logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) { + public CompletableFuture> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) { monitor.info("Logging message to LoggingHouse with type " + message.getEventType() + " and id " + message.getEventId()); var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, message.getEventToLog()); - dispatcherRegistry.dispatch(Object.class, logMessage); + return dispatcherRegistry.dispatch(Object.class, logMessage); } } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersExecutor.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersExecutor.java new file mode 100644 index 0000000..e917920 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/WorkersExecutor.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.worker; + +import org.eclipse.edc.spi.monitor.Monitor; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class WorkersExecutor { + private final Duration schedule; + private final Duration withInitialDelay; + private final Monitor monitor; + + public WorkersExecutor(Duration schedule, Duration initialDelay, Monitor monitor) { + this.schedule = schedule; + withInitialDelay = initialDelay; + this.monitor = monitor; + } + + public void run(Runnable task) { + var ses = Executors.newSingleThreadScheduledExecutor(); + ses.scheduleAtFixedRate(catchExceptions(task), withInitialDelay.toMillis(), schedule.toMillis(), TimeUnit.MILLISECONDS); + } + + private Runnable catchExceptions(Runnable original) { + return () -> { + try { + original.run(); + } catch (Throwable thr) { + monitor.severe("Unexpected error during plan execution", thr); + } + }; + } +} diff --git a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql index 00c67b6..7f4ec09 100644 --- a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql +++ b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql @@ -17,9 +17,18 @@ CREATE TABLE IF NOT EXISTS edc_logging_house_message ( logging_house_message_id BIGSERIAL NOT NULL, + event_type VARCHAR NOT NULL, + event_id VARCHAR NOT NULL, event_to_log JSON NOT NULL, - created_at TIMESTAMP NOT NULL, - sent_at TIMESTAMP NOT NULL, + create_process BOOLEAN NOT NULL, + process_id VARCHAR NOT NULL, + consumer_id VARCHAR NOT NULL, + provider_id VARCHAR NOT NULL, + status VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + sent_at BIGINT, PRIMARY KEY (logging_house_message_id) ); COMMENT ON COLUMN edc_logging_house_message.event_to_log IS 'Event to log serialized as JSON'; + +CREATE INDEX IF NOT EXISTS idx_edc_logging_house_message_status ON edc_logging_house_message (status); diff --git a/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java b/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java index 90a5582..e77a091 100644 --- a/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java +++ b/logging-house-client/src/test/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtensionTest.java @@ -36,6 +36,6 @@ void setUp() { @Test void name_shouldReturnCorrectName() { - assertEquals(LoggingHouseClientExtension.LOGGINGHOUSE_CLIENT_EXTENSION, extension.name()); + assertEquals(LoggingHouseClientExtension.NAME, extension.name()); } }