diff --git a/core/src/main/java/dev/keva/core/aof/AOFContainer.java b/core/src/main/java/dev/keva/core/aof/AOFContainer.java index 1736fd12..afa9ae07 100644 --- a/core/src/main/java/dev/keva/core/aof/AOFContainer.java +++ b/core/src/main/java/dev/keva/core/aof/AOFContainer.java @@ -1,6 +1,7 @@ package dev.keva.core.aof; import dev.keva.core.config.KevaConfig; +import dev.keva.core.exception.StartupException; import dev.keva.ioc.annotation.Autowired; import dev.keva.ioc.annotation.Component; import dev.keva.protocol.resp.Command; @@ -16,11 +17,13 @@ @Slf4j @Component -public class AOFContainer { +public class AOFContainer implements Closeable { private ReentrantLock bufferLock; private ObjectOutputStream output; private FileDescriptor fd; private boolean alwaysFlush; + private ScheduledExecutorService executorService; + private volatile boolean isOpen; @Autowired private KevaConfig kevaConfig; @@ -47,7 +50,7 @@ public void init() { } if (!alwaysFlush) { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(() -> { try { flush(); @@ -59,10 +62,15 @@ public void init() { } else { log.info("AOF will trigger for every new mutate command"); } + isOpen = true; } public void write(Command command) { bufferLock.lock(); + if (!isOpen) { + log.warn("Dropping write to AOF as it is closed!"); + return; + } try { output.writeObject(command.getObjects()); if (alwaysFlush) { @@ -75,34 +83,44 @@ public void write(Command command) { } } - private void flush() throws IOException { - fd.sync(); - } - public List read() throws IOException { - try { - List commands = new ArrayList<>(100); - FileInputStream fis = new FileInputStream(getWorkingDir() + "keva.aof"); + final List commands = new ArrayList<>(100); + try (FileInputStream fis = new FileInputStream(getWorkingDir() + "keva.aof"); + ObjectInputStream input = new ObjectInputStream(fis)) { log.info("AOF size is: {}", fis.getChannel().size()); - ObjectInputStream input = new ObjectInputStream(fis); while (true) { - try { - byte[][] objects = (byte[][]) input.readObject(); - commands.add(Command.newInstance(objects, false)); - } catch (EOFException e) { - log.error("Error while reading AOF command", e); - fis.close(); - return commands; - } catch (ClassNotFoundException e) { - log.error("Error reading AOF file", e); - return commands; - } + byte[][] objects = (byte[][]) input.readObject(); + commands.add(Command.newInstance(objects, false)); + } + } catch (final FileNotFoundException | EOFException ignored) { + return commands; + } catch (final ClassNotFoundException e) { + final String msg = "Error reading AOF file"; + log.error(msg, e); + throw new StartupException(msg, e); + } + } + + public void close() throws IOException { + bufferLock.lock(); + isOpen = false; + log.info("Closing AOF log."); + try { + if (executorService != null) { + executorService.shutdown(); } - } catch (FileNotFoundException ignored) { - throw new FileNotFoundException("AOF file not found"); + // Closing the stream should flush it, but still doing it explicitly! + flush(); + output.close(); + } finally { + bufferLock.unlock(); } } + private void flush() throws IOException { + fd.sync(); + } + private String getWorkingDir() { String workingDir = kevaConfig.getWorkDirectory(); return workingDir.equals("./") ? "" : workingDir + "/"; diff --git a/core/src/main/java/dev/keva/core/aof/AOFManager.java b/core/src/main/java/dev/keva/core/aof/AOFManager.java index 89ce9540..1dc20a96 100644 --- a/core/src/main/java/dev/keva/core/aof/AOFManager.java +++ b/core/src/main/java/dev/keva/core/aof/AOFManager.java @@ -50,4 +50,8 @@ public void init() { aof.init(); } + + public void stop() throws IOException { + aof.close(); + } } diff --git a/core/src/main/java/dev/keva/core/exception/StartupException.java b/core/src/main/java/dev/keva/core/exception/StartupException.java new file mode 100644 index 00000000..a90bbb69 --- /dev/null +++ b/core/src/main/java/dev/keva/core/exception/StartupException.java @@ -0,0 +1,11 @@ +package dev.keva.core.exception; + +/** + * StartupException indicates any fatal error encountered during server boot. + */ +public class StartupException extends RuntimeException{ + + public StartupException(final String msg, final Throwable cause){ + super(msg, cause); + } +} diff --git a/core/src/main/java/dev/keva/core/server/KevaServer.java b/core/src/main/java/dev/keva/core/server/KevaServer.java index 0f4072e3..ca42bda5 100644 --- a/core/src/main/java/dev/keva/core/server/KevaServer.java +++ b/core/src/main/java/dev/keva/core/server/KevaServer.java @@ -30,6 +30,12 @@ public class KevaServer implements Server { " | |/ / | __| \\ \\ / / /_\\ \n" + " | ' < | _| \\ V / / _ \\ \n" + " |_|\\_\\ |___| \\_/ /_/ \\_\\"; + private static final int SHUTDOWN_TIMEOUT_MS = 1000; + private enum State { + CREATED, CREATING, RUNNING, TERMINATING, TERMINATED + } + + private volatile State state; private final KevaDatabase database; private final KevaConfig config; private final NettyChannelInitializer nettyChannelInitializer; @@ -47,6 +53,7 @@ public KevaServer(KevaDatabase database, KevaConfig config, NettyChannelInitiali this.nettyChannelInitializer = nettyChannelInitializer; this.commandMapper = commandMapper; this.aofManager = aofManager; + this.state = State.CREATED; } public static KevaServer ofDefaults() { @@ -64,7 +71,102 @@ public static KevaServer ofCustomBeans(Object... beans) { return context.getBean(KevaServer.class); } - public ServerBootstrap bootstrapServer() throws NettyNativeTransportLoader.NettyNativeLoaderException { + @Override + public void shutdown() { + switch (state) { + case CREATED: + case CREATING: + throw new RuntimeException("Attempt to shutdown a non-started server!"); + case RUNNING: + boolean set = updateState(State.TERMINATING); + if (!set) { + // The state was concurrently modified, so re-check the condition. + shutdown(); + return; + } + try { + bossGroup.shutdownGracefully(0, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS).sync(); + workerGroup.shutdownGracefully(0, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS).sync(); + channel.close(); + database.close(); + aofManager.stop(); + } catch (Exception e) { + log.warn("Encountered error while shutting down server, ignoring", e); + } + state = State.TERMINATED; + log.info("Keva server at {} stopped", config.getPort()); + log.info("Bye bye!"); + return; + default: + } + } + + @Override + public void run() { + switch (state) { + case CREATING: + case RUNNING: + return; + case CREATED: + // take create lock and call run again + boolean set = updateState(State.CREATING); + if (!set) { + // The state was concurrently modified, so re-check the condition. + run(); + return; + } + try { + stopwatch.start(); + ServerBootstrap server = bootstrapServer(); + + aofManager.init(); + + ChannelFuture sync = server.bind(config.getPort()).sync(); + log.info("{} server started at {}:{}, in {} ms", + KEVA_BANNER, + config.getHostname(), config.getPort(), + stopwatch.elapsed(TimeUnit.MILLISECONDS)); + log.info("Ready to accept connections"); + state = State.RUNNING; + System.out.println("Set state to running"); + channel = sync.channel(); + channel.closeFuture().sync(); //block + } catch (InterruptedException e) { + log.error("Failed to start server: ", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("Failed to start server: ", e); + } finally { + stopwatch.stop(); + } + return; + default: + throw new RuntimeException("Attempt to run a stopped server"); + } + } + + @Override + public void clear() { + switch (state) { + case RUNNING: + database.flush(); + return; + default: + throw new RuntimeException("Attempt to clear a non-running server"); + } + + } + + // Do a CAS on state + private synchronized boolean updateState(State state) { + if (this.state.equals(state)) { + return false; + } + this.state = state; + return true; + } + + private ServerBootstrap bootstrapServer() throws NettyNativeTransportLoader.NettyNativeLoaderException { try { commandMapper.init(); Class executorGroupClazz = NettyNativeTransportLoader.getEventExecutorGroupClazz(); @@ -85,45 +187,4 @@ public ServerBootstrap bootstrapServer() throws NettyNativeTransportLoader.Netty throw new NettyNativeTransportLoader.NettyNativeLoaderException("Cannot load Netty classes"); } } - - @Override - public void shutdown() { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - channel.close(); - log.info("Keva server at {} stopped", config.getPort()); - log.info("Bye bye!"); - } - - @Override - public void run() { - try { - stopwatch.start(); - ServerBootstrap server = bootstrapServer(); - - aofManager.init(); - - ChannelFuture sync = server.bind(config.getPort()).sync(); - log.info("{} server started at {}:{}, in {} ms", - KEVA_BANNER, - config.getHostname(), config.getPort(), - stopwatch.elapsed(TimeUnit.MILLISECONDS)); - log.info("Ready to accept connections"); - - channel = sync.channel(); - channel.closeFuture().sync(); - } catch (InterruptedException e) { - log.error("Failed to start server: ", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error("Failed to start server: ", e); - } finally { - stopwatch.stop(); - } - } - - @Override - public void clear() { - database.flush(); - } } diff --git a/core/src/test/java/dev/keva/core/server/AbstractServerTest.java b/core/src/test/java/dev/keva/core/server/AbstractServerTest.java index 74563aae..82d3927a 100644 --- a/core/src/test/java/dev/keva/core/server/AbstractServerTest.java +++ b/core/src/test/java/dev/keva/core/server/AbstractServerTest.java @@ -20,7 +20,7 @@ public abstract class AbstractServerTest { static Jedis jedis; - static Server server; + static volatile Server server; static Jedis subscriber; @Test diff --git a/store/src/main/java/dev/keva/store/KevaDatabase.java b/store/src/main/java/dev/keva/store/KevaDatabase.java index b6d6f625..b45242ae 100644 --- a/store/src/main/java/dev/keva/store/KevaDatabase.java +++ b/store/src/main/java/dev/keva/store/KevaDatabase.java @@ -2,14 +2,18 @@ import dev.keva.util.hashbytes.BytesKey; +import java.io.Closeable; import java.util.AbstractMap; import java.util.concurrent.locks.Lock; -public interface KevaDatabase { +public interface KevaDatabase extends Closeable { Lock getLock(); void flush(); + @Override + default void close() {} + void put(byte[] key, byte[] val); void expireAt(byte[] key, long timestampInMillis); diff --git a/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java b/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java index 356ab083..db3436b2 100644 --- a/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java +++ b/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java @@ -74,6 +74,16 @@ public void flush() { } } + @Override + public void close() { + lock.lock(); + try { + chronicleMap.close(); + } finally { + lock.unlock(); + } + } + private byte[] getExpireKey(byte[] key) { return Bytes.concat(key, EXP_POSTFIX); }