Skip to content

Commit 940609e

Browse files
committed
[CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle.
1 parent 7102174 commit 940609e

File tree

15 files changed

+962
-188
lines changed

15 files changed

+962
-188
lines changed

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

+61-3
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
665665
def estimatedPartitionSizeForEstimationUpdateInterval: Long =
666666
get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
667667
def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
668+
def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD)
669+
def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD)
668670
def clusterName: String = get(CLUSTER_NAME)
669671

670672
// //////////////////////////////////////////////////////
@@ -2928,6 +2930,26 @@ object CelebornConf extends Logging {
29282930
.timeConf(TimeUnit.MILLISECONDS)
29292931
.createWithDefaultString("30s")
29302932

2933+
val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
2934+
buildConf("celeborn.master.userResourceConsumption.user.threshold")
2935+
.categories("master")
2936+
.doc("When user resource consumption exceeds quota, Master will " +
2937+
"interrupt some apps until user resource consumption is less " +
2938+
"than this value. Default value is Long.MaxValue which means disable check.")
2939+
.version("0.6.0")
2940+
.bytesConf(ByteUnit.BYTE)
2941+
.createWithDefault(Long.MaxValue)
2942+
2943+
val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
2944+
buildConf("celeborn.master.userResourceConsumption.cluster.threshold")
2945+
.categories("master")
2946+
.doc("When cluster resource consumption exceeds quota, Master will " +
2947+
"interrupt some apps until cluster resource consumption is less " +
2948+
"than this value. Default value is Long.MaxValue which means disable check.")
2949+
.version("0.6.0")
2950+
.bytesConf(ByteUnit.BYTE)
2951+
.createWithDefault(Long.MaxValue)
2952+
29312953
val CLUSTER_NAME: ConfigEntry[String] =
29322954
buildConf("celeborn.cluster.name")
29332955
.categories("master", "worker")
@@ -5380,7 +5402,7 @@ object CelebornConf extends Logging {
53805402
.dynamic
53815403
.doc("Quota dynamic configuration for written disk bytes.")
53825404
.version("0.5.0")
5383-
.longConf
5405+
.bytesConf(ByteUnit.BYTE)
53845406
.createWithDefault(Long.MaxValue)
53855407

53865408
val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
@@ -5389,7 +5411,7 @@ object CelebornConf extends Logging {
53895411
.dynamic
53905412
.doc("Quota dynamic configuration for written disk file count.")
53915413
.version("0.5.0")
5392-
.longConf
5414+
.bytesConf(ByteUnit.BYTE)
53935415
.createWithDefault(Long.MaxValue)
53945416

53955417
val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
@@ -5398,7 +5420,7 @@ object CelebornConf extends Logging {
53985420
.dynamic
53995421
.doc("Quota dynamic configuration for written hdfs bytes.")
54005422
.version("0.5.0")
5401-
.longConf
5423+
.bytesConf(ByteUnit.BYTE)
54025424
.createWithDefault(Long.MaxValue)
54035425

54045426
val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
@@ -6014,4 +6036,40 @@ object CelebornConf extends Logging {
60146036
.doubleConf
60156037
.checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
60166038
.createWithDefault(1)
6039+
6040+
val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
6041+
buildConf("celeborn.quota.cluster.diskBytesWritten")
6042+
.categories("quota")
6043+
.dynamic
6044+
.doc("Quota dynamic configuration for cluster written disk bytes.")
6045+
.version("0.6.0")
6046+
.bytesConf(ByteUnit.BYTE)
6047+
.createWithDefault(Long.MaxValue)
6048+
6049+
val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
6050+
buildConf("celeborn.quota.cluster.diskFileCount")
6051+
.categories("quota")
6052+
.dynamic
6053+
.doc("Quota dynamic configuration for cluster written disk file count.")
6054+
.version("0.6.0")
6055+
.longConf
6056+
.createWithDefault(Long.MaxValue)
6057+
6058+
val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
6059+
buildConf("celeborn.quota.cluster.hdfsBytesWritten")
6060+
.categories("quota")
6061+
.dynamic
6062+
.doc("Quota dynamic configuration for cluster written hdfs bytes.")
6063+
.version("0.6.0")
6064+
.bytesConf(ByteUnit.BYTE)
6065+
.createWithDefault(Long.MaxValue)
6066+
6067+
val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
6068+
buildConf("celeborn.quota.cluster.hdfsFileCount")
6069+
.categories("quota")
6070+
.dynamic
6071+
.doc("Quota dynamic configuration for cluster written hdfs file count.")
6072+
.version("0.6.0")
6073+
.longConf
6074+
.createWithDefault(Long.MaxValue)
60176075
}

common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala

+21
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ case class ResourceConsumption(
3030
hdfsFileCount: Long,
3131
var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) {
3232

33+
def withSubResourceConsumptions(
34+
resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = {
35+
subResourceConsumptions = resourceConsumptions
36+
this
37+
}
38+
3339
def add(other: ResourceConsumption): ResourceConsumption = {
3440
ResourceConsumption(
3541
diskBytesWritten + other.diskBytesWritten,
@@ -38,6 +44,14 @@ case class ResourceConsumption(
3844
hdfsFileCount + other.hdfsFileCount)
3945
}
4046

47+
def subtract(other: ResourceConsumption): ResourceConsumption = {
48+
ResourceConsumption(
49+
diskBytesWritten - other.diskBytesWritten,
50+
diskFileCount - other.diskFileCount,
51+
hdfsBytesWritten - other.hdfsBytesWritten,
52+
hdfsFileCount - other.hdfsFileCount)
53+
}
54+
4155
def addSubResourceConsumptions(otherSubResourceConsumptions: Map[
4256
String,
4357
ResourceConsumption]): Map[String, ResourceConsumption] = {
@@ -77,4 +91,11 @@ case class ResourceConsumption(
7791
s" hdfsFileCount: $hdfsFileCount," +
7892
s" subResourceConsumptions: $subResourceConsumptionString)"
7993
}
94+
95+
def simpleString: String = {
96+
s"ResourceConsumption(diskBytesWritten: ${Utils.bytesToString(diskBytesWritten)}," +
97+
s" diskFileCount: $diskFileCount," +
98+
s" hdfsBytesWritten: ${Utils.bytesToString(hdfsBytesWritten)}," +
99+
s" hdfsFileCount: $hdfsFileCount)"
100+
}
80101
}

common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota
2020
import org.apache.celeborn.common.internal.Logging
2121
import org.apache.celeborn.common.util.Utils
2222

23-
case class Quota(
23+
case class StorageQuota(
2424
diskBytesWritten: Long,
2525
diskFileCount: Long,
2626
hdfsBytesWritten: Long,
@@ -34,3 +34,7 @@ case class Quota(
3434
s"]"
3535
}
3636
}
37+
38+
object StorageQuota {
39+
val DEFAULT_QUOTA = StorageQuota(Long.MaxValue, Long.MaxValue, Long.MaxValue, Long.MaxValue)
40+
}

docs/configuration/master.md

+2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ license: |
7979
| celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups |
8080
| celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | |
8181
| celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy |
82+
| celeborn.master.userResourceConsumption.cluster.threshold | 9223372036854775807b | false | When cluster resource consumption exceeds quota, Master will interrupt some apps until cluster resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
8283
| celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 | |
84+
| celeborn.master.userResourceConsumption.user.threshold | 9223372036854775807b | false | When user resource consumption exceeds quota, Master will interrupt some apps until user resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
8385
| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
8486
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
8587
| celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | |

docs/configuration/quota.md

+7-3
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ license: |
1919
<!--begin-include-->
2020
| Key | Default | isDynamic | Description | Since | Deprecated |
2121
| --- | ------- | --------- | ----------- | ----- | ---------- |
22+
| celeborn.quota.cluster.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for cluster written disk bytes. | 0.6.0 | |
23+
| celeborn.quota.cluster.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for cluster written disk file count. | 0.6.0 | |
24+
| celeborn.quota.cluster.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for cluster written hdfs bytes. | 0.6.0 | |
25+
| celeborn.quota.cluster.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for cluster written hdfs file count. | 0.6.0 | |
2226
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
2327
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | |
2428
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
2529
| celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
2630
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
27-
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
28-
| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
29-
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
31+
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
32+
| celeborn.quota.tenant.diskFileCount | 9223372036854775807b | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
33+
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
3034
| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
3135
<!--end-include-->

master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala

+8-70
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,12 @@ private[celeborn] class Master(
186186
private val hasHDFSStorage = conf.hasHDFSStorage
187187
private val hasS3Storage = conf.hasS3Storage
188188

189-
private val quotaManager = new QuotaManager(conf, configService)
189+
private val quotaManager = new QuotaManager(
190+
statusSystem,
191+
masterSource,
192+
resourceConsumptionSource,
193+
conf,
194+
configService)
190195
private val tagsManager = new TagsManager(Option(configService))
191196
private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
192197
private val userResourceConsumptions =
@@ -1135,7 +1140,7 @@ private[celeborn] class Master(
11351140
new util.ArrayList[WorkerInfo](
11361141
(statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava),
11371142
new util.ArrayList(appRelatedShuffles),
1138-
CheckQuotaResponse(isAvailable = true, "")))
1143+
quotaManager.checkApplicationQuotaStatus(appId)))
11391144
} else {
11401145
context.reply(OneWayMessageResponse)
11411146
}
@@ -1151,78 +1156,11 @@ private[celeborn] class Master(
11511156
}
11521157
}
11531158

1154-
private def handleResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = {
1155-
val userResourceConsumption = computeUserResourceConsumption(userIdentifier)
1156-
gaugeResourceConsumption(userIdentifier)
1157-
userResourceConsumption
1158-
}
1159-
1160-
private def gaugeResourceConsumption(
1161-
userIdentifier: UserIdentifier,
1162-
applicationId: String = null): Unit = {
1163-
val resourceConsumptionLabel =
1164-
if (applicationId == null) userIdentifier.toMap
1165-
else userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> applicationId)
1166-
resourceConsumptionSource.addGauge(
1167-
ResourceConsumptionSource.DISK_FILE_COUNT,
1168-
resourceConsumptionLabel) { () =>
1169-
computeResourceConsumption(userIdentifier, applicationId).diskFileCount
1170-
}
1171-
resourceConsumptionSource.addGauge(
1172-
ResourceConsumptionSource.DISK_BYTES_WRITTEN,
1173-
resourceConsumptionLabel) { () =>
1174-
computeResourceConsumption(userIdentifier, applicationId).diskBytesWritten
1175-
}
1176-
if (hasHDFSStorage) {
1177-
resourceConsumptionSource.addGauge(
1178-
ResourceConsumptionSource.HDFS_FILE_COUNT,
1179-
resourceConsumptionLabel) { () =>
1180-
computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
1181-
}
1182-
resourceConsumptionSource.addGauge(
1183-
ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
1184-
resourceConsumptionLabel) { () =>
1185-
computeResourceConsumption(userIdentifier, applicationId).hdfsBytesWritten
1186-
}
1187-
}
1188-
}
1189-
1190-
private def computeResourceConsumption(
1191-
userIdentifier: UserIdentifier,
1192-
applicationId: String = null): ResourceConsumption = {
1193-
val newResourceConsumption = computeUserResourceConsumption(userIdentifier)
1194-
if (applicationId == null) {
1195-
val current = System.currentTimeMillis()
1196-
if (userResourceConsumptions.containsKey(userIdentifier)) {
1197-
val resourceConsumptionAndUpdateTime = userResourceConsumptions.get(userIdentifier)
1198-
if (current - resourceConsumptionAndUpdateTime._2 <= masterResourceConsumptionInterval) {
1199-
return resourceConsumptionAndUpdateTime._1
1200-
}
1201-
}
1202-
userResourceConsumptions.put(userIdentifier, (newResourceConsumption, current))
1203-
newResourceConsumption
1204-
} else {
1205-
newResourceConsumption.subResourceConsumptions.get(applicationId)
1206-
}
1207-
}
1208-
1209-
// TODO: Support calculate topN app resource consumption.
1210-
private def computeUserResourceConsumption(
1211-
userIdentifier: UserIdentifier): ResourceConsumption = {
1212-
val resourceConsumption = statusSystem.workersMap.values().asScala.flatMap {
1213-
workerInfo => workerInfo.userResourceConsumption.asScala.get(userIdentifier)
1214-
}.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
1215-
resourceConsumption
1216-
}
1217-
12181159
private[master] def handleCheckQuota(
12191160
userIdentifier: UserIdentifier,
12201161
context: RpcCallContext): Unit = {
1221-
val userResourceConsumption = handleResourceConsumption(userIdentifier)
12221162
if (conf.quotaEnabled) {
1223-
val (isAvailable, reason) =
1224-
quotaManager.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption)
1225-
context.reply(CheckQuotaResponse(isAvailable, reason))
1163+
context.reply(quotaManager.checkUserQuotaStatus(userIdentifier))
12261164
} else {
12271165
context.reply(CheckQuotaResponse(true, ""))
12281166
}

master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala

+2
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,6 @@ object MasterSource {
6363
// Capacity
6464
val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
6565
val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"
66+
67+
val UPDATE_RESOURCE_CONSUMPTION_TIME = "UpdateResourceConsumptionTime"
6668
}

0 commit comments

Comments
 (0)