Skip to content

Commit

Permalink
Merge pull request #24 from truzzt/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
dhommen authored Jul 4, 2024
2 parents 94350f5 + dd3dd10 commit 8cf3307
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Validate Gradle wrapper
uses: gradle/actions/wrapper-validation@v3
- name: Publish
uses: gradle/gradle-build-action@v3
uses: gradle/actions/setup-gradle@v3
with:
arguments: -Pversion=${{ env.IMAGE_TAG }} publish
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,8 @@ public void start() {
monitor.info("Skipping start of Logginghouse client extension (disabled).");
} else {
monitor.info("Starting Logginghouse client extension.");
workersManager.execute();
}

workersManager.execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.truzzt.extension.logginghouse.client.events.messages;

public record LogMessageReceipt(String data) {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

package com.truzzt.extension.logginghouse.client.events.messages;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil;
import de.fraunhofer.iais.eis.DynamicAttributeToken;
import de.fraunhofer.iais.eis.LogMessageBuilder;
import de.fraunhofer.iais.eis.Message;
Expand All @@ -32,9 +32,10 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.json.JSONObject;

import java.io.IOException;
import java.util.List;

public class LogMessageSender implements MultipartSenderDelegate<LogMessage, String> {
public class LogMessageSender implements MultipartSenderDelegate<LogMessage, LogMessageReceipt> {

Monitor monitor;
String connectorId;
Expand Down Expand Up @@ -70,8 +71,8 @@ public String buildMessagePayload(LogMessage logMessage) {
}

@Override
public MultipartResponse<String> getResponseContent(IdsMultipartParts parts) throws Exception {
return ResponseUtil.parseMultipartStringResponse(parts, JsonLd.getObjectMapper());
public MultipartResponse<LogMessageReceipt> getResponseContent(IdsMultipartParts parts) throws Exception {
return parseLogMessageReceiptResponse(parts, JsonLd.getObjectMapper());
}

@Override
Expand Down Expand Up @@ -132,4 +133,15 @@ private String buildTransferProcessPayload(TransferProcess transferProcess) {

return jo.toString();
}

public static MultipartResponse<LogMessageReceipt> parseLogMessageReceiptResponse(IdsMultipartParts parts, ObjectMapper objectMapper) throws IOException {
var header = objectMapper.readValue(parts.getHeader(), Message.class);

LogMessageReceipt payload = null;
if (parts.getPayload() != null) {
payload = objectMapper.readValue(parts.getPayload(), LogMessageReceipt.class);
}

return new MultipartResponse<>(header, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface LoggingHouseMessageStore {

List<LoggingHouseMessage> listPending();

void updateSent(long id);
void updateSent(long id, String receipt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void save(LoggingHouseMessage event) {
Objects.requireNonNull(event.getEventToLog());
Objects.requireNonNull(event.getCreateProcess());
Objects.requireNonNull(event.getProcessId());
Objects.requireNonNull(event.getStatus());
Objects.requireNonNull(event.getCreatedAt());

transactionContext.execute(() -> {
Expand All @@ -74,7 +73,6 @@ public void save(LoggingHouseMessage event) {
event.getProcessId(),
event.getConsumerId(),
event.getProviderId(),
event.getStatus().getCode(),
mapFromZonedDateTime(event.getCreatedAt())
);

Expand All @@ -89,8 +87,7 @@ public List<LoggingHouseMessage> listPending() {
return transactionContext.execute(() -> {
try {
return queryExecutor.query(getConnection(), true, this::mapResultSet,
statements.getSelectPendingStatement(),
LoggingHouseMessageStatus.PENDING.getCode())
statements.getSelectPendingStatement())
.collect(Collectors.toList());
} catch (SQLException e) {
throw new EdcPersistenceException("Error executing SELECT statement", e);
Expand All @@ -99,12 +96,12 @@ public List<LoggingHouseMessage> listPending() {
}

@Override
public void updateSent(long id) {
public void updateSent(long id, String receipt) {
transactionContext.execute(() -> {
try {
queryExecutor.execute(getConnection(),
statements.getUpdateSentTemplate(),
LoggingHouseMessageStatus.SENT.getCode(),
receipt,
mapFromZonedDateTime(ZonedDateTime.now()),
id);
} catch (SQLException e) {
Expand All @@ -124,7 +121,6 @@ private ZonedDateTime mapToZonedDateTime(ResultSet resultSet, String column) thr
}

private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception {

Class eventType = toClass(resultSet.getString(statements.getEventTypeColumn()));

Object eventToLog;
Expand All @@ -135,10 +131,10 @@ private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception {
}

LoggingHouseMessageStatus status;
try {
status = LoggingHouseMessageStatus.codeOf(resultSet.getString(statements.getStatusColumn()));
} catch (EdcPersistenceException e) {
throw new EdcPersistenceException("Error eventToLog JSON column", e);
if (resultSet.getString(statements.getReceiptColumn()) == null) {
status = LoggingHouseMessageStatus.PENDING;
} else {
status = LoggingHouseMessageStatus.SENT;
}

return LoggingHouseMessage.Builder.newInstance()
Expand All @@ -156,15 +152,11 @@ private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception {
}

private Class toClass(String eventType) {

switch (eventType) {
case "ContractAgreement":
return ContractAgreement.class;
case "TransferProcess":
return TransferProcess.class;
default:
throw new EdcException("Invalid eventType: " + eventType);
}
return switch (eventType) {
case "ContractAgreement" -> ContractAgreement.class;
case "TransferProcess" -> TransferProcess.class;
default -> throw new EdcException("Invalid eventType: " + eventType);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class BaseSqlDialectStatements implements LoggingHouseEventStatements {

@Override
public String getInsertTemplate() {
return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?, ?)",
return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?)",
getLoggingHouseMessageTable(),
getEventTypeColumn(),
getEventIdColumn(),
Expand All @@ -30,24 +30,23 @@ public String getInsertTemplate() {
getProcessIdColumn(),
getConsumerIdColumn(),
getProviderIdColumn(),
getStatusColumn(),
getCreatedAtColumn(),
getFormatAsJsonOperator()
);
}

@Override
public String getSelectPendingStatement() {
return format("SELECT * FROM %s WHERE %s = ?",
return format("SELECT * FROM %s WHERE %s IS NULL",
getLoggingHouseMessageTable(),
getStatusColumn());
getReceiptColumn());
}

@Override
public String getUpdateSentTemplate() {
return format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
getLoggingHouseMessageTable(),
getStatusColumn(),
getReceiptColumn(),
getSentAtColumn(),
getIdColumn());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ default String getSentAtColumn() {
return "sent_at";
}

default String getReceiptColumn() {
return "receipt";
}

String getInsertTemplate();

String getSelectPendingStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessage;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessage;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessageReceipt;
import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -76,24 +77,24 @@ public void process(LoggingHouseMessage message) {
var extendedProcessUrl = new URL(loggingHouseUrl + "/process/" + pid);
try {
createProcess(message, extendedProcessUrl).join();

} catch (Exception e) {
throw new EdcException("Could not create process in LoggingHouse", e);
// TODO: Not fail when process already exists
monitor.warning("CreateProcess returned error (ignore it when the process already exists): " + e.getMessage());
//throw new EdcException("Could not create process in LoggingHouse", e);
}
}

// Log Message
var extendedLogUrl = new URL(loggingHouseUrl + "/messages/log/" + pid);
try {
logMessage(message, extendedLogUrl).join();

var response = logMessage(message, extendedLogUrl).join();
response.onSuccess(msg -> {
monitor.info("Received receipt successfully from LoggingHouse for message with id " + message.getEventId());
store.updateSent(message.getId(), msg.data());
});
} catch (Exception e) {
throw new EdcException("Could not log message to LoggingHouse", e);
}

// Update Status
store.updateSent(message.getId());

} catch (MalformedURLException e) {
throw new EdcException("Could not create extended clearinghouse url.");
}
Expand All @@ -111,12 +112,12 @@ public CompletableFuture<StatusResult<Object>> createProcess(LoggingHouseMessage
return dispatcherRegistry.dispatch(Object.class, logMessage);
}

public CompletableFuture<StatusResult<Object>> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) {
public CompletableFuture<StatusResult<LogMessageReceipt>> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) {

monitor.info("Logging message to LoggingHouse with type " + message.getEventType() + " and id " + message.getEventId());
var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, message.getEventToLog());

return dispatcherRegistry.dispatch(Object.class, logMessage);
return dispatcherRegistry.dispatch(LogMessageReceipt.class, logMessage);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- previous version of the table:
--
-- CREATE TABLE IF NOT EXISTS edc_logging_house_message
-- (
-- logging_house_message_id BIGSERIAL NOT NULL,
-- event_type VARCHAR NOT NULL,
-- event_id VARCHAR NOT NULL,
-- event_to_log JSON NOT NULL,
-- create_process BOOLEAN NOT NULL,
-- process_id VARCHAR NOT NULL,
-- consumer_id VARCHAR NOT NULL,
-- provider_id VARCHAR NOT NULL,
-- status VARCHAR NOT NULL,
-- created_at BIGINT NOT NULL,
-- sent_at BIGINT,
-- PRIMARY KEY (logging_house_message_id)
-- );

-- remove the status column and add the receipt column, the status is inferred from the receipt (if not null, the status is 'SENT')
ALTER TABLE edc_logging_house_message
DROP COLUMN IF EXISTS status,
ADD COLUMN IF NOT EXISTS receipt VARCHAR;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE edc_logging_house_message
ALTER COLUMN consumer_id DROP NOT NULL,
ALTER COLUMN provider_id DROP NOT NULL;

0 comments on commit 8cf3307

Please sign in to comment.