Skip to content

Commit

Permalink
Clean up exception handling during export
Browse files Browse the repository at this point in the history
Try to ensure we don't swallow errors here, and it correctly fails on a timeout.
  • Loading branch information
chadlwilson committed Jul 17, 2023
1 parent 36140c6 commit 1ee3353
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions src/main/java/com/thoughtworks/go/dbsync/DbSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
Expand All @@ -58,13 +57,13 @@ public class DbSync {
static {
System.setProperty("org.jooq.no-logo", "true");
}

static final Logger LOG = LoggerFactory.getLogger(DbSync.class);
private static final long EXPORT_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(90);

private final Args args;

public DbSync(Args args) {
this.args = args;
Executors.newFixedThreadPool(1);
}

public void export() throws Exception {
Expand Down Expand Up @@ -95,7 +94,7 @@ public void export() throws Exception {

if (!tables.isEmpty()) {
LOG.error("Specified target DB is not empty. Contains '{}' tables in public schema.", String.join(", ", tables.keySet()));
LOG.error("Skipping migration.", String.join(", ", tables.keySet()));
LOG.error("Skipping migration.");
System.exit(1);
}
}));
Expand All @@ -116,14 +115,12 @@ public void export() throws Exception {
LOG.info("Done initializing database views.");
});


withDataSource(targetDataSource, (targetConnection) -> {
LOG.info("Copying database records.");
doExport(sourceDataSource, targetDataSource, writer);
LOG.info("Done copying database records.");
});


withDataSource(targetDataSource, (targetConnection) -> {
LOG.info("Setting sequences for all tables.");
resetSequences(targetDataSource, tables.keySet(), writer);
Expand Down Expand Up @@ -194,27 +191,33 @@ private void doExport(BasicDataSource sourceDataSource, DataSource targetDataSou
LOG.info(" {}: {} records", tableName, recordCount);
});

LOG.info("Waiting for record copy to complete...");
try (ProgressBar progressBar = progressBar(tables)) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(args.threads, args.threads, 1L, TimeUnit.SECONDS, new LinkedQueue<>(2));
ThreadPoolExecutor executor = new ThreadPoolExecutor(args.threads, args.threads, 5L, TimeUnit.SECONDS, new LinkedQueue<>(2));
try {
tables.forEach((String tableName, Integer rowCount) -> {
executor.execute(() -> {
try (Connection sourceConnection = sourceDataSource.getConnection()) {
dumpTableSQL(tableName, rowCount, sourceConnection, targetDataSource, writer, progressBar);
} catch (Exception e) {
LOG.error(null, e);
throw new RuntimeException(e);
}
});
});
} finally {
tables.forEach((String tableName, Integer rowCount) -> executor.execute(() -> {
try (Connection sourceConnection = sourceDataSource.getConnection()) {
dumpTableSQL(tableName, rowCount, sourceConnection, targetDataSource, writer, progressBar);
} catch (Exception e) {
LOG.error(null, e);
throw new RuntimeException(e);
}
}));
LOG.debug("Shutting down thread pool executor");
executor.shutdown();
LOG.debug("Awaiting termination of executorService");
try {
executor.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
if (!executor.awaitTermination(EXPORT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
throw new InterruptedException(String.format("Timed out after [%s] seconds waiting for DB migration to complete", EXPORT_TIMEOUT_SECONDS));
}
} catch (RuntimeException e) {
LOG.error(null, e);
throw e;
} catch (Exception e) {
LOG.error(null, e);
throw new RuntimeException(e);
} finally {
if (!executor.isShutdown()) {
LOG.warn("Forcing shutting down copy tasks...");
executor.shutdownNow();
}
}
}
Expand Down Expand Up @@ -346,9 +349,10 @@ private void dumpTableSQL(String table, Integer rowCount, Connection sourceConne
break;
}

Field<?>[] fields = records.fields();
@SuppressWarnings("unchecked")
Field<Long>[] fields = (Field<Long>[]) records.fields();

Field<Long> idFieldInCurrentSourceTable = (Field<Long>) Arrays.stream(fields)
Field<Long> idFieldInCurrentSourceTable = Arrays.stream(fields)
.filter(field -> field.getName().equalsIgnoreCase("id"))
.findFirst()
.orElseThrow(() -> new RuntimeException("Unable to determine "));
Expand Down

0 comments on commit 1ee3353

Please sign in to comment.