Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't allow multiple initdb processes on OSX. #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 140 additions & 44 deletions src/main/java/io/zonky/test/db/postgres/embedded/EmbeddedPostgres.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.channels.*;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -49,9 +45,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -87,6 +83,8 @@ public class EmbeddedPostgres implements Closeable
private static final String PG_SUPERUSER = "postgres";
private static final Duration DEFAULT_PG_STARTUP_WAIT = Duration.ofSeconds(10);
private static final String LOCK_FILE_NAME = "epg-lock";
private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();
private static final AtomicInteger active = new AtomicInteger(0);

private final File pgDir;

Expand All @@ -108,6 +106,8 @@ public class EmbeddedPostgres implements Closeable
private final ProcessBuilder.Redirect errorRedirector;
private final ProcessBuilder.Redirect outputRedirector;

private Process pgProcess;

EmbeddedPostgres(File parentDirectory, File dataDirectory, boolean cleanDataDirectory,
Map<String, String> postgresConfig, Map<String, String> localeConfig, int port, Map<String, String> connectConfig,
PgBinaryResolver pgBinaryResolver, ProcessBuilder.Redirect errorRedirector, ProcessBuilder.Redirect outputRedirector) throws IOException
Expand Down Expand Up @@ -237,16 +237,19 @@ private void lock() throws IOException
}
}

private void initdb()
{
private void initdb() throws IOException {
final StopWatch watch = new StopWatch();
watch.start();
List<String> args = new ArrayList<>();
args.addAll(Arrays.asList(
"-A", "trust", "-U", PG_SUPERUSER,
"-D", dataDirectory.getPath(), "-E", "UTF-8"));
args.addAll(createLocaleOptions());
system(INIT_DB, args);
try {
system(INIT_DB, args, true, true).exit.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e.getMessage());
}
LOG.info("{} initdb completed in {}", instanceId, watch);
}

Expand All @@ -262,16 +265,13 @@ private void startPostmaster() throws IOException
args.addAll(Arrays.asList("-D", dataDirectory.getPath()));
args.addAll(createInitOptions());

final ProcessBuilder builder = new ProcessBuilder();
POSTGRES.applyTo(builder, args);
SystemResult result = system(POSTGRES, args);

builder.redirectErrorStream(true);
builder.redirectError(errorRedirector);
builder.redirectOutput(outputRedirector);
final Process postmaster = builder.start();

if (outputRedirector.type() == ProcessBuilder.Redirect.Type.PIPE) {
ProcessOutputLogger.logOutput(LOG, postmaster, POSTGRES.processName());
final Process postmaster;
try {
postmaster = result.initProcess.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e.getMessage());
}

LOG.info("{} postmaster started as {} on port {}. Waiting up to {} for server startup to finish.", instanceId, postmaster.toString(), port, pgStartupWait);
Expand Down Expand Up @@ -311,7 +311,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException
Throwable lastCause = null;
final long start = System.nanoTime();
final long maxWaitNs = TimeUnit.NANOSECONDS.convert(pgStartupWait.toMillis(), TimeUnit.MILLISECONDS);
while (System.nanoTime() - start < maxWaitNs) {
while (System.nanoTime() - start < (maxWaitNs * Math.max(THREAD_POOL.getActiveCount(), 1))) {
try {
verifyReady();
LOG.info("{} postmaster startup finished in {}", instanceId, watch);
Expand All @@ -328,7 +328,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException
return;
}
}
throw new IOException("Gave up waiting for server to start after " + pgStartupWait.toMillis() + "ms", lastCause);
throw new IOException("Gave up waiting for " + instanceId + " server to start after " + (pgStartupWait.toMillis() * Math.max(THREAD_POOL.getActiveCount(), 1)) + "ms", lastCause);
}

private void verifyReady() throws SQLException
Expand Down Expand Up @@ -383,7 +383,8 @@ public void close() throws IOException
final StopWatch watch = new StopWatch();
watch.start();
try {
pgCtl(dataDirectory, "stop");
if (pgProcess != null)
pgProcess.destroy();
LOG.info("{} shut down postmaster in {}", instanceId, watch);
} catch (final Exception e) {
LOG.error("Could not stop postmaster " + instanceId, e);
Expand All @@ -408,15 +409,18 @@ public void close() throws IOException
}
}

private void pgCtl(File dir, String action)
{
private void pgCtl(File dir, String action) throws IOException {
final List<String> args = new ArrayList<>();
args.addAll(Arrays.asList(
"-D", dir.getPath(), action,
"-m", PG_STOP_MODE, "-t",
PG_STOP_WAIT_S, "-w"
));
system(PG_CTL, args);
try {
system(PG_CTL, args, true).exit.get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e.getMessage());
}
}

private void cleanOldDataDirectories(File parentDirectory)
Expand Down Expand Up @@ -610,29 +614,121 @@ public int hashCode() {
}
}

