Skip to content

Commit

Permalink
Improvements to Cluster API and Performance (#406)
Browse files Browse the repository at this point in the history
cr: atm and Tom
  • Loading branch information
lbwexler authored Sep 25, 2024
1 parent 6a2ba2c commit b424f50
Show file tree
Hide file tree
Showing 19 changed files with 611 additions and 409 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

## 23.0-SNAPSHOT - unreleased

### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW)

* Improvements to the efficiency of `CachedValue` for sharing of large objects. This included
moving to its own package `io.xh.hoist.cachedvalue` for clarity.
* New dynamic configuration for all distributed hazelcast objects. See `ClusterService.configureXXX`.
This replaces the static map `BaseService.clusterConfigs`.
* Misc. improvements to logging and performance of `Cache` and `Timer`.

## 22.0.0 - 2024-09-18

### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW)
Expand Down
42 changes: 4 additions & 38 deletions grails-app/init/io/xh/hoist/ClusterConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ class ClusterConfig {

createDefaultConfigs(ret)
createHibernateConfigs(ret)
createServiceConfigs(ret)

KryoSupport.setAsGlobalSerializer(ret)

Expand All @@ -124,12 +123,7 @@ class ClusterConfig {
* Note that Hoist also introduces two properties for declarative configuration:
*
* - a static 'cache' property on Grails domain objects to customize associated
* Hibernate caches.
* - a static 'clusterConfigs' property on Grails services to customize any Hazelcast
* Distributed Objects associated with the service e.g. Hoist caches
*
* See toolbox's `Phase` object and Hoist Core's `ClientErrorService` for examples of these
* customizations.
* Hibernate caches. See toolbox's `Phase` object for examples.
*/
protected void createDefaultConfigs(Config config) {
config.getMapConfig('default').with {
Expand All @@ -146,6 +140,9 @@ class ClusterConfig {
config.getTopicConfig('default').with {
statisticsEnabled = true
}
config.getReliableTopicConfig('default').with {
statisticsEnabled = true
}
config.getSetConfig('default').with {
statisticsEnabled = true
}
Expand Down Expand Up @@ -194,35 +191,4 @@ class ClusterConfig {
}
}
}

private void createServiceConfigs(Config config) {
grailsApplication.serviceClasses.each { GrailsClass gc ->
// Apply any app customization specified by new static prop introduced by Hoist
Map objs = gc.getPropertyValue('clusterConfigs')
if (!objs) return
objs.forEach {String key, List value ->
def customizer = value[1] as Closure,
objConfig
switch (value[0]) {
case IMap:
objConfig = config.getMapConfig(gc.fullName + '_' + key)
break
case ReplicatedMap:
objConfig = config.getReplicatedMapConfig(gc.fullName + '_' + key)
break
case ISet:
objConfig = config.getSetConfig(gc.fullName + '_' + key)
break
case ITopic:
objConfig = config.getTopicConfig(key)
break
default:
throw new RuntimeException('Unable to configure Cluster object')
}
customizer.delegate = objConfig
customizer.resolveStrategy = Closure.DELEGATE_FIRST
customizer(objConfig)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package io.xh.hoist.alertbanner

import groovy.transform.CompileStatic
import io.xh.hoist.BaseService
import io.xh.hoist.cache.CachedValue
import io.xh.hoist.cachedvalue.CachedValue
import io.xh.hoist.config.ConfigService
import io.xh.hoist.jsonblob.JsonBlobService
import io.xh.hoist.util.Timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,14 @@ class ClientErrorService extends BaseService {
def clientErrorEmailService,
configService

static clusterConfigs = [
clientErrors: [IMap, {
evictionConfig.size = 100
}]
]

private IMap<String, Map> errors = createIMap('clientErrors')
private int getMaxErrors() {configService.getMap('xhClientErrorConfig').maxErrors as int}
private int getAlertInterval() {configService.getMap('xhClientErrorConfig').intervalMins * MINUTES}

private IMap<String, Map> errors

void init() {
super.init()
errors = createIMap('clientErrors') {it.evictionConfig.size = 100}
createTimer(
name: 'processErrors',
runFn: this.&processErrors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.hazelcast.executor.impl.ExecutorServiceProxy
import com.hazelcast.map.IMap
import com.hazelcast.nearcache.NearCacheStats
import com.hazelcast.replicatedmap.ReplicatedMap
import com.hazelcast.ringbuffer.impl.RingbufferProxy
import com.hazelcast.topic.ITopic
import io.xh.hoist.BaseService
import io.xh.hoist.util.Utils
Expand Down Expand Up @@ -142,6 +143,13 @@ class ClusterAdminService extends BaseService {
publishOperationCount: stats.publishOperationCount,
receiveOperationCount: stats.receiveOperationCount
]
case RingbufferProxy:
return [
name : obj.getName(),
type : 'RingBuffer',
size : obj.size(),
capacity : obj.capacity()
]
case CacheProxy:
def evictionConfig = obj.cacheConfig.evictionConfig,
expiryPolicy = obj.cacheConfig.expiryPolicyFactory.create(),
Expand Down
42 changes: 42 additions & 0 deletions grails-app/services/io/xh/hoist/cluster/ClusterService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import com.hazelcast.cluster.Cluster
import com.hazelcast.cluster.Member
import com.hazelcast.cluster.MembershipEvent
import com.hazelcast.cluster.MembershipListener
import com.hazelcast.collection.ISet
import com.hazelcast.config.Config
import com.hazelcast.core.DistributedObject
import com.hazelcast.core.Hazelcast
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.core.IExecutorService
import com.hazelcast.map.IMap
import com.hazelcast.replicatedmap.ReplicatedMap
import com.hazelcast.topic.ITopic
import io.xh.hoist.BaseService
import io.xh.hoist.ClusterConfig
import io.xh.hoist.exception.InstanceNotFoundException
Expand Down Expand Up @@ -152,6 +157,39 @@ class ClusterService extends BaseService implements ApplicationListener<Applicat
.collectEntries { member, f -> [member.getAttribute('instanceName'), f.get()] }
}

//------------------
// Create Objects
//-----------------
static <K, V> IMap<K, V> configuredIMap(String name, Closure customizer = null) {
customizer?.call(hzConfig.getMapConfig(name))
hzInstance.getMap(name)
}

static <V> ISet<V> configuredISet(String name, Closure customizer = null) {
customizer?.call(hzConfig.getSetConfig(name))
hzInstance.getSet(name)
}

static <K, V> ReplicatedMap<K, V> configuredReplicatedMap(String name, Closure customizer = null) {
customizer?.call(hzConfig.getReplicatedMapConfig(name))
hzInstance.getReplicatedMap(name)
}

static <M> ITopic<M> configuredTopic(String name, Closure customizer = null) {
customizer?.call(hzConfig.getTopicConfig(name))
hzInstance.getTopic(name)
}

static <M> ITopic<M> configuredReliableTopic(
String name,
Closure customizer = null,
Closure ringBufferCustomizer = null
) {
ringBufferCustomizer?.call(hzConfig.getRingbufferConfig(name))
customizer?.call(hzConfig.getReliableTopicConfig(name))
hzInstance.getReliableTopic(name)
}

//------------------------------------
// Implementation
//------------------------------------
Expand Down Expand Up @@ -188,6 +226,10 @@ class ClusterService extends BaseService implements ApplicationListener<Applicat
return (clazz.getConstructor().newInstance() as ClusterConfig)
}

private static Config getHzConfig() {
hzInstance.config
}

void onApplicationEvent(ApplicationReadyEvent event) {
isReady = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import grails.compiler.GrailsCompileStatic
import grails.gorm.transactions.ReadOnly
import groovy.transform.CompileDynamic
import io.xh.hoist.BaseService
import io.xh.hoist.cache.CachedValue
import io.xh.hoist.cachedvalue.CachedValue
import io.xh.hoist.cluster.ClusterRequest
import io.xh.hoist.config.ConfigService
import io.xh.hoist.util.Timer
Expand Down
70 changes: 29 additions & 41 deletions src/main/groovy/io/xh/hoist/BaseService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import groovy.transform.CompileDynamic
import groovy.transform.NamedParam
import groovy.transform.NamedVariant
import io.xh.hoist.cache.Cache
import io.xh.hoist.cache.CachedValue
import io.xh.hoist.cachedvalue.CachedValue
import io.xh.hoist.cluster.ClusterService
import io.xh.hoist.exception.ExceptionHandler
import io.xh.hoist.log.LogSupport
Expand All @@ -35,10 +35,14 @@ import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit

import static grails.async.Promises.task
import static io.xh.hoist.cluster.ClusterService.configuredIMap
import static io.xh.hoist.cluster.ClusterService.configuredISet
import static io.xh.hoist.cluster.ClusterService.configuredReplicatedMap
import static io.xh.hoist.util.DateTimeUtils.SECONDS
import static io.xh.hoist.util.DateTimeUtils.MINUTES
import static io.xh.hoist.util.Utils.appContext
import static io.xh.hoist.util.Utils.getConfigService
import static io.xh.hoist.cluster.ClusterService.hzInstance

/**
* Standard superclass for all Hoist and Application-level services.
Expand Down Expand Up @@ -66,8 +70,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea
protected final ConcurrentHashMap<String, Object> resources = [:]

private boolean _destroyed = false
private Map _replicatedValues
private Map _localValues

private final Logger _log = LoggerFactory.getLogger(this.class)

Expand Down Expand Up @@ -119,44 +121,46 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea

//-----------------------------------------------------------------
// Distributed Resources
// Use static reference to ClusterService to allow access pre-init.
//------------------------------------------------------------------
/**
* Create and return a reference to a Hazelcast IMap.
*
* @param name - must be unique across all Caches, Timers and distributed Hazelcast objects
* associated with this service.
* @param customizer - closure receiving a Hazelcast MapConfig. Mutate to customize.
*/
<K, V> IMap<K, V> createIMap(String name) {
addResource(name, ClusterService.hzInstance.getMap(hzName(name)))
<K, V> IMap<K, V> createIMap(String name, Closure customizer = null) {
addResource(name, configuredIMap(hzName(name), customizer))
}

/**
* Create and return a reference to a Hazelcast ISet.
*
* @param name - must be unique across all Caches, Timers and distributed Hazelcast objects
* associated with this service.
* @param customizer - closure receiving a Hazelcast SetConfig. Mutate to customize.
*/
<V> ISet<V> createISet(String name) {
addResource(name, ClusterService.hzInstance.getSet(hzName(name)))
<V> ISet<V> createISet(String name, Closure customizer = null) {
addResource(name, configuredISet(hzName(name), customizer))
}

/**
* Create and return a reference to a Hazelcast Replicated Map.
*
* @param name - must be unique across all Caches, Timers and distributed Hazelcast objects
* associated with this service.
* @param customizer - closure receiving a Hazelcast ReplicatedMapConfig. Mutate to customize.
*/
<K, V> ReplicatedMap<K, V> createReplicatedMap(String name) {
addResource(name, ClusterService.hzInstance.getReplicatedMap(hzName(name)))
}
<K, V> ReplicatedMap<K, V> createReplicatedMap(String name, Closure customizer = null) {
addResource(name, configuredReplicatedMap(hzName(name), customizer))
}

/**
* Get a reference to a Hazelcast Replicated topic, useful to publish to a cluster-wide topic.
* Get a reference to an existing or new Hazelcast topic.
* To subscribe to events fired by other services on a topic, use {@link #subscribeToTopic}.
*/
<M> ITopic<M> getTopic(String id) {
ClusterService.hzInstance.getTopic(id)
hzInstance.getTopic(id)
}

/**
Expand Down Expand Up @@ -224,7 +228,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea
addResource(mp.name as String, new CachedValue([*:mp, svc: this]))
}


/**
* Create a managed subscription to events on the instance-local Grails event bus.
*
Expand Down Expand Up @@ -285,7 +288,6 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea
}
}


//------------------
// Cluster Support
//------------------
Expand Down Expand Up @@ -374,14 +376,22 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea
// Provide cached logger to LogSupport for possible performance benefit
Logger getInstanceLog() { _log }

/**
* Generate a name for a resource, appropriate for Hazelcast.
* Note that this allows us to group all Hazelcast resources by Service
*
* Not typically called directly by applications. Applications should aim to create
* Hazelcast distributed objects using the methods in this class.
*
* @internal
*/
String hzName(String name) {
"${this.class.name}[$name]"
}

//------------------------
// Internal implementation
//------------------------
protected String hzName(String name) {
this.class.name + '_' + name
}

private <T> T addResource(String name, T resource) {
if (!name || resources.containsKey(name)) {
def msg = 'Service resource requires a unique name. '
Expand All @@ -391,26 +401,4 @@ abstract class BaseService implements LogSupport, IdentitySupport, DisposableBea
resources[name] = resource
return resource
}

/**
* @internal - for use by Cache.
*/
Map getMapForCache(Cache cache) {
// register with xh prefix to avoid collisions, allow filtering out in admin
cache.useCluster ? createReplicatedMap("xh_${cache.name}") : new ConcurrentHashMap()
}

/**
* @internal - for use by CachedValue
*/
Map getMapForCachedValue(CachedValue cachedValue) {
// register with xh prefix to avoid collisions, allow filtering out in admin
if (cachedValue.useCluster) {
if (_replicatedValues == null) _replicatedValues = createReplicatedMap('xh_cachedValues')
return _replicatedValues
} else {
if (_localValues == null) _localValues = new ConcurrentHashMap()
return _localValues
}
}
}
Loading

0 comments on commit b424f50

Please sign in to comment.