-
Notifications
You must be signed in to change notification settings - Fork 29.2k
Expand file tree
/
Copy pathmetrics.py
More file actions
316 lines (257 loc) · 10.4 KB
/
metrics.py
File metadata and controls
316 lines (257 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
.. connect:: true
"""
import abc
import dataclasses
from typing import Optional, List, Tuple, Dict, Any, Union, TYPE_CHECKING, Sequence
from pyspark.errors import PySparkValueError
if TYPE_CHECKING:
try:
import graphviz # type: ignore
except ImportError:
pass
class ObservedMetrics(abc.ABC):
@property
@abc.abstractmethod
def name(self) -> str: ...
@property
@abc.abstractmethod
def pairs(self) -> Dict[str, Any]: ...
@property
@abc.abstractmethod
def keys(self) -> List[str]: ...
class MetricValue:
"""The metric values is the Python representation of a plan metric value from the JVM.
However, it does not have any reference to the original value."""
def __init__(self, name: str, value: Union[int, float], type: str):
self._name = name
self._type = type
self._value = value
def __repr__(self) -> str:
return f"<{self._name}={self._value} ({self._type})>"
@property
def name(self) -> str:
return self._name
@property
def value(self) -> Union[int, float]:
return self._value
@property
def metric_type(self) -> str:
return self._type
def to_dict(self) -> Dict[str, Any]:
"""Return a JSON-serializable dictionary representation of this metric value.
Returns
-------
dict
A dictionary with keys 'name', 'value', and 'type'.
"""
return {
"name": self._name,
"value": self._value,
"type": self._type,
}
class PlanMetrics:
"""Represents a particular plan node and the associated metrics of this node."""
def __init__(self, name: str, id: int, parent: int, metrics: List[MetricValue]):
self._name = name
self._id = id
self._parent_id = parent
self._metrics = metrics
def __repr__(self) -> str:
return f"Plan({self._name}: {self._id}->{self._parent_id})={self._metrics}"
@property
def name(self) -> str:
return self._name
@property
def plan_id(self) -> int:
return self._id
@property
def parent_plan_id(self) -> int:
return self._parent_id
@property
def metrics(self) -> List[MetricValue]:
return self._metrics
def to_dict(self) -> Dict[str, Any]:
"""Return a JSON-serializable dictionary representation of this plan metrics.
Returns
-------
dict
A dictionary with keys 'name', 'plan_id', 'parent_plan_id', and 'metrics'.
"""
return {
"name": self._name,
"plan_id": self._id,
"parent_plan_id": self._parent_id,
"metrics": [m.to_dict() for m in self._metrics],
}
class CollectedMetrics:
@dataclasses.dataclass
class Node:
id: int
name: str = dataclasses.field(default="")
metrics: List[MetricValue] = dataclasses.field(default_factory=list)
children: List[int] = dataclasses.field(default_factory=list)
def text(self, current: "Node", graph: Dict[int, "Node"], prefix: str = "") -> str:
"""
Converts the current node and its children into a textual representation. This is used
to provide a usable output for the command line or other text-based interfaces. However,
it is recommended to use the Graphviz representation for a more visual representation.
Parameters
----------
current: Node
Current node in the graph.
graph: dict
A dictionary representing the full graph mapping from node ID (int) to the node itself.
The node is an instance of :class:`CollectedMetrics:Node`.
prefix: str
String prefix used for generating the output buffer.
Returns
-------
The full string representation of the current node as root.
"""
base_metrics = set(["numPartitions", "peakMemory", "numOutputRows", "spillSize"])
# Format the metrics of this node:
metric_buffer = []
for m in current.metrics:
if m.name in base_metrics:
metric_buffer.append(f"{m.name}: {m.value} ({m.metric_type})")
buffer = f"{prefix}+- {current.name}({','.join(metric_buffer)})\n"
for i, child in enumerate(current.children):
c = graph[child]
new_prefix = prefix + " " if i == len(c.children) - 1 else prefix
if current.id != c.id:
buffer += self.text(c, graph, new_prefix)
return buffer
def __init__(self, metrics: List[PlanMetrics]):
# Sort the input list
self._metrics = sorted(metrics, key=lambda x: x._parent_id, reverse=False)
def extract_graph(self) -> Tuple[int, Dict[int, "CollectedMetrics.Node"]]:
"""
Builds the graph of the query plan. The graph is represented as a dictionary where the key
is the node ID and the value is the node itself. The root node is the node that has no
parent.
Returns
-------
The root node ID and the graph of all nodes.
"""
all_nodes: Dict[int, CollectedMetrics.Node] = {}
for m in self._metrics:
# Add yourself to the list if you have to.
if m.plan_id not in all_nodes:
all_nodes[m.plan_id] = CollectedMetrics.Node(m.plan_id, m.name, m.metrics)
else:
all_nodes[m.plan_id].name = m.name
all_nodes[m.plan_id].metrics = m.metrics
# Now check for the parent of this node if it's in
if m.parent_plan_id not in all_nodes:
all_nodes[m.parent_plan_id] = CollectedMetrics.Node(m.parent_plan_id)
all_nodes[m.parent_plan_id].children.append(m.plan_id)
# Next step is to find all the root nodes. Root nodes are never used in children.
# So we start with all node ids as candidates.
candidates = set(all_nodes.keys())
for k, v in all_nodes.items():
for c in v.children:
if c in candidates and c != k:
candidates.remove(c)
assert len(candidates) == 1, f"Expected 1 root node, found {len(candidates)}"
return candidates.pop(), all_nodes
def toText(self) -> str:
"""
Converts the execution graph from a graph into a textual representation
that can be read at the command line for example.
Returns
-------
A string representation of the collected metrics.
"""
root, graph = self.extract_graph()
return self.text(graph[root], graph)
def toDot(self, filename: Optional[str] = None, out_format: str = "png") -> "graphviz.Digraph":
"""
Converts the collected metrics into a dot representation. Since the graphviz Digraph
implementation provides the ability to render the result graph directory in a
notebook, we return the graph object directly.
If the graphviz package is not available, a PACKAGE_NOT_INSTALLED error is raised.
Parameters
----------
filename : str, optional
The filename to save the graph to given an output format. The path can be
relative or absolute.
out_format : str
The output format of the graph. The default is 'png'.
Returns
-------
An instance of the graphviz.Digraph object.
"""
try:
import graphviz
dot = graphviz.Digraph(
comment="Query Plan",
node_attr={
"shape": "box",
"font-size": "10pt",
},
)
root, graph = self.extract_graph()
for k, v in graph.items():
# Build table rows for the metrics
rows = "\n".join(
[
(
f'<TR><TD><FONT POINT-SIZE="8">{x.name}</FONT></TD><TD>'
f'<FONT POINT-SIZE="8">{x.value} ({x.metric_type})</FONT></TD></TR>'
)
for x in v.metrics
]
)
dot.node(
str(k),
"""<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR>
<TD COLSPAN="2" BGCOLOR="lightgrey">
<FONT POINT-SIZE=\"10\">{}</FONT>
</TD>
</TR>
<TR><TD COLSPAN="2"><FONT POINT-SIZE=\"10\">Metrics</FONT></TD></TR>
{}
</TABLE>>""".format(v.name, rows),
)
for c in v.children:
dot.edge(str(k), str(c))
if filename:
dot.render(filename, format=out_format, cleanup=True)
return dot
except ImportError:
raise PySparkValueError(
errorClass="PACKAGE_NOT_INSTALLED",
messageParameters={"package_name": "graphviz", "minimum_version": "0.20"},
)
class ExecutionInfo:
"""The ExecutionInfo class allows users to inspect the query execution of this particular
data frame. This value is only set in the data frame if it was executed."""
def __init__(
self, metrics: Optional[list[PlanMetrics]], obs: Optional[Sequence[ObservedMetrics]]
):
self._metrics = CollectedMetrics(metrics) if metrics else None
self._observations = obs if obs else []
@property
def metrics(self) -> Optional[CollectedMetrics]:
return self._metrics
@property
def flows(self) -> List[Tuple[str, Dict[str, Any]]]:
return [(f.name, f.pairs) for f in self._observations]