private void system(Command command, List<String> args)
private int systemThread(ProcessBuilder builder, Command command, SystemResult result, boolean retry, boolean clean) {
AtomicInteger exit = new AtomicInteger(-1);
int retries = 0;
while (exit.get() != 0) {
final Process[] process = new Process[1];
CompletableFuture<Process> initProcess;
if (retries == 0 ) {
initProcess = result.initProcess;
} else {
initProcess = new CompletableFuture<>();
}
int lastActive = THREAD_POOL.getActiveCount();
if (lastActive > active.get())
active.set(lastActive);
Callable<Process> task = () -> {
try {
process[0] = builder.start();
pgProcess = process[0];
result.process = process[0];
result.initProcess = initProcess;
initProcess.complete(process[0]);

if (outputRedirector.type() == ProcessBuilder.Redirect.Type.PIPE) {
ProcessOutputLogger.logOutput(LOG, process[0], command.processName());
}
return process[0];
} catch (IOException e) {
initProcess.completeExceptionally(e);
throw e;
}
};
Future<Process> thread = THREAD_POOL.submit(task);
if (retry) {
try {
exit.set(thread.get().waitFor());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
if (0 != exit.get()) {
LOG.info("Active threads running {}", THREAD_POOL.getActiveCount());
int currentActive = THREAD_POOL.getActiveCount();
if (currentActive >= lastActive)
lastActive = currentActive;
if (lastActive > active.get())
active.set(lastActive);
if (lastActive >= active.get() - 1 && active.get() > 0 && THREAD_POOL.getActiveCount() <= THREAD_POOL.getPoolSize()) {
THREAD_POOL.setMaximumPoolSize(active.decrementAndGet());
LOG.info("Reduced thread pool size to {}", active.get());
}
retries++;
if (clean) {
try {
FileUtils.cleanDirectory(dataDirectory);
} catch (IOException e) {
LOG.error("Could not clean up directory {} for retry", dataDirectory.getAbsolutePath());
result.initProcess.completeExceptionally(e);
result.exit.completeExceptionally(e);
break;
}
}
}
} else {
try {
thread.wait();
} catch (InterruptedException e) {
result.initProcess.completeExceptionally(new IOException());
result.exit.completeExceptionally(new IOException("Failed to execute: " + command.processName()));
e.printStackTrace();
break;
}
}

if (retries >= 10) {
result.initProcess.completeExceptionally(new IOException());
result.exit.completeExceptionally(new IOException("Failed to execute: " + command.processName() + ", too many failures."));
break;
}

if (!retry)
break;
}
return exit.get();
}

private SystemResult system(Command command, List<String> args, boolean retry, boolean clean)
{
try {
final ProcessBuilder builder = new ProcessBuilder();
final ProcessBuilder builder = new ProcessBuilder();

command.applyTo(builder, args);
builder.redirectErrorStream(true);
builder.redirectError(errorRedirector);
builder.redirectOutput(outputRedirector);
command.applyTo(builder, args);
builder.redirectErrorStream(true);
builder.redirectError(errorRedirector);
builder.redirectOutput(outputRedirector);

final Process process = builder.start();
SystemResult result = new SystemResult();
result.initProcess = new CompletableFuture<>();
result.builder = builder;
result.exit = CompletableFuture.supplyAsync(() -> systemThread(builder, command, result, retry, clean));
return result;
}

if (outputRedirector.type() == ProcessBuilder.Redirect.Type.PIPE) {
ProcessOutputLogger.logOutput(LOG, process, command.processName());
}
if (0 != process.waitFor()) {
throw new IllegalStateException(String.format("Process %s failed", builder.command()));
}
} catch (final RuntimeException e) { // NOPMD
throw e;
} catch (final Exception e) {
throw new RuntimeException(e);
}
private SystemResult system(Command command, List<String> args, boolean retry)
{
return system(command, args, retry, false);
}

private SystemResult system(Command command, List<String> args)
{
return system(command, args, false, false);
}

private class SystemResult {
ProcessBuilder builder;
CompletableFuture<Process> initProcess;
Process process;
CompletableFuture<Integer> exit;
}

private static void mkdirs(File dir)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.zonky.test.db.postgres.embedded;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class InitdbShmgetTest {

@Test
public void testEmbeddedPg() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newCachedThreadPool();
List<InitdbThread> futureList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futureList.add(new InitdbThread());
}
try{
List<Future<Void>> futures = executor.invokeAll(futureList);
for(Future<Void> future : futures){
future.get();
assertTrue(future.isDone());
}
} finally {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
}

class InitdbThread implements Callable<Void> {

public Void call() throws IOException, InterruptedException {
EmbeddedPostgres.Builder databaseBuilder = EmbeddedPostgres.builder();
EmbeddedPostgres pg = databaseBuilder.start();
Thread.sleep(5000);
pg.close();
return null;
}
}
}