Skip to content

Commit

Permalink
feat: flyway migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
Glaucio Jannotti committed Jun 18, 2024
1 parent c0215f3 commit 7f86700
Show file tree
Hide file tree
Showing 15 changed files with 338 additions and 8 deletions.
5 changes: 5 additions & 0 deletions logging-house-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ val jsonVersion: String by project
dependencies {
implementation("${edcGroup}:control-plane-core:${edcVersion}")
implementation("${edcGroup}:http-spi:${edcVersion}")
implementation("${edcGroup}:sql-core:${edcVersion}")
implementation("${edcGroup}:transaction-datasource-spi:${edcVersion}")

implementation("com.squareup.okhttp3:okhttp:${okHttpVersion}")
implementation("org.json:json:${jsonVersion}")
Expand All @@ -22,6 +24,9 @@ dependencies {
implementation("de.fraunhofer.iais.eis.ids.infomodel:infomodel-java:1.0.2-basecamp")
implementation("de.fraunhofer.iais.eis.ids.infomodel:infomodel-util:1.0.2-basecamp")

implementation("org.postgresql:postgresql:42.4.5")
implementation("org.flywaydb:flyway-core:9.0.1")

testImplementation("org.assertj:assertj-core:${assertj}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${jupiterVersion}")
testImplementation("org.mockito:mockito-core:${mockitoVersion}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.truzzt.extension.logginghouse.client;

import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessage;
import com.truzzt.extension.logginghouse.client.messages.LogMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@

package com.truzzt.extension.logginghouse.client;

import com.truzzt.extension.logginghouse.client.flyway.FlywayService;
import com.truzzt.extension.logginghouse.client.flyway.migration.DatabaseMigrationManager;
import com.truzzt.extension.logginghouse.client.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.ids.multipart.IdsMultipartSender;
import com.truzzt.extension.logginghouse.client.messages.CreateProcessMessageSender;
import com.truzzt.extension.logginghouse.client.messages.LogMessageSender;
import com.truzzt.extension.logginghouse.client.multipart.IdsMultipartClearingRemoteMessageDispatcher;
import com.truzzt.extension.logginghouse.client.multipart.MultiContextJsonLdSerializer;
import de.fraunhofer.iais.eis.LogMessage;
import de.fraunhofer.iais.eis.RequestMessage;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationAccepted;
Expand Down Expand Up @@ -82,6 +88,15 @@ public class LoggingHouseClientExtension implements ServiceExtension {
@Setting
public static final String LOGGINGHOUSE_CLIENT_EXTENSION_ENABLED = "edc.logginghouse.extension.enabled";

@Setting
private static final String EDC_DATASOURCE_REPAIR_SETTING = "edc.flyway.repair";

@Setting
private static final String FLYWAY_CLEAN_ENABLED = "edc.flyway.clean.enable";

@Setting
private static final String FLYWAY_CLEAN = "edc.flyway.clean";

private URL loggingHouseLogUrl;
public Monitor monitor;
private boolean enabled;
Expand All @@ -107,6 +122,8 @@ public void initialize(ServiceExtensionContext context) {

this.loggingHouseLogUrl = readUrlFromSettings(context);

runFlywayMigrations(context);

registerSerializerClearingHouseMessages(context);
registerClearingHouseMessageSenders(context);

Expand All @@ -129,6 +146,17 @@ private URL readUrlFromSettings(ServiceExtensionContext context) {
}
}

public void runFlywayMigrations(ServiceExtensionContext context) {
var flywayService = new FlywayService(
context.getMonitor(),
context.getSetting(EDC_DATASOURCE_REPAIR_SETTING, false),
context.getSetting(FLYWAY_CLEAN_ENABLED, false),
context.getSetting(FLYWAY_CLEAN, false)
);
var migrationManager = new DatabaseMigrationManager(context.getConfig(), context.getMonitor(), flywayService);
migrationManager.migrate();
}

private void registerEventSubscriber(ServiceExtensionContext context) {
monitor.debug("Registering event subscriber for LoggingHouseClientExtension");

Expand Down Expand Up @@ -192,7 +220,6 @@ private void registerClearingHouseMessageSenders(ServiceExtensionContext context
dispatcherRegistry.register(dispatcher);
}


@Override
public void start() {
if (!enabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (c) 2023 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial implementation
*
*/

package com.truzzt.extension.logginghouse.client.flyway;

import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties;
import com.truzzt.extension.logginghouse.client.flyway.connection.DriverManagerConnectionFactory;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.sql.datasource.ConnectionFactoryDataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.output.MigrateResult;
import org.flywaydb.core.api.output.RepairResult;

import java.util.List;
import javax.sql.DataSource;

public class FlywayService {

private static final String MIGRATION_LOCATION_BASE = "classpath:migration";
private static final String MIGRATION_TABLE_NAME = "flyway_schema_history_logginghouse";

private final Monitor monitor;
private final boolean tryRepairOnFailedMigration;
private final boolean cleanEnabled;
private final boolean clean;

public FlywayService(Monitor monitor, boolean tryRepairOnFailedMigration, boolean cleanEnabled, boolean clean) {
this.monitor = monitor;
this.tryRepairOnFailedMigration = tryRepairOnFailedMigration;
this.cleanEnabled = cleanEnabled;
this.clean = clean;
}

public void cleanDatabase(DatasourceProperties datasourceProperties) {
if (clean) {
monitor.info("Running flyway clean.");
var flyway = setupFlyway(datasourceProperties);
flyway.clean();
}
}

public void migrateDatabase(DatasourceProperties datasourceProperties) {
var flyway = setupFlyway(datasourceProperties);
flyway.info().getInfoResult().migrations.stream()
.map(migration -> "Found migration: %s".formatted(migration.filepath))
.forEach(monitor::info);

try {
var migrateResult = flyway.migrate();
handleFlywayMigrationResult(migrateResult);
} catch (FlywayException e) {
if (tryRepairOnFailedMigration) {
repairAndRetryMigration(flyway);
} else {
throw new EdcPersistenceException("Flyway migration failed", e);
}
}
}

private void repairAndRetryMigration(Flyway flyway) {
try {
var repairResult = flyway.repair();
handleFlywayRepairResult(repairResult);
var migrateResult = flyway.migrate();
handleFlywayMigrationResult(migrateResult);
} catch (FlywayException e) {
throw new EdcPersistenceException("Flyway migration failed", e);
}
}

private void handleFlywayRepairResult(RepairResult repairResult) {
if (!repairResult.repairActions.isEmpty()) {
var repairActions = String.join(", ", repairResult.repairActions);
monitor.info("Repair actions: %s".formatted(repairActions));
}

if (!repairResult.warnings.isEmpty()) {
var warnings = String.join(", ", repairResult.warnings);
throw new EdcPersistenceException("Repairing failed: %s".formatted(warnings));
}
}

private Flyway setupFlyway(DatasourceProperties datasourceProperties) {
var dataSource = getDataSource(datasourceProperties);
var migrationLocations = List.of(MIGRATION_LOCATION_BASE);
return Flyway.configure()
.baselineVersion(MigrationVersion.fromVersion("0.0.0"))
.baselineOnMigrate(true)
.failOnMissingLocations(true)
.dataSource(dataSource)
.table(MIGRATION_TABLE_NAME)
.locations(migrationLocations.toArray(new String[0]))
.cleanDisabled(!cleanEnabled)
.load();
}

private DataSource getDataSource(DatasourceProperties datasourceProperties) {
var connectionFactory = new DriverManagerConnectionFactory(datasourceProperties);
return new ConnectionFactoryDataSource(connectionFactory);
}

private void handleFlywayMigrationResult(MigrateResult migrateResult) {
if (migrateResult.migrationsExecuted > 0) {
monitor.info(String.format(
"Successfully migrated database from version %s to version %s",
migrateResult.initialSchemaVersion,
migrateResult.targetSchemaVersion));
} else {
monitor.info(String.format(
"No migration necessary. Current version is %s",
migrateResult.initialSchemaVersion));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2023 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial implementation
*
*/

package com.truzzt.extension.logginghouse.client.flyway.connection;

import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.system.configuration.Config;

import static org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry.DEFAULT_DATASOURCE;

public class DatasourceProperties {
private static final String LOGGING_HOUSE_DATASOURCE = "logginghouse";

private static final String DATASOURCE_SETTING_NAME = "edc.datasource.%s.name";
private static final String DATASOURCE_SETTING_JDBC_URL = "edc.datasource.%s.url";
private static final String DATASOURCE_SETTING_USER = "edc.datasource.%s.user";
private static final String DATASOURCE_SETTING_PASSWORD = "edc.datasource.%s.password";

private final String name;
private final String jdbcUrl;
private final String user;
private final String password;

public DatasourceProperties(Config config) {
name = getSetting(config, String.format(DATASOURCE_SETTING_NAME, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE));

jdbcUrl = getSetting(config, String.format(DATASOURCE_SETTING_JDBC_URL, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_JDBC_URL, DEFAULT_DATASOURCE));

user = getSetting(config, String.format(DATASOURCE_SETTING_USER, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_USER, DEFAULT_DATASOURCE));

password = getSetting(config, String.format(DATASOURCE_SETTING_PASSWORD, LOGGING_HOUSE_DATASOURCE),
String.format(DATASOURCE_SETTING_PASSWORD, DEFAULT_DATASOURCE));
}

private String getSetting(Config config, String setting, String defaultSetting) {
try {
return config.getString(setting);
} catch (EdcException e) {
return config.getString(defaultSetting);
}
}

public String getName() {
return name;
}

public String getJdbcUrl() {
return jdbcUrl;
}

public String getUser() {
return user;
}

public String getPassword() {
return password;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2023 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial implementation
*
*/

package com.truzzt.extension.logginghouse.client.flyway.connection;

import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.sql.ConnectionFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class DriverManagerConnectionFactory implements ConnectionFactory {

private static final String CONNECTION_PROPERTY_USER = "user";
private static final String CONNECTION_PROPERTY_PASSWORD = "password";

private final DatasourceProperties datasourceProperties;

public DriverManagerConnectionFactory(DatasourceProperties datasourceProperties) {
this.datasourceProperties = datasourceProperties;
}

@Override
public Connection create() {
try {
var properties = new Properties();
properties.setProperty(CONNECTION_PROPERTY_USER, datasourceProperties.getUser());
properties.setProperty(CONNECTION_PROPERTY_PASSWORD, datasourceProperties.getPassword());
return DriverManager.getConnection(datasourceProperties.getJdbcUrl(), properties);
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial implementation
*
*/

package com.truzzt.extension.logginghouse.client.flyway.migration;

import com.truzzt.extension.logginghouse.client.flyway.FlywayService;
import com.truzzt.extension.logginghouse.client.flyway.connection.DatasourceProperties;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;

public class DatabaseMigrationManager {
private final Config config;
private final Monitor monitor;
private final FlywayService flywayService;

public DatabaseMigrationManager(Config config, Monitor monitor, FlywayService flywayService) {
this.config = config;
this.monitor = monitor;
this.flywayService = flywayService;
}

public void migrate() {
var datasourceProperties = new DatasourceProperties(config);
monitor.info("Using datasource %s to apply flyway migrations".formatted(datasourceProperties.getName()));

flywayService.cleanDatabase(datasourceProperties);
flywayService.migrateDatabase(datasourceProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
*
*/

package com.truzzt.extension.logginghouse.client;
package com.truzzt.extension.logginghouse.client.messages;

import com.truzzt.extension.logginghouse.client.multipart.ExtendedMessageProtocolClearing;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;

import java.net.URI;
Expand Down
Loading

0 comments on commit 7f86700

Please sign in to comment.