From 78c73da85562b282fd756f528d9bb3fd5e3d579f Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Sun, 2 Jul 2017 18:37:45 +0100 Subject: [PATCH 1/8] Adding in ManifestComparator to Elasticcache --- gradle.properties | 2 +- .../server/application/registry/JobTest.java | 8 + micro-couchbase/build.gradle | 1 + .../server/couchbase/ConfigureCouchbase.java | 2 +- .../CouchbaseDistributedMapClient.java | 21 ++ .../CouchbaseManifestComparator.java | 2 +- .../aol/micro/server/CouchbaseRunnerTest.java | 1 - .../second/ManifestComparatorRunnerTest.java | 1 + .../base/ManifestComparatorTest.groovy | 1 - ...estComparatorKeyNotFoundExceptionTest.java | 2 +- .../comparator/ManifestComparatorTest.groovy | 1 + micro-elasticache/build.gradle | 1 + .../elasticache/ConfigureElasticache.java | 3 +- .../elasticache/DistributedCacheManager.java | 9 - .../TransientElasticacheDataConnection.java | 53 +++- .../ElasticacheManifestComparator.java | 279 ++++++++++++++++++ ...ransientElasticacheDataConnectionTest.java | 16 +- .../server/distributed/DistributedMap.java | 5 +- 18 files changed, 360 insertions(+), 48 deletions(-) create mode 100644 micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java rename micro-couchbase/src/main/java/com/aol/micro/server/couchbase/{base => manifest/comparator}/CouchbaseManifestComparator.java (99%) delete mode 100644 micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy rename micro-couchbase/src/test/java/com/aol/micro/server/couchbase/{base => manifest/comparator}/ManifestComparatorKeyNotFoundExceptionTest.java (90%) create mode 100644 micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy delete mode 100644 micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java create mode 100644 micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java diff --git a/gradle.properties b/gradle.properties index 7972f1b76..e2026f8cc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=0.91.3 +version=0.91.4 springVersion=4.3.3.RELEASE springBootVersion=1.4.1.RELEASE jerseyVersion=2.24 diff --git a/micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java b/micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java new file mode 100644 index 000000000..16268b150 --- /dev/null +++ b/micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java @@ -0,0 +1,8 @@ +import static org.junit.Assert.*; + +/** + * Created by gordonmorrow on 26/06/2017. + */ +public class JobTest { + +} \ No newline at end of file diff --git a/micro-couchbase/build.gradle b/micro-couchbase/build.gradle index 34f3b0326..1982eb22d 100644 --- a/micro-couchbase/build.gradle +++ b/micro-couchbase/build.gradle @@ -11,6 +11,7 @@ dependencies { compile project(':micro-manifest-comparator') compile project(':micro-core') compile project(':micro-guava') + compile project(':micro-events') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } testCompile group: 'com.cyrusinnovation', name: 'mockito-groovy-support', version:'1.3' diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java index 162011c26..ecdc5d1b0 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java @@ -14,7 +14,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; -import com.aol.micro.server.couchbase.base.CouchbaseManifestComparator; +import com.aol.micro.server.couchbase.manifest.comparator.CouchbaseManifestComparator; import com.couchbase.client.CouchbaseClient; import com.couchbase.client.CouchbaseConnectionFactory; import com.couchbase.client.CouchbaseConnectionFactoryBuilder; diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java index fc0dcc874..7935ed8e0 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java @@ -29,6 +29,26 @@ public boolean put(final String key, final V value) { } + @Override + public boolean put(final String key, int expiry, final V value) { + logger.debug("put '{}', value:{}", key, value); + return couchbaseClient.map(c -> putInternalWithExpiry(c, key, value, expiry)) + .orElse(false); + + } + + private boolean putInternalWithExpiry(final CouchbaseClient client, final String key, final V value, int expiry) { + + try { + return client.set(key,expiry, value) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw ExceptionSoftener.throwSoftenedException(e); + + } + } + + private boolean putInternal(final CouchbaseClient client, final String key, final V value) { try { @@ -50,4 +70,5 @@ public Optional get(String key) { public void delete(String key) { couchbaseClient.map(c -> c.delete(key)); } + } diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/base/CouchbaseManifestComparator.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java similarity index 99% rename from micro-couchbase/src/main/java/com/aol/micro/server/couchbase/base/CouchbaseManifestComparator.java rename to micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java index 37c7ea52e..328fea2b2 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/base/CouchbaseManifestComparator.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java @@ -1,4 +1,4 @@ -package com.aol.micro.server.couchbase.base; +package com.aol.micro.server.couchbase.manifest.comparator; import java.util.Date; import java.util.Optional; diff --git a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java index fe3159016..385bddfcb 100644 --- a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java +++ b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseRunnerTest.java @@ -48,7 +48,6 @@ public void stopServer() { } @Test - @Ignore public void runAppAndBasicTest() throws InterruptedException, ExecutionException { rest.get("http://localhost:8080/simple-app/couchbase/put"); assertThat(rest.get("http://localhost:8080/simple-app/couchbase/get"), containsString("world")); diff --git a/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java b/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java index e82d52397..72650bc76 100644 --- a/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java +++ b/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java @@ -8,6 +8,7 @@ import org.couchbase.mock.CouchbaseMock; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import com.aol.micro.server.MicroserverApp; diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy deleted file mode 100644 index cc56352af..000000000 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorTest.groovy +++ /dev/null @@ -1 +0,0 @@ -package com.aol.micro.server.couchbase.base; diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorKeyNotFoundExceptionTest.java b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorKeyNotFoundExceptionTest.java similarity index 90% rename from micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorKeyNotFoundExceptionTest.java rename to micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorKeyNotFoundExceptionTest.java index 2dd6fb910..919ed07ee 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/base/ManifestComparatorKeyNotFoundExceptionTest.java +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorKeyNotFoundExceptionTest.java @@ -1,4 +1,4 @@ -package com.aol.micro.server.couchbase.base; +package com.aol.micro.server.couchbase.manifest.comparator; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy new file mode 100644 index 000000000..570bcdbac --- /dev/null +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/manifest/comparator/ManifestComparatorTest.groovy @@ -0,0 +1 @@ +package com.aol.micro.server.couchbase.manifest.comparator; diff --git a/micro-elasticache/build.gradle b/micro-elasticache/build.gradle index 565bb2089..4d29a8a78 100644 --- a/micro-elasticache/build.gradle +++ b/micro-elasticache/build.gradle @@ -8,6 +8,7 @@ dependencies { compile group: 'net.spy', name: 'spymemcached', version: '2.12.3' compile project(':micro-core') compile project(':micro-guava') + compile project(':micro-manifest-comparator') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } testCompile group: 'com.cyrusinnovation', name: 'mockito-groovy-support', version:'1.3' diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java index f8076bb1a..6947e8f16 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java @@ -2,6 +2,7 @@ +import com.aol.micro.server.distributed.DistributedMap; import lombok.extern.slf4j.Slf4j; import net.spy.memcached.*; @@ -47,7 +48,7 @@ public ConfigureElasticache( @Value("${elasticache.hostname:null}") String hostn @Bean(name = "transientCache") - public DistributedCacheManager transientCache() throws IOException, URISyntaxException { + public DistributedMap transientCache() throws IOException, URISyntaxException { try { log.info("Creating Memcached Data connection for elasticache cluster: {}", hostname); return new TransientElasticacheDataConnection(createMemcachedClient(), retryAfterSecs, maxRetries); diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java deleted file mode 100644 index b0669ff8b..000000000 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/DistributedCacheManager.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.aol.micro.server.elasticache; -import java.util.Optional; - -public interface DistributedCacheManager { - void setConnectionTested(boolean result); - boolean isAvailable(); - boolean add(String key, int exp, V value); - Optional get(String key); -} diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java index 5f7973892..7cf6a0a11 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java @@ -5,15 +5,16 @@ import java.util.Optional; import net.spy.memcached.MemcachedClient; import lombok.extern.slf4j.Slf4j; +import com.aol.micro.server.distributed.DistributedMap; @Slf4j -public class TransientElasticacheDataConnection implements DistributedCacheManager { +public class TransientElasticacheDataConnection implements DistributedMap { - private volatile boolean available = false; private final MemcachedClient memcachedClient; private final int retryAfterSec; private final int maxTry; + private final int defaultExpiry = 3600; public TransientElasticacheDataConnection(MemcachedClient memcachedClient,int retryAfterSec, int maxTry) { this.memcachedClient = memcachedClient; @@ -22,8 +23,7 @@ public TransientElasticacheDataConnection(MemcachedClient memcachedClient,int re } @Override - public boolean add(final String key, int exp, final Object value) { - + public boolean put(final String key, final Object value) { log.trace("Memcached add operation on key '{}', with value:{}", key, value); boolean success = false; int tryCount = 0; @@ -35,7 +35,7 @@ public boolean add(final String key, int exp, final Object value) { log.warn("retrying operation #{}", tryCount); } tryCount++; - success = memcachedClient.add(key, exp, value) + success = memcachedClient.add(key, defaultExpiry, value) .get(); } catch (final Exception e) { log.warn("memcache set: {}", e.getMessage()); @@ -48,23 +48,46 @@ public boolean add(final String key, int exp, final Object value) { if (success && tryCount > 1) { log.info("Connection restored OK to Elasticache cluster"); } - - available = success; return success; } + @Override + public boolean put(final String key, int expiry, final Object value) { + log.trace("Memcached add operation on key '{}', with value:{}", key, value); + boolean success = false; + int tryCount = 0; + + do { + try { + if (tryCount > 0) { + Thread.sleep(retryAfterSec * 1000); + log.warn("retrying operation #{}", tryCount); + } + tryCount++; + success = memcachedClient.add(key, expiry, value) + .get(); + } catch (final Exception e) { + log.warn("memcache set: {}", e.getMessage()); + } + } while (!success && tryCount < maxTry); + + if (!success) { + log.error("Failed to add key to Elasticache {}", key); + } + if (success && tryCount > 1) { + log.info("Connection restored OK to Elasticache cluster"); + } + return success; + } + @Override public Optional get(String key) { return (Optional) Optional.ofNullable(memcachedClient.get(key)); } - @Override - public boolean isAvailable() { - return available; - } + @Override + public void delete(String key) { + + } - @Override - public final void setConnectionTested(final boolean available) { - this.available = available; - } } \ No newline at end of file diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java new file mode 100644 index 000000000..97e81cc02 --- /dev/null +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java @@ -0,0 +1,279 @@ +package com.aol.micro.server.elasticache.manifest.comparator; + +import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedMap; +import com.aol.micro.server.manifest.Data; +import com.aol.micro.server.manifest.ManifestComparator; +import com.aol.micro.server.manifest.ManifestComparatorKeyNotFoundException; +import com.aol.micro.server.manifest.VersionedKey; +import com.aol.micro.server.rest.jackson.JacksonUtil; +import cyclops.control.Xor; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class ElasticacheManifestComparator implements ManifestComparator { + + /** + * Manifest comparator for use with a distributed map -assumes single producer / + * multiple consumers + * + * Uses to entries in the map + * + * key : versioned key versioned key : actual data + * + * ManifestComparator stores the current version number, only when the version + * changes is the full data set loaded from the remote store. + * + * Usage as a Spring Bean - inject into the host class, and use withKey to + * customise for the targeted Key. + * + * + *
+     * {@code
+     * @Rest
+    public class MyDataService {
+
+
+
+    private final ManifestComparator comparator;
+    @Autowired
+    public  MyDataService(ManifestComparator comparator) {
+    this.comparator = comparator.withKey("test-key");
+    }
+     *
+     * }
+     * 
+ * + * micro-elasticache configures a single ManifestComparator bean that can be + * customized for multiple different keys via withKey + * + * When your bean is injected save via saveAndIncrement, and periodically call + * load() to refresh data if (and only if) it has changed. + * + * ManifestComparator will automatically remove old versions on + * saveAndIncrement, but system outages may occasionally cause old keys to + * linger, you can also use clean & cleanAll to periodically to remove old key + * versions. + * + * + * @author gordonmorrow + * + * @param + */ + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final String key; + + private volatile Xor data = Xor.secondary(null); // Void represents + // an unitialized + // state + + @Getter + private volatile String versionedKey; + private final DistributedMap connection; + + /** + * Create a ManifestComparator with the supplied distributed map client Data + * stored by ManifestComparator will be + * + * key : versioned key versioned key : actual data + * + * @param connection + * DistributedMapClient to store comparison data + */ + public ElasticacheManifestComparator(DistributedMap connection) { + this.key = "default"; + this.versionedKey = newKey(1L).toJson(); + this.connection = connection; + } + + /** + * Create a ManifestComparator with the supplied distributed map client + * + * Data stored by ManifestComparator will be + * + * key : versioned key versioned key : actual data + * + * @param key + * To store actual data with + * @param connection + * DistributeMapClient connection + */ + public ElasticacheManifestComparator(String key, DistributedMap connection) { + this.key = key; + this.versionedKey = newKey(1L).toJson(); + this.connection = connection; + } + + /** + * Create a new ManifestComparator with the same distributed map connection + * that targets a different key + * + * @param key + * Key to store data with + * @return new ManifestComparator that targets specified key + */ + @Override + public ElasticacheManifestComparator withKey(String key) { + return new ElasticacheManifestComparator<>( + key, connection); + } + + private VersionedKey newKey(Long version) { + return new VersionedKey( + key, version); + } + + private VersionedKey increment() { + VersionedKey currentVersionedKey = loadKeyFromCouchbase(); + return currentVersionedKey.withVersion(currentVersionedKey.getVersion() + 1); + } + + private VersionedKey loadKeyFromCouchbase() { + Optional optionalKey = connection.get(key); + return optionalKey.flatMap(val -> Optional.of(JacksonUtil.convertFromJson(val, VersionedKey.class))) + .orElse(newKey(0L)); + + } + + @Override + @SneakyThrows + public T getData() { + while (data.isSecondary()) { + Thread.sleep(500); + } + return data.get(); + } + + @Override + public T getCurrentData() { + return data.visit(present -> present, () -> null); + } + + /** + * @return true - if current data is stale and needs refreshed + */ + @Override + public boolean isOutOfDate() { + + return !versionedKey.equals(loadKeyFromCouchbase().toJson()); + } + + /** + * Load data from remote store if stale + */ + @Override + public synchronized boolean load() { + Xor oldData = data; + String oldKey = versionedKey; + try { + if (isOutOfDate()) { + String newVersionedKey = (String) connection.get(key) + .get(); + data = Xor.primary((T) nonAtomicload(newVersionedKey)); + versionedKey = newVersionedKey; + } else { + return false; + } + } catch (Throwable e) { + data = oldData; + versionedKey = oldKey; + logger.debug(e.getMessage(), e); + throw ExceptionSoftener.throwSoftenedException(e); + } + return true; + } + + @SuppressWarnings("unchecked") + private Object nonAtomicload(String newVersionedKey) throws Throwable { + Data data = (Data) connection.get(newVersionedKey) + .orElseThrow(() -> { + return new ManifestComparatorKeyNotFoundException( + "Missing versioned key " + + newVersionedKey + + " - likely data changed during read"); + }); + logger.info("Loaded new data with date {} for key {}, versionedKey {}, versionedKey from data ", + new Object[] { data.getDate(), key, newVersionedKey, data.getVersionedKey() }); + return data.getData(); + } + + /** + * Clean all old (not current) versioned keys + */ + @Override + public void cleanAll() { + clean(-1); + } + + /** + * Clean specified number of old (not current) versioned keys) + * + * @param numberToClean + */ + @Override + public void clean(int numberToClean) { + logger.info("Attempting to delete the last {} records for key {}", numberToClean, key); + VersionedKey currentVersionedKey = loadKeyFromCouchbase(); + long start = 0; + if (numberToClean != -1) + start = currentVersionedKey.getVersion() - numberToClean; + for (long i = start; i < currentVersionedKey.getVersion(); i++) { + delete(currentVersionedKey.withVersion(i) + .toJson()); + } + logger.info("Finished deleting the last {} records for key {}", numberToClean, key); + } + + private void delete(String withVersion) { + connection.delete(withVersion); + } + + /** + * Save provided data with the key this ManifestComparator manages bump the + * versioned key version. + * + * NB : To avoid race conditions - make sure only one service (an elected + * leader) can write at a time (see micro-mysql for a mysql distributed + * lock, or micro-curator for a curator / zookeeper distributed lock + * implementation). + * + * @param data + * to save + */ + @Override + public void saveAndIncrement(T data) { + Xor oldData = this.data; + VersionedKey newVersionedKey = increment(); + logger.info("Saving data with key {}, new version is {}", key, newVersionedKey.toJson()); + connection.put(newVersionedKey.toJson(), new Data( + data, new Date(), newVersionedKey.toJson())); + connection.put(key, newVersionedKey.toJson()); + try { + this.data = Xor.primary(data); + delete(versionedKey); + + } catch (Throwable t) { + this.data = oldData; + } finally { + versionedKey = newVersionedKey.toJson(); + } + } + + @Override + public String toString() { + return "[ElasticacheManifestComparator:key:" + key + ",versionedKey:" + JacksonUtil.serializeToJson(versionedKey) + + "]"; + } + } + + diff --git a/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java b/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java index fa02fcd18..34180812f 100644 --- a/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java +++ b/micro-elasticache/src/test/java/com/aol/micros/server/elasticache/TransientElasticacheDataConnectionTest.java @@ -40,21 +40,7 @@ public void notExistingKeyGetTest() { @Test public void notExistingKeyPutTest() { TransientElasticacheDataConnection transientClient = new TransientElasticacheDataConnection(memcachedClient, 3, 1); - assertEquals(false, transientClient.add("keyAdd", 3600, "valueadd")); - } - - @Test - public void testIsAvailableFalse() { - TransientElasticacheDataConnection transientClient = new TransientElasticacheDataConnection(memcachedClient, 3, 1); - transientClient.setConnectionTested(false); - assertEquals(false, transientClient.isAvailable()); - } - - @Test - public void testIsAvailableTrue() { - TransientElasticacheDataConnection transientClient = new TransientElasticacheDataConnection(memcachedClient, 3, 1); - transientClient.setConnectionTested(true); - assertEquals(true, transientClient.isAvailable()); + assertEquals(false, transientClient.put("keyAdd", "valueadd")); } diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java index 9dd77084b..1e9028f4b 100644 --- a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java @@ -5,9 +5,10 @@ public interface DistributedMap { boolean put(String key, V value); - Optional get(String key); - void delete(String key); + default boolean put(String key, int expiry, V value){ + return false; + } } \ No newline at end of file From d1baf274bdb43b252f463d3dc836571666ea8a70 Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Sun, 2 Jul 2017 18:45:05 +0100 Subject: [PATCH 2/8] Cleaning up code --- .../aol/micro/server/application/registry/JobTest.java | 8 -------- .../micro/server/second/ManifestComparatorRunnerTest.java | 1 - .../elasticache/TransientElasticacheDataConnection.java | 1 + 3 files changed, 1 insertion(+), 9 deletions(-) delete mode 100644 micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java diff --git a/micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java b/micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java deleted file mode 100644 index 16268b150..000000000 --- a/micro-application-register/src/test/java/com/aol/micro/server/application/registry/JobTest.java +++ /dev/null @@ -1,8 +0,0 @@ -import static org.junit.Assert.*; - -/** - * Created by gordonmorrow on 26/06/2017. - */ -public class JobTest { - -} \ No newline at end of file diff --git a/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java b/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java index 72650bc76..e82d52397 100644 --- a/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java +++ b/micro-couchbase/src/test/java/app/couchbase/manifest/comparator/com/aol/micro/server/second/ManifestComparatorRunnerTest.java @@ -8,7 +8,6 @@ import org.couchbase.mock.CouchbaseMock; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import com.aol.micro.server.MicroserverApp; diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java index 7cf6a0a11..a446ad486 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java @@ -87,6 +87,7 @@ public Optional get(String key) { @Override public void delete(String key) { + memcachedClient.delete(key); } From b1de0e0f24e9f838129f7a9b028a21a3eb4a47be Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Sun, 2 Jul 2017 19:01:16 +0100 Subject: [PATCH 3/8] Removing compile micro-events --- micro-couchbase/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/micro-couchbase/build.gradle b/micro-couchbase/build.gradle index 1982eb22d..34f3b0326 100644 --- a/micro-couchbase/build.gradle +++ b/micro-couchbase/build.gradle @@ -11,7 +11,6 @@ dependencies { compile project(':micro-manifest-comparator') compile project(':micro-core') compile project(':micro-guava') - compile project(':micro-events') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } testCompile group: 'com.cyrusinnovation', name: 'mockito-groovy-support', version:'1.3' From ab47860842188cb3883a39323034f3d65c32b3d8 Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Mon, 3 Jul 2017 13:55:12 +0100 Subject: [PATCH 4/8] Adding in connection tester to both elasticache and couchbase --- micro-couchbase/build.gradle | 1 + .../couchbase/ConfigureCacheTester.java | 21 ++++++ .../server/couchbase/ConfigureCouchbase.java | 15 +++- .../couchbase/CouchbaseConnectionTester.java | 57 +++++++++++++++ .../CouchbaseDistributedMapClient.java | 71 +++++++++++++++++-- ...SimpleCouchbaseClientConnectionTest.groovy | 4 +- micro-elasticache/build.gradle | 1 + .../elasticache/ConfigureCacheTester.java | 21 ++++++ .../elasticache/ConfigureElasticache.java | 11 +-- .../ElasticacheConnectionTester.java | 57 +++++++++++++++ .../TransientElasticacheDataConnection.java | 7 +- .../server/distributed/DistributedMap.java | 4 +- 12 files changed, 245 insertions(+), 25 deletions(-) create mode 100644 micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java create mode 100644 micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java create mode 100644 micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java create mode 100644 micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java diff --git a/micro-couchbase/build.gradle b/micro-couchbase/build.gradle index 34f3b0326..1982eb22d 100644 --- a/micro-couchbase/build.gradle +++ b/micro-couchbase/build.gradle @@ -11,6 +11,7 @@ dependencies { compile project(':micro-manifest-comparator') compile project(':micro-core') compile project(':micro-guava') + compile project(':micro-events') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } testCompile group: 'com.cyrusinnovation', name: 'mockito-groovy-support', version:'1.3' diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java new file mode 100644 index 000000000..71e5dc8df --- /dev/null +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java @@ -0,0 +1,21 @@ +package com.aol.micro.server.couchbase; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; + +/** + * Created by gordonmorrow on 03/07/2017. + */ +@Configuration +public class ConfigureCacheTester { + + @Autowired + private CouchbaseConnectionTester couchbaseConnectionTester; + + + @Scheduled(fixedDelay = 60000) + public synchronized void runCouchbaseConnectionTester(){ + couchbaseConnectionTester.scheduleAndLog(); + } +} diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java index ecdc5d1b0..92a18cf87 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java @@ -45,15 +45,26 @@ public class ConfigureCouchbase { @Value("${couchbaseClientOperationTimeout:120000}") private long opTimeout; + @Value("${distributed.cache.default.expiration:691200}") + private int expiresAfterSeconds = 691200; + + @Value("${distributed.cache.maxTry:5}") + private int maxTry = 5; + + @Value("${distributed.cache.retryAfterSec:1}") + private int retryAfterSec = 1; + @SuppressWarnings("rawtypes") @Bean(name = "couchbaseDistributedMap") public CouchbaseDistributedMapClient simpleCouchbaseClient() throws IOException, URISyntaxException { if (couchbaseClientEnabled) { return new CouchbaseDistributedMapClient( - couchbaseClient()); + couchbaseClient(), expiresAfterSeconds, maxTry, + retryAfterSec); } else { return new CouchbaseDistributedMapClient( - null); + null, expiresAfterSeconds, maxTry, + retryAfterSec); } } diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java new file mode 100644 index 000000000..fa378f676 --- /dev/null +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java @@ -0,0 +1,57 @@ +package com.aol.micro.server.couchbase; + +/** + * Created by gordonmorrow on 03/07/2017. + */ + +import com.aol.micro.server.distributed.DistributedMap; +import com.aol.micro.server.events.ScheduledJob; +import com.aol.micro.server.events.SystemData; +import com.couchbase.client.CouchbaseClient; +import lombok.extern.slf4j.Slf4j; + +import java.util.Random; + +@Slf4j + public class CouchbaseConnectionTester implements ScheduledJob { + + private static final Random random = new Random(); + + private final DistributedMap cache; + private final CouchbaseClient couchbaseClient; + + public CouchbaseConnectionTester(DistributedMap cache, CouchbaseClient couchbaseClient) { + + this.cache = cache; + this.couchbaseClient = couchbaseClient; + } + + @Override + public SystemData scheduleAndLog() { + + log.trace("runTestConnection()..."); + boolean result = false; + try { + result = testConnection(); + } catch (RuntimeException e) { + log.debug("Could not connect to Cache" + e.getMessage()); + } + cache.setConnectionTested(result); + + log.debug("Testing Couchbase connection: {}", result); + return null; + + } + + private boolean testConnection() { + String key = "PING_TEST"; + log.trace("Testing connection using key {}", key); + + int testValue = random.nextInt(1111); + couchbaseClient.set(key, 120, testValue); + int received = (Integer) couchbaseClient.get(key); + + return received == testValue; + } + +} \ No newline at end of file diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java index 7935ed8e0..b7738638f 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java @@ -4,36 +4,97 @@ import java.util.concurrent.ExecutionException; import com.aol.cyclops2.util.ExceptionSoftener; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.aol.micro.server.distributed.DistributedMap; import com.couchbase.client.CouchbaseClient; +@Slf4j public class CouchbaseDistributedMapClient implements DistributedMap { private final Logger logger = LoggerFactory.getLogger(getClass()); + private volatile boolean available = false; private final Optional couchbaseClient; + private final int expiresAfterSeconds, maxTry, retryAfterSec; - public CouchbaseDistributedMapClient(CouchbaseClient couchbaseClient) { + public CouchbaseDistributedMapClient(CouchbaseClient couchbaseClient, final int expiresAfterSeconds, + final int maxTry, final int retryAfterSec) { this.couchbaseClient = Optional.ofNullable(couchbaseClient); + this.expiresAfterSeconds = expiresAfterSeconds; + this.maxTry = maxTry; + this.retryAfterSec = retryAfterSec; } @Override public boolean put(final String key, final V value) { - logger.debug("put '{}', value:{}", key, value); - return couchbaseClient.map(c -> putInternal(c, key, value)) - .orElse(false); + log.trace("put '{}', value:{}", key, value); + boolean success = false; + int tryCount = 0; + + do { + try { + if (tryCount > 0) { + Thread.sleep(retryAfterSec * 1000); + log.warn("retry #{}", tryCount); + } + tryCount++; + success = couchbaseClient.map(c -> putInternal(c, key, value)) + .orElse(false); + + } catch (final Exception e) { + + log.warn("memcache put: {}", e.getMessage()); + } + } while (!success && tryCount < maxTry); + + if (!success) { + log.error("Failed to place item in couchbase"); + } + if (success && tryCount > 1) { + log.info("Connection restored OK"); + } + + available = success; + + return success; } @Override public boolean put(final String key, int expiry, final V value) { logger.debug("put '{}', value:{}", key, value); - return couchbaseClient.map(c -> putInternalWithExpiry(c, key, value, expiry)) + boolean success = false; + int tryCount = 0; + + do { + try { + if (tryCount > 0) { + Thread.sleep(retryAfterSec * 1000); + log.warn("retry #{}", tryCount); + } + tryCount++; + success = couchbaseClient.map(c -> putInternalWithExpiry(c, key, value, expiry)) .orElse(false); + } catch (final Exception e) { + + log.warn("memcache put: {}", e.getMessage()); + } + } while (!success && tryCount < maxTry); + + if (!success) { + log.error("Failed to place item in couchbase"); + } + if (success && tryCount > 1) { + log.info("Connection restored OK"); + } + + available = success; + + return success; } diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy index 0c24348e6..2e5ecb2ca 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy @@ -19,7 +19,7 @@ class SimpleCouchbaseClientConnectionTest { @Before public void setup() { client = Mockito.mock(CouchbaseClient) - con = new CouchbaseDistributedMapClient(client) + con = new CouchbaseDistributedMapClient(client,1,1,1) } @Test public void testDelete() { @@ -35,7 +35,7 @@ class SimpleCouchbaseClientConnectionTest { @Test public void testGetDistributedCacheDisabled() { - con = new CouchbaseDistributedMapClient(null) + con = new CouchbaseDistributedMapClient(null,1,1,1) Optional result = con.get("key") assertThat(result, is(Optional.empty())) } diff --git a/micro-elasticache/build.gradle b/micro-elasticache/build.gradle index 4d29a8a78..054225854 100644 --- a/micro-elasticache/build.gradle +++ b/micro-elasticache/build.gradle @@ -8,6 +8,7 @@ dependencies { compile group: 'net.spy', name: 'spymemcached', version: '2.12.3' compile project(':micro-core') compile project(':micro-guava') + compile project(':micro-events') compile project(':micro-manifest-comparator') testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version:'2.3.3' testCompile(group: 'org.spockframework', name: 'spock-core', version:'0.7-groovy-2.0') { exclude(module: 'groovy-all') } diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java new file mode 100644 index 000000000..b205bbf37 --- /dev/null +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java @@ -0,0 +1,21 @@ +package com.aol.micro.server.elasticache; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; + +/** + * Created by gordonmorrow on 03/07/2017. + */ +@Configuration +public class ConfigureCacheTester { + + @Autowired + private ElasticacheConnectionTester elasticacheConnectionTester; + + + @Scheduled(fixedDelay = 60000) + public synchronized void runElasticacheConnectionTester(){ + elasticacheConnectionTester.scheduleAndLog(); + } +} diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java index 6947e8f16..2e7303e14 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java @@ -4,27 +4,18 @@ import com.aol.micro.server.distributed.DistributedMap; import lombok.extern.slf4j.Slf4j; -import net.spy.memcached.*; +import net.spy.memcached.MemcachedClient; import java.io.IOException; import java.net.URISyntaxException; -import java.util.ArrayList; - -import net.spy.memcached.auth.AuthDescriptor; -import net.spy.memcached.auth.PlainCallbackHandler; -import net.spy.memcached.MemcachedClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.util.StringUtils; import java.net.InetSocketAddress; -import java.util.List; -import java.util.Optional; - @Slf4j @Configuration public class ConfigureElasticache { diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java new file mode 100644 index 000000000..9c322ff44 --- /dev/null +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java @@ -0,0 +1,57 @@ +package com.aol.micro.server.elasticache; + +/** + * Created by gordonmorrow on 03/07/2017. + */ + +import com.aol.micro.server.distributed.DistributedMap; +import com.aol.micro.server.events.ScheduledJob; +import com.aol.micro.server.events.SystemData; +import lombok.extern.slf4j.Slf4j; +import net.spy.memcached.MemcachedClient; + +import java.util.Random; + +@Slf4j + public class ElasticacheConnectionTester implements ScheduledJob { + + private static final Random random = new Random(); + + private final DistributedMap cache; + private final MemcachedClient memcachedClient; + + public ElasticacheConnectionTester(DistributedMap cache, MemcachedClient memcachedClient) { + + this.cache = cache; + this.memcachedClient = memcachedClient; + } + + @Override + public SystemData scheduleAndLog() { + + log.trace("runTestConnection()..."); + boolean result = false; + try { + result = testConnection(); + } catch (RuntimeException e) { + log.debug("Could not connect to Cache" + e.getMessage()); + } + cache.setConnectionTested(result); + + log.debug("Testing Couchbase connection: {}", result); + return null; + + } + + private boolean testConnection() { + String key = "PING_TEST"; + log.trace("Testing connection using key {}", key); + + int testValue = random.nextInt(1111); + memcachedClient.set(key, 120, testValue); + int received = (Integer) memcachedClient.get(key); + + return received == testValue; + } + +} \ No newline at end of file diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java index a446ad486..2f61fc2d3 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java @@ -1,10 +1,8 @@ package com.aol.micro.server.elasticache; import lombok.extern.slf4j.Slf4j; - import java.util.Optional; import net.spy.memcached.MemcachedClient; -import lombok.extern.slf4j.Slf4j; import com.aol.micro.server.distributed.DistributedMap; @@ -80,8 +78,9 @@ public boolean put(final String key, int expiry, final Object value) { return success; } - @Override - public Optional get(String key) { + + @Override + public Optional get(String key) { return (Optional) Optional.ofNullable(memcachedClient.get(key)); } diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java index 1e9028f4b..d0925530e 100644 --- a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java @@ -3,12 +3,12 @@ import java.util.Optional; public interface DistributedMap { - boolean put(String key, V value); Optional get(String key); void delete(String key); default boolean put(String key, int expiry, V value){ return false; } - + default boolean isAvailable() { return false; } + default void setConnectionTested(boolean result){}; } \ No newline at end of file From c1293191e7d81e9c1c84fc764ece4ef00ef1dfbd Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Mon, 3 Jul 2017 16:00:51 +0100 Subject: [PATCH 5/8] Updating to use DistributedCache interface --- .../aol/micro/server/CouchbaseResource.java | 5 ++-- .../aol/micro/server/CouchbaseResource.java | 5 ++-- .../aol/micro/server/CouchbaseResource.java | 7 +++--- .../aol/micro/server/CouchbaseResource.java | 5 ++-- .../couchbase/ConfigureCacheTester.java | 3 --- .../server/couchbase/ConfigureCouchbase.java | 15 +++++------- .../couchbase/CouchbaseConnectionTester.java | 8 +++---- ...a => CouchbaseDistributedCacheClient.java} | 23 +++++++++++++------ .../CouchbaseManifestComparator.java | 7 +++--- .../aol/micro/server/CouchbaseResource.java | 5 ++-- .../couchbase/ConfigureCouchbaseTest.java | 5 ++-- ...SimpleCouchbaseClientConnectionTest.groovy | 11 +++++---- .../elasticache/ConfigureCacheTester.java | 3 --- .../elasticache/ConfigureElasticache.java | 6 ++--- .../ElasticacheConnectionTester.java | 10 +++----- .../TransientElasticacheDataConnection.java | 22 ++++++++++++++---- .../ElasticacheManifestComparator.java | 7 +++--- .../server/distributed/DistributedCache.java | 13 +++++++++++ 18 files changed, 92 insertions(+), 68 deletions(-) rename micro-couchbase/src/main/java/com/aol/micro/server/couchbase/{CouchbaseDistributedMapClient.java => CouchbaseDistributedCacheClient.java} (85%) create mode 100644 micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java diff --git a/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java index 7b5e218ec..6d0969b3a 100644 --- a/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-loader/src/test/java/app/loader/scheduled/com/aol/micro/server/CouchbaseResource.java @@ -4,6 +4,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; @@ -20,11 +21,11 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataLoads = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java index 190d1eb13..a06298f7f 100644 --- a/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-loader/src/test/java/app/loader/scheduled/off/com/aol/micro/server/CouchbaseResource.java @@ -4,6 +4,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; @@ -20,11 +21,11 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataLoads = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java index 9d30b5aaf..683da1a18 100644 --- a/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-writer/src/test/java/app/cleaner/off/scheduled/com/aol/micro/server/CouchbaseResource.java @@ -4,13 +4,12 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; - import com.aol.micro.server.auto.discovery.Rest; -import com.aol.micro.server.distributed.DistributedMap; import com.aol.micro.server.events.SystemData; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; @@ -19,12 +18,12 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataCleans = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java b/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java index cc0c3008e..ae0348476 100644 --- a/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java +++ b/micro-async-data-writer/src/test/java/app/cleaner/scheduled/com/aol/micro/server/CouchbaseResource.java @@ -4,6 +4,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.collections.immutable.LinkedListX; import cyclops.control.Maybe; import org.springframework.beans.factory.annotation.Autowired; @@ -19,12 +20,12 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; private volatile LinkedListX dataCleans = LinkedListX.empty(); @Autowired - public CouchbaseResource(DistributedMap client, EventBus bus) { + public CouchbaseResource(DistributedCache client, EventBus bus) { this.client = client; bus.register(this); } diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java index 71e5dc8df..22b79fbdc 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCacheTester.java @@ -4,9 +4,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; -/** - * Created by gordonmorrow on 03/07/2017. - */ @Configuration public class ConfigureCacheTester { diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java index 92a18cf87..ab678e01f 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java @@ -7,8 +7,7 @@ import java.util.List; import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -21,11 +20,10 @@ import lombok.Setter; +@Slf4j @Configuration public class ConfigureCouchbase { - private Logger logger = LoggerFactory.getLogger(getClass()); - @Value("${couchbase.manifest.comparison.key:default-key}") private String defaultCouchbaseManifestComparisonKey; @Setter @@ -56,13 +54,13 @@ public class ConfigureCouchbase { @SuppressWarnings("rawtypes") @Bean(name = "couchbaseDistributedMap") - public CouchbaseDistributedMapClient simpleCouchbaseClient() throws IOException, URISyntaxException { + public CouchbaseDistributedCacheClient simpleCouchbaseClient() throws IOException, URISyntaxException { if (couchbaseClientEnabled) { - return new CouchbaseDistributedMapClient( + return new CouchbaseDistributedCacheClient( couchbaseClient(), expiresAfterSeconds, maxTry, retryAfterSec); } else { - return new CouchbaseDistributedMapClient( + return new CouchbaseDistributedCacheClient( null, expiresAfterSeconds, maxTry, retryAfterSec); } @@ -71,7 +69,7 @@ public CouchbaseDistributedMapClient simpleCouchbaseClient() throws IOException, @Bean(name = "couchbaseClient") public CouchbaseClient couchbaseClient() throws IOException, URISyntaxException { if (couchbaseClientEnabled) { - logger.info("Creating CouchbaseClient for servers: {}", couchbaseServers); + log.info("Creating CouchbaseClient for servers: {}", couchbaseServers); CouchbaseConnectionFactoryBuilder builder = new CouchbaseConnectionFactoryBuilder(); builder.setOpTimeout(opTimeout); CouchbaseConnectionFactory cf = builder.buildCouchbaseConnection(getServersList(), couchbaseBucket, @@ -81,7 +79,6 @@ public CouchbaseClient couchbaseClient() throws IOException, URISyntaxException cf); } return null; - } @Bean diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java index fa378f676..a732826df 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseConnectionTester.java @@ -1,9 +1,7 @@ package com.aol.micro.server.couchbase; -/** - * Created by gordonmorrow on 03/07/2017. - */ +import com.aol.micro.server.distributed.DistributedCache; import com.aol.micro.server.distributed.DistributedMap; import com.aol.micro.server.events.ScheduledJob; import com.aol.micro.server.events.SystemData; @@ -17,10 +15,10 @@ public class CouchbaseConnectionTester implements ScheduledJob { private static final Random random = new Random(); - private final DistributedMap cache; + private final DistributedCache cache; private final CouchbaseClient couchbaseClient; - public CouchbaseConnectionTester(DistributedMap cache, CouchbaseClient couchbaseClient) { + public CouchbaseConnectionTester(DistributedCache cache, CouchbaseClient couchbaseClient) { this.cache = cache; this.couchbaseClient = couchbaseClient; diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedCacheClient.java similarity index 85% rename from micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java rename to micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedCacheClient.java index b7738638f..6c2aefbee 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedMapClient.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/CouchbaseDistributedCacheClient.java @@ -4,15 +4,15 @@ import java.util.concurrent.ExecutionException; import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedCache; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.aol.micro.server.distributed.DistributedMap; import com.couchbase.client.CouchbaseClient; @Slf4j -public class CouchbaseDistributedMapClient implements DistributedMap { +public class CouchbaseDistributedCacheClient implements DistributedCache { private final Logger logger = LoggerFactory.getLogger(getClass()); private volatile boolean available = false; @@ -20,8 +20,8 @@ public class CouchbaseDistributedMapClient implements DistributedMap { private final Optional couchbaseClient; private final int expiresAfterSeconds, maxTry, retryAfterSec; - public CouchbaseDistributedMapClient(CouchbaseClient couchbaseClient, final int expiresAfterSeconds, - final int maxTry, final int retryAfterSec) { + public CouchbaseDistributedCacheClient(CouchbaseClient couchbaseClient, final int expiresAfterSeconds, + final int maxTry, final int retryAfterSec) { this.couchbaseClient = Optional.ofNullable(couchbaseClient); this.expiresAfterSeconds = expiresAfterSeconds; @@ -59,7 +59,7 @@ public boolean put(final String key, final V value) { log.info("Connection restored OK"); } - available = success; + setConnectionTested(success); return success; } @@ -92,7 +92,7 @@ public boolean put(final String key, int expiry, final V value) { log.info("Connection restored OK"); } - available = success; + setConnectionTested(success); return success; @@ -124,7 +124,6 @@ private boolean putInternal(final CouchbaseClient client, final String key, fina @Override public Optional get(String key) { return couchbaseClient.map(c -> (V) c.get(key)); - } @Override @@ -132,4 +131,14 @@ public void delete(String key) { couchbaseClient.map(c -> c.delete(key)); } + @Override + public boolean isAvailable(){ + return available; + } + + @Override + public void setConnectionTested(boolean result){ + available = result; + } + } diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java index 328fea2b2..161c0f6b4 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/manifest/comparator/CouchbaseManifestComparator.java @@ -4,6 +4,7 @@ import java.util.Optional; import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedCache; import cyclops.control.Xor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +82,7 @@ public class CouchbaseManifestComparator implements ManifestComparator { @Getter private volatile String versionedKey; - private final DistributedMap connection; + private final DistributedCache connection; /** * Create a ManifestComparator with the supplied distributed map client Data @@ -92,7 +93,7 @@ public class CouchbaseManifestComparator implements ManifestComparator { * @param connection * DistributedMapClient to store comparison data */ - public CouchbaseManifestComparator(DistributedMap connection) { + public CouchbaseManifestComparator(DistributedCache connection) { this.key = "default"; this.versionedKey = newKey(1L).toJson(); this.connection = connection; @@ -110,7 +111,7 @@ public CouchbaseManifestComparator(DistributedMap connection) { * @param connection * DistributeMapClient connection */ - public CouchbaseManifestComparator(String key, DistributedMap connection) { + public CouchbaseManifestComparator(String key, DistributedCache connection) { this.key = key; this.versionedKey = newKey(1L).toJson(); this.connection = connection; diff --git a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java index 6676cb2b7..a8338c5f8 100644 --- a/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java +++ b/micro-couchbase/src/test/java/app/couchbase/distributed/map/com/aol/micro/server/CouchbaseResource.java @@ -3,6 +3,7 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; +import com.aol.micro.server.distributed.DistributedCache; import org.springframework.beans.factory.annotation.Autowired; import com.aol.micro.server.auto.discovery.Rest; @@ -12,10 +13,10 @@ @Rest public class CouchbaseResource { - private final DistributedMap client; + private final DistributedCache client; @Autowired - public CouchbaseResource(DistributedMap client) { + public CouchbaseResource(DistributedCache client) { this.client = client; } diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java index 3ce542b9a..0d9492b70 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/ConfigureCouchbaseTest.java @@ -8,11 +8,10 @@ import java.net.URISyntaxException; import java.util.Optional; +import com.aol.micro.server.distributed.DistributedCache; import org.junit.Before; import org.junit.Test; -import com.aol.micro.server.distributed.DistributedMap; - public class ConfigureCouchbaseTest { ConfigureCouchbase config; @@ -25,7 +24,7 @@ public void setUp() throws Exception { @Test public void createDistributedCacheMemcachedOff() throws IOException, URISyntaxException { config.setCouchbaseClientEnabled(false); - DistributedMap cache = config.simpleCouchbaseClient(); + DistributedCache cache = config.simpleCouchbaseClient(); assertThat(cache.get("hello"), is(Optional.empty())); } diff --git a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy index 2e5ecb2ca..3c93a8451 100644 --- a/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy +++ b/micro-couchbase/src/test/java/com/aol/micro/server/couchbase/SimpleCouchbaseClientConnectionTest.groovy @@ -1,4 +1,6 @@ -package com.aol.micro.server.couchbase; +package com.aol.micro.server.couchbase + +import com.aol.micro.server.distributed.DistributedCache; import static org.hamcrest.Matchers.is import static org.junit.Assert.* @@ -8,18 +10,17 @@ import org.junit.Before import org.junit.Test import org.mockito.Mockito -import com.aol.micro.server.distributed.DistributedMap; import com.couchbase.client.CouchbaseClient class SimpleCouchbaseClientConnectionTest { CouchbaseClient client - DistributedMap con + DistributedCache con @Before public void setup() { client = Mockito.mock(CouchbaseClient) - con = new CouchbaseDistributedMapClient(client,1,1,1) + con = new CouchbaseDistributedCacheClient(client,1,1,1) } @Test public void testDelete() { @@ -35,7 +36,7 @@ class SimpleCouchbaseClientConnectionTest { @Test public void testGetDistributedCacheDisabled() { - con = new CouchbaseDistributedMapClient(null,1,1,1) + con = new CouchbaseDistributedCacheClient(null,1,1,1) Optional result = con.get("key") assertThat(result, is(Optional.empty())) } diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java index b205bbf37..b78217448 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java @@ -4,9 +4,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; -/** - * Created by gordonmorrow on 03/07/2017. - */ @Configuration public class ConfigureCacheTester { diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java index 2e7303e14..f84c60062 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureElasticache.java @@ -1,8 +1,6 @@ package com.aol.micro.server.elasticache; - - -import com.aol.micro.server.distributed.DistributedMap; +import com.aol.micro.server.distributed.DistributedCache; import lombok.extern.slf4j.Slf4j; import net.spy.memcached.MemcachedClient; @@ -39,7 +37,7 @@ public ConfigureElasticache( @Value("${elasticache.hostname:null}") String hostn @Bean(name = "transientCache") - public DistributedMap transientCache() throws IOException, URISyntaxException { + public DistributedCache transientCache() throws IOException, URISyntaxException { try { log.info("Creating Memcached Data connection for elasticache cluster: {}", hostname); return new TransientElasticacheDataConnection(createMemcachedClient(), retryAfterSecs, maxRetries); diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java index 9c322ff44..0a0afb203 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java @@ -1,10 +1,6 @@ package com.aol.micro.server.elasticache; -/** - * Created by gordonmorrow on 03/07/2017. - */ - -import com.aol.micro.server.distributed.DistributedMap; +import com.aol.micro.server.distributed.DistributedCache; import com.aol.micro.server.events.ScheduledJob; import com.aol.micro.server.events.SystemData; import lombok.extern.slf4j.Slf4j; @@ -17,10 +13,10 @@ public class ElasticacheConnectionTester implements ScheduledJob { private static final Random random = new Random(); - private final DistributedMap cache; + private final DistributedCache cache; private final MemcachedClient memcachedClient; - public ElasticacheConnectionTester(DistributedMap cache, MemcachedClient memcachedClient) { + public ElasticacheConnectionTester(DistributedCache cache, MemcachedClient memcachedClient) { this.cache = cache; this.memcachedClient = memcachedClient; diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java index 2f61fc2d3..745f6f940 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/TransientElasticacheDataConnection.java @@ -1,20 +1,21 @@ package com.aol.micro.server.elasticache; +import com.aol.micro.server.distributed.DistributedCache; import lombok.extern.slf4j.Slf4j; import java.util.Optional; import net.spy.memcached.MemcachedClient; -import com.aol.micro.server.distributed.DistributedMap; - @Slf4j -public class TransientElasticacheDataConnection implements DistributedMap { +public class TransientElasticacheDataConnection implements DistributedCache { private final MemcachedClient memcachedClient; private final int retryAfterSec; private final int maxTry; private final int defaultExpiry = 3600; + private volatile boolean available = false; + - public TransientElasticacheDataConnection(MemcachedClient memcachedClient,int retryAfterSec, int maxTry) { + public TransientElasticacheDataConnection(MemcachedClient memcachedClient,int retryAfterSec, int maxTry) { this.memcachedClient = memcachedClient; this.retryAfterSec = retryAfterSec; this.maxTry = maxTry; @@ -46,6 +47,7 @@ public boolean put(final String key, final Object value) { if (success && tryCount > 1) { log.info("Connection restored OK to Elasticache cluster"); } + setConnectionTested(success); return success; } @@ -75,6 +77,7 @@ public boolean put(final String key, int expiry, final Object value) { if (success && tryCount > 1) { log.info("Connection restored OK to Elasticache cluster"); } + setConnectionTested(success); return success; } @@ -90,4 +93,15 @@ public void delete(String key) { } + @Override + public boolean isAvailable() { + return available; + } + + @Override + public void setConnectionTested(boolean result) { + available = result; + + } + } \ No newline at end of file diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java index 97e81cc02..4aea531c6 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java @@ -1,6 +1,7 @@ package com.aol.micro.server.elasticache.manifest.comparator; import com.aol.cyclops2.util.ExceptionSoftener; +import com.aol.micro.server.distributed.DistributedCache; import com.aol.micro.server.distributed.DistributedMap; import com.aol.micro.server.manifest.Data; import com.aol.micro.server.manifest.ManifestComparator; @@ -79,7 +80,7 @@ public MyDataService(ManifestComparator comparator) { @Getter private volatile String versionedKey; - private final DistributedMap connection; + private final DistributedCache connection; /** * Create a ManifestComparator with the supplied distributed map client Data @@ -90,7 +91,7 @@ public MyDataService(ManifestComparator comparator) { * @param connection * DistributedMapClient to store comparison data */ - public ElasticacheManifestComparator(DistributedMap connection) { + public ElasticacheManifestComparator(DistributedCache connection) { this.key = "default"; this.versionedKey = newKey(1L).toJson(); this.connection = connection; @@ -108,7 +109,7 @@ public ElasticacheManifestComparator(DistributedMap connection) { * @param connection * DistributeMapClient connection */ - public ElasticacheManifestComparator(String key, DistributedMap connection) { + public ElasticacheManifestComparator(String key, DistributedCache connection) { this.key = key; this.versionedKey = newKey(1L).toJson(); this.connection = connection; diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java new file mode 100644 index 000000000..9fd909a36 --- /dev/null +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java @@ -0,0 +1,13 @@ +package com.aol.micro.server.distributed; + + +import java.util.Optional; + +public interface DistributedCache { + boolean put(String key, V value); + Optional get(String key); + void delete(String key); + boolean put(String key, int expiry, V value); + boolean isAvailable(); + void setConnectionTested(boolean result); +} From 5d2fe578661cb048d4c21313a146d0bd0ac3ff4e Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Mon, 3 Jul 2017 16:22:49 +0100 Subject: [PATCH 6/8] Updating as per PR comments --- .../com/aol/micro/server/couchbase/ConfigureCouchbase.java | 6 +++--- .../server/elasticache/ElasticacheConnectionTester.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java index ab678e01f..21d8708a4 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/ConfigureCouchbase.java @@ -44,13 +44,13 @@ public class ConfigureCouchbase { private long opTimeout; @Value("${distributed.cache.default.expiration:691200}") - private int expiresAfterSeconds = 691200; + private int expiresAfterSeconds; @Value("${distributed.cache.maxTry:5}") - private int maxTry = 5; + private int maxTry; @Value("${distributed.cache.retryAfterSec:1}") - private int retryAfterSec = 1; + private int retryAfterSec; @SuppressWarnings("rawtypes") @Bean(name = "couchbaseDistributedMap") diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java index 0a0afb203..1b8a23bc8 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ElasticacheConnectionTester.java @@ -30,7 +30,7 @@ public SystemData scheduleAndLog() { try { result = testConnection(); } catch (RuntimeException e) { - log.debug("Could not connect to Cache" + e.getMessage()); + log.error("Could not connect to Cache" + e.getMessage()); } cache.setConnectionTested(result); From 27f84fca2234570963d7316756995e95acd474a7 Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Mon, 3 Jul 2017 16:55:15 +0100 Subject: [PATCH 7/8] Updating to DistributedCache to extend DistributedMap --- .../com/aol/micro/server/distributed/DistributedCache.java | 5 +---- .../com/aol/micro/server/distributed/DistributedMap.java | 5 ----- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java index 9fd909a36..81dcdb5a1 100644 --- a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedCache.java @@ -3,10 +3,7 @@ import java.util.Optional; -public interface DistributedCache { - boolean put(String key, V value); - Optional get(String key); - void delete(String key); +public interface DistributedCache extends DistributedMap { boolean put(String key, int expiry, V value); boolean isAvailable(); void setConnectionTested(boolean result); diff --git a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java index d0925530e..03cf7494a 100644 --- a/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java +++ b/micro-manifest-comparator/src/main/java/com/aol/micro/server/distributed/DistributedMap.java @@ -6,9 +6,4 @@ public interface DistributedMap { boolean put(String key, V value); Optional get(String key); void delete(String key); - default boolean put(String key, int expiry, V value){ - return false; - } - default boolean isAvailable() { return false; } - default void setConnectionTested(boolean result){}; } \ No newline at end of file From bcf247c9870b3453dd4018cfbbfeefa0205c0848 Mon Sep 17 00:00:00 2001 From: Gordon Morrow Date: Tue, 4 Jul 2017 12:17:33 +0100 Subject: [PATCH 8/8] Updating based on PR comments --- .../elasticache/ConfigureCacheTester.java | 5 ++- .../ElasticacheManifestComparator.java | 39 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java index b78217448..007c397f8 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/ConfigureCacheTester.java @@ -1,6 +1,7 @@ package com.aol.micro.server.elasticache; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; @@ -10,8 +11,10 @@ public class ConfigureCacheTester { @Autowired private ElasticacheConnectionTester elasticacheConnectionTester; + @Value("${elasticache.connection.checker.frequency:60000}") + private final int fixedDelay = 60000; - @Scheduled(fixedDelay = 60000) + @Scheduled(fixedDelay = fixedDelay) public synchronized void runElasticacheConnectionTester(){ elasticacheConnectionTester.scheduleAndLog(); } diff --git a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java index 4aea531c6..8a3270484 100644 --- a/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java +++ b/micro-elasticache/src/main/java/com/aol/micro/server/elasticache/manifest/comparator/ElasticacheManifestComparator.java @@ -2,7 +2,6 @@ import com.aol.cyclops2.util.ExceptionSoftener; import com.aol.micro.server.distributed.DistributedCache; -import com.aol.micro.server.distributed.DistributedMap; import com.aol.micro.server.manifest.Data; import com.aol.micro.server.manifest.ManifestComparator; import com.aol.micro.server.manifest.ManifestComparatorKeyNotFoundException; @@ -13,12 +12,14 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.Optional; +@Slf4j @AllArgsConstructor(access = AccessLevel.PRIVATE) public class ElasticacheManifestComparator implements ManifestComparator { @@ -70,13 +71,9 @@ public MyDataService(ManifestComparator comparator) { * @param */ - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final String key; - private volatile Xor data = Xor.secondary(null); // Void represents - // an unitialized - // state + private volatile Optional data = Optional.empty(); @Getter private volatile String versionedKey; @@ -135,11 +132,11 @@ private VersionedKey newKey(Long version) { } private VersionedKey increment() { - VersionedKey currentVersionedKey = loadKeyFromCouchbase(); + VersionedKey currentVersionedKey = loadKeyFromElasticache(); return currentVersionedKey.withVersion(currentVersionedKey.getVersion() + 1); } - private VersionedKey loadKeyFromCouchbase() { + private VersionedKey loadKeyFromElasticache() { Optional optionalKey = connection.get(key); return optionalKey.flatMap(val -> Optional.of(JacksonUtil.convertFromJson(val, VersionedKey.class))) .orElse(newKey(0L)); @@ -149,7 +146,7 @@ private VersionedKey loadKeyFromCouchbase() { @Override @SneakyThrows public T getData() { - while (data.isSecondary()) { + while (!data.isPresent()) { Thread.sleep(500); } return data.get(); @@ -157,7 +154,7 @@ public T getData() { @Override public T getCurrentData() { - return data.visit(present -> present, () -> null); + return data.orElse(null); } /** @@ -166,7 +163,7 @@ public T getCurrentData() { @Override public boolean isOutOfDate() { - return !versionedKey.equals(loadKeyFromCouchbase().toJson()); + return !versionedKey.equals(loadKeyFromElasticache().toJson()); } /** @@ -174,13 +171,13 @@ public boolean isOutOfDate() { */ @Override public synchronized boolean load() { - Xor oldData = data; + Optional oldData = data; String oldKey = versionedKey; try { if (isOutOfDate()) { String newVersionedKey = (String) connection.get(key) .get(); - data = Xor.primary((T) nonAtomicload(newVersionedKey)); + data = (Optional) nonAtomicload(newVersionedKey); versionedKey = newVersionedKey; } else { return false; @@ -188,7 +185,7 @@ public synchronized boolean load() { } catch (Throwable e) { data = oldData; versionedKey = oldKey; - logger.debug(e.getMessage(), e); + log.debug(e.getMessage(), e); throw ExceptionSoftener.throwSoftenedException(e); } return true; @@ -203,7 +200,7 @@ private Object nonAtomicload(String newVersionedKey) throws Throwable { + newVersionedKey + " - likely data changed during read"); }); - logger.info("Loaded new data with date {} for key {}, versionedKey {}, versionedKey from data ", + log.info("Loaded new data with date {} for key {}, versionedKey {}, versionedKey from data ", new Object[] { data.getDate(), key, newVersionedKey, data.getVersionedKey() }); return data.getData(); } @@ -223,8 +220,8 @@ public void cleanAll() { */ @Override public void clean(int numberToClean) { - logger.info("Attempting to delete the last {} records for key {}", numberToClean, key); - VersionedKey currentVersionedKey = loadKeyFromCouchbase(); + log.info("Attempting to delete the last {} records for key {}", numberToClean, key); + VersionedKey currentVersionedKey = loadKeyFromElasticache(); long start = 0; if (numberToClean != -1) start = currentVersionedKey.getVersion() - numberToClean; @@ -232,7 +229,7 @@ public void clean(int numberToClean) { delete(currentVersionedKey.withVersion(i) .toJson()); } - logger.info("Finished deleting the last {} records for key {}", numberToClean, key); + log.info("Finished deleting the last {} records for key {}", numberToClean, key); } private void delete(String withVersion) { @@ -253,14 +250,14 @@ private void delete(String withVersion) { */ @Override public void saveAndIncrement(T data) { - Xor oldData = this.data; + Optional oldData = this.data; VersionedKey newVersionedKey = increment(); - logger.info("Saving data with key {}, new version is {}", key, newVersionedKey.toJson()); + log.info("Saving data with key {}, new version is {}", key, newVersionedKey.toJson()); connection.put(newVersionedKey.toJson(), new Data( data, new Date(), newVersionedKey.toJson())); connection.put(key, newVersionedKey.toJson()); try { - this.data = Xor.primary(data); + this.data = (Optional) data; delete(versionedKey); } catch (Throwable t) {