diff --git a/gradle.properties b/gradle.properties index 7972f1b76..e2026f8cc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=0.91.3 +version=0.91.4 springVersion=4.3.3.RELEASE springBootVersion=1.4.1.RELEASE jerseyVersion=2.24 diff --git a/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java index 7b5e218ec..6d0969b3a 100644 --- a/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java @@ -4,6 +4,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; @@ -20,11 +21,11 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataLoads = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java index 190d1eb13..a06298f7f 100644 --- a/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java @@ -4,6 +4,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; @@ -20,11 +21,11 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataLoads = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java index 9d30b5aaf..683da1a18 100644 --- a/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java @@ -4,13 +4,12 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; - import com.aol.micro.server.auto.discovery.Rest; -import com.aol.micro.server.distributed.DistributedMap; import com.aol.micro.server.events.SystemData; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; @@ -19,12 +18,12 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataCleans = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java index cc0c3008e..ae0348476 100644 --- a/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java @@ -4,6 +4,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; @@ -19,12 +20,12 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataCleans = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-couchbase/build.gradle b/micro-couchbase/build.gradle index 34f3b0326..1982eb22d 100644 --- a/micro-couchbase/build.gradle +++ b/micro-couchbase/build.gradle @@ -11,6 +11,7 @@ dependencies { compile project(':micro-manifest-comparator') compile project(':micro-core') compile project(':micro-guava') + compile project(':micro-events') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } testCompile group: 'com.cyrusinnovation', name: 'mockito-groovy-support', version:'1.3' diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java new file mode 100644 index 000000000..22b79fbdc --- /dev/null +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java @@ -0,0 +1,18 @@ +package com.aol.micro.server.couchbase; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; + +@Configuration +public class ConfigureCacheTester { + + @Autowired + private CouchbaseConnectionTester couchbaseConnectionTester; + + + @Scheduled(fixedDelay = 60000) + public synchronized void runCouchbaseConnectionTester(){ + couchbaseConnectionTester.scheduleAndLog(); + } +} diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java index 162011c26..21d8708a4 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java @@ -7,25 +7,23 @@ import java.util.List; import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; -import com.aol.micro.server.couchbase.base.CouchbaseManifestComparator; +import com.aol.micro.server.couchbase.manifest.comparator.CouchbaseManifestComparator; import com.couchbase.client.CouchbaseClient; import com.couchbase.client.CouchbaseConnectionFactory; import com.couchbase.client.CouchbaseConnectionFactoryBuilder; import lombok.Setter; +@Slf4j @Configuration public class ConfigureCouchbase { - private Logger logger = LoggerFactory.getLogger(getClass()); - @Value("${couchbase.manifest.comparison.key:default-key}") private String defaultCouchbaseManifestComparisonKey; @Setter @@ -45,22 +43,33 @@ public class ConfigureCouchbase { @Value("${couchbaseClientOperationTimeout:120000}") private long opTimeout; + @Value("${distributed.cache.default.expiration:691200}") + private int expiresAfterSeconds; + + @Value("${distributed.cache.maxTry:5}") + private int maxTry; + + @Value("${distributed.cache.retryAfterSec:1}") + private int retryAfterSec; + @SuppressWarnings("rawtypes") @Bean(name = "couchbaseDistributedMap") - public CouchbaseDistributedMapClient simpleCouchbaseClient() throws IOException, URISyntaxException { + public CouchbaseDistributedCacheClient simpleCouchbaseClient() throws IOException, URISyntaxException { if (couchbaseClientEnabled) { - return new CouchbaseDistributedMapClient( - couchbaseClient()); + return new CouchbaseDistributedCacheClient( + couchbaseClient(), expiresAfterSeconds, maxTry, + retryAfterSec); } else { - return new CouchbaseDistributedMapClient( - null); + return new CouchbaseDistributedCacheClient( + null, expiresAfterSeconds, maxTry, + retryAfterSec); } } @Bean(name = "couchbaseClient") public CouchbaseClient couchbaseClient() throws IOException, URISyntaxException { if (couchbaseClientEnabled) { - logger.info("Creating CouchbaseClient for servers: {}", couchbaseServers); + log.info("Creating CouchbaseClient for servers: {}", couchbaseServers); CouchbaseConnectionFactoryBuilder builder = new CouchbaseConnectionFactoryBuilder(); builder.setOpTimeout(opTimeout); CouchbaseConnectionFactory cf = builder.buildCouchbaseConnection(getServersList(), couchbaseBucket, @@ -70,7 +79,6 @@ public CouchbaseClient couchbaseClient() throws IOException, URISyntaxException cf); } return null; - } @Bean diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java new file mode 100644 index 000000000..a732826df --- /dev/null +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java @@ -0,0 +1,55 @@ +package com.aol.micro.server.couchbase; + + +import com.aol.micro.server.distributed.DistributedCache; +import com.aol.micro.server.distributed.DistributedMap; +import com.aol.micro.server.events.ScheduledJob; +import com.aol.micro.server.events.SystemData; +import com.couchbase.client.CouchbaseClient; +import lombok.extern.slf4j.Slf4j; + +import java.util.Random; + +@Slf4j + public class CouchbaseConnectionTester implements ScheduledJob { + + private static final Random random = new Random(); + + private final DistributedCache cache; + private final CouchbaseClient couchbaseClient; + + public CouchbaseConnectionTester(DistributedCache cache, CouchbaseClient couchbaseClient) { + + this.cache = cache; + this.couchbaseClient = couchbaseClient; + } + + @Override + public SystemData scheduleAndLog() { + + log.trace("runTestConnection()..."); + boolean result = false; + try { + result = testConnection(); + } catch (RuntimeException e) { + log.debug("Could not connect to Cache" + e.getMessage()); + } + cache.setConnectionTested(result); + + log.debug("Testing Couchbase connection: {}", result); + return null; + + } + + private boolean testConnection() { + String key = "PING_TEST"; + log.trace("Testing connection using key {}", key); + + int testValue = random.nextInt(1111); + couchbaseClient.set(key, 120, testValue); + int received = (Integer) couchbaseClient.get(key); + + return received == testValue; + } + +} \ No newline at end of file diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedCacheClient.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedCacheClient.java new file mode 100644 index 000000000..6c2aefbee --- /dev/null +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedCacheClient.java @@ -0,0 +1,144 @@ +package com.aol.micro.server.couchbase; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedCache; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.couchbase.client.CouchbaseClient; + +@Slf4j +public class CouchbaseDistributedCacheClient implements DistributedCache { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + private volatile boolean available = false; + + private final Optional couchbaseClient; + private final int expiresAfterSeconds, maxTry, retryAfterSec; + + public CouchbaseDistributedCacheClient(CouchbaseClient couchbaseClient, final int expiresAfterSeconds, + final int maxTry, final int retryAfterSec) { + + this.couchbaseClient = Optional.ofNullable(couchbaseClient); + this.expiresAfterSeconds = expiresAfterSeconds; + this.maxTry = maxTry; + this.retryAfterSec = retryAfterSec; + } + + @Override + public boolean put(final String key, final V value) { + + log.trace("put '{}', value:{}", key, value); + boolean success = false; + int tryCount = 0; + + do { + try { + if (tryCount > 0) { + Thread.sleep(retryAfterSec * 1000); + log.warn("retry #{}", tryCount); + } + tryCount++; + success = couchbaseClient.map(c -> putInternal(c, key, value)) + .orElse(false); + + } catch (final Exception e) { + + log.warn("memcache put: {}", e.getMessage()); + } + } while (!success && tryCount < maxTry); + + if (!success) { + log.error("Failed to place item in couchbase"); + } + if (success && tryCount > 1) { + log.info("Connection restored OK"); + } + + setConnectionTested(success); + + return success; + } + + @Override + public boolean put(final String key, int expiry, final V value) { + logger.debug("put '{}', value:{}", key, value); + boolean success = false; + int tryCount = 0; + + do { + try { + if (tryCount > 0) { + Thread.sleep(retryAfterSec * 1000); + log.warn("retry #{}", tryCount); + } + tryCount++; + success = couchbaseClient.map(c -> putInternalWithExpiry(c, key, value, expiry)) + .orElse(false); + } catch (final Exception e) { + + log.warn("memcache put: {}", e.getMessage()); + } + } while (!success && tryCount < maxTry); + + if (!success) { + log.error("Failed to place item in couchbase"); + } + if (success && tryCount > 1) { + log.info("Connection restored OK"); + } + + setConnectionTested(success); + + return success; + + } + + private boolean putInternalWithExpiry(final CouchbaseClient client, final String key, final V value, int expiry) { + + try { + return client.set(key,expiry, value) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw ExceptionSoftener.throwSoftenedException(e); + + } + } + + + private boolean putInternal(final CouchbaseClient client, final String key, final V value) { + + try { + return client.set(key, value) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw ExceptionSoftener.throwSoftenedException(e); + + } + } + + @Override + public Optional get(String key) { + return couchbaseClient.map(c -> (V) c.get(key)); + } + + @Override + public void delete(String key) { + couchbaseClient.map(c -> c.delete(key)); + } + + @Override + public boolean isAvailable(){ + return available; + } + + @Override + public void setConnectionTested(boolean result){ + available = result; + } + +} diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java deleted file mode 100644 index fc0dcc874..000000000 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.aol.micro.server.couchbase; - -import java.util.Optional; -import java.util.concurrent.ExecutionException; - -import com.aol.cyclops2.util.ExceptionSoftener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.aol.micro.server.distributed.DistributedMap; -import com.couchbase.client.CouchbaseClient; - -public class CouchbaseDistributedMapClient implements DistributedMap { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private final Optional couchbaseClient; - - public CouchbaseDistributedMapClient(CouchbaseClient couchbaseClient) { - - this.couchbaseClient = Optional.ofNullable(couchbaseClient); - } - - @Override - public boolean put(final String key, final V value) { - logger.debug("put '{}', value:{}", key, value); - return couchbaseClient.map(c -> putInternal(c, key, value)) - .orElse(false); - - } - - private boolean putInternal(final CouchbaseClient client, final String key, final V value) { - - try { - return client.set(key, value) - .get(); - } catch (InterruptedException | ExecutionException e) { - throw ExceptionSoftener.throwSoftenedException(e); - - } - } - - @Override - public Optional get(String key) { - return couchbaseClient.map(c -> (V) c.get(key)); - - } - - @Override - public void delete(String key) { - couchbaseClient.map(c -> c.delete(key)); - } -} diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/base/CouchbaseManifestComparator.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java similarity index 96% rename from micro-couchbase/src/main/java/com/aol/micro/server/couchbase/base/CouchbaseManifestComparator.java rename to micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java index 37c7ea52e..161c0f6b4 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/base/CouchbaseManifestComparator.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java @@ -1,9 +1,10 @@ -package com.aol.micro.server.couchbase.base; +package com.aol.micro.server.couchbase.manifest.comparator; import java.util.Date; import java.util.Optional; import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.control.Xor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +82,7 @@ public class CouchbaseManifestComparator implements ManifestComparator { @Getter private volatile String versionedKey; - private final DistributedMap connection; + private final DistributedCache connection; /** * Create a ManifestComparator with the supplied distributed map client Data @@ -92,7 +93,7 @@ public class CouchbaseManifestComparator implements ManifestComparator { * @param connection * DistributedMapClient to store comparison data */ - public CouchbaseManifestComparator(DistributedMap connection) { + public CouchbaseManifestComparator(DistributedCache connection) { this.key = "default"; this.versionedKey = newKey(1L).toJson(); this.connection = connection; @@ -110,7 +111,7 @@ public CouchbaseManifestComparator(DistributedMap connection) { * @param connection * DistributeMapClient connection */ - public CouchbaseManifestComparator(String key, DistributedMap connection) { + public CouchbaseManifestComparator(String key, DistributedCache connection) { this.key = key; this.versionedKey = newKey(1L).toJson(); this.connection = connection; diff --git a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java index 6676cb2b7..a8338c5f8 100644 --- a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java +++ b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java @@ -3,6 +3,7 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; +import com.aol.micro.server.distributed.DistributedCache; import org.springframework.beans.factory.annotation.Autowired; import com.aol.micro.server.auto.discovery.Rest; @@ -12,10 +13,10 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; @Autowired - public CouchbaseResource(DistributedMap client) { + public CouchbaseResource(DistributedCache client) { this.client = client; } diff --git a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java index fe3159016..385bddfcb 100644 --- a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java +++ b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java @@ -48,7 +48,6 @@ public void stopServer() { } @Test - @Ignore public void runAppAndBasicTest() throws InterruptedException, ExecutionException { rest.get("http://localhost:8080/simple-app/couchbase/put"); assertThat(rest.get("http://localhost:8080/simple-app/couchbase/get"), containsString("world")); diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java index 3ce542b9a..0d9492b70 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java @@ -8,11 +8,10 @@ import java.net.URISyntaxException; import java.util.Optional; +import com.aol.micro.server.distributed.DistributedCache; import org.junit.Before; import org.junit.Test; -import com.aol.micro.server.distributed.DistributedMap; - public class ConfigureCouchbaseTest { ConfigureCouchbase config; @@ -25,7 +24,7 @@ public void setUp() throws Exception { @Test public void createDistributedCacheMemcachedOff() throws IOException, URISyntaxException { config.setCouchbaseClientEnabled(false); - DistributedMap cache = config.simpleCouchbaseClient(); + DistributedCache cache = config.simpleCouchbaseClient(); assertThat(cache.get("hello"), is(Optional.empty())); } diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy index 0c24348e6..3c93a8451 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy @@ -1,4 +1,6 @@ -package com.aol.micro.server.couchbase; +package com.aol.micro.server.couchbase + +import com.aol.micro.server.distributed.DistributedCache; import static org.hamcrest.Matchers.is import static org.junit.Assert.* @@ -8,18 +10,17 @@ import org.junit.Before import org.junit.Test import org.mockito.Mockito -import com.aol.micro.server.distributed.DistributedMap; import com.couchbase.client.CouchbaseClient class SimpleCouchbaseClientConnectionTest { CouchbaseClient client - DistributedMap con + DistributedCache con @Before public void setup() { client = Mockito.mock(CouchbaseClient) - con = new CouchbaseDistributedMapClient(client) + con = new CouchbaseDistributedCacheClient(client,1,1,1) } @Test public void testDelete() { @@ -35,7 +36,7 @@ class SimpleCouchbaseClientConnectionTest { @Test public void testGetDistributedCacheDisabled() { - con = new CouchbaseDistributedMapClient(null) + con = new CouchbaseDistributedCacheClient(null,1,1,1) Optional result = con.get("key") assertThat(result, is(Optional.empty())) } diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy deleted file mode 100644 index cc56352af..000000000 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy +++ /dev/null @@ -1 +0,0 @@ -package com.aol.micro.server.couchbase.base; diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorKeyNotFoundExceptionTest.java b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorKeyNotFoundExceptionTest.java similarity index 90% rename from micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorKeyNotFoundExceptionTest.java rename to micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorKeyNotFoundExceptionTest.java index 2dd6fb910..919ed07ee 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorKeyNotFoundExceptionTest.java +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorKeyNotFoundExceptionTest.java @@ -1,4 +1,4 @@ -package com.aol.micro.server.couchbase.base; +package com.aol.micro.server.couchbase.manifest.comparator; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy new file mode 100644 index 000000000..570bcdbac --- /dev/null +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy @@ -0,0 +1 @@ +package com.aol.micro.server.couchbase.manifest.comparator; diff --git a/micro-elasticache/build.gradle b/micro-elasticache/build.gradle index 565bb2089..054225854 100644 --- a/micro-elasticache/build.gradle +++ b/micro-elasticache/build.gradle @@ -8,6 +8,8 @@ dependencies { compile group: 'net.spy', name: 'spymemcached', version: '2.12.3' compile project(':micro-core') compile project(':micro-guava') + compile project(':micro-events') + compile project(':micro-manifest-comparator') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } testCompile group: 'com.cyrusinnovation', name: 'mockito-groovy-support', version:'1.3' diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java new file mode 100644 index 000000000..007c397f8 --- /dev/null +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java @@ -0,0 +1,21 @@ +package com.aol.micro.server.elasticache; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; + +@Configuration +public class ConfigureCacheTester { + + @Autowired + private ElasticacheConnectionTester elasticacheConnectionTester; + + @Value("${elasticache.connection.checker.frequency:60000}") + private final int fixedDelay = 60000; + + @Scheduled(fixedDelay = fixedDelay) + public synchronized void runElasticacheConnectionTester(){ + elasticacheConnectionTester.scheduleAndLog(); + } +} diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java index f8076bb1a..f84c60062 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java @@ -1,29 +1,19 @@ package com.aol.micro.server.elasticache; - - +import com.aol.micro.server.distributed.DistributedCache; import lombok.extern.slf4j.Slf4j; -import net.spy.memcached.*; +import net.spy.memcached.MemcachedClient; import java.io.IOException; import java.net.URISyntaxException; -import java.util.ArrayList; - -import net.spy.memcached.auth.AuthDescriptor; -import net.spy.memcached.auth.PlainCallbackHandler; -import net.spy.memcached.MemcachedClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.util.StringUtils; import java.net.InetSocketAddress; -import java.util.List; -import java.util.Optional; - @Slf4j @Configuration public class ConfigureElasticache { @@ -47,7 +37,7 @@ public ConfigureElasticache( @Value("${elasticache.hostname:null}") String hostn @Bean(name = "transientCache") - public DistributedCacheManager transientCache() throws IOException, URISyntaxException { + public DistributedCache transientCache() throws IOException, URISyntaxException { try { log.info("Creating Memcached Data connection for elasticache cluster: {}", hostname); return new TransientElasticacheDataConnection(createMemcachedClient(), retryAfterSecs, maxRetries); diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java deleted file mode 100644 index b0669ff8b..000000000 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.aol.micro.server.elasticache; -import java.util.Optional; - -public interface DistributedCacheManager { - void setConnectionTested(boolean result); - boolean isAvailable(); - boolean add(String key, int exp, V value); - Optional get(String key); -} diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java new file mode 100644 index 000000000..1b8a23bc8 --- /dev/null +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java @@ -0,0 +1,53 @@ +package com.aol.micro.server.elasticache; + +import com.aol.micro.server.distributed.DistributedCache; +import com.aol.micro.server.events.ScheduledJob; +import com.aol.micro.server.events.SystemData; +import lombok.extern.slf4j.Slf4j; +import net.spy.memcached.MemcachedClient; + +import java.util.Random; + +@Slf4j + public class ElasticacheConnectionTester implements ScheduledJob { + + private static final Random random = new Random(); + + private final DistributedCache cache; + private final MemcachedClient memcachedClient; + + public ElasticacheConnectionTester(DistributedCache cache, MemcachedClient memcachedClient) { + + this.cache = cache; + this.memcachedClient = memcachedClient; + } + + @Override + public SystemData scheduleAndLog() { + + log.trace("runTestConnection()..."); + boolean result = false; + try { + result = testConnection(); + } catch (RuntimeException e) { + log.error("Could not connect to Cache" + e.getMessage()); + } + cache.setConnectionTested(result); + + log.debug("Testing Couchbase connection: {}", result); + return null; + + } + + private boolean testConnection() { + String key = "PING_TEST"; + log.trace("Testing connection using key {}", key); + + int testValue = random.nextInt(1111); + memcachedClient.set(key, 120, testValue); + int received = (Integer) memcachedClient.get(key); + + return received == testValue; + } + +} \ No newline at end of file diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java index 5f7973892..745f6f940 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java @@ -1,29 +1,28 @@ package com.aol.micro.server.elasticache; +import com.aol.micro.server.distributed.DistributedCache; import lombok.extern.slf4j.Slf4j; - import java.util.Optional; import net.spy.memcached.MemcachedClient; -import lombok.extern.slf4j.Slf4j; - @Slf4j -public class TransientElasticacheDataConnection implements DistributedCacheManager { +public class TransientElasticacheDataConnection implements DistributedCache { - private volatile boolean available = false; private final MemcachedClient memcachedClient; private final int retryAfterSec; private final int maxTry; + private final int defaultExpiry = 3600; + private volatile boolean available = false; + - public TransientElasticacheDataConnection(MemcachedClient memcachedClient,int retryAfterSec, int maxTry) { + public TransientElasticacheDataConnection(MemcachedClient memcachedClient,int retryAfterSec, int maxTry) { this.memcachedClient = memcachedClient; this.retryAfterSec = retryAfterSec; this.maxTry = maxTry; } @Override - public boolean add(final String key, int exp, final Object value) { - + public boolean put(final String key, final Object value) { log.trace("Memcached add operation on key '{}', with value:{}", key, value); boolean success = false; int tryCount = 0; @@ -35,7 +34,7 @@ public boolean add(final String key, int exp, final Object value) { log.warn("retrying operation #{}", tryCount); } tryCount++; - success = memcachedClient.add(key, exp, value) + success = memcachedClient.add(key, defaultExpiry, value) .get(); } catch (final Exception e) { log.warn("memcache set: {}", e.getMessage()); @@ -48,23 +47,61 @@ public boolean add(final String key, int exp, final Object value) { if (success && tryCount > 1) { log.info("Connection restored OK to Elasticache cluster"); } - - available = success; + setConnectionTested(success); return success; } - @Override - public Optional get(String key) { - return (Optional) Optional.ofNullable(memcachedClient.get(key)); - } + @Override + public boolean put(final String key, int expiry, final Object value) { + log.trace("Memcached add operation on key '{}', with value:{}", key, value); + boolean success = false; + int tryCount = 0; - @Override - public boolean isAvailable() { - return available; + do { + try { + if (tryCount > 0) { + Thread.sleep(retryAfterSec * 1000); + log.warn("retrying operation #{}", tryCount); + } + tryCount++; + success = memcachedClient.add(key, expiry, value) + .get(); + } catch (final Exception e) { + log.warn("memcache set: {}", e.getMessage()); + } + } while (!success && tryCount < maxTry); + + if (!success) { + log.error("Failed to add key to Elasticache {}", key); + } + if (success && tryCount > 1) { + log.info("Connection restored OK to Elasticache cluster"); } + setConnectionTested(success); + return success; + } - @Override - public final void setConnectionTested(final boolean available) { - this.available = available; + + @Override + public Optional get(String key) { + return (Optional) Optional.ofNullable(memcachedClient.get(key)); } + + @Override + public void delete(String key) { + memcachedClient.delete(key); + + } + + @Override + public boolean isAvailable() { + return available; + } + + @Override + public void setConnectionTested(boolean result) { + available = result; + + } + } \ No newline at end of file diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java new file mode 100644 index 000000000..8a3270484 --- /dev/null +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java @@ -0,0 +1,277 @@ +package com.aol.micro.server.elasticache.manifest.comparator; + +import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedCache; +import com.aol.micro.server.manifest.Data; +import com.aol.micro.server.manifest.ManifestComparator; +import com.aol.micro.server.manifest.ManifestComparatorKeyNotFoundException; +import com.aol.micro.server.manifest.VersionedKey; +import com.aol.micro.server.rest.jackson.JacksonUtil; +import cyclops.control.Xor; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; + +@Slf4j +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class ElasticacheManifestComparator implements ManifestComparator { + + /** + * Manifest comparator for use with a distributed map -assumes single producer / + * multiple consumers + * + * Uses to entries in the map + * + * key : versioned key versioned key : actual data + * + * ManifestComparator stores the current version number, only when the version + * changes is the full data set loaded from the remote store. + * + * Usage as a Spring Bean - inject into the host class, and use withKey to + * customise for the targeted Key. + * + * + *
+     * {@code
+     * @Rest
+    public class MyDataService {
+
+
+
+    private final ManifestComparator comparator;
+    @Autowired
+    public  MyDataService(ManifestComparator comparator) {
+    this.comparator = comparator.withKey("test-key");
+    }
+     *
+     * }
+     * 
+ * + * micro-elasticache configures a single ManifestComparator bean that can be + * customized for multiple different keys via withKey + * + * When your bean is injected save via saveAndIncrement, and periodically call + * load() to refresh data if (and only if) it has changed. + * + * ManifestComparator will automatically remove old versions on + * saveAndIncrement, but system outages may occasionally cause old keys to + * linger, you can also use clean & cleanAll to periodically to remove old key + * versions. + * + * + * @author gordonmorrow + * + * @param + */ + + private final String key; + + private volatile Optional data = Optional.empty(); + + @Getter + private volatile String versionedKey; + private final DistributedCache connection; + + /** + * Create a ManifestComparator with the supplied distributed map client Data + * stored by ManifestComparator will be + * + * key : versioned key versioned key : actual data + * + * @param connection + * DistributedMapClient to store comparison data + */ + public ElasticacheManifestComparator(DistributedCache connection) { + this.key = "default"; + this.versionedKey = newKey(1L).toJson(); + this.connection = connection; + } + + /** + * Create a ManifestComparator with the supplied distributed map client + * + * Data stored by ManifestComparator will be + * + * key : versioned key versioned key : actual data + * + * @param key + * To store actual data with + * @param connection + * DistributeMapClient connection + */ + public ElasticacheManifestComparator(String key, DistributedCache connection) { + this.key = key; + this.versionedKey = newKey(1L).toJson(); + this.connection = connection; + } + + /** + * Create a new ManifestComparator with the same distributed map connection + * that targets a different key + * + * @param key + * Key to store data with + * @return new ManifestComparator that targets specified key + */ + @Override + public ElasticacheManifestComparator withKey(String key) { + return new ElasticacheManifestComparator<>( + key, connection); + } + + private VersionedKey newKey(Long version) { + return new VersionedKey( + key, version); + } + + private VersionedKey increment() { + VersionedKey currentVersionedKey = loadKeyFromElasticache(); + return currentVersionedKey.withVersion(currentVersionedKey.getVersion() + 1); + } + + private VersionedKey loadKeyFromElasticache() { + Optional optionalKey = connection.get(key); + return optionalKey.flatMap(val -> Optional.of(JacksonUtil.convertFromJson(val, VersionedKey.class))) + .orElse(newKey(0L)); + + } + + @Override + @SneakyThrows + public T getData() { + while (!data.isPresent()) { + Thread.sleep(500); + } + return data.get(); + } + + @Override + public T getCurrentData() { + return data.orElse(null); + } + + /** + * @return true - if current data is stale and needs refreshed + */ + @Override + public boolean isOutOfDate() { + + return !versionedKey.equals(loadKeyFromElasticache().toJson()); + } + + /** + * Load data from remote store if stale + */ + @Override + public synchronized boolean load() { + Optional oldData = data; + String oldKey = versionedKey; + try { + if (isOutOfDate()) { + String newVersionedKey = (String) connection.get(key) + .get(); + data = (Optional) nonAtomicload(newVersionedKey); + versionedKey = newVersionedKey; + } else { + return false; + } + } catch (Throwable e) { + data = oldData; + versionedKey = oldKey; + log.debug(e.getMessage(), e); + throw ExceptionSoftener.throwSoftenedException(e); + } + return true; + } + + @SuppressWarnings("unchecked") + private Object nonAtomicload(String newVersionedKey) throws Throwable { + Data data = (Data) connection.get(newVersionedKey) + .orElseThrow(() -> { + return new ManifestComparatorKeyNotFoundException( + "Missing versioned key " + + newVersionedKey + + " - likely data changed during read"); + }); + log.info("Loaded new data with date {} for key {}, versionedKey {}, versionedKey from data ", + new Object[] { data.getDate(), key, newVersionedKey, data.getVersionedKey() }); + return data.getData(); + } + + /** + * Clean all old (not current) versioned keys + */ + @Override + public void cleanAll() { + clean(-1); + } + + /** + * Clean specified number of old (not current) versioned keys) + * + * @param numberToClean + */ + @Override + public void clean(int numberToClean) { + log.info("Attempting to delete the last {} records for key {}", numberToClean, key); + VersionedKey currentVersionedKey = loadKeyFromElasticache(); + long start = 0; + if (numberToClean != -1) + start = currentVersionedKey.getVersion() - numberToClean; + for (long i = start; i < currentVersionedKey.getVersion(); i++) { + delete(currentVersionedKey.withVersion(i) + .toJson()); + } + log.info("Finished deleting the last {} records for key {}", numberToClean, key); + } + + private void delete(String withVersion) { + connection.delete(withVersion); + } + + /** + * Save provided data with the key this ManifestComparator manages bump the + * versioned key version. + * + * NB : To avoid race conditions - make sure only one service (an elected + * leader) can write at a time (see micro-mysql for a mysql distributed + * lock, or micro-curator for a curator / zookeeper distributed lock + * implementation). + * + * @param data + * to save + */ + @Override + public void saveAndIncrement(T data) { + Optional oldData = this.data; + VersionedKey newVersionedKey = increment(); + log.info("Saving data with key {}, new version is {}", key, newVersionedKey.toJson()); + connection.put(newVersionedKey.toJson(), new Data( + data, new Date(), newVersionedKey.toJson())); + connection.put(key, newVersionedKey.toJson()); + try { + this.data = (Optional) data; + delete(versionedKey); + + } catch (Throwable t) { + this.data = oldData; + } finally { + versionedKey = newVersionedKey.toJson(); + } + } + + @Override + public String toString() { + return "[ElasticacheManifestComparator:key:" + key + ",versionedKey:" + JacksonUtil.serializeToJson(versionedKey) + + "]"; + } + } + + diff --git a/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java b/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java index fa02fcd18..34180812f 100644 --- a/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java +++ b/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java @@ -40,21 +40,7 @@ public void notExistingKeyGetTest() { @Test public void notExistingKeyPutTest() { TransientElasticacheDataConnection transientClient = new TransientElasticacheDataConnection(memcachedClient, 3, 1); - assertEquals(false, transientClient.add("keyAdd", 3600, "valueadd")); - } - - @Test - public void testIsAvailableFalse() { - TransientElasticacheDataConnection transientClient = new TransientElasticacheDataConnection(memcachedClient, 3, 1); - transientClient.setConnectionTested(false); - assertEquals(false, transientClient.isAvailable()); - } - - @Test - public void testIsAvailableTrue() { - TransientElasticacheDataConnection transientClient = new TransientElasticacheDataConnection(memcachedClient, 3, 1); - transientClient.setConnectionTested(true); - assertEquals(true, transientClient.isAvailable()); + assertEquals(false, transientClient.put("keyAdd", "valueadd")); } diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java new file mode 100644 index 000000000..81dcdb5a1 --- /dev/null +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java @@ -0,0 +1,10 @@ +package com.aol.micro.server.distributed; + + +import java.util.Optional; + +public interface DistributedCache extends DistributedMap { + boolean put(String key, int expiry, V value); + boolean isAvailable(); + void setConnectionTested(boolean result); +} diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java index 9dd77084b..03cf7494a 100644 --- a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java @@ -3,11 +3,7 @@ import java.util.Optional; public interface DistributedMap { - boolean put(String key, V value); - Optional get(String key); - void delete(String key); - } \ No newline at end of file