diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 824e0de..2aeaf72 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -25,7 +25,7 @@ jobs: - name: Validate Gradle wrapper uses: gradle/actions/wrapper-validation@v3 - name: Publish - uses: gradle/gradle-build-action@v3 + uses: gradle/actions/setup-gradle@v3 with: arguments: -Pversion=${{ env.IMAGE_TAG }} publish env: diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index b5c8e8b..2e815bd 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -275,9 +275,8 @@ public void start() { monitor.info("Skipping start of Logginghouse client extension (disabled)."); } else { monitor.info("Starting Logginghouse client extension."); + workersManager.execute(); } - - workersManager.execute(); } @Override diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageReceipt.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageReceipt.java new file mode 100644 index 0000000..4e43eda --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageReceipt.java @@ -0,0 +1,3 @@ +package com.truzzt.extension.logginghouse.client.events.messages; + +public record LogMessageReceipt(String data) {} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java index 8371ab1..ab9d575 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/events/messages/LogMessageSender.java @@ -14,13 +14,13 @@ package com.truzzt.extension.logginghouse.client.events.messages; +import com.fasterxml.jackson.databind.ObjectMapper; import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd; import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil; import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants; import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts; import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse; import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate; -import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil; import de.fraunhofer.iais.eis.DynamicAttributeToken; import de.fraunhofer.iais.eis.LogMessageBuilder; import de.fraunhofer.iais.eis.Message; @@ -32,9 +32,10 @@ import org.eclipse.edc.spi.monitor.Monitor; import org.json.JSONObject; +import java.io.IOException; import java.util.List; -public class LogMessageSender implements MultipartSenderDelegate { +public class LogMessageSender implements MultipartSenderDelegate { Monitor monitor; String connectorId; @@ -70,8 +71,8 @@ public String buildMessagePayload(LogMessage logMessage) { } @Override - public MultipartResponse getResponseContent(IdsMultipartParts parts) throws Exception { - return ResponseUtil.parseMultipartStringResponse(parts, JsonLd.getObjectMapper()); + public MultipartResponse getResponseContent(IdsMultipartParts parts) throws Exception { + return parseLogMessageReceiptResponse(parts, JsonLd.getObjectMapper()); } @Override @@ -132,4 +133,15 @@ private String buildTransferProcessPayload(TransferProcess transferProcess) { return jo.toString(); } + + public static MultipartResponse parseLogMessageReceiptResponse(IdsMultipartParts parts, ObjectMapper objectMapper) throws IOException { + var header = objectMapper.readValue(parts.getHeader(), Message.class); + + LogMessageReceipt payload = null; + if (parts.getPayload() != null) { + payload = objectMapper.readValue(parts.getPayload(), LogMessageReceipt.class); + } + + return new MultipartResponse<>(header, payload); + } } \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java index 9351161..cca8343 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java @@ -24,5 +24,5 @@ public interface LoggingHouseMessageStore { List listPending(); - void updateSent(long id); + void updateSent(long id, String receipt); } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java index dc8886a..d61dd52 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java @@ -61,7 +61,6 @@ public void save(LoggingHouseMessage event) { Objects.requireNonNull(event.getEventToLog()); Objects.requireNonNull(event.getCreateProcess()); Objects.requireNonNull(event.getProcessId()); - Objects.requireNonNull(event.getStatus()); Objects.requireNonNull(event.getCreatedAt()); transactionContext.execute(() -> { @@ -74,7 +73,6 @@ public void save(LoggingHouseMessage event) { event.getProcessId(), event.getConsumerId(), event.getProviderId(), - event.getStatus().getCode(), mapFromZonedDateTime(event.getCreatedAt()) ); @@ -89,8 +87,7 @@ public List listPending() { return transactionContext.execute(() -> { try { return queryExecutor.query(getConnection(), true, this::mapResultSet, - statements.getSelectPendingStatement(), - LoggingHouseMessageStatus.PENDING.getCode()) + statements.getSelectPendingStatement()) .collect(Collectors.toList()); } catch (SQLException e) { throw new EdcPersistenceException("Error executing SELECT statement", e); @@ -99,12 +96,12 @@ public List listPending() { } @Override - public void updateSent(long id) { + public void updateSent(long id, String receipt) { transactionContext.execute(() -> { try { queryExecutor.execute(getConnection(), statements.getUpdateSentTemplate(), - LoggingHouseMessageStatus.SENT.getCode(), + receipt, mapFromZonedDateTime(ZonedDateTime.now()), id); } catch (SQLException e) { @@ -124,7 +121,6 @@ private ZonedDateTime mapToZonedDateTime(ResultSet resultSet, String column) thr } private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception { - Class eventType = toClass(resultSet.getString(statements.getEventTypeColumn())); Object eventToLog; @@ -135,10 +131,10 @@ private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception { } LoggingHouseMessageStatus status; - try { - status = LoggingHouseMessageStatus.codeOf(resultSet.getString(statements.getStatusColumn())); - } catch (EdcPersistenceException e) { - throw new EdcPersistenceException("Error eventToLog JSON column", e); + if (resultSet.getString(statements.getReceiptColumn()) == null) { + status = LoggingHouseMessageStatus.PENDING; + } else { + status = LoggingHouseMessageStatus.SENT; } return LoggingHouseMessage.Builder.newInstance() @@ -156,15 +152,11 @@ private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception { } private Class toClass(String eventType) { - - switch (eventType) { - case "ContractAgreement": - return ContractAgreement.class; - case "TransferProcess": - return TransferProcess.class; - default: - throw new EdcException("Invalid eventType: " + eventType); - } + return switch (eventType) { + case "ContractAgreement" -> ContractAgreement.class; + case "TransferProcess" -> TransferProcess.class; + default -> throw new EdcException("Invalid eventType: " + eventType); + }; } } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java index 56a3010..e497a32 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java @@ -21,7 +21,7 @@ public class BaseSqlDialectStatements implements LoggingHouseEventStatements { @Override public String getInsertTemplate() { - return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?, ?)", + return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?)", getLoggingHouseMessageTable(), getEventTypeColumn(), getEventIdColumn(), @@ -30,7 +30,6 @@ public String getInsertTemplate() { getProcessIdColumn(), getConsumerIdColumn(), getProviderIdColumn(), - getStatusColumn(), getCreatedAtColumn(), getFormatAsJsonOperator() ); @@ -38,16 +37,16 @@ public String getInsertTemplate() { @Override public String getSelectPendingStatement() { - return format("SELECT * FROM %s WHERE %s = ?", + return format("SELECT * FROM %s WHERE %s IS NULL", getLoggingHouseMessageTable(), - getStatusColumn()); + getReceiptColumn()); } @Override public String getUpdateSentTemplate() { return format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?", getLoggingHouseMessageTable(), - getStatusColumn(), + getReceiptColumn(), getSentAtColumn(), getIdColumn()); } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java index 5dd8eea..056e043 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java @@ -67,6 +67,10 @@ default String getSentAtColumn() { return "sent_at"; } + default String getReceiptColumn() { + return "receipt"; + } + String getInsertTemplate(); String getSelectPendingStatement(); diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java index 659381e..edd102e 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/worker/MessageWorker.java @@ -16,6 +16,7 @@ import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessage; import com.truzzt.extension.logginghouse.client.events.messages.LogMessage; +import com.truzzt.extension.logginghouse.client.events.messages.LogMessageReceipt; import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; import org.eclipse.edc.spi.EdcException; @@ -76,24 +77,24 @@ public void process(LoggingHouseMessage message) { var extendedProcessUrl = new URL(loggingHouseUrl + "/process/" + pid); try { createProcess(message, extendedProcessUrl).join(); - } catch (Exception e) { - throw new EdcException("Could not create process in LoggingHouse", e); + // TODO: Not fail when process already exists + monitor.warning("CreateProcess returned error (ignore it when the process already exists): " + e.getMessage()); + //throw new EdcException("Could not create process in LoggingHouse", e); } } // Log Message var extendedLogUrl = new URL(loggingHouseUrl + "/messages/log/" + pid); try { - logMessage(message, extendedLogUrl).join(); - + var response = logMessage(message, extendedLogUrl).join(); + response.onSuccess(msg -> { + monitor.info("Received receipt successfully from LoggingHouse for message with id " + message.getEventId()); + store.updateSent(message.getId(), msg.data()); + }); } catch (Exception e) { throw new EdcException("Could not log message to LoggingHouse", e); } - - // Update Status - store.updateSent(message.getId()); - } catch (MalformedURLException e) { throw new EdcException("Could not create extended clearinghouse url."); } @@ -111,12 +112,12 @@ public CompletableFuture> createProcess(LoggingHouseMessage return dispatcherRegistry.dispatch(Object.class, logMessage); } - public CompletableFuture> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) { + public CompletableFuture> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) { monitor.info("Logging message to LoggingHouse with type " + message.getEventType() + " and id " + message.getEventId()); var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, message.getEventToLog()); - return dispatcherRegistry.dispatch(Object.class, logMessage); + return dispatcherRegistry.dispatch(LogMessageReceipt.class, logMessage); } } diff --git a/logging-house-client/src/main/resources/migration/logginghouse/V0_0_2__Update_Tables.sql b/logging-house-client/src/main/resources/migration/logginghouse/V0_0_2__Update_Tables.sql new file mode 100644 index 0000000..cdd7b81 --- /dev/null +++ b/logging-house-client/src/main/resources/migration/logginghouse/V0_0_2__Update_Tables.sql @@ -0,0 +1,22 @@ +-- previous version of the table: +-- +-- CREATE TABLE IF NOT EXISTS edc_logging_house_message +-- ( +-- logging_house_message_id BIGSERIAL NOT NULL, +-- event_type VARCHAR NOT NULL, +-- event_id VARCHAR NOT NULL, +-- event_to_log JSON NOT NULL, +-- create_process BOOLEAN NOT NULL, +-- process_id VARCHAR NOT NULL, +-- consumer_id VARCHAR NOT NULL, +-- provider_id VARCHAR NOT NULL, +-- status VARCHAR NOT NULL, +-- created_at BIGINT NOT NULL, +-- sent_at BIGINT, +-- PRIMARY KEY (logging_house_message_id) +-- ); + +-- remove the status column and add the receipt column, the status is inferred from the receipt (if not null, the status is 'SENT') +ALTER TABLE edc_logging_house_message + DROP COLUMN IF EXISTS status, + ADD COLUMN IF NOT EXISTS receipt VARCHAR; \ No newline at end of file diff --git a/logging-house-client/src/main/resources/migration/logginghouse/V0_0_3__Remove_Constraint.sql b/logging-house-client/src/main/resources/migration/logginghouse/V0_0_3__Remove_Constraint.sql new file mode 100644 index 0000000..080eabe --- /dev/null +++ b/logging-house-client/src/main/resources/migration/logginghouse/V0_0_3__Remove_Constraint.sql @@ -0,0 +1,3 @@ +ALTER TABLE edc_logging_house_message + ALTER COLUMN consumer_id DROP NOT NULL, + ALTER COLUMN provider_id DROP NOT NULL; \ No newline at end of file