Skip to content

Commit

Permalink
Clean up exception handling during export
Browse files Browse the repository at this point in the history
Try to ensure we don't swallow errors here, and it correctly fails on a timeout.
  • Loading branch information
chadlwilson committed Jul 17, 2023
1 parent 79c0a40 commit 3bfb62b
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions src/main/java/com/thoughtworks/go/dbsync/DbSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import me.tongfei.progressbar.ProgressBarStyle;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.jooq.Record;
import org.jooq.*;
import org.jooq.conf.RenderNameStyle;
import org.jooq.conf.SettingsTools;
Expand All @@ -42,7 +41,6 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
Expand All @@ -58,13 +56,13 @@ public class DbSync {
static {
System.setProperty("org.jooq.no-logo", "true");
}

static final Logger LOG = LoggerFactory.getLogger(DbSync.class);
private static final long EXPORT_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(90);

private final Args args;

public DbSync(Args args) {
this.args = args;
Executors.newFixedThreadPool(1);
}

public void export() throws Exception {
Expand Down Expand Up @@ -95,7 +93,7 @@ public void export() throws Exception {

if (!tables.isEmpty()) {
LOG.error("Specified target DB is not empty. Contains '{}' tables in public schema.", String.join(", ", tables.keySet()));
LOG.error("Skipping migration.", String.join(", ", tables.keySet()));
LOG.error("Skipping migration.");
System.exit(1);
}
}));
Expand All @@ -116,14 +114,12 @@ public void export() throws Exception {
LOG.info("Done initializing database views.");
});


withDataSource(targetDataSource, (targetConnection) -> {
LOG.info("Copying database records.");
doExport(sourceDataSource, targetDataSource, writer);
LOG.info("Done copying database records.");
});


withDataSource(targetDataSource, (targetConnection) -> {
LOG.info("Setting sequences for all tables.");
resetSequences(targetDataSource, tables.keySet(), writer);
Expand Down Expand Up @@ -194,27 +190,33 @@ private void doExport(BasicDataSource sourceDataSource, DataSource targetDataSou
LOG.info(" {}: {} records", tableName, recordCount);
});

LOG.info("Waiting for record copy to complete...");
try (ProgressBar progressBar = progressBar(tables)) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(args.threads, args.threads, 1L, TimeUnit.SECONDS, new LinkedQueue<>(2));
ThreadPoolExecutor executor = new ThreadPoolExecutor(args.threads, args.threads, 5L, TimeUnit.SECONDS, new LinkedQueue<>(2));
try {
tables.forEach((String tableName, Integer rowCount) -> {
executor.execute(() -> {
try (Connection sourceConnection = sourceDataSource.getConnection()) {
dumpTableSQL(tableName, rowCount, sourceConnection, targetDataSource, writer, progressBar);
} catch (Exception e) {
LOG.error(null, e);
throw new RuntimeException(e);
}
});
});
} finally {
tables.forEach((String tableName, Integer rowCount) -> executor.execute(() -> {
try (Connection sourceConnection = sourceDataSource.getConnection()) {
dumpTableSQL(tableName, rowCount, sourceConnection, targetDataSource, writer, progressBar);
} catch (Exception e) {
LOG.error(null, e);
throw new RuntimeException(e);
}
}));
LOG.debug("Shutting down thread pool executor");
executor.shutdown();
LOG.debug("Awaiting termination of executorService");
try {
executor.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
if (!executor.awaitTermination(EXPORT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
throw new InterruptedException(String.format("Timed out after [%s] seconds waiting for DB migration to complete", EXPORT_TIMEOUT_SECONDS));
}
} catch (RuntimeException e) {
LOG.error(null, e);
throw e;
} catch (Exception e) {
LOG.error(null, e);
throw new RuntimeException(e);
} finally {
if (!executor.isShutdown()) {
LOG.warn("Forcing shutting down copy tasks...");
executor.shutdownNow();
}
}
}
Expand Down

0 comments on commit 3bfb62b

Please sign in to comment.