diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c883f79..900bab94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,25 @@ ## 22.0-SNAPSHOT +### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW) +* All `Timer`, `Cache`, and `CachedValue` object require a 'name' property. This property was +previously optional in many cases, but is now required in order to support new cluster features, +logging, and admin tools. The new `BaseService.resources` property now will give access to all +resources by name, if needed and replaces `BaseService.timers`. + +* `BaseService` methods `getIMap()`, `getReplicatedMap()` and `getISet()` have been changed to + `createIMap()`, `createReplicatedMap()` and `createISet()`, respectively. This change provides + a consistent interface for all resources on BaseService and is not expected to impact most + applications. + +### 🎁 New Features +* `Cache` and `CachedValue` should now be created using a factory on `BaseService`. This streamlined +interface reduces boilerplate, and provides a consistent interface with `Timer`. + ### ⚙️ Technical +* Improvements to `Timer` to avoid extra executions when primary instance changes. + * Updated `ClusterService` to use Hoist's `InstanceNotFoundException` class to designate routine. * Exposed `/xh/ping` as whitelisted route for basic uptime/reachability checks. Retained legacy diff --git a/grails-app/services/io/xh/hoist/admin/ConnectionPoolMonitoringService.groovy b/grails-app/services/io/xh/hoist/admin/ConnectionPoolMonitoringService.groovy index 3abc6688..37dc7e38 100644 --- a/grails-app/services/io/xh/hoist/admin/ConnectionPoolMonitoringService.groovy +++ b/grails-app/services/io/xh/hoist/admin/ConnectionPoolMonitoringService.groovy @@ -35,8 +35,9 @@ class ConnectionPoolMonitoringService extends BaseService { void init() { createTimer( - interval: {enabled ? config.snapshotInterval * DateTimeUtils.SECONDS: -1}, - runFn: this.&takeSnapshot + name: 'takeSnapshot', + runFn: this.&takeSnapshot, + interval: {enabled ? config.snapshotInterval * DateTimeUtils.SECONDS: -1} ) } diff --git a/grails-app/services/io/xh/hoist/admin/MemoryMonitoringService.groovy b/grails-app/services/io/xh/hoist/admin/MemoryMonitoringService.groovy index 386df695..3d09b794 100644 --- a/grails-app/services/io/xh/hoist/admin/MemoryMonitoringService.groovy +++ b/grails-app/services/io/xh/hoist/admin/MemoryMonitoringService.groovy @@ -33,8 +33,9 @@ class MemoryMonitoringService extends BaseService { void init() { createTimer( - interval: {this.enabled ? config.snapshotInterval * DateTimeUtils.SECONDS: -1}, - runFn: this.&takeSnapshot + name: 'takeSnapshot', + runFn: this.&takeSnapshot, + interval: {this.enabled ? config.snapshotInterval * DateTimeUtils.SECONDS: -1} ) } @@ -178,6 +179,6 @@ class MemoryMonitoringService extends BaseService { Map getAdminStats() {[ config: configForAdminStats('xhMemoryMonitoringConfig'), - latestSnapshot: latestSnapshot, + latestSnapshot: latestSnapshot ]} } diff --git a/grails-app/services/io/xh/hoist/admin/ServiceManagerService.groovy b/grails-app/services/io/xh/hoist/admin/ServiceManagerService.groovy index 9e698ca9..56028b99 100644 --- a/grails-app/services/io/xh/hoist/admin/ServiceManagerService.groovy +++ b/grails-app/services/io/xh/hoist/admin/ServiceManagerService.groovy @@ -7,6 +7,7 @@ package io.xh.hoist.admin +import com.hazelcast.core.DistributedObject import io.xh.hoist.BaseService class ServiceManagerService extends BaseService { @@ -15,8 +16,6 @@ class ServiceManagerService extends BaseService { clusterAdminService Collection listServices() { - - getServicesInternal().collect { name, svc -> return [ name: name, @@ -28,24 +27,8 @@ class ServiceManagerService extends BaseService { Map getStats(String name) { def svc = grailsApplication.mainContext.getBean(name), - prefix = svc.class.name + '_', - timers = svc.timers*.adminStats, - distObjs = clusterService.distributedObjects - .findAll { it.getName().startsWith(prefix) } - .collect {clusterAdminService.getAdminStatsForObject(it)} - - Map ret = svc.adminStats - if (timers || distObjs) { - ret = ret.clone() - if (distObjs) ret.distributedObjects = distObjs - if (timers.size() == 1) { - ret.timer = timers[0] - } else if (timers.size() > 1) { - ret.timers = timers - } - } - - return ret + resources = getResourceStats(svc) + return resources ? [*: svc.adminStats, resources: resources] : svc.adminStats } void clearCaches(List names) { @@ -60,6 +43,23 @@ class ServiceManagerService extends BaseService { } } + //---------------------- + // Implementation + //---------------------- + private List getResourceStats(BaseService svc) { + svc.resources + .findAll { !it.key.startsWith('xh_') } // skip hoist implementation objects + .collect { k, v -> + Map stats = v instanceof DistributedObject ? + clusterAdminService.getAdminStatsForObject(v) : + v.adminStats + + // rely on the name (key) service knows, i.e avoid HZ prefix + return [*: stats, name: k] + } + } + + private Map getServicesInternal() { return grailsApplication.mainContext.getBeansOfType(BaseService.class, false, false) } diff --git a/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy b/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy index 88795e74..d93fc63b 100644 --- a/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy +++ b/grails-app/services/io/xh/hoist/alertbanner/AlertBannerService.groovy @@ -42,17 +42,14 @@ class AlertBannerService extends BaseService { private final static String presetsBlobName = 'xhAlertBannerPresets' private final Map emptyAlert = [active: false] - private CachedValue _alertBanner = new CachedValue<>( - name: 'alertBanner', - replicate: true, - svc: this - ) + private CachedValue _alertBanner = createCachedValue(name: 'alertBanner', replicate: true) private Timer timer void init() { timer = createTimer( - interval: 2 * MINUTES, + name: 'readFromSpec', runFn: this.&readFromSpec, + interval: 2 * MINUTES, primaryOnly: true ) super.init() diff --git a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy index 808ee14d..e5bb9575 100644 --- a/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy +++ b/grails-app/services/io/xh/hoist/clienterror/ClientErrorService.groovy @@ -41,13 +41,15 @@ class ClientErrorService extends BaseService { }] ] - private IMap errors = getIMap('clientErrors') + private IMap errors = createIMap('clientErrors') private int getMaxErrors() {configService.getMap('xhClientErrorConfig').maxErrors as int} private int getAlertInterval() {configService.getMap('xhClientErrorConfig').intervalMins * MINUTES} void init() { super.init() createTimer( + name: 'processErrors', + runFn: this.&processErrors, interval: { alertInterval }, delay: 15 * SECONDS, primaryOnly: true @@ -99,7 +101,7 @@ class ClientErrorService extends BaseService { // Implementation //--------------------------------------------------------- @Transactional - void onTimer() { + private void processErrors() { if (!errors) return def maxErrors = getMaxErrors(), @@ -121,8 +123,7 @@ class ClientErrorService extends BaseService { } Map getAdminStats() {[ - config: configForAdminStats('xhClientErrorConfig'), - pendingErrorCount: errors.size() + config: configForAdminStats('xhClientErrorConfig') ]} } diff --git a/grails-app/services/io/xh/hoist/config/ConfigService.groovy b/grails-app/services/io/xh/hoist/config/ConfigService.groovy index 7b75bbf4..0279a331 100644 --- a/grails-app/services/io/xh/hoist/config/ConfigService.groovy +++ b/grails-app/services/io/xh/hoist/config/ConfigService.groovy @@ -207,8 +207,7 @@ class ConfigService extends BaseService { } void fireConfigChanged(AppConfig obj) { - def topic = clusterService.getTopic('xhConfigChanged') - topic.publishAsync([key: obj.name, value: obj.externalValue()]) + getTopic('xhConfigChanged').publishAsync([key: obj.name, value: obj.externalValue()]) } //------------------- diff --git a/grails-app/services/io/xh/hoist/ldap/LdapService.groovy b/grails-app/services/io/xh/hoist/ldap/LdapService.groovy index 06991157..52e18412 100644 --- a/grails-app/services/io/xh/hoist/ldap/LdapService.groovy +++ b/grails-app/services/io/xh/hoist/ldap/LdapService.groovy @@ -41,9 +41,9 @@ class LdapService extends BaseService { def configService - private Cache> cache = new Cache<>( - expireTime: {config.cacheExpireSecs * SECONDS}, - svc: this + private Cache> cache = createCache( + name: 'queryCache', + expireTime: {config.cacheExpireSecs * SECONDS} ) static clearCachesConfigs = ['xhLdapConfig', 'xhLdapUsername', 'xhLdapPassword'] diff --git a/grails-app/services/io/xh/hoist/log/LogArchiveService.groovy b/grails-app/services/io/xh/hoist/log/LogArchiveService.groovy index 27943b8a..b3ecdbf6 100644 --- a/grails-app/services/io/xh/hoist/log/LogArchiveService.groovy +++ b/grails-app/services/io/xh/hoist/log/LogArchiveService.groovy @@ -27,7 +27,11 @@ class LogArchiveService extends BaseService { logReaderService void init() { - createTimer(interval: 1 * DAYS) + createTimer( + name: 'archiveLogs', + runFn: { archiveLogs((Integer) config.archiveAfterDays)}, + interval: 1 * DAYS + ) } List archiveLogs(Integer daysThreshold) { @@ -69,12 +73,6 @@ class LogArchiveService extends BaseService { //------------------------ // Implementation //------------------------ - private void onTimer() { - if (isPrimary) { - archiveLogs((Integer) config.archiveAfterDays) - } - } - private File getArchiveDir(String logPath, String category) { return new File(logPath + separator + config.archiveFolder + separator + category) } diff --git a/grails-app/services/io/xh/hoist/log/LogLevelService.groovy b/grails-app/services/io/xh/hoist/log/LogLevelService.groovy index 1cca82a8..ae109116 100644 --- a/grails-app/services/io/xh/hoist/log/LogLevelService.groovy +++ b/grails-app/services/io/xh/hoist/log/LogLevelService.groovy @@ -23,11 +23,12 @@ class LogLevelService extends BaseService { private List adjustments = [] void init() { - createTimer(interval: 30 * MINUTES, runImmediatelyAndBlock: true) - } - - private void onTimer() { - calculateAdjustments() + createTimer( + name: 'calculateAdjustments', + runFn: this.&calculateAdjustments, + interval: 30 * MINUTES, + runImmediatelyAndBlock: true + ) } // ------------------------------------------------------------------------------- diff --git a/grails-app/services/io/xh/hoist/monitor/MonitorService.groovy b/grails-app/services/io/xh/hoist/monitor/MonitorService.groovy index 7208e4d5..ffa8cb85 100644 --- a/grails-app/services/io/xh/hoist/monitor/MonitorService.groovy +++ b/grails-app/services/io/xh/hoist/monitor/MonitorService.groovy @@ -41,17 +41,18 @@ class MonitorService extends BaseService { // Shared state for all servers to read - gathered by primary from all instances. // Map of monitor code to aggregated (cross-instance) results. - private CachedValue> _results = new CachedValue<>( + private CachedValue> _results = createCachedValue( name: 'results', - replicate: true, - svc: this + replicate: true ) private Timer timer void init() { timer = createTimer( - interval: { monitorInterval }, + name: 'runMonitors', + runFn: this.&runMonitors, + interval: {monitorInterval}, delay: startupDelay, primaryOnly: true ) @@ -86,7 +87,7 @@ class MonitorService extends BaseService { //------------------ // Implementation //------------------ - private void onTimer() { + private void runMonitors() { // Gather per-instance results from across the cluster Map> newChecks = clusterService .submitToAllInstances(new RunAllMonitorsTask()) diff --git a/src/main/groovy/io/xh/hoist/BaseService.groovy b/src/main/groovy/io/xh/hoist/BaseService.groovy index 85b8e1c8..4f104442 100644 --- a/src/main/groovy/io/xh/hoist/BaseService.groovy +++ b/src/main/groovy/io/xh/hoist/BaseService.groovy @@ -15,6 +15,10 @@ import com.hazelcast.topic.Message import grails.async.Promises import grails.util.GrailsClassUtils import groovy.transform.CompileDynamic +import groovy.transform.NamedParam +import groovy.transform.NamedVariant +import io.xh.hoist.cache.Cache +import io.xh.hoist.cache.CachedValue import io.xh.hoist.cluster.ClusterService import io.xh.hoist.exception.ExceptionHandler import io.xh.hoist.log.LogSupport @@ -32,6 +36,7 @@ import java.util.concurrent.TimeUnit import static grails.async.Promises.task import static io.xh.hoist.util.DateTimeUtils.SECONDS +import static io.xh.hoist.util.DateTimeUtils.MINUTES import static io.xh.hoist.util.Utils.appContext import static io.xh.hoist.util.Utils.getConfigService @@ -40,6 +45,12 @@ import static io.xh.hoist.util.Utils.getConfigService * Provides template methods for service lifecycle / state management plus support for user lookups. * As an abstract class, BaseService must reside in src/main/groovy to allow Java compilation and * to ensure it is not itself instantiated as a Grails service. + * + * BaseService also provides support for cluster aware state via factories to create + * Hoist objects such as Cache, CachedValue, Timer, as well as raw Hazelcast distributed + * data structures such as ReplicatedMap, ISet and IMap. Objects created with these factories + * will be associated with this service for the purposes of logging and management via the + * Hoist admin console. */ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBean { @@ -51,11 +62,12 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea Date initializedDate = null Date lastCachesCleared = null - protected final List timers = [] + // Caches, CachedValues and Timers and other distributed objects associated with this service + protected final ConcurrentHashMap resources = [:] private boolean _destroyed = false - private Map _replicatedCachedValues - private Map _localCachedValues + private Map _replicatedValues + private Map _localValues private final Logger _log = LoggerFactory.getLogger(this.class) @@ -109,46 +121,119 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea // Distributed Resources // Use static reference to ClusterService to allow access pre-init. //------------------------------------------------------------------ - IMap getIMap(String id) { - ClusterService.hzInstance.getMap(hzName(id)) + /** + * Create and return a reference to a Hazelcast IMap. + * + * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects + * associated with this service. + */ + IMap createIMap(String name) { + addResource(name, ClusterService.hzInstance.getMap(hzName(name))) } - ISet getISet(String id) { - ClusterService.hzInstance.getSet(hzName(id)) + /** + * Create and return a reference to a Hazelcast ISet. + * + * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects + * associated with this service. + */ + ISet createISet(String name) { + addResource(name, ClusterService.hzInstance.getSet(hzName(name))) } - ReplicatedMap getReplicatedMap(String id) { - ClusterService.hzInstance.getReplicatedMap(hzName(id)) + /** + * Create and return a reference to a Hazelcast Replicated Map. + * + * @param name - must be unique across all Caches, Timers and distributed Hazelcast objects + * associated with this service. + */ + ReplicatedMap createReplicatedMap(String name) { + addResource(name, ClusterService.hzInstance.getReplicatedMap(hzName(name))) } - ITopic getTopic(String id) { + /** + * Get a reference to a Hazelcast Replicated topic, useful to publish to a cluster-wide topic. + * To subscribe to events fired by other services on a topic, use {@link #subscribeToTopic}. + */ + ITopic getTopic(String id) { ClusterService.hzInstance.getTopic(id) } /** - * Create a new managed Timer bound to this service. - * @param args - arguments appropriate for a Hoist Timer. + * Create a new managed {@link Timer} bound to this service. + * + * Note that the provided name must be unique across all Caches, Timers and distributed + * Hazelcast objects associated with this service. */ @CompileDynamic - protected Timer createTimer(Map args) { - args.owner = this - if (!args.runFn && metaClass.respondsTo(this, 'onTimer')) { - args.runFn = this.&onTimer + @NamedVariant + Timer createTimer( + @NamedParam(required = true) String name, + @NamedParam Closure runFn = null, + @NamedParam Boolean primaryOnly = false, + @NamedParam Boolean runImmediatelyAndBlock = false, + @NamedParam Object interval = null, + @NamedParam Object timeout = 3 * MINUTES, + @NamedParam Object delay = false, + @NamedParam Long intervalUnits = 1, + @NamedParam Long timeoutUnits = 1 + ) { + if (!runFn) { + if (metaClass.respondsTo(this, 'onTimer')) { + runFn = this.&onTimer + } else { + throw new IllegalArgumentException('Must specify a runFn, or provide an onTimer() method on this service.') + } } - def ret = new Timer(args) - timers << ret - return ret + + addResource(name, + new Timer( + name, + this, + runFn, + primaryOnly, + runImmediatelyAndBlock, + interval, + timeout, + delay, + intervalUnits, + timeoutUnits + ) + ) } /** - * Managed Subscription to a Grails Event. + * Create a new {@link Cache} bound to this service. * - * NOTE: Use this method to subscribe to local Grails events on the given server - * instance only. To subscribe to cluster-wide topics, use 'subscribeToTopic' instead. + * Note that the provided name must be unique across all Caches, Timers and distributed + * Hazelcast objects associated with this service. + */ + Cache createCache(Map mp) { + // Cannot use @NamedVariant, as incompatible with generics. We'll still get run-time checks. + addResource(mp.name as String, new Cache([*:mp, svc: this])) + } + + /** + * Create a new {@link CachedValue} bound to this service. + * + * Note that the provided name must be unique across all Caches, Timers and distributed + * Hazelcast objects associated with this service. + */ + CachedValue createCachedValue(Map mp) { + // Cannot use @NamedVariant, as incompatible with generics. We'll still get run-time checks. + addResource(mp.name as String, new CachedValue([*:mp, svc: this])) + } + + + /** + * Create a managed subscription to events on the instance-local Grails event bus. + * + * NOTE: this method subscribes to Grails events on the current server instance only. + * To subscribe to cluster-wide topics, use {@link #subscribeToTopic} instead. * * This method will catch (and log) any exceptions thrown by its handler closure. - * This is important because the core grails EventBus.subscribe() will silently swallow - * exceptions, and stop processing subsequent handlers. + * This is important because the core Grails `EventBus.subscribe()` will silently swallow + * exceptions and stop processing subsequent handlers. * * This subscription also avoids firing handlers on destroyed services. This is important in a * hot-reloading scenario where multiple instances of singleton services may be created. @@ -168,22 +253,23 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea /** * - * Managed Subscription to a cluster topic. + * Create a managed subscription to a cluster topic. * - * NOTE: Use this method to subscribe to cluster-wide topics. To subscribe to local - * Grails events on this instance only, use subscribe instead. + * NOTE: this subscribes to cluster-wide topics. To subscribe to local Grails events on this + * instance only, use {@link #subscribe} instead. That said, this is most likely the method you + * want, as most pub/sub use cases should take multi-instance operation into account. * * This method will catch (and log) any exceptions thrown by its handler closure. * * This subscription also avoids firing handlers on destroyed services. This is important in a * hot-reloading scenario where multiple instances of singleton services may be created. */ - protected void subscribeToTopic(Map config) { - def topic = config.topic as String, - onMessage = config.onMessage as Closure, - primaryOnly = config.primaryOnly as Boolean - - + @NamedVariant + protected void subscribeToTopic( + @NamedParam(required = true) String topic, + @NamedParam(required = true) Closure onMessage, + @NamedParam Boolean primaryOnly = false + ) { getTopic(topic).addMessageListener { Message m -> if (destroyed || (primaryOnly && !isPrimary)) return try { @@ -230,8 +316,8 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea * Called by Spring on a clean shutdown of the application. */ void destroy() { - timers.each { - it.cancel() + resources.each { k, v -> + if (v instanceof Timer) v.cancel() } _destroyed = true } @@ -292,17 +378,39 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea //------------------------ // Internal implementation //------------------------ - protected String hzName(String key) { - this.class.name + '_' + key + protected String hzName(String name) { + this.class.name + '_' + name } - /** @internal - for use by CachedValue */ - Map getReplicatedCachedValuesMap() { - _replicatedCachedValues ?= getReplicatedMap('cachedValues') + private T addResource(String name, T resource) { + if (!name || resources.containsKey(name)) { + def msg = 'Service resource requires a unique name. ' + if (name) msg += "Name '$name' already used on this service." + throw new RuntimeException(msg) + } + resources[name] = resource + return resource } - /** @internal - for use by CachedValue */ - Map getLocalCachedValuesMap() { - _localCachedValues ?= new ConcurrentHashMap() + /** + * @internal - for use by Cache. + */ + Map getMapForCache(Cache cache) { + // register with xh prefix to avoid collisions, allow filtering out in admin + cache.useCluster ? createReplicatedMap("xh_${cache.name}") : new ConcurrentHashMap() + } + + /** + * @internal - for use by CachedValue + */ + Map getMapForCachedValue(CachedValue cachedValue) { + // register with xh prefix to avoid collisions, allow filtering out in admin + if (cachedValue.useCluster) { + if (_replicatedValues == null) _replicatedValues = createReplicatedMap('xh_cachedValues') + return _replicatedValues + } else { + if (_localValues == null) _localValues = new ConcurrentHashMap() + return _localValues + } } } diff --git a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy index faed7bcc..94dcfc93 100644 --- a/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/BaseCache.groovy @@ -55,16 +55,16 @@ abstract class BaseCache { public final List onChange = [] BaseCache( - BaseService svc, String name, + BaseService svc, Object expireTime, Closure expireFn, Closure timestampFn, boolean replicate, boolean serializeOldValue ) { - this.svc = svc this.name = name + this.svc = svc this.expireTime = expireTime this.expireFn = expireFn this.timestampFn = timestampFn @@ -78,13 +78,17 @@ abstract class BaseCache { /** Clear all values. */ abstract void clear() - //------------------------ - // Implementation - //------------------------ - protected boolean getUseCluster() { + /** True if this Cache should be stored across the cluster (backed by a ReplicatedMap). */ + boolean getUseCluster() { return replicate && ClusterService.multiInstanceEnabled } + /** Information about this object, accessible via the Hoist Admin Console. */ + abstract Map getAdminStats() + + //------------------------ + // Implementation + //------------------------ protected void fireOnChange(Object key, V oldValue, V value) { def change = new CacheValueChanged(this, key, oldValue, value) onChange.each { it.call(change) } diff --git a/src/main/groovy/io/xh/hoist/cache/Cache.groovy b/src/main/groovy/io/xh/hoist/cache/Cache.groovy index 26aa17c3..4e1e36d0 100644 --- a/src/main/groovy/io/xh/hoist/cache/Cache.groovy +++ b/src/main/groovy/io/xh/hoist/cache/Cache.groovy @@ -12,8 +12,6 @@ import groovy.transform.NamedParam import groovy.transform.NamedVariant import io.xh.hoist.BaseService import io.xh.hoist.util.Timer - -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeoutException import static io.xh.hoist.util.DateTimeUtils.MINUTES @@ -25,46 +23,43 @@ import static java.lang.System.currentTimeMillis * A key-value Cache, with support for optional entry TTL and replication across a cluster. */ @CompileStatic -class Cache extends BaseCache { +class Cache extends BaseCache { private final Map> _map - private final Timer timer + private final Timer cullTimer + /** @internal - do not construct directly - use {@link BaseService#createCache}. */ @NamedVariant Cache( + @NamedParam(required = true) String name, @NamedParam(required = true) BaseService svc, - @NamedParam String name, @NamedParam Object expireTime = null, @NamedParam Closure expireFn = null, @NamedParam Closure timestampFn = null, - @NamedParam boolean replicate = false, - @NamedParam boolean serializeOldValue = false, + @NamedParam Boolean replicate = false, + @NamedParam Boolean serializeOldValue = false, @NamedParam Closure onChange = null ) { - super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue) - - if (replicate && !name) { - throw new IllegalArgumentException("Cannot create a replicated Cache without a unique name") - } + super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) - _map = useCluster ? svc.getReplicatedMap(name) : new ConcurrentHashMap() + _map = svc.getMapForCache(this) if (onChange) addChangeHandler(onChange) - timer = new Timer( - owner: svc, - primaryOnly: replicate, + cullTimer = svc.createTimer( + name: "xh_${name}_cullEntries", runFn: this.&cullEntries, interval: 15 * MINUTES, - delay: true + delay: true, + primaryOnly: useCluster ) } - /** @returns the cached value at key. */ + /** @returns the cached value at key. */ V get(K key) { return getEntry(key)?.value } - /** @returns the cached Entry at key. */ + /** @returns the cached Entry at key. */ Entry getEntry(K key) { def ret = _map[key] if (ret && shouldExpire(ret)) { @@ -91,7 +86,7 @@ class Cache extends BaseCache { if (!useCluster) fireOnChange(this, oldEntry?.value, obj) } - /** @returns cached value for key, or lazily creates if needed. */ + /** @returns cached value for key, or lazily creates if needed. */ V getOrCreate(K key, Closure c) { V ret = get(key) if (ret == null) { @@ -101,13 +96,13 @@ class Cache extends BaseCache { return ret } - /** @returns a Map representation of currently cached data. */ + /** @returns a Map representation of currently cached data. */ Map getMap() { cullEntries() - return (Map) _map.collectEntries {k, v -> [k, v.value]} + return (Map) _map.collectEntries { k, v -> [k, v.value] } } - /** @returns the timestamp of the cached Entry at key. */ + /** @returns the timestamp of the cached Entry at key. */ Long getTimestamp(K key) { return getEntryTimestamp(_map[key]) } @@ -124,7 +119,7 @@ class Cache extends BaseCache { void clear() { // Remove key-wise to ensure that we get the proper removal message for each value and // work around exceptions with clear on replicated map. - _map.each { k, v -> remove(k)} + _map.each { k, v -> remove(k) } } void addChangeHandler(Closure handler) { @@ -137,10 +132,10 @@ class Cache extends BaseCache { /** * Wait for the cache entry to be populated. - * @param key, entry to check - * @param timeout, time in ms to wait. -1 to wait indefinitely (not recommended). - * @param interval, time in ms to wait between tests. - * @param timeoutMessage, custom message associated with any timeout. + * @param key - entry to check + * @param timeout - time in ms to wait. -1 to wait indefinitely (not recommended). + * @param interval - time in ms to wait between tests. + * @param timeoutMessage - custom message associated with any timeout. */ @NamedVariant void ensureAvailable( @@ -161,6 +156,17 @@ class Cache extends BaseCache { } } + Map getAdminStats() { + [ + name : name, + type : 'Cache' + (replicate ? ' (replicated)' : ''), + count : size(), + latestTimestamp: _map.max { it.value.dateEntered }?.value?.dateEntered, + lastCullTime : cullTimer.lastRunCompleted + ] + } + + //------------------------ // Implementation //------------------------ @@ -175,7 +181,7 @@ class Cache extends BaseCache { } if (cullKeys) { - svc.logDebug("Cache '${name ?: "anon"}' culled ${cullKeys.size()} out of $oldSize entries") + svc.logDebug("Cache '$name' culled ${cullKeys.size()} out of $oldSize entries") } } } diff --git a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy index 91103548..ee8999af 100644 --- a/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy +++ b/src/main/groovy/io/xh/hoist/cache/CachedValue.groovy @@ -19,20 +19,21 @@ class CachedValue extends BaseCache { private final Map> _map + /** @internal - do not construct directly - use {@link BaseService#createCachedValue}. */ @NamedVariant CachedValue( - @NamedParam(required = true) BaseService svc, @NamedParam(required = true) String name, + @NamedParam(required = true) BaseService svc, @NamedParam Object expireTime = null, @NamedParam Closure expireFn = null, @NamedParam Closure timestampFn = null, - @NamedParam boolean replicate = false, - @NamedParam boolean serializeOldValue = false, + @NamedParam Boolean replicate = false, + @NamedParam Boolean serializeOldValue = false, @NamedParam Closure onChange = null ) { - super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue) + super(name, svc, expireTime, expireFn, timestampFn, replicate, serializeOldValue) - _map = useCluster ? svc.replicatedCachedValuesMap : svc.localCachedValuesMap + _map = svc.getMapForCachedValue(this) if (onChange) addChangeHandler(onChange) } @@ -109,4 +110,12 @@ class CachedValue extends BaseCache { } onChange << handler } + + Map getAdminStats() { + [ + name : name, + type : 'CachedValue' + (replicate ? ' (replicated)' : ''), + timestamp: timestamp + ] + } } diff --git a/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy b/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy index c9e10383..372ef51c 100644 --- a/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy +++ b/src/main/groovy/io/xh/hoist/role/provided/DefaultRoleService.groovy @@ -81,10 +81,9 @@ class DefaultRoleService extends BaseRoleService { DefaultRoleUpdateService defaultRoleUpdateService private Timer timer - protected CachedValue>> _allRoleAssignments = new CachedValue( + protected CachedValue>> _allRoleAssignments = createCachedValue( name: 'roleAssignments', replicate: true, - svc: this, onChange: { _roleAssignmentsByUser = new ConcurrentHashMap() } @@ -102,8 +101,9 @@ class DefaultRoleService extends BaseRoleService { ensureRequiredConfigAndRolesCreated() timer = createTimer( - interval: { config.refreshIntervalSecs as int * SECONDS }, + name: 'refreshRoles', runFn: this.&refreshRoleAssignments, + interval: { config.refreshIntervalSecs as int * SECONDS }, runImmediatelyAndBlock: true, primaryOnly: true ) diff --git a/src/main/groovy/io/xh/hoist/util/Timer.groovy b/src/main/groovy/io/xh/hoist/util/Timer.groovy index b46c7598..eda05508 100644 --- a/src/main/groovy/io/xh/hoist/util/Timer.groovy +++ b/src/main/groovy/io/xh/hoist/util/Timer.groovy @@ -7,6 +7,10 @@ package io.xh.hoist.util +import groovy.transform.NamedParam +import groovy.transform.NamedVariant +import io.xh.hoist.BaseService +import io.xh.hoist.cache.CachedValue import io.xh.hoist.log.LogSupport import java.util.concurrent.ExecutionException @@ -22,19 +26,28 @@ import static io.xh.hoist.util.Utils.configService import static io.xh.hoist.util.Utils.getExceptionHandler /** - * Core Hoist Timer object. + * Hoist's implementation of an interval-based Timer, for running tasks on a repeated interval. + * Supports a dynamic / configurable run interval, startup delay, and timeout. Used by services + * that need to schedule work to maintain internal state, eg regularly refreshing a cache from an + * external data source. * - * This object is typically used by services that need to schedule work to maintain - * internal state. + * This class ensures that only one instance of the task is running at a time. To schedule an ad hoc + * run, call `forceRun()` to run again as soon as any in-progress run completes, or ASAP on the next + * tick of the Timer's internal (and fast) interval-evaluation heartbeat. + * + * Timers can be configured to run only on the primary instance in a clustered environment, to + * ensure that tasks with external side effects are not run on every instance unless so desired. + * A common pattern would be to have the primary instance run a Timer-based job to load data into + * a cache, with the cache then replicated across the cluster. */ class Timer { private static Long CONFIG_INTERVAL = 15 * SECONDS - /** Optional name for this timer (for logging purposes) **/ + /** Unique name for this timer, required for cluster aware timers (see `primaryOnly`) **/ final String name - /** Object using this timer (for logging purposes) **/ + /** Object using this timer **/ final LogSupport owner /** Closure to run */ @@ -73,7 +86,10 @@ class Timer { /** Block on an immediate initial run? Default is false. */ final boolean runImmediatelyAndBlock - /** Only run job when clustered instance is the primary instance? Default is false. */ + /** + * Only run job when clustered instance is the primary instance? Default is false. + * For timers owned by instances of BaseService only. + */ final boolean primaryOnly @@ -109,6 +125,9 @@ class Timer { private java.util.Timer configTimer + private CachedValue _lastCompletedOnCluster + + // Args from Grails 3.0 async promise implementation static ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()) @@ -116,21 +135,37 @@ class Timer { * Applications should not typically use this constructor directly. Timers are typically * created by services using the createTimer() method supplied by io.xh.hoist.BaseService. */ - Timer(Map config) { - name = config.name - owner = config.owner - runFn = config.runFn - primaryOnly = config.primaryOnly ?: false - runImmediatelyAndBlock = config.runImmediatelyAndBlock ?: false - interval = parseDynamicValue(config.interval) - timeout = parseDynamicValue(config.containsKey('timeout') ? config.timeout : 3 * MINUTES) - delay = config.delay ?: false - - intervalUnits = config.intervalUnits ?: 1 - timeoutUnits = config.timeoutUnits ?: 1 - - if ([owner, interval, runFn].contains(null)) throw new RuntimeException('Missing required arguments for Timer.') - if (config.delayUnits) throw new RuntimeException('delayUnits has been removed from the API. Specify delay in ms.') + @NamedVariant + Timer( + @NamedParam(required = true) String name, + @NamedParam(required = true) LogSupport owner, + @NamedParam(required = true) Closure runFn, + @NamedParam Boolean primaryOnly = false, + @NamedParam Boolean runImmediatelyAndBlock = false, + @NamedParam Object interval = null, + @NamedParam Object timeout = 3 * MINUTES, + @NamedParam Object delay = false, + @NamedParam Long intervalUnits = 1, + @NamedParam Long timeoutUnits = 1 + ) { + this.name = name + this.owner = owner + this.runFn = runFn + this.primaryOnly = primaryOnly + this.runImmediatelyAndBlock = runImmediatelyAndBlock + this.interval = parseDynamicValue(interval) + this.timeout = parseDynamicValue(timeout) + this.delay = delay + this.intervalUnits = intervalUnits + this.timeoutUnits = timeoutUnits + + if (primaryOnly) { + if (!owner instanceof BaseService) { + throw new IllegalArgumentException("A 'primaryOnly' timer must be owned by an instance of BaseService.") + } + + _lastCompletedOnCluster = (owner as BaseService).createCachedValue(name: "xh_${name}_lastCompleted") + } intervalMs = calcIntervalMs() timeoutMs = calcTimeoutMs() @@ -155,32 +190,30 @@ class Timer { } /** - * Force a new execution as soon as possible. + * Force a new execution as soon as possible, on the next scheduled internal heartbeat, or as + * soon as any already in-progress execution completes. * - * This will occur on the next scheduled heartbeat, or as soon as any in-progress executions complete. - * Any subsequent calls to this method before this additional execution has completed will be ignored. + * Note that any additional calls to this method before an already-requested force run has + * completed will be ignored. */ void forceRun() { forceRun = true } /** - * Cancel this timer. - * - * This will prevent any additional executions of this timer. In-progress executions will be unaffected. + * Cancel this timer, permanently preventing any additional executions. + * In-progress executions will be unaffected. */ void cancel() { coreTimer?.cancel() configTimer?.cancel() } - /** - * Information about this time for admin purposes. - */ + /** Information about this timer, accessible via the Hoist Admin Console. */ Map getAdminStats() { [ name: name, - primaryOnly: primaryOnly?: null, + type: 'Timer' + (primaryOnly ? ' (primary only)': ''), intervalMs: intervalMs, isRunning: isRunning, startTime: isRunning ? _lastRunStarted: null, @@ -217,6 +250,7 @@ class Timer { } _lastRunCompleted = new Date() + _lastCompletedOnCluster?.set(_lastRunCompleted) _isRunning = false _lastRunStats = [ startTime: _lastRunStarted, @@ -229,7 +263,7 @@ class Timer { exceptionHandler.handleException( exception: throwable, logTo: owner, - logMessage: "Failure in ${name ?: 'timer'}" + logMessage: "Failure in '$name'" ) } catch (Throwable ignore) { owner.logError('Failed to handle exception in Timer') @@ -287,15 +321,19 @@ class Timer { // frequently enough to pickup forceRun reasonably fast. Tighten down for the rare fast timer. //------------------------------------------------------------------------------------------- private void onCoreTimer() { - if (!isRunning) { - if ((intervalMs > 0 && intervalElapsed(intervalMs, lastRunCompleted)) || forceRun) { - boolean wasForced = forceRun - doRun() - if (wasForced) forceRun = false - } + if (!isRunning && (forceRun || intervalHasElapsed())) { + boolean wasForced = forceRun + doRun() + if (wasForced) forceRun = false } } + private boolean intervalHasElapsed() { + if (intervalMs <= 0) return false + def lastRun = _lastCompletedOnCluster ? _lastCompletedOnCluster.get() : _lastRunCompleted + return intervalElapsed(intervalMs, lastRun) + } + private Long calcCoreIntervalMs() { return (intervalMs > 2 * SECONDS) ? 1 * SECONDS : 250; }