[CELEBORN-2364] Add support to cleanup stale shuffle dependencies across all Spark apps when cluster under load#3739
[CELEBORN-2364] Add support to cleanup stale shuffle dependencies across all Spark apps when cluster under load#3739saurabhd336 wants to merge 3 commits into
Conversation
…ps when cluster under load
|
@copilot Review this please |
SteNicholas
left a comment
There was a problem hiding this comment.
Review of the cluster-overload GC feature. The proto/SerDe plumbing for shouldTriggerGc is sound (backward-compatible, no unpatched matches). The substance is in Master.isClusterOverloaded and the client GC handler — comments inline.
Highest-priority items: the overload metric is broken on remote storage (S3/OSS/HDFS) and over-fires under load on local-disk clusters; the client System.gc() runs synchronously on the heartbeat thread and uses a non-monotonic clock for its cooldown.
| availableWorkers: java.util.Set[WorkerInfo]): Boolean = { | ||
| if (!conf.clusterOverloadGcEnabled) return false | ||
| val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum | ||
| if (totalCapacity <= 0) return false |
There was a problem hiding this comment.
Breaks on remote storage (S3/OSS/HDFS). Remote DiskInfos are created with actualUsableSpace = Long.MaxValue and totalSpace = 0 (StorageManager's remoteDiskInfos) and are reported to the master via allDisksSnapshot() in every worker heartbeat. So WorkerInfo.totalSpace() contributes 0 per remote disk while totalActualUsableSpace() contributes Long.MaxValue.
- Remote-only cluster: every worker's
totalSpace()is0→totalCapacity == 0→ this guard returnsfalsepermanently. The feature is a silent no-op on exactly the deployments where stale-shuffle disk pressure matters most. - Hybrid cluster:
freeCapacity(line 1654) sumsLong.MaxValueper available remote worker → Long overflow to a negative value →usedFraction = 1 - neg/pos > 1→ reports overloaded every heartbeat (or, in the single-remote no-overflow case, hugely negative → never).
Consider excluding remote / Long.MaxValue disks from this computation, or scoping it to local-disk capacity only.
| if (!conf.clusterOverloadGcEnabled) return false | ||
| val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum | ||
| if (totalCapacity <= 0) return false | ||
| val freeCapacity = availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum |
There was a problem hiding this comment.
Numerator/denominator worker-set mismatch. freeCapacity sums over availableWorkers (a subset), but totalCapacity (line 1652) sums over all workersMap. A worker excluded merely for high workload — which happens precisely under load (AbstractMetaManager.updateWorkerHeartbeatMeta adds it to excludedWorkers and drops it from availableWorkers) — keeps its full totalSpace in the denominator but loses its free space from the numerator.
Example: 10 workers each 90% empty; load spikes and 9 report high workload → freeCapacity = 900GB, totalCapacity = 10TB → usedFraction = 0.91 ≥ 0.9 → fleet-wide System.gc() on a 90%-empty cluster, adding STW pauses while it's already busy. Same for shutdown/decommission/lost workers, none of which mean 'disk full'. Sum free and total over the same worker set.
| val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum | ||
| if (totalCapacity <= 0) return false | ||
| val freeCapacity = availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum | ||
| val usedFraction = 1.0 - freeCapacity.toDouble / totalCapacity.toDouble |
There was a problem hiding this comment.
totalSpace and totalActualUsableSpace are on different scales. totalSpace() is the full filesystem size, but totalActualUsableSpace() is capped at the worker dir's configured capacity= and is net of the disk reserve (StorageManager.updateDiskInfos: min(configuredUsableSpace - usage, fsFree - reserve)). When capacity= is set below the disk size, the disk is shared with non-Celeborn data, or the reserve is large, usedFraction is inflated.
E.g. dir:capacity=200G on a 2TB disk → usedFraction ≈ 0.90 on a brand-new empty cluster → reported overloaded forever. The default capacity (1PB) avoids this, but capacity= is a supported, documented setting. The doc ('Fraction of total cluster disk capacity used') doesn't match this computation.
| workersMap: java.util.Map[String, WorkerInfo], | ||
| availableWorkers: java.util.Set[WorkerInfo]): Boolean = { | ||
| if (!conf.clusterOverloadGcEnabled) return false | ||
| val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum |
There was a problem hiding this comment.
Unsynchronized read of mutable cluster state. workersMap and availableWorkers are mutated together under synchronized(workersMap) in AbstractMetaManager (e.g. removeWorkerMeta: workersMap.remove(...) then availableWorkers.remove(...)). This method reads both without that lock, so it can observe a worker already gone from workersMap (smaller totalCapacity here) but still present in availableWorkers (line 1654) → freeCapacity > totalCapacity → negative usedFraction → a transient missed signal. The existing DEVICE_CELEBORN_*_CAPACITY gauges share this property, but only feed metrics, not a control decision.
| } | ||
| } | ||
|
|
||
| private[master] def isClusterOverloaded( |
There was a problem hiding this comment.
Duplicates existing gauges + altitude. This method's two sums are verbatim copies of the DEVICE_CELEBORN_TOTAL_CAPACITY / DEVICE_CELEBORN_FREE_CAPACITY gauge bodies (Master.scala:297-303). Extract a shared totalClusterCapacity() / freeClusterCapacity() so the accounting can't silently diverge from the published metrics.
On altitude: broadcasting System.gc() to every driver JVM is an indirect, unmeasurable lever for stale-shuffle expiry (it frees nothing if references are still held, and the STW cost lands on all drivers). Celeborn already models 'overload' via per-worker highWorkload + autoReleaseHighWorkLoadRatioThreshold; a targeted shuffle-expiry path would be more deterministic and reuse existing machinery.
| .doc("When true, the client will trigger System.gc() upon receiving a GC signal from " + | ||
| "the master indicating cluster storage is overloaded. Disable to ignore the signal.") | ||
| .booleanConf | ||
| .createWithDefault(true) |
There was a problem hiding this comment.
Default asymmetry with the master flag. This client flag defaults true while celeborn.master.clusterOverload.gc.enabled defaults false. So flipping only the master flag silently opts every connected app on default client config into fleet-wide System.gc() STW pauses, with no action by app owners. For an opt-in performance feature, consider defaulting this to false (require both sides to opt in), or calling out prominently that the master flag alone is fleet-wide-effective.
| "signals clients to trigger GC to release stale shuffle dependencies. For example, " + | ||
| "0.9 means 90% of total capacity is in use.") | ||
| .doubleConf | ||
| .checkValue(v => v > 0.0 && v <= 1.0, "Must be between 0 (exclusive) and 1 (inclusive)") |
There was a problem hiding this comment.
Doc/validation mismatch. The doc (line 2538) says the range is '(0.0–1.0)', but checkValue rejects exactly 0.0 (v > 0.0). An operator setting 0.0 (expecting 'always overloaded' per the doc) will fail config validation. Align the doc to '(0.0 exclusive, 1.0 inclusive]'. Minor: the doc uses a Unicode en-dash (–) rather than a hyphen.
| val w1 = makeWorker("host1", totalSpace = 1000L, usableSpace = 100L) | ||
| val w2 = makeWorker("host2", totalSpace = 1000L, usableSpace = 900L) | ||
| // Only w1 is available (e.g. w2 is excluded/shutdown) | ||
| assert(Master.isClusterOverloaded(conf, workersMap(w1, w2), availableWorkers(w1))) |
There was a problem hiding this comment.
This test locks in the worker-set-mismatch bug. It asserts isClusterOverloaded == true for two healthy workers whose true utilization is 50% (w2 is merely 'not available'). That result only holds because freeCapacity (available subset) is divided by totalCapacity (all workers) — the inconsistency flagged on the production method. When that's fixed to sum both over the same worker set, this test will go red on a now-correct implementation, so it effectively cements the wrong behavior.
|
|
||
| // With 0ms interval the next call should always be allowed. | ||
| hb.handleGcSignal(shouldTriggerGc = true) | ||
| assert(hb.lastGcTriggerTimeMs >= firstTrigger) |
There was a problem hiding this comment.
Vacuous assertion. firstTrigger is captured from lastGcTriggerTimeMs, then after the second handleGcSignal the test asserts lastGcTriggerTimeMs >= firstTrigger. If the re-trigger logic were broken and GC never fired again, lastGcTriggerTimeMs stays == firstTrigger, so firstTrigger >= firstTrigger is still true — the test passes whether or not GC re-fires and cannot detect the regression it names. (With minIntervalMs = 0L, two calls can also land in the same millisecond, so even > would be flaky.)
| val registeredShuffles = ConcurrentHashMap.newKeySet[Int]() | ||
| .asInstanceOf[ConcurrentHashMap.KeySetView[Int, java.lang.Boolean]] | ||
|
|
||
| new ApplicationHeartbeater( |
There was a problem hiding this comment.
Leaked executor per test. Each makeHeartbeater constructs an ApplicationHeartbeater, whose constructor eagerly starts a daemon scheduled executor (celeborn-client-lifecycle-manager-app-heartbeater). No test calls stop(), so each test in the suite leaks a live executor thread. They're daemon threads so the JVM still exits, but under CI parallelism this is the kind of teardown asymmetry that surfaces as 'unable to create new native thread'. Add an afterEach/stop(), or share one instance.
| | celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | | | ||
| | celeborn.master.allowWorkerHostPattern | <undefined> | false | Pattern of worker host that allowed to register with the master. If not set, all workers are allowed to register. | 0.6.0 | | | ||
| | celeborn.master.clusterOverload.gc.enabled | false | false | Whether to enable the master signaling clients to trigger GC when the cluster storage is overloaded (disk usage exceeds the threshold). | 0.7.0 | | | ||
| | celeborn.master.clusterOverload.gc.threshold | 0.9 | false | Fraction of total cluster disk capacity used (0.0–1.0) above which the master signals clients to trigger GC to release stale shuffle dependencies. For example, 0.9 means 90% of total capacity is in use. | 0.7.0 | | |
| val MASTER_CLUSTER_OVERLOAD_GC_THRESHOLD: ConfigEntry[Double] = | ||
| buildConf("celeborn.master.clusterOverload.gc.threshold") | ||
| .categories("master") | ||
| .version("0.7.0") | ||
| .doc("Fraction of total cluster disk capacity used (0.0–1.0) above which the master " + | ||
| "signals clients to trigger GC to release stale shuffle dependencies. For example, " + | ||
| "0.9 means 90% of total capacity is in use.") | ||
| .doubleConf | ||
| .checkValue(v => v > 0.0 && v <= 1.0, "Must be between 0 (exclusive) and 1 (inclusive)") | ||
| .createWithDefault(0.9) |
| private[master] def shouldTriggerGcForApp(): Boolean = | ||
| Master.isClusterOverloaded( | ||
| conf, | ||
| statusSystem.workersMap, | ||
| statusSystem.availableWorkers) |
What changes were proposed in this pull request?
A lot of times, across multiple Spark apps hosting shuffle data on Celeborn, multiple stale shuffle dependencies can be cleaned up to reduce shuffle load. However, it is not practical to trigger a GC on each app's Driver manually.
This change allows Master to notify all connected apps to trigger a gc() when under load. This can give the cluster some breathing room by cleaning up stale shuffle.
In order to avoid repeated GCs, once triggered, app side triggers gc only after a configurable delay (5m by default) and the feature on the master side is guarded behind a feature flag (flase by default)
Why are the changes needed?
Allows aggressive cleanup of stale shuffle dependencies during heavy load
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
No
How was this patch tested?
UTs