Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #24

Merged
merged 7 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Validate Gradle wrapper
uses: gradle/actions/wrapper-validation@v3
- name: Publish
uses: gradle/gradle-build-action@v3
uses: gradle/actions/setup-gradle@v3
with:
arguments: -Pversion=${{ env.IMAGE_TAG }} publish
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,8 @@ public void start() {
monitor.info("Skipping start of Logginghouse client extension (disabled).");
} else {
monitor.info("Starting Logginghouse client extension.");
workersManager.execute();
}

workersManager.execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.truzzt.extension.logginghouse.client.events.messages;

public record LogMessageReceipt(String data) {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

package com.truzzt.extension.logginghouse.client.events.messages;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsMultipartParts;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartResponse;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.MultipartSenderDelegate;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.ResponseUtil;
import de.fraunhofer.iais.eis.DynamicAttributeToken;
import de.fraunhofer.iais.eis.LogMessageBuilder;
import de.fraunhofer.iais.eis.Message;
Expand All @@ -32,9 +32,10 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.json.JSONObject;

import java.io.IOException;
import java.util.List;

public class LogMessageSender implements MultipartSenderDelegate<LogMessage, String> {
public class LogMessageSender implements MultipartSenderDelegate<LogMessage, LogMessageReceipt> {

Monitor monitor;
String connectorId;
Expand Down Expand Up @@ -70,8 +71,8 @@ public String buildMessagePayload(LogMessage logMessage) {
}

@Override
public MultipartResponse<String> getResponseContent(IdsMultipartParts parts) throws Exception {
return ResponseUtil.parseMultipartStringResponse(parts, JsonLd.getObjectMapper());
public MultipartResponse<LogMessageReceipt> getResponseContent(IdsMultipartParts parts) throws Exception {
return parseLogMessageReceiptResponse(parts, JsonLd.getObjectMapper());
}

@Override
Expand Down Expand Up @@ -132,4 +133,15 @@ private String buildTransferProcessPayload(TransferProcess transferProcess) {

return jo.toString();
}

public static MultipartResponse<LogMessageReceipt> parseLogMessageReceiptResponse(IdsMultipartParts parts, ObjectMapper objectMapper) throws IOException {
var header = objectMapper.readValue(parts.getHeader(), Message.class);

LogMessageReceipt payload = null;
if (parts.getPayload() != null) {
payload = objectMapper.readValue(parts.getPayload(), LogMessageReceipt.class);
}

return new MultipartResponse<>(header, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface LoggingHouseMessageStore {

List<LoggingHouseMessage> listPending();

void updateSent(long id);
void updateSent(long id, String receipt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void save(LoggingHouseMessage event) {
Objects.requireNonNull(event.getEventToLog());
Objects.requireNonNull(event.getCreateProcess());
Objects.requireNonNull(event.getProcessId());
Objects.requireNonNull(event.getStatus());
Objects.requireNonNull(event.getCreatedAt());

transactionContext.execute(() -> {
Expand All @@ -74,7 +73,6 @@ public void save(LoggingHouseMessage event) {
event.getProcessId(),
event.getConsumerId(),
event.getProviderId(),
event.getStatus().getCode(),
mapFromZonedDateTime(event.getCreatedAt())
);

Expand All @@ -89,8 +87,7 @@ public List<LoggingHouseMessage> listPending() {
return transactionContext.execute(() -> {
try {
return queryExecutor.query(getConnection(), true, this::mapResultSet,
statements.getSelectPendingStatement(),
LoggingHouseMessageStatus.PENDING.getCode())
statements.getSelectPendingStatement())
.collect(Collectors.toList());
} catch (SQLException e) {
throw new EdcPersistenceException("Error executing SELECT statement", e);
Expand All @@ -99,12 +96,12 @@ public List<LoggingHouseMessage> listPending() {
}

@Override
public void updateSent(long id) {
public void updateSent(long id, String receipt) {
transactionContext.execute(() -> {
try {
queryExecutor.execute(getConnection(),
statements.getUpdateSentTemplate(),
LoggingHouseMessageStatus.SENT.getCode(),
receipt,
mapFromZonedDateTime(ZonedDateTime.now()),
id);
} catch (SQLException e) {
Expand All @@ -124,7 +121,6 @@ private ZonedDateTime mapToZonedDateTime(ResultSet resultSet, String column) thr
}

private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception {

Class eventType = toClass(resultSet.getString(statements.getEventTypeColumn()));

Object eventToLog;
Expand All @@ -135,10 +131,10 @@ private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception {
}

LoggingHouseMessageStatus status;
try {
status = LoggingHouseMessageStatus.codeOf(resultSet.getString(statements.getStatusColumn()));
} catch (EdcPersistenceException e) {
throw new EdcPersistenceException("Error eventToLog JSON column", e);
if (resultSet.getString(statements.getReceiptColumn()) == null) {
status = LoggingHouseMessageStatus.PENDING;
} else {
status = LoggingHouseMessageStatus.SENT;
}

return LoggingHouseMessage.Builder.newInstance()
Expand All @@ -156,15 +152,11 @@ private LoggingHouseMessage mapResultSet(ResultSet resultSet) throws Exception {
}

private Class toClass(String eventType) {

switch (eventType) {
case "ContractAgreement":
return ContractAgreement.class;
case "TransferProcess":
return TransferProcess.class;
default:
throw new EdcException("Invalid eventType: " + eventType);
}
return switch (eventType) {
case "ContractAgreement" -> ContractAgreement.class;
case "TransferProcess" -> TransferProcess.class;
default -> throw new EdcException("Invalid eventType: " + eventType);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class BaseSqlDialectStatements implements LoggingHouseEventStatements {

@Override
public String getInsertTemplate() {
return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?, ?)",
return format("INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?%s, ?, ?, ?, ?, ?)",
getLoggingHouseMessageTable(),
getEventTypeColumn(),
getEventIdColumn(),
Expand All @@ -30,24 +30,23 @@ public String getInsertTemplate() {
getProcessIdColumn(),
getConsumerIdColumn(),
getProviderIdColumn(),
getStatusColumn(),
getCreatedAtColumn(),
getFormatAsJsonOperator()
);
}

@Override
public String getSelectPendingStatement() {
return format("SELECT * FROM %s WHERE %s = ?",
return format("SELECT * FROM %s WHERE %s IS NULL",
getLoggingHouseMessageTable(),
getStatusColumn());
getReceiptColumn());
}

@Override
public String getUpdateSentTemplate() {
return format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
getLoggingHouseMessageTable(),
getStatusColumn(),
getReceiptColumn(),
getSentAtColumn(),
getIdColumn());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ default String getSentAtColumn() {
return "sent_at";
}

default String getReceiptColumn() {
return "receipt";
}

String getInsertTemplate();

String getSelectPendingStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessage;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessage;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessageReceipt;
import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -76,24 +77,24 @@ public void process(LoggingHouseMessage message) {
var extendedProcessUrl = new URL(loggingHouseUrl + "/process/" + pid);
try {
createProcess(message, extendedProcessUrl).join();

} catch (Exception e) {
throw new EdcException("Could not create process in LoggingHouse", e);
// TODO: Not fail when process already exists
monitor.warning("CreateProcess returned error (ignore it when the process already exists): " + e.getMessage());
//throw new EdcException("Could not create process in LoggingHouse", e);
}
}

// Log Message
var extendedLogUrl = new URL(loggingHouseUrl + "/messages/log/" + pid);
try {
logMessage(message, extendedLogUrl).join();

var response = logMessage(message, extendedLogUrl).join();
response.onSuccess(msg -> {
monitor.info("Received receipt successfully from LoggingHouse for message with id " + message.getEventId());
store.updateSent(message.getId(), msg.data());
});
} catch (Exception e) {
throw new EdcException("Could not log message to LoggingHouse", e);
}

// Update Status
store.updateSent(message.getId());

} catch (MalformedURLException e) {
throw new EdcException("Could not create extended clearinghouse url.");
}
Expand All @@ -111,12 +112,12 @@ public CompletableFuture<StatusResult<Object>> createProcess(LoggingHouseMessage
return dispatcherRegistry.dispatch(Object.class, logMessage);
}

public CompletableFuture<StatusResult<Object>> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) {
public CompletableFuture<StatusResult<LogMessageReceipt>> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) {

monitor.info("Logging message to LoggingHouse with type " + message.getEventType() + " and id " + message.getEventId());
var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, message.getEventToLog());

return dispatcherRegistry.dispatch(Object.class, logMessage);
return dispatcherRegistry.dispatch(LogMessageReceipt.class, logMessage);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- previous version of the table:
--
-- CREATE TABLE IF NOT EXISTS edc_logging_house_message
-- (
-- logging_house_message_id BIGSERIAL NOT NULL,
-- event_type VARCHAR NOT NULL,
-- event_id VARCHAR NOT NULL,
-- event_to_log JSON NOT NULL,
-- create_process BOOLEAN NOT NULL,
-- process_id VARCHAR NOT NULL,
-- consumer_id VARCHAR NOT NULL,
-- provider_id VARCHAR NOT NULL,
-- status VARCHAR NOT NULL,
-- created_at BIGINT NOT NULL,
-- sent_at BIGINT,
-- PRIMARY KEY (logging_house_message_id)
-- );

-- remove the status column and add the receipt column, the status is inferred from the receipt (if not null, the status is 'SENT')
ALTER TABLE edc_logging_house_message
DROP COLUMN IF EXISTS status,
ADD COLUMN IF NOT EXISTS receipt VARCHAR;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE edc_logging_house_message
ALTER COLUMN consumer_id DROP NOT NULL,
ALTER COLUMN provider_id DROP NOT NULL;
Loading