This repository has been archived by the owner on Jan 18, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRRD.py
336 lines (302 loc) · 10.9 KB
/
RRD.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import subprocess
import re
import io
import os
from tempfile import TemporaryFile
from operator import xor, eq
from functools import reduce
from itertools import starmap
import math
class RRDIncompatibleException(Exception):
"""
Is raised when an RRD doesn't have the desired definition and cannot be
upgraded to it.
"""
pass
class RRDOutdatedException(Exception):
"""
Is raised when an RRD doesn't have the desired definition, but can be
upgraded to it.
"""
pass
if not hasattr(__builtins__, "FileNotFoundError"):
class FileNotFoundError(Exception):
pass
class RRD:
"""
An RRD is a Round Robin Database, a database which forgets old data and
aggregates multiple records into new ones.
It contains multiple Data Sources (DS) which can be thought of as columns,
and Round Robin Archives (RRA) which can be thought of as tables with the
DS as columns and time-dependant rows.
"""
# rra[2].cdp_prep[0].value = 1,8583033333e+03
_info_regex = re.compile("""
(?P<section>[a-z_]+)
\[ (?P<key>[a-zA-Z0-9_]+) \]
\.
|
(?P<name>[a-z_]+)
\s*=\s*
"? (?P<value>.*?) "?
$""", re.X)
_cached_info = None
def _exec_rrdtool(self, cmd, *args, **kwargs):
pargs = ["rrdtool", cmd, self.filename]
for k,v in kwargs.items():
pargs.extend(["--" + k, str(v)])
pargs.extend(args)
subprocess.check_output(pargs)
def __init__(self, filename):
self.filename = filename
def ensureSanity(self, ds_list, rra_list, **kwargs):
"""
Create or upgrade the RRD file if necessary to contain all DS in
ds_list. If it needs to be created, the RRAs in rra_list and any kwargs
will be used for creation. Note that RRAs and options of an existing
database are NOT modified!
"""
try:
self.checkSanity(ds_list)
except FileNotFoundError:
self.create(ds_list, rra_list, **kwargs)
except RRDOutdatedException:
self.upgrade(ds_list)
def checkSanity(self, ds_list=()):
"""
Check if the RRD file exists and contains (at least) the DS listed in
ds_list.
"""
if not os.path.exists(self.filename):
raise FileNotFoundError(self.filename)
info = self.info()
if set(ds_list) - set(info['ds'].values()) != set():
for ds in ds_list:
if ds.name in info['ds'] and ds.type != info['ds'][ds.name].type:
raise RRDIncompatibleException("%s is %s but should be %s" % (ds.name, ds.type, info['ds'][ds.name].type))
else:
raise RRDOutdatedException()
def upgrade(self, dss):
"""
Upgrade the DS definitions (!) of this RRD.
(To update its values, use update())
The list dss contains DSS objects to be updated or added. The
parameters of a DS can be changed, but not its type. New DS are always
added at the end in the order of their appearance in the list.
This is done internally via an rrdtool dump -> rrdtool restore and
modifying the dump on the fly.
"""
info = self.info()
new_ds = list(info['ds'].values())
new_ds.sort(key=lambda ds: ds.index)
for ds in dss:
if ds.name in info['ds']:
old_ds = info['ds'][ds.name]
if info['ds'][ds.name].type != ds.type:
raise RuntimeError('Cannot convert existing DS "%s" from type "%s" to "%s"' %
(ds.name, old_ds.type, ds.type))
ds.index = old_ds.index
new_ds[ds.index] = ds
else:
ds.index = len(new_ds)
new_ds.append(ds)
added_ds_num = len(new_ds) - len(info['ds'])
dump = subprocess.Popen(
["rrdtool", "dump", self.filename],
stdout=subprocess.PIPE
)
restore = subprocess.Popen(
["rrdtool", "restore", "-", self.filename + ".new"],
stdin=subprocess.PIPE
)
echo = True
ds_definitions = True
for line in dump.stdout:
if ds_definitions and b'<ds>' in line:
echo = False
if b'<!-- Round Robin Archives -->' in line:
ds_definitions = False
for ds in new_ds:
restore.stdin.write(bytes("""
<ds>
<name> %s </name>
<type> %s </type>
<minimal_heartbeat>%i</minimal_heartbeat>
<min>%s</min>
<max>%s</max>
<!-- PDP Status -->
<last_ds>%s</last_ds>
<value>%s</value>
<unknown_sec> %i </unknown_sec>
</ds>
""" % (
ds.name,
ds.type,
ds.args[0],
ds.args[1],
ds.args[2],
ds.last_ds,
ds.value,
ds.unknown_sec)
, "utf-8"))
if b'</cdp_prep>' in line:
restore.stdin.write(added_ds_num*b"""
<ds>
<primary_value> NaN </primary_value>
<secondary_value> NaN </secondary_value>
<value> NaN </value>
<unknown_datapoints> 0 </unknown_datapoints>
</ds>
""")
# echoing of input line
if echo:
restore.stdin.write(
line.replace(
b'</row>',
(added_ds_num*b'<v>NaN</v>')+b'</row>'
)
)
if ds_definitions and b'</ds>' in line:
echo = True
dump.stdout.close()
restore.stdin.close()
dump.wait()
restore.wait()
os.rename(self.filename + ".new", self.filename)
self._cached_info = None
def create(self, ds_list, rra_list, **kwargs):
"""
Create a new RRD file with the specified list of RRAs and DSs.
Any kwargs are passed as --key=value to rrdtool create.
"""
self._exec_rrdtool(
"create",
*map(str, rra_list + ds_list),
**kwargs
)
self._cached_info = None
def update(self, V):
"""
Update the RRD with new values V.
V can be either list or dict:
* If it is a dict, its keys must be DS names in the RRD and it is
ensured that the correct DS are updated with the correct values, by
passing a "template" to rrdtool update (see man rrdupdate).
* If it is a list, no template is generated and the order of the
values in V must be the same as that of the DS in the RRD.
"""
try:
args = ['N:' + ':'.join(map(str, V.values()))]
kwargs = {'template': ':'.join(V.keys())}
except AttributeError:
args = ['N:' + ':'.join(map(str, V))]
kwargs = {}
self._exec_rrdtool("update", *args, **kwargs)
self._cached_info = None
def info(self):
"""
Return a dictionary with information about the RRD.
See `man rrdinfo` for more details.
"""
if self._cached_info:
return self._cached_info
env = os.environ.copy()
env["LC_ALL"] = "C"
proc = subprocess.Popen(
["rrdtool", "info", self.filename],
stdout=subprocess.PIPE,
env=env
)
out, err = proc.communicate()
out = out.decode()
info = {}
for line in out.splitlines():
base = info
for match in self._info_regex.finditer(line):
section, key, name, value = match.group("section", "key", "name", "value")
if section and key:
try:
key = int(key)
except ValueError:
pass
if section not in base:
base[section] = {}
if key not in base[section]:
base[section][key] = {}
base = base[section][key]
if name and value:
try:
base[name] = int(value)
except ValueError:
try:
base[name] = float(value)
except:
base[name] = value
dss = {}
for name, ds in info['ds'].items():
ds_obj = DS(name, ds['type'], ds['minimal_heartbeat'], ds['min'], ds['max'])
ds_obj.index = ds['index']
ds_obj.last_ds = ds['last_ds']
ds_obj.value = ds['value']
ds_obj.unknown_sec = ds['unknown_sec']
dss[name] = ds_obj
info['ds'] = dss
rras = []
for rra in info['rra'].values():
rras.append(RRA(rra['cf'], rra['xff'], rra['pdp_per_row'], rra['rows']))
info['rra'] = rras
self._cached_info = info
return info
class DS:
"""
DS stands for Data Source and represents one line of data points in a Round
Robin Database (RRD).
"""
name = None
type = None
args = []
index = -1
last_ds = 'U'
value = 0
unknown_sec = 0
def __init__(self, name, dst, *args):
self.name = name
self.type = dst
self.args = args
def __str__(self):
return "DS:%s:%s:%s" % (
self.name,
self.type,
":".join(map(str, self._nan_to_U_args()))
)
def __repr__(self):
return "%s(%r, %r, %s)" % (
self.__class__.__name__,
self.name,
self.type,
", ".join(map(repr, self.args))
)
def __eq__(self, other):
return all(starmap(eq, zip(self._compare_keys(), other._compare_keys())))
def __hash__(self):
return reduce(xor, map(hash, self._compare_keys()))
def _nan_to_U_args(self):
return tuple(
'U' if type(arg) is float and math.isnan(arg)
else arg
for arg in self.args
)
def _compare_keys(self):
return (self.name, self.type, self._nan_to_U_args())
class RRA:
def __init__(self, cf, *args):
self.cf = cf
self.args = args
def __str__(self):
return "RRA:%s:%s" % (self.cf, ":".join(map(str, self.args)))
def __repr__(self):
return "%s(%r, %s)" % (
self.__class__.__name__,
self.cf,
", ".join(map(repr, self.args))
)