diff --git a/.gitignore b/.gitignore index 030dfa3..6e892c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,3 @@ -# Java class files -*.class - # MacOS *.DS_Store @@ -16,10 +13,10 @@ *.ipr # Maven -log/ target/ # Gradle +!/gradle/wrapper/gradle-wrapper.jar .gradle/ */build/ out/ @@ -31,3 +28,12 @@ out/ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + + +# Project files +*.class +*/performance*.log +performance*.log +*/logs/ +logs/ + diff --git a/README.md b/README.md index 65fc82c..7ea93d7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,261 @@ time-hz-executor ======= -My pet project. -This is a prototype of multithreaded service executes incoming tasks on a schedule time. +My pet project. +This is a prototype of multithreaded and distributed service executes incoming tasks on a scheduled time. Powered by: `Java SE`, `Guice` and `Hazelcast`. Project uses SEDA like highly customizable thread model: independent tasks to use producers, consumers and queue, distributed map between them. + + +## Rules: + +![export-2](https://cloud.githubusercontent.com/assets/4469702/24129401/85e05708-0df3-11e7-8dd0-17f8bbb1e12e.png) + + * The cluster of Hazelcast nodes accepts tasks with `LocalDateTime` and `Callable`. + * `LocalDateTime` is a scheduled time and `Callable` is a task for execution on that time. + * Order of the execution uses scheduled time or number of inbound order. + * Tasks could comes in a random order + * 'Hot' tasks should not waste time and executes immediately. + + +## Requirements: + + * Java SE Development Kit 8 (or newer) + * Gradle 2.x (or you could use Gradle wrapper) + + +## Project configuration: + + * Java SE should be installed and you need to set path variable for `JAVA_HOME`. + * Gradle doesn't need to install because you might do this automatically thanks Gradle Wrapper. + + +## Run + + * Build project. Go to the root path `/time-hz-executor/` of the project and run: +```sh +time-hz-executor$ ./gradlew clean build +Version 0.1 +:client-cli:clean UP-TO-DATE +:hz-node:clean UP-TO-DATE +:shared:clean UP-TO-DATE +:shared:compileJava +:shared:processResources +:shared:classes +:shared:jar +:client-cli:compileJava +:client-cli:processResources +:client-cli:classes +:client-cli:jar +:client-cli:assemble +:client-cli:compileTestJava NO-SOURCE +:client-cli:processTestResources NO-SOURCE +:client-cli:testClasses UP-TO-DATE +:client-cli:test NO-SOURCE +:client-cli:check UP-TO-DATE +:client-cli:build +:hz-node:compileJava +:hz-node:processResources +:hz-node:classes +:hz-node:jar +:hz-node:assemble +:hz-node:compileTestJava NO-SOURCE +:hz-node:processTestResources NO-SOURCE +:hz-node:testClasses UP-TO-DATE +:hz-node:test NO-SOURCE +:hz-node:check UP-TO-DATE +:hz-node:build +:shared:assemble +:shared:compileTestJava +:shared:processTestResources NO-SOURCE +:shared:testClasses +:shared:test +:shared:check +:shared:build + +BUILD SUCCESSFUL + +Total time: 5.176 secs + +``` + * Run Hazelcast cluster: +```sh +time-hz-executor/scripts$ ./start_hz_cluster.sh + +... ... + +05:43:37.025 [main] INFO r.s.c.ThreadPoolBuilder - create thread pool: node.executor, threads [8..32], idleTime: 60 SECONDS, queue: SynchronousQueue [] +05:43:37.094 [main] INFO r.s.c.Node - ----- // ----- Node: 1 START 2017-03-21T05:43:37.093 ----- // ----- +05:43:37.096 [node-hz-1] INFO r.s.c.Node - Node: 1 starting... +05:43:37.096 [node-hz-1] INFO r.s.c.ServiceController - Node:1 services starting... +05:43:37.096 [node-main-1] INFO r.s.c.Node - Node: 1 thread: Thread[node-main-1,5,main] await the state: IDLE to stop itself +05:43:37.097 [node-main-1] DEBUG r.s.c.Node - Thread: Thread[node-main-1,5,main] is alive +05:43:37.118 [node-hz-1] INFO r.s.h.HzService - Hz server service starting... +05:43:37.120 [node-hz-1] DEBUG r.s.h.HzBuilder - Load HZ server instance... +мар 21, 2017 5:43:37 AM com.hazelcast.config.ClasspathXmlConfig +INFO: Configuring Hazelcast from 'hazelcast.xml'. +05:43:37.197 [main] INFO r.s.c.ThreadPoolBuilder - create thread pool: node.executor, threads [8..32], idleTime: 60 SECONDS, queue: SynchronousQueue [] +05:43:37.284 [main] INFO r.s.c.Node - ----- // ----- Node: 2 START 2017-03-21T05:43:37.284 ----- // ----- +05:43:37.290 [node-hz-2] INFO r.s.c.Node - Node: 2 starting... +05:43:37.291 [node-hz-2] INFO r.s.c.ServiceController - Node:2 services starting... +05:43:37.291 [node-main-2] INFO r.s.c.Node - Node: 2 thread: Thread[node-main-2,5,main] await the state: IDLE to stop itself +05:43:37.291 [node-main-2] DEBUG r.s.c.Node - Thread: Thread[node-main-2,5,main] is alive +05:43:37.312 [node-hz-2] INFO r.s.h.HzService - Hz server service starting... +05:43:37.315 [node-hz-2] DEBUG r.s.h.HzBuilder - Load HZ server instance... +мар 21, 2017 5:43:37 AM com.hazelcast.config.ClasspathXmlConfig +INFO: Configuring Hazelcast from 'hazelcast.xml'. +05:43:37.412 [main] INFO r.s.c.ThreadPoolBuilder - create thread pool: node.executor, threads [8..32], idleTime: 60 SECONDS, queue: SynchronousQueue [] +05:43:37.542 [main] INFO r.s.c.Node - ----- // ----- Node: 3 START 2017-03-21T05:43:37.542 ----- // ----- +05:43:37.545 [node-hz-3] INFO r.s.c.Node - Node: 3 starting... +05:43:37.546 [node-hz-3] INFO r.s.c.ServiceController - Node:3 services starting... +05:43:37.546 [node-main-3] INFO r.s.c.Node - Node: 3 thread: Thread[node-main-3,5,main] await the state: IDLE to stop itself + +... ... + +Members [3] { + Member [127.0.0.1]:5701 + Member [127.0.0.1]:5702 + Member [127.0.0.1]:5703 this +} + +05:43:50.659 [node-hz-1] INFO c.h.p.InternalPartitionService - [127.0.0.1]:5701 [dev-node-hz] [3.6.7] Initializing cluster partition table arrangement... +05:43:50.670 [node-hz-1] INFO c.h.i.HazelcastInstanceImpl - [127.0.0.1]:5701 [dev-node-hz] [3.6.7] HazelcastInstance starting after waiting for cluster size of 2 +05:43:50.670 [node-hz-1] INFO c.h.c.LifecycleService - [127.0.0.1]:5701 [dev-node-hz] [3.6.7] Address[127.0.0.1]:5701 is STARTED +05:43:50.670 [node-hz-1] INFO r.s.h.HzService - Hz server service started +05:43:50.671 [node-hz-1] INFO r.s.c.TaskTimeService - TaskTimeService Node:1 starting... +05:43:50.675 [node-hz-1] INFO r.s.c.TaskTimeService - TaskTimeService Node:1 started +05:43:50.676 [node-hz-1] INFO r.s.c.ServiceController - Listener: Node:1 has started all services --> +05:43:50.676 [node-hz-1] INFO r.s.c.ServiceController - Node:1 services started, state: RUN +05:43:50.681 [node-hz-1] INFO r.s.c.Node - Node: 1 started, state: RUN + +... ... + +``` + * The same log you could see in the files in current directory: + ``` + time-hz-executor/scripts$ tree ./logs/ +./logs/ +└── node.log + +0 directories, 1 file + +``` + * Run Hazelcast client: +```sh + +... ... + +INFO: HazelcastClient[hz.client_0_dev-node-hz][3.6.7] is STARTED +мар 21, 2017 5:56:52 AM com.hazelcast.client.spi.impl.ClientMembershipListener +INFO: + +Members [3] { + Member [127.0.0.1]:5701 + Member [127.0.0.1]:5702 + Member [127.0.0.1]:5703 +} + +мар 21, 2017 5:56:52 AM com.hazelcast.core.LifecycleService +INFO: HazelcastClient[hz.client_0_dev-node-hz][3.6.7] is CLIENT_CONNECTED +ConsoleClient: Client get ready, choose command... (/h - help) + +... ... + +``` + * Use the command line interface to send a task on execution to Hazelcast cluster +```sh +/h + h - help + You see current message + + s - send : : + You send the text message at the scheduled time to execute on Hazelcast node + + q - quit + End session and quit + + t - utc + Get current Hazelcast cluster time in UTC + +Start your command with slash symbol '/' +Author: Dmitriy Shishmakov + +/s 2017-03-21T03:02:12 Test_message_hello! +Send task successfully! +``` + * On a server side you could see +```sh +06:03:50.034 [node-main-3] DEBUG r.s.c.Node - Thread: Thread[node-main-3,5,main] is alive +06:03:59.102 [node.executor-0] DEBUG r.s.c.FirstLevelWatcher - <-- FirstLevelWatcher Node:2 take task 'TimeTask[orderId=1,scheduledTime=1490065332000]'; checkTime: 1490065439097, scheduledTime: 1490065332000, delta: -107097 +06:03:59.102 [node.executor-0] DEBUG r.s.c.FirstLevelWatcher - --> FirstLevelWatcher Node:2 put task 'TimeTask[orderId=1,scheduledTime=1490065332000]' +06:03:59.102 [node.executor-2] DEBUG r.s.c.FirstLevelConsumer - <-- FirstLevelConsumer:1 Node:2 start process task 'TimeTask[orderId=1,scheduledTime=1490065332000]' ... +06:03:59.103 [node.executor-2] INFO r.s.h.MessageTask - Run task; time: 2017-03-21T03:02:12, message: Test_message_hello! +06:03:59.966 [node-main-1] DEBUG r.s.c.Node - Thread: Thread[node-main-1,5,main] is alive +06:04:00.018 [node-main-2] DEBUG r.s.c.Node - Thread: Thread[node-main-2,5,main] is alive +``` + + +## Stop + + * First choice for the client `time-hz-executor` is terminated in response to a user interrupt, such as typing `^C` (Ctrl + C), or a system-wide event of shutdown. +```sh + +... ... + +^CClient: 1 STOP 2017-03-21T06:15:23.772 +Buy! +Client: 1 stopped, state: IDLE +time-hz-executor/scripts$ +``` + * Second choice is to use inner command line +```sh + +... ... + +ConsoleClient: Client get ready, choose command... (/h - help) +/h + h - help + You see current message + + s - send : : + You send the text message at the scheduled time to execute on Hazelcast node + + q - quit + End session and quit + + t - utc + Get current Hazelcast cluster time in UTC + +Start your command with slash symbol '/' +Author: Dmitriy Shishmakov + +/q +мар 21, 2017 6:22:09 AM com.hazelcast.core.LifecycleService +INFO: HazelcastClient[hz.client_0_dev-node-hz][3.6.7] is SHUTTING_DOWN +мар 21, 2017 6:22:09 AM com.hazelcast.core.LifecycleService +INFO: HazelcastClient[hz.client_0_dev-node-hz][3.6.7] is SHUTDOWN +Client: 1 stopped, state: IDLE +Client: 1 STOP 2017-03-21T06:22:14.996 +Buy! +time-hz-executor/scripts$ + +``` + * Stop Hazelcast cluster: +```sh +time-hz-executor/scripts$ ./stop_hz_cluster.sh + +... ... + +06:25:24.349 [node-2-hook-thread] INFO r.s.c.ServiceController - Listener: Node:2 has stopped all services <-- +06:25:24.349 [node-2-hook-thread] INFO r.s.c.ServiceController - Node:2 services stopped, state: IDLE +06:25:24.349 [node-2-hook-thread] INFO r.s.c.Node - Node: 2 executor services stopping... +06:25:24.350 [node-2-hook-thread] INFO r.s.c.Node - Node: 2 executor services stopped +06:25:24.350 [node-2-hook-thread] INFO r.s.c.Node - Node: 2 stopped, state: IDLE + +... ... + +06:25:26.568 [node-1-hook-thread] INFO r.s.c.TaskTimeService - TaskTimeService Node:1 stopped +06:25:26.568 [node-1-hook-thread] INFO r.s.c.ServiceController - Listener: Node:1 has stopped all services <-- +06:25:26.568 [node-1-hook-thread] INFO r.s.c.ServiceController - Node:1 services stopped, state: IDLE +06:25:26.568 [node-1-hook-thread] INFO r.s.c.Node - Node: 1 executor services stopping... +06:25:26.569 [node-1-hook-thread] INFO r.s.c.Node - Node: 1 executor services stopped +06:25:26.569 [node-1-hook-thread] INFO r.s.c.Node - Node: 1 stopped, state: IDLE +``` diff --git a/build.gradle b/build.gradle index 3b85ae5..d6e3b58 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,5 @@ +import java.time.ZonedDateTime + /* * This build file was generated by the Gradle 'init' task. * @@ -6,26 +8,85 @@ * user guide available at https://docs.gradle.org/3.4.1/userguide/tutorial_java_projects.html */ +ext { + buildVesion = '0.1' + powerMockVersion = "1.6.6" + jacksonVersion = "2.8.7" + hazelcastVersion = "3.6.7" +} -// Apply the java plugin to add support for Java -apply plugin: 'java' +task wrapper(type: Wrapper) { + gradleVersion = '3.4.1' +} -// In this section you declare where to find the dependencies of your project -repositories { - // Use 'jcenter' for resolving your dependencies. - // You can declare any Maven/Ivy/file repository here. - jcenter() +task printVersion { + logger.quiet "Version $buildVesion" } -// In this section you declare the dependencies for your production and test code -dependencies { - // The production code uses the SLF4J logging API at compile time - compile 'org.slf4j:slf4j-api:1.7.21' +subprojects { + apply plugin: 'java' + group = 'ru.shishmakov' + version = buildVesion + + tasks.withType(JavaCompile) { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + } + + repositories { + jcenter() + mavenLocal() + } - // Declare the dependency for your favourite test framework you want to use in your tests. - // TestNG is also supported by the Gradle Test task. Just change the - // testCompile dependency to testCompile 'org.testng:testng:6.8.1' and add - // 'test.useTestNG()' to your build script. - testCompile 'junit:junit:4.12' + tasks.withType(Jar) { jar -> + jar.manifest.attributes( + 'Created-By': 'Gradle ' + gradle.gradleVersion + ', Java ' + JavaVersion.current(), + 'Date': ZonedDateTime.now(), + 'Implementation-Title': project.name + ' of project ' + rootProject.name, + 'Implementation-Version': version, + 'Implementation-Vendor': 'ru.shishmakov' + ) + if (!project.name.startsWith("shared")) { + jar.baseName = project.name + '-all' + jar.from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } } + } + } } +configure(subprojects) { + [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' + + dependencies { + compile 'ch.qos.logback:logback-classic:1+' + compile 'org.apache.commons:commons-lang3:3+' + compile 'commons-collections:commons-collections:3+' + compile 'com.google.guava:guava:21.0' + compile 'com.google.inject:guice:4.1.0' + compile 'com.google.code.findbugs:jsr305:3.0.1' + compile 'ru.vyarus:guice-ext-annotations:1.2.1' + compile 'org.aeonbits.owner:owner:1.0.9' + + compile "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${jacksonVersion}" + compile "com.hazelcast:hazelcast:${hazelcastVersion}" + compile "com.hazelcast:hazelcast-client:${hazelcastVersion}" +// compile 'info.jerrinot:subzero-core:0.7' +// compile 'com.esotericsoftware:kryo:3.0.3' // 4.0.0 +// compile 'de.javakaffee:kryo-serializers:0.37' // 0.41 + + testCompile 'org.hamcrest:hamcrest-all:1.3' + testCompile 'junit:junit:4.12' + testCompile 'org.mockito:mockito-all:1.10.19' + testCompile "org.powermock:powermock-module-junit4:${powerMockVersion}" + testCompile("org.powermock:powermock-api-mockito:${powerMockVersion}") { + exclude group: 'org.hamcrest', module: 'hamcrest-core' + } + } + + test { + useJUnit() + maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + minHeapSize = '128M' + maxHeapSize = '256M' + } +} diff --git a/client-cli/build.gradle b/client-cli/build.gradle new file mode 100644 index 0000000..ad24517 --- /dev/null +++ b/client-cli/build.gradle @@ -0,0 +1,7 @@ +dependencies { + compile project(':shared') +} + +tasks.withType(Jar) { j -> + j.manifest.attributes('Main-Class': 'ru.shishmakov.client.Main') +} diff --git a/client-cli/src/main/java/ru/shishmakov/client/Main.java b/client-cli/src/main/java/ru/shishmakov/client/Main.java new file mode 100644 index 0000000..a23fb2c --- /dev/null +++ b/client-cli/src/main/java/ru/shishmakov/client/Main.java @@ -0,0 +1,17 @@ +package ru.shishmakov.client; + +import com.google.inject.Guice; +import ru.shishmakov.core.Client; +import ru.shishmakov.core.ClientModule; + +/** + * @author Dmitriy Shishmakov on 13.03.17 + */ +public class Main { + + public static void main(String[] args) throws InterruptedException { + Guice.createInjector(new ClientModule()) + .getInstance(Client.class) + .startAsync().await(); + } +} diff --git a/client-cli/src/main/java/ru/shishmakov/core/Client.java b/client-cli/src/main/java/ru/shishmakov/core/Client.java new file mode 100644 index 0000000..4a67ce8 --- /dev/null +++ b/client-cli/src/main/java/ru/shishmakov/core/Client.java @@ -0,0 +1,133 @@ +package ru.shishmakov.core; + +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.concurrent.LifeCycle; +import ru.shishmakov.concurrent.ServiceController; +import ru.shishmakov.hz.HzService; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; +import java.lang.invoke.MethodHandles; +import java.time.LocalDateTime; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static ru.shishmakov.concurrent.LifeCycle.*; +import static ru.shishmakov.concurrent.Threads.*; + +/** + * Start point of the Hazelcast client instance + * + * @author Dmitriy Shishmakov on 10.03.17 + */ +@Singleton +public class Client { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger ucLogger = LoggerFactory.getLogger("userConsole"); + + private static final String NAME = MethodHandles.lookup().lookupClass().getSimpleName(); + private static final AtomicReference CLIENT_STATE = new AtomicReference<>(IDLE); + private static final CountDownLatch awaitStart = new CountDownLatch(1); + private static final String CLIENT_SYSTEM_KEY = "client"; + + @Inject + @Named("client.executor") + private ExecutorService executor; + @Inject + private ConsoleClient consoleClient; + @Inject + private HzService hzService; + @Inject + private ServiceController serviceController; + private final int clientNumber; + + + public Client() { + this.clientNumber = Integer.valueOf(System.getProperty(CLIENT_SYSTEM_KEY, "0")); + } + + @PostConstruct + public void setUp() { + logger.info("----- // ----- {}: {} START {} ----- // -----", NAME, clientNumber, LocalDateTime.now()); + ucLogger.info("{}: {} START {}", NAME, clientNumber, LocalDateTime.now()); + this.serviceController.setMetaInfo(clientNumber, "Client"); + this.consoleClient.setMetaInfo(clientNumber, NAME); + } + + @PreDestroy + public void tearDown() { + logger.info("----- // ----- {}: {} STOP {} ----- // -----", NAME, clientNumber, LocalDateTime.now()); + ucLogger.info("{}: {} STOP {}\nBuy!", NAME, clientNumber, LocalDateTime.now()); + } + + public Client startAsync() { + new Thread(this::start, "client-hz-" + clientNumber).start(); + return this; + } + + public Client start() { + logger.info("{}: {} starting...", NAME, clientNumber); + + final LifeCycle state = CLIENT_STATE.get(); + if (LifeCycle.isNotIdle(state)) { + logger.warn("Warning! {}: {} already started, state: {}", NAME, clientNumber, state); + return this; + } + CLIENT_STATE.set(INIT); + awaitStart.countDown(); + serviceController.startServices(hzService, consoleClient); + assignThreadHook(this::stop, "client-" + clientNumber + "-hook-thread"); + + CLIENT_STATE.set(RUN); + logger.info("{}: {} started, state: {}", NAME, clientNumber, CLIENT_STATE.get()); + return this; + } + + public void stop() { + logger.info("{}: {} stopping...", NAME, clientNumber); + final LifeCycle state = CLIENT_STATE.get(); + if (LifeCycle.isNotRun(state)) { + logger.warn("Warning! {}: {} already stopped, state: {}", NAME, clientNumber, state); + return; + } + + try { + CLIENT_STATE.set(STOPPING); + serviceController.stopServices(); + stopExecutors(); + } finally { + CLIENT_STATE.set(IDLE); + logger.info("{}: {} stopped, state: {}", NAME, clientNumber, CLIENT_STATE.get()); + ucLogger.info("{}: {} stopped, state: {}", NAME, clientNumber, CLIENT_STATE.get()); + } + } + + public void await() throws InterruptedException { + awaitStart.await(); + Thread.currentThread().setName("client-main-" + clientNumber); + logger.info("{}: {} thread: {} await the state: {} to stop itself", NAME, clientNumber, Thread.currentThread(), IDLE); + for (long count = 0; LifeCycle.isNotIdle(CLIENT_STATE.get()); count++) { + if (count % 100 == 0) logger.debug("Thread: {} is alive", Thread.currentThread()); + sleepWithoutInterruptedAfterTimeout(100, MILLISECONDS); + } + } + + + private void stopExecutors() { + logger.info("{}: {} executor services stopping...", NAME, clientNumber); + try { + MoreExecutors.shutdownAndAwaitTermination(executor, STOP_TIMEOUT_SEC, SECONDS); + logger.info("Executor services stopped"); + } catch (Exception e) { + logger.error("{}: {} exception occurred during stopping executor services", NAME, clientNumber, e); + } + } +} diff --git a/client-cli/src/main/java/ru/shishmakov/core/ClientModule.java b/client-cli/src/main/java/ru/shishmakov/core/ClientModule.java new file mode 100644 index 0000000..13922fd --- /dev/null +++ b/client-cli/src/main/java/ru/shishmakov/core/ClientModule.java @@ -0,0 +1,45 @@ +package ru.shishmakov.core; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import org.aeonbits.owner.ConfigFactory; +import ru.shishmakov.concurrent.ThreadPoolBuilder; +import ru.shishmakov.config.HzConfig; +import ru.shishmakov.config.TimeConfig; +import ru.vyarus.guice.ext.ExtAnnotationsModule; + +import javax.inject.Named; +import javax.inject.Singleton; +import java.util.concurrent.ExecutorService; + +/** + * @author Dmitriy Shishmakov on 13.03.17 + */ +public class ClientModule extends AbstractModule { + + @Override + protected void configure() { + binder().install(new ExtAnnotationsModule()); + } + + @Provides + @Singleton + @Named("client.executor") + public ExecutorService starterExecutor() { + return ThreadPoolBuilder.pool("client.executor") + .withThreads(1) + .build(); + } + + @Provides + @Singleton + public TimeConfig timeConfig() { + return ConfigFactory.create(TimeConfig.class); + } + + @Provides + @Singleton + public HzConfig hzConfig() { + return ConfigFactory.create(HzConfig.class); + } +} diff --git a/client-cli/src/main/java/ru/shishmakov/core/ConsoleClient.java b/client-cli/src/main/java/ru/shishmakov/core/ConsoleClient.java new file mode 100644 index 0000000..36b018e --- /dev/null +++ b/client-cli/src/main/java/ru/shishmakov/core/ConsoleClient.java @@ -0,0 +1,175 @@ +package ru.shishmakov.core; + +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.AbstractService; +import com.hazelcast.core.IMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.config.HzConfig; +import ru.shishmakov.config.TimeConfig; +import ru.shishmakov.hz.HzObjects; +import ru.shishmakov.hz.HzService; +import ru.shishmakov.hz.MessageTask; +import ru.shishmakov.hz.TimeTask; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Provider; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.lang.invoke.MethodHandles; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.commons.lang3.StringUtils.*; + +/** + * Service declares the rules to start, shutdown and interaction between Hazelcast client and server + * + * @author Dmitriy Shishmakov on 20.03.17 + */ +public class ConsoleClient extends AbstractService { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger ucLogger = LoggerFactory.getLogger("userConsole"); + + private static final String NAME = MethodHandles.lookup().lookupClass().getSimpleName(); + @Inject + @Named("client.executor") + private ExecutorService executor; + @Inject + private HzService hzService; + @Inject + private HzConfig hzConfig; + @Inject + private HzObjects hzObjects; + @Inject + private TimeConfig timeConfig; + @Inject + private Provider client; + + private final AtomicBoolean watcherState = new AtomicBoolean(true); + private int ownerNumber; + private String ownerName; + + public void setMetaInfo(int ownerNumber, String ownerName) { + this.ownerNumber = ownerNumber; + this.ownerName = ownerName; + } + + @Override + protected void doStart() { + logger.info("{} {}:{} starting...", NAME, ownerName, ownerNumber); + try { + startClientService(); + notifyStarted(); + logger.info("{} {}:{} started", NAME, ownerName, ownerNumber); + } catch (Throwable e) { + notifyFailed(e); + } + } + + @Override + protected void doStop() { + logger.info("{} {}:{} stopping...", NAME, ownerName, ownerNumber); + try { + stopClientService(); + notifyStopped(); + logger.info("{} {}:{} stopped", NAME, ownerName, ownerNumber); + } catch (Throwable e) { + notifyFailed(e); + } + } + + private void startClientService() throws TimeoutException { + hzService.awaitRunning(hzConfig.clientInitialWaitTimeoutSec(), SECONDS); + executor.execute(this::process); + } + + private void stopClientService() { + shutdownClient(); + } + + private void process() { + logger.info("{} {}:{} listening user typing...", NAME, ownerName, ownerNumber); + ucLogger.info("{}: {} get ready, choose command... (/h - help)", NAME, ownerName); + try (BufferedReader input = new BufferedReader(new InputStreamReader(System.in))) { + while (watcherState.get() && !Thread.currentThread().isInterrupted()) { + final String read = input.readLine(); + if (isBlank(read)) continue; + + final Iterator it = Splitter.on(' ').split(read).iterator(); + if (!it.hasNext()) continue; + + final String cmd = it.next(); + if (isBlank(cmd)) continue; + + logger.debug("{} {}:{} user typed: {}", NAME, ownerName, ownerNumber, cmd); + + if (equalsIgnoreCase(cmd, "/h") || equalsIgnoreCase(cmd, "/help")) { + ucLogger.info(String.format("\t%s - %s%n\t%s%n", "h", "help", "You see current message")); + ucLogger.info(String.format("\t%s - %s%n\t%s%n", "s", + "send : :", + "You send the text message at the scheduled time to execute on Hazelcast node")); + ucLogger.info(String.format("\t%s - %s%n\t%s%n", "q", "quit", "End session and quit")); + ucLogger.info(String.format("\t%s - %s%n\t%s%n", "t", "utc", "Get current Hazelcast cluster time in UTC")); + ucLogger.info("Start your command with slash symbol '/'\nAuthor: Dmitriy Shishmakov\n"); + continue; + } + + if (equalsIgnoreCase(cmd, "/q") || equalsIgnoreCase(cmd, "/quit")) { + client.get().stop(); + break; + } + + if (equalsIgnoreCase(cmd, "/t") || equalsIgnoreCase(cmd, "/utc")) { + final long clusterTime = hzObjects.getClusterTime(); + ucLogger.info("Cluster time: {}\n", + LocalDateTime.ofInstant(Instant.ofEpochMilli(clusterTime), ZoneId.of("UTC"))); + continue; + } + + if (equalsIgnoreCase(cmd, "/s") || equalsIgnoreCase(cmd, "/send")) { + final String time = it.hasNext() ? it.next() : EMPTY; + final String message = it.hasNext() ? it.next() : EMPTY; + if (isBlank(time) || isBlank(message)) { + ucLogger.info("Could not parse your typing. Please try again...\n"); + continue; + } + try { + final LocalDateTime localDateTime = LocalDateTime.parse(time, ISO_LOCAL_DATE_TIME); + final long timeStamp = timeConfig.hotTaskUpperBoundMs() + hzObjects.getClusterTime(); + final TimeTask task = new TimeTask(hzObjects.getTaskIdGenerator().newId(), + localDateTime, new MessageTask(message, localDateTime)); + final IMap map = timeStamp >= task.getScheduledTime() + ? hzObjects.getFirstLevelMap() + : hzObjects.getSecondLevelMap(); + map.set(task.getOrderId(), task); + ucLogger.info("Send task successfully!\n"); + logger.debug("{} {}:{} send task: {}", NAME, ownerName, ownerNumber, task); + } catch (Exception e) { + logger.error("{} {}:{} error in time to send the task", NAME, ownerName, ownerNumber, e); + ucLogger.info("Fail send task!\n"); + } + } + } + } catch (Exception e) { + logger.error("{} {}:{} error in time of processing", NAME, ownerName, ownerNumber, e); + } finally { + shutdownClient(); + } + } + + private void shutdownClient() { + if (watcherState.compareAndSet(true, false)) { + logger.debug("{} {}:{} waiting for shutdown the client...", NAME, ownerName, ownerNumber); + } + + } +} diff --git a/client-cli/src/main/resources/config/hz.properties b/client-cli/src/main/resources/config/hz.properties new file mode 100644 index 0000000..f206f0b --- /dev/null +++ b/client-cli/src/main/resources/config/hz.properties @@ -0,0 +1,5 @@ +################################### +# Your current properties +# Overridden properties +################################### +server=false \ No newline at end of file diff --git a/client-cli/src/main/resources/logback.xml b/client-cli/src/main/resources/logback.xml new file mode 100644 index 0000000..53e3d25 --- /dev/null +++ b/client-cli/src/main/resources/logback.xml @@ -0,0 +1,48 @@ + + + + + + UTF-8 + %msg%n + false + + + + + + UTF-8 + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + false + + + + + logs/client.log + + logs/client-%d{yyyy-MM-dd}.log.zip + 14 + 3GB + + + UTF-8 + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + false + + + + + + + + + + + + + + + + + + diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100755 index 0000000..ad0cc20 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index edde2eb..1007fa3 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Fri Mar 10 15:42:19 MSK 2017 +#Fri Mar 10 16:46:52 MSK 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.4.1-all.zip diff --git a/hz-node/build.gradle b/hz-node/build.gradle new file mode 100644 index 0000000..00d3cd1 --- /dev/null +++ b/hz-node/build.gradle @@ -0,0 +1,7 @@ +dependencies { + compile project(':shared') +} + +tasks.withType(Jar) { j -> + j.manifest.attributes('Main-Class': 'ru.shishmakov.node.Main') +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/FirstLevelConsumer.java b/hz-node/src/main/java/ru/shishmakov/core/FirstLevelConsumer.java new file mode 100644 index 0000000..4f75202 --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/FirstLevelConsumer.java @@ -0,0 +1,41 @@ +package ru.shishmakov.core; + +import com.hazelcast.core.IMap; +import ru.shishmakov.hz.TimeTask; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * First level is a subset of hot tasks retrieve from {@link BlockingQueue} and execute independent of other part of tasks + * + * @author Dmitriy Shishmakov on 18.03.17 + */ +public class FirstLevelConsumer extends LevelConsumer { + + private static final AtomicInteger iteratorFirstLevel = new AtomicInteger(); + + @Inject + @Named("timeQueue.firstLevel") + private BlockingQueue queueFirstLevel; + private IMap mapFirstLevel; + + @Override + protected BlockingQueue getQueue() { + return queueFirstLevel; + } + + @Override + protected IMap getIMap() { + return mapFirstLevel; + } + + @Override + public void start() { + this.mapFirstLevel = hzObjects.getFirstLevelMap(); + this.selfNumber = iteratorFirstLevel.incrementAndGet(); + super.start(); + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/FirstLevelWatcher.java b/hz-node/src/main/java/ru/shishmakov/core/FirstLevelWatcher.java new file mode 100644 index 0000000..36a2f85 --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/FirstLevelWatcher.java @@ -0,0 +1,40 @@ +package ru.shishmakov.core; + +import com.hazelcast.core.IMap; +import ru.shishmakov.hz.TimeTask; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; +import java.util.concurrent.BlockingQueue; + +/** + * First level is a subset of hot tasks retrieve from {@link IMap} and put them to {@link BlockingQueue} + * + * @author Dmitriy Shishmakov on 16.03.17 + */ +@Singleton +public class FirstLevelWatcher extends LevelWatcher { + + @Inject + @Named("timeQueue.firstLevel") + private BlockingQueue queueFirstLevel; + private IMap mapFirstLevel; + + @Override + protected BlockingQueue getQueue() { + return queueFirstLevel; + } + + @Override + protected IMap getIMap() { + return mapFirstLevel; + } + + @Override + public void start() { + this.mapFirstLevel = hzObjects.getFirstLevelMap(); + super.start(); + } + +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/LevelConsumer.java b/hz-node/src/main/java/ru/shishmakov/core/LevelConsumer.java new file mode 100644 index 0000000..92d08ae --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/LevelConsumer.java @@ -0,0 +1,93 @@ +package ru.shishmakov.core; + +import com.hazelcast.core.IMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.config.TimeConfig; +import ru.shishmakov.hz.HzObjects; +import ru.shishmakov.hz.TimeTask; +import ru.shishmakov.util.QueueUtils; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static ru.shishmakov.concurrent.LifeCycle.RUN; +import static ru.shishmakov.concurrent.Threads.sleepInterrupted; + +/** + * Retrieves tasks {@link TimeTask} from {@link BlockingQueue} and executes them + * + * @author Dmitriy Shishmakov on 18.03.17 + */ +@Singleton +public abstract class LevelConsumer { + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Inject + private TimeConfig timeConfig; + @Inject + protected HzObjects hzObjects; + + private final String name = this.getClass().getSimpleName(); + private final AtomicBoolean consumerState = new AtomicBoolean(true); + private final CountDownLatch awaitStop = new CountDownLatch(1); + protected int selfNumber; + private int ownerNumber; + private String ownerName; + + public void setMetaInfo(int ownerNumber, String ownerName) { + this.ownerNumber = ownerNumber; + this.ownerName = ownerName; + } + + public void start() { + logger.info("{}:{} {}:{} started", name, selfNumber, ownerName, ownerNumber); + try { + while (consumerState.get() && !Thread.currentThread().isInterrupted()) { + QueueUtils.poll(getQueue()).ifPresent(t -> { + t.setState(RUN); + logger.debug("<-- {}:{} {}:{} start process task \'{}\' ...", name, selfNumber, ownerName, ownerNumber, t); + try { + t.call(); + } catch (Exception e) { + logger.error("X--X {}:{} {}:{} failed process task '{}'", name, selfNumber, ownerName, ownerNumber, e); + } finally { + getIMap().delete(t.getOrderId()); + } + }); + sleepInterrupted(timeConfig.scanIntervalMs(), MILLISECONDS); + } + } catch (Exception e) { + logger.error("{}:{} {}:{} error in time of processing", name, selfNumber, ownerName, ownerNumber, e); + } finally { + shutdownWatcher(); + awaitStop.countDown(); + } + } + + public void stop() { + logger.info("{}:{} {}:{} stopping...", name, selfNumber, ownerName, ownerNumber); + try { + shutdownWatcher(); + awaitStop.await(2, SECONDS); + logger.info("{}:{} {}:{} stopped", name, selfNumber, ownerName, ownerNumber); + } catch (Exception e) { + logger.error("{}:{} {}:{} error in time of stopping", name, selfNumber, ownerName, ownerNumber, e); + } + } + + protected abstract BlockingQueue getQueue(); + + protected abstract IMap getIMap(); + + private void shutdownWatcher() { + if (consumerState.compareAndSet(true, false)) { + logger.debug("{}:{} {}:{} waiting for shutdown process to complete...", name, selfNumber, ownerName, ownerNumber); + } + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/LevelWatcher.java b/hz-node/src/main/java/ru/shishmakov/core/LevelWatcher.java new file mode 100644 index 0000000..e53443d --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/LevelWatcher.java @@ -0,0 +1,113 @@ +package ru.shishmakov.core; + +import com.hazelcast.core.IMap; +import com.hazelcast.query.Predicate; +import com.hazelcast.query.Predicates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.config.TimeConfig; +import ru.shishmakov.hz.HzObjects; +import ru.shishmakov.hz.HzService; +import ru.shishmakov.hz.TimeTask; +import ru.shishmakov.util.QueueUtils; + +import javax.inject.Inject; +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static ru.shishmakov.concurrent.LifeCycle.IDLE; +import static ru.shishmakov.concurrent.LifeCycle.INIT; +import static ru.shishmakov.concurrent.Threads.sleepInterrupted; + +/** + * Retrieves tasks from {@link IMap} and put them to {@link BlockingQueue} if and only if they are ready to process by schedule time + * + * @author Dmitriy Shishmakov on 16.03.17 + */ +public abstract class LevelWatcher { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final String NAME = this.getClass().getSimpleName(); + + @Inject + private TimeConfig timeConfig; + @Inject + private HzService hzService; + @Inject + protected HzObjects hzObjects; + + private final AtomicBoolean watcherState = new AtomicBoolean(true); + private final CountDownLatch awaitStop = new CountDownLatch(1); + private int ownerNumber; + private String ownerName; + + public void setMetaInfo(int ownerNumber, String ownerName) { + this.ownerNumber = ownerNumber; + this.ownerName = ownerName; + } + + public void start() { + logger.info("{} {}:{} started", NAME, ownerName, ownerNumber); + try { + while (watcherState.get() && !Thread.currentThread().isInterrupted()) { + if (hzService.hasHzInstance()) process(); + else logger.warn("{} {}:{} hz instance is not available!", NAME, ownerName, ownerNumber); + sleepInterrupted(timeConfig.scanIntervalMs(), MILLISECONDS); + } + } catch (Exception e) { + logger.error("{} {}:{} error in time of processing", NAME, ownerName, ownerNumber, e); + } finally { + shutdownWatcher(); + awaitStop.countDown(); + } + } + + public void stop() { + logger.info("{} {}:{} stopping...", NAME, ownerName, ownerNumber); + try { + shutdownWatcher(); + awaitStop.await(2, SECONDS); + logger.info("{} {}:{} stopped", NAME, ownerName, ownerNumber); + } catch (Exception e) { + logger.error("{} {}:{} error in time of stopping", NAME, ownerName, ownerNumber, e); + } + } + + protected abstract BlockingQueue getQueue(); + + protected abstract IMap getIMap(); + + private void process() { + final long now = hzObjects.getClusterTime(); + getHotLevelTasks(now + timeConfig.scanIntervalMs()).forEach((time, list) -> { + Collections.sort(list); + list.forEach(t -> { + logger.debug("<-- {} {}:{} take task \'{}\'; checkTime: {}, scheduledTime: {}, delta: {}", + NAME, ownerName, ownerNumber, t, now, t.getScheduledTime(), t.getScheduledTime() - now); + if (QueueUtils.offer(getQueue(), t)) { + t.setState(INIT); + logger.debug("--> {} {}:{} put task \'{}\'", NAME, ownerName, ownerNumber, t); + } + }); + }); + } + + private Map> getHotLevelTasks(long timeStamp) { + final Set localHotKeys = new HashSet<>(getIMap() + .localKeySet(Predicates.and(Predicates.lessEqual("scheduledTime", timeStamp), Predicates.equal("state", IDLE)))); + return (localHotKeys.isEmpty() + ? Collections.emptyList() + : getIMap().values((Predicate) e -> localHotKeys.contains(e.getKey()))) + .stream().collect(Collectors.groupingBy(TimeTask::getScheduledTime, Collectors.toList())); + } + + private void shutdownWatcher() { + if (watcherState.compareAndSet(true, false)) { + logger.debug("{} {}:{} waiting for shutdown process to complete...", NAME, ownerName, ownerNumber); + } + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/Node.java b/hz-node/src/main/java/ru/shishmakov/core/Node.java new file mode 100644 index 0000000..b3fe65e --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/Node.java @@ -0,0 +1,128 @@ +package ru.shishmakov.core; + +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.concurrent.LifeCycle; +import ru.shishmakov.concurrent.ServiceController; +import ru.shishmakov.hz.HzService; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; +import java.lang.invoke.MethodHandles; +import java.time.LocalDateTime; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static ru.shishmakov.concurrent.LifeCycle.*; +import static ru.shishmakov.concurrent.Threads.*; + +/** + * Start point of the Hazelcast node + * + * @author Dmitriy Shishmakov on 10.03.17 + */ +@Singleton +public class Node { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String NODE_SYSTEM_KEY = "node"; + private static final String NAME = MethodHandles.lookup().lookupClass().getSimpleName(); + private static final AtomicReference NODE_STATE = new AtomicReference<>(IDLE); + private static final CountDownLatch awaitStart = new CountDownLatch(1); + + @Inject + @Named("node.executor") + private ExecutorService executor; + @Inject + private HzService hzService; + @Inject + private TaskTimeService timeService; + @Inject + private ServiceController serviceController; + private final int nodeNumber; + + public Node() { + this.nodeNumber = Integer.valueOf(System.getProperty(NODE_SYSTEM_KEY, "0")); + } + + @PostConstruct + public void setUp() { + logger.info("----- // ----- {}: {} START {} ----- // -----", NAME, nodeNumber, LocalDateTime.now()); + this.timeService.setMetaInfo(nodeNumber, "Node"); + this.serviceController.setMetaInfo(nodeNumber, NAME); + } + + @PreDestroy + public void tearDown() { + logger.info("----- // ----- {}: {} STOP {} ----- // -----", NAME, nodeNumber, LocalDateTime.now()); + } + + public Node startAsync() { + new Thread(this::start, "node-hz-" + nodeNumber).start(); + return this; + } + + public Node start() { + logger.info("{}: {} starting...", NAME, nodeNumber); + + final LifeCycle state = NODE_STATE.get(); + if (LifeCycle.isNotIdle(state)) { + logger.warn("Warning! {}: {} already started, state: {}", NAME, nodeNumber, state); + return this; + } + NODE_STATE.set(INIT); + awaitStart.countDown(); + serviceController.startServices(hzService, timeService); + assignThreadHook(this::stop, "node-" + nodeNumber + "-hook-thread"); + + NODE_STATE.set(RUN); + logger.info("{}: {} started, state: {}", NAME, nodeNumber, NODE_STATE.get()); + return this; + } + + + public void stop() { + logger.info("{}: {} stopping...", NAME, nodeNumber); + final LifeCycle state = NODE_STATE.get(); + if (LifeCycle.isNotRun(state)) { + logger.warn("Warning! {}: {} already stopped, state: {}", NAME, nodeNumber, state); + return; + } + + try { + NODE_STATE.set(STOPPING); + serviceController.stopServices(); + stopExecutors(); + } finally { + NODE_STATE.set(IDLE); + logger.info("{}: {} stopped, state: {}", NAME, nodeNumber, NODE_STATE.get()); + } + } + + public void await() throws InterruptedException { + awaitStart.await(); + Thread.currentThread().setName("node-main-" + nodeNumber); + logger.info("{}: {} thread: {} await the state: {} to stop itself", NAME, nodeNumber, Thread.currentThread(), IDLE); + for (long count = 0; LifeCycle.isNotIdle(NODE_STATE.get()); count++) { + if (count % 100 == 0) logger.debug("Thread: {} is alive", Thread.currentThread()); + sleepWithoutInterruptedAfterTimeout(100, MILLISECONDS); + } + } + + private void stopExecutors() { + logger.info("{}: {} executor services stopping...", NAME, nodeNumber); + try { + MoreExecutors.shutdownAndAwaitTermination(executor, STOP_TIMEOUT_SEC, SECONDS); + logger.info("{}: {} executor services stopped", NAME, nodeNumber); + } catch (Exception e) { + logger.error("{}: {} exception occurred during stopping executor services", NAME, nodeNumber, e); + } + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/NodeModule.java b/hz-node/src/main/java/ru/shishmakov/core/NodeModule.java new file mode 100644 index 0000000..bc3f350 --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/NodeModule.java @@ -0,0 +1,66 @@ +package ru.shishmakov.core; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import org.aeonbits.owner.ConfigFactory; +import ru.shishmakov.concurrent.ThreadPoolBuilder; +import ru.shishmakov.config.HzConfig; +import ru.shishmakov.config.TimeConfig; +import ru.shishmakov.hz.TimeTask; +import ru.vyarus.guice.ext.ExtAnnotationsModule; + +import javax.inject.Named; +import javax.inject.Singleton; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; + +/** + * @author Dmitriy Shishmakov on 11.03.17 + */ +public class NodeModule extends AbstractModule { + + private static final int QUEUE_CAPACITY = 10_240; + + @Override + protected void configure() { + binder().install(new ExtAnnotationsModule()); + } + + @Provides + @Singleton + @Named("node.executor") + public ExecutorService starterExecutor() { + final int cores = Runtime.getRuntime().availableProcessors(); + return ThreadPoolBuilder.pool("node.executor") + .withThreads(cores, cores * 4) + .withSyncQueue() + .build(); + } + + @Provides + @Singleton + @Named("timeQueue.firstLevel") + public BlockingQueue queueFirstLevel() { + return new ArrayBlockingQueue<>(QUEUE_CAPACITY); + } + + @Provides + @Singleton + @Named("timeQueue.secondLevel") + public BlockingQueue queueSecondLevel() { + return new ArrayBlockingQueue<>(QUEUE_CAPACITY); + } + + @Provides + @Singleton + public TimeConfig timeConfig() { + return ConfigFactory.create(TimeConfig.class); + } + + @Provides + @Singleton + public HzConfig hzConfig() { + return ConfigFactory.create(HzConfig.class); + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/SecondLevelConsumer.java b/hz-node/src/main/java/ru/shishmakov/core/SecondLevelConsumer.java new file mode 100644 index 0000000..eaf8943 --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/SecondLevelConsumer.java @@ -0,0 +1,41 @@ +package ru.shishmakov.core; + +import com.hazelcast.core.IMap; +import ru.shishmakov.hz.TimeTask; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Second level is a subset of main tasks retrieve from {@link BlockingQueue} and execute them + * + * @author Dmitriy Shishmakov on 18.03.17 + */ +public class SecondLevelConsumer extends LevelConsumer { + + private static final AtomicInteger iteratorSecondLevel = new AtomicInteger(); + + @Inject + @Named("timeQueue.secondLevel") + private BlockingQueue queueSecondLevel; + private IMap mapSecondLevel; + + @Override + protected BlockingQueue getQueue() { + return queueSecondLevel; + } + + @Override + protected IMap getIMap() { + return mapSecondLevel; + } + + @Override + public void start() { + this.mapSecondLevel = hzObjects.getSecondLevelMap(); + this.selfNumber = iteratorSecondLevel.incrementAndGet(); + super.start(); + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/core/SecondLevelWatcher.java b/hz-node/src/main/java/ru/shishmakov/core/SecondLevelWatcher.java new file mode 100644 index 0000000..816a663 --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/SecondLevelWatcher.java @@ -0,0 +1,58 @@ +package ru.shishmakov.core; + +import com.hazelcast.core.IMap; +import ru.shishmakov.hz.TimeTask; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; +import java.util.concurrent.BlockingQueue; + +/** + * Second level is a subset of main tasks retrieve from {@link IMap} and put them to {@link BlockingQueue} + * + * @author Dmitriy Shishmakov on 16.03.17 + */ +@Singleton +public class SecondLevelWatcher extends LevelWatcher { + + @Inject + @Named("timeQueue.secondLevel") + private BlockingQueue queueSecondLevel; + private IMap mapSecondLevel; + + @Override + protected BlockingQueue getQueue() { + return queueSecondLevel; + } + + @Override + protected IMap getIMap() { + return mapSecondLevel; + } + + @Override + public void start() { + this.mapSecondLevel = hzObjects.getSecondLevelMap(); + super.start(); + } + +} + + + + + + + + + + + + + + + + + + diff --git a/hz-node/src/main/java/ru/shishmakov/core/TaskTimeService.java b/hz-node/src/main/java/ru/shishmakov/core/TaskTimeService.java new file mode 100644 index 0000000..31412f9 --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/core/TaskTimeService.java @@ -0,0 +1,108 @@ +package ru.shishmakov.core; + +import com.google.common.util.concurrent.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.config.HzConfig; +import ru.shishmakov.hz.HzService; +import ru.shishmakov.hz.TimeTask; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Provider; +import javax.inject.Singleton; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Service declares the rules to start and shutdown watchers and consumers for {@link TimeTask} + * + * @author Dmitriy Shishmakov on 15.03.17 + */ +@Singleton +public class TaskTimeService extends AbstractService { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String NAME = MethodHandles.lookup().lookupClass().getSimpleName(); + + @Inject + @Named("node.executor") + private ExecutorService executor; + @Inject + private HzConfig hzConfig; + @Inject + private HzService hzService; + @Inject + private FirstLevelWatcher flWatcher; + @Inject + private SecondLevelWatcher slWatcher; + private Provider flConsumer; + private Provider slConsumer; + private final List consumers = new ArrayList<>(); + private int ownerNumber; + private String ownerName; + + @Inject + public TaskTimeService(Provider flConsumer, Provider slConsumer) { + this.flConsumer = flConsumer; + this.slConsumer = slConsumer; + } + + public void setMetaInfo(int ownerNumber, String ownerName) { + this.ownerNumber = ownerNumber; + this.ownerName = ownerName; + } + + @Override + protected void doStart() { + logger.info("{} {}:{} starting...", NAME, ownerName, ownerNumber); + try { + startTimeService(); + notifyStarted(); + logger.info("{} {}:{} started", NAME, ownerName, ownerNumber); + } catch (Throwable e) { + notifyFailed(e); + } + } + + @Override + protected void doStop() { + logger.info("{} {}:{} stopping...", NAME, ownerName, ownerNumber); + try { + stopTimeService(); + notifyStopped(); + logger.info("{} {}:{} stopped", NAME, ownerName, ownerNumber); + } catch (Throwable e) { + notifyFailed(e); + } + } + + protected void startTimeService() throws TimeoutException { + hzService.awaitRunning(hzConfig.clientInitialWaitTimeoutSec(), SECONDS); + flWatcher.setMetaInfo(ownerNumber, ownerName); + slWatcher.setMetaInfo(ownerNumber, ownerName); + executor.execute(() -> flWatcher.start()); + executor.execute(() -> slWatcher.start()); + + for (int count = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); count > 0; count--) { + executor.execute(() -> defineLevelConsumer(flConsumer.get()).start()); + executor.execute(() -> defineLevelConsumer(slConsumer.get()).start()); + } + } + + private LevelConsumer defineLevelConsumer(LevelConsumer consumer) { + consumer.setMetaInfo(ownerNumber, ownerName); + consumers.add(consumer); + return consumer; + } + + protected void stopTimeService() throws InterruptedException { + flWatcher.stop(); + slWatcher.stop(); + consumers.forEach(LevelConsumer::stop); + } +} diff --git a/hz-node/src/main/java/ru/shishmakov/node/Main.java b/hz-node/src/main/java/ru/shishmakov/node/Main.java new file mode 100644 index 0000000..b88543c --- /dev/null +++ b/hz-node/src/main/java/ru/shishmakov/node/Main.java @@ -0,0 +1,17 @@ +package ru.shishmakov.node; + +import com.google.inject.Guice; +import ru.shishmakov.core.Node; +import ru.shishmakov.core.NodeModule; + +/** + * @author Dmitriy Shishmakov on 11.03.17 + */ +public class Main { + + public static void main(String[] args) throws Exception { + Guice.createInjector(new NodeModule()) + .getInstance(Node.class) + .startAsync().await(); + } +} diff --git a/hz-node/src/main/resources/logback.xml b/hz-node/src/main/resources/logback.xml new file mode 100644 index 0000000..79f3bbf --- /dev/null +++ b/hz-node/src/main/resources/logback.xml @@ -0,0 +1,31 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + false + + + + + logs/node.log + + logs/node-%d{yyyy-MM-dd}.log.zip + 14 + 3GB + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + false + + + + + + + + + + + diff --git a/scripts/start_hz_client.sh b/scripts/start_hz_client.sh new file mode 100755 index 0000000..cb692f9 --- /dev/null +++ b/scripts/start_hz_client.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +java -jar -Dclient=1 -Dfile.encoding=UTF-8 ../client-cli/build/libs/client-cli-all-*.jar diff --git a/scripts/start_hz_cluster.sh b/scripts/start_hz_cluster.sh new file mode 100755 index 0000000..de71907 --- /dev/null +++ b/scripts/start_hz_cluster.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +java -jar -Dnode=1 -Dfile.encoding=UTF-8 ../hz-node/build/libs/hz-node-all-*.jar & +sleep 0.1 +java -jar -Dnode=2 -Dfile.encoding=UTF-8 ../hz-node/build/libs/hz-node-all-*.jar & +sleep 0.1 +java -jar -Dnode=3 -Dfile.encoding=UTF-8 ../hz-node/build/libs/hz-node-all-*.jar & diff --git a/scripts/stop_hz_cluster.sh b/scripts/stop_hz_cluster.sh new file mode 100755 index 0000000..9fc9856 --- /dev/null +++ b/scripts/stop_hz_cluster.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +pkill -f hz-node-all-* diff --git a/settings.gradle b/settings.gradle index e1c8327..e8f4b57 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,11 +8,8 @@ * in the user guide at https://docs.gradle.org/3.4.1/userguide/multi_project_builds.html */ -/* -// To declare projects as part of a multi-project build use the 'include' method -include 'shared' -include 'api' -include 'services:webservice' -*/ - rootProject.name = 'time-hz-executor' + +include 'shared' +include 'client-cli' +include 'hz-node' diff --git a/shared/build.gradle b/shared/build.gradle new file mode 100644 index 0000000..bb9779c --- /dev/null +++ b/shared/build.gradle @@ -0,0 +1 @@ +// to do nothing diff --git a/shared/src/main/java/ru/shishmakov/concurrent/LifeCycle.java b/shared/src/main/java/ru/shishmakov/concurrent/LifeCycle.java new file mode 100644 index 0000000..a9257bb --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/concurrent/LifeCycle.java @@ -0,0 +1,37 @@ +package ru.shishmakov.concurrent; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * @author Dmitriy Shishmakov on 12.03.17 + */ +public enum LifeCycle { + IDLE, + INIT, + RUN, + STOPPING; + + public static boolean isNotIdle(LifeCycle state) { + return !isIdle(state); + } + + public static boolean isIdle(LifeCycle state) { + return checkNotNull(state, "state is null") == IDLE; + } + + public static boolean isNotStopping(LifeCycle state) { + return !isStopping(state); + } + + public static boolean isStopping(LifeCycle state) { + return checkNotNull(state, "state is null") == STOPPING; + } + + public static boolean isNotRun(LifeCycle state) { + return !isRun(state); + } + + public static boolean isRun(LifeCycle state) { + return checkNotNull(state, "state is null") == RUN; + } +} diff --git a/shared/src/main/java/ru/shishmakov/concurrent/ServiceController.java b/shared/src/main/java/ru/shishmakov/concurrent/ServiceController.java new file mode 100644 index 0000000..084e448 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/concurrent/ServiceController.java @@ -0,0 +1,96 @@ +package ru.shishmakov.concurrent; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ServiceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.SECONDS; +import static ru.shishmakov.concurrent.LifeCycle.*; +import static ru.shishmakov.concurrent.Threads.STOP_TIMEOUT_SEC; + +/** + * @author Dmitriy Shishmakov on 14.03.17 + */ +public class ServiceController { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final AtomicReference SERVICES_STATE = new AtomicReference<>(IDLE); + @Nullable + private volatile ServiceManager sm; + private int ownerNumber; + private String ownerName; + + public void setMetaInfo(int ownerNumber, String ownerName) { + this.ownerNumber = ownerNumber; + this.ownerName = ownerName; + } + + public void startServices(Service service, Service... services) { + logger.info("{}:{} services starting...", ownerName, ownerNumber); + final LifeCycle state = SERVICES_STATE.get(); + if (LifeCycle.isNotIdle(state)) { + logger.warn("Warning! {}:{} services already started, state: {}", ownerName, ownerNumber, state); + return; + } + + try { + SERVICES_STATE.set(INIT); + final ServiceManager sm = new ServiceManager(Lists.asList(service, services)); + sm.addListener(buildServiceListener(), MoreExecutors.directExecutor()); + sm.startAsync().awaitHealthy(); + this.sm = sm; + } catch (Throwable e) { + logger.error("Exception occurred during starting node services", e); + } finally { + SERVICES_STATE.set(RUN); + logger.info("{}:{} services started, state: {}", ownerName, ownerNumber, SERVICES_STATE.get()); + } + } + + public void stopServices() { + logger.info("{}:{} services stopping...", ownerName, ownerNumber); + final LifeCycle state = SERVICES_STATE.get(); + if (LifeCycle.isNotRun(state)) { + logger.warn("Warning! {}:{} services already stopped, state: {}", ownerName, ownerNumber, state); + return; + } + + try { + SERVICES_STATE.set(STOPPING); + checkNotNull(sm, "Service manager is null").stopAsync().awaitStopped(STOP_TIMEOUT_SEC, SECONDS); + sm = null; + } catch (Throwable e) { + logger.error("Exception occurred during stopping node services", e); + } finally { + SERVICES_STATE.set(IDLE); + logger.info("{}:{} services stopped, state: {}", ownerName, ownerNumber, SERVICES_STATE.get()); + } + } + + private ServiceManager.Listener buildServiceListener() { + return new ServiceManager.Listener() { + @Override + public void healthy() { + logger.info("Listener: {}:{} has started all services -->", ownerName, ownerNumber); + } + + @Override + public void stopped() { + logger.info("Listener: {}:{} has stopped all services <--", ownerName, ownerNumber); + } + + @Override + public void failure(Service service) { + logger.error("Error! {}:{} service: {} has crashed X--X", ownerName, ownerNumber, service, service.failureCause()); + } + }; + } +} diff --git a/shared/src/main/java/ru/shishmakov/concurrent/TaskRejectedHandler.java b/shared/src/main/java/ru/shishmakov/concurrent/TaskRejectedHandler.java new file mode 100644 index 0000000..a7ced85 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/concurrent/TaskRejectedHandler.java @@ -0,0 +1,27 @@ +package ru.shishmakov.concurrent; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.LongAdder; + +/** + * @author Dmitriy Shishmakov on 11.03.17 + */ +public class TaskRejectedHandler implements RejectedExecutionHandler { + private final LongAdder count = new LongAdder(); + private final RejectedExecutionHandler handler; + + public TaskRejectedHandler(RejectedExecutionHandler handler) { + this.handler = handler; + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + count.increment(); + handler.rejectedExecution(r, executor); + } + + public long rejected(boolean needReset) { + return needReset ? count.sumThenReset() : count.longValue(); + } +} diff --git a/shared/src/main/java/ru/shishmakov/concurrent/ThreadPoolBuilder.java b/shared/src/main/java/ru/shishmakov/concurrent/ThreadPoolBuilder.java new file mode 100644 index 0000000..c46af3d --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/concurrent/ThreadPoolBuilder.java @@ -0,0 +1,114 @@ +package ru.shishmakov.concurrent; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.*; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * @author Dmitriy Shishmakov on 11.03.17 + */ +public class ThreadPoolBuilder { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final int DEFAULT_QUEUE_CAPACITY = 4096; + + private final TaskRejectedHandler rejectedHandler; + private final ThreadFactoryBuilder threadFactory; + private final String name; + + private int min; + private int max; + private BlockingQueue queue; + private long idleTime; + private TimeUnit idleUnit; + + private ThreadPoolBuilder(String name) { + this.name = name; + this.rejectedHandler = new TaskRejectedHandler(new AbortPolicy()); + this.threadFactory = defaultThreadFactory(name); + this.queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + this.min = 1; + this.max = Math.max(min, Runtime.getRuntime().availableProcessors() * 4); + this.idleTime = 60; + this.idleUnit = SECONDS; + } + + public static ThreadPoolBuilder pool(String name) { + return new ThreadPoolBuilder(checkNotNull(name, "pool name should not be null")); + } + + public ThreadPoolBuilder withIdleTime(long idleTime, TimeUnit unit) { + checkArgument(idleTime > 0, "idle time of thread should be positive value: %s", idleTime); + this.idleUnit = checkNotNull(unit, "time unit should not be null"); + this.idleTime = idleTime; + return this; + } + + public ThreadPoolBuilder withDaemonThreads(boolean daemon) { + threadFactory.setDaemon(daemon); + return this; + } + + public ThreadPoolBuilder withPriorityThreads(int priority) { + threadFactory.setPriority(priority); + return this; + } + + public ThreadPoolBuilder withThreads(int threads) { + return withMin(threads).withMax(threads); + } + + public ThreadPoolBuilder withThreads(int min, int max) { + return withMin(min).withMax(max); + } + + + public ThreadPoolBuilder withMin(int threads) { + checkArgument(threads >= 0, "thread count should not be negative value: %s", threads); + return (this.min = threads) > max ? withMax(threads) : this; + } + + public ThreadPoolBuilder withMax(int threads) { + checkArgument(threads >= 0, "threads should not be negative value: %s", threads); + return (this.max = threads) < min ? withMin(threads) : this; + } + + public ThreadPoolBuilder withQueue(BlockingQueue queue) { + this.queue = checkNotNull(queue, "queue should not be null"); + return this; + } + + public ThreadPoolBuilder withArrayQueue(int capacity) { + checkArgument(capacity > 0, "capacity should be positive value: %s", capacity); + return withQueue(new ArrayBlockingQueue<>(capacity)); + } + + public ThreadPoolBuilder withLinkedQueue(int capacity) { + checkArgument(capacity > 0, "capacity should be positive value: %s", capacity); + return withQueue(new LinkedBlockingQueue<>(capacity)); + } + + public ThreadPoolBuilder withSyncQueue() { + return withQueue(new SynchronousQueue<>()); + } + + public ThreadPoolExecutor build() { + logger.info("create thread pool: {}, threads [{}..{}], idleTime: {} {}, queue: {} {} ", + name, min, max, idleTime, idleUnit, queue.getClass().getSimpleName(), queue); + return new ThreadPoolExecutor(min, max, idleTime, idleUnit, queue, threadFactory.build(), rejectedHandler); + } + + private static ThreadFactoryBuilder defaultThreadFactory(String name) { + ThreadFactoryBuilder factory = new ThreadFactoryBuilder(); + factory.setNameFormat(name + "-%d"); + factory.setUncaughtExceptionHandler((t, e) -> logger.warn("thread pool: " + name + " has unhandled exception", e)); + return factory; + } +} diff --git a/shared/src/main/java/ru/shishmakov/concurrent/Threads.java b/shared/src/main/java/ru/shishmakov/concurrent/Threads.java new file mode 100644 index 0000000..c753adc --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/concurrent/Threads.java @@ -0,0 +1,63 @@ +package ru.shishmakov.concurrent; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * @author Dmitriy Shishmakov on 13.03.17 + */ +public final class Threads { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final long STOP_TIMEOUT_SEC = 10; + + /** + * Modified version of the method {@link Uninterruptibles#sleepUninterruptibly(long, TimeUnit)} + * suppresses {@link InterruptedException} occurred in time of timeout + */ + public static void sleepWithoutInterruptedAfterTimeout(long timeout, TimeUnit unit) { + long remainingNanos = unit.toNanos(timeout); + final long end = System.nanoTime() + remainingNanos; + while (true) { + try { + // TimeUnit.sleep() treats negative timeouts just like zero. + NANOSECONDS.sleep(remainingNanos); + return; + } catch (InterruptedException e) { + remainingNanos = end - System.nanoTime(); + } + } + } + + /** + * Version of the method {@link Uninterruptibles#sleepUninterruptibly(long, TimeUnit)} + * interrupts the thread after timeout if {@link InterruptedException} happened + */ + public static void sleepWithInterruptedAfterTimeout(long timeout, TimeUnit unit) { + Uninterruptibles.sleepUninterruptibly(timeout, unit); + } + + /** + * The thread could interrupted in time of timeout + */ + public static void sleepInterrupted(long timeout, TimeUnit unit) { + try { + Thread.sleep(unit.toMillis(timeout)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public static void assignThreadHook(Runnable task, String name) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.debug("Thread: {} was interrupted by hook", Thread.currentThread()); + task.run(); + }, name)); + } +} diff --git a/shared/src/main/java/ru/shishmakov/config/HzConfig.java b/shared/src/main/java/ru/shishmakov/config/HzConfig.java new file mode 100644 index 0000000..72ced45 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/config/HzConfig.java @@ -0,0 +1,19 @@ +package ru.shishmakov.config; + +import org.aeonbits.owner.Config; + +/** + * @author Dmitriy Shishmakov on 12.03.17 + */ +@Config.Sources({"file:config/hz.properties", "classpath:config/hz.properties"}) +public interface HzConfig extends Config { + + @DefaultValue("true") + boolean server(); + + @DefaultValue("2") + int clientMinClusterSize(); + + @DefaultValue("30") + long clientInitialWaitTimeoutSec(); +} diff --git a/shared/src/main/java/ru/shishmakov/config/TimeConfig.java b/shared/src/main/java/ru/shishmakov/config/TimeConfig.java new file mode 100644 index 0000000..56948ae --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/config/TimeConfig.java @@ -0,0 +1,22 @@ +package ru.shishmakov.config; + +import org.aeonbits.owner.Config; + +/** + * @author Dmitriy Shishmakov on 11.03.17 + */ +@Config.Sources({"file:config/time.properties", "classpath:config/time.properties"}) +public interface TimeConfig extends Config { + + @DefaultValue("250") + @Key("time.scanIntervalMs") + long scanIntervalMs(); + + @DefaultValue("10000") + @Key("time.hotTaskUpperBoundMs") + long hotTaskUpperBoundMs(); + + @DefaultValue("false") + @Key("schedule.rejectOldTasks") + boolean isRejectOldTasks(); +} diff --git a/shared/src/main/java/ru/shishmakov/hz/HzBuilder.java b/shared/src/main/java/ru/shishmakov/hz/HzBuilder.java new file mode 100644 index 0000000..d77f3d9 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/hz/HzBuilder.java @@ -0,0 +1,83 @@ +package ru.shishmakov.hz; + +import com.google.common.base.Stopwatch; +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.XmlClientConfigBuilder; +import com.hazelcast.config.ClasspathXmlConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.commons.lang3.Range; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.config.HzConfig; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static ru.shishmakov.concurrent.Threads.sleepWithInterruptedAfterTimeout; + +/** + * Class builds client or server instance of Hazelcast + * + * @author Dmitriy Shishmakov on 12.03.17 + */ +public class HzBuilder { + private static Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final HzConfig hzConfig; + private final String configFile; + private boolean kryo; + + private HzBuilder(HzConfig hzConfig) { + this.hzConfig = hzConfig; + this.configFile = hzConfig.server() ? "hazelcast.xml" : "hazelcast-client.xml"; + } + + public static HzBuilder instance(HzConfig hzConfig) { + return new HzBuilder(hzConfig); + } + + public HzBuilder withKryo() { + throw new UnsupportedOperationException("kryo is not supported yet!"); +// this.kryo = true; +// return this; + } + + public HazelcastInstance build() throws InterruptedException { + return hzConfig.server() ? buildHZInstance() : buildHZClientInstance(); + } + + private HazelcastInstance buildHZInstance() { + logger.debug("Load HZ server instance..."); + return Hazelcast.newHazelcastInstance(new ClasspathXmlConfig(configFile)); + } + + private HazelcastInstance buildHZClientInstance() throws InterruptedException { + logger.debug("Load HZ client instance..."); + try { + final HazelcastInstance instance = HazelcastClient.newHazelcastClient(new XmlClientConfigBuilder(configFile).build()); + awaitClusterQuorum(instance); + return instance; + } catch (IOException e) { + throw new IllegalStateException("Could not load client config file", e); + } + } + + private void awaitClusterQuorum(HazelcastInstance client) throws InterruptedException { + if (hzConfig.clientMinClusterSize() <= 0) return; + + final Range timeout = Range.between(0L, SECONDS.toMillis(hzConfig.clientInitialWaitTimeoutSec())); + final Stopwatch sw = Stopwatch.createStarted(); + while (client.getCluster().getMembers().size() < hzConfig.clientMinClusterSize()) { + logger.debug("Client await cluster quorum... found: {}, need: {}", client.getCluster().getMembers().size(), + hzConfig.clientMinClusterSize()); + sleepWithInterruptedAfterTimeout(1, SECONDS); + if (timeout.isBefore(sw.elapsed(MILLISECONDS))) { + throw new RuntimeException("Wait timeout is exceeded, elapsed: " + sw.elapsed(MILLISECONDS)); + } + } + logger.info("Client wasted time to have a cluster quorum: {}", sw); + } +} diff --git a/shared/src/main/java/ru/shishmakov/hz/HzCallable.java b/shared/src/main/java/ru/shishmakov/hz/HzCallable.java new file mode 100644 index 0000000..6661c7d --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/hz/HzCallable.java @@ -0,0 +1,28 @@ +package ru.shishmakov.hz; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.HazelcastInstanceAware; + +import javax.annotation.Nullable; +import java.io.Serializable; +import java.util.concurrent.Callable; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * @author Dmitriy Shishmakov on 16.03.17 + */ +public abstract class HzCallable implements Callable, Serializable, HazelcastInstanceAware { + @Nullable + private transient HazelcastInstance hz; + + @Override + public void setHazelcastInstance(HazelcastInstance hz) { + this.hz = checkNotNull(hz, "hazelcast instance is null"); + } + + @Nullable + public HazelcastInstance getHz() { + return checkNotNull(hz, "hazelcast instance is null"); + } +} diff --git a/shared/src/main/java/ru/shishmakov/hz/HzObjects.java b/shared/src/main/java/ru/shishmakov/hz/HzObjects.java new file mode 100644 index 0000000..a3fd66f --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/hz/HzObjects.java @@ -0,0 +1,44 @@ +package ru.shishmakov.hz; + +import com.hazelcast.core.IMap; +import com.hazelcast.core.IdGenerator; + +import javax.inject.Inject; +import javax.inject.Singleton; + +/** + * Distributed objects of Hazelcast + * + * @author Dmitriy Shishmakov on 16.03.17 + */ +@Singleton +public class HzObjects { + + @Inject + private HzService hzService; + + private enum Maps { + FIRST_LEVEL, + SECOND_LEVEL + } + + private enum IdGenerators { + TASK_ID + } + + public long getClusterTime() { + return hzService.getHzInstance().getCluster().getClusterTime(); + } + + public IdGenerator getTaskIdGenerator() { + return hzService.getHzInstance().getIdGenerator(IdGenerators.TASK_ID.name()); + } + + public IMap getFirstLevelMap() { + return hzService.getHzInstance().getMap(Maps.FIRST_LEVEL.name()); + } + + public IMap getSecondLevelMap() { + return hzService.getHzInstance().getMap(Maps.SECOND_LEVEL.name()); + } +} diff --git a/shared/src/main/java/ru/shishmakov/hz/HzService.java b/shared/src/main/java/ru/shishmakov/hz/HzService.java new file mode 100644 index 0000000..8be5966 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/hz/HzService.java @@ -0,0 +1,81 @@ +package ru.shishmakov.hz; + +import com.google.common.util.concurrent.AbstractService; +import com.hazelcast.core.HazelcastInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.shishmakov.config.HzConfig; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.inject.Singleton; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Service declares the rules to start and shutdown instance of Hazelcast + * + * @author Dmitriy Shishmakov on 11.03.17 + */ +@Singleton +public class HzService extends AbstractService { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final AtomicReference HZ_INSTANCE = new AtomicReference<>(); + + @Inject + private HzConfig hzConfig; + private String label; + + @PostConstruct + public void setUp() { + this.label = hzConfig.server() ? "server" : "client"; + } + + public HazelcastInstance getHzInstance() { + return checkNotNull(HZ_INSTANCE.get(), "hz instance is null"); + } + + public boolean hasHzInstance() { + return HZ_INSTANCE.get() != null; + } + + @Override + protected void doStart() { + logger.info("Hz {} service starting...", label); + try { + startHz(); + notifyStarted(); + logger.info("Hz {} service started", label); + } catch (Throwable e) { + notifyFailed(e); + } + } + + @Override + protected void doStop() { + logger.info("Hz {} service stopping...", label); + try { + stopHz(); + notifyStopped(); + logger.info("Hz {} service stopped", label); + } catch (Throwable e) { + notifyFailed(e); + } + } + + protected void startHz() throws InterruptedException { + final HazelcastInstance current = HzBuilder.instance(hzConfig).build(); + if (HZ_INSTANCE.getAndSet(current) != null) { + logger.warn("Warning! Hz {} service already has instance", label); + } + } + + protected void stopHz() { + final HazelcastInstance current = HZ_INSTANCE.getAndSet(null); + if (current != null) current.shutdown(); + else logger.warn("Warning! Hz {} service is not available to stop", label); + } +} diff --git a/shared/src/main/java/ru/shishmakov/hz/MessageTask.java b/shared/src/main/java/ru/shishmakov/hz/MessageTask.java new file mode 100644 index 0000000..0617ca1 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/hz/MessageTask.java @@ -0,0 +1,35 @@ +package ru.shishmakov.hz; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; + +/** + * Callable wrapper for executing + * + * @author Dmitriy Shishmakov on 20.03.17 + */ +public class MessageTask extends HzCallable { + private static Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final long scheduledTime; + private final String message; + + public MessageTask(String message, LocalDateTime localDateTime) { + this.message = message; + this.scheduledTime = localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + @Override + public Void call() throws Exception { + logger.info("Run task; time: {}, message: {}", + LocalDateTime.ofInstant(Instant.ofEpochMilli(scheduledTime), ZoneId.of("UTC")), + message); + return null; + } +} diff --git a/shared/src/main/java/ru/shishmakov/hz/TimeTask.java b/shared/src/main/java/ru/shishmakov/hz/TimeTask.java new file mode 100644 index 0000000..cd3235c --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/hz/TimeTask.java @@ -0,0 +1,79 @@ +package ru.shishmakov.hz; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import ru.shishmakov.concurrent.LifeCycle; + +import javax.annotation.Nonnull; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Comparator; +import java.util.concurrent.Callable; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; +import static ru.shishmakov.concurrent.LifeCycle.IDLE; + +/** + * Task wrapper for sending data between Hazelcast client and server + * + * @author Dmitriy Shishmakov on 16.03.17 + */ +public class TimeTask extends HzCallable implements Comparable { + private static final Comparator TT_COMPARATOR = buildTaskTimeComparator(); + + private final long orderId; + private final long scheduledTime; + private final Callable task; + private volatile LifeCycle state; + + public TimeTask(long orderId, LocalDateTime localDateTime, Callable task) { + this.orderId = orderId; + this.scheduledTime = localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(); + this.task = task; + this.state = IDLE; + } + + public long getOrderId() { + return orderId; + } + + public long getScheduledTime() { + return scheduledTime; + } + + public Callable getTask() { + return task; + } + + public LifeCycle getState() { + return state; + } + + public void setState(LifeCycle state) { + this.state = state; + } + + @Override + public Void call() throws Exception { + task.call(); + return null; + } + + @Override + public String toString() { + return new ToStringBuilder(this, SHORT_PREFIX_STYLE) + .append("orderId", orderId) + .append("scheduledTime", scheduledTime) + .toString(); + } + + @Override + public int compareTo(@Nonnull TimeTask other) { + return TT_COMPARATOR.compare(this, checkNotNull(other, "{} is null", TimeTask.class.getSimpleName())); + } + + private static Comparator buildTaskTimeComparator() { + return Comparator.comparing(TimeTask::getScheduledTime) + .thenComparing(TimeTask::getOrderId); + } +} diff --git a/shared/src/main/java/ru/shishmakov/util/QueueUtils.java b/shared/src/main/java/ru/shishmakov/util/QueueUtils.java new file mode 100644 index 0000000..8fd4190 --- /dev/null +++ b/shared/src/main/java/ru/shishmakov/util/QueueUtils.java @@ -0,0 +1,66 @@ +package ru.shishmakov.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * @author Dmitriy Shishmakov on 16.03.17 + */ +public final class QueueUtils { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int TIMES_DEFAULT = 11; + private static final int DELAY_DEFAULT = 20; + + public static Optional poll(BlockingQueue queue) { + return poll(queue, TIMES_DEFAULT, DELAY_DEFAULT, MILLISECONDS); + } + + /** + * @return true - if item inserted successfully, false otherwise + */ + public static boolean offer(BlockingQueue queue, T item) { + return offer(queue, item, TIMES_DEFAULT, DELAY_DEFAULT, MILLISECONDS); + } + + /** + * @return item from the queue + */ + public static Optional poll(BlockingQueue queue, int times, int delay, TimeUnit unit) { + try { + T item = null; + while (times-- > 0 && (item = queue.poll(delay, unit)) == null) { + logger.trace("effort: {} X--- item is absent; delay: {}", times, delay); + } + logger.trace("<--- take item: {}", (item == null) ? null : item.getClass().getSimpleName()); + return Optional.ofNullable(item); + } catch (Exception e) { + logger.error("Queue poll exception ...", e); + return Optional.empty(); + } + } + + /** + * @return true - if item inserted successfully, false otherwise + */ + public static boolean offer(BlockingQueue queue, T item, int times, int delay, TimeUnit unit) { + try { + boolean success = false; + while (--times > 0 && !(success = queue.offer(item, delay, unit))) { + logger.trace("effort: {} ---X reject item: {}; delay: {}", times, item.getClass().getSimpleName()); + } + if (success) logger.trace("---> insert item: {}", item.getClass().getSimpleName()); + return success; + } catch (Exception e) { + logger.error("Queue offer exception ...", e); + } + return false; + } +} diff --git a/shared/src/main/resources/hazelcast-client-group.xml b/shared/src/main/resources/hazelcast-client-group.xml new file mode 100644 index 0000000..12ad643 --- /dev/null +++ b/shared/src/main/resources/hazelcast-client-group.xml @@ -0,0 +1,10 @@ + + + + dev-node-hz + dev-node-hz + + diff --git a/shared/src/main/resources/hazelcast-client.xml b/shared/src/main/resources/hazelcast-client.xml new file mode 100644 index 0000000..e9e912f --- /dev/null +++ b/shared/src/main/resources/hazelcast-client.xml @@ -0,0 +1,29 @@ + + + + + + +
127.0.0.1:5701
+
127.0.0.1:5702
+
127.0.0.1:5703
+
+ + true + 5000 + 3000 + 0 + + + true + true + +
+ + + slf4j + +
diff --git a/shared/src/main/resources/hazelcast-group.xml b/shared/src/main/resources/hazelcast-group.xml new file mode 100644 index 0000000..453bb98 --- /dev/null +++ b/shared/src/main/resources/hazelcast-group.xml @@ -0,0 +1,10 @@ + + + + dev-node-hz + dev-node-hz + + diff --git a/shared/src/main/resources/hazelcast.xml b/shared/src/main/resources/hazelcast.xml new file mode 100644 index 0000000..efc7ffb --- /dev/null +++ b/shared/src/main/resources/hazelcast.xml @@ -0,0 +1,62 @@ + + + + + + slf4j + 30 + 2 + true + true + true + INFO + + + + 5701 + + + + 127.0.0.1:5701 + 127.0.0.1:5702 + 127.0.0.1:5703 + + + + + + 2 + + + + BINARY + 1 + 0 + 0 + 0 + LRU + 30 + 25 + 60000 + com.hazelcast.map.merge.PutIfAbsentMapMergePolicy + nodes-quorum + + + + BINARY + 1 + 0 + 0 + 0 + LRU + 30 + 25 + 60000 + com.hazelcast.map.merge.PutIfAbsentMapMergePolicy + nodes-quorum + + + diff --git a/shared/src/test/java/ru/shishmakov/BaseTest.java b/shared/src/test/java/ru/shishmakov/BaseTest.java new file mode 100644 index 0000000..03e797e --- /dev/null +++ b/shared/src/test/java/ru/shishmakov/BaseTest.java @@ -0,0 +1,33 @@ +package ru.shishmakov; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for JUnit test classes. + * + * @author Dmitriy Shishmakov on 16.03.17 + */ +public abstract class BaseTest { + + /** + * Logger used by test. + */ + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Rule + public TestName testName = new TestName(); + + @Before + public void setUp() { + logTestStart(); + } + + private void logTestStart() { + logger.info("Running test \"{}\"", testName.getMethodName()); + } + +} diff --git a/shared/src/test/java/ru/shishmakov/util/PriorityBlockingQueueTest.java b/shared/src/test/java/ru/shishmakov/util/PriorityBlockingQueueTest.java new file mode 100644 index 0000000..ad45fe3 --- /dev/null +++ b/shared/src/test/java/ru/shishmakov/util/PriorityBlockingQueueTest.java @@ -0,0 +1,46 @@ +package ru.shishmakov.util; + +import org.junit.Test; +import ru.shishmakov.BaseTest; +import ru.shishmakov.hz.TimeTask; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static junit.framework.TestCase.assertFalse; +import static org.apache.commons.lang3.ArrayUtils.isSorted; +import static org.junit.Assert.assertTrue; + +/** + * @author Dmitriy Shishmakov on 16.03.17 + */ +public class PriorityBlockingQueueTest extends BaseTest { + + private static final Callable DUMMY_TASK = () -> null; + + @Test + public void pollingShouldRetrieveSortedElements() { + final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC")); + final List tasksBefore = LongStream.range(0, 10) + .boxed().map(id -> new TimeTask(id, now.plusHours(id), DUMMY_TASK)) + .collect(Collectors.toList()); + + Collections.shuffle(tasksBefore); + assertFalse("Tasks should be unsorted", isSorted(tasksBefore.toArray(new TimeTask[tasksBefore.size()]))); + + BlockingQueue queue = new PriorityBlockingQueue<>(tasksBefore.size()); + tasksBefore.forEach(t -> QueueUtils.offer(queue, t)); + final List tasksAfter = new ArrayList<>(queue.size()); + while (!queue.isEmpty()) QueueUtils.poll(queue).ifPresent(tasksAfter::add); + + assertTrue("Tasks should be sorted", isSorted(tasksAfter.toArray(new TimeTask[tasksAfter.size()]))); + } +}