Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance Timer/Cache/CachedValues API #399

Merged
merged 14 commits into from
Sep 16, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### ⚙️ Technical

* Improvements to `Timer` to avoid extra executions when primary instance changes.
lbwexler marked this conversation as resolved.
Show resolved Hide resolved

* Updated `ClusterService` to use Hoist's `InstanceNotFoundException` class to designate routine.

* Exposed `/xh/ping` as whitelisted route for basic uptime/reachability checks. Retained legacy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class AlertBannerService extends BaseService {

void init() {
timer = createTimer(
name: 'readFromSpec',
interval: 2 * MINUTES,
runFn: this.&readFromSpec,
primaryOnly: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ClientErrorService extends BaseService {
void init() {
super.init()
createTimer(
name: 'processErrors',
interval: { alertInterval },
delay: 15 * SECONDS,
primaryOnly: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class FeedbackEmailService extends BaseService {

void init() {
subscribeToTopic(
name: 'emailFeedback',
topic: 'xhFeedbackReceived',
onMessage: this.&emailFeedback,
primaryOnly: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MonitorService extends BaseService {

void init() {
timer = createTimer(
name: 'runMonitors',
interval: { monitorInterval },
delay: startupDelay,
primaryOnly: true
Expand Down
1 change: 1 addition & 0 deletions src/main/groovy/io/xh/hoist/cache/Cache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class Cache<K,V> extends BaseCache<V> {
if (onChange) addChangeHandler(onChange)

timer = new Timer(
name: name ? "cullEntries_$name" : 'cullEntries',
owner: svc,
primaryOnly: replicate,
runFn: this.&cullEntries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class DefaultRoleService extends BaseRoleService {
ensureRequiredConfigAndRolesCreated()

timer = createTimer(
name: 'refreshRoles',
interval: { config.refreshIntervalSecs as int * SECONDS },
runFn: this.&refreshRoleAssignments,
runImmediatelyAndBlock: true,
Expand Down
41 changes: 32 additions & 9 deletions src/main/groovy/io/xh/hoist/util/Timer.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package io.xh.hoist.util

import io.xh.hoist.BaseService
import io.xh.hoist.cache.CachedValue
import io.xh.hoist.log.LogSupport

import java.util.concurrent.ExecutionException
Expand All @@ -31,10 +33,10 @@ class Timer {

private static Long CONFIG_INTERVAL = 15 * SECONDS

/** Optional name for this timer (for logging purposes) **/
/** Unique name for this timer, required for cluster aware timers (see `primaryOnly`) **/
final String name

/** Object using this timer (for logging purposes) **/
/** Object using this timer **/
final LogSupport owner

/** Closure to run */
Expand Down Expand Up @@ -73,7 +75,10 @@ class Timer {
/** Block on an immediate initial run? Default is false. */
final boolean runImmediatelyAndBlock

/** Only run job when clustered instance is the primary instance? Default is false. */
/**
* Only run job when clustered instance is the primary instance? Default is false.
* For timers owned by instances of BaseService only.
*/
final boolean primaryOnly


Expand Down Expand Up @@ -109,6 +114,9 @@ class Timer {
private java.util.Timer configTimer


private CachedValue<Date> _lastCompletedOnCluster
lbwexler marked this conversation as resolved.
Show resolved Hide resolved


// Args from Grails 3.0 async promise implementation
static ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())

Expand All @@ -132,6 +140,16 @@ class Timer {
if ([owner, interval, runFn].contains(null)) throw new RuntimeException('Missing required arguments for Timer.')
if (config.delayUnits) throw new RuntimeException('delayUnits has been removed from the API. Specify delay in ms.')

if (primaryOnly) {
if (!name) {
throw new IllegalArgumentException("Cannot create a 'primaryOnly' timer without a unique name")
}
if (!owner instanceof BaseService) {
throw new IllegalArgumentException("A 'primaryOnly' timer must be owned by an instance of BaseService.")
}
_lastCompletedOnCluster = new CachedValue<>(name: "xhTimer_$name", svc: owner as BaseService)
}

intervalMs = calcIntervalMs()
timeoutMs = calcTimeoutMs()
delayMs = calcDelayMs()
Expand Down Expand Up @@ -217,6 +235,7 @@ class Timer {
}

_lastRunCompleted = new Date()
_lastCompletedOnCluster?.set(_lastRunCompleted)
_isRunning = false
_lastRunStats = [
startTime: _lastRunStarted,
Expand Down Expand Up @@ -287,15 +306,19 @@ class Timer {
// frequently enough to pickup forceRun reasonably fast. Tighten down for the rare fast timer.
//-------------------------------------------------------------------------------------------
private void onCoreTimer() {
if (!isRunning) {
if ((intervalMs > 0 && intervalElapsed(intervalMs, lastRunCompleted)) || forceRun) {
boolean wasForced = forceRun
doRun()
if (wasForced) forceRun = false
}
if (!isRunning && (forceRun || isIntervalElapsed())) {
boolean wasForced = forceRun
doRun()
if (wasForced) forceRun = false
}
}

private boolean isIntervalElapsed() {
lbwexler marked this conversation as resolved.
Show resolved Hide resolved
if (intervalMs <= 0) return false
def lastRun = _lastCompletedOnCluster ? _lastCompletedOnCluster.get() : _lastRunCompleted
return intervalElapsed(intervalMs, lastRun)
}

private Long calcCoreIntervalMs() {
return (intervalMs > 2 * SECONDS) ? 1 * SECONDS : 250;
}
Expand Down
Loading