diff --git a/CHANGELOG.md b/CHANGELOG.md index 84055edf..03c48795 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## 23.0-SNAPSHOT - unreleased +### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW) + +* Improvements to the efficiency of `CachedValue` for sharing of large objects. This included + moving to its own package `io.xh.hoist.cachedvalue` for clarity. +* New dynamic configuration for all distributed hazelcast objects. See `ClusterService.configureXXX`. + This replaces the static map `BaseService.clusterConfigs`. +* Misc. improvements to logging and performance of `Cache` and `Timer`. + ## 22.0.0 - 2024-09-18 ### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW) diff --git a/grails-app/init/io/xh/hoist/ClusterConfig.groovy b/grails-app/init/io/xh/hoist/ClusterConfig.groovy index 6b3f5e04..a092c311 100755 --- a/grails-app/init/io/xh/hoist/ClusterConfig.groovy +++ b/grails-app/init/io/xh/hoist/ClusterConfig.groovy @@ -99,7 +99,6 @@ class ClusterConfig { createDefaultConfigs(ret) createHibernateConfigs(ret) - createServiceConfigs(ret) KryoSupport.setAsGlobalSerializer(ret) @@ -124,12 +123,7 @@ class ClusterConfig { * Note that Hoist also introduces two properties for declarative configuration: * * - a static 'cache' property on Grails domain objects to customize associated - * Hibernate caches. - * - a static 'clusterConfigs' property on Grails services to customize any Hazelcast - * Distributed Objects associated with the service e.g. Hoist caches - * - * See toolbox's `Phase` object and Hoist Core's `ClientErrorService` for examples of these - * customizations. + * Hibernate caches. See toolbox's `Phase` object for examples. */ protected void createDefaultConfigs(Config config) { config.getMapConfig('default').with { @@ -146,6 +140,9 @@ class ClusterConfig { config.getTopicConfig('default').with { statisticsEnabled = true } + config.getReliableTopicConfig('default').with { + statisticsEnabled = true + } config.getSetConfig('default').with { statisticsEnabled = true } @@ -194,35 +191,4 @@ class ClusterConfig { } } } - - private void createServiceConfigs(Config config) { - grailsApplication.serviceClasses.each { GrailsClass gc -> - // Apply any app customization specified by new static prop introduced by Hoist - Map objs = gc.getPropertyValue('clusterConfigs') - if (!objs) return - objs.forEach {String key, List value -> - def customizer = value[1] as Closure, - objConfig - switch (value[0]) { - case IMap: - objConfig = config.getMapConfig(gc.fullName + '_' + key) - break - case ReplicatedMap: - objConfig = config.getReplicatedMapConfig(gc.fullName + '_' + key) - break - case ISet: - objConfig = config.getSetConfig(gc.fullName + '_' + key) - break - case ITopic: - objConfig = config.getTopicConfig(key) - break - default: - throw new RuntimeException('Unable to configure Cluster object') - } - customizer.delegate = objConfig - customizer.resolveStrategy = Closure.DELEGATE_FIRST - customizer(objConfig) - } - } - } } \ No newline at end of file diff --git a/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy b/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy index c62cd9d1..b9c5551e 100644 --- a/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy +++ b/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy @@ -9,7 +9,7 @@ package io.xh.hoist.alertbanner import groovy.transform.CompileStatic import io.xh.hoist.BaseService -import io.xh.hoist.cache.CachedValue +import io.xh.hoist.cachedvalue.CachedValue import io.xh.hoist.config.ConfigService import io.xh.hoist.jsonblob.JsonBlobService import io.xh.hoist.util.Timer diff --git a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy index e5bb9575..691535de 100644 --- a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy +++ b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy @@ -35,18 +35,14 @@ class ClientErrorService extends BaseService { def clientErrorEmailService, configService - static clusterConfigs = [ - clientErrors: [IMap, { - evictionConfig.size = 100 - }] - ] - - private IMap errors = createIMap('clientErrors') private int getMaxErrors() {configService.getMap('xhClientErrorConfig').maxErrors as int} private int getAlertInterval() {configService.getMap('xhClientErrorConfig').intervalMins * MINUTES} + private IMap errors + void init() { super.init() + errors = createIMap('clientErrors') {it.evictionConfig.size = 100} createTimer( name: 'processErrors', runFn: this.&processErrors, diff --git a/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy b/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy index 6fb25799..ee85f8d0 100644 --- a/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy +++ b/grails-app/services/io/xh/hoist/cluster/ClusterAdminService.groovy @@ -7,6 +7,7 @@ import com.hazelcast.executor.impl.ExecutorServiceProxy import com.hazelcast.map.IMap import com.hazelcast.nearcache.NearCacheStats import com.hazelcast.replicatedmap.ReplicatedMap +import com.hazelcast.ringbuffer.impl.RingbufferProxy import com.hazelcast.topic.ITopic import io.xh.hoist.BaseService import io.xh.hoist.util.Utils @@ -142,6 +143,13 @@ class ClusterAdminService extends BaseService { publishOperationCount: stats.publishOperationCount, receiveOperationCount: stats.receiveOperationCount ] + case RingbufferProxy: + return [ + name : obj.getName(), + type : 'RingBuffer', + size : obj.size(), + capacity : obj.capacity() + ] case CacheProxy: def evictionConfig = obj.cacheConfig.evictionConfig, expiryPolicy = obj.cacheConfig.expiryPolicyFactory.create(), diff --git a/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy b/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy index cab63901..32334dae 100644 --- a/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy +++ b/grails-app/services/io/xh/hoist/cluster/ClusterService.groovy @@ -4,10 +4,15 @@ import com.hazelcast.cluster.Cluster import com.hazelcast.cluster.Member import com.hazelcast.cluster.MembershipEvent import com.hazelcast.cluster.MembershipListener +import com.hazelcast.collection.ISet +import com.hazelcast.config.Config import com.hazelcast.core.DistributedObject import com.hazelcast.core.Hazelcast import com.hazelcast.core.HazelcastInstance import com.hazelcast.core.IExecutorService +import com.hazelcast.map.IMap +import com.hazelcast.replicatedmap.ReplicatedMap +import com.hazelcast.topic.ITopic import io.xh.hoist.BaseService import io.xh.hoist.ClusterConfig import io.xh.hoist.exception.InstanceNotFoundException @@ -152,6 +157,39 @@ class ClusterService extends BaseService implements ApplicationListener [member.getAttribute('instanceName'), f.get()] } } + //------------------ + // Create Objects + //----------------- + static IMap configuredIMap(String name, Closure customizer = null) { + customizer?.call(hzConfig.getMapConfig(name)) + hzInstance.getMap(name) + } + + static ISet configuredISet(String name, Closure customizer = null) { + customizer?.call(hzConfig.getSetConfig(name)) + hzInstance.getSet(name) + } + + static ReplicatedMap configuredReplicatedMap(String name, Closure customizer = null) { + customizer?.call(hzConfig.getReplicatedMapConfig(name)) + hzInstance.getReplicatedMap(name) + } + + static ITopic configuredTopic(String name, Closure customizer = null) { + customizer?.call(hzConfig.getTopicConfig(name)) + hzInstance.getTopic(name) + } + + static ITopic configuredReliableTopic( + String name, + Closure customizer = null, + Closure ringBufferCustomizer = null + ) { + ringBufferCustomizer?.call(hzConfig.getRingbufferConfig(name)) + customizer?.call(hzConfig.getReliableTopicConfig(name)) + hzInstance.getReliableTopic(name) + } + //------------------------------------ // Implementation //------------------------------------ @@ -188,6 +226,10 @@ class ClusterService extends BaseService implements ApplicationListener resources = [:] private boolean _destroyed = false - private Map _replicatedValues - private Map _localValues private final Logger _log = LoggerFactory.getLogger(this.class) @@ -119,16 +121,16 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea //----------------------------------------------------------------- // Distributed Resources - // Use static reference to ClusterService to allow access pre-init. //------------------------------------------------------------------ /** * Create and return a reference to a Hazelcast IMap. * * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects * associated with this service. + * @param customizer - closure receiving a Hazelcast MapConfig. Mutate to customize. */ - IMap createIMap(String name) { - addResource(name, ClusterService.hzInstance.getMap(hzName(name))) + IMap createIMap(String name, Closure customizer = null) { + addResource(name, configuredIMap(hzName(name), customizer)) } /** @@ -136,9 +138,10 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects * associated with this service. + * @param customizer - closure receiving a Hazelcast SetConfig. Mutate to customize. */ - ISet createISet(String name) { - addResource(name, ClusterService.hzInstance.getSet(hzName(name))) + ISet createISet(String name, Closure customizer = null) { + addResource(name, configuredISet(hzName(name), customizer)) } /** @@ -146,17 +149,18 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects * associated with this service. + * @param customizer - closure receiving a Hazelcast ReplicatedMapConfig. Mutate to customize. */ - ReplicatedMap createReplicatedMap(String name) { - addResource(name, ClusterService.hzInstance.getReplicatedMap(hzName(name))) - } + ReplicatedMap createReplicatedMap(String name, Closure customizer = null) { + addResource(name, configuredReplicatedMap(hzName(name), customizer)) + } /** - * Get a reference to a Hazelcast Replicated topic, useful to publish to a cluster-wide topic. + * Get a reference to an existing or new Hazelcast topic. * To subscribe to events fired by other services on a topic, use {@link #subscribeToTopic}. */ ITopic getTopic(String id) { - ClusterService.hzInstance.getTopic(id) + hzInstance.getTopic(id) } /** @@ -224,7 +228,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea addResource(mp.name as String, new CachedValue([*:mp, svc: this])) } - /** * Create a managed subscription to events on the instance-local Grails event bus. * @@ -285,7 +288,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea } } - //------------------ // Cluster Support //------------------ @@ -374,14 +376,22 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea // Provide cached logger to LogSupport for possible performance benefit Logger getInstanceLog() { _log } + /** + * Generate a name for a resource, appropriate for Hazelcast. + * Note that this allows us to group all Hazelcast resources by Service + * + * Not typically called directly by applications. Applications should aim to create + * Hazelcast distributed objects using the methods in this class. + * + * @internal + */ + String hzName(String name) { + "${this.class.name}[$name]" + } //------------------------ // Internal implementation //------------------------ - protected String hzName(String name) { - this.class.name + '_' + name - } - private T addResource(String name, T resource) { if (!name || resources.containsKey(name)) { def msg = 'Service resource requires a unique name. ' @@ -391,26 +401,4 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea resources[name] = resource return resource } - - /** - * @internal - for use by Cache. - */ - Map getMapForCache(Cache cache) { - // register with xh prefix to avoid collisions, allow filtering out in admin - cache.useCluster ? createReplicatedMap("xh_${cache.name}") : new ConcurrentHashMap() - } - - /** - * @internal - for use by CachedValue - */ - Map getMapForCachedValue(CachedValue cachedValue) { - // register with xh prefix to avoid collisions, allow filtering out in admin - if (cachedValue.useCluster) { - if (_replicatedValues == null) _replicatedValues = createReplicatedMap('xh_cachedValues') - return _replicatedValues - } else { - if (_localValues == null) _localValues = new ConcurrentHashMap() - return _localValues - } - } } diff --git a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy deleted file mode 100644 index 1b4ef191..00000000 --- a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy +++ /dev/null @@ -1,113 +0,0 @@ -/* - * This file belongs to Hoist, an application development toolkit - * developed by Extremely Heavy Industries (www.xh.io | info@xh.io) - * - * Copyright © 2023 Extremely Heavy Industries Inc. - */ - -package io.xh.hoist.cache - - -import groovy.transform.CompileStatic -import io.xh.hoist.BaseService -import io.xh.hoist.cluster.ClusterService - -import static io.xh.hoist.util.DateTimeUtils.asEpochMilli -import static io.xh.hoist.util.DateTimeUtils.intervalElapsed - -@CompileStatic -abstract class BaseCache { - - /** Service using this object. */ - public final BaseService svc - - /** Unique name in the context of the service associated with this object. */ - public final String name - - /** Closure to determine if an entry should be expired (optional). */ - public final Closure expireFn - - /** - * Entry TTL as epochMillis Long, or closure to return the same (optional). - * No effect if a custom expireFn is provided instead. If both null, entries will never expire. - */ - public final Object expireTime - - /** - * Closure to determine the timestamp of an entry (optional). - * Must return a Long (as epochMillis), Date, or Instant. - */ - public final Closure timestampFn - - /** True to replicate this cache across a cluster (default false). */ - public final boolean replicate - - /** - * True to serialize old values to replicas in `CacheValueChanged` events (default false). - * - * Not serializing old values improves performance and is especially important for caches - * containing large objects that are expensive to serialize + deserialize. Enable only if your - * event handlers need access to the previous value. - */ - public final boolean serializeOldValue - - /** Handlers to be called on change with a {@link CacheValueChanged} object. */ - public final List onChange = [] - - BaseCache( - String name, - BaseService svc, - Object expireTime, - Closure expireFn, - Closure timestampFn, - boolean replicate, - boolean serializeOldValue - ) { - this.name = name - this.svc = svc - this.expireTime = expireTime - this.expireFn = expireFn - this.timestampFn = timestampFn - this.replicate = replicate - this.serializeOldValue = serializeOldValue - } - - /** @param handler called on change with a {@link CacheValueChanged} object. */ - abstract void addChangeHandler(Closure handler) - - /** Clear all values. */ - abstract void clear() - - /** True if this Cache should be stored across the cluster (backed by a ReplicatedMap). */ - boolean getUseCluster() { - return replicate && ClusterService.multiInstanceEnabled - } - - /** Information about this object, accessible via the Hoist Admin Console. */ - abstract Map getAdminStats() - - //------------------------ - // Implementation - //------------------------ - protected void fireOnChange(Object key, V oldValue, V value) { - def change = new CacheValueChanged(this, key, oldValue, value) - onChange.each { it.call(change) } - } - - protected boolean shouldExpire(Entry entry) { - if (expireFn) return expireFn(entry) - - if (expireTime) { - Long timestamp = getEntryTimestamp(entry), - expire = (expireTime instanceof Closure ? expireTime.call() : expireTime) as Long - return intervalElapsed(expire, timestamp) - } - return false - } - - protected Long getEntryTimestamp(Entry entry) { - if (!entry) return null - if (timestampFn) return asEpochMilli(timestampFn(entry.value)) - return entry.dateEntered - } -} diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 61a6ad13..6a3af631 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -11,11 +11,19 @@ import groovy.transform.CompileStatic import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService +import io.xh.hoist.cluster.ClusterService +import io.xh.hoist.log.LogSupport import io.xh.hoist.util.Timer +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeoutException +import static io.xh.hoist.cluster.ClusterService.hzInstance import static io.xh.hoist.util.DateTimeUtils.MINUTES import static io.xh.hoist.util.DateTimeUtils.SECONDS +import static io.xh.hoist.util.DateTimeUtils.asEpochMilli import static io.xh.hoist.util.DateTimeUtils.intervalElapsed import static java.lang.System.currentTimeMillis @@ -23,11 +31,50 @@ import static java.lang.System.currentTimeMillis * A key-value Cache, with support for optional entry TTL and replication across a cluster. */ @CompileStatic -class Cache extends BaseCache { +class Cache implements LogSupport { + + /** Service using this object. */ + public final BaseService svc + + /** Unique name in the context of the service associated with this object. */ + public final String name + + /** Closure to determine if an entry should be expired (optional). */ + public final Closure expireFn + + /** + * Entry TTL as epochMillis Long, or closure to return the same (optional). + * No effect if a custom expireFn is provided instead. If both null, entries will never expire. + */ + public final Object expireTime + + /** + * Closure to determine the timestamp of an entry (optional). + * Must return a Long (as epochMillis), Date, or Instant. + */ + public final Closure timestampFn - private final Map> _map + /** True to replicate this cache across a cluster (default false). */ + public final boolean replicate + + /** Handlers to be called on change with a {@link CacheEntryChanged} */ + public final List onChange = [] + + /** + * True to serialize old values to replicas in `CacheEntryChanged` events (default false). + * + * Not serializing old values improves performance and is especially important for caches + * containing large objects that are expensive to serialize + deserialize. Enable only if your + * event handlers need access to the previous value. + */ + public final boolean serializeOldValue + + + private final String loggerName + private final Map> _map private final Timer cullTimer + /** @internal - do not construct directly - use {@link BaseService#createCache}. */ @NamedVariant Cache( @@ -40,18 +87,29 @@ class Cache extends BaseCache { @NamedParam Boolean serializeOldValue = false, @NamedParam Closure onChange = null ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) + this.name = name + this.svc = svc + this.expireTime = expireTime + this.expireFn = expireFn + this.timestampFn = timestampFn + this.replicate = replicate + this.serializeOldValue = serializeOldValue - _map = svc.getMapForCache(this) - if (onChange) addChangeHandler(onChange) + // Allow fine grain logging for this within namespace of owning service + loggerName = "${svc.instanceLog.name}.Cache[$name]" - cullTimer = svc.createTimer( - name: "xh_${name}_cullEntries", + _map = useCluster ? hzInstance.getReplicatedMap(svc.hzName(name)) : new ConcurrentHashMap() + cullTimer = new Timer( + name: 'cullEntries', + owner: this, runFn: this.&cullEntries, interval: 15 * MINUTES, delay: true, primaryOnly: useCluster ) + if (onChange) { + addChangeHandler(onChange) + } } /** @returns the cached value at key. */ @@ -60,7 +118,7 @@ class Cache extends BaseCache { } /** @returns the cached Entry at key. */ - Entry getEntry(K key) { + CacheEntry getEntry(K key) { def ret = _map[key] if (ret && shouldExpire(ret)) { remove(key) @@ -69,6 +127,17 @@ class Cache extends BaseCache { return ret } + /** @returns cached value for key, or lazily creates if needed. */ + V getOrCreate(K key, Closure c) { + CacheEntry entry = _map[key] + if (!entry || shouldExpire(entry)) { + def val = c(key) + put(key, val) + return val + } + return entry.value + } + /** Remove the value at key. */ void remove(K key) { put(key, null) @@ -81,20 +150,11 @@ class Cache extends BaseCache { if (obj == null) { _map.remove(key) } else { - _map.put(key, new Entry(key.toString(), obj, svc.instanceLog.name)) + _map.put(key, new CacheEntry(key.toString(), obj, loggerName)) } if (!useCluster) fireOnChange(this, oldEntry?.value, obj) } - /** @returns cached value for key, or lazily creates if needed. */ - V getOrCreate(K key, Closure c) { - V ret = get(key) - if (ret == null) { - ret = c(key) - put(key, ret) - } - return ret - } /** @returns a Map representation of currently cached data. */ Map getMap() { @@ -124,7 +184,7 @@ class Cache extends BaseCache { void addChangeHandler(Closure handler) { if (!onChange && _map instanceof ReplicatedMap) { - _map.addEntryListener(new HzEntryListener(this)) + _map.addEntryListener(new CacheEntryListener(this)) } onChange << handler } @@ -145,7 +205,7 @@ class Cache extends BaseCache { ) { if (getEntry(key)) return - svc.withDebug("Waiting for cache entry value at '$key'") { + withDebug("Waiting for cache entry value at '$key'") { for (def startTime = currentTimeMillis(); !intervalElapsed(timeout, startTime); sleep(interval)) { if (getEntry(key)) return; } @@ -168,10 +228,18 @@ class Cache extends BaseCache { ] } + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } + boolean asBoolean() { return size() > 0 } + private boolean getUseCluster() { + return replicate && ClusterService.multiInstanceEnabled + } + private void cullEntries() { Set cullKeys = new HashSet<>() def oldSize = size() @@ -183,7 +251,30 @@ class Cache extends BaseCache { } if (cullKeys) { - svc.logDebug("Cache '$name' culled ${cullKeys.size()} out of $oldSize entries") + logDebug("Cache '$name' culled ${cullKeys.size()} out of $oldSize entries") + } + } + + private boolean shouldExpire(CacheEntry entry) { + if (expireFn) return expireFn.call(entry) + + if (expireTime) { + Long timestamp = getEntryTimestamp(entry), + expire = (expireTime instanceof Closure ? expireTime.call() : expireTime) as Long + return intervalElapsed(expire, timestamp) } + return false + } + + private Long getEntryTimestamp(CacheEntry entry) { + if (!entry) return null + if (timestampFn) return asEpochMilli(timestampFn.call(entry.value)) + return entry.dateEntered + } + + private void fireOnChange(Object key, V oldValue, V value) { + if (oldValue === value) return + def change = new CacheEntryChanged(this, key, oldValue, value) + onChange.each { it.call(change) } } } diff --git a/src/main/groovy/io/xh/hoist/cache/Entry.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntry.groovy similarity index 92% rename from src/main/groovy/io/xh/hoist/cache/Entry.groovy rename to src/main/groovy/io/xh/hoist/cache/CacheEntry.groovy index 3bd79232..04fd659f 100644 --- a/src/main/groovy/io/xh/hoist/cache/Entry.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntry.groovy @@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory import static java.lang.System.currentTimeMillis @CompileStatic -class Entry implements KryoSerializable, LogSupport { +class CacheEntry implements KryoSerializable, LogSupport { String key T value Long dateEntered @@ -27,7 +27,7 @@ class Entry implements KryoSerializable, LogSupport { boolean serializeValue - Entry(String key, T value, String loggerName) { + CacheEntry(String key, T value, String loggerName) { this.key = key this.value = value this.dateEntered = currentTimeMillis() @@ -35,7 +35,7 @@ class Entry implements KryoSerializable, LogSupport { this.serializeValue = true } - Entry() {} + CacheEntry() {} void write(Kryo kryo, Output output) { output.writeBoolean(serializeValue) diff --git a/src/main/groovy/io/xh/hoist/cache/CacheValueChanged.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy similarity index 80% rename from src/main/groovy/io/xh/hoist/cache/CacheValueChanged.groovy rename to src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy index ba9f1f13..9b199f5f 100644 --- a/src/main/groovy/io/xh/hoist/cache/CacheValueChanged.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntryChanged.groovy @@ -1,9 +1,9 @@ package io.xh.hoist.cache -class CacheValueChanged { +class CacheEntryChanged { /** Source object the changed value is contained within. */ - final BaseCache source + final Cache source /** * Key of the value being changed. @@ -17,7 +17,7 @@ class CacheValueChanged { private final V _value /** @internal */ - CacheValueChanged(BaseCache source, K key, V oldValue, V value) { + CacheEntryChanged(Cache source, K key, V oldValue, V value) { this.source = source this.key = key _oldValue = oldValue @@ -37,7 +37,7 @@ class CacheValueChanged { */ V getOldValue() { if (!source.serializeOldValue) { - source.svc.logWarn('Accessing the old value for a cache with serializeOldValue=false') + source.logWarn('Accessing the old value for a cache with serializeOldValue=false') return null } return this._oldValue diff --git a/src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy b/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy similarity index 85% rename from src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy rename to src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy index 59333c07..8f755d6b 100644 --- a/src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CacheEntryListener.groovy @@ -4,11 +4,12 @@ import com.hazelcast.core.EntryEvent import com.hazelcast.core.EntryListener import com.hazelcast.map.MapEvent -class HzEntryListener implements EntryListener { +/** @internal */ +class CacheEntryListener implements EntryListener { - private BaseCache target + private Cache target - HzEntryListener(BaseCache target) { + CacheEntryListener(Cache target) { this.target = target } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy deleted file mode 100644 index 1ec538cf..00000000 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ /dev/null @@ -1,133 +0,0 @@ -package io.xh.hoist.cache - -import com.hazelcast.replicatedmap.ReplicatedMap -import groovy.transform.NamedParam -import groovy.transform.NamedVariant -import io.xh.hoist.BaseService - -import java.util.concurrent.TimeoutException - -import static io.xh.hoist.util.DateTimeUtils.SECONDS -import static io.xh.hoist.util.DateTimeUtils.intervalElapsed -import static java.lang.System.currentTimeMillis - -/** - * Similar to {@link Cache}, but a single value that can be read, written, and expired. - * Like Cache, this object supports replication across the cluster. - */ -class CachedValue extends BaseCache { - - private final Map> _map - - /** @internal - do not construct directly - use {@link BaseService#createCachedValue}. */ - @NamedVariant - CachedValue( - @NamedParam(required = true) String name, - @NamedParam(required = true) BaseService svc, - @NamedParam Object expireTime = null, - @NamedParam Closure expireFn = null, - @NamedParam Closure timestampFn = null, - @NamedParam Boolean replicate = false, - @NamedParam Boolean serializeOldValue = false, - @NamedParam Closure onChange = null - ) { - super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) - - _map = svc.getMapForCachedValue(this) - if (onChange) addChangeHandler(onChange) - } - - /** @returns the cached value. */ - V get() { - def ret = _map[name] - if (ret && shouldExpire(ret)) { - set(null) - return null - } - return ret?.value - } - - /** @returns the cached value, or calls the provided closure to create, cache, and return. */ - V getOrCreate(Closure c) { - V ret = get() - if (ret == null) { - ret = c() - set(ret) - } - return ret - } - - /** Set the value. */ - void set(V value) { - def oldEntry = _map[name] - if (!serializeOldValue) oldEntry?.serializeValue = false - if (value == null) { - _map.remove(name) - } else { - _map[name] = new Entry(name, value, svc.instanceLog.name) - } - - if (!useCluster) fireOnChange(name, oldEntry?.value, value) - } - - /** Clear the value. */ - void clear() { - set(null) - } - - /** @returns timestamp of the current entry, or null if none. */ - Long getTimestamp() { - getEntryTimestamp(_map[name]) - } - - /** - * Wait for the replicated value to be populated. - * @param timeout, time in ms to wait. -1 to wait indefinitely (not recommended). - * @param interval, time in ms to wait between tests. - * @param timeoutMessage, custom message associated with any timeout. - */ - @NamedVariant - void ensureAvailable( - @NamedParam Long timeout = 30 * SECONDS, - @NamedParam Long interval = 1 * SECONDS, - @NamedParam String timeoutMessage = null - ) { - if (get() != null) return - - svc.withDebug("Waiting for CachedValue '$name'") { - for (def startTime = currentTimeMillis(); !intervalElapsed(timeout, startTime); sleep(interval)) { - if (get() != null) return - } - - String msg = timeoutMessage ?: "Timed out after ${timeout}ms waiting for CachedValue '$name'" - throw new TimeoutException(msg) - } - } - - void addChangeHandler(Closure handler) { - if (!onChange && _map instanceof ReplicatedMap) { - _map.addEntryListener(new HzEntryListener(this), name) - } - onChange << handler - } - - //------------------- - // Implementation - //------------------- - Map getAdminStats() { - def val = get(), - ret = [ - name : name, - type : 'CachedValue' + (replicate ? ' (replicated)' : ''), - timestamp: timestamp - ] - if (val instanceof Collection || val instanceof Map) { - ret.size = val.size() - } - return ret - } - - boolean asBoolean() { - return get() != null - } -} diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy new file mode 100644 index 00000000..d4d76529 --- /dev/null +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValue.groovy @@ -0,0 +1,250 @@ +package io.xh.hoist.cachedvalue + +import com.hazelcast.config.InMemoryFormat +import com.hazelcast.topic.ITopic +import com.hazelcast.topic.Message +import com.hazelcast.topic.ReliableMessageListener +import groovy.transform.NamedParam +import groovy.transform.NamedVariant +import io.xh.hoist.BaseService +import io.xh.hoist.cluster.ClusterService +import io.xh.hoist.log.LogSupport +import io.xh.hoist.util.DateTimeUtils +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.TimeoutException + +import static grails.async.Promises.task +import static io.xh.hoist.cluster.ClusterService.configuredReliableTopic +import static io.xh.hoist.util.DateTimeUtils.asEpochMilli +import static io.xh.hoist.util.DateTimeUtils.intervalElapsed +import static java.lang.System.currentTimeMillis + +/** + * Similar to {@link io.xh.hoist.cache.Cache}, but a single value that can be read, written, and expired. + * Like Cache, this object supports replication across the cluster. + */ +class CachedValue implements LogSupport { + + /** Service using this object. */ + public final BaseService svc + + /** Unique name in the context of the service associated with this object. */ + public final String name + + /** Closure to determine if an entry should be expired (optional). */ + public final Closure expireFn + + /** + * Entry TTL as epochMillis Long, or closure to return the same (optional). + * No effect if a custom expireFn is provided instead. If both null, entries will never expire. + */ + public final Object expireTime + + /** + * Closure to determine the timestamp of an entry (optional). + * Must return a Long (as epochMillis), Date, or Instant. + */ + public final Closure timestampFn + + /** True to replicate this cache across a cluster (default false). */ + public final boolean replicate + + /** Handlers to be called on change with a {@link CachedValueChanged} object. */ + public final List onChange = [] + + + private final String loggerName + private final ITopic> topic + private CachedValueEntry entry = new CachedValueEntry(null, loggerName) + + + /** @internal - do not construct directly - use {@link BaseService#createCachedValue}. */ + @NamedVariant + CachedValue( + @NamedParam(required = true) String name, + @NamedParam(required = true) BaseService svc, + @NamedParam Object expireTime = null, + @NamedParam Closure expireFn = null, + @NamedParam Closure timestampFn = null, + @NamedParam Boolean replicate = false, + @NamedParam Closure onChange = null + ) { + + this.name = name + this.svc = svc + this.expireTime = expireTime + this.expireFn = expireFn + this.timestampFn = timestampFn + this.replicate = replicate + + // Allow fine grain logging for this within namespace of owning service + loggerName = "${svc.instanceLog.name}.CachedValue[$name]" + + topic = useCluster ? createUpdateTopic() : null + if (onChange) { + addChangeHandler(onChange) + } + } + + /** @returns the cached value. */ + V get() { + if (shouldExpire(entry)) { + set(null) + return null + } + return entry.value + } + + /** @returns the cached value, or calls the provided closure to create, cache, and return. */ + V getOrCreate(Closure c) { + V ret = entry.value + if (ret == null || shouldExpire(entry)) { + ret = c() + set(ret) + } + return ret + } + + /** Set the value. */ + void set(V value) { + setInternal(new CachedValueEntry(value, loggerName), true) + } + + /** Clear the value. */ + void clear() { + set(null) + } + + /** @returns timestamp of the current entry, or null if none. */ + Long getTimestamp() { + getEntryTimestamp(entry) + } + + /** + * Wait for the replicated value to be populated. + * @param timeout, time in ms to wait. -1 to wait indefinitely (not recommended). + * @param interval, time in ms to wait between tests. + * @param timeoutMessage, custom message associated with any timeout. + */ + @NamedVariant + void ensureAvailable( + @NamedParam Long timeout = 30 * DateTimeUtils.SECONDS, + @NamedParam Long interval = 1 * DateTimeUtils.SECONDS, + @NamedParam String timeoutMessage = null + ) { + if (get() != null) return + + svc.withDebug("Waiting for CachedValue '$name'") { + for (def startTime = currentTimeMillis(); !intervalElapsed(timeout, startTime); sleep(interval)) { + if (get() != null) return + } + + String msg = timeoutMessage ?: "Timed out after ${timeout}ms waiting for CachedValue '$name'" + throw new TimeoutException(msg) + } + } + /** @param handler called on change with a {@link CachedValueChanged} object. */ + void addChangeHandler(Closure handler) { + onChange << handler + } + + //------------------- + // Implementation + //------------------- + synchronized void setInternal(CachedValueEntry newEntry, boolean publish) { + if (newEntry.uuid == entry.uuid) return + + // Make the swap and put on topic. + def oldEntry = entry + entry = newEntry + if (publish && topic) topic.publish(newEntry) + + // Fire event handlers + if (onChange && oldEntry.value !== newEntry.value) { + task { + def change = new CachedValueChanged(this, oldEntry.value, newEntry.value) + onChange.each { it.call(change) } + } + } + } + + boolean asBoolean() { + return get() != null + } + + private boolean shouldExpire(CachedValueEntry entry) { + if (entry.value == null) return false + if (expireFn) return expireFn(entry) + + if (expireTime) { + Long timestamp = getEntryTimestamp(entry), + expire = (expireTime instanceof Closure ? expireTime.call() : expireTime) as Long + return intervalElapsed(expire, timestamp) + } + return false + } + + private Long getEntryTimestamp(CachedValueEntry entry) { + return timestampFn ? asEpochMilli(timestampFn(entry.value)) : entry.dateEntered + } + + private ITopic> createUpdateTopic() { + // Create a durable topic with room for just a single item + // and register for all events, including replay of event before this instance existed. + def ret = configuredReliableTopic( + svc.hzName(name), + {it.readBatchSize = 1}, + { + it.capacity = 1 + it.inMemoryFormat = InMemoryFormat.OBJECT + } + ) + ret.addMessageListener( + new ReliableMessageListener>() { + void onMessage(Message> message) { + def member = message.publishingMember, + src = member.localMember() ? '[self]' : member.getAttribute('instanceName') + logTrace("Received msg from $src", message.messageObject.uuid) + setInternal(message.messageObject, false) + } + + long retrieveInitialSequence() { return 0 } + + void storeSequence(long sequence) {} + + boolean isLossTolerant() { return true } + + boolean isTerminal(Throwable e) { + svc.logError('Error handling update message', e) + return false + } + } + ) + return ret + } + + private boolean getUseCluster() { + return replicate && ClusterService.multiInstanceEnabled + } + + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } + + + Map getAdminStats() { + def val = get(), + ret = [ + name : name, + type : 'CachedValue' + (replicate ? ' (replicated)' : ''), + timestamp: timestamp + ] + if (val instanceof Collection || val instanceof Map) { + ret.size = val.size() + } + return ret + } + +} diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueChanged.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueChanged.groovy new file mode 100644 index 00000000..19f6f65c --- /dev/null +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueChanged.groovy @@ -0,0 +1,20 @@ +package io.xh.hoist.cachedvalue + +class CachedValueChanged { + + /** Source object the changed value is contained within. */ + final CachedValue source + + /** OldValue. Null if value being set for the first time.*/ + final V oldValue + + /** New Value. Null if value being unset or removed. */ + final V value + + /** @internal */ + CachedValueChanged(CachedValue source, V oldValue, V value) { + this.source = source + this.oldValue = oldValue + this.value = value + } +} diff --git a/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueEntry.groovy b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueEntry.groovy new file mode 100644 index 00000000..b14fd29b --- /dev/null +++ b/src/main/groovy/io/xh/hoist/cachedvalue/CachedValueEntry.groovy @@ -0,0 +1,62 @@ +/* + * This file belongs to Hoist, an application development toolkit + * developed by Extremely Heavy Industries (www.xh.io | info@xh.io) + * + * Copyright © 2023 Extremely Heavy Industries Inc. + */ + +package io.xh.hoist.cachedvalue + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoSerializable +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import io.xh.hoist.log.LogSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static java.lang.System.currentTimeMillis + +class CachedValueEntry implements KryoSerializable, LogSupport { + Long dateEntered + String loggerName + String uuid + T value + + CachedValueEntry(T value, String loggerName) { + this.dateEntered = currentTimeMillis() + this.loggerName = loggerName + this.value = value + this.uuid = UUID.randomUUID() + } + + CachedValueEntry() {} + + void write(Kryo kryo, Output output) { + output.writeLong(dateEntered) + output.writeString(loggerName) + output.writeString(uuid) + withSingleTrace('Serializing value') { + kryo.writeClassAndObject(output, value) + } + } + + void read(Kryo kryo, Input input) { + dateEntered = input.readLong() + loggerName = input.readString() + uuid = input.readString() + withSingleTrace('Deserializing value') { + value = (T) kryo.readClassAndObject(input) + } + } + + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } + + private void withSingleTrace(String msg, Closure c) { + Long start = currentTimeMillis() + c() + logTrace(msg, [_elapsedMs: currentTimeMillis() - start]) + } +} diff --git a/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy b/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy index 372ef51c..0e9b69b3 100644 --- a/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy +++ b/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy @@ -8,7 +8,7 @@ package io.xh.hoist.role.provided import grails.gorm.transactions.ReadOnly -import io.xh.hoist.cache.CachedValue +import io.xh.hoist.cachedvalue.CachedValue import io.xh.hoist.config.ConfigService import io.xh.hoist.ldap.LdapService import io.xh.hoist.role.BaseRoleService diff --git a/src/main/groovy/io/xh/hoist/util/Timer.groovy b/src/main/groovy/io/xh/hoist/util/Timer.groovy index eda05508..801c4038 100644 --- a/src/main/groovy/io/xh/hoist/util/Timer.groovy +++ b/src/main/groovy/io/xh/hoist/util/Timer.groovy @@ -7,11 +7,13 @@ package io.xh.hoist.util +import com.hazelcast.replicatedmap.ReplicatedMap import groovy.transform.NamedParam import groovy.transform.NamedVariant -import io.xh.hoist.BaseService -import io.xh.hoist.cache.CachedValue +import io.xh.hoist.cluster.ClusterService import io.xh.hoist.log.LogSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutorService @@ -21,9 +23,12 @@ import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException +import static io.xh.hoist.cluster.ClusterService.multiInstanceEnabled import static io.xh.hoist.util.DateTimeUtils.* import static io.xh.hoist.util.Utils.configService import static io.xh.hoist.util.Utils.getExceptionHandler +import static java.lang.Math.max +import static java.lang.System.currentTimeMillis /** * Hoist's implementation of an interval-based Timer, for running tasks on a repeated interval. @@ -40,11 +45,11 @@ import static io.xh.hoist.util.Utils.getExceptionHandler * A common pattern would be to have the primary instance run a Timer-based job to load data into * a cache, with the cache then replicated across the cluster. */ -class Timer { +class Timer implements LogSupport { - private static Long CONFIG_INTERVAL = 15 * SECONDS + private static Long CONFIG_INTERVAL = 15 * SECONDS - /** Unique name for this timer, required for cluster aware timers (see `primaryOnly`) **/ + /** Name for this timer. Should be unique within the context of owner for the purpose of logging. **/ final String name /** Object using this timer **/ @@ -88,18 +93,16 @@ class Timer { /** * Only run job when clustered instance is the primary instance? Default is false. - * For timers owned by instances of BaseService only. */ final boolean primaryOnly - /** Date last run started. */ - Date getLastRunStarted() { + Long getLastRunStarted() { _lastRunStarted } /** Date last run completed. */ - Date getLastRunCompleted() { + Long getLastRunCompleted() { _lastRunCompleted } @@ -115,22 +118,22 @@ class Timer { private Long timeoutMs private Long coreIntervalMs - - private Date _lastRunCompleted = null - private Date _lastRunStarted = null + private Long _lastRunCompleted = null + private Long _lastRunStarted = null private Map _lastRunStats = null private boolean _isRunning = false - private boolean forceRun = false + private boolean forceRun = false private java.util.Timer coreTimer private java.util.Timer configTimer + private String uuid = UUID.randomUUID() + private String loggerName - - private CachedValue _lastCompletedOnCluster - + private static ReplicatedMap lastCompletedOnCluster // Args from Grails 3.0 async promise implementation static ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()) + /** * Applications should not typically use this constructor directly. Timers are typically * created by services using the createTimer() method supplied by io.xh.hoist.BaseService. @@ -150,6 +153,7 @@ class Timer { ) { this.name = name this.owner = owner + this.loggerName = "${owner.instanceLog.name}.Timer[$name]" this.runFn = runFn this.primaryOnly = primaryOnly this.runImmediatelyAndBlock = runImmediatelyAndBlock @@ -159,19 +163,15 @@ class Timer { this.intervalUnits = intervalUnits this.timeoutUnits = timeoutUnits - if (primaryOnly) { - if (!owner instanceof BaseService) { - throw new IllegalArgumentException("A 'primaryOnly' timer must be owned by an instance of BaseService.") - } - - _lastCompletedOnCluster = (owner as BaseService).createCachedValue(name: "xh_${name}_lastCompleted") - } - intervalMs = calcIntervalMs() timeoutMs = calcTimeoutMs() delayMs = calcDelayMs() coreIntervalMs = calcCoreIntervalMs() + if (useCluster && lastCompletedOnCluster == null) { + lastCompletedOnCluster = ClusterService.configuredReplicatedMap('xhTimersLastCompleted') + } + if (runImmediatelyAndBlock) { doRun() } @@ -184,7 +184,7 @@ class Timer { if (interval instanceof Closure || timeout instanceof Closure) { configTimer = new java.util.Timer() configTimer.schedule( - (this.&onConfigTimer as TimerTask), CONFIG_INTERVAL, CONFIG_INTERVAL + (this.&onConfigTimer as TimerTask), CONFIG_INTERVAL, CONFIG_INTERVAL ) } } @@ -212,13 +212,13 @@ class Timer { /** Information about this timer, accessible via the Hoist Admin Console. */ Map getAdminStats() { [ - name: name, - type: 'Timer' + (primaryOnly ? ' (primary only)': ''), + name : name, + type : 'Timer' + (primaryOnly ? ' (primary only)' : ''), intervalMs: intervalMs, - isRunning: isRunning, - startTime: isRunning ? _lastRunStarted: null, - last: _lastRunStats - ].findAll {it.value != null} + isRunning : isRunning, + startTime : isRunning ? _lastRunStarted : null, + last : _lastRunStats + ].findAll { it.value != null } } @@ -229,7 +229,7 @@ class Timer { if (primaryOnly && !Utils.clusterService.isPrimary) return _isRunning = true - _lastRunStarted = new Date() + _lastRunStarted = currentTimeMillis() Throwable throwable = null Future future = null try { @@ -249,24 +249,23 @@ class Timer { throwable = t } - _lastRunCompleted = new Date() - _lastCompletedOnCluster?.set(_lastRunCompleted) + setLastCompletedInternal(currentTimeMillis()) _isRunning = false _lastRunStats = [ startTime: _lastRunStarted, - endTime: _lastRunCompleted, - elapsedMs: _lastRunCompleted.time - _lastRunStarted.time + endTime : _lastRunCompleted, + elapsedMs: _lastRunCompleted - _lastRunStarted ] if (throwable) { try { _lastRunStats.error = exceptionHandler.summaryTextForThrowable(throwable) exceptionHandler.handleException( exception: throwable, - logTo: owner, + logTo: this, logMessage: "Failure in '$name'" ) } catch (Throwable ignore) { - owner.logError('Failed to handle exception in Timer') + logError('Failed to handle exception in Timer') } } } @@ -281,7 +280,7 @@ class Timer { if (interval == null) return null Long ret = (interval instanceof Closure ? (interval as Closure)() : interval) * intervalUnits; if (ret > 0 && ret < 500) { - owner.logWarn('Timer cannot be set for values less than 500ms.') + logWarn('Timer cannot be set for values less than 500ms.') ret = 500 } return ret @@ -304,7 +303,7 @@ class Timer { timeoutMs = calcTimeoutMs() adjustCoreTimerIfNeeded() } catch (Throwable t) { - owner.logError('Timer failed to reload config', t) + logError('Timer failed to reload config', t) } } @@ -329,9 +328,7 @@ class Timer { } private boolean intervalHasElapsed() { - if (intervalMs <= 0) return false - def lastRun = _lastCompletedOnCluster ? _lastCompletedOnCluster.get() : _lastRunCompleted - return intervalElapsed(intervalMs, lastRun) + intervalMs > 0 ? intervalElapsed(intervalMs, lastCompletedInternal) : false } private Long calcCoreIntervalMs() { @@ -347,4 +344,23 @@ class Timer { coreIntervalMs = newCoreIntervalMs } } + + private Long getLastCompletedInternal() { + def local = _lastRunCompleted, + onCluster = useCluster ? lastCompletedOnCluster[uuid] : null + return local && onCluster ? max(local, onCluster) : (onCluster ?: local) + } + + private setLastCompletedInternal(Long timestamp) { + if (useCluster) lastCompletedOnCluster.put(uuid, timestamp) + _lastRunCompleted = timestamp + } + + private boolean getUseCluster() { + multiInstanceEnabled && primaryOnly + } + + Logger getInstanceLog() { + LoggerFactory.getLogger(loggerName) + } }