Skip to content

Commit f3126d4

Browse files
committed
[CELEBORN-1800] Introduce ApplicationTotalCount and ApplicationFallbackCount metrics to record the total and fallback counts of applications
1 parent 6028a04 commit f3126d4

File tree

22 files changed

+482
-122
lines changed

22 files changed

+482
-122
lines changed

assets/grafana/celeborn-dashboard.json

+181-1
Original file line numberDiff line numberDiff line change
@@ -1472,7 +1472,7 @@
14721472
"type": "prometheus",
14731473
"uid": "${DS_PROMETHEUS}"
14741474
},
1475-
"description": "The total count of shuffle including celeborn shuffle and spark built-in shuffle.",
1475+
"description": "The total count of shuffle including celeborn shuffle and engine built-in shuffle.",
14761476
"fieldConfig": {
14771477
"defaults": {
14781478
"color": {
@@ -1646,6 +1646,186 @@
16461646
],
16471647
"title": "metrics_ShuffleFallbackCount_Value",
16481648
"type": "timeseries"
1649+
},
1650+
{
1651+
"datasource": {
1652+
"type": "prometheus",
1653+
"uid": "${DS_PROMETHEUS}"
1654+
},
1655+
"description": "The total count of application running with celeborn shuffle and engine built-in shuffle.",
1656+
"fieldConfig": {
1657+
"defaults": {
1658+
"color": {
1659+
"mode": "palette-classic"
1660+
},
1661+
"custom": {
1662+
"axisCenteredZero": false,
1663+
"axisColorMode": "text",
1664+
"axisLabel": "",
1665+
"axisPlacement": "auto",
1666+
"barAlignment": 0,
1667+
"drawStyle": "line",
1668+
"fillOpacity": 0,
1669+
"gradientMode": "none",
1670+
"hideFrom": {
1671+
"legend": false,
1672+
"tooltip": false,
1673+
"viz": false
1674+
},
1675+
"lineInterpolation": "linear",
1676+
"lineWidth": 1,
1677+
"pointSize": 5,
1678+
"scaleDistribution": {
1679+
"type": "linear"
1680+
},
1681+
"showPoints": "auto",
1682+
"spanNulls": false,
1683+
"stacking": {
1684+
"group": "A",
1685+
"mode": "none"
1686+
},
1687+
"thresholdsStyle": {
1688+
"mode": "off"
1689+
}
1690+
},
1691+
"mappings": [],
1692+
"thresholds": {
1693+
"mode": "absolute",
1694+
"steps": [
1695+
{
1696+
"color": "green"
1697+
},
1698+
{
1699+
"color": "red",
1700+
"value": 80
1701+
}
1702+
]
1703+
}
1704+
},
1705+
"overrides": []
1706+
},
1707+
"gridPos": {
1708+
"h": 8,
1709+
"w": 12,
1710+
"x": 0,
1711+
"y": 90
1712+
},
1713+
"id": 234,
1714+
"options": {
1715+
"legend": {
1716+
"calcs": [],
1717+
"displayMode": "list",
1718+
"placement": "bottom",
1719+
"showLegend": true
1720+
},
1721+
"tooltip": {
1722+
"maxHeight": 600,
1723+
"mode": "single",
1724+
"sort": "none"
1725+
}
1726+
},
1727+
"targets": [
1728+
{
1729+
"datasource": {
1730+
"type": "prometheus",
1731+
"uid": "${DS_PROMETHEUS}"
1732+
},
1733+
"expr": "metrics_ApplicationTotalCount_Value{instance=~\"${instance}\"}",
1734+
"legendFormat": "${baseLegend}",
1735+
"range": true,
1736+
"refId": "A"
1737+
}
1738+
],
1739+
"title": "metrics_ApplicationTotalCount_Value",
1740+
"type": "timeseries"
1741+
},
1742+
{
1743+
"datasource": {
1744+
"type": "prometheus",
1745+
"uid": "${DS_PROMETHEUS}"
1746+
},
1747+
"description": "The count of application fallbacks.",
1748+
"fieldConfig": {
1749+
"defaults": {
1750+
"color": {
1751+
"mode": "palette-classic"
1752+
},
1753+
"custom": {
1754+
"axisCenteredZero": false,
1755+
"axisColorMode": "text",
1756+
"axisLabel": "",
1757+
"axisPlacement": "auto",
1758+
"barAlignment": 0,
1759+
"drawStyle": "line",
1760+
"fillOpacity": 0,
1761+
"gradientMode": "none",
1762+
"hideFrom": {
1763+
"legend": false,
1764+
"tooltip": false,
1765+
"viz": false
1766+
},
1767+
"lineInterpolation": "linear",
1768+
"lineWidth": 1,
1769+
"pointSize": 5,
1770+
"scaleDistribution": {
1771+
"type": "linear"
1772+
},
1773+
"showPoints": "auto",
1774+
"spanNulls": false,
1775+
"stacking": {
1776+
"group": "A",
1777+
"mode": "none"
1778+
},
1779+
"thresholdsStyle": {
1780+
"mode": "off"
1781+
}
1782+
},
1783+
"mappings": [],
1784+
"thresholds": {
1785+
"mode": "absolute",
1786+
"steps": [
1787+
{
1788+
"color": "green"
1789+
}
1790+
]
1791+
}
1792+
},
1793+
"overrides": []
1794+
},
1795+
"gridPos": {
1796+
"h": 8,
1797+
"w": 12,
1798+
"x": 12,
1799+
"y": 90
1800+
},
1801+
"id": 235,
1802+
"options": {
1803+
"legend": {
1804+
"calcs": [],
1805+
"displayMode": "list",
1806+
"placement": "bottom",
1807+
"showLegend": true
1808+
},
1809+
"tooltip": {
1810+
"maxHeight": 600,
1811+
"mode": "single",
1812+
"sort": "none"
1813+
}
1814+
},
1815+
"targets": [
1816+
{
1817+
"datasource": {
1818+
"type": "prometheus",
1819+
"uid": "${DS_PROMETHEUS}"
1820+
},
1821+
"expr": "metrics_ApplicationFallbackCount_Value{instance=~\"${instance}\"}",
1822+
"legendFormat": "${baseLegend}",
1823+
"range": true,
1824+
"refId": "A"
1825+
}
1826+
],
1827+
"title": "metrics_ApplicationFallbackCount_Value",
1828+
"type": "timeseries"
16491829
}
16501830
],
16511831
"title": "Master",

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void registerJob(JobShuffleContext context) {
102102
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
103103
LOG.info("CelebornAppId: {}", celebornAppId);
104104
lifecycleManager = new LifecycleManager(celebornAppId, conf);
105+
lifecycleManager.applicationCount().increment();
105106
this.shuffleResourceTracker = new ShuffleResourceTracker(executor, lifecycleManager);
106107
}
107108
}
@@ -113,7 +114,10 @@ public void registerJob(JobShuffleContext context) {
113114
ShuffleFallbackPolicyRunner.getActivatedFallbackPolicy(context, conf, lifecycleManager);
114115
if (shuffleFallbackPolicy.isPresent()) {
115116
LOG.warn("Fallback to vanilla Flink NettyShuffleMaster for job: {}.", jobID);
116-
jobFallbackPolicies.put(jobID, shuffleFallbackPolicy.get().getClass().getName());
117+
String jobFallbackPolicy = shuffleFallbackPolicy.get().getClass().getName();
118+
jobFallbackPolicies.put(jobID, jobFallbackPolicy);
119+
lifecycleManager.computeFallbackCounts(
120+
lifecycleManager.applicationFallbackCounts(), jobFallbackPolicy);
117121
nettyShuffleMaster().registerJob(context);
118122
return;
119123
}
@@ -161,9 +165,8 @@ public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
161165
String jobFallbackPolicy = jobFallbackPolicies.get(jobID);
162166
if (jobFallbackPolicy != null) {
163167
try {
164-
lifecycleManager
165-
.shuffleFallbackCounts()
166-
.compute(jobFallbackPolicy, (key, value) -> value == null ? 1L : value + 1L);
168+
lifecycleManager.computeFallbackCounts(
169+
lifecycleManager.shuffleFallbackCounts(), jobFallbackPolicy);
167170
return nettyShuffleMaster()
168171
.registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
169172
.get();

client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ConcurrentHashMap;
2323

2424
import scala.Int;
25+
import scala.Option;
2526

2627
import org.apache.spark.*;
2728
import org.apache.spark.internal.config.package$;
@@ -58,6 +59,8 @@ public class SparkShuffleManager implements ShuffleManager {
5859
private volatile SortShuffleManager _sortShuffleManager;
5960
private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
6061
ConcurrentHashMap.newKeySet();
62+
private final ConcurrentHashMap.KeySetView<String, Boolean> fallbackApps =
63+
ConcurrentHashMap.newKeySet();
6164
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
6265

6366
private long sendBufferPoolCheckInterval;
@@ -97,6 +100,7 @@ private void initializeLifecycleManager(String appId) {
97100
if (lifecycleManager == null) {
98101
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
99102
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
103+
lifecycleManager.applicationCount().increment();
100104
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
101105
if (celebornConf.clientStageRerunEnabled()) {
102106
MapOutputTrackerMaster mapOutputTracker =
@@ -119,9 +123,19 @@ public <K, V, C> ShuffleHandle registerShuffle(
119123
initializeLifecycleManager(appId);
120124

121125
lifecycleManager.shuffleCount().increment();
122-
if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
126+
Option<ShuffleFallbackPolicy> fallbackPolicyOpt =
127+
fallbackPolicyRunner.getActivatedFallbackPolicy(dependency, lifecycleManager);
128+
if (fallbackPolicyOpt.isDefined()) {
123129
logger.warn("Fallback to SortShuffleManager!");
124130
sortShuffleIds.add(shuffleId);
131+
String shuffleFallbackPolicy = fallbackPolicyOpt.get().getClass().getName();
132+
lifecycleManager.computeFallbackCounts(
133+
lifecycleManager.shuffleFallbackCounts(), shuffleFallbackPolicy);
134+
if (!fallbackApps.contains(appId)) {
135+
fallbackApps.add(appId);
136+
lifecycleManager.computeFallbackCounts(
137+
lifecycleManager.applicationFallbackCounts(), shuffleFallbackPolicy);
138+
}
125139
return sortShuffleManager().registerShuffle(shuffleId, numMaps, dependency);
126140
} else {
127141
lifecycleManager.registerAppShuffleDeterminate(

client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala

+6-20
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,15 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
3434
private val shuffleFallbackPolicies =
3535
ShuffleFallbackPolicyFactory.getShuffleFallbackPolicies.asScala
3636

37-
def applyFallbackPolicies[K, V, C](
37+
def getActivatedFallbackPolicy[K, V, C](
3838
dependency: ShuffleDependency[K, V, C],
39-
lifecycleManager: LifecycleManager): Boolean = {
39+
lifecycleManager: LifecycleManager): Option[ShuffleFallbackPolicy] = {
4040
val fallbackPolicy =
4141
shuffleFallbackPolicies.find(_.needFallback(dependency, conf, lifecycleManager))
42-
if (fallbackPolicy.isDefined) {
43-
if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
44-
throw new CelebornIOException(
45-
"Fallback to spark built-in shuffle implementation is prohibited.")
46-
} else {
47-
lifecycleManager.shuffleFallbackCounts.compute(
48-
fallbackPolicy.get.getClass.getName,
49-
new BiFunction[String, java.lang.Long, java.lang.Long] {
50-
override def apply(k: String, v: java.lang.Long): java.lang.Long = {
51-
if (v == null) {
52-
1L
53-
} else {
54-
v + 1L
55-
}
56-
}
57-
})
58-
}
42+
if (fallbackPolicy.isDefined && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
43+
throw new CelebornIOException(
44+
"Fallback to spark built-in shuffle implementation is prohibited.")
5945
}
60-
fallbackPolicy.isDefined
46+
fallbackPolicy
6147
}
6248
}

client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Objects;
2222
import java.util.concurrent.ConcurrentHashMap;
2323

24+
import scala.Option;
25+
2426
import org.apache.spark.*;
2527
import org.apache.spark.internal.config.package$;
2628
import org.apache.spark.launcher.SparkLauncher;
@@ -82,6 +84,8 @@ public class SparkShuffleManager implements ShuffleManager {
8284
private volatile SortShuffleManager _sortShuffleManager;
8385
private final ConcurrentHashMap.KeySetView<Integer, Boolean> sortShuffleIds =
8486
ConcurrentHashMap.newKeySet();
87+
private final ConcurrentHashMap.KeySetView<String, Boolean> fallbackApps =
88+
ConcurrentHashMap.newKeySet();
8589
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
8690

8791
private long sendBufferPoolCheckInterval;
@@ -139,6 +143,7 @@ private void initializeLifecycleManager(String appId) {
139143
if (lifecycleManager == null) {
140144
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
141145
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
146+
lifecycleManager.applicationCount().increment();
142147
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
143148
if (celebornConf.clientStageRerunEnabled()) {
144149
MapOutputTrackerMaster mapOutputTracker =
@@ -162,7 +167,9 @@ public <K, V, C> ShuffleHandle registerShuffle(
162167
initializeLifecycleManager(appId);
163168

164169
lifecycleManager.shuffleCount().increment();
165-
if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
170+
Option<ShuffleFallbackPolicy> fallbackPolicyOpt =
171+
fallbackPolicyRunner.getActivatedFallbackPolicy(dependency, lifecycleManager);
172+
if (fallbackPolicyOpt.isDefined()) {
166173
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
167174
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
168175
logger.error(
@@ -174,6 +181,14 @@ public <K, V, C> ShuffleHandle registerShuffle(
174181
logger.warn("Fallback to vanilla Spark SortShuffleManager for shuffle: {}", shuffleId);
175182
}
176183
sortShuffleIds.add(shuffleId);
184+
String shuffleFallbackPolicy = fallbackPolicyOpt.get().getClass().getName();
185+
lifecycleManager.computeFallbackCounts(
186+
lifecycleManager.shuffleFallbackCounts(), shuffleFallbackPolicy);
187+
if (!fallbackApps.contains(appId)) {
188+
fallbackApps.add(appId);
189+
lifecycleManager.computeFallbackCounts(
190+
lifecycleManager.applicationFallbackCounts(), shuffleFallbackPolicy);
191+
}
177192
return sortShuffleManager().registerShuffle(shuffleId, dependency);
178193
} else {
179194
lifecycleManager.registerAppShuffleDeterminate(

client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala

+6-18
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,15 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
3232
private val shuffleFallbackPolicies =
3333
ShuffleFallbackPolicyFactory.getShuffleFallbackPolicies.asScala
3434

35-
def applyFallbackPolicies[K, V, C](
35+
def getActivatedFallbackPolicy[K, V, C](
3636
dependency: ShuffleDependency[K, V, C],
37-
lifecycleManager: LifecycleManager): Boolean = {
37+
lifecycleManager: LifecycleManager): Option[ShuffleFallbackPolicy] = {
3838
val fallbackPolicy =
3939
shuffleFallbackPolicies.find(_.needFallback(dependency, conf, lifecycleManager))
40-
if (fallbackPolicy.isDefined) {
41-
if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
42-
throw new CelebornIOException(
43-
"Fallback to spark built-in shuffle implementation is prohibited.")
44-
} else {
45-
lifecycleManager.shuffleFallbackCounts.compute(
46-
fallbackPolicy.get.getClass.getName,
47-
(_, v) => {
48-
if (v == null) {
49-
1L
50-
} else {
51-
v + 1L
52-
}
53-
})
54-
}
40+
if (fallbackPolicy.isDefined && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
41+
throw new CelebornIOException(
42+
"Fallback to spark built-in shuffle implementation is prohibited.")
5543
}
56-
fallbackPolicy.isDefined
44+
fallbackPolicy
5745
}
5846
}

0 commit comments

Comments
 (0)