From 3df5a4b700f1446ba7f5d9ad8d24a3fec804d6ea Mon Sep 17 00:00:00 2001 From: Glaucio Jannotti Date: Fri, 28 Jun 2024 09:34:44 -0300 Subject: [PATCH] feat: logging house messages store --- .../client/IdsClearingHouseServiceImpl.java | 168 ------------------ .../client/LoggingHouseClientExtension.java | 42 +++-- .../client/LoggingHouseEventSubscriber.java | 104 +++++++++++ .../spi/store/LoggingHouseMessageStore.java | 22 +++ .../client/spi/types/LoggingHouseMessage.java | 72 ++++++++ .../sql/SqlLoggingHouseMessageStore.java | 69 +++++++ .../sql/schema/BaseSqlDialectStatements.java | 31 ++++ .../schema/LoggingHouseEventStatements.java | 40 +++++ .../postgres/PostgresDialectStatements.java | 27 +++ .../migration/V0_0_1__Create_Tables.sql | 25 ++- 10 files changed, 419 insertions(+), 181 deletions(-) delete mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java deleted file mode 100644 index deec714..0000000 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2022 sovity GmbH - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * sovity GmbH - initial API and implementation - * - */ - -package com.truzzt.extension.logginghouse.client; - -import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessage; -import com.truzzt.extension.logginghouse.client.messages.LogMessage; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; -import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; -import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; -import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.spi.EdcException; -import org.eclipse.edc.spi.event.Event; -import org.eclipse.edc.spi.event.EventEnvelope; -import org.eclipse.edc.spi.event.EventSubscriber; -import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.system.Hostname; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -public class IdsClearingHouseServiceImpl implements EventSubscriber { - - private final RemoteMessageDispatcherRegistry dispatcherRegistry; - private final URI connectorBaseUrl; - private final URL clearingHouseLogUrl; - private final ContractNegotiationStore contractNegotiationStore; - private final TransferProcessStore transferProcessStore; - private final Monitor monitor; - - public IdsClearingHouseServiceImpl( - RemoteMessageDispatcherRegistry dispatcherRegistry, - Hostname hostname, - URL clearingHouseLogUrl, - ContractNegotiationStore contractNegotiationStore, - TransferProcessStore transferProcessStore, - Monitor monitor) { - this.dispatcherRegistry = dispatcherRegistry; - this.clearingHouseLogUrl = clearingHouseLogUrl; - this.contractNegotiationStore = contractNegotiationStore; - this.transferProcessStore = transferProcessStore; - this.monitor = monitor; - - try { - connectorBaseUrl = getConnectorBaseUrl(hostname); - } catch (URISyntaxException e) { - throw new EdcException("Could not create connectorBaseUrl. Hostname can be set using:" + - " edc.hostname", e); - } - } - - public void createProcess(ContractAgreement contractAgreement, URL clearingHouseLogUrl) { - // Create PID - List processOwners = new ArrayList<>(); - processOwners.add(contractAgreement.getConsumerId()); - processOwners.add(contractAgreement.getProviderId()); - - monitor.info("Creating Process in LoggingHouse"); - var logMessage = new CreateProcessMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement.getId(), processOwners); - - try { - dispatcherRegistry.dispatch(Object.class, logMessage); - } catch (EdcException e) { - if (e.getMessage().startsWith("No provider dispatcher registered for protocol")) { - throw e; - } else { - monitor.severe("Unhandled exception while creating process in LoggingHouse. " + e.getMessage()); - // Print stack trace - String errorStr; - try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { - e.printStackTrace(pw); - errorStr = sw.toString(); - } catch (IOException ex) { - throw new RuntimeException("Error while converting the stacktrace"); - } - monitor.severe(errorStr); - } - } - } - - public void logContractAgreement(ContractAgreement contractAgreement, URL clearingHouseLogUrl) { - monitor.info("Logging ContractAgreement to LoggingHouse with contract id: " + contractAgreement.getId()); - var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement); - dispatcherRegistry.dispatch(Object.class, logMessage); - } - - public void logTransferProcess(TransferProcess transferProcess, URL clearingHouseLogUrl) { - monitor.info("Logging TransferProcess to LoggingHouse"); - var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, transferProcess); - dispatcherRegistry.dispatch(Object.class, logMessage); - } - - @Override - public void on(EventEnvelope event) { - try { - if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { - var contractAgreement = resolveContractAgreement(contractNegotiationFinalized); - var pid = contractAgreement.getId(); - - // Create Process - var extendedProcessUrl = new URL(clearingHouseLogUrl + "/process/" + pid); - try { - createProcess(contractAgreement, extendedProcessUrl); - } catch (Exception e) { - monitor.warning("Could not create process in LoggingHouse: " + e.getMessage()); - } - - // Log Contract Agreement - var extendedLogUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid); - try { - logContractAgreement(contractAgreement, extendedLogUrl); - } catch (Exception e) { - monitor.warning("Could not log ContractNegotiation to LoggingHouse: " + e.getMessage()); - } - } else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) { - monitor.debug("Logging transfer event with id " + event.getId()); - - var transferProcess = resolveTransferProcess(transferProcessEvent); - var pid = transferProcess.getContractId(); - var extendedUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid); - try { - logTransferProcess(transferProcess, extendedUrl); - } catch (Exception e) { - monitor.warning("Could not log TransferProcess to LoggingHouse: " + e.getMessage()); - } - } - } catch (MalformedURLException e) { - throw new EdcException("Could not create extended clearinghouse url."); - } - } - - private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException { - var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); - var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); - return Objects.requireNonNull(contractNegotiation).getContractAgreement(); - } - - private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) { - var transferProcessId = transferProcessEvent.getTransferProcessId(); - return transferProcessStore.findById(transferProcessId); - } - - private URI getConnectorBaseUrl(Hostname hostname) throws URISyntaxException { - return new URI(String.format("https://%s/", hostname.get())); - } -} \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index c116f1f..714745b 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -8,6 +8,7 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: + * truzzt GmbH - Initial implementation * */ @@ -21,6 +22,8 @@ import com.truzzt.extension.logginghouse.client.messages.LogMessageSender; import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher; import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer; +import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements; import de.fraunhofer.iais.eis.LogMessage; import de.fraunhofer.iais.eis.RequestMessage; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted; @@ -48,6 +51,9 @@ import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; import java.net.MalformedURLException; import java.net.URL; @@ -55,7 +61,6 @@ public class LoggingHouseClientExtension implements ServiceExtension { - public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension"; private static final String TYPE_MANAGER_SERIALIZER_KEY = "ids-clearinghouse"; @@ -71,6 +76,13 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Inject private Hostname hostname; + @Inject + private DataSourceRegistry dataSourceRegistry; + @Inject + private TransactionContext transactionContext; + @Inject + private QueryExecutor queryExecutor; + @Inject private ContractNegotiationStore contractNegotiationStore; @Inject @@ -101,7 +113,6 @@ public class LoggingHouseClientExtension implements ServiceExtension { public Monitor monitor; private boolean enabled; - @Override public String name() { return LOGGINGHOUSE_CLIENT_EXTENSION; @@ -127,7 +138,9 @@ public void initialize(ServiceExtensionContext context) { registerSerializerClearingHouseMessages(context); registerClearingHouseMessageSenders(context); - registerEventSubscriber(context); + var loggingHouseMessageStore = initializeLoggingHouseMessageStore(typeManager); + + registerEventSubscriber(context, loggingHouseMessageStore); } private URL readUrlFromSettings(ServiceExtensionContext context) { @@ -146,7 +159,7 @@ private URL readUrlFromSettings(ServiceExtensionContext context) { } } - public void runFlywayMigrations(ServiceExtensionContext context) { + private void runFlywayMigrations(ServiceExtensionContext context) { var flywayService = new FlywayService( context.getMonitor(), context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false), @@ -157,13 +170,22 @@ public void runFlywayMigrations(ServiceExtensionContext context) { migrationManager.migrate(); } - private void registerEventSubscriber(ServiceExtensionContext context) { + private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManager typeManager) { + return new SqlLoggingHouseMessageStore( + dataSourceRegistry, + DataSourceRegistry.DEFAULT_DATASOURCE, + transactionContext, + typeManager.getMapper(), + new PostgresDialectStatements(), + queryExecutor + ); + } + + private void registerEventSubscriber(ServiceExtensionContext context, SqlLoggingHouseMessageStore loggingHouseMessageStore) { monitor.debug("Registering event subscriber for LoggingHouseClientExtension"); - var eventSubscriber = new IdsClearingHouseServiceImpl( - dispatcherRegistry, - hostname, - loggingHouseLogUrl, + var eventSubscriber = new LoggingHouseEventSubscriber( + loggingHouseMessageStore, contractNegotiationStore, transferProcessStore, monitor); @@ -178,7 +200,7 @@ private void registerEventSubscriber(ServiceExtensionContext context) { eventRouter.registerSync(TransferProcessCompleted.class, eventSubscriber); eventRouter.registerSync(TransferProcessFailed.class, eventSubscriber); eventRouter.registerSync(TransferProcessTerminated.class, eventSubscriber); - context.registerService(IdsClearingHouseServiceImpl.class, eventSubscriber); + context.registerService(LoggingHouseEventSubscriber.class, eventSubscriber); monitor.debug("Registered event subscriber for LoggingHouseClientExtension"); } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java new file mode 100644 index 0000000..76d98ad --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package com.truzzt.extension.logginghouse.client; + +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; +import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; +import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.event.Event; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.monitor.Monitor; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class LoggingHouseEventSubscriber implements EventSubscriber { + + private final LoggingHouseMessageStore loggingHouseMessageStore; + private final ContractNegotiationStore contractNegotiationStore; + private final TransferProcessStore transferProcessStore; + private final Monitor monitor; + + public LoggingHouseEventSubscriber( + LoggingHouseMessageStore loggingHouseMessageStore, + ContractNegotiationStore contractNegotiationStore, + TransferProcessStore transferProcessStore, + Monitor monitor) { + this.loggingHouseMessageStore = loggingHouseMessageStore; + this.contractNegotiationStore = contractNegotiationStore; + this.transferProcessStore = transferProcessStore; + this.monitor = monitor; + } + + public void storeContractAgreement(ContractAgreement contractAgreement) { + monitor.info("Storing ContractAgreement to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventToLog(contractAgreement) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } + + public void storeTransferProcess(TransferProcess transferProcess) { + monitor.info("Storing TransferProcess to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventToLog(transferProcess) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } + + @Override + public void on(EventEnvelope event) { + if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { + monitor.debug("Storing ContractNegotiationFinalized event with id " + event.getId()); + + var contractAgreement = resolveContractAgreement(contractNegotiationFinalized); + try { + storeContractAgreement(contractAgreement); + } catch (Exception e) { + monitor.warning("Could not store ContractNegotiation: " + e.getMessage()); + } + } else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) { + monitor.debug("Storing TransferProcess event with id " + event.getId()); + + var transferProcess = resolveTransferProcess(transferProcessEvent); + try { + storeTransferProcess(transferProcess); + } catch (Exception e) { + monitor.warning("Could not store TransferProcess: " + e.getMessage()); + } + } + } + + private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException { + var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); + var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); + return Objects.requireNonNull(contractNegotiation).getContractAgreement(); + } + + private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) { + var transferProcessId = transferProcessEvent.getTransferProcessId(); + return transferProcessStore.findById(transferProcessId); + } +} \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java new file mode 100644 index 0000000..d04fa72 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.spi.store; + +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; + +public interface LoggingHouseMessageStore { + + void save(LoggingHouseMessage message); +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java new file mode 100644 index 0000000..7253f91 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.spi.types; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class LoggingHouseMessage { + private Long id; + private Object eventToLog; + private ZonedDateTime createdAt; + private ZonedDateTime sentAt; + + public Long getId() { + return id; + } + public Object getEventToLog() { + return eventToLog; + } + public ZonedDateTime getCreatedAt() { + return createdAt; + } + public ZonedDateTime getSentAt() { + return sentAt; + } + + public static final class Builder { + private final LoggingHouseMessage event = new LoggingHouseMessage(); + + private Builder() { + } + + public static LoggingHouseMessage.Builder newInstance() { + return new LoggingHouseMessage.Builder(); + } + + public LoggingHouseMessage.Builder id(Long id) { + this.event.id = id; + return this; + } + public LoggingHouseMessage.Builder eventToLog(Object eventToLog) { + this.event.eventToLog = eventToLog; + return this; + } + public LoggingHouseMessage.Builder createdAt(ZonedDateTime createdAt) { + this.event.createdAt = createdAt; + return this; + } + public LoggingHouseMessage.Builder sentAt(ZonedDateTime sentAt) { + this.event.sentAt = sentAt; + return this; + } + + public LoggingHouseMessage build() { + Objects.requireNonNull(this.event.eventToLog, "Message eventToLog must not be null"); + Objects.requireNonNull(this.event.createdAt, "Message createdAt must not be null"); + return this.event; + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java new file mode 100644 index 0000000..42e0c0b --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import com.truzzt.extension.logginghouse.client.store.sql.schema.LoggingHouseEventStatements; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class SqlLoggingHouseMessageStore extends AbstractSqlStore implements LoggingHouseMessageStore { + + private final LoggingHouseEventStatements statements; + + public SqlLoggingHouseMessageStore(DataSourceRegistry dataSourceRegistry, + String dataSourceName, + TransactionContext transactionContext, + ObjectMapper objectMapper, + LoggingHouseEventStatements statements, + QueryExecutor queryExecutor) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public void save(LoggingHouseMessage event) { + + Objects.requireNonNull(event); + Objects.requireNonNull(event.getEventToLog()); + Objects.requireNonNull(event.getCreatedAt()); + + transactionContext.execute(() -> { + try (var connection = getConnection()) { + queryExecutor.execute(connection, statements.getInsertTemplate(), + toJson(event.getEventToLog()), + mapFromZonedDateTime(event.getCreatedAt()) + ); + + } catch (Exception e) { + throw new EdcPersistenceException(e.getMessage(), e); + } + }); + } + + private Long mapFromZonedDateTime(ZonedDateTime zonedDateTime) { + return zonedDateTime != null ? zonedDateTime.toEpochSecond() : null; + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java new file mode 100644 index 0000000..6017294 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema; + +import static java.lang.String.format; + +public class BaseSqlDialectStatements implements LoggingHouseEventStatements { + + @Override + public String getInsertTemplate() { + return format("INSERT INTO %s (%s, %s) VALUES (?%s, ?)", + getLoggingHouseMessageTable(), + getEventToLogColumn(), + getCreatedAtColumn(), + getFormatAsJsonOperator() + ); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java new file mode 100644 index 0000000..5854cdb --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema; + +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public interface LoggingHouseEventStatements { + + default String getLoggingHouseMessageTable() { + return "edc_logging_house_message"; + } + + default String getEventToLogColumn() { + return "event_to_log"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + String getInsertTemplate(); + + default String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java new file mode 100644 index 0000000..e48620d --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema.postgres; + +import com.truzzt.extension.logginghouse.client.store.sql.schema.BaseSqlDialectStatements; +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public class PostgresDialectStatements extends BaseSqlDialectStatements { + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } +} diff --git a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql index f32e8fd..00c67b6 100644 --- a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql +++ b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql @@ -1,6 +1,25 @@ +-- +-- Copyright (c) 2024 Daimler TSS GmbH +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Daimler TSS GmbH - Initial SQL Query +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + -- table: edc_logging_house_event -CREATE TABLE IF NOT EXISTS edc_logging_house_event +CREATE TABLE IF NOT EXISTS edc_logging_house_message ( - logging_house_event_id VARCHAR NOT NULL, - PRIMARY KEY (logging_house_event_id) + logging_house_message_id BIGSERIAL NOT NULL, + event_to_log JSON NOT NULL, + created_at TIMESTAMP NOT NULL, + sent_at TIMESTAMP NOT NULL, + PRIMARY KEY (logging_house_message_id) ); +COMMENT ON COLUMN edc_logging_house_message.event_to_log IS 'Event to log serialized as JSON';