From 7f86700ee8c739c12fd1bcb99d48a373d4141bb2 Mon Sep 17 00:00:00 2001 From: Glaucio Jannotti Date: Tue, 18 Jun 2024 19:31:27 -0300 Subject: [PATCH 1/3] feat: flyway migrations --- logging-house-client/build.gradle.kts | 5 + .../client/IdsClearingHouseServiceImpl.java | 2 + .../client/LoggingHouseClientExtension.java | 29 +++- .../client/flyway/FlywayService.java | 129 ++++++++++++++++++ .../connection/DatasourceProperties.java | 72 ++++++++++ .../DriverManagerConnectionFactory.java | 47 +++++++ .../migration/DatabaseMigrationManager.java | 40 ++++++ .../{ => messages}/CreateProcessMessage.java | 3 +- .../CreateProcessMessageSender.java | 2 +- .../client/{ => messages}/LogMessage.java | 3 +- .../{ => messages}/LogMessageSender.java | 2 +- .../ExtendedMessageProtocolClearing.java | 2 +- ...tipartClearingRemoteMessageDispatcher.java | 2 +- .../MultiContextJsonLdSerializer.java | 2 +- .../migration/V0_0_1__Create_Tables.sql | 6 + 15 files changed, 338 insertions(+), 8 deletions(-) create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => messages}/CreateProcessMessage.java (86%) rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => messages}/CreateProcessMessageSender.java (97%) rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => messages}/LogMessage.java (86%) rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => messages}/LogMessageSender.java (98%) rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => multipart}/ExtendedMessageProtocolClearing.java (92%) rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => multipart}/IdsMultipartClearingRemoteMessageDispatcher.java (93%) rename logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/{ => multipart}/MultiContextJsonLdSerializer.java (98%) create mode 100644 logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql diff --git a/logging-house-client/build.gradle.kts b/logging-house-client/build.gradle.kts index 07c9610..d7b41a7 100644 --- a/logging-house-client/build.gradle.kts +++ b/logging-house-client/build.gradle.kts @@ -14,6 +14,8 @@ val jsonVersion: String by project dependencies { implementation("${edcGroup}:control-plane-core:${edcVersion}") implementation("${edcGroup}:http-spi:${edcVersion}") + implementation("${edcGroup}:sql-core:${edcVersion}") + implementation("${edcGroup}:transaction-datasource-spi:${edcVersion}") implementation("com.squareup.okhttp3:okhttp:${okHttpVersion}") implementation("org.json:json:${jsonVersion}") @@ -22,6 +24,9 @@ dependencies { implementation("de.fraunhofer.iais.eis.ids.infomodel:infomodel-java:1.0.2-basecamp") implementation("de.fraunhofer.iais.eis.ids.infomodel:infomodel-util:1.0.2-basecamp") + implementation("org.postgresql:postgresql:42.4.5") + implementation("org.flywaydb:flyway-core:9.0.1") + testImplementation("org.assertj:assertj-core:${assertj}") testImplementation("org.junit.jupiter:junit-jupiter-api:${jupiterVersion}") testImplementation("org.mockito:mockito-core:${mockitoVersion}") diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java index 9b6ccf2..deec714 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java @@ -14,6 +14,8 @@ package com.truzzt.extension.logginghouse.client; +import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessage; +import com.truzzt.extension.logginghouse.client.messages.LogMessage; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index cd0b8ba..c116f1f 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -13,8 +13,14 @@ package com.truzzt.extension.logginghouse.client; +import com.truzzt.extension.logginghouse.client.flyway.FlywayService; +import com.truzzt.extension.logginghouse.client.flyway.migration.DatabaseMigrationManager; import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender; +import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessageSender; +import com.truzzt.extension.logginghouse.client.messages.LogMessageSender; +import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher; +import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer; import de.fraunhofer.iais.eis.LogMessage; import de.fraunhofer.iais.eis.RequestMessage; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted; @@ -82,6 +88,15 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Setting public static final String LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED = "edc.logginghouse.extension.enabled"; + @Setting + private static final String EDC_DATASOURCE_REPAIR_SETTING = "edc.flyway.repair"; + + @Setting + private static final String FLYWAY_CLEAN_ENABLED = "edc.flyway.clean.enable"; + + @Setting + private static final String FLYWAY_CLEAN = "edc.flyway.clean"; + private URL loggingHouseLogUrl; public Monitor monitor; private boolean enabled; @@ -107,6 +122,8 @@ public void initialize(ServiceExtensionContext context) { this.loggingHouseLogUrl = readUrlFromSettings(context); + runFlywayMigrations(context); + registerSerializerClearingHouseMessages(context); registerClearingHouseMessageSenders(context); @@ -129,6 +146,17 @@ private URL readUrlFromSettings(ServiceExtensionContext context) { } } + public void runFlywayMigrations(ServiceExtensionContext context) { + var flywayService = new FlywayService( + context.getMonitor(), + context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false), + context.getSetting(FLYWAY_CLEAN_ENABLED, false), + context.getSetting(FLYWAY_CLEAN, false) + ); + var migrationManager = new DatabaseMigrationManager(context.getConfig(), context.getMonitor(), flywayService); + migrationManager.migrate(); + } + private void registerEventSubscriber(ServiceExtensionContext context) { monitor.debug("Registering event subscriber for LoggingHouseClientExtension"); @@ -192,7 +220,6 @@ private void registerClearingHouseMessageSenders(ServiceExtensionContext context dispatcherRegistry.register(dispatcher); } - @Override public void start() { if (!enabled) { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java new file mode 100644 index 0000000..177554d --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway; + +import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties; +import com.truzzt.extension.logginghouse.client.flyway.connection.DriverManagerConnectionFactory; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.datasource.ConnectionFactoryDataSource; +import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.FlywayException; +import org.flywaydb.core.api.MigrationVersion; +import org.flywaydb.core.api.output.MigrateResult; +import org.flywaydb.core.api.output.RepairResult; + +import java.util.List; +import javax.sql.DataSource; + +public class FlywayService { + + private static final String MIGRATION_LOCATION_BASE = "classpath:migration"; + private static final String MIGRATION_TABLE_NAME = "flyway_schema_history_logginghouse"; + + private final Monitor monitor; + private final boolean tryRepairOnFailedMigration; + private final boolean cleanEnabled; + private final boolean clean; + + public FlywayService(Monitor monitor, boolean tryRepairOnFailedMigration, boolean cleanEnabled, boolean clean) { + this.monitor = monitor; + this.tryRepairOnFailedMigration = tryRepairOnFailedMigration; + this.cleanEnabled = cleanEnabled; + this.clean = clean; + } + + public void cleanDatabase(DatasourceProperties datasourceProperties) { + if (clean) { + monitor.info("Running flyway clean."); + var flyway = setupFlyway(datasourceProperties); + flyway.clean(); + } + } + + public void migrateDatabase(DatasourceProperties datasourceProperties) { + var flyway = setupFlyway(datasourceProperties); + flyway.info().getInfoResult().migrations.stream() + .map(migration -> "Found migration: %s".formatted(migration.filepath)) + .forEach(monitor::info); + + try { + var migrateResult = flyway.migrate(); + handleFlywayMigrationResult(migrateResult); + } catch (FlywayException e) { + if (tryRepairOnFailedMigration) { + repairAndRetryMigration(flyway); + } else { + throw new EdcPersistenceException("Flyway migration failed", e); + } + } + } + + private void repairAndRetryMigration(Flyway flyway) { + try { + var repairResult = flyway.repair(); + handleFlywayRepairResult(repairResult); + var migrateResult = flyway.migrate(); + handleFlywayMigrationResult(migrateResult); + } catch (FlywayException e) { + throw new EdcPersistenceException("Flyway migration failed", e); + } + } + + private void handleFlywayRepairResult(RepairResult repairResult) { + if (!repairResult.repairActions.isEmpty()) { + var repairActions = String.join(", ", repairResult.repairActions); + monitor.info("Repair actions: %s".formatted(repairActions)); + } + + if (!repairResult.warnings.isEmpty()) { + var warnings = String.join(", ", repairResult.warnings); + throw new EdcPersistenceException("Repairing failed: %s".formatted(warnings)); + } + } + + private Flyway setupFlyway(DatasourceProperties datasourceProperties) { + var dataSource = getDataSource(datasourceProperties); + var migrationLocations = List.of(MIGRATION_LOCATION_BASE); + return Flyway.configure() + .baselineVersion(MigrationVersion.fromVersion("0.0.0")) + .baselineOnMigrate(true) + .failOnMissingLocations(true) + .dataSource(dataSource) + .table(MIGRATION_TABLE_NAME) + .locations(migrationLocations.toArray(new String[0])) + .cleanDisabled(!cleanEnabled) + .load(); + } + + private DataSource getDataSource(DatasourceProperties datasourceProperties) { + var connectionFactory = new DriverManagerConnectionFactory(datasourceProperties); + return new ConnectionFactoryDataSource(connectionFactory); + } + + private void handleFlywayMigrationResult(MigrateResult migrateResult) { + if (migrateResult.migrationsExecuted > 0) { + monitor.info(String.format( + "Successfully migrated database from version %s to version %s", + migrateResult.initialSchemaVersion, + migrateResult.targetSchemaVersion)); + } else { + monitor.info(String.format( + "No migration necessary. Current version is %s", + migrateResult.initialSchemaVersion)); + } + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java new file mode 100644 index 0000000..c990604 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DatasourceProperties.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway.connection; + +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.system.configuration.Config; + +import static org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry.DEFAULT_DATASOURCE; + +public class DatasourceProperties { + private static final String LOGGING_HOUSE_DATASOURCE = "logginghouse"; + + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.%s.name"; + private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.%s.url"; + private static final String DATASOURCE_SETTING_USER = "edc.datasource.%s.user"; + private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.%s.password"; + + private final String name; + private final String jdbcUrl; + private final String user; + private final String password; + + public DatasourceProperties(Config config) { + name = getSetting(config, String.format(DATASOURCE_SETTING_NAME, LOGGING_HOUSE_DATASOURCE), + String.format(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE)); + + jdbcUrl = getSetting(config, String.format(DATASOURCE_SETTING_JDBC_URL, LOGGING_HOUSE_DATASOURCE), + String.format(DATASOURCE_SETTING_JDBC_URL, DEFAULT_DATASOURCE)); + + user = getSetting(config, String.format(DATASOURCE_SETTING_USER, LOGGING_HOUSE_DATASOURCE), + String.format(DATASOURCE_SETTING_USER, DEFAULT_DATASOURCE)); + + password = getSetting(config, String.format(DATASOURCE_SETTING_PASSWORD, LOGGING_HOUSE_DATASOURCE), + String.format(DATASOURCE_SETTING_PASSWORD, DEFAULT_DATASOURCE)); + } + + private String getSetting(Config config, String setting, String defaultSetting) { + try { + return config.getString(setting); + } catch (EdcException e) { + return config.getString(defaultSetting); + } + } + + public String getName() { + return name; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java new file mode 100644 index 0000000..f1b1a53 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/connection/DriverManagerConnectionFactory.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway.connection; + +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.ConnectionFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +public class DriverManagerConnectionFactory implements ConnectionFactory { + + private static final String CONNECTION_PROPERTY_USER = "user"; + private static final String CONNECTION_PROPERTY_PASSWORD = "password"; + + private final DatasourceProperties datasourceProperties; + + public DriverManagerConnectionFactory(DatasourceProperties datasourceProperties) { + this.datasourceProperties = datasourceProperties; + } + + @Override + public Connection create() { + try { + var properties = new Properties(); + properties.setProperty(CONNECTION_PROPERTY_USER, datasourceProperties.getUser()); + properties.setProperty(CONNECTION_PROPERTY_PASSWORD, datasourceProperties.getPassword()); + return DriverManager.getConnection(datasourceProperties.getJdbcUrl(), properties); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java new file mode 100644 index 0000000..81aab02 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/migration/DatabaseMigrationManager.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.flyway.migration; + +import com.truzzt.extension.logginghouse.client.flyway.FlywayService; +import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.system.configuration.Config; + +public class DatabaseMigrationManager { + private final Config config; + private final Monitor monitor; + private final FlywayService flywayService; + + public DatabaseMigrationManager(Config config, Monitor monitor, FlywayService flywayService) { + this.config = config; + this.monitor = monitor; + this.flywayService = flywayService; + } + + public void migrate() { + var datasourceProperties = new DatasourceProperties(config); + monitor.info("Using datasource %s to apply flyway migrations".formatted(datasourceProperties.getName())); + + flywayService.cleanDatabase(datasourceProperties); + flywayService.migrateDatabase(datasourceProperties); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessage.java similarity index 86% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessage.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessage.java index 88854ba..bc1b298 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessage.java @@ -12,8 +12,9 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.messages; +import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing; import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.net.URI; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessageSender.java similarity index 97% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessageSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessageSender.java index ee3c71d..35ab185 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/CreateProcessMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/CreateProcessMessageSender.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.messages; import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessage.java similarity index 86% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessage.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessage.java index e28d206..7633f2a 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessage.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessage.java @@ -13,8 +13,9 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.messages; +import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing; import org.eclipse.edc.spi.types.domain.message.RemoteMessage; import java.net.URI; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessageSender.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessageSender.java similarity index 98% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessageSender.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessageSender.java index b5a9139..9a014bf 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LogMessageSender.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/messages/LogMessageSender.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.messages; import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd; import com.truzzt.extension.logginghouse.client.ids.multipart.CalendarUtil; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ExtendedMessageProtocolClearing.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ExtendedMessageProtocolClearing.java similarity index 92% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ExtendedMessageProtocolClearing.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ExtendedMessageProtocolClearing.java index 71ba1fc..e32cec8 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/ExtendedMessageProtocolClearing.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/ExtendedMessageProtocolClearing.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.multipart; public final class ExtendedMessageProtocolClearing { diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsMultipartClearingRemoteMessageDispatcher.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java similarity index 93% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsMultipartClearingRemoteMessageDispatcher.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java index 6f757cc..e08e965 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsMultipartClearingRemoteMessageDispatcher.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/IdsMultipartClearingRemoteMessageDispatcher.java @@ -12,7 +12,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.multipart; import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartRemoteMessageDispatcher; import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender; diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/MultiContextJsonLdSerializer.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/MultiContextJsonLdSerializer.java similarity index 98% rename from logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/MultiContextJsonLdSerializer.java rename to logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/MultiContextJsonLdSerializer.java index 985dfb2..721cf55 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/MultiContextJsonLdSerializer.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/multipart/MultiContextJsonLdSerializer.java @@ -13,7 +13,7 @@ * */ -package com.truzzt.extension.logginghouse.client; +package com.truzzt.extension.logginghouse.client.multipart; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.JsonGenerator; diff --git a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql new file mode 100644 index 0000000..f32e8fd --- /dev/null +++ b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql @@ -0,0 +1,6 @@ +-- table: edc_logging_house_event +CREATE TABLE IF NOT EXISTS edc_logging_house_event +( + logging_house_event_id VARCHAR NOT NULL, + PRIMARY KEY (logging_house_event_id) +); From 3df5a4b700f1446ba7f5d9ad8d24a3fec804d6ea Mon Sep 17 00:00:00 2001 From: Glaucio Jannotti Date: Fri, 28 Jun 2024 09:34:44 -0300 Subject: [PATCH 2/3] feat: logging house messages store --- .../client/IdsClearingHouseServiceImpl.java | 168 ------------------ .../client/LoggingHouseClientExtension.java | 42 +++-- .../client/LoggingHouseEventSubscriber.java | 104 +++++++++++ .../spi/store/LoggingHouseMessageStore.java | 22 +++ .../client/spi/types/LoggingHouseMessage.java | 72 ++++++++ .../sql/SqlLoggingHouseMessageStore.java | 69 +++++++ .../sql/schema/BaseSqlDialectStatements.java | 31 ++++ .../schema/LoggingHouseEventStatements.java | 40 +++++ .../postgres/PostgresDialectStatements.java | 27 +++ .../migration/V0_0_1__Create_Tables.sql | 25 ++- 10 files changed, 419 insertions(+), 181 deletions(-) delete mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java create mode 100644 logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java deleted file mode 100644 index deec714..0000000 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/IdsClearingHouseServiceImpl.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2022 sovity GmbH - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * sovity GmbH - initial API and implementation - * - */ - -package com.truzzt.extension.logginghouse.client; - -import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessage; -import com.truzzt.extension.logginghouse.client.messages.LogMessage; -import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; -import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; -import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; -import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.spi.EdcException; -import org.eclipse.edc.spi.event.Event; -import org.eclipse.edc.spi.event.EventEnvelope; -import org.eclipse.edc.spi.event.EventSubscriber; -import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; -import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.system.Hostname; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -public class IdsClearingHouseServiceImpl implements EventSubscriber { - - private final RemoteMessageDispatcherRegistry dispatcherRegistry; - private final URI connectorBaseUrl; - private final URL clearingHouseLogUrl; - private final ContractNegotiationStore contractNegotiationStore; - private final TransferProcessStore transferProcessStore; - private final Monitor monitor; - - public IdsClearingHouseServiceImpl( - RemoteMessageDispatcherRegistry dispatcherRegistry, - Hostname hostname, - URL clearingHouseLogUrl, - ContractNegotiationStore contractNegotiationStore, - TransferProcessStore transferProcessStore, - Monitor monitor) { - this.dispatcherRegistry = dispatcherRegistry; - this.clearingHouseLogUrl = clearingHouseLogUrl; - this.contractNegotiationStore = contractNegotiationStore; - this.transferProcessStore = transferProcessStore; - this.monitor = monitor; - - try { - connectorBaseUrl = getConnectorBaseUrl(hostname); - } catch (URISyntaxException e) { - throw new EdcException("Could not create connectorBaseUrl. Hostname can be set using:" + - " edc.hostname", e); - } - } - - public void createProcess(ContractAgreement contractAgreement, URL clearingHouseLogUrl) { - // Create PID - List processOwners = new ArrayList<>(); - processOwners.add(contractAgreement.getConsumerId()); - processOwners.add(contractAgreement.getProviderId()); - - monitor.info("Creating Process in LoggingHouse"); - var logMessage = new CreateProcessMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement.getId(), processOwners); - - try { - dispatcherRegistry.dispatch(Object.class, logMessage); - } catch (EdcException e) { - if (e.getMessage().startsWith("No provider dispatcher registered for protocol")) { - throw e; - } else { - monitor.severe("Unhandled exception while creating process in LoggingHouse. " + e.getMessage()); - // Print stack trace - String errorStr; - try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { - e.printStackTrace(pw); - errorStr = sw.toString(); - } catch (IOException ex) { - throw new RuntimeException("Error while converting the stacktrace"); - } - monitor.severe(errorStr); - } - } - } - - public void logContractAgreement(ContractAgreement contractAgreement, URL clearingHouseLogUrl) { - monitor.info("Logging ContractAgreement to LoggingHouse with contract id: " + contractAgreement.getId()); - var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, contractAgreement); - dispatcherRegistry.dispatch(Object.class, logMessage); - } - - public void logTransferProcess(TransferProcess transferProcess, URL clearingHouseLogUrl) { - monitor.info("Logging TransferProcess to LoggingHouse"); - var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, transferProcess); - dispatcherRegistry.dispatch(Object.class, logMessage); - } - - @Override - public void on(EventEnvelope event) { - try { - if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { - var contractAgreement = resolveContractAgreement(contractNegotiationFinalized); - var pid = contractAgreement.getId(); - - // Create Process - var extendedProcessUrl = new URL(clearingHouseLogUrl + "/process/" + pid); - try { - createProcess(contractAgreement, extendedProcessUrl); - } catch (Exception e) { - monitor.warning("Could not create process in LoggingHouse: " + e.getMessage()); - } - - // Log Contract Agreement - var extendedLogUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid); - try { - logContractAgreement(contractAgreement, extendedLogUrl); - } catch (Exception e) { - monitor.warning("Could not log ContractNegotiation to LoggingHouse: " + e.getMessage()); - } - } else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) { - monitor.debug("Logging transfer event with id " + event.getId()); - - var transferProcess = resolveTransferProcess(transferProcessEvent); - var pid = transferProcess.getContractId(); - var extendedUrl = new URL(clearingHouseLogUrl + "/messages/log/" + pid); - try { - logTransferProcess(transferProcess, extendedUrl); - } catch (Exception e) { - monitor.warning("Could not log TransferProcess to LoggingHouse: " + e.getMessage()); - } - } - } catch (MalformedURLException e) { - throw new EdcException("Could not create extended clearinghouse url."); - } - } - - private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException { - var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); - var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); - return Objects.requireNonNull(contractNegotiation).getContractAgreement(); - } - - private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) { - var transferProcessId = transferProcessEvent.getTransferProcessId(); - return transferProcessStore.findById(transferProcessId); - } - - private URI getConnectorBaseUrl(Hostname hostname) throws URISyntaxException { - return new URI(String.format("https://%s/", hostname.get())); - } -} \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index c116f1f..714745b 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -8,6 +8,7 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: + * truzzt GmbH - Initial implementation * */ @@ -21,6 +22,8 @@ import com.truzzt.extension.logginghouse.client.messages.LogMessageSender; import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher; import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer; +import com.truzzt.extension.logginghouse.client.store.sql.SqlLoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.store.sql.schema.postgres.PostgresDialectStatements; import de.fraunhofer.iais.eis.LogMessage; import de.fraunhofer.iais.eis.RequestMessage; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted; @@ -48,6 +51,9 @@ import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; import java.net.MalformedURLException; import java.net.URL; @@ -55,7 +61,6 @@ public class LoggingHouseClientExtension implements ServiceExtension { - public static final String LOGGINGHOUSE_CLIENT_EXTENSION = "LoggingHouseClientExtension"; private static final String TYPE_MANAGER_SERIALIZER_KEY = "ids-clearinghouse"; @@ -71,6 +76,13 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Inject private Hostname hostname; + @Inject + private DataSourceRegistry dataSourceRegistry; + @Inject + private TransactionContext transactionContext; + @Inject + private QueryExecutor queryExecutor; + @Inject private ContractNegotiationStore contractNegotiationStore; @Inject @@ -101,7 +113,6 @@ public class LoggingHouseClientExtension implements ServiceExtension { public Monitor monitor; private boolean enabled; - @Override public String name() { return LOGGINGHOUSE_CLIENT_EXTENSION; @@ -127,7 +138,9 @@ public void initialize(ServiceExtensionContext context) { registerSerializerClearingHouseMessages(context); registerClearingHouseMessageSenders(context); - registerEventSubscriber(context); + var loggingHouseMessageStore = initializeLoggingHouseMessageStore(typeManager); + + registerEventSubscriber(context, loggingHouseMessageStore); } private URL readUrlFromSettings(ServiceExtensionContext context) { @@ -146,7 +159,7 @@ private URL readUrlFromSettings(ServiceExtensionContext context) { } } - public void runFlywayMigrations(ServiceExtensionContext context) { + private void runFlywayMigrations(ServiceExtensionContext context) { var flywayService = new FlywayService( context.getMonitor(), context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false), @@ -157,13 +170,22 @@ public void runFlywayMigrations(ServiceExtensionContext context) { migrationManager.migrate(); } - private void registerEventSubscriber(ServiceExtensionContext context) { + private SqlLoggingHouseMessageStore initializeLoggingHouseMessageStore(TypeManager typeManager) { + return new SqlLoggingHouseMessageStore( + dataSourceRegistry, + DataSourceRegistry.DEFAULT_DATASOURCE, + transactionContext, + typeManager.getMapper(), + new PostgresDialectStatements(), + queryExecutor + ); + } + + private void registerEventSubscriber(ServiceExtensionContext context, SqlLoggingHouseMessageStore loggingHouseMessageStore) { monitor.debug("Registering event subscriber for LoggingHouseClientExtension"); - var eventSubscriber = new IdsClearingHouseServiceImpl( - dispatcherRegistry, - hostname, - loggingHouseLogUrl, + var eventSubscriber = new LoggingHouseEventSubscriber( + loggingHouseMessageStore, contractNegotiationStore, transferProcessStore, monitor); @@ -178,7 +200,7 @@ private void registerEventSubscriber(ServiceExtensionContext context) { eventRouter.registerSync(TransferProcessCompleted.class, eventSubscriber); eventRouter.registerSync(TransferProcessFailed.class, eventSubscriber); eventRouter.registerSync(TransferProcessTerminated.class, eventSubscriber); - context.registerService(IdsClearingHouseServiceImpl.class, eventSubscriber); + context.registerService(LoggingHouseEventSubscriber.class, eventSubscriber); monitor.debug("Registered event subscriber for LoggingHouseClientExtension"); } diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java new file mode 100644 index 0000000..76d98ad --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseEventSubscriber.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package com.truzzt.extension.logginghouse.client; + +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized; +import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; +import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; +import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.event.Event; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.monitor.Monitor; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class LoggingHouseEventSubscriber implements EventSubscriber { + + private final LoggingHouseMessageStore loggingHouseMessageStore; + private final ContractNegotiationStore contractNegotiationStore; + private final TransferProcessStore transferProcessStore; + private final Monitor monitor; + + public LoggingHouseEventSubscriber( + LoggingHouseMessageStore loggingHouseMessageStore, + ContractNegotiationStore contractNegotiationStore, + TransferProcessStore transferProcessStore, + Monitor monitor) { + this.loggingHouseMessageStore = loggingHouseMessageStore; + this.contractNegotiationStore = contractNegotiationStore; + this.transferProcessStore = transferProcessStore; + this.monitor = monitor; + } + + public void storeContractAgreement(ContractAgreement contractAgreement) { + monitor.info("Storing ContractAgreement to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventToLog(contractAgreement) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } + + public void storeTransferProcess(TransferProcess transferProcess) { + monitor.info("Storing TransferProcess to send to LoggingHouse"); + + var message = LoggingHouseMessage.Builder.newInstance() + .eventToLog(transferProcess) + .createdAt(ZonedDateTime.now()) + .build(); + loggingHouseMessageStore.save(message); + } + + @Override + public void on(EventEnvelope event) { + if (event.getPayload() instanceof ContractNegotiationFinalized contractNegotiationFinalized) { + monitor.debug("Storing ContractNegotiationFinalized event with id " + event.getId()); + + var contractAgreement = resolveContractAgreement(contractNegotiationFinalized); + try { + storeContractAgreement(contractAgreement); + } catch (Exception e) { + monitor.warning("Could not store ContractNegotiation: " + e.getMessage()); + } + } else if (event.getPayload() instanceof TransferProcessEvent transferProcessEvent) { + monitor.debug("Storing TransferProcess event with id " + event.getId()); + + var transferProcess = resolveTransferProcess(transferProcessEvent); + try { + storeTransferProcess(transferProcess); + } catch (Exception e) { + monitor.warning("Could not store TransferProcess: " + e.getMessage()); + } + } + } + + private ContractAgreement resolveContractAgreement(ContractNegotiationFinalized contractNegotiationFinalized) throws NullPointerException { + var contractNegotiationId = contractNegotiationFinalized.getContractNegotiationId(); + var contractNegotiation = contractNegotiationStore.findById(contractNegotiationId); + return Objects.requireNonNull(contractNegotiation).getContractAgreement(); + } + + private TransferProcess resolveTransferProcess(TransferProcessEvent transferProcessEvent) { + var transferProcessId = transferProcessEvent.getTransferProcessId(); + return transferProcessStore.findById(transferProcessId); + } +} \ No newline at end of file diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java new file mode 100644 index 0000000..d04fa72 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/store/LoggingHouseMessageStore.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.spi.store; + +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; + +public interface LoggingHouseMessageStore { + + void save(LoggingHouseMessage message); +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java new file mode 100644 index 0000000..7253f91 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/spi/types/LoggingHouseMessage.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 truzzt GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * truzzt GmbH - Initial implementation + * + */ + +package com.truzzt.extension.logginghouse.client.spi.types; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class LoggingHouseMessage { + private Long id; + private Object eventToLog; + private ZonedDateTime createdAt; + private ZonedDateTime sentAt; + + public Long getId() { + return id; + } + public Object getEventToLog() { + return eventToLog; + } + public ZonedDateTime getCreatedAt() { + return createdAt; + } + public ZonedDateTime getSentAt() { + return sentAt; + } + + public static final class Builder { + private final LoggingHouseMessage event = new LoggingHouseMessage(); + + private Builder() { + } + + public static LoggingHouseMessage.Builder newInstance() { + return new LoggingHouseMessage.Builder(); + } + + public LoggingHouseMessage.Builder id(Long id) { + this.event.id = id; + return this; + } + public LoggingHouseMessage.Builder eventToLog(Object eventToLog) { + this.event.eventToLog = eventToLog; + return this; + } + public LoggingHouseMessage.Builder createdAt(ZonedDateTime createdAt) { + this.event.createdAt = createdAt; + return this; + } + public LoggingHouseMessage.Builder sentAt(ZonedDateTime sentAt) { + this.event.sentAt = sentAt; + return this; + } + + public LoggingHouseMessage build() { + Objects.requireNonNull(this.event.eventToLog, "Message eventToLog must not be null"); + Objects.requireNonNull(this.event.createdAt, "Message createdAt must not be null"); + return this.event; + } + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java new file mode 100644 index 0000000..42e0c0b --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/SqlLoggingHouseMessageStore.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore; +import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage; +import com.truzzt.extension.logginghouse.client.store.sql.schema.LoggingHouseEventStatements; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.time.ZonedDateTime; +import java.util.Objects; + +public class SqlLoggingHouseMessageStore extends AbstractSqlStore implements LoggingHouseMessageStore { + + private final LoggingHouseEventStatements statements; + + public SqlLoggingHouseMessageStore(DataSourceRegistry dataSourceRegistry, + String dataSourceName, + TransactionContext transactionContext, + ObjectMapper objectMapper, + LoggingHouseEventStatements statements, + QueryExecutor queryExecutor) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public void save(LoggingHouseMessage event) { + + Objects.requireNonNull(event); + Objects.requireNonNull(event.getEventToLog()); + Objects.requireNonNull(event.getCreatedAt()); + + transactionContext.execute(() -> { + try (var connection = getConnection()) { + queryExecutor.execute(connection, statements.getInsertTemplate(), + toJson(event.getEventToLog()), + mapFromZonedDateTime(event.getCreatedAt()) + ); + + } catch (Exception e) { + throw new EdcPersistenceException(e.getMessage(), e); + } + }); + } + + private Long mapFromZonedDateTime(ZonedDateTime zonedDateTime) { + return zonedDateTime != null ? zonedDateTime.toEpochSecond() : null; + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java new file mode 100644 index 0000000..6017294 --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/BaseSqlDialectStatements.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema; + +import static java.lang.String.format; + +public class BaseSqlDialectStatements implements LoggingHouseEventStatements { + + @Override + public String getInsertTemplate() { + return format("INSERT INTO %s (%s, %s) VALUES (?%s, ?)", + getLoggingHouseMessageTable(), + getEventToLogColumn(), + getCreatedAtColumn(), + getFormatAsJsonOperator() + ); + } +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java new file mode 100644 index 0000000..5854cdb --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/LoggingHouseEventStatements.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema; + +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public interface LoggingHouseEventStatements { + + default String getLoggingHouseMessageTable() { + return "edc_logging_house_message"; + } + + default String getEventToLogColumn() { + return "event_to_log"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + String getInsertTemplate(); + + default String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } + +} diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java new file mode 100644 index 0000000..e48620d --- /dev/null +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/store/sql/schema/postgres/PostgresDialectStatements.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - Initial implementation + * truzzt GmbH - PostgreSQL implementation + * + */ + +package com.truzzt.extension.logginghouse.client.store.sql.schema.postgres; + +import com.truzzt.extension.logginghouse.client.store.sql.schema.BaseSqlDialectStatements; +import org.eclipse.edc.sql.dialect.PostgresDialect; + +public class PostgresDialectStatements extends BaseSqlDialectStatements { + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } +} diff --git a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql index f32e8fd..00c67b6 100644 --- a/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql +++ b/logging-house-client/src/main/resources/migration/V0_0_1__Create_Tables.sql @@ -1,6 +1,25 @@ +-- +-- Copyright (c) 2024 Daimler TSS GmbH +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Daimler TSS GmbH - Initial SQL Query +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + -- table: edc_logging_house_event -CREATE TABLE IF NOT EXISTS edc_logging_house_event +CREATE TABLE IF NOT EXISTS edc_logging_house_message ( - logging_house_event_id VARCHAR NOT NULL, - PRIMARY KEY (logging_house_event_id) + logging_house_message_id BIGSERIAL NOT NULL, + event_to_log JSON NOT NULL, + created_at TIMESTAMP NOT NULL, + sent_at TIMESTAMP NOT NULL, + PRIMARY KEY (logging_house_message_id) ); +COMMENT ON COLUMN edc_logging_house_message.event_to_log IS 'Event to log serialized as JSON'; From cb9127cec0fb1185be812b5b762a51e39ac4c721 Mon Sep 17 00:00:00 2001 From: Glaucio Jannotti Date: Mon, 1 Jul 2024 16:45:13 -0300 Subject: [PATCH 3/3] feat: logging house messages store --- .../logginghouse/client/LoggingHouseClientExtension.java | 4 ---- .../extension/logginghouse/client/flyway/FlywayService.java | 6 ++---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java index 714745b..24f4dfe 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/LoggingHouseClientExtension.java @@ -103,9 +103,6 @@ public class LoggingHouseClientExtension implements ServiceExtension { @Setting private static final String EDC_DATASOURCE_REPAIR_SETTING = "edc.flyway.repair"; - @Setting - private static final String FLYWAY_CLEAN_ENABLED = "edc.flyway.clean.enable"; - @Setting private static final String FLYWAY_CLEAN = "edc.flyway.clean"; @@ -163,7 +160,6 @@ private void runFlywayMigrations(ServiceExtensionContext context) { var flywayService = new FlywayService( context.getMonitor(), context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false), - context.getSetting(FLYWAY_CLEAN_ENABLED, false), context.getSetting(FLYWAY_CLEAN, false) ); var migrationManager = new DatabaseMigrationManager(context.getConfig(), context.getMonitor(), flywayService); diff --git a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java index 177554d..4863dd9 100644 --- a/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java +++ b/logging-house-client/src/main/java/com/truzzt/extension/logginghouse/client/flyway/FlywayService.java @@ -35,13 +35,11 @@ public class FlywayService { private final Monitor monitor; private final boolean tryRepairOnFailedMigration; - private final boolean cleanEnabled; private final boolean clean; - public FlywayService(Monitor monitor, boolean tryRepairOnFailedMigration, boolean cleanEnabled, boolean clean) { + public FlywayService(Monitor monitor, boolean tryRepairOnFailedMigration, boolean clean) { this.monitor = monitor; this.tryRepairOnFailedMigration = tryRepairOnFailedMigration; - this.cleanEnabled = cleanEnabled; this.clean = clean; } @@ -104,7 +102,7 @@ private Flyway setupFlyway(DatasourceProperties datasourceProperties) { .dataSource(dataSource) .table(MIGRATION_TABLE_NAME) .locations(migrationLocations.toArray(new String[0])) - .cleanDisabled(!cleanEnabled) + .cleanDisabled(!clean) .load(); }