Skip to content

[CELEBORN-2364] Add support to cleanup stale shuffle dependencies across all Spark apps when cluster under load#3739

Open
saurabhd336 wants to merge 3 commits into
apache:mainfrom
saurabhd336:appSideCleanup
Open

[CELEBORN-2364] Add support to cleanup stale shuffle dependencies across all Spark apps when cluster under load#3739
saurabhd336 wants to merge 3 commits into
apache:mainfrom
saurabhd336:appSideCleanup

Conversation

@saurabhd336

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

A lot of times, across multiple Spark apps hosting shuffle data on Celeborn, multiple stale shuffle dependencies can be cleaned up to reduce shuffle load. However, it is not practical to trigger a GC on each app's Driver manually.

This change allows Master to notify all connected apps to trigger a gc() when under load. This can give the cluster some breathing room by cleaning up stale shuffle.

In order to avoid repeated GCs, once triggered, app side triggers gc only after a configurable delay (5m by default) and the feature on the master side is guarded behind a feature flag (flase by default)

Why are the changes needed?

Allows aggressive cleanup of stale shuffle dependencies during heavy load

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

No

How was this patch tested?

UTs

@saurabhd336 saurabhd336 changed the title Add support to cleanup stale shuffle dependencies across all Spark apps when cluster under load [CELEBORN-2364] Add support to cleanup stale shuffle dependencies across all Spark apps when cluster under load Jun 15, 2026
@saurabhd336

Copy link
Copy Markdown
Contributor Author

@copilot Review this please

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of the cluster-overload GC feature. The proto/SerDe plumbing for shouldTriggerGc is sound (backward-compatible, no unpatched matches). The substance is in Master.isClusterOverloaded and the client GC handler — comments inline.

Highest-priority items: the overload metric is broken on remote storage (S3/OSS/HDFS) and over-fires under load on local-disk clusters; the client System.gc() runs synchronously on the heartbeat thread and uses a non-monotonic clock for its cooldown.

availableWorkers: java.util.Set[WorkerInfo]): Boolean = {
if (!conf.clusterOverloadGcEnabled) return false
val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum
if (totalCapacity <= 0) return false

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaks on remote storage (S3/OSS/HDFS). Remote DiskInfos are created with actualUsableSpace = Long.MaxValue and totalSpace = 0 (StorageManager's remoteDiskInfos) and are reported to the master via allDisksSnapshot() in every worker heartbeat. So WorkerInfo.totalSpace() contributes 0 per remote disk while totalActualUsableSpace() contributes Long.MaxValue.

  • Remote-only cluster: every worker's totalSpace() is 0totalCapacity == 0 → this guard returns false permanently. The feature is a silent no-op on exactly the deployments where stale-shuffle disk pressure matters most.
  • Hybrid cluster: freeCapacity (line 1654) sums Long.MaxValue per available remote worker → Long overflow to a negative value → usedFraction = 1 - neg/pos > 1 → reports overloaded every heartbeat (or, in the single-remote no-overflow case, hugely negative → never).

Consider excluding remote / Long.MaxValue disks from this computation, or scoping it to local-disk capacity only.

if (!conf.clusterOverloadGcEnabled) return false
val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum
if (totalCapacity <= 0) return false
val freeCapacity = availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Numerator/denominator worker-set mismatch. freeCapacity sums over availableWorkers (a subset), but totalCapacity (line 1652) sums over all workersMap. A worker excluded merely for high workload — which happens precisely under load (AbstractMetaManager.updateWorkerHeartbeatMeta adds it to excludedWorkers and drops it from availableWorkers) — keeps its full totalSpace in the denominator but loses its free space from the numerator.

Example: 10 workers each 90% empty; load spikes and 9 report high workload → freeCapacity = 900GB, totalCapacity = 10TBusedFraction = 0.91 ≥ 0.9 → fleet-wide System.gc() on a 90%-empty cluster, adding STW pauses while it's already busy. Same for shutdown/decommission/lost workers, none of which mean 'disk full'. Sum free and total over the same worker set.

val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum
if (totalCapacity <= 0) return false
val freeCapacity = availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum
val usedFraction = 1.0 - freeCapacity.toDouble / totalCapacity.toDouble

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totalSpace and totalActualUsableSpace are on different scales. totalSpace() is the full filesystem size, but totalActualUsableSpace() is capped at the worker dir's configured capacity= and is net of the disk reserve (StorageManager.updateDiskInfos: min(configuredUsableSpace - usage, fsFree - reserve)). When capacity= is set below the disk size, the disk is shared with non-Celeborn data, or the reserve is large, usedFraction is inflated.

E.g. dir:capacity=200G on a 2TB disk → usedFraction ≈ 0.90 on a brand-new empty cluster → reported overloaded forever. The default capacity (1PB) avoids this, but capacity= is a supported, documented setting. The doc ('Fraction of total cluster disk capacity used') doesn't match this computation.

workersMap: java.util.Map[String, WorkerInfo],
availableWorkers: java.util.Set[WorkerInfo]): Boolean = {
if (!conf.clusterOverloadGcEnabled) return false
val totalCapacity = workersMap.values().asScala.map(_.totalSpace()).sum

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsynchronized read of mutable cluster state. workersMap and availableWorkers are mutated together under synchronized(workersMap) in AbstractMetaManager (e.g. removeWorkerMeta: workersMap.remove(...) then availableWorkers.remove(...)). This method reads both without that lock, so it can observe a worker already gone from workersMap (smaller totalCapacity here) but still present in availableWorkers (line 1654) → freeCapacity > totalCapacity → negative usedFraction → a transient missed signal. The existing DEVICE_CELEBORN_*_CAPACITY gauges share this property, but only feed metrics, not a control decision.

}
}

private[master] def isClusterOverloaded(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicates existing gauges + altitude. This method's two sums are verbatim copies of the DEVICE_CELEBORN_TOTAL_CAPACITY / DEVICE_CELEBORN_FREE_CAPACITY gauge bodies (Master.scala:297-303). Extract a shared totalClusterCapacity() / freeClusterCapacity() so the accounting can't silently diverge from the published metrics.

On altitude: broadcasting System.gc() to every driver JVM is an indirect, unmeasurable lever for stale-shuffle expiry (it frees nothing if references are still held, and the STW cost lands on all drivers). Celeborn already models 'overload' via per-worker highWorkload + autoReleaseHighWorkLoadRatioThreshold; a targeted shuffle-expiry path would be more deterministic and reuse existing machinery.

.doc("When true, the client will trigger System.gc() upon receiving a GC signal from " +
"the master indicating cluster storage is overloaded. Disable to ignore the signal.")
.booleanConf
.createWithDefault(true)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default asymmetry with the master flag. This client flag defaults true while celeborn.master.clusterOverload.gc.enabled defaults false. So flipping only the master flag silently opts every connected app on default client config into fleet-wide System.gc() STW pauses, with no action by app owners. For an opt-in performance feature, consider defaulting this to false (require both sides to opt in), or calling out prominently that the master flag alone is fleet-wide-effective.

"signals clients to trigger GC to release stale shuffle dependencies. For example, " +
"0.9 means 90% of total capacity is in use.")
.doubleConf
.checkValue(v => v > 0.0 && v <= 1.0, "Must be between 0 (exclusive) and 1 (inclusive)")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc/validation mismatch. The doc (line 2538) says the range is '(0.0–1.0)', but checkValue rejects exactly 0.0 (v > 0.0). An operator setting 0.0 (expecting 'always overloaded' per the doc) will fail config validation. Align the doc to '(0.0 exclusive, 1.0 inclusive]'. Minor: the doc uses a Unicode en-dash () rather than a hyphen.

val w1 = makeWorker("host1", totalSpace = 1000L, usableSpace = 100L)
val w2 = makeWorker("host2", totalSpace = 1000L, usableSpace = 900L)
// Only w1 is available (e.g. w2 is excluded/shutdown)
assert(Master.isClusterOverloaded(conf, workersMap(w1, w2), availableWorkers(w1)))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test locks in the worker-set-mismatch bug. It asserts isClusterOverloaded == true for two healthy workers whose true utilization is 50% (w2 is merely 'not available'). That result only holds because freeCapacity (available subset) is divided by totalCapacity (all workers) — the inconsistency flagged on the production method. When that's fixed to sum both over the same worker set, this test will go red on a now-correct implementation, so it effectively cements the wrong behavior.


// With 0ms interval the next call should always be allowed.
hb.handleGcSignal(shouldTriggerGc = true)
assert(hb.lastGcTriggerTimeMs >= firstTrigger)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vacuous assertion. firstTrigger is captured from lastGcTriggerTimeMs, then after the second handleGcSignal the test asserts lastGcTriggerTimeMs >= firstTrigger. If the re-trigger logic were broken and GC never fired again, lastGcTriggerTimeMs stays == firstTrigger, so firstTrigger >= firstTrigger is still true — the test passes whether or not GC re-fires and cannot detect the regression it names. (With minIntervalMs = 0L, two calls can also land in the same millisecond, so even > would be flaky.)

val registeredShuffles = ConcurrentHashMap.newKeySet[Int]()
.asInstanceOf[ConcurrentHashMap.KeySetView[Int, java.lang.Boolean]]

new ApplicationHeartbeater(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaked executor per test. Each makeHeartbeater constructs an ApplicationHeartbeater, whose constructor eagerly starts a daemon scheduled executor (celeborn-client-lifecycle-manager-app-heartbeater). No test calls stop(), so each test in the suite leaks a live executor thread. They're daemon threads so the JVM still exits, but under CI parallelism this is the kind of teardown asymmetry that surfaces as 'unable to create new native thread'. Add an afterEach/stop(), or share one instance.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.

| celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | |
| celeborn.master.allowWorkerHostPattern | &lt;undefined&gt; | false | Pattern of worker host that allowed to register with the master. If not set, all workers are allowed to register. | 0.6.0 | |
| celeborn.master.clusterOverload.gc.enabled | false | false | Whether to enable the master signaling clients to trigger GC when the cluster storage is overloaded (disk usage exceeds the threshold). | 0.7.0 | |
| celeborn.master.clusterOverload.gc.threshold | 0.9 | false | Fraction of total cluster disk capacity used (0.0–1.0) above which the master signals clients to trigger GC to release stale shuffle dependencies. For example, 0.9 means 90% of total capacity is in use. | 0.7.0 | |
Comment on lines +2534 to +2543
val MASTER_CLUSTER_OVERLOAD_GC_THRESHOLD: ConfigEntry[Double] =
buildConf("celeborn.master.clusterOverload.gc.threshold")
.categories("master")
.version("0.7.0")
.doc("Fraction of total cluster disk capacity used (0.0–1.0) above which the master " +
"signals clients to trigger GC to release stale shuffle dependencies. For example, " +
"0.9 means 90% of total capacity is in use.")
.doubleConf
.checkValue(v => v > 0.0 && v <= 1.0, "Must be between 0 (exclusive) and 1 (inclusive)")
.createWithDefault(0.9)
Comment on lines +1265 to +1269
private[master] def shouldTriggerGcForApp(): Boolean =
Master.isClusterOverloaded(
conf,
statusSystem.workersMap,
statusSystem.availableWorkers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants