-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathutils.py
601 lines (536 loc) · 21.2 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
from __future__ import print_function
import threading
import Queue
import time
import logging
LOGGER = logging.getLogger(__name__)
class CheckCounter(object):
def __init__(self, name, must_change=True, required=False):
"""
:param name: the name of the counter
:param must_change: this counter should be changing
:param required: is this counter required?
"""
self.name = name
self.must_change = must_change
self.required = required
self.data = None
self.changed = False
def create_meta_dictionary(metalist):
"""
Build a meta information dictionary from a provided raw meta info list.
:param metalist: a list of all meta information about the system
:return: a dictionary of device info, keyed by unique device name
"""
meta_items = {}
try:
for name, tag, param, value in metalist:
if name not in meta_items:
meta_items[name] = {}
try:
if meta_items[name]['tag'] != tag:
raise ValueError(
'Different tags - %s, %s - for the same item %s' % (
meta_items[name]['tag'], tag, name))
except KeyError:
meta_items[name]['tag'] = tag
meta_items[name][param] = value
except ValueError as e:
for ctr, contents in enumerate(metalist):
print(ctr, end='')
print(contents)
raise e
return meta_items
def get_kwarg(field, kwargs, default=None):
try:
return kwargs[field]
except KeyError:
return default
def get_hostname(**kwargs):
"""
:param kwargs:
"""
host = kwargs['host']
bitstream = get_kwarg('bitstream', kwargs)
if ',' in host:
host, bitstream = host.split(',')
return host, bitstream
def parse_fpg(filename):
"""
Read the meta information from the FPG file.
:param filename: the name of the fpg file to parse
:return: device info dictionary, memory map info (coreinfo.tab) dictionary
"""
LOGGER.debug('Parsing file %s for system information' % filename)
if filename is not None:
fptr = open(filename, 'r')
firstline = fptr.readline().strip().rstrip('\n')
if firstline != '#!/bin/kcpfpg':
fptr.close()
raise RuntimeError('%s does not look like an fpg file we can '
'parse.' % filename)
else:
raise IOError('No such file %s' % filename)
memorydict = {}
metalist = []
while True:
line = fptr.readline().strip().rstrip('\n')
if line.lstrip().rstrip() == '?quit':
break
elif line.startswith('?meta'):
# some versions of mlib_devel may mistakenly have put spaces
# as delimiters where tabs should have been used. Rectify that
# here.
if line.startswith('?meta '):
LOGGER.warn('An old version of mlib_devel generated %s. Please '
'update. Meta fields are seperated by spaces, '
'should be tabs.' % filename)
line = line.replace(' ', '\t')
# and carry on as usual.
line = line.replace('\_', ' ').replace('?meta', '')
line = line.replace('\n', '').lstrip().rstrip()
#line_split = line.split('\t')
# Rather split on any space
line_split = line.split()
name = line_split[0]
tag = line_split[1]
param = line_split[2]
if len(line_split[3:]) == 1:
value = line_split[3:][0]
else:
value = ' '.join(line_split[3:])
# name, tag, param, value = line.split('\t')
name = name.replace('/', '_')
metalist.append((name, tag, param, value))
elif line.startswith('?register'):
if line.startswith('?register '):
register = line.replace('\_', ' ').replace('?register ', '')
register = register.replace('\n', '').lstrip().rstrip()
name, address, size_bytes = register.split(' ')
elif line.startswith('?register\t'):
register = line.replace('\_', ' ').replace('?register\t', '')
register = register.replace('\n', '').lstrip().rstrip()
name, address, size_bytes = register.split('\t')
else:
raise ValueError('Cannot find ?register entries in '
'correct format.')
address = int(address, 16)
size_bytes = int(size_bytes, 16)
if name in memorydict.keys():
raise RuntimeError('%s: mem device %s already in '
'dictionary' % (filename, name))
memorydict[name] = {'address': address, 'bytes': size_bytes}
fptr.close()
return create_meta_dictionary(metalist), memorydict
def get_git_info_from_fpg(fpg_file):
"""
Method to get git info from an fpg-file's header
:param fpg_file: filename as string
:return: Dictionary of git_info
- key = git-repo
- value = git-version
"""
git_tag = '77777_git'
git_info_dict = None
# This returns a tuple of dictionaries,
# - device info dict, memory map info (coreinfo.tab) dict
# - Git info is meta-data, and should only ever be in the
# first dictionary of the tuple
fpg_header = parse_fpg(fpg_file)
fpg_metadata = fpg_header[0]
git_info_dict = fpg_metadata.get(git_tag, None)
try:
git_info_dict.pop('tag')
return git_info_dict
except KeyError:
# tag: rcs entry isn't there, no worries
return git_info_dict
def pull_info_from_fpg(fpg_file, parameter):
"""
Pull available parameters about x-engine or f-engine from .fpg file.
Available options for x-engine: 'x_fpga_clock', 'xeng_outbits',
'xeng_accumulation_len'
Available options for f-engine: 'n_chans', 'quant_format', 'spead_flavour'
:param fpg_file: bit file path
:param parameter: parameter string
:return: pattern value (string)
"""
match = []
fpg_dict = parse_fpg(fpg_file)
if parameter == 'x_fpga_clock':
match = str(int(fpg_dict[0]['XSG_core_config']['clk_rate'])*10**6)
if parameter == 'xeng_outbits':
match = fpg_dict[0]['sys0_vacc']['n_bits']
if parameter == 'xeng_accumulation_len':
match = fpg_dict[0]['sys0_xeng']['acc_len']
if parameter == 'spead_flavour':
match1 = fpg_dict[0]['pack_spead_pack0']['spead_msw']
match2 = fpg_dict[0]['pack_spead_pack0']['spead_lsw']
s = ','
match = s.join([match1, match2])
if parameter == 'quant_format':
match1 = fpg_dict[0]['snap_quant0']['io_widths']
match2 = fpg_dict[0]['snap_quant0']['io_bps']
s = '.'
match = s.join([match1[1], match2[1]])
if parameter == 'n_chans':
pfb_dict = fpg_dict[0]['pfb_fft_wideband_real_fft_biplex_real_4x']
match1 = int(pfb_dict['fftsize'])
match2 = int(pfb_dict['n_inputs'])
match = match2*2**match1
if match is []:
errstr = 'Parameter %s does not match any field in fpg ' \
'file.' % parameter
LOGGER.error(errstr)
raise RuntimeError(errstr)
return match
def check_changing_status(counters, data_function, wait_time, num_checks):
"""
Check a changing set of status fields.
:param counters: a list of CheckCounters
:param data_function: a function that will return a single value for the
fields from field_dict
:param wait_time: seconds to wait between calls to data_function
:param num_checks: times to run data_function
"""
# fields = [
# CheckCounter('test_required_diff', False, True),
# CheckCounter('test_required_same', True, True),
# CheckCounter('test_diff', False, True),
# CheckCounter('test_same', True, True),
# ]
#
# def get_data():
# res = {}
# res['test_required_diff'] = time.time()
# res['test_required_same'] = 123456
# res['test_diff'] = time.time()
# res['test_same'] = 654321
# return res
if num_checks < 2:
raise ValueError('num_checks of less than two makes no sense')
# check that required data fields are returned by the data function
change_required = {}
d = data_function()
to_remove = []
for checkctr in counters:
if checkctr.name not in d:
if checkctr.required:
return False, 'required field %s not found' % checkctr.name
to_remove.append(checkctr.name)
else:
checkctr.data = d[checkctr.name]
for name in to_remove:
for idx, checkctr in enumerate(counters):
if checkctr.name == name:
counters.pop(idx)
break
time.sleep(wait_time)
for loop in range(num_checks - 1):
dnew = data_function()
for checkctr in counters:
ctrnew = dnew[checkctr.name]
if checkctr.must_change:
# must change
if ctrnew != checkctr.data:
checkctr.changed = True
else:
# must stay the same
if ctrnew != checkctr.data:
return False, '%s changing: %.3f > %.3f' % (
checkctr.name, checkctr.data, ctrnew)
time.sleep(wait_time)
for checkctr in counters:
if checkctr.must_change and (not checkctr.changed):
return False, '%s is not changing: %.3f' % (
checkctr.name, checkctr.data)
return True, ''
def program_fpgas(fpga_list, progfile, timeout=10):
"""
Program more than one FPGA at the same time.
:param fpga_list: a list of objects for the FPGAs to be programmed
:param progfile: string, the file used to program the FPGAs
:param timeout: how long to wait for a response, in seconds
"""
stime = time.time()
if progfile is None:
for fpga in fpga_list:
try:
tuplen = len(fpga)
fpga[0].bitstream = fpga[1]
except TypeError:
pass
else:
for fpga in fpga_list:
fpga.bitstream = progfile
threaded_fpga_function(fpga_list, 60, 'upload_to_ram_and_program')
LOGGER.info('Programming %d FPGAs took %.3f seconds.' % (
len(fpga_list), time.time() - stime))
def threaded_create_fpgas_from_hosts(host_list, fpga_class=None,
port=7147, timeout=10,
best_effort=False, **kwargs):
"""
Create KatcpClientFpga objects in many threads, Moar FASTAAA!
:param fpga_class: the class to insantiate, usually CasperFpga
:param host_list: a comma-seperated list of hosts
:param port: the port on which to do network comms
:param timeout: how long to wait, in seconds
:param best_effort: return as many hosts as it was possible to make
"""
if fpga_class is None:
from casperfpga import CasperFpga
fpga_class = CasperFpga
num_hosts = len(host_list)
result_queue = Queue.Queue(maxsize=num_hosts)
thread_list = []
def makehost(hostname):
result_queue.put_nowait(fpga_class(hostname, port, **kwargs))
for host_ in host_list:
thread = threading.Thread(target=makehost, args=(host_,))
thread.daemon = True
thread.start()
thread_list.append(thread)
for thread_ in thread_list:
thread_.join(timeout)
fpgas = [None] * num_hosts
hosts_missing = host_list[:]
while True:
try:
result = result_queue.get_nowait()
host_pos = host_list.index(result.host)
fpgas[host_pos] = result
hosts_missing.pop(hosts_missing.index(result.host))
except Queue.Empty:
break
if hosts_missing:
for host in hosts_missing:
LOGGER.error('Could not create host %s.' % host)
errstr = 'Given %d hosts, only made %d CasperFpgas.' % (
num_hosts, num_hosts-len(hosts_missing))
LOGGER.error(errstr)
if best_effort:
rv = []
for fpga in fpgas:
if fpga is not None:
rv.append(fpga)
return rv
raise RuntimeError(errstr)
return fpgas
def _check_target_func(target_function):
"""
:param target_function:
"""
if isinstance(target_function, basestring):
return target_function, (), {}
try:
len(target_function)
except TypeError:
return target_function, (), {}
if len(target_function) == 3:
return target_function
elif len(target_function) == 1:
target_function = (target_function[0], (), {})
elif len(target_function) == 2:
target_function = (target_function[0], target_function[1], {})
else:
raise RuntimeError('target_function tuple too long? - (name, (), {})')
return target_function
def threaded_fpga_function(fpga_list, timeout, target_function):
"""
Thread the running of any CasperFpga function on a list of
CasperFpga objects. Much faster.
:param fpga_list: list of KatcpClientFpga objects
:param timeout: how long to wait before timing out
:param target_function: a tuple with three parts:
1. string, the KatcpClientFpga function to
run e.g. 'disconnect' for fpgaobj.disconnect()
2. tuple, the arguments to the function
3. dict, the keyword arguments to the function e.g. (func_name, (1,2,), {'another_arg': 3})
:return: a dictionary of the results, keyed on hostname
"""
target_function = _check_target_func(target_function)
def dofunc(fpga, *args, **kwargs):
try:
rv = getattr(fpga, target_function[0])(*args, **kwargs)
return rv
except AttributeError:
LOGGER.error('FPGA %s has no such function: %s' % (
fpga.host, target_function[0]))
raise
return threaded_fpga_operation(
fpga_list, timeout, (dofunc, target_function[1], target_function[2]))
def threaded_fpga_operation(fpga_list, timeout, target_function, num_retries=5, retry_sleep_time=5):
"""
Thread any operation against many FPGA objects
:param fpga_list: list of KatcpClientFpga objects
:param timeout: how long to wait before timing out
:param target_function: a tuple with three parts:
1. reference, the function object that must be
run - MUST take FPGA object as first argument
2. tuple, the arguments to the function
3. dict, the keyword arguments to the function e.g. (func_name, (1,2,), {'another_arg': 3})
:return: a dictionary of the results, keyed on hostname
"""
target_function = _check_target_func(target_function)
def jobfunc(resultq, fpga):
rv = target_function[0](fpga, *target_function[1], **target_function[2])
resultq.put_nowait((fpga.host, rv))
def run_threaded_op(int_fpga_list):
num_fpgas = len(int_fpga_list)
result_queue = Queue.Queue(maxsize=num_fpgas)
thread_list = []
for fpga_ in int_fpga_list:
thread = threading.Thread(target=jobfunc, args=(result_queue, fpga_))
thread.setDaemon(True)
thread.start()
thread_list.append(thread)
for thread_ in thread_list:
thread_.join(timeout)
if thread_.isAlive():
break
returnval = {}
hosts_missing = [fpga.host for fpga in int_fpga_list]
while True:
try:
result = result_queue.get_nowait()
returnval[result[0]] = result[1]
hosts_missing.pop(hosts_missing.index(result[0]))
except Queue.Empty:
break
return returnval, hosts_missing
retry = 0
current_fpga_list = fpga_list[:]
while retry < num_retries:
returnval, hosts_missing = run_threaded_op(current_fpga_list)
if hosts_missing:
#warnmsg = ('Ran function {} on hosts. Did not get a response '
# 'from {}.'.format(target_function[0].__name__, hosts_missing))
warnmsg = ('Ran function {} on hosts. Did not get a response '
'from {}.'.format(target_function[0].func_name, hosts_missing))
LOGGER.warning(warnmsg)
retry += 1
new_fpga_list = []
for fpga_obj in current_fpga_list:
if fpga_obj.host in hosts_missing:
new_fpga_list.append(fpga_obj)
current_fpga_list = new_fpga_list[:]
time.sleep(retry_sleep_time)
else:
break
if hosts_missing:
errmsg = 'Ran function \'%s\' on hosts. Did not get a response ' \
'from %s.' % (target_function[0].__name__, hosts_missing)
LOGGER.error(errmsg)
return returnval
def threaded_non_blocking_request(fpga_list, timeout, request, request_args):
"""
Make a non-blocking KatCP request to a list of KatcpClientFpgas, using
the Asynchronous client.
:param fpga_list: list of KatcpClientFpga objects
:param timeout: the request timeout
:param request: the request string
:param request_args: the arguments to the request, as a list
:return: a dictionary, keyed by hostname, of result dictionaries containing
reply and informs
"""
raise DeprecationWarning
num_fpgas = len(fpga_list)
reply_queue = Queue.Queue(maxsize=num_fpgas)
requests = {}
replies = {}
# reply callback
def reply_cb(host, req_id):
LOGGER.debug('Reply(%s) from host(%s)' % (req_id, host))
reply_queue.put_nowait([host, req_id])
# start the requests
LOGGER.debug('Send request(%s) to %i hosts.' % (request, num_fpgas))
lock = threading.Lock()
for fpga_ in fpga_list:
lock.acquire()
req = fpga_.nb_request(request, None, reply_cb, *request_args)
requests[req['host']] = [req['request'], req['id']]
lock.release()
LOGGER.debug('Request \'%s\' id(%s) to host(%s)' % (
req['request'], req['id'], req['host']))
# wait for replies from the requests
timedout = False
done = False
while (not timedout) and (not done):
try:
it = reply_queue.get(block=True, timeout=timeout)
except:
timedout = True
break
replies[it[0]] = it[1]
if len(replies) == num_fpgas:
done = True
if timedout:
LOGGER.error('non_blocking_request timeout after %is.' % timeout)
LOGGER.error(replies)
raise RuntimeError('non_blocking_request timeout after %is.' % timeout)
# process the replies
returnval = {}
for fpga_ in fpga_list:
try:
request_id = replies[fpga_.host]
except KeyError:
LOGGER.error(replies)
raise KeyError(
'Didn\'t get a reply for FPGA \'%s\' so the request \'%s\' '
'probably didn\'t complete.' % (fpga_.host, request))
reply, informs = fpga_.nb_get_request_result(request_id)
frv = {'request': requests[fpga_.host][0],
'reply': reply.arguments[0],
'reply_args': reply.arguments}
informlist = []
for inf in informs:
informlist.append(inf.arguments)
frv['informs'] = informlist
returnval[fpga_.host] = frv
fpga_.nb_pop_request_by_id(request_id)
return returnval
def hosts_from_dhcp_leases(host_pref=None,
leases_file='/var/lib/misc/dnsmasq.leases'):
"""
Get a list of hosts from a leases file.
:param host_pref: the prefix of the hosts in which we're interested
:param leases_file: the file to read
"""
hosts = []
if host_pref is None:
host_pref = ['roach', 'skarab']
if not isinstance(host_pref, list):
host_pref = [host_pref]
with open(leases_file) as masqfile:
masqlines = masqfile.readlines()
for line in masqlines:
(leasetime, mac, ip, host, mac2) = line.replace('\n', '').split(' ')
for host_prefix in host_pref:
if host.startswith(host_prefix):
hosts.append(host if host != '*' else ip)
break
return hosts, leases_file
def deprogram_hosts(host_list):
"""
:param host_list:
"""
if len(host_list) == 0:
raise RuntimeError('No good carrying on without hosts.')
fpgas = threaded_create_fpgas_from_hosts(host_list)
running = threaded_fpga_function(fpgas, 10, 'is_running')
deprogrammed = []
to_deprogram = []
already_deprogrammed = []
for fpga in fpgas:
if running[fpga.host]:
deprogrammed.append(fpga.host)
to_deprogram.append(fpga)
else:
already_deprogrammed.append(fpga.host)
running = threaded_fpga_function(to_deprogram, 10, 'deprogram')
if len(deprogrammed) != 0:
print('%s: deprogrammed okay.' % deprogrammed)
if len(already_deprogrammed) != 0:
print('%s: already deprogrammed.' % already_deprogrammed)
threaded_fpga_function(fpgas, 10, 'disconnect')
# end