[CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle. #2819
Conversation
pls resolve the conflicts~
@@ -671,8 +671,14 @@ private[celeborn] class Worker(
     val resourceConsumptionSnapshot = storageManager.userResourceConsumptionSnapshot()
     val userResourceConsumptions =
       workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
-    resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
+    resourceConsumptionSnapshot.foreach { case (userIdentifier, userResourceConsumption) =>
It would be better not to persist resourceConsumption into Ratis when the worker heartbeat carries subResourceConsumption.
Makes sense.
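For illustration, a minimal sketch of the idea under discussion, with hypothetical names; the real change would live in the master's HA state machine rather than a standalone cache:

import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only: consumption carried by worker heartbeats is
// transient and re-reported on the next heartbeat, so it can be kept in
// master memory instead of being written through the Ratis log.
object HeartbeatConsumptionCache {
  private val inMemory = new ConcurrentHashMap[String, Map[String, Long]]()

  def onWorkerHeartbeat(workerId: String, consumptionByUser: Map[String, Long]): Unit = {
    inMemory.put(workerId, consumptionByUser) // no Ratis write on this path
  }
}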
masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) {
  val clusterQuota = getClusterStorageQuota
  var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0)
  val userResourceConsumption = statusSystem.workerSnapshot.asScala.flatMap { workerInfo =>
Lack of tenant-level quota checks.
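For example, tenant-level checking could aggregate the per-user consumption by tenantId before comparing it against the tenant quota. A self-contained sketch with simplified stand-in types, not the PR's actual code:

// Simplified stand-ins for the real Celeborn types.
case class UserIdentifier(tenantId: String, name: String)
case class ResourceConsumption(
    diskBytesWritten: Long, diskFileCount: Long,
    hdfsBytesWritten: Long, hdfsFileCount: Long) {
  def add(other: ResourceConsumption): ResourceConsumption =
    ResourceConsumption(
      diskBytesWritten + other.diskBytesWritten,
      diskFileCount + other.diskFileCount,
      hdfsBytesWritten + other.hdfsBytesWritten,
      hdfsFileCount + other.hdfsFileCount)
}

// Roll per-user consumption up to the tenant level.
def tenantConsumption(
    userConsumption: Map[UserIdentifier, ResourceConsumption]): Map[String, ResourceConsumption] =
  userConsumption.groupBy(_._1.tenantId).map { case (tenantId, users) =>
    tenantId -> users.values.reduce(_.add(_))
  }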
@@ -504,4 +501,10 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) {
   public void updateApplicationMeta(ApplicationMeta applicationMeta) {
     applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
   }

+  public List<WorkerInfo> workerSnapshot() {
Why not directly use workers for computing resource consumption?
quotaChecker.scheduleWithFixedDelay(
  () => {
    try {
      updateResourceConsumption()
This method not only updates resource consumption but also checks quota at the user and app level, so it may deserve another name, or the logic should be separated.
This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.
Force-pushed from ff37e63 to 940609e.
I will fix the above problems later.
Force-pushed from a13e20b to 1b94be6.
This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.
Force-pushed from 9e172e2 to 57bd3ed.
This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.
Thanks @leixm, very sorry for the delayed review.
  }

  private def clearQuotaStatus(activeUsers: mutable.Set[UserIdentifier]): Unit = {
    userQuotaStatus
Maybe use userQuotaStatus.keySet().removeIf(userIdentifier => !activeUsers.contains(userIdentifier))
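Since the keySet() view is backed by the map, removeIf drops the whole entry; a tiny REPL-style illustration:

import java.util.concurrent.ConcurrentHashMap

val statuses = new ConcurrentHashMap[String, String]()
statuses.put("user-a", "ok")
statuses.put("user-b", "exceeded")

val active = Set("user-a")
// Removing keys from the keySet() view also removes the backing map entries.
statuses.keySet().removeIf(user => !active.contains(user))
// statuses now contains only the "user-a" entry.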
      .asScala
      .diff(activeUsers)
      .foreach(userQuotaStatus.remove)
    tenantQuotaStatus
ditto
    userResourceConsumptionMap.getOrDefault(userIdentifier, ResourceConsumption(0, 0, 0, 0))
  }

  private def registerUserResourceConsumptionMetrics(userIdentifier: UserIdentifier): Unit = {
Can we also register resource consumption metrics for tenants and clusters? Enabling registerUserResourceConsumptionMetrics by default may result in an excessive number of metrics; could we consider disabling it by default?
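A minimal sketch of such a default-off switch; the config key and helper here are hypothetical, not actual Celeborn APIs:

// Hypothetical: gate per-user metric registration behind a flag defaulting to off.
def maybeRegisterUserMetrics(
    conf: java.util.Properties,
    userId: String)(register: String => Unit): Unit = {
  val enabled = conf
    .getProperty("celeborn.master.userResourceConsumption.metrics.enabled", "false")
    .toBoolean
  if (enabled) register(userId)
}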

    // Step 2: Update user resource consumption metrics.
    // For extract metrics
    userResourceConsumptionMap.put(userIdentifier, resourceConsumption)
Seems we can remove userResourceConsumptionMap and use registerUserResourceConsumptionMetrics(userIdentifier, resourceConsumption) instead; userResourceConsumptionMap may contain outdated userIdentifier entries.
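A sketch of the suggested signature, with stand-in types rather than the PR's actual code:

case class ResourceConsumption(
    diskBytesWritten: Long, diskFileCount: Long,
    hdfsBytesWritten: Long, hdfsFileCount: Long)

// Register gauges from the freshly computed snapshot instead of reading it
// back from a shared map that can keep entries for users no longer active.
def registerUserResourceConsumptionMetrics(
    userIdentifier: String,
    resourceConsumption: ResourceConsumption): Unit = {
  println(s"$userIdentifier -> $resourceConsumption") // gauge registration elided
}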
  }

  def checkUserQuotaStatus(userIdentifier: UserIdentifier): CheckQuotaResponse = {
    val tenantStatus = tenantQuotaStatus.getOrDefault(userIdentifier.tenantId, QuotaStatus())
IMO it would be beneficial to support dynamically enabling and disabling quota checks for tenants, users, and clusters. This would provide greater flexibility in production environments.
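A sketch of what per-level switches could look like, with illustrative names only:

// Hypothetical per-level switches; dynamic config could re-read these flags
// on every scheduled check.
case class QuotaCheckConfig(
    clusterEnabled: Boolean = true,
    tenantEnabled: Boolean = true,
    userEnabled: Boolean = true)

case class QuotaStatus(exceed: Boolean = false, exceedReason: String = "")

def checkEnabledLevels(
    config: QuotaCheckConfig,
    clusterStatus: QuotaStatus,
    tenantStatus: QuotaStatus,
    userStatus: QuotaStatus): QuotaStatus =
  Seq(
    (config.userEnabled, userStatus),
    (config.tenantEnabled, tenantStatus),
    (config.clusterEnabled, clusterStatus))
    .collectFirst { case (enabled, status) if enabled && status.exceed => status }
    .getOrElse(QuotaStatus())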
    val quota = getQuota(userIdentifier)

  def checkApplicationQuotaStatus(applicationId: String): CheckQuotaResponse = {
    val status = appQuotaStatus.getOrDefault(applicationId, QuotaStatus())
    CheckQuotaResponse(!status.exceed, status.exceedReason)
Better to log the quota check result if the quota is exceeded.
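For instance, something along these lines (stand-in types; println stands in for the logWarning a Celeborn component would use via its Logging trait):

case class QuotaStatus(exceed: Boolean = false, exceedReason: String = "")
case class CheckQuotaResponse(isAvailable: Boolean, reason: String)

def checkApplicationQuotaStatus(
    applicationId: String,
    status: QuotaStatus): CheckQuotaResponse = {
  if (status.exceed) {
    // Surface failed checks in the master log for easier troubleshooting.
    println(s"Quota check failed for application $applicationId: ${status.exceedReason}")
  }
  CheckQuotaResponse(!status.exceed, status.exceedReason)
}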
  def checkUserQuotaStatus(userIdentifier: UserIdentifier): CheckQuotaResponse = {
    val tenantStatus = tenantQuotaStatus.getOrDefault(userIdentifier.tenantId, QuotaStatus())
    val userStatus = userQuotaStatus.getOrDefault(userIdentifier, QuotaStatus())
    if (userStatus.exceed) {
Better to log the quota check result here as well if the quota is exceeded.
      System.currentTimeMillis(),
      highWorkload,
      workerStatus,
      requestId)
    statusSystem.updateWorkerResourceConsumptions(
Can we also use this approach to update resource consumption when a worker registers with the master?
    userResourceConsumptionMap.put(userIdentifier, resourceConsumption)
    registerUserResourceConsumptionMetrics(userIdentifier)

    // Step 3: Expire user level exceeded app except already expired app
Can we separate this into checkUserResourceConsumption, checkTenantResourceConsumption, and checkClusterResourceConsumption? This would improve unit testability and allow for better future extensions.
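A sketch of the suggested split, with stand-in types and the bodies elided:

case class QuotaStatus(exceed: Boolean = false, exceedReason: String = "")

trait QuotaChecks {
  def checkUserResourceConsumption(): Seq[QuotaStatus]
  def checkTenantResourceConsumption(): Seq[QuotaStatus]
  def checkClusterResourceConsumption(): Seq[QuotaStatus]

  // The scheduled task would just compose the three level checks, and each
  // level can be unit-tested in isolation.
  def checkAll(): Seq[QuotaStatus] =
    checkUserResourceConsumption() ++
      checkTenantResourceConsumption() ++
      checkClusterResourceConsumption()
}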
What changes were proposed in this pull request?
2.1 When a tenant's resourceConsumption exceeds the tenant's quota, select the apps with larger consumption and mark them as interrupted.
2.2 When the cluster's resourceConsumption exceeds the cluster quota, select the apps with larger consumption and mark them as interrupted (a sketch of this selection follows below).
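One plausible reading of that selection, as a self-contained sketch (stand-in types; greedy largest-first is an illustration, not necessarily the PR's exact policy):

case class AppConsumption(appId: String, bytesWritten: Long)

// Pick the largest consumers until the remaining total fits under the quota.
def selectAppsToInterrupt(apps: Seq[AppConsumption], quotaBytes: Long): Seq[String] = {
  var total = apps.map(_.bytesWritten).sum
  val interrupted = Seq.newBuilder[String]
  for (app <- apps.sortBy(-_.bytesWritten) if total > quotaBytes) {
    interrupted += app.appId
    total -= app.bytesWritten
  }
  interrupted.result()
}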
Why are the changes needed?
The current storage quota logic can only limit new shuffles; it cannot limit writes from existing shuffles. In our production environment, there is a scenario where the cluster is small but a single shuffle of a user's app is large and occupies disk resources; we want to interrupt those shuffles.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UTs.