diff --git a/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java b/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java index cd9aef92..e16a503c 100644 --- a/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java +++ b/src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java @@ -24,11 +24,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.Channel; -import java.nio.channels.CompletionHandler; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; +import java.nio.channels.*; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; @@ -49,9 +45,9 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -87,6 +83,8 @@ public class EmbeddedPostgres implements Closeable private static final String PG_SUPERUSER = "postgres"; private static final Duration DEFAULT_PG_STARTUP_WAIT = Duration.ofSeconds(10); private static final String LOCK_FILE_NAME = "epg-lock"; + private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool(); + private static final AtomicInteger active = new AtomicInteger(0); private final File pgDir; @@ -108,6 +106,8 @@ public class EmbeddedPostgres implements Closeable private final ProcessBuilder.Redirect errorRedirector; private final ProcessBuilder.Redirect outputRedirector; + private Process pgProcess; + EmbeddedPostgres(File parentDirectory, File dataDirectory, boolean cleanDataDirectory, Map postgresConfig, Map localeConfig, int port, Map connectConfig, PgBinaryResolver pgBinaryResolver, ProcessBuilder.Redirect errorRedirector, ProcessBuilder.Redirect outputRedirector) throws IOException @@ -237,8 +237,7 @@ private void lock() throws IOException } } - private void initdb() - { + private void initdb() throws IOException { final StopWatch watch = new StopWatch(); watch.start(); List args = new ArrayList<>(); @@ -246,7 +245,11 @@ private void initdb() "-A", "trust", "-U", PG_SUPERUSER, "-D", dataDirectory.getPath(), "-E", "UTF-8")); args.addAll(createLocaleOptions()); - system(INIT_DB, args); + try { + system(INIT_DB, args, true, true).exit.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e.getMessage()); + } LOG.info("{} initdb completed in {}", instanceId, watch); } @@ -262,16 +265,13 @@ private void startPostmaster() throws IOException args.addAll(Arrays.asList("-D", dataDirectory.getPath())); args.addAll(createInitOptions()); - final ProcessBuilder builder = new ProcessBuilder(); - POSTGRES.applyTo(builder, args); + SystemResult result = system(POSTGRES, args); - builder.redirectErrorStream(true); - builder.redirectError(errorRedirector); - builder.redirectOutput(outputRedirector); - final Process postmaster = builder.start(); - - if (outputRedirector.type() == ProcessBuilder.Redirect.Type.PIPE) { - ProcessOutputLogger.logOutput(LOG, postmaster, POSTGRES.processName()); + final Process postmaster; + try { + postmaster = result.initProcess.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e.getMessage()); } LOG.info("{} postmaster started as {} on port {}. Waiting up to {} for server startup to finish.", instanceId, postmaster.toString(), port, pgStartupWait); @@ -311,7 +311,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException Throwable lastCause = null; final long start = System.nanoTime(); final long maxWaitNs = TimeUnit.NANOSECONDS.convert(pgStartupWait.toMillis(), TimeUnit.MILLISECONDS); - while (System.nanoTime() - start < maxWaitNs) { + while (System.nanoTime() - start < (maxWaitNs * Math.max(THREAD_POOL.getActiveCount(), 1))) { try { verifyReady(); LOG.info("{} postmaster startup finished in {}", instanceId, watch); @@ -328,7 +328,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException return; } } - throw new IOException("Gave up waiting for server to start after " + pgStartupWait.toMillis() + "ms", lastCause); + throw new IOException("Gave up waiting for " + instanceId + " server to start after " + (pgStartupWait.toMillis() * Math.max(THREAD_POOL.getActiveCount(), 1)) + "ms", lastCause); } private void verifyReady() throws SQLException @@ -383,7 +383,8 @@ public void close() throws IOException final StopWatch watch = new StopWatch(); watch.start(); try { - pgCtl(dataDirectory, "stop"); + if (pgProcess != null) + pgProcess.destroy(); LOG.info("{} shut down postmaster in {}", instanceId, watch); } catch (final Exception e) { LOG.error("Could not stop postmaster " + instanceId, e); @@ -408,15 +409,18 @@ public void close() throws IOException } } - private void pgCtl(File dir, String action) - { + private void pgCtl(File dir, String action) throws IOException { final List args = new ArrayList<>(); args.addAll(Arrays.asList( "-D", dir.getPath(), action, "-m", PG_STOP_MODE, "-t", PG_STOP_WAIT_S, "-w" )); - system(PG_CTL, args); + try { + system(PG_CTL, args, true).exit.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e.getMessage()); + } } private void cleanOldDataDirectories(File parentDirectory) @@ -610,29 +614,121 @@ public int hashCode() { } } - private void system(Command command, List args) + private int systemThread(ProcessBuilder builder, Command command, SystemResult result, boolean retry, boolean clean) { + AtomicInteger exit = new AtomicInteger(-1); + int retries = 0; + while (exit.get() != 0) { + final Process[] process = new Process[1]; + CompletableFuture initProcess; + if (retries == 0 ) { + initProcess = result.initProcess; + } else { + initProcess = new CompletableFuture<>(); + } + int lastActive = THREAD_POOL.getActiveCount(); + if (lastActive > active.get()) + active.set(lastActive); + Callable task = () -> { + try { + process[0] = builder.start(); + pgProcess = process[0]; + result.process = process[0]; + result.initProcess = initProcess; + initProcess.complete(process[0]); + + if (outputRedirector.type() == ProcessBuilder.Redirect.Type.PIPE) { + ProcessOutputLogger.logOutput(LOG, process[0], command.processName()); + } + return process[0]; + } catch (IOException e) { + initProcess.completeExceptionally(e); + throw e; + } + }; + Future thread = THREAD_POOL.submit(task); + if (retry) { + try { + exit.set(thread.get().waitFor()); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + if (0 != exit.get()) { + LOG.info("Active threads running {}", THREAD_POOL.getActiveCount()); + int currentActive = THREAD_POOL.getActiveCount(); + if (currentActive >= lastActive) + lastActive = currentActive; + if (lastActive > active.get()) + active.set(lastActive); + if (lastActive >= active.get() - 1 && active.get() > 0 && THREAD_POOL.getActiveCount() <= THREAD_POOL.getPoolSize()) { + THREAD_POOL.setMaximumPoolSize(active.decrementAndGet()); + LOG.info("Reduced thread pool size to {}", active.get()); + } + retries++; + if (clean) { + try { + FileUtils.cleanDirectory(dataDirectory); + } catch (IOException e) { + LOG.error("Could not clean up directory {} for retry", dataDirectory.getAbsolutePath()); + result.initProcess.completeExceptionally(e); + result.exit.completeExceptionally(e); + break; + } + } + } + } else { + try { + thread.wait(); + } catch (InterruptedException e) { + result.initProcess.completeExceptionally(new IOException()); + result.exit.completeExceptionally(new IOException("Failed to execute: " + command.processName())); + e.printStackTrace(); + break; + } + } + + if (retries >= 10) { + result.initProcess.completeExceptionally(new IOException()); + result.exit.completeExceptionally(new IOException("Failed to execute: " + command.processName() + ", too many failures.")); + break; + } + + if (!retry) + break; + } + return exit.get(); + } + + private SystemResult system(Command command, List args, boolean retry, boolean clean) { - try { - final ProcessBuilder builder = new ProcessBuilder(); + final ProcessBuilder builder = new ProcessBuilder(); - command.applyTo(builder, args); - builder.redirectErrorStream(true); - builder.redirectError(errorRedirector); - builder.redirectOutput(outputRedirector); + command.applyTo(builder, args); + builder.redirectErrorStream(true); + builder.redirectError(errorRedirector); + builder.redirectOutput(outputRedirector); - final Process process = builder.start(); + SystemResult result = new SystemResult(); + result.initProcess = new CompletableFuture<>(); + result.builder = builder; + result.exit = CompletableFuture.supplyAsync(() -> systemThread(builder, command, result, retry, clean)); + return result; + } - if (outputRedirector.type() == ProcessBuilder.Redirect.Type.PIPE) { - ProcessOutputLogger.logOutput(LOG, process, command.processName()); - } - if (0 != process.waitFor()) { - throw new IllegalStateException(String.format("Process %s failed", builder.command())); - } - } catch (final RuntimeException e) { // NOPMD - throw e; - } catch (final Exception e) { - throw new RuntimeException(e); - } + private SystemResult system(Command command, List args, boolean retry) + { + return system(command, args, retry, false); + } + + private SystemResult system(Command command, List args) + { + return system(command, args, false, false); + } + + private class SystemResult { + ProcessBuilder builder; + CompletableFuture initProcess; + Process process; + CompletableFuture exit; } private static void mkdirs(File dir) diff --git a/src/test/java/io/zonky/test/db/postgres/embedded/InitdbShmgetTest.java b/src/test/java/io/zonky/test/db/postgres/embedded/InitdbShmgetTest.java new file mode 100644 index 00000000..f637636b --- /dev/null +++ b/src/test/java/io/zonky/test/db/postgres/embedded/InitdbShmgetTest.java @@ -0,0 +1,43 @@ +package io.zonky.test.db.postgres.embedded; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class InitdbShmgetTest { + + @Test + public void testEmbeddedPg() throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newCachedThreadPool(); + List futureList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + futureList.add(new InitdbThread()); + } + try{ + List> futures = executor.invokeAll(futureList); + for(Future future : futures){ + future.get(); + assertTrue(future.isDone()); + } + } finally { + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + } + } + + class InitdbThread implements Callable { + + public Void call() throws IOException, InterruptedException { + EmbeddedPostgres.Builder databaseBuilder = EmbeddedPostgres.builder(); + EmbeddedPostgres pg = databaseBuilder.start(); + Thread.sleep(5000); + pg.close(); + return null; + } + } +}