From 1ee33536f54cbdefb4aeddb7d1af2d2dc19f8244 Mon Sep 17 00:00:00 2001 From: Chad Wilson Date: Mon, 17 Jul 2023 13:57:28 +0800 Subject: [PATCH] Clean up exception handling during export Try to ensure we don't swallow errors here, and it correctly fails on a timeout. --- .../com/thoughtworks/go/dbsync/DbSync.java | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/thoughtworks/go/dbsync/DbSync.java b/src/main/java/com/thoughtworks/go/dbsync/DbSync.java index 8ce3a43..ea1ade6 100644 --- a/src/main/java/com/thoughtworks/go/dbsync/DbSync.java +++ b/src/main/java/com/thoughtworks/go/dbsync/DbSync.java @@ -42,7 +42,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.*; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; @@ -58,13 +57,13 @@ public class DbSync { static { System.setProperty("org.jooq.no-logo", "true"); } - static final Logger LOG = LoggerFactory.getLogger(DbSync.class); + private static final long EXPORT_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(90); + private final Args args; public DbSync(Args args) { this.args = args; - Executors.newFixedThreadPool(1); } public void export() throws Exception { @@ -95,7 +94,7 @@ public void export() throws Exception { if (!tables.isEmpty()) { LOG.error("Specified target DB is not empty. Contains '{}' tables in public schema.", String.join(", ", tables.keySet())); - LOG.error("Skipping migration.", String.join(", ", tables.keySet())); + LOG.error("Skipping migration."); System.exit(1); } })); @@ -116,14 +115,12 @@ public void export() throws Exception { LOG.info("Done initializing database views."); }); - withDataSource(targetDataSource, (targetConnection) -> { LOG.info("Copying database records."); doExport(sourceDataSource, targetDataSource, writer); LOG.info("Done copying database records."); }); - withDataSource(targetDataSource, (targetConnection) -> { LOG.info("Setting sequences for all tables."); resetSequences(targetDataSource, tables.keySet(), writer); @@ -194,27 +191,33 @@ private void doExport(BasicDataSource sourceDataSource, DataSource targetDataSou LOG.info(" {}: {} records", tableName, recordCount); }); + LOG.info("Waiting for record copy to complete..."); try (ProgressBar progressBar = progressBar(tables)) { - ThreadPoolExecutor executor = new ThreadPoolExecutor(args.threads, args.threads, 1L, TimeUnit.SECONDS, new LinkedQueue<>(2)); + ThreadPoolExecutor executor = new ThreadPoolExecutor(args.threads, args.threads, 5L, TimeUnit.SECONDS, new LinkedQueue<>(2)); try { - tables.forEach((String tableName, Integer rowCount) -> { - executor.execute(() -> { - try (Connection sourceConnection = sourceDataSource.getConnection()) { - dumpTableSQL(tableName, rowCount, sourceConnection, targetDataSource, writer, progressBar); - } catch (Exception e) { - LOG.error(null, e); - throw new RuntimeException(e); - } - }); - }); - } finally { + tables.forEach((String tableName, Integer rowCount) -> executor.execute(() -> { + try (Connection sourceConnection = sourceDataSource.getConnection()) { + dumpTableSQL(tableName, rowCount, sourceConnection, targetDataSource, writer, progressBar); + } catch (Exception e) { + LOG.error(null, e); + throw new RuntimeException(e); + } + })); LOG.debug("Shutting down thread pool executor"); executor.shutdown(); - LOG.debug("Awaiting termination of executorService"); - try { - executor.awaitTermination(1000, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (!executor.awaitTermination(EXPORT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + throw new InterruptedException(String.format("Timed out after [%s] seconds waiting for DB migration to complete", EXPORT_TIMEOUT_SECONDS)); + } + } catch (RuntimeException e) { + LOG.error(null, e); + throw e; + } catch (Exception e) { + LOG.error(null, e); + throw new RuntimeException(e); + } finally { + if (!executor.isShutdown()) { + LOG.warn("Forcing shutting down copy tasks..."); + executor.shutdownNow(); } } } @@ -346,9 +349,10 @@ private void dumpTableSQL(String table, Integer rowCount, Connection sourceConne break; } - Field[] fields = records.fields(); + @SuppressWarnings("unchecked") + Field[] fields = (Field[]) records.fields(); - Field idFieldInCurrentSourceTable = (Field) Arrays.stream(fields) + Field idFieldInCurrentSourceTable = Arrays.stream(fields) .filter(field -> field.getName().equalsIgnoreCase("id")) .findFirst() .orElseThrow(() -> new RuntimeException("Unable to determine "));