From e23e889b3e0b90907d1cecae0007bbd7a242653b Mon Sep 17 00:00:00 2001 From: lbwexler Date: Sat, 21 Sep 2024 21:26:30 -0400 Subject: [PATCH 01/12] CacheNext --- .../groovy/io/xh/hoist/BaseService.groovy | 42 ++----- .../groovy/io/xh/hoist/cache/BaseCache.groovy | 22 +--- .../groovy/io/xh/hoist/cache/Cache.groovy | 24 +++- .../io/xh/hoist/cache/CachedValue.groovy | 115 ++++++++++++++---- .../xh/hoist/cache/CachedValueChanged.groovy | 20 +++ .../io/xh/hoist/cache/CachedValueEntry.groovy | 58 +++++++++ 6 files changed, 206 insertions(+), 75 deletions(-) create mode 100644 src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy create mode 100644 src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy diff --git a/src/main/groovy/io/xh/hoist/BaseService.groovy b/src/main/groovy/io/xh/hoist/BaseService.groovy index 4f104442..1126d648 100644 --- a/src/main/groovy/io/xh/hoist/BaseService.groovy +++ b/src/main/groovy/io/xh/hoist/BaseService.groovy @@ -66,8 +66,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea protected final ConcurrentHashMap resources = [:] private boolean _destroyed = false - private Map _replicatedValues - private Map _localValues private final Logger _log = LoggerFactory.getLogger(this.class) @@ -224,7 +222,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea addResource(mp.name as String, new CachedValue([*:mp, svc: this])) } - /** * Create a managed subscription to events on the instance-local Grails event bus. * @@ -375,13 +372,22 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea Logger getInstanceLog() { _log } - //------------------------ - // Internal implementation - //------------------------ - protected String hzName(String name) { + /** + * Generate a name for a resource, appropriate for Hazelcast. + * Note that this allows us to group all Hazelcast resources by Service + * + * Not typically called directly by applications. Applications should aim to create + * Hazelcast distributed objects using the methods in this class. + * + * @internal + */ + String hzName(String name) { this.class.name + '_' + name } + //------------------------ + // Internal implementation + //------------------------ private T addResource(String name, T resource) { if (!name || resources.containsKey(name)) { def msg = 'Service resource requires a unique name. ' @@ -391,26 +397,4 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea resources[name] = resource return resource } - - /** - * @internal - for use by Cache. - */ - Map getMapForCache(Cache cache) { - // register with xh prefix to avoid collisions, allow filtering out in admin - cache.useCluster ? createReplicatedMap("xh_${cache.name}") : new ConcurrentHashMap() - } - - /** - * @internal - for use by CachedValue - */ - Map getMapForCachedValue(CachedValue cachedValue) { - // register with xh prefix to avoid collisions, allow filtering out in admin - if (cachedValue.useCluster) { - if (_replicatedValues == null) _replicatedValues = createReplicatedMap('xh_cachedValues') - return _replicatedValues - } else { - if (_localValues == null) _localValues = new ConcurrentHashMap() - return _localValues - } - } } diff --git a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy index 1b4ef191..eb9422de 100644 --- a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy @@ -7,14 +7,10 @@ package io.xh.hoist.cache - import groovy.transform.CompileStatic import io.xh.hoist.BaseService import io.xh.hoist.cluster.ClusterService -import static io.xh.hoist.util.DateTimeUtils.asEpochMilli -import static io.xh.hoist.util.DateTimeUtils.intervalElapsed - @CompileStatic abstract class BaseCache { @@ -72,6 +68,7 @@ abstract class BaseCache { this.serializeOldValue = serializeOldValue } + /** @param handler called on change with a {@link CacheValueChanged} object. */ abstract void addChangeHandler(Closure handler) @@ -93,21 +90,4 @@ abstract class BaseCache { def change = new CacheValueChanged(this, key, oldValue, value) onChange.each { it.call(change) } } - - protected boolean shouldExpire(Entry entry) { - if (expireFn) return expireFn(entry) - - if (expireTime) { - Long timestamp = getEntryTimestamp(entry), - expire = (expireTime instanceof Closure ? expireTime.call() : expireTime) as Long - return intervalElapsed(expire, timestamp) - } - return false - } - - protected Long getEntryTimestamp(Entry entry) { - if (!entry) return null - if (timestampFn) return asEpochMilli(timestampFn(entry.value)) - return entry.dateEntered - } } diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 61a6ad13..343d83b6 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -11,11 +11,16 @@ import groovy.transform.CompileStatic import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService +import io.xh.hoist.cluster.ClusterService import io.xh.hoist.util.Timer + +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeoutException +import static io.xh.hoist.cluster.ClusterService.hzInstance import static io.xh.hoist.util.DateTimeUtils.MINUTES import static io.xh.hoist.util.DateTimeUtils.SECONDS +import static io.xh.hoist.util.DateTimeUtils.asEpochMilli import static io.xh.hoist.util.DateTimeUtils.intervalElapsed import static java.lang.System.currentTimeMillis @@ -42,7 +47,7 @@ class Cache extends BaseCache { ) { super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) - _map = svc.getMapForCache(this) + _map = useCluster ? hzInstance.getMap(svc.hzName("xh_$name")) : new ConcurrentHashMap() if (onChange) addChangeHandler(onChange) cullTimer = svc.createTimer( @@ -186,4 +191,21 @@ class Cache extends BaseCache { svc.logDebug("Cache '$name' culled ${cullKeys.size()} out of $oldSize entries") } } + + protected boolean shouldExpire(Entry entry) { + if (expireFn) return expireFn.call(entry) + + if (expireTime) { + Long timestamp = getEntryTimestamp(entry), + expire = (expireTime instanceof Closure ? expireTime.call() : expireTime) as Long + return intervalElapsed(expire, timestamp) + } + return false + } + + protected Long getEntryTimestamp(Entry entry) { + if (!entry) return null + if (timestampFn) return asEpochMilli(timestampFn.call(entry.value)) + return entry.dateEntered + } } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy index 1ec538cf..fa6985bc 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy @@ -1,13 +1,22 @@ package io.xh.hoist.cache -import com.hazelcast.replicatedmap.ReplicatedMap +import antlr.debug.MessageListener +import com.hazelcast.ringbuffer.Ringbuffer +import com.hazelcast.topic.ITopic +import com.hazelcast.topic.Message +import com.hazelcast.topic.ReliableMessageListener import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService +import io.xh.hoist.cluster.ClusterService +import io.xh.hoist.log.LogSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.util.concurrent.TimeoutException import static io.xh.hoist.util.DateTimeUtils.SECONDS +import static io.xh.hoist.util.DateTimeUtils.asEpochMilli import static io.xh.hoist.util.DateTimeUtils.intervalElapsed import static java.lang.System.currentTimeMillis @@ -15,9 +24,12 @@ import static java.lang.System.currentTimeMillis * Similar to {@link Cache}, but a single value that can be read, written, and expired. * Like Cache, this object supports replication across the cluster. */ -class CachedValue extends BaseCache { +class CachedValue extends BaseCache implements LogSupport { - private final Map> _map + private final ITopic> topic + private final Ringbuffer> ring + private final String loggerName + private CachedValueEntry entry /** @internal - do not construct directly - use {@link BaseService#createCachedValue}. */ @NamedVariant @@ -28,23 +40,24 @@ class CachedValue extends BaseCache { @NamedParam Closure expireFn = null, @NamedParam Closure timestampFn = null, @NamedParam Boolean replicate = false, - @NamedParam Boolean serializeOldValue = false, @NamedParam Closure onChange = null ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) + super(name, svc, expireTime, expireFn, timestampFn, replicate, true) + loggerName = svc.instanceLog.name + '_' + name - _map = svc.getMapForCachedValue(this) + if (useCluster) { + topic = createUpdateTopic() + } if (onChange) addChangeHandler(onChange) } /** @returns the cached value. */ V get() { - def ret = _map[name] - if (ret && shouldExpire(ret)) { + if (shouldExpire(entry)) { set(null) return null } - return ret?.value + return entry?.value } /** @returns the cached value, or calls the provided closure to create, cache, and return. */ @@ -59,15 +72,9 @@ class CachedValue extends BaseCache { /** Set the value. */ void set(V value) { - def oldEntry = _map[name] - if (!serializeOldValue) oldEntry?.serializeValue = false - if (value == null) { - _map.remove(name) - } else { - _map[name] = new Entry(name, value, svc.instanceLog.name) - } - - if (!useCluster) fireOnChange(name, oldEntry?.value, value) + if (value == entry?.value) return + setInternal(new CachedValueEntry(value, loggerName)) + topic?.publish(entry) } /** Clear the value. */ @@ -77,7 +84,7 @@ class CachedValue extends BaseCache { /** @returns timestamp of the current entry, or null if none. */ Long getTimestamp() { - getEntryTimestamp(_map[name]) + getEntryTimestamp(entry) } /** @@ -105,15 +112,75 @@ class CachedValue extends BaseCache { } void addChangeHandler(Closure handler) { - if (!onChange && _map instanceof ReplicatedMap) { - _map.addEntryListener(new HzEntryListener(this), name) - } onChange << handler } //------------------- // Implementation //------------------- + void setInternal(CachedValueEntry newEntry) { + def oldEntry = entry + entry = newEntry + def change = new CachedValueChanged(this, oldEntry?.value, newEntry?.value) + onChange.each { it.call(change) } + } + + boolean asBoolean() { + return get() != null + } + + private boolean shouldExpire(CachedValueEntry entry) { + if (!entry) return null + if (expireFn) return expireFn(entry) + + if (expireTime) { + Long timestamp = getEntryTimestamp(entry), + expire = (expireTime instanceof Closure ? expireTime.call() : expireTime) as Long + return intervalElapsed(expire, timestamp) + } + return false + } + + private Long getEntryTimestamp(CachedValueEntry entry) { + if (!entry) return null + if (timestampFn) return asEpochMilli(timestampFn(entry.value)) + return entry.dateEntered + } + + private ITopic> createUpdateTopic() { + // 0) Create a durable topic with room for just a single item + def hzInstance = ClusterService.hzInstance, + hzConfig = hzInstance.config, + hzName = svc.hzName("xh_$name") + hzConfig.getRingbufferConfig(hzName).with { capacity = 1 } + hzConfig.getReliableTopicConfig(hzName).with { readBatchSize = 1 } + + // 1) ...rnd register for all events, including replay of event before this instance existed. + def ret = hzInstance.getReliableTopic(hzName) + ret.addMessageListener( + new ReliableMessageListener>() { + void onMessage(Message> message) { + def publisher = message.publishingMember + svc.logDebug('Received update', [source: publisher.getAttribute('instanceName')]) + if (publisher.localMember()) return + setInternal((CachedValueEntry) message.messageObject) + } + + long retrieveInitialSequence() { return 0 } + + void storeSequence(long sequence) {} + + boolean isLossTolerant() { return true } + + boolean isTerminal(Throwable e) { + svc.logError('Error handling update message', e) + return false + } + } + ) + return ret + } + Map getAdminStats() { def val = get(), ret = [ @@ -127,7 +194,7 @@ class CachedValue extends BaseCache { return ret } - boolean asBoolean() { - return get() != null + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) } } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy new file mode 100644 index 00000000..2650c58a --- /dev/null +++ b/src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy @@ -0,0 +1,20 @@ +package io.xh.hoist.cache + +class CachedValueChanged { + + /** Source object the changed value is contained within. */ + final CachedValue source + + /** OldValue. Null if value being set for the first time.*/ + final V oldValue + + /** New Value. Null if value being unset or removed. */ + final V value + + /** @internal */ + CachedValueChanged(CachedValue source, V oldValue, V value) { + this.source = source + this.oldValue = oldValue + this.value = value + } +} diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy new file mode 100644 index 00000000..c09764b0 --- /dev/null +++ b/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy @@ -0,0 +1,58 @@ +/* + * This file belongs to Hoist, an application development toolkit + * developed by Extremely Heavy Industries (www.xh.io | info@xh.io) + * + * Copyright © 2023 Extremely Heavy Industries Inc. + */ + +package io.xh.hoist.cache + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoSerializable +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import io.xh.hoist.log.LogSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static java.lang.System.currentTimeMillis + +class CachedValueEntry implements KryoSerializable, LogSupport { + Long dateEntered + String loggerName + T value + + CachedValueEntry(T value, String loggerName) { + this.dateEntered = currentTimeMillis() + this.loggerName = loggerName + this.value = value + } + + CachedValueEntry() {} + + void write(Kryo kryo, Output output) { + output.writeLong(dateEntered) + output.writeString(loggerName) + withSingleTrace('Serializing value') { + kryo.writeClassAndObject(output, value) + } + } + + void read(Kryo kryo, Input input) { + dateEntered = input.readLong() + loggerName = input.readString() + withSingleTrace('Deserializing value') { + value = (T) kryo.readClassAndObject(input) + } + } + + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } + + private void withSingleTrace(String msg, Closure c) { + Long start = currentTimeMillis() + c() + logTrace(msg, [_elapsedMs: currentTimeMillis() - start]) + } +} From ca90bd4be4278ae8b5dfca20367bfc340b422c90 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Sun, 22 Sep 2024 10:24:45 -0400 Subject: [PATCH 02/12] Checkpoint --- .../groovy/io/xh/hoist/cache/BaseCache.groovy | 34 ++++++++--------- .../groovy/io/xh/hoist/cache/Cache.groovy | 38 +++++++++++++------ .../cache/{Entry.groovy => CacheEntry.groovy} | 6 +-- ...hanged.groovy => CacheEntryChanged.groovy} | 6 +-- ...tener.groovy => CacheEntryListener.groovy} | 5 ++- .../io/xh/hoist/cache/CachedValue.groovy | 21 +++------- 6 files changed, 56 insertions(+), 54 deletions(-) rename src/main/groovy/io/xh/hoist/cache/{Entry.groovy => CacheEntry.groovy} (92%) rename src/main/groovy/io/xh/hoist/cache/{CacheValueChanged.groovy => CacheEntryChanged.groovy} (89%) rename src/main/groovy/io/xh/hoist/cache/{HzEntryListener.groovy => CacheEntryListener.groovy} (87%) diff --git a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy index eb9422de..87a64e24 100644 --- a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy @@ -10,9 +10,12 @@ package io.xh.hoist.cache import groovy.transform.CompileStatic import io.xh.hoist.BaseService import io.xh.hoist.cluster.ClusterService +import io.xh.hoist.log.LogSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory @CompileStatic -abstract class BaseCache { +abstract class BaseCache implements LogSupport { /** Service using this object. */ public final BaseService svc @@ -38,26 +41,19 @@ abstract class BaseCache { /** True to replicate this cache across a cluster (default false). */ public final boolean replicate - /** - * True to serialize old values to replicas in `CacheValueChanged` events (default false). - * - * Not serializing old values improves performance and is especially important for caches - * containing large objects that are expensive to serialize + deserialize. Enable only if your - * event handlers need access to the previous value. - */ - public final boolean serializeOldValue - - /** Handlers to be called on change with a {@link CacheValueChanged} object. */ + /** Handlers to be called on change with a {@link CacheEntryChanged} or a {@link CachedValueChanged} object. */ public final List onChange = [] + /** Log to use for logging. Set by default to an extension of the owning service. */ + protected final String loggerName + BaseCache( String name, BaseService svc, Object expireTime, Closure expireFn, Closure timestampFn, - boolean replicate, - boolean serializeOldValue + boolean replicate ) { this.name = name this.svc = svc @@ -65,11 +61,12 @@ abstract class BaseCache { this.expireFn = expireFn this.timestampFn = timestampFn this.replicate = replicate - this.serializeOldValue = serializeOldValue - } + // Allow fine grain logging for this within namespace of owning service + loggerName = "${svc.instanceLog.name}.${this.class.simpleName}(${name})" + } - /** @param handler called on change with a {@link CacheValueChanged} object. */ + /** @param handler called on change with a {@link CacheEntryChanged} or a {@link CachedValueChanged} object. */ abstract void addChangeHandler(Closure handler) /** Clear all values. */ @@ -86,8 +83,7 @@ abstract class BaseCache { //------------------------ // Implementation //------------------------ - protected void fireOnChange(Object key, V oldValue, V value) { - def change = new CacheValueChanged(this, key, oldValue, value) - onChange.each { it.call(change) } + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) } } diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 343d83b6..2c29134e 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -11,7 +11,6 @@ import groovy.transform.CompileStatic import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService -import io.xh.hoist.cluster.ClusterService import io.xh.hoist.util.Timer import java.util.concurrent.ConcurrentHashMap @@ -30,9 +29,20 @@ import static java.lang.System.currentTimeMillis @CompileStatic class Cache extends BaseCache { - private final Map> _map + private final Map> _map private final Timer cullTimer + + /** + * True to serialize old values to replicas in `CacheEntryChanged` events (default false). + * + * Not serializing old values improves performance and is especially important for caches + * containing large objects that are expensive to serialize + deserialize. Enable only if your + * event handlers need access to the previous value. + */ + public final boolean serializeOldValue + + /** @internal - do not construct directly - use {@link BaseService#createCache}. */ @NamedVariant Cache( @@ -45,11 +55,9 @@ class Cache extends BaseCache { @NamedParam Boolean serializeOldValue = false, @NamedParam Closure onChange = null ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) - + super(name, svc, expireTime, expireFn, timestampFn, replicate) + this.serializeOldValue = serializeOldValue _map = useCluster ? hzInstance.getMap(svc.hzName("xh_$name")) : new ConcurrentHashMap() - if (onChange) addChangeHandler(onChange) - cullTimer = svc.createTimer( name: "xh_${name}_cullEntries", runFn: this.&cullEntries, @@ -57,6 +65,9 @@ class Cache extends BaseCache { delay: true, primaryOnly: useCluster ) + if (onChange) { + addChangeHandler(onChange) + } } /** @returns the cached value at key. */ @@ -65,7 +76,7 @@ class Cache extends BaseCache { } /** @returns the cached Entry at key. */ - Entry getEntry(K key) { + CacheEntry getEntry(K key) { def ret = _map[key] if (ret && shouldExpire(ret)) { remove(key) @@ -86,7 +97,7 @@ class Cache extends BaseCache { if (obj == null) { _map.remove(key) } else { - _map.put(key, new Entry(key.toString(), obj, svc.instanceLog.name)) + _map.put(key, new CacheEntry(key.toString(), obj, loggerName)) } if (!useCluster) fireOnChange(this, oldEntry?.value, obj) } @@ -129,7 +140,7 @@ class Cache extends BaseCache { void addChangeHandler(Closure handler) { if (!onChange && _map instanceof ReplicatedMap) { - _map.addEntryListener(new HzEntryListener(this)) + _map.addEntryListener(new CacheEntryListener(this)) } onChange << handler } @@ -192,7 +203,7 @@ class Cache extends BaseCache { } } - protected boolean shouldExpire(Entry entry) { + protected boolean shouldExpire(CacheEntry entry) { if (expireFn) return expireFn.call(entry) if (expireTime) { @@ -203,9 +214,14 @@ class Cache extends BaseCache { return false } - protected Long getEntryTimestamp(Entry entry) { + protected Long getEntryTimestamp(CacheEntry entry) { if (!entry) return null if (timestampFn) return asEpochMilli(timestampFn.call(entry.value)) return entry.dateEntered } + + protected void fireOnChange(Object key, V oldValue, V value) { + def change = new CacheEntryChanged(this, key, oldValue, value) + onChange.each { it.call(change) } + } } diff --git a/src/main/groovy/io/xh/hoist/cache/Entry.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntry.groovy similarity index 92% rename from src/main/groovy/io/xh/hoist/cache/Entry.groovy rename to src/main/groovy/io/xh/hoist/cache/CacheEntry.groovy index 3bd79232..04fd659f 100644 --- a/src/main/groovy/io/xh/hoist/cache/Entry.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntry.groovy @@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory import static java.lang.System.currentTimeMillis @CompileStatic -class Entry implements KryoSerializable, LogSupport { +class CacheEntry implements KryoSerializable, LogSupport { String key T value Long dateEntered @@ -27,7 +27,7 @@ class Entry implements KryoSerializable, LogSupport { boolean serializeValue - Entry(String key, T value, String loggerName) { + CacheEntry(String key, T value, String loggerName) { this.key = key this.value = value this.dateEntered = currentTimeMillis() @@ -35,7 +35,7 @@ class Entry implements KryoSerializable, LogSupport { this.serializeValue = true } - Entry() {} + CacheEntry() {} void write(Kryo kryo, Output output) { output.writeBoolean(serializeValue) diff --git a/src/main/groovy/io/xh/hoist/cache/CacheValueChanged.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy similarity index 89% rename from src/main/groovy/io/xh/hoist/cache/CacheValueChanged.groovy rename to src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy index ba9f1f13..eba7c926 100644 --- a/src/main/groovy/io/xh/hoist/cache/CacheValueChanged.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy @@ -1,9 +1,9 @@ package io.xh.hoist.cache -class CacheValueChanged { +class CacheEntryChanged { /** Source object the changed value is contained within. */ - final BaseCache source + final Cache source /** * Key of the value being changed. @@ -17,7 +17,7 @@ class CacheValueChanged { private final V _value /** @internal */ - CacheValueChanged(BaseCache source, K key, V oldValue, V value) { + CacheEntryChanged(Cache source, K key, V oldValue, V value) { this.source = source this.key = key _oldValue = oldValue diff --git a/src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy similarity index 87% rename from src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy rename to src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy index 59333c07..f4b8ce12 100644 --- a/src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy @@ -4,11 +4,12 @@ import com.hazelcast.core.EntryEvent import com.hazelcast.core.EntryListener import com.hazelcast.map.MapEvent -class HzEntryListener implements EntryListener { +/** @internal */ +class CacheEntryListener implements EntryListener { private BaseCache target - HzEntryListener(BaseCache target) { + CacheEntryListener(BaseCache target) { this.target = target } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy index fa6985bc..45da2a92 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy @@ -1,7 +1,5 @@ package io.xh.hoist.cache -import antlr.debug.MessageListener -import com.hazelcast.ringbuffer.Ringbuffer import com.hazelcast.topic.ITopic import com.hazelcast.topic.Message import com.hazelcast.topic.ReliableMessageListener @@ -9,9 +7,6 @@ import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService import io.xh.hoist.cluster.ClusterService -import io.xh.hoist.log.LogSupport -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.util.concurrent.TimeoutException @@ -24,11 +19,9 @@ import static java.lang.System.currentTimeMillis * Similar to {@link Cache}, but a single value that can be read, written, and expired. * Like Cache, this object supports replication across the cluster. */ -class CachedValue extends BaseCache implements LogSupport { +class CachedValue extends BaseCache { private final ITopic> topic - private final Ringbuffer> ring - private final String loggerName private CachedValueEntry entry /** @internal - do not construct directly - use {@link BaseService#createCachedValue}. */ @@ -42,13 +35,13 @@ class CachedValue extends BaseCache implements LogSupport { @NamedParam Boolean replicate = false, @NamedParam Closure onChange = null ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate, true) - loggerName = svc.instanceLog.name + '_' + name - + super(name, svc, expireTime, expireFn, timestampFn, replicate) if (useCluster) { topic = createUpdateTopic() } - if (onChange) addChangeHandler(onChange) + if (onChange) { + addChangeHandler(onChange) + } } /** @returns the cached value. */ @@ -193,8 +186,4 @@ class CachedValue extends BaseCache implements LogSupport { } return ret } - - Logger getInstanceLog() { - LoggerFactory.getLogger(loggerName) - } } From 86c8c007683a29f70c7cd11379d92d6c244d0b33 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Sun, 22 Sep 2024 12:47:37 -0400 Subject: [PATCH 03/12] Checkpoint --- build.gradle | 4 ++ .../init/io/xh/hoist/ClusterConfig.groovy | 39 +------------- .../clienterror/ClientErrorService.groovy | 10 ++-- .../groovy/io/xh/hoist/BaseService.groovy | 51 +++++++++++++++---- 4 files changed, 48 insertions(+), 56 deletions(-) diff --git a/build.gradle b/build.gradle index 36b57dcd..6e87944b 100644 --- a/build.gradle +++ b/build.gradle @@ -92,6 +92,10 @@ tasks.withType(GroovyCompile) { forkOptions.jvmArgs = ['-Dspring.output.ansi.enabled=always'] } } +tasks.named('jar') { + duplicatesStrategy = 'warn' +} + bootJar.enabled = false tasks.bootRun.doFirst { diff --git a/grails-app/init/io/xh/hoist/ClusterConfig.groovy b/grails-app/init/io/xh/hoist/ClusterConfig.groovy index 6b3f5e04..034b2215 100755 --- a/grails-app/init/io/xh/hoist/ClusterConfig.groovy +++ b/grails-app/init/io/xh/hoist/ClusterConfig.groovy @@ -99,7 +99,6 @@ class ClusterConfig { createDefaultConfigs(ret) createHibernateConfigs(ret) - createServiceConfigs(ret) KryoSupport.setAsGlobalSerializer(ret) @@ -124,12 +123,7 @@ class ClusterConfig { * Note that Hoist also introduces two properties for declarative configuration: * * - a static 'cache' property on Grails domain objects to customize associated - * Hibernate caches. - * - a static 'clusterConfigs' property on Grails services to customize any Hazelcast - * Distributed Objects associated with the service e.g. Hoist caches - * - * See toolbox's `Phase` object and Hoist Core's `ClientErrorService` for examples of these - * customizations. + * Hibernate caches. See toolbox's `Phase` object for examples. */ protected void createDefaultConfigs(Config config) { config.getMapConfig('default').with { @@ -194,35 +188,4 @@ class ClusterConfig { } } } - - private void createServiceConfigs(Config config) { - grailsApplication.serviceClasses.each { GrailsClass gc -> - // Apply any app customization specified by new static prop introduced by Hoist - Map objs = gc.getPropertyValue('clusterConfigs') - if (!objs) return - objs.forEach {String key, List value -> - def customizer = value[1] as Closure, - objConfig - switch (value[0]) { - case IMap: - objConfig = config.getMapConfig(gc.fullName + '_' + key) - break - case ReplicatedMap: - objConfig = config.getReplicatedMapConfig(gc.fullName + '_' + key) - break - case ISet: - objConfig = config.getSetConfig(gc.fullName + '_' + key) - break - case ITopic: - objConfig = config.getTopicConfig(key) - break - default: - throw new RuntimeException('Unable to configure Cluster object') - } - customizer.delegate = objConfig - customizer.resolveStrategy = Closure.DELEGATE_FIRST - customizer(objConfig) - } - } - } } \ No newline at end of file diff --git a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy index e5bb9575..833e7d60 100644 --- a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy +++ b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy @@ -35,18 +35,14 @@ class ClientErrorService extends BaseService { def clientErrorEmailService, configService - static clusterConfigs = [ - clientErrors: [IMap, { - evictionConfig.size = 100 - }] - ] - - private IMap errors = createIMap('clientErrors') private int getMaxErrors() {configService.getMap('xhClientErrorConfig').maxErrors as int} private int getAlertInterval() {configService.getMap('xhClientErrorConfig').intervalMins * MINUTES} + private IMap errors + void init() { super.init() + errors = createIMap('clientErrors') {evictionConfig.size = 100} createTimer( name: 'processErrors', runFn: this.&processErrors, diff --git a/src/main/groovy/io/xh/hoist/BaseService.groovy b/src/main/groovy/io/xh/hoist/BaseService.groovy index 1126d648..58a43a03 100644 --- a/src/main/groovy/io/xh/hoist/BaseService.groovy +++ b/src/main/groovy/io/xh/hoist/BaseService.groovy @@ -8,6 +8,8 @@ package io.xh.hoist import com.hazelcast.collection.ISet +import com.hazelcast.config.Config +import com.hazelcast.config.NamedConfig import com.hazelcast.map.IMap import com.hazelcast.replicatedmap.ReplicatedMap import com.hazelcast.topic.ITopic @@ -39,6 +41,7 @@ import static io.xh.hoist.util.DateTimeUtils.SECONDS import static io.xh.hoist.util.DateTimeUtils.MINUTES import static io.xh.hoist.util.Utils.appContext import static io.xh.hoist.util.Utils.getConfigService +import static io.xh.hoist.cluster.ClusterService.hzInstance /** * Standard superclass for all Hoist and Application-level services. @@ -117,26 +120,32 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea //----------------------------------------------------------------- // Distributed Resources - // Use static reference to ClusterService to allow access pre-init. //------------------------------------------------------------------ /** * Create and return a reference to a Hazelcast IMap. * * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects * associated with this service. + * @param customizer - closure receiving a Hazelcast MapConfig. Mutate to customize. */ - IMap createIMap(String name) { - addResource(name, ClusterService.hzInstance.getMap(hzName(name))) + IMap createIMap(String name, Closure customizer = null) { + def hzName = hzName(name) + if (customizer) hzConfig.getMapConfig(hzName).with(customizer) + addResource(name, hzInstance.getMap(hzName)) } + /** * Create and return a reference to a Hazelcast ISet. * * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects * associated with this service. + * @param customizer - closure receiving a Hazelcast SetConfig. Mutate to customize. */ - ISet createISet(String name) { - addResource(name, ClusterService.hzInstance.getSet(hzName(name))) + ISet createISet(String name, Closure customizer = null) { + def hzName = hzName(name) + if (customizer) hzConfig.getSetConfig(hzName).with(customizer) + addResource(name, hzInstance.getSet(hzName)) } /** @@ -144,17 +153,35 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects * associated with this service. + * @param customizer - closure receiving a Hazelcast ReplicatedMapConfig. Mutate to customize. + */ + ReplicatedMap createReplicatedMap(String name, Closure customizer = null) { + def hzName = hzName(name) + if (customizer) hzConfig.getReplicatedMapConfig(hzName).with(customizer) + addResource(name, hzInstance.getReplicatedMap(hzName)) + } + + /** + * Create and return a reference to a Hazelcast Topic + * + * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects + * associated with this service. + * @param customizer - closure receiving a Hazelcast ReplicatedMapConfig. Mutate to customize. + * + * Note: To get a reference to an existing or default topic. use {@link #getTopic}. */ - ReplicatedMap createReplicatedMap(String name) { - addResource(name, ClusterService.hzInstance.getReplicatedMap(hzName(name))) + ITopic createTopic(String name, Closure customizer = null) { + def hzName = hzName(name) + if (customizer) hzConfig.getTopicConfig(hzName).with(customizer) + addResource(name, hzInstance.getTopic(hzName)) } /** - * Get a reference to a Hazelcast Replicated topic, useful to publish to a cluster-wide topic. + * Get a reference to an existing or default Hazelcast topic. * To subscribe to events fired by other services on a topic, use {@link #subscribeToTopic}. */ ITopic getTopic(String id) { - ClusterService.hzInstance.getTopic(id) + hzInstance.getTopic(id) } /** @@ -282,7 +309,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea } } - //------------------ // Cluster Support //------------------ @@ -371,7 +397,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea // Provide cached logger to LogSupport for possible performance benefit Logger getInstanceLog() { _log } - /** * Generate a name for a resource, appropriate for Hazelcast. * Note that this allows us to group all Hazelcast resources by Service @@ -397,4 +422,8 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea resources[name] = resource return resource } + + private Config getHzConfig() { + hzInstance.config + } } From 0ccd53e48f793d6e423bff6981bd665da0fb5bfd Mon Sep 17 00:00:00 2001 From: lbwexler Date: Mon, 23 Sep 2024 21:11:11 -0400 Subject: [PATCH 04/12] Checkpoint --- .../hoist/cluster/ClusterAdminService.groovy | 8 ++ .../io/xh/hoist/cluster/ClusterService.groovy | 42 ++++++++++ .../groovy/io/xh/hoist/BaseService.groovy | 36 ++------ .../groovy/io/xh/hoist/cache/Cache.groovy | 4 +- .../io/xh/hoist/cache/CachedValue.groovy | 18 ++-- src/main/groovy/io/xh/hoist/util/Timer.groovy | 83 ++++++++++--------- 6 files changed, 112 insertions(+), 79 deletions(-) diff --git a/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy b/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy index 6fb25799..ee85f8d0 100644 --- a/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy +++ b/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy @@ -7,6 +7,7 @@ import com.hazelcast.executor.impl.ExecutorServiceProxy import com.hazelcast.map.IMap import com.hazelcast.nearcache.NearCacheStats import com.hazelcast.replicatedmap.ReplicatedMap +import com.hazelcast.ringbuffer.impl.RingbufferProxy import com.hazelcast.topic.ITopic import io.xh.hoist.BaseService import io.xh.hoist.util.Utils @@ -142,6 +143,13 @@ class ClusterAdminService extends BaseService { publishOperationCount: stats.publishOperationCount, receiveOperationCount: stats.receiveOperationCount ] + case RingbufferProxy: + return [ + name : obj.getName(), + type : 'RingBuffer', + size : obj.size(), + capacity : obj.capacity() + ] case CacheProxy: def evictionConfig = obj.cacheConfig.evictionConfig, expiryPolicy = obj.cacheConfig.expiryPolicyFactory.create(), diff --git a/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy b/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy index cab63901..1c61127d 100644 --- a/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy +++ b/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy @@ -4,10 +4,15 @@ import com.hazelcast.cluster.Cluster import com.hazelcast.cluster.Member import com.hazelcast.cluster.MembershipEvent import com.hazelcast.cluster.MembershipListener +import com.hazelcast.collection.ISet +import com.hazelcast.config.Config import com.hazelcast.core.DistributedObject import com.hazelcast.core.Hazelcast import com.hazelcast.core.HazelcastInstance import com.hazelcast.core.IExecutorService +import com.hazelcast.map.IMap +import com.hazelcast.replicatedmap.ReplicatedMap +import com.hazelcast.topic.ITopic import io.xh.hoist.BaseService import io.xh.hoist.ClusterConfig import io.xh.hoist.exception.InstanceNotFoundException @@ -152,6 +157,39 @@ class ClusterService extends BaseService implements ApplicationListener [member.getAttribute('instanceName'), f.get()] } } + //------------------ + // Create Objects + //----------------- + static IMap configuredIMap(String name, Closure customizer = null) { + if (customizer) hzConfig.getMapConfig(name).with(customizer) + hzInstance.getMap(name) + } + + static ISet configuredISet(String name, Closure customizer = null) { + if (customizer) hzConfig.getSetConfig(name).with(customizer) + hzInstance.getSet(name) + } + + static ReplicatedMap configuredReplicatedMap(String name, Closure customizer = null) { + if (customizer) hzConfig.getReplicatedMapConfig(name).with(customizer) + hzInstance.getReplicatedMap(name) + } + + static ITopic configuredTopic(String name, Closure customizer = null) { + if (customizer) hzConfig.getTopicConfig(name).with(customizer) + hzInstance.getTopic(name) + } + + static ITopic configuredReliableTopic( + String name, + Closure customizer = null, + Closure ringBufferCustomizer = null + ) { + if (ringBufferCustomizer) hzConfig.getRingbufferConfig(name).with(ringBufferCustomizer) + if (customizer) hzConfig.getReliableTopicConfig(name).with(customizer) + hzInstance.getReliableTopic(name) + } + //------------------------------------ // Implementation //------------------------------------ @@ -188,6 +226,10 @@ class ClusterService extends BaseService implements ApplicationListener IMap createIMap(String name, Closure customizer = null) { - def hzName = hzName(name) - if (customizer) hzConfig.getMapConfig(hzName).with(customizer) - addResource(name, hzInstance.getMap(hzName)) + addResource(name, configuredIMap(hzName(name), customizer)) } - /** * Create and return a reference to a Hazelcast ISet. * @@ -143,9 +141,7 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * @param customizer - closure receiving a Hazelcast SetConfig. Mutate to customize. */ ISet createISet(String name, Closure customizer = null) { - def hzName = hzName(name) - if (customizer) hzConfig.getSetConfig(hzName).with(customizer) - addResource(name, hzInstance.getSet(hzName)) + addResource(name, configuredISet(hzName(name), customizer)) } /** @@ -156,25 +152,9 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * @param customizer - closure receiving a Hazelcast ReplicatedMapConfig. Mutate to customize. */ ReplicatedMap createReplicatedMap(String name, Closure customizer = null) { - def hzName = hzName(name) - if (customizer) hzConfig.getReplicatedMapConfig(hzName).with(customizer) - addResource(name, hzInstance.getReplicatedMap(hzName)) + addResource(name, configuredReplicatedMap(hzName(name), customizer)) } - /** - * Create and return a reference to a Hazelcast Topic - * - * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects - * associated with this service. - * @param customizer - closure receiving a Hazelcast ReplicatedMapConfig. Mutate to customize. - * - * Note: To get a reference to an existing or default topic. use {@link #getTopic}. - */ - ITopic createTopic(String name, Closure customizer = null) { - def hzName = hzName(name) - if (customizer) hzConfig.getTopicConfig(hzName).with(customizer) - addResource(name, hzInstance.getTopic(hzName)) - } /** * Get a reference to an existing or default Hazelcast topic. @@ -422,8 +402,4 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea resources[name] = resource return resource } - - private Config getHzConfig() { - hzInstance.config - } } diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 2c29134e..82096ecf 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -57,9 +57,9 @@ class Cache extends BaseCache { ) { super(name, svc, expireTime, expireFn, timestampFn, replicate) this.serializeOldValue = serializeOldValue - _map = useCluster ? hzInstance.getMap(svc.hzName("xh_$name")) : new ConcurrentHashMap() + _map = useCluster ? hzInstance.getMap(svc.hzName(name)) : new ConcurrentHashMap() cullTimer = svc.createTimer( - name: "xh_${name}_cullEntries", + name: "${name}_cullEntries", runFn: this.&cullEntries, interval: 15 * MINUTES, delay: true, diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy index 45da2a92..5234e233 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy @@ -6,10 +6,10 @@ import com.hazelcast.topic.ReliableMessageListener import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService -import io.xh.hoist.cluster.ClusterService import java.util.concurrent.TimeoutException +import static io.xh.hoist.cluster.ClusterService.configuredReliableTopic import static io.xh.hoist.util.DateTimeUtils.SECONDS import static io.xh.hoist.util.DateTimeUtils.asEpochMilli import static io.xh.hoist.util.DateTimeUtils.intervalElapsed @@ -141,15 +141,13 @@ class CachedValue extends BaseCache { } private ITopic> createUpdateTopic() { - // 0) Create a durable topic with room for just a single item - def hzInstance = ClusterService.hzInstance, - hzConfig = hzInstance.config, - hzName = svc.hzName("xh_$name") - hzConfig.getRingbufferConfig(hzName).with { capacity = 1 } - hzConfig.getReliableTopicConfig(hzName).with { readBatchSize = 1 } - - // 1) ...rnd register for all events, including replay of event before this instance existed. - def ret = hzInstance.getReliableTopic(hzName) + // Create a durable topic with room for just a single item + // and register for all events, including replay of event before this instance existed. + def ret = configuredReliableTopic( + svc.hzName(name), + { readBatchSize = 1 }, + { capacity = 1 } + ) ret.addMessageListener( new ReliableMessageListener>() { void onMessage(Message> message) { diff --git a/src/main/groovy/io/xh/hoist/util/Timer.groovy b/src/main/groovy/io/xh/hoist/util/Timer.groovy index eda05508..b15b706d 100644 --- a/src/main/groovy/io/xh/hoist/util/Timer.groovy +++ b/src/main/groovy/io/xh/hoist/util/Timer.groovy @@ -7,10 +7,10 @@ package io.xh.hoist.util +import com.hazelcast.replicatedmap.ReplicatedMap import groovy.transform.NamedParam import groovy.transform.NamedVariant -import io.xh.hoist.BaseService -import io.xh.hoist.cache.CachedValue +import io.xh.hoist.cluster.ClusterService import io.xh.hoist.log.LogSupport import java.util.concurrent.ExecutionException @@ -21,9 +21,12 @@ import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException +import static io.xh.hoist.cluster.ClusterService.multiInstanceEnabled import static io.xh.hoist.util.DateTimeUtils.* import static io.xh.hoist.util.Utils.configService import static io.xh.hoist.util.Utils.getExceptionHandler +import static java.lang.Math.max +import static java.lang.System.currentTimeMillis /** * Hoist's implementation of an interval-based Timer, for running tasks on a repeated interval. @@ -42,9 +45,9 @@ import static io.xh.hoist.util.Utils.getExceptionHandler */ class Timer { - private static Long CONFIG_INTERVAL = 15 * SECONDS + private static Long CONFIG_INTERVAL = 15 * SECONDS - /** Unique name for this timer, required for cluster aware timers (see `primaryOnly`) **/ + /** Unique name for this timer **/ final String name /** Object using this timer **/ @@ -88,18 +91,17 @@ class Timer { /** * Only run job when clustered instance is the primary instance? Default is false. - * For timers owned by instances of BaseService only. */ final boolean primaryOnly /** Date last run started. */ - Date getLastRunStarted() { + Long getLastRunStarted() { _lastRunStarted } /** Date last run completed. */ - Date getLastRunCompleted() { + Long getLastRunCompleted() { _lastRunCompleted } @@ -115,22 +117,21 @@ class Timer { private Long timeoutMs private Long coreIntervalMs - - private Date _lastRunCompleted = null - private Date _lastRunStarted = null + private Long _lastRunCompleted = null + private Long _lastRunStarted = null private Map _lastRunStats = null private boolean _isRunning = false - private boolean forceRun = false + private boolean forceRun = false private java.util.Timer coreTimer private java.util.Timer configTimer + private String uuid = UUID.randomUUID() - - private CachedValue _lastCompletedOnCluster - + private static ReplicatedMap lastCompletedOnCluster // Args from Grails 3.0 async promise implementation static ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()) + /** * Applications should not typically use this constructor directly. Timers are typically * created by services using the createTimer() method supplied by io.xh.hoist.BaseService. @@ -159,19 +160,15 @@ class Timer { this.intervalUnits = intervalUnits this.timeoutUnits = timeoutUnits - if (primaryOnly) { - if (!owner instanceof BaseService) { - throw new IllegalArgumentException("A 'primaryOnly' timer must be owned by an instance of BaseService.") - } - - _lastCompletedOnCluster = (owner as BaseService).createCachedValue(name: "xh_${name}_lastCompleted") - } - intervalMs = calcIntervalMs() timeoutMs = calcTimeoutMs() delayMs = calcDelayMs() coreIntervalMs = calcCoreIntervalMs() + if (useCluster && lastCompletedOnCluster == null) { + lastCompletedOnCluster = ClusterService.configuredReplicatedMap('xhTimersLastCompleted') + } + if (runImmediatelyAndBlock) { doRun() } @@ -184,7 +181,7 @@ class Timer { if (interval instanceof Closure || timeout instanceof Closure) { configTimer = new java.util.Timer() configTimer.schedule( - (this.&onConfigTimer as TimerTask), CONFIG_INTERVAL, CONFIG_INTERVAL + (this.&onConfigTimer as TimerTask), CONFIG_INTERVAL, CONFIG_INTERVAL ) } } @@ -212,13 +209,13 @@ class Timer { /** Information about this timer, accessible via the Hoist Admin Console. */ Map getAdminStats() { [ - name: name, - type: 'Timer' + (primaryOnly ? ' (primary only)': ''), + name : name, + type : 'Timer' + (primaryOnly ? ' (primary only)' : ''), intervalMs: intervalMs, - isRunning: isRunning, - startTime: isRunning ? _lastRunStarted: null, - last: _lastRunStats - ].findAll {it.value != null} + isRunning : isRunning, + startTime : isRunning ? _lastRunStarted : null, + last : _lastRunStats + ].findAll { it.value != null } } @@ -229,7 +226,7 @@ class Timer { if (primaryOnly && !Utils.clusterService.isPrimary) return _isRunning = true - _lastRunStarted = new Date() + _lastRunStarted = currentTimeMillis() Throwable throwable = null Future future = null try { @@ -249,13 +246,12 @@ class Timer { throwable = t } - _lastRunCompleted = new Date() - _lastCompletedOnCluster?.set(_lastRunCompleted) + setLastCompletedInternal(currentTimeMillis()) _isRunning = false _lastRunStats = [ startTime: _lastRunStarted, - endTime: _lastRunCompleted, - elapsedMs: _lastRunCompleted.time - _lastRunStarted.time + endTime : _lastRunCompleted, + elapsedMs: _lastRunCompleted - _lastRunStarted ] if (throwable) { try { @@ -329,9 +325,7 @@ class Timer { } private boolean intervalHasElapsed() { - if (intervalMs <= 0) return false - def lastRun = _lastCompletedOnCluster ? _lastCompletedOnCluster.get() : _lastRunCompleted - return intervalElapsed(intervalMs, lastRun) + intervalMs > 0 ? intervalElapsed(intervalMs, lastCompletedInternal) : false } private Long calcCoreIntervalMs() { @@ -347,4 +341,19 @@ class Timer { coreIntervalMs = newCoreIntervalMs } } + + private Long getLastCompletedInternal() { + def local = _lastRunCompleted, + onCluster = useCluster ? lastCompletedOnCluster[uuid] : null + return local && onCluster ? max(local, onCluster) : (onCluster ?: local) + } + + private setLastCompletedInternal(Long timestamp) { + if (useCluster) lastCompletedOnCluster.put(uuid, timestamp) + _lastRunCompleted = timestamp + } + + private boolean getUseCluster() { + multiInstanceEnabled && primaryOnly + } } From 4bc2a31a696ffee97f1ec2953dca3c0557b2d9ad Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 01:34:02 -0400 Subject: [PATCH 05/12] Checkpoint -- harden + improve logging, --- .../init/io/xh/hoist/ClusterConfig.groovy | 3 ++ .../io/xh/hoist/cache/CachedValue.groovy | 38 +++++++++++-------- .../io/xh/hoist/cache/CachedValueEntry.groovy | 5 +++ 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/grails-app/init/io/xh/hoist/ClusterConfig.groovy b/grails-app/init/io/xh/hoist/ClusterConfig.groovy index 034b2215..a092c311 100755 --- a/grails-app/init/io/xh/hoist/ClusterConfig.groovy +++ b/grails-app/init/io/xh/hoist/ClusterConfig.groovy @@ -140,6 +140,9 @@ class ClusterConfig { config.getTopicConfig('default').with { statisticsEnabled = true } + config.getReliableTopicConfig('default').with { + statisticsEnabled = true + } config.getSetConfig('default').with { statisticsEnabled = true } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy index 5234e233..a2fefa5c 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy @@ -1,5 +1,6 @@ package io.xh.hoist.cache +import com.hazelcast.config.InMemoryFormat import com.hazelcast.topic.ITopic import com.hazelcast.topic.Message import com.hazelcast.topic.ReliableMessageListener @@ -9,6 +10,7 @@ import io.xh.hoist.BaseService import java.util.concurrent.TimeoutException +import static grails.async.Promises.task import static io.xh.hoist.cluster.ClusterService.configuredReliableTopic import static io.xh.hoist.util.DateTimeUtils.SECONDS import static io.xh.hoist.util.DateTimeUtils.asEpochMilli @@ -36,9 +38,7 @@ class CachedValue extends BaseCache { @NamedParam Closure onChange = null ) { super(name, svc, expireTime, expireFn, timestampFn, replicate) - if (useCluster) { - topic = createUpdateTopic() - } + topic = useCluster ? createUpdateTopic() : null if (onChange) { addChangeHandler(onChange) } @@ -65,9 +65,7 @@ class CachedValue extends BaseCache { /** Set the value. */ void set(V value) { - if (value == entry?.value) return - setInternal(new CachedValueEntry(value, loggerName)) - topic?.publish(entry) + setInternal(new CachedValueEntry(value, loggerName), true) } /** Clear the value. */ @@ -111,11 +109,17 @@ class CachedValue extends BaseCache { //------------------- // Implementation //------------------- - void setInternal(CachedValueEntry newEntry) { + synchronized void setInternal(CachedValueEntry newEntry, boolean publish) { def oldEntry = entry + if (oldEntry?.value === newEntry.value || oldEntry?.uuid == newEntry.uuid) return entry = newEntry - def change = new CachedValueChanged(this, oldEntry?.value, newEntry?.value) - onChange.each { it.call(change) } + if (publish) topic?.publish(newEntry) + if (onChange) { + task { + def change = new CachedValueChanged(this, oldEntry?.value, newEntry.value) + onChange.each { it.call(change) } + } + } } boolean asBoolean() { @@ -145,16 +149,20 @@ class CachedValue extends BaseCache { // and register for all events, including replay of event before this instance existed. def ret = configuredReliableTopic( svc.hzName(name), - { readBatchSize = 1 }, - { capacity = 1 } + {readBatchSize = 1}, + { + capacity = 1 + inMemoryFormat = InMemoryFormat.OBJECT + } ) ret.addMessageListener( new ReliableMessageListener>() { void onMessage(Message> message) { - def publisher = message.publishingMember - svc.logDebug('Received update', [source: publisher.getAttribute('instanceName')]) - if (publisher.localMember()) return - setInternal((CachedValueEntry) message.messageObject) + logDebug('Received update from topic', [ + uuid: message.messageObject.uuid, + source : message.publishingMember.getAttribute('instanceName'), + ]) + setInternal(message.messageObject, false) } long retrieveInitialSequence() { return 0 } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy index c09764b0..89b88ffd 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy @@ -20,12 +20,15 @@ import static java.lang.System.currentTimeMillis class CachedValueEntry implements KryoSerializable, LogSupport { Long dateEntered String loggerName + String uuid T value CachedValueEntry(T value, String loggerName) { this.dateEntered = currentTimeMillis() this.loggerName = loggerName + this.uuid = uuid this.value = value + this.uuid = UUID.randomUUID() } CachedValueEntry() {} @@ -33,6 +36,7 @@ class CachedValueEntry implements KryoSerializable, LogSupport { void write(Kryo kryo, Output output) { output.writeLong(dateEntered) output.writeString(loggerName) + output.writeString(uuid) withSingleTrace('Serializing value') { kryo.writeClassAndObject(output, value) } @@ -41,6 +45,7 @@ class CachedValueEntry implements KryoSerializable, LogSupport { void read(Kryo kryo, Input input) { dateEntered = input.readLong() loggerName = input.readString() + uuid = input.readString() withSingleTrace('Deserializing value') { value = (T) kryo.readClassAndObject(input) } From fe24d2447a15b40e12b208426d24364ccff04187 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 18:47:44 -0400 Subject: [PATCH 06/12] Comments from CR: - Less thrashing in getOrCreate() - Don't use hidden delegates in customize closures - Be sure to apply same value with updated timestamp -- just avoid thrashing event handlers. - Use Square Brackets for consistent naming of new loggers --- CHANGELOG.md | 8 ++ build.gradle | 4 - .../alertbanner/AlertBannerService.groovy | 2 +- .../clienterror/ClientErrorService.groovy | 2 +- .../io/xh/hoist/cluster/ClusterService.groovy | 12 +-- .../io/xh/hoist/monitor/MonitorService.groovy | 2 +- .../groovy/io/xh/hoist/BaseService.groovy | 5 +- .../groovy/io/xh/hoist/cache/BaseCache.groovy | 89 --------------- .../groovy/io/xh/hoist/cache/Cache.groovy | 84 ++++++++++++--- .../xh/hoist/cache/CacheEntryChanged.groovy | 2 +- .../xh/hoist/cache/CacheEntryListener.groovy | 4 +- .../{cache => cachedvalue}/CachedValue.groovy | 101 ++++++++++++++---- .../CachedValueChanged.groovy | 2 +- .../CachedValueEntry.groovy | 3 +- .../role/provided/DefaultRoleService.groovy | 2 +- src/main/groovy/io/xh/hoist/util/Timer.groovy | 2 +- 16 files changed, 172 insertions(+), 152 deletions(-) delete mode 100644 src/main/groovy/io/xh/hoist/cache/BaseCache.groovy rename src/main/groovy/io/xh/hoist/{cache => cachedvalue}/CachedValue.groovy (65%) rename src/main/groovy/io/xh/hoist/{cache => cachedvalue}/CachedValueChanged.groovy (93%) rename src/main/groovy/io/xh/hoist/{cache => cachedvalue}/CachedValueEntry.groovy (97%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84055edf..03c48795 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## 23.0-SNAPSHOT - unreleased +### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW) + +* Improvements to the efficiency of `CachedValue` for sharing of large objects. This included + moving to its own package `io.xh.hoist.cachedvalue` for clarity. +* New dynamic configuration for all distributed hazelcast objects. See `ClusterService.configureXXX`. + This replaces the static map `BaseService.clusterConfigs`. +* Misc. improvements to logging and performance of `Cache` and `Timer`. + ## 22.0.0 - 2024-09-18 ### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW) diff --git a/build.gradle b/build.gradle index 6e87944b..36b57dcd 100644 --- a/build.gradle +++ b/build.gradle @@ -92,10 +92,6 @@ tasks.withType(GroovyCompile) { forkOptions.jvmArgs = ['-Dspring.output.ansi.enabled=always'] } } -tasks.named('jar') { - duplicatesStrategy = 'warn' -} - bootJar.enabled = false tasks.bootRun.doFirst { diff --git a/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy b/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy index c62cd9d1..b9c5551e 100644 --- a/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy +++ b/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy @@ -9,7 +9,7 @@ package io.xh.hoist.alertbanner import groovy.transform.CompileStatic import io.xh.hoist.BaseService -import io.xh.hoist.cache.CachedValue +import io.xh.hoist.cachedvalue.CachedValue import io.xh.hoist.config.ConfigService import io.xh.hoist.jsonblob.JsonBlobService import io.xh.hoist.util.Timer diff --git a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy index 833e7d60..691535de 100644 --- a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy +++ b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy @@ -42,7 +42,7 @@ class ClientErrorService extends BaseService { void init() { super.init() - errors = createIMap('clientErrors') {evictionConfig.size = 100} + errors = createIMap('clientErrors') {it.evictionConfig.size = 100} createTimer( name: 'processErrors', runFn: this.&processErrors, diff --git a/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy b/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy index 1c61127d..32334dae 100644 --- a/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy +++ b/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy @@ -161,22 +161,22 @@ class ClusterService extends BaseService implements ApplicationListener IMap configuredIMap(String name, Closure customizer = null) { - if (customizer) hzConfig.getMapConfig(name).with(customizer) + customizer?.call(hzConfig.getMapConfig(name)) hzInstance.getMap(name) } static ISet configuredISet(String name, Closure customizer = null) { - if (customizer) hzConfig.getSetConfig(name).with(customizer) + customizer?.call(hzConfig.getSetConfig(name)) hzInstance.getSet(name) } static ReplicatedMap configuredReplicatedMap(String name, Closure customizer = null) { - if (customizer) hzConfig.getReplicatedMapConfig(name).with(customizer) + customizer?.call(hzConfig.getReplicatedMapConfig(name)) hzInstance.getReplicatedMap(name) } static ITopic configuredTopic(String name, Closure customizer = null) { - if (customizer) hzConfig.getTopicConfig(name).with(customizer) + customizer?.call(hzConfig.getTopicConfig(name)) hzInstance.getTopic(name) } @@ -185,8 +185,8 @@ class ClusterService extends BaseService implements ApplicationListener ITopic getTopic(String id) { diff --git a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy deleted file mode 100644 index 87a64e24..00000000 --- a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy +++ /dev/null @@ -1,89 +0,0 @@ -/* - * This file belongs to Hoist, an application development toolkit - * developed by Extremely Heavy Industries (www.xh.io | info@xh.io) - * - * Copyright © 2023 Extremely Heavy Industries Inc. - */ - -package io.xh.hoist.cache - -import groovy.transform.CompileStatic -import io.xh.hoist.BaseService -import io.xh.hoist.cluster.ClusterService -import io.xh.hoist.log.LogSupport -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -@CompileStatic -abstract class BaseCache implements LogSupport { - - /** Service using this object. */ - public final BaseService svc - - /** Unique name in the context of the service associated with this object. */ - public final String name - - /** Closure to determine if an entry should be expired (optional). */ - public final Closure expireFn - - /** - * Entry TTL as epochMillis Long, or closure to return the same (optional). - * No effect if a custom expireFn is provided instead. If both null, entries will never expire. - */ - public final Object expireTime - - /** - * Closure to determine the timestamp of an entry (optional). - * Must return a Long (as epochMillis), Date, or Instant. - */ - public final Closure timestampFn - - /** True to replicate this cache across a cluster (default false). */ - public final boolean replicate - - /** Handlers to be called on change with a {@link CacheEntryChanged} or a {@link CachedValueChanged} object. */ - public final List onChange = [] - - /** Log to use for logging. Set by default to an extension of the owning service. */ - protected final String loggerName - - BaseCache( - String name, - BaseService svc, - Object expireTime, - Closure expireFn, - Closure timestampFn, - boolean replicate - ) { - this.name = name - this.svc = svc - this.expireTime = expireTime - this.expireFn = expireFn - this.timestampFn = timestampFn - this.replicate = replicate - - // Allow fine grain logging for this within namespace of owning service - loggerName = "${svc.instanceLog.name}.${this.class.simpleName}(${name})" - } - - /** @param handler called on change with a {@link CacheEntryChanged} or a {@link CachedValueChanged} object. */ - abstract void addChangeHandler(Closure handler) - - /** Clear all values. */ - abstract void clear() - - /** True if this Cache should be stored across the cluster (backed by a ReplicatedMap). */ - boolean getUseCluster() { - return replicate && ClusterService.multiInstanceEnabled - } - - /** Information about this object, accessible via the Hoist Admin Console. */ - abstract Map getAdminStats() - - //------------------------ - // Implementation - //------------------------ - Logger getInstanceLog() { - LoggerFactory.getLogger(loggerName) - } -} diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 82096ecf..0e5163c9 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -11,7 +11,11 @@ import groovy.transform.CompileStatic import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService +import io.xh.hoist.cluster.ClusterService +import io.xh.hoist.log.LogSupport import io.xh.hoist.util.Timer +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeoutException @@ -27,11 +31,34 @@ import static java.lang.System.currentTimeMillis * A key-value Cache, with support for optional entry TTL and replication across a cluster. */ @CompileStatic -class Cache extends BaseCache { +class Cache implements LogSupport { - private final Map> _map - private final Timer cullTimer + /** Service using this object. */ + public final BaseService svc + + /** Unique name in the context of the service associated with this object. */ + public final String name + + /** Closure to determine if an entry should be expired (optional). */ + public final Closure expireFn + + /** + * Entry TTL as epochMillis Long, or closure to return the same (optional). + * No effect if a custom expireFn is provided instead. If both null, entries will never expire. + */ + public final Object expireTime + + /** + * Closure to determine the timestamp of an entry (optional). + * Must return a Long (as epochMillis), Date, or Instant. + */ + public final Closure timestampFn + /** True to replicate this cache across a cluster (default false). */ + public final boolean replicate + + /** Handlers to be called on change with a {@link CacheEntryChanged} */ + public final List onChange = [] /** * True to serialize old values to replicas in `CacheEntryChanged` events (default false). @@ -43,6 +70,11 @@ class Cache extends BaseCache { public final boolean serializeOldValue + private final String loggerName + private final Map> _map + private final Timer cullTimer + + /** @internal - do not construct directly - use {@link BaseService#createCache}. */ @NamedVariant Cache( @@ -55,8 +87,17 @@ class Cache extends BaseCache { @NamedParam Boolean serializeOldValue = false, @NamedParam Closure onChange = null ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate) + this.name = name + this.svc = svc + this.expireTime = expireTime + this.expireFn = expireFn + this.timestampFn = timestampFn + this.replicate = replicate this.serializeOldValue = serializeOldValue + + // Allow fine grain logging for this within namespace of owning service + loggerName = "${svc.instanceLog.name}.Cache[${name}]" + _map = useCluster ? hzInstance.getMap(svc.hzName(name)) : new ConcurrentHashMap() cullTimer = svc.createTimer( name: "${name}_cullEntries", @@ -85,6 +126,17 @@ class Cache extends BaseCache { return ret } + /** @returns cached value for key, or lazily creates if needed. */ + V getOrCreate(K key, Closure c) { + CacheEntry entry = _map[key] + if (!entry || shouldExpire(entry)) { + def val = c(key) + put(key, val) + return val + } + return entry.value + } + /** Remove the value at key. */ void remove(K key) { put(key, null) @@ -102,15 +154,6 @@ class Cache extends BaseCache { if (!useCluster) fireOnChange(this, oldEntry?.value, obj) } - /** @returns cached value for key, or lazily creates if needed. */ - V getOrCreate(K key, Closure c) { - V ret = get(key) - if (ret == null) { - ret = c(key) - put(key, ret) - } - return ret - } /** @returns a Map representation of currently cached data. */ Map getMap() { @@ -184,10 +227,18 @@ class Cache extends BaseCache { ] } + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } + boolean asBoolean() { return size() > 0 } + private boolean getUseCluster() { + return replicate && ClusterService.multiInstanceEnabled + } + private void cullEntries() { Set cullKeys = new HashSet<>() def oldSize = size() @@ -203,7 +254,7 @@ class Cache extends BaseCache { } } - protected boolean shouldExpire(CacheEntry entry) { + private boolean shouldExpire(CacheEntry entry) { if (expireFn) return expireFn.call(entry) if (expireTime) { @@ -214,13 +265,14 @@ class Cache extends BaseCache { return false } - protected Long getEntryTimestamp(CacheEntry entry) { + private Long getEntryTimestamp(CacheEntry entry) { if (!entry) return null if (timestampFn) return asEpochMilli(timestampFn.call(entry.value)) return entry.dateEntered } - protected void fireOnChange(Object key, V oldValue, V value) { + private void fireOnChange(Object key, V oldValue, V value) { + if (oldValue === value) return def change = new CacheEntryChanged(this, key, oldValue, value) onChange.each { it.call(change) } } diff --git a/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy index eba7c926..9b199f5f 100644 --- a/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy @@ -37,7 +37,7 @@ class CacheEntryChanged { */ V getOldValue() { if (!source.serializeOldValue) { - source.svc.logWarn('Accessing the old value for a cache with serializeOldValue=false') + source.logWarn('Accessing the old value for a cache with serializeOldValue=false') return null } return this._oldValue diff --git a/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy index f4b8ce12..8f755d6b 100644 --- a/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy @@ -7,9 +7,9 @@ import com.hazelcast.map.MapEvent /** @internal */ class CacheEntryListener implements EntryListener { - private BaseCache target + private Cache target - CacheEntryListener(BaseCache target) { + CacheEntryListener(Cache target) { this.target = target } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy similarity index 65% rename from src/main/groovy/io/xh/hoist/cache/CachedValue.groovy rename to src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy index a2fefa5c..a626b8d8 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy @@ -1,4 +1,4 @@ -package io.xh.hoist.cache +package io.xh.hoist.cachedvalue import com.hazelcast.config.InMemoryFormat import com.hazelcast.topic.ITopic @@ -7,24 +7,58 @@ import com.hazelcast.topic.ReliableMessageListener import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService +import io.xh.hoist.cluster.ClusterService +import io.xh.hoist.log.LogSupport +import io.xh.hoist.util.DateTimeUtils +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.util.concurrent.TimeoutException import static grails.async.Promises.task import static io.xh.hoist.cluster.ClusterService.configuredReliableTopic -import static io.xh.hoist.util.DateTimeUtils.SECONDS import static io.xh.hoist.util.DateTimeUtils.asEpochMilli import static io.xh.hoist.util.DateTimeUtils.intervalElapsed import static java.lang.System.currentTimeMillis /** - * Similar to {@link Cache}, but a single value that can be read, written, and expired. + * Similar to {@link io.xh.hoist.cache.Cache}, but a single value that can be read, written, and expired. * Like Cache, this object supports replication across the cluster. */ -class CachedValue extends BaseCache { +class CachedValue implements LogSupport { + /** Service using this object. */ + public final BaseService svc + + /** Unique name in the context of the service associated with this object. */ + public final String name + + /** Closure to determine if an entry should be expired (optional). */ + public final Closure expireFn + + /** + * Entry TTL as epochMillis Long, or closure to return the same (optional). + * No effect if a custom expireFn is provided instead. If both null, entries will never expire. + */ + public final Object expireTime + + /** + * Closure to determine the timestamp of an entry (optional). + * Must return a Long (as epochMillis), Date, or Instant. + */ + public final Closure timestampFn + + /** True to replicate this cache across a cluster (default false). */ + public final boolean replicate + + /** Handlers to be called on change with a {@link CachedValueChanged} object. */ + public final List onChange = [] + + + private final String loggerName private final ITopic> topic - private CachedValueEntry entry + private CachedValueEntry entry = new CachedValueEntry(null, loggerName) + /** @internal - do not construct directly - use {@link BaseService#createCachedValue}. */ @NamedVariant @@ -37,7 +71,17 @@ class CachedValue extends BaseCache { @NamedParam Boolean replicate = false, @NamedParam Closure onChange = null ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate) + + this.name = name + this.svc = svc + this.expireTime = expireTime + this.expireFn = expireFn + this.timestampFn = timestampFn + this.replicate = replicate + + // Allow fine grain logging for this within namespace of owning service + loggerName = "${svc.instanceLog.name}.CachedValue[${name}]" + topic = useCluster ? createUpdateTopic() : null if (onChange) { addChangeHandler(onChange) @@ -50,13 +94,12 @@ class CachedValue extends BaseCache { set(null) return null } - return entry?.value + return entry.value } /** @returns the cached value, or calls the provided closure to create, cache, and return. */ V getOrCreate(Closure c) { - V ret = get() - if (ret == null) { + if (entry.value == null || shouldExpire(entry)) { ret = c() set(ret) } @@ -86,8 +129,8 @@ class CachedValue extends BaseCache { */ @NamedVariant void ensureAvailable( - @NamedParam Long timeout = 30 * SECONDS, - @NamedParam Long interval = 1 * SECONDS, + @NamedParam Long timeout = 30 * DateTimeUtils.SECONDS, + @NamedParam Long interval = 1 * DateTimeUtils.SECONDS, @NamedParam String timeoutMessage = null ) { if (get() != null) return @@ -101,7 +144,7 @@ class CachedValue extends BaseCache { throw new TimeoutException(msg) } } - + /** @param handler called on change with a {@link CachedValueChanged} object. */ void addChangeHandler(Closure handler) { onChange << handler } @@ -110,13 +153,17 @@ class CachedValue extends BaseCache { // Implementation //------------------- synchronized void setInternal(CachedValueEntry newEntry, boolean publish) { + if (newEntry.uuid == entry.uuid) return + + // Make the swap and put on topic. def oldEntry = entry - if (oldEntry?.value === newEntry.value || oldEntry?.uuid == newEntry.uuid) return entry = newEntry - if (publish) topic?.publish(newEntry) - if (onChange) { + if (publish && topic) topic.publish(newEntry) + + // Fire event handlers + if (onChange && oldEntry.value !== newEntry.value) { task { - def change = new CachedValueChanged(this, oldEntry?.value, newEntry.value) + def change = new CachedValueChanged(this, oldEntry.value, newEntry.value) onChange.each { it.call(change) } } } @@ -127,7 +174,7 @@ class CachedValue extends BaseCache { } private boolean shouldExpire(CachedValueEntry entry) { - if (!entry) return null + if (entry.value == null) return false if (expireFn) return expireFn(entry) if (expireTime) { @@ -139,9 +186,7 @@ class CachedValue extends BaseCache { } private Long getEntryTimestamp(CachedValueEntry entry) { - if (!entry) return null - if (timestampFn) return asEpochMilli(timestampFn(entry.value)) - return entry.dateEntered + return timestampFn ? asEpochMilli(timestampFn(entry.value)) : entry.dateEntered } private ITopic> createUpdateTopic() { @@ -149,10 +194,10 @@ class CachedValue extends BaseCache { // and register for all events, including replay of event before this instance existed. def ret = configuredReliableTopic( svc.hzName(name), - {readBatchSize = 1}, + {it.readBatchSize = 1}, { - capacity = 1 - inMemoryFormat = InMemoryFormat.OBJECT + it.capacity = 1 + it.inMemoryFormat = InMemoryFormat.OBJECT } ) ret.addMessageListener( @@ -180,6 +225,15 @@ class CachedValue extends BaseCache { return ret } + private boolean getUseCluster() { + return replicate && ClusterService.multiInstanceEnabled + } + + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } + + Map getAdminStats() { def val = get(), ret = [ @@ -192,4 +246,5 @@ class CachedValue extends BaseCache { } return ret } + } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueChanged.groovy similarity index 93% rename from src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy rename to src/main/groovy/io/xh/hoist/cachedvalue/CachedValueChanged.groovy index 2650c58a..19f6f65c 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValueChanged.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueChanged.groovy @@ -1,4 +1,4 @@ -package io.xh.hoist.cache +package io.xh.hoist.cachedvalue class CachedValueChanged { diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueEntry.groovy similarity index 97% rename from src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy rename to src/main/groovy/io/xh/hoist/cachedvalue/CachedValueEntry.groovy index 89b88ffd..b14fd29b 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValueEntry.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueEntry.groovy @@ -5,7 +5,7 @@ * Copyright © 2023 Extremely Heavy Industries Inc. */ -package io.xh.hoist.cache +package io.xh.hoist.cachedvalue import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoSerializable @@ -26,7 +26,6 @@ class CachedValueEntry implements KryoSerializable, LogSupport { CachedValueEntry(T value, String loggerName) { this.dateEntered = currentTimeMillis() this.loggerName = loggerName - this.uuid = uuid this.value = value this.uuid = UUID.randomUUID() } diff --git a/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy b/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy index 372ef51c..0e9b69b3 100644 --- a/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy +++ b/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy @@ -8,7 +8,7 @@ package io.xh.hoist.role.provided import grails.gorm.transactions.ReadOnly -import io.xh.hoist.cache.CachedValue +import io.xh.hoist.cachedvalue.CachedValue import io.xh.hoist.config.ConfigService import io.xh.hoist.ldap.LdapService import io.xh.hoist.role.BaseRoleService diff --git a/src/main/groovy/io/xh/hoist/util/Timer.groovy b/src/main/groovy/io/xh/hoist/util/Timer.groovy index b15b706d..9b8aeb89 100644 --- a/src/main/groovy/io/xh/hoist/util/Timer.groovy +++ b/src/main/groovy/io/xh/hoist/util/Timer.groovy @@ -349,7 +349,7 @@ class Timer { } private setLastCompletedInternal(Long timestamp) { - if (useCluster) lastCompletedOnCluster.put(uuid, timestamp) + if (useCluster) lastCompletedOnCluster.put(uuid, timestamp) _lastRunCompleted = timestamp } From 80eb66588ee16179c62c0c595ceca67d166cdb41 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 18:56:33 -0400 Subject: [PATCH 07/12] Tweak to logging --- src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy index a626b8d8..5527b6db 100644 --- a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy @@ -203,10 +203,9 @@ class CachedValue implements LogSupport { ret.addMessageListener( new ReliableMessageListener>() { void onMessage(Message> message) { - logDebug('Received update from topic', [ - uuid: message.messageObject.uuid, - source : message.publishingMember.getAttribute('instanceName'), - ]) + def member = message.publishingMember, + src = member.localMember() ? 'Self' : member.getAttribute('instanceName') + logDebug("Received msg from $src", message.messageObject.uuid) setInternal(message.messageObject, false) } From 854dada109af65af2ba44c0a1a4e41af6701ae16 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 19:32:23 -0400 Subject: [PATCH 08/12] Naming tweak to match logging --- src/main/groovy/io/xh/hoist/BaseService.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/groovy/io/xh/hoist/BaseService.groovy b/src/main/groovy/io/xh/hoist/BaseService.groovy index af81c85d..e0c1e430 100644 --- a/src/main/groovy/io/xh/hoist/BaseService.groovy +++ b/src/main/groovy/io/xh/hoist/BaseService.groovy @@ -386,7 +386,7 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * @internal */ String hzName(String name) { - this.class.name + '_' + name + "${this.class.name}[$name]" } //------------------------ From 72af52bec8487f17654d55f6b168d3389d25b22e Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 19:43:53 -0400 Subject: [PATCH 09/12] quiet log message --- src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy index 5527b6db..f7720f08 100644 --- a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy @@ -204,8 +204,8 @@ class CachedValue implements LogSupport { new ReliableMessageListener>() { void onMessage(Message> message) { def member = message.publishingMember, - src = member.localMember() ? 'Self' : member.getAttribute('instanceName') - logDebug("Received msg from $src", message.messageObject.uuid) + src = member.localMember() ? '[self]' : member.getAttribute('instanceName') + logTrace("Received msg from $src", message.messageObject.uuid) setInternal(message.messageObject, false) } From eeb2f92122b25126b6b5d2092a1007527fae8363 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 20:11:25 -0400 Subject: [PATCH 10/12] Symmetry for Logging with Timer, Cache, and CachedValue --- src/main/groovy/io/xh/hoist/cache/Cache.groovy | 7 ++++--- .../io/xh/hoist/cachedvalue/CachedValue.groovy | 2 +- src/main/groovy/io/xh/hoist/util/Timer.groovy | 13 ++++++++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 0e5163c9..baca19f6 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -96,11 +96,12 @@ class Cache implements LogSupport { this.serializeOldValue = serializeOldValue // Allow fine grain logging for this within namespace of owning service - loggerName = "${svc.instanceLog.name}.Cache[${name}]" + loggerName = "${svc.instanceLog.name}.Cache[$name]" _map = useCluster ? hzInstance.getMap(svc.hzName(name)) : new ConcurrentHashMap() - cullTimer = svc.createTimer( - name: "${name}_cullEntries", + cullTimer = new Timer( + name: 'cullEntries', + owner: this, runFn: this.&cullEntries, interval: 15 * MINUTES, delay: true, diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy index f7720f08..37f19885 100644 --- a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy @@ -80,7 +80,7 @@ class CachedValue implements LogSupport { this.replicate = replicate // Allow fine grain logging for this within namespace of owning service - loggerName = "${svc.instanceLog.name}.CachedValue[${name}]" + loggerName = "${svc.instanceLog.name}.CachedValue[$name]" topic = useCluster ? createUpdateTopic() : null if (onChange) { diff --git a/src/main/groovy/io/xh/hoist/util/Timer.groovy b/src/main/groovy/io/xh/hoist/util/Timer.groovy index 9b8aeb89..fc19ff68 100644 --- a/src/main/groovy/io/xh/hoist/util/Timer.groovy +++ b/src/main/groovy/io/xh/hoist/util/Timer.groovy @@ -12,6 +12,8 @@ import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.cluster.ClusterService import io.xh.hoist.log.LogSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutorService @@ -43,11 +45,11 @@ import static java.lang.System.currentTimeMillis * A common pattern would be to have the primary instance run a Timer-based job to load data into * a cache, with the cache then replicated across the cluster. */ -class Timer { +class Timer implements LogSupport { private static Long CONFIG_INTERVAL = 15 * SECONDS - /** Unique name for this timer **/ + /** Name for this timer. Should be unique within the context of owner for the purpose of logging. **/ final String name /** Object using this timer **/ @@ -94,7 +96,6 @@ class Timer { */ final boolean primaryOnly - /** Date last run started. */ Long getLastRunStarted() { _lastRunStarted @@ -125,6 +126,7 @@ class Timer { private java.util.Timer coreTimer private java.util.Timer configTimer private String uuid = UUID.randomUUID() + private String loggerName private static ReplicatedMap lastCompletedOnCluster @@ -151,6 +153,7 @@ class Timer { ) { this.name = name this.owner = owner + this.loggerName = "${owner.instanceLog.name}.Timer[$name]" this.runFn = runFn this.primaryOnly = primaryOnly this.runImmediatelyAndBlock = runImmediatelyAndBlock @@ -356,4 +359,8 @@ class Timer { private boolean getUseCluster() { multiInstanceEnabled && primaryOnly } + + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } } From 0bf692d08bf9576265a334d5fff8b53698e99860 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Tue, 24 Sep 2024 20:24:28 -0400 Subject: [PATCH 11/12] + Fix regression. + Allow more fine-grained logging in timer --- .../groovy/io/xh/hoist/cachedvalue/CachedValue.groovy | 3 ++- src/main/groovy/io/xh/hoist/util/Timer.groovy | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy index 37f19885..d4d76529 100644 --- a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy @@ -99,7 +99,8 @@ class CachedValue implements LogSupport { /** @returns the cached value, or calls the provided closure to create, cache, and return. */ V getOrCreate(Closure c) { - if (entry.value == null || shouldExpire(entry)) { + V ret = entry.value + if (ret == null || shouldExpire(entry)) { ret = c() set(ret) } diff --git a/src/main/groovy/io/xh/hoist/util/Timer.groovy b/src/main/groovy/io/xh/hoist/util/Timer.groovy index fc19ff68..801c4038 100644 --- a/src/main/groovy/io/xh/hoist/util/Timer.groovy +++ b/src/main/groovy/io/xh/hoist/util/Timer.groovy @@ -261,11 +261,11 @@ class Timer implements LogSupport { _lastRunStats.error = exceptionHandler.summaryTextForThrowable(throwable) exceptionHandler.handleException( exception: throwable, - logTo: owner, + logTo: this, logMessage: "Failure in '$name'" ) } catch (Throwable ignore) { - owner.logError('Failed to handle exception in Timer') + logError('Failed to handle exception in Timer') } } } @@ -280,7 +280,7 @@ class Timer implements LogSupport { if (interval == null) return null Long ret = (interval instanceof Closure ? (interval as Closure)() : interval) * intervalUnits; if (ret > 0 && ret < 500) { - owner.logWarn('Timer cannot be set for values less than 500ms.') + logWarn('Timer cannot be set for values less than 500ms.') ret = 500 } return ret @@ -303,7 +303,7 @@ class Timer implements LogSupport { timeoutMs = calcTimeoutMs() adjustCoreTimerIfNeeded() } catch (Throwable t) { - owner.logError('Timer failed to reload config', t) + logError('Timer failed to reload config', t) } } From cda8e49a54b98e7a1873a20d476fa3d4fd4d6d24 Mon Sep 17 00:00:00 2001 From: lbwexler Date: Wed, 25 Sep 2024 11:20:40 -0400 Subject: [PATCH 12/12] + Fix regression. * Use correct logger for cache --- src/main/groovy/io/xh/hoist/cache/Cache.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index baca19f6..6a3af631 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -98,7 +98,7 @@ class Cache implements LogSupport { // Allow fine grain logging for this within namespace of owning service loggerName = "${svc.instanceLog.name}.Cache[$name]" - _map = useCluster ? hzInstance.getMap(svc.hzName(name)) : new ConcurrentHashMap() + _map = useCluster ? hzInstance.getReplicatedMap(svc.hzName(name)) : new ConcurrentHashMap() cullTimer = new Timer( name: 'cullEntries', owner: this, @@ -205,7 +205,7 @@ class Cache implements LogSupport { ) { if (getEntry(key)) return - svc.withDebug("Waiting for cache entry value at '$key'") { + withDebug("Waiting for cache entry value at '$key'") { for (def startTime = currentTimeMillis(); !intervalElapsed(timeout, startTime); sleep(interval)) { if (getEntry(key)) return; } @@ -251,7 +251,7 @@ class Cache implements LogSupport { } if (cullKeys) { - svc.logDebug("Cache '$name' culled ${cullKeys.size()} out of $oldSize entries") + logDebug("Cache '$name' culled ${cullKeys.size()} out of $oldSize entries") } }