From f1328d7c7dcf9add9b08ea818b82fa10e99e38d9 Mon Sep 17 00:00:00 2001 From: "mousumi.kundu" Date: Mon, 10 Aug 2020 16:32:31 +0530 Subject: [PATCH 1/2] YARN-6492 --- .../scheduler/QueueMetrics.java | 500 ++++++++--- .../TestSchedulerApplicationAttempt.java | 48 +- .../TestNodeLabelContainerAllocation.java | 781 +++++++++++++++++- 3 files changed, 1193 insertions(+), 136 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index eafe8edfcc945..550607974ba66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -49,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; @InterfaceAudience.Private @@ -68,10 +69,18 @@ public class QueueMetrics implements MetricsSource { MutableCounterLong aggregateOffSwitchContainersAllocated; @Metric("Aggregate # of preempted containers") MutableCounterLong aggregateContainersPreempted; + @Metric("Aggregate # of preempted memory seconds") MutableCounterLong + aggregateMemoryMBSecondsPreempted; + @Metric("Aggregate # of preempted vcore seconds") MutableCounterLong + aggregateVcoreSecondsPreempted; @Metric("# of active users") MutableGaugeInt activeUsers; @Metric("# of active applications") MutableGaugeInt activeApplications; @Metric("App Attempt First Container Allocation Delay") MutableRate appAttemptFirstContainerAllocationDelay; + @Metric("Aggregate total of preempted memory MB") + MutableCounterLong aggregateMemoryMBPreempted; + @Metric("Aggregate total of preempted vcores") + MutableCounterLong aggregateVcoresPreempted; //Metrics updated only for "default" partition @Metric("Allocated memory in MB") MutableGaugeLong allocatedMB; @@ -101,23 +110,43 @@ public class QueueMetrics implements MetricsSource { info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + protected static final MetricsInfo PARTITION_INFO = + info("Partition", "Metrics by partition"); static final Splitter Q_SPLITTER = Splitter.on('.').omitEmptyStrings().trimResults(); protected final MetricsRegistry registry; protected final String queueName; - protected final QueueMetrics parent; + private QueueMetrics parent; + private final Queue parentQueue; protected final MetricsSystem metricsSystem; protected final Map users; protected final Configuration conf; + private final boolean enableUserMetrics; + + protected static final MetricsInfo P_RECORD_INFO = + info("PartitionQueueMetrics", "Metrics for the resource scheduler"); + + // Use "default" to operate NO_LABEL (default) partition internally + public static final String DEFAULT_PARTITION = "default"; + + // Use "" to register NO_LABEL (default) partition into metrics system + public static final String DEFAULT_PARTITION_JMX_STR = ""; + + // Metric Name Delimiter + public static final String METRIC_NAME_DELIMITER = "."; + + public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf) { - protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, - boolean enableUserMetrics, Configuration conf) { registry = new MetricsRegistry(RECORD_INFO); this.queueName = queueName; + this.parent = parent != null ? parent.getMetrics() : null; - this.users = enableUserMetrics ? new HashMap() - : null; + this.parentQueue = parent; + this.users = enableUserMetrics ? new HashMap() : null; + this.enableUserMetrics = enableUserMetrics; + metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); @@ -137,12 +166,25 @@ protected static StringBuilder sourceName(String queueName) { return sb; } - public synchronized - static QueueMetrics forQueue(String queueName, Queue parent, - boolean enableUserMetrics, - Configuration conf) { + static StringBuilder pSourceName(String partition) { + StringBuilder sb = new StringBuilder(P_RECORD_INFO.name()); + sb.append(",partition").append('=').append(partition); + return sb; + } + + static StringBuilder qSourceName(String queueName) { + StringBuilder sb = new StringBuilder(); + int i = 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized static QueueMetrics forQueue(String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf) { return forQueue(DefaultMetricsSystem.instance(), queueName, parent, - enableUserMetrics, conf); + enableUserMetrics, conf); } /** @@ -168,24 +210,20 @@ protected static Map getQueueMetrics() { return QUEUE_METRICS; } - public synchronized - static QueueMetrics forQueue(MetricsSystem ms, String queueName, - Queue parent, boolean enableUserMetrics, - Configuration conf) { - QueueMetrics metrics = QUEUE_METRICS.get(queueName); + public synchronized static QueueMetrics forQueue(MetricsSystem ms, + String queueName, Queue parent, boolean enableUserMetrics, + Configuration conf) { + QueueMetrics metrics = getQueueMetrics().get(queueName); if (metrics == null) { - metrics = - new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf). - tag(QUEUE_INFO, queueName); - + metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf) + .tag(QUEUE_INFO, queueName); + // Register with the MetricsSystems if (ms != null) { - metrics = - ms.register( - sourceName(queueName).toString(), - "Metrics for queue: " + queueName, metrics); + metrics = ms.register(sourceName(queueName).toString(), + "Metrics for queue: " + queueName, metrics); } - QUEUE_METRICS.put(queueName, metrics); + getQueueMetrics().put(queueName, metrics); } return metrics; @@ -197,7 +235,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } QueueMetrics metrics = users.get(userName); if (metrics == null) { - metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new QueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), @@ -207,6 +246,96 @@ public synchronized QueueMetrics getUserMetrics(String userName) { return metrics; } + /** + * Partition * Queue Metrics + * + * Computes Metrics at Partition (Node Label) * Queue Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * QueueMetrics (A) + * metrics + * QueueMetrics (A1) + * metrics + * QueueMetrics (A2) + * metrics + * QueueMetrics (B) + * metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + + String partitionJMXStr = partition; + + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + partitionJMXStr = DEFAULT_PARTITION_JMX_STR; + } + + String metricName = partition + METRIC_NAME_DELIMITER + this.queueName; + QueueMetrics metrics = getQueueMetrics().get(metricName); + + if (metrics == null) { + QueueMetrics queueMetrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue, + this.enableUserMetrics, this.conf, partition); + metricsSystem.register( + pSourceName(partitionJMXStr).append(qSourceName(this.queueName)) + .toString(), + "Metrics for queue: " + this.queueName, + queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO, + this.queueName)); + getQueueMetrics().put(metricName, queueMetrics); + return queueMetrics; + } else { + return metrics; + } + } + + /** + * Partition Metrics + * + * Computes Metrics at Partition (Node Label) Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * metrics + * + * @param partition + * @return QueueMetrics + */ + private QueueMetrics getPartitionMetrics(String partition) { + + String partitionJMXStr = partition; + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + partitionJMXStr = DEFAULT_PARTITION_JMX_STR; + } + + String metricName = partition + METRIC_NAME_DELIMITER; + QueueMetrics metrics = getQueueMetrics().get(metricName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null, + false, this.conf, partition); + + // Register with the MetricsSystems + if (metricsSystem != null) { + metricsSystem.register(pSourceName(partitionJMXStr).toString(), + "Metrics for partition: " + partitionJMXStr, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, + partitionJMXStr)); + } + getQueueMetrics().put(metricName, metrics); + } + return metrics; + } + private ArrayList parseInts(String value) { ArrayList result = new ArrayList(); for(String s: value.split(",")) { @@ -346,12 +475,31 @@ public void moveAppTo(AppSchedulingInfo app) { * @param limit resource limit */ public void setAvailableResourcesToQueue(String partition, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + setAvailableResources(limit); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResources(limit); + if (this.queueName.equals("root")) { + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.setAvailableResources(limit); + } + } } } - + + /** + * Set Available resources with support for resource vectors. + * + * @param limit + */ + public void setAvailableResources(Resource limit) { + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + } + /** * Set available resources. To be called by scheduler periodically as * resources become available. @@ -373,7 +521,15 @@ public void setAvailableResourcesToUser(String partition, if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(partition, limit); + userMetrics.setAvailableResources(limit); + } + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + QueueMetrics partitionUserMetrics = + partitionQueueMetrics.getUserMetrics(user); + if (partitionUserMetrics != null) { + partitionUserMetrics.setAvailableResources(limit); } } } @@ -388,19 +544,34 @@ public void setAvailableResourcesToUser(String partition, */ public void incrPendingResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(partition, user, containers, res); + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalIncrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalIncrPendingResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.incrementPendingResources(containers, res); } } } - private void _incrPendingResources(int containers, Resource res) { + public void internalIncrPendingResources(String partition, String user, + int containers, Resource res) { + incrementPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalIncrPendingResources(partition, user, containers, + res); + } + if (parent != null) { + parent.internalIncrPendingResources(partition, user, containers, res); + } + } + + private void incrementPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); @@ -409,19 +580,34 @@ private void _incrPendingResources(int containers, Resource res) { public void decrPendingResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(partition, user, containers, res); + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalDecrPendingResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalDecrPendingResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.decrementPendingResources(containers, res); } } } + + public void internalDecrPendingResources(String partition, String user, + int containers, Resource res) { + decrementPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalDecrPendingResources(partition, user, containers, + res); + } + if (parent != null) { + parent.internalDecrPendingResources(partition, user, containers, res); + } + } - private void _decrPendingResources(int containers, Resource res) { + private void decrementPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); pendingVCores.decr(res.getVirtualCores() * containers); @@ -446,28 +632,45 @@ public void incrNodeTypeAggregations(String user, NodeType type) { } } - public void allocateResources(String partition, String user, - int containers, Resource res, boolean decrPending) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); - - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, - containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(partition, user, containers, res, decrPending); + public void allocateResources(String partition, String user, int containers, + Resource res, boolean decrPending) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalAllocateResources(partition, user, containers, res, decrPending); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalAllocateResources(partition, user, + containers, res, decrPending); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.computeAllocateResources(containers, res, decrPending); } } } + public void internalAllocateResources(String partition, String user, + int containers, Resource res, boolean decrPending) { + computeAllocateResources(containers, res, decrPending); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalAllocateResources(partition, user, containers, res, + decrPending); + } + if (parent != null) { + parent.internalAllocateResources(partition, user, containers, res, decrPending); + } + } + + private void computeAllocateResources(int containers, Resource res, + boolean decrPending) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (decrPending) { + decrementPendingResources(containers, res); + } + } /** * Allocate Resource for container size change. * @param partition Node Partition @@ -475,40 +678,57 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); + allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, res); - } - if (parent != null) { - parent.allocateResources(partition, user, res); - } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, res); + } + if (parent != null) { + parent.allocateResources(partition, user, res); } } - public void releaseResources(String partition, - String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(partition, user, containers, res); - } - if (parent != null) { - parent.releaseResources(partition, user, containers, res); + public void releaseResources(String partition, String user, int containers, + Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalReleaseResources(partition, user, containers, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalReleaseResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.computeReleaseResources(containers, res); } } } + public void internalReleaseResources(String partition, String user, + int containers, Resource res) { + computeReleaseResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalReleaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.internalReleaseResources(partition, user, containers, res); + } + } + + private void computeReleaseResources(int containers, Resource res) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); + } + + /** * Release Resource for container size change. * @@ -534,43 +754,91 @@ public void preemptContainer() { } } + public void updatePreemptedMemoryMBSeconds(long mbSeconds) { + aggregateMemoryMBSecondsPreempted.incr(mbSeconds); + if (parent != null) { + parent.updatePreemptedMemoryMBSeconds(mbSeconds); + } + } + + public void updatePreemptedVcoreSeconds(long vcoreSeconds) { + aggregateVcoreSecondsPreempted.incr(vcoreSeconds); + if (parent != null) { + parent.updatePreemptedVcoreSeconds(vcoreSeconds); + } + } + + public void updatePreemptedResources(Resource res) { + aggregateMemoryMBPreempted.incr(res.getMemorySize()); + aggregateVcoresPreempted.incr(res.getVirtualCores()); + if (parent != null) { + parent.updatePreemptedResources(res); + } + } + public void reserveResource(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - reserveResource(user, res); + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalReserveResources(partition, user, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalReserveResources(partition, user, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.incrReserveResources(res); + } } } - public void reserveResource(String user, Resource res) { - reservedContainers.incr(); - reservedMB.incr(res.getMemorySize()); - reservedVCores.incr(res.getVirtualCores()); + public void internalReserveResources(String partition, String user, + Resource res) { + incrReserveResources(res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.reserveResource(user, res); + userMetrics.internalReserveResources(partition, user, res); } if (parent != null) { - parent.reserveResource(user, res); + parent.internalReserveResources(partition, user, res); } } - public void unreserveResource(String user, Resource res) { - reservedContainers.decr(); - reservedMB.decr(res.getMemorySize()); - reservedVCores.decr(res.getVirtualCores()); + public void incrReserveResources(Resource res) { + reservedContainers.incr(); + reservedMB.incr(res.getMemorySize()); + reservedVCores.incr(res.getVirtualCores()); + } + + public void internalUnReserveResources(String partition, String user, + Resource res) { + decrReserveResource(user, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.unreserveResource(user, res); + userMetrics.internalUnReserveResources(partition, user, res); } if (parent != null) { - parent.unreserveResource(user, res); + parent.internalUnReserveResources(partition, user, res); } } - + public void unreserveResource(String partition, String user, Resource res) { if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - unreserveResource(user, res); + internalUnReserveResources(partition, user, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalUnReserveResources(partition, user, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.decrReserveResource(user, res); + } } } + + public void decrReserveResource(String user, Resource res) { + reservedContainers.decr(); + reservedMB.decr(res.getMemorySize()); + reservedVCores.decr(res.getVirtualCores()); + } public void incrActiveUsers() { activeUsers.incr(); @@ -714,4 +982,22 @@ public long getAggegatedReleasedContainers() { public long getAggregatePreemptedContainers() { return aggregateContainersPreempted.value(); } + + @VisibleForTesting + public long getAggregateMemoryMBPreempted() { + return aggregateMemoryMBPreempted.value(); + } + + @VisibleForTesting + public long getAggregateVcoresPreempted() { + return aggregateVcoresPreempted.value(); + } + + public void setParent(QueueMetrics parent) { + this.parent = parent; + } + + public Queue getParentQueue() { + return parentQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index fa16effd25f4d..25062b9fc05fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -58,6 +58,50 @@ public void tearDown() { QueueMetrics.clearQueueMetrics(); DefaultMetricsSystem.shutdown(); } + + @Test + public void testActiveUsersWhenMove() { + final String user = "user1"; + Queue parentQueue = createQueue("parent", null); + Queue queue1 = createQueue("queue1", parentQueue); + Queue queue2 = createQueue("queue2", parentQueue); + Queue queue3 = createQueue("queue3", parentQueue); + + ApplicationAttemptId appAttId = createAppAttemptId(0, 0); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(3L); + SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, + user, queue1, queue1.getAbstractUsersManager(), rmContext); + + // Resource request + Resource requestedResource = Resource.newInstance(1536, 2); + Priority requestedPriority = Priority.newInstance(2); + ResourceRequest request = ResourceRequest.newInstance(requestedPriority, + ResourceRequest.ANY, requestedResource, 1); + app.updateResourceRequests(Arrays.asList(request)); + + assertEquals(1, queue1.getAbstractUsersManager().getNumActiveUsers()); + // move app from queue1 to queue2 + app.move(queue2); + // Active user count has to decrease from queue1 + assertEquals(0, queue1.getAbstractUsersManager().getNumActiveUsers()); + // Increase the active user count in queue2 if the moved app has pending requests + assertEquals(1, queue2.getAbstractUsersManager().getNumActiveUsers()); + + // Allocated container + RMContainer container1 = createRMContainer(appAttId, 1, requestedResource); + app.liveContainers.put(container1.getContainerId(), container1); + SchedulerNode node = createNode(); + app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, + toSchedulerKey(requestedPriority), container1); + + // Active user count has to decrease from queue2 due to app has NO pending requests + assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers()); + // move app from queue2 to queue3 + app.move(queue3); + // Active user count in queue3 stays same if the moved app has NO pending requests + assertEquals(0, queue3.getAbstractUsersManager().getNumActiveUsers()); + } @Test public void testMove() { @@ -91,7 +135,7 @@ public void testMove() { app.liveContainers.put(container1.getContainerId(), container1); SchedulerNode node = createNode(); app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, - toSchedulerKey(requestedPriority), container1.getContainer()); + toSchedulerKey(requestedPriority), container1); // Reserved container Priority prio1 = Priority.newInstance(1); @@ -165,7 +209,7 @@ private Queue createQueue(String name, Queue parent) { private Queue createQueue(String name, Queue parent, float capacity) { QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf); QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null, - null, QueueState.RUNNING, null, "", null, false); + null, QueueState.RUNNING, null, "", null, false, null, false); ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); Queue queue = mock(Queue.class); when(queue.getMetrics()).thenReturn(metrics); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef33662972..636cb99da0c0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -25,9 +25,11 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -47,11 +49,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -547,6 +552,68 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } + @Test (timeout = 120000) + public void testRMContainerLeakInLeafQueue() throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = + new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { + @Override public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x + rm1.registerNode("h2:1234", 8 * GB); // label = x + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1"); + MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // request a container. + am1.allocate("*", 7 * GB, 2, new ArrayList()); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + + // Do node heartbeats 1 time + // scheduler will reserve a container for app1 + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Check if a 4G container allocated for app1, and 4G is reserved + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + + // kill app2 then do node heartbeat 1 time + // scheduler will allocate a container from the reserved container on nm1 + rm1.killApp(app2.getApplicationId()); + rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(0, schedulerApp1.getReservedContainers().size()); + + // After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers should + // be clean, otherwise resource leak happened + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertEquals(0, leafQueue.getIgnoreExclusivityRMContainers().size()); + + rm1.close(); + } + private void checkPendingResource(MockRM rm, int priority, ApplicationAttemptId attemptId, int memory) { CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); @@ -1926,6 +1993,15 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); + assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b"); + assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); + // app1 -> a RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -1933,7 +2009,6 @@ public RMNodeLabelsManager createNodeLabelManager() { // app1 asks for 5 partition=x containers am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); @@ -1957,17 +2032,22 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(10 * GB, reportNm2.getAvailableResource().getMemorySize()); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB()); - assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); + + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); - assertEquals(0 * GB, leafQueue.getMetrics().getUsedAMResourceMB()); - assertEquals(0, leafQueue.getMetrics().getUsedAMResourceVCores()); + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); rm1.close(); } @@ -2008,6 +2088,8 @@ public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception { csConf.setCapacityByLabel(queueB, "x", 50); csConf.setMaximumCapacityByLabel(queueB, "x", 50); + csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + // set node -> label mgr.addToCluserNodeLabels( ImmutableSet.of(NodeLabel.newInstance("x", false))); @@ -2026,6 +2108,54 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + double delta = 0.0001; + CSQueue leafQueue = cs.getQueue("a"); + CSQueue leafQueueB = cs.getQueue("b"); + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + + MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem(); + QueueMetrics partXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x"); + QueueMetrics partDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, ""); + QueueMetrics queueAMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a"); + QueueMetrics queueBMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b"); + QueueMetrics queueAPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a"); + QueueMetrics queueAPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a"); + QueueMetrics queueBPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b"); + QueueMetrics queueBPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b"); + QueueMetrics rootMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root"); + assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta); + // app1 -> a RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); @@ -2033,54 +2163,651 @@ public RMNodeLabelsManager createNodeLabelManager() { // app1 asks for 3 partition= containers am1.allocate("*", 1 * GB, 3, new ArrayList()); - // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } // app1 gets all resource in partition=x (non-exclusive) Assert.assertEquals(3, schedulerNode1.getNumContainers()); - SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize()); Assert.assertEquals(7 * GB, reportNm1.getAvailableResource().getMemorySize()); - SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() .getNodeReport(nm2.getNodeId()); Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize()); Assert.assertEquals(9 * GB, reportNm2.getAvailableResource().getMemorySize()); - - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - double delta = 0.0001; - // 3GB is used from label x quota. 1.5 GB is remaining from default label. - // 2GB is remaining from label x. - assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta); + assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta); + assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + QueueMetrics partDefaultQueueAUserMetrics = + (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user", + "root.a"); + QueueMetrics partXQueueAUserMetrics = + (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user", + "root.a"); + QueueMetrics queueAUserMetrics = + (QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user"); + + assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); - // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); - // NM2 do couple of heartbeats - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - // app1 gets all resource in default partition Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(3, schedulerNode1.getNumContainers()); // 3GB is used from label x quota. 2GB used from default label. - // So total 2.5 GB is remaining. - assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + // So 0.5 GB is remaining from default label. + assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB()); assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); + // The total memory tracked by QueueMetrics is 10GB + // for the default partition + assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), + delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + // Pending Resources when containers are waiting on "default" partition + assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(), + delta); + assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta); + assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXMetrics.getPendingMB(), delta); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), + delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + // Pending Resources after containers has been assigned on "x" partition + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(), + delta); + assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXMetrics.getPendingMB(), delta); + + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + assertEquals(2, queueAMetrics.getAggregateAllocatedContainers()); + assertEquals(2, queueAMetrics.getAggegatedReleasedContainers()); + assertEquals(2, queueAPartDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers()); + assertEquals(7, partXMetrics.getAggregateAllocatedContainers()); + assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers()); + assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers()); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + rm1.close(); + } + + @Test + public void testQueueMetricsWithMixedLabels() throws Exception { + // There is only one queue which can access both default label and label x. + // There are two nodes of 10GB label x and 12GB no label. + // The test is to make sure that the queue metrics is only tracking the + // allocations and availability from default partition + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 100); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + + // set node -> label + // label x exclusivity is set to true + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", true))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 5 partition=x containers + am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); + // NM1 do 50 heartbeats + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(12 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // app2 -> a + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", ""); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // app2 asks for 5 partition= containers + am2.allocate("*", 1 * GB, 5, new ArrayList(), ""); + // NM2 do 50 heartbeats + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode2.getNumContainers()); + + reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(6 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // The total memory tracked by QueueMetrics is 12GB + // for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + // Kill all apps in queue a + cs.killAllAppsInQueue("a"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); + rm1.close(); + } + + @Test + public void testTwoLevelQueueMetricsWithLabels() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 100); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + + csConf.setQueues(queueA, new String[] {"a1"}); + final String queueA1 = queueA + ".a1"; + csConf.setCapacity(queueA1, 100); + + csConf.setAccessibleNodeLabels(queueA1, toSet("x")); + csConf.setCapacityByLabel(queueA1, "x", 100); + csConf.setMaximumCapacityByLabel(queueA1, "x", 100); + + // set node -> label + // label x exclusivity is set to true + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", true))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a"); + LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1"); + assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB()); + MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem(); + QueueMetrics partXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x"); + QueueMetrics partDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, ""); + QueueMetrics queueAPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a"); + QueueMetrics queueAPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a"); + QueueMetrics queueA1PartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1"); + QueueMetrics queueA1PartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1"); + QueueMetrics queueRootPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root"); + QueueMetrics queueRootPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root"); + QueueMetrics queueAMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a"); + QueueMetrics queueA1Metrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1"); + QueueMetrics queueRootMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root"); + assertEquals(12 * GB, queueAMetrics.getAvailableMB()); + assertEquals(12 * GB, queueA1Metrics.getAvailableMB()); + assertEquals(12 * GB, queueRootMetrics.getAvailableMB()); + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB()); + assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB()); + assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB()); + assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB()); + assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB()); + assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB()); + assertEquals(10 * GB, partXMetrics.getAvailableMB()); + assertEquals(12 * GB, partDefaultMetrics.getAvailableMB()); + + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 5 partition=x containers + am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); + // NM1 do 50 heartbeats + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(12 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(0 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueA1Metrics.getAllocatedMB()); + assertEquals(0 * GB, queueRootMetrics.getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, partXMetrics.getAllocatedMB()); + assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB()); + + // app2 -> a + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a1", ""); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // app2 asks for 5 partition= containers + am2.allocate("*", 1 * GB, 5, new ArrayList(), ""); + // NM2 do 50 heartbeats + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode2.getNumContainers()); + + reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(6 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // The total memory tracked by QueueMetrics is 12GB + // for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + assertEquals(6 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1Metrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, partXMetrics.getAllocatedMB()); + assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB()); + + // Kill all apps in queue a + cs.killAllAppsInQueue("a1"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); + rm1.close(); + } + + @Test + public void testQueueMetricsWithLabelsDisableElasticity() throws Exception { + /** + * Test case: have a following queue structure: + * + *
+     *
+     *          root
+     *        /      \
+     *       a        b
+     *      (x)      (x)
+     *      / \
+     *     a1 a2
+     *    (x) (x)
+     * 
+ * + * a/b can access x, both of them has max-capacity-on-x = 50 + * + * When doing non-exclusive allocation, app in a (or b) can use 100% of x + * resource. + */ + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 50); + csConf.setMaximumCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 50); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(queueB, 50); + csConf.setMaximumCapacity(queueB, 100); + csConf.setAccessibleNodeLabels(queueB, toSet("x")); + csConf.setCapacityByLabel(queueB, "x", 50); + csConf.setMaximumCapacityByLabel(queueB, "x", 100); + + // Define 2nd-level queues + csConf.setQueues(queueA, new String[] { "a1", + "a2"}); + + final String A1 = queueA + ".a1"; + csConf.setCapacity(A1, 20); + csConf.setMaximumCapacity(A1, 60); + csConf.setAccessibleNodeLabels(A1, toSet("x")); + csConf.setCapacityByLabel(A1, "x", 60); + csConf.setMaximumCapacityByLabel(A1, "x", 30); + + final String A2 = queueA + ".a2"; + csConf.setCapacity(A2, 80); + csConf.setMaximumCapacity(A2, 40); + csConf.setAccessibleNodeLabels(A2, toSet("x")); + csConf.setCapacityByLabel(A2, "x", 40); + csConf.setMaximumCapacityByLabel(A2, "x", 20); + + // set node -> label + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x + // app1 -> a1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 6 partition=x containers + am1.allocate("*", 1 * GB, 6, new ArrayList(), "x"); + + // NM1 do 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(14 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Try to launch app2 in a2, asked 2GB, should success + // app2 -> a2 + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // app2 asks for 4 partition=x containers + am2.allocate("*", 1 * GB, 4, new ArrayList(), "x"); + // NM1 do 50 heartbeats + + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + + reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Kill all apps in queue a2 + cs.killAllAppsInQueue("a2"); + rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app2.getApplicationId()); + + // Try to launch app3 in a2, asked 6GB, should fail + // app3 -> a2 + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + am3.allocate("*", 1 * GB, 6, new ArrayList(), "x"); + // NM1 do 50 heartbeats + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + // app3 cannot preempt more resources restricted by disable elasticity + checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(), + cs.getApplicationAttempt(am3.getApplicationAttemptId())); + + Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Kill all apps in queue a1 + cs.killAllAppsInQueue("a1"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + + // app4 -> a1, try to allocate more than 6GB resource, should fail + RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm1); + + // app3 asks for 7 partition=x containers + am4.allocate("*", 1 * GB, 7, new ArrayList(), "x"); + // NM1 do 50 heartbeats + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + + // app4 should only gets 6GB resource in partition=x + // since elasticity is disabled + checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(), + cs.getApplicationAttempt(am4.getApplicationAttemptId())); + + Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm1.getAvailableResource().getMemorySize()); + rm1.close(); } } From d1f68ca2d4ce8a6746860f7704fe17cee169ea04 Mon Sep 17 00:00:00 2001 From: "mousumi.kundu" Date: Mon, 10 Aug 2020 18:38:22 +0530 Subject: [PATCH 2/2] YARN-6492-branch-2.9.015 --- .../hadoop/yarn/util/resource/Resources.java | 13 + .../scheduler/AppSchedulingInfo.java | 28 +- .../scheduler/ContainerUpdateContext.java | 12 +- .../scheduler/PartitionQueueMetrics.java | 89 +++ .../SchedulerApplicationAttempt.java | 6 +- .../scheduler/capacity/CSQueueMetrics.java | 7 +- .../scheduler/capacity/LeafQueue.java | 34 +- .../common/fica/FiCaSchedulerApp.java | 3 +- .../scheduler/fair/FSAppAttempt.java | 2 +- .../scheduler/fifo/FifoAppAttempt.java | 2 +- .../scheduler/TestAppSchedulingInfo.java | 7 +- .../scheduler/TestPartitionQueueMetrics.java | 752 ++++++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 4 +- 13 files changed, 923 insertions(+), 36 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 932fb821f4b75..402645f7beb73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -192,6 +192,19 @@ public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) { } return lhs; } + + /** + * Subtract {@code rhs} from {@code lhs} and reset any negative values to + * zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs} + * unmodified. + * + * @param lhs {@link Resource} to subtract from + * @param rhs {@link Resource} to subtract + * @return the value of lhs after subtraction + */ + public static Resource subtractNonNegative(Resource lhs, Resource rhs) { + return subtractFromNonNegative(clone(lhs), rhs); + } public static Resource negate(Resource resource) { return subtract(NONE, resource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 5034b713107aa..14556381b7b50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -89,10 +90,12 @@ public class AppSchedulingInfo { private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; + + private final RMContext rmContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, - long epoch, ResourceUsage appResourceUsage) { + long epoch, ResourceUsage appResourceUsage, RMContext rmContext) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -106,6 +109,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, updateContext = new ContainerUpdateContext(this); readLock = lock.readLock(); writeLock = lock.writeLock(); + this.rmContext = rmContext; } public ApplicationId getApplicationId() { @@ -437,7 +441,7 @@ public boolean isPlaceBlacklisted(String resourceName, public List allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, - Container containerAllocated) { + RMContainer containerAllocated) { try { writeLock.lock(); @@ -591,7 +595,7 @@ public boolean checkAllocation(NodeType type, SchedulerNode node, } private void updateMetricsForAllocatedContainer(NodeType type, - SchedulerNode node, Container containerAllocated) { + SchedulerNode node, RMContainer containerAllocated) { QueueMetrics metrics = queue.getMetrics(); if (pending) { // once an allocation is done we assume the application is @@ -602,14 +606,16 @@ private void updateMetricsForAllocatedContainer(NodeType type, if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationId=" + applicationId + " container=" - + containerAllocated.getId() + " host=" + containerAllocated - .getNodeId().toString() + " user=" + user + " resource=" - + containerAllocated.getResource() + " type=" - + type); + + containerAllocated.getContainer().getId() + " host=" + + containerAllocated.getContainer().getNodeId().toString() + " user=" + + user + " resource=" + + containerAllocated.getContainer().getResource() + " type=" + type); } - if(node != null) { + if (node != null) { metrics.allocateResources(node.getPartition(), user, 1, - containerAllocated.getResource(), true); + containerAllocated.getContainer().getResource(), false); + metrics.decrPendingResources(containerAllocated.getNodeLabelExpression(), + user, 1, containerAllocated.getContainer().getResource()); } metrics.incrNodeTypeAggregations(user, type); } @@ -655,4 +661,8 @@ public boolean acceptNodePartition(SchedulerRequestKey schedulerKey, this.readLock.unlock(); } } + + public RMContext getRMContext() { + return this.rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 5ac2ac5918e51..93f5891dcec29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -162,10 +162,16 @@ private void cancelPreviousRequest(SchedulerNode schedulerNode, // Decrement the pending using a dummy RR with // resource = prev update req capability if (prevReq != null) { + Container container = Container.newInstance(UNDEFINED, + schedulerNode.getNodeID(), "host:port", prevReq.getCapability(), + schedulerKey.getPriority(), null); appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode, - schedulerKey, Container.newInstance(UNDEFINED, - schedulerNode.getNodeID(), "host:port", - prevReq.getCapability(), schedulerKey.getPriority(), null)); + schedulerKey, + new RMContainerImpl(container, schedulerKey, + appSchedulingInfo.getApplicationAttemptId(), + schedulerNode.getNodeID(), appSchedulingInfo.getUser(), + appSchedulingInfo.getRMContext(), + schedulingPlacementSet.getPrimaryRequestedNodePartition())); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java new file mode 100644 index 0000000000000..75e8380d5533a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +@Metrics(context = "yarn") +public class PartitionQueueMetrics extends QueueMetrics { + + private String partition; + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, + String partition) { + super(ms, queueName, parent, enableUserMetrics, conf); + this.partition = partition; + if (getParentQueue() != null) { + String newQueueName = (getParentQueue() instanceof CSQueue) + ? ((CSQueue) getParentQueue()).getQueuePath() + : getParentQueue().getQueueName(); + String parentMetricName = + partition + METRIC_NAME_DELIMITER + newQueueName; + setParent(getQueueMetrics().get(parentMetricName)); + } + } + + /** + * Partition * Queue * User Metrics + * + * Computes Metrics at Partition (Node Label) * Queue * User Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * QueueMetrics (A) + * usermetrics + * QueueMetrics (A1) + * usermetrics + * QueueMetrics (A2) + * usermetrics + * QueueMetrics (B) + * usermetrics + * + * @return QueueMetrics + */ + @Override + public synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + + String partitionJMXStr = + (partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR + : partition; + + QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName, + null, false, this.conf, this.partition); + users.put(userName, metrics); + metricsSystem.register( + pSourceName(partitionJMXStr).append(qSourceName(queueName)) + .append(",user=").append(userName).toString(), + "Metrics for user '" + userName + "' in queue '" + queueName + "'", + ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr) + .tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName)); + } + return metrics; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 05dc8343c4bc5..11d1078abbaac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -204,9 +204,9 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, RMContext rmContext) { Preconditions.checkNotNull(rmContext, "RMContext should not be null"); this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage); + this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, + queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage, + this.rmContext); this.queue = queue; this.pendingRelease = Collections.newSetFromMap( new ConcurrentHashMap()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index 87fc23458a288..d6ac5b1e1e268 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -129,7 +129,7 @@ public void setAbsoluteUsedCapacity(String partition, public synchronized static CSQueueMetrics forQueue(String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { MetricsSystem ms = DefaultMetricsSystem.instance(); - QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName); + QueueMetrics metrics = getQueueMetrics().get(queueName); if (metrics == null) { metrics = new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf) @@ -141,7 +141,7 @@ public synchronized static CSQueueMetrics forQueue(String queueName, ms.register(sourceName(queueName).toString(), "Metrics for queue: " + queueName, metrics); } - QueueMetrics.getQueueMetrics().put(queueName, metrics); + getQueueMetrics().put(queueName, metrics); } return (CSQueueMetrics) metrics; @@ -154,7 +154,8 @@ public synchronized QueueMetrics getUserMetrics(String userName) { } CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); if (metrics == null) { - metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new CSQueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e8814951efc00..fdfda838e80f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1357,8 +1357,9 @@ private Resource getHeadroom(User user, : getQueueMaxResource(partition, clusterResource); Resource headroom = Resources.componentwiseMin( - Resources.subtract(userLimitResource, user.getUsed(partition)), - Resources.subtract(currentPartitionResourceLimit, + Resources.subtractNonNegative(userLimitResource, + user.getUsed(partition)), + Resources.subtractNonNegative(currentPartitionResourceLimit, queueUsage.getUsed(partition))); // Normalize it before return headroom = @@ -1660,12 +1661,17 @@ void allocateResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, true); - // Note this is a bit unconventional since it gets the object and modifies - // it here, rather then using set routine - Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); - + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = getHeadroom(user, + cachedResourceLimitsForHeadroom.getLimit(), clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUser(nodePartition, userName, + partitionHeadroom); + if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage.getUsed(nodePartition) + " numContainers=" @@ -1703,8 +1709,16 @@ void releaseResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, false); - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = getHeadroom(user, + cachedResourceLimitsForHeadroom.getLimit(), clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUser(nodePartition, userName, + partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 2d8a9d514bcae..78858c38146fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -535,8 +535,7 @@ public boolean apply(Resource cluster, List requests = appSchedulingInfo.allocate( allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(), - schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getRmContainer().getContainer()); + schedulerContainer.getSchedulerRequestKey(), rmContainer); ((RMContainerImpl) rmContainer).setResourceRequests(requests); attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 21863b83fc56a..1a46db1ca7bd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -468,7 +468,7 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, container); + type, node, schedulerKey, rmContainer); this.attemptResourceUsage.incUsed(container.getResource()); getQueue().incUsedResource(container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index d932e0e089887..4622fc819558a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -81,7 +81,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, container); + type, node, schedulerKey, rmContainer); attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index bb29889a85477..3afa0ec6aee47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -47,8 +48,9 @@ public void testBacklistChanged() { FSLeafQueue queue = mock(FSLeafQueue.class); doReturn("test").when(queue).getQueueName(); + RMContext rmContext = mock(RMContext.class); AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( - appAttemptId, "test", queue, null, 0, new ResourceUsage()); + appAttemptId, "test", queue, null, 0, new ResourceUsage(), rmContext); appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList(), new ArrayList()); @@ -118,9 +120,10 @@ public void testSchedulerKeyAccounting() { Queue queue = mock(Queue.class); doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); + RMContext rmContext = mock(RMContext.class); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage()); + new ResourceUsage(), rmContext); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java new file mode 100644 index 0000000000000..eb240d1b6d338 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java @@ -0,0 +1,752 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestPartitionQueueMetrics { + + static final int GB = 1024; // MB + private static final Configuration CONF = new Configuration(); + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + PartitionQueueMetrics.clearQueueMetrics(); + } + + @After + public void tearDown() { + ms.shutdown(); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in only 1 partition, x. + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + + @Test + public void testSinglePartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF); + QueueMetrics q2 = + QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + q1.setAvailableResourcesToQueue("x", + Resources.createResource(100 * GB, 100)); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + + q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = queueSource(ms, "x", "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in both partitions, x & y. + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception { + + String parentQueueName = "root"; + String user = "alice"; + + QueueMetrics root = + QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF); + QueueMetrics q2 = + QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF); + + AppSchedulingInfo app = mockApp(user); + q1.submitApp(user); + q1.submitAppAttempt(user); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + q1.setAvailableResourcesToQueue("x", + Resources.createResource(100 * GB, 100)); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource xPartitionSource = partitionSource(ms, "x"); + MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + + root.setAvailableResourcesToQueue("y", + Resources.createResource(400 * GB, 400)); + q2.setAvailableResourcesToQueue("y", + Resources.createResource(200 * GB, 200)); + + q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1)); + + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q2Source = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3); + checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3); + } + + /** + * Structure: + * Both queues, q1 has been configured to run in multiple partitions, x & y. + * + * root + * / + * q1 + * + * @throws Exception + */ + @Test + public void testMultiplePartitionWithSingleQueueMetrics() throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + root.setAvailableResourcesToQueue("y", + Resources.createResource(300 * GB, 300)); + + q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + + q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1)); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + + q1.incrPendingResources("x", "test_user1", 4, + Resource.newInstance(1024, 1)); + MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9); + checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9); + checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4); + + q1.incrPendingResources("y", "test_user1", 6, + Resource.newInstance(1024, 1)); + MetricsSource partitionSourceY = partitionSource(ms, "y"); + MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName); + MetricsSource q1SourceY = queueSource(ms, "y", "root.q1"); + MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1"); + + checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6); + checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6); + checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6); + checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in both partitions, x & y. + * + * root + * / \ + * q1 q2 + * q1 + * / \ + * q11 q12 + * q2 + * / \ + * q21 q22 + * + * @throws Exception + */ + + @Test + public void testMultiplePartitionsWithMultiLevelQueuesMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + when(parentQueue.getMetrics()).thenReturn(root); + + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF); + Queue childQueue1 = mock(Queue.class); + when(childQueue1.getQueueName()).thenReturn("root.q1"); + when(childQueue1.getMetrics()).thenReturn(q1); + + QueueMetrics q11 = + QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF); + QueueMetrics q12 = + QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF); + + QueueMetrics q2 = + QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF); + Queue childQueue2 = mock(Queue.class); + when(childQueue2.getQueueName()).thenReturn("root.q2"); + when(childQueue2.getMetrics()).thenReturn(q2); + + QueueMetrics q21 = + QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF); + QueueMetrics q22 = + QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + + q1.setAvailableResourcesToQueue("x", + Resources.createResource(100 * GB, 100)); + q11.setAvailableResourcesToQueue("x", + Resources.createResource(50 * GB, 50)); + + q11.incrPendingResources("x", "test_user", 2, + Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2); + + q11.incrPendingResources("x", "test_user", 4, + Resource.newInstance(1024, 1)); + + MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6); + checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6); + checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6); + + q11.incrPendingResources("x", "test_user1", 5, + Resource.newInstance(1024, 1)); + + MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1"); + MetricsSource userSource1 = + userSource(ms, "x", "test_user1", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11); + checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11); + checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6); + checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5); + checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5); + + q12.incrPendingResources("x", "test_user", 5, + Resource.newInstance(1024, 1)); + MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16); + checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + + root.setAvailableResourcesToQueue("y", + Resources.createResource(200 * GB, 200)); + q1.setAvailableResourcesToQueue("y", + Resources.createResource(100 * GB, 100)); + q12.setAvailableResourcesToQueue("y", + Resources.createResource(50 * GB, 50)); + + q12.incrPendingResources("y", "test_user", 3, + Resource.newInstance(1024, 1)); + + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q1YSource = queueSource(ms, "y", "root.q1"); + MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12"); + + checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3); + checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3); + checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3); + + root.setAvailableResourcesToQueue("y", + Resources.createResource(200 * GB, 200)); + q2.setAvailableResourcesToQueue("y", + Resources.createResource(100 * GB, 100)); + q21.setAvailableResourcesToQueue("y", + Resources.createResource(50 * GB, 50)); + + q21.incrPendingResources("y", "test_user", 5, + Resource.newInstance(1024, 1)); + MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21"); + MetricsSource q2YSource = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8); + checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8); + checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5); + checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5); + + q22.incrPendingResources("y", "test_user", 6, + Resource.newInstance(1024, 1)); + MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22"); + + checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14); + checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14); + checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6); + } + + @Test + public void testTwoLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + String partition = "x"; + + QueueMetrics parentMetrics = + QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + QueueMetrics metrics = + QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF); + AppSchedulingInfo app = mockApp(user); + + metrics.submitApp(user); + metrics.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100 * GB, 100)); + metrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100 * GB, 100)); + parentMetrics.setAvailableResourcesToUser(partition, user, + Resources.createResource(10 * GB, 10)); + metrics.setAvailableResourcesToUser(partition, user, + Resources.createResource(10 * GB, 10)); + metrics.incrPendingResources(partition, user, 6, + Resources.createResource(3 * GB, 3)); + + MetricsSource partitionSource = partitionSource(ms, partition); + MetricsSource parentQueueSource = + queueSource(ms, partition, parentQueueName); + MetricsSource queueSource = queueSource(ms, partition, leafQueueName); + MetricsSource userSource = userSource(ms, partition, user, leafQueueName); + MetricsSource userSource1 = + userSource(ms, partition, user, parentQueueName); + + checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0, + 0, 0); + checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, + 6, 0, 0, 0); + checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0, + 0); + checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, + 0, 0); + checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, + 6, 0, 0, 0); + + metrics.runAppAttempt(app.getApplicationId(), user); + + metrics.allocateResources(partition, user, 3, + Resources.createResource(1 * GB, 1), true); + metrics.reserveResource(partition, user, + Resources.createResource(3 * GB, 3)); + + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15, + 3, 3 * GB, 3, 1); + checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, + 15 * GB, 15, 3, 3 * GB, 3, 1); + checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, + 15, 3, 3 * GB, 3, 1); + checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3, + 3 * GB, 3, 1); + checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3, + 3 * GB, 3, 1); + + metrics.allocateResources(partition, user, 3, + Resources.createResource(1 * GB, 1), true); + + checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12, + 0, 3 * GB, 3, 1); + checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, + 12 * GB, 12, 0, 3 * GB, 3, 1); + + metrics.releaseResources(partition, user, 1, + Resources.createResource(2 * GB, 2)); + metrics.unreserveResource(partition, user, + Resources.createResource(3 * GB, 3)); + checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12, + 0, 0, 0, 0); + checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, + 12 * GB, 12, 0, 0, 0, 0); + checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, + 12, 0, 0, 0, 0); + checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0, + 0, 0, 0); + checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0, + 0, 0, 0); + + metrics.finishAppAttempt(app.getApplicationId(), app.isPending(), + app.getUser()); + + metrics.finishApp(user, RMAppState.FINISHED); + } + + @Test + public void testThreeLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String leafQueueName1 = "root.leaf.leaf1"; + String user = "alice"; + String partitionX = "x"; + String partitionY = "y"; + + QueueMetrics parentMetrics = + QueueMetrics.forQueue(parentQueueName, null, true, CONF); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + QueueMetrics metrics = + QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF); + Queue leafQueue = mock(Queue.class); + when(leafQueue.getQueueName()).thenReturn(leafQueueName); + when(leafQueue.getMetrics()).thenReturn(metrics); + QueueMetrics metrics1 = + QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF); + AppSchedulingInfo app = mockApp(user); + + metrics1.submitApp(user); + metrics1.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(200 * GB, 200)); + parentMetrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(500 * GB, 500)); + metrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(100 * GB, 100)); + metrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(400 * GB, 400)); + metrics1.setAvailableResourcesToQueue(partitionX, + Resources.createResource(50 * GB, 50)); + metrics1.setAvailableResourcesToQueue(partitionY, + Resources.createResource(300 * GB, 300)); + parentMetrics.setAvailableResourcesToUser(partitionX, user, + Resources.createResource(20 * GB, 20)); + parentMetrics.setAvailableResourcesToUser(partitionY, user, + Resources.createResource(50 * GB, 50)); + metrics.setAvailableResourcesToUser(partitionX, user, + Resources.createResource(10 * GB, 10)); + metrics.setAvailableResourcesToUser(partitionY, user, + Resources.createResource(40 * GB, 40)); + metrics1.setAvailableResourcesToUser(partitionX, user, + Resources.createResource(5 * GB, 5)); + metrics1.setAvailableResourcesToUser(partitionY, user, + Resources.createResource(30 * GB, 30)); + metrics1.incrPendingResources(partitionX, user, 6, + Resources.createResource(3 * GB, 3)); + metrics1.incrPendingResources(partitionY, user, 6, + Resources.createResource(4 * GB, 4)); + + MetricsSource partitionSourceX = + partitionSource(metrics1.getMetricsSystem(), partitionX); + + MetricsSource parentQueueSourceWithPartX = + queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName); + MetricsSource queueSourceWithPartX = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName); + MetricsSource queueSource1WithPartX = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1); + MetricsSource parentUserSourceWithPartX = userSource(metrics1.getMetricsSystem(), + partitionX, user, parentQueueName); + MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(), + partitionX, user, leafQueueName); + MetricsSource userSource1WithPartX = userSource(metrics1.getMetricsSystem(), + partitionX, user, leafQueueName1); + + checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18, + 6, 0, 0, 0); + checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, + 18, 6, 0, 0, 0); + + checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, + 0, 0, 0); + checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB, 18, 6, + 0, 0, 0); + checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20, 18 * GB, 18, + 6, 0, 0, 0); + checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, + 0, 0); + checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18, 6, 0, + 0, 0); + + MetricsSource partitionSourceY = + partitionSource(metrics1.getMetricsSystem(), partitionY); + + MetricsSource parentQueueSourceWithPartY = + queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName); + MetricsSource queueSourceWithPartY = + queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName); + MetricsSource queueSource1WithPartY = + queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1); + MetricsSource parentUserSourceWithPartY = userSource(metrics1.getMetricsSystem(), + partitionY, user, parentQueueName); + MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(), + partitionY, user, leafQueueName); + MetricsSource userSource1WithPartY = userSource(metrics1.getMetricsSystem(), + partitionY, user, leafQueueName1); + + checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24, + 6, 0, 0, 0); + checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, + 24, 6, 0, 0, 0); + checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400, 24 * GB, 24, 6, + 0, 0, 0); + checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300, 24 * GB, 24, 6, + 0, 0, 0); + checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50, 24 * GB, 24, + 6, 0, 0, 0); + checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB, 24, 6, 0, + 0, 0); + checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB, 24, 6, 0, + 0, 0); + + metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(), + app.getUser()); + + metrics1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * UserMetrics has been disabled, hence trying to access the user source + * throws NPE from sources. + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test(expected = NullPointerException.class) + public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + CSQueueMetrics q1 = + CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF); + CSQueueMetrics q2 = + CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1"); + MetricsSource q1UserSource = + userSource(q1.getMetricsSystem(), "x", user, "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + + q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2"); + MetricsSource q2UserSource = + userSource(q1.getMetricsSystem(), "x", user, "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3); + checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3); + + q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser()); + q1.finishApp(user, RMAppState.FINISHED); + } + + public static MetricsSource partitionSource(MetricsSystem ms, + String partition) { + MetricsSource s = + ms.getSource(QueueMetrics.pSourceName(partition).toString()); + return s; + } + + public static MetricsSource queueSource(MetricsSystem ms, String partition, + String queue) { + MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition) + .append(QueueMetrics.qSourceName(queue)).toString()); + return s; + } + + public static MetricsSource userSource(MetricsSystem ms, String partition, + String user, String queue) { + MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition) + .append(QueueMetrics.qSourceName(queue)).append(",user=") + .append(user).toString()); + return s; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + } + + private static AppSchedulingInfo mockApp(String user) { + AppSchedulingInfo app = mock(AppSchedulingInfo.class); + when(app.getUser()).thenReturn(user); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1); + when(app.getApplicationAttemptId()).thenReturn(id); + return app; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB, + int reservedCores, int reservedCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); + assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); + assertGauge("ReservedContainers", reservedCtnrs, rb); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 8f97ea494ee21..a4fc7b8dee663 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1181,9 +1181,9 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { qb.finishApplication(app_0.getApplicationId(), user_0); qb.finishApplication(app_2.getApplicationId(), user_1); qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1), - null, null); + "", null); qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1), - null, null); + "", null); qb.setUserLimit(50); qb.setUserLimitFactor(1);