forked from electricitymaps/electricitymaps-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_parser.py
executable file
·92 lines (76 loc) · 3.07 KB
/
test_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
import time
import arrow
import click
import datetime
from utils.parsers import PARSER_KEY_TO_DICT
@click.command()
@click.argument('zone')
@click.argument('data-type', default='production')
@click.option('--target_datetime', default=None, show_default=True)
def test_parser(zone, data_type, target_datetime):
    r"""
    Run the parser registered for a zone / data type and print a summary.

    The parser is looked up in PARSER_KEY_TO_DICT[data_type][zone], called,
    and its result is printed together with the elapsed time and the earliest
    and latest returned datetimes (converted to UTC).

    Parameters
    ----------
    zone: a two letter zone from the map; for the exchange data types, the
        two zone keys joined by '->' (they are split and passed to the parser
        as separate positional arguments)
    data_type: in ['production', 'exchangeForecast', 'exchange',
        'price', 'consumption', 'generationForecast', 'consumptionForecast']
    target_datetime: string parseable by arrow, such as 2018-05-30 15:00

    Examples
    -------
    $ python test_parser.py NO-NO3-\>SE exchange
    parser result:
    {'netFlow': -51.6563, 'datetime': datetime.datetime(2018, 7, 3, 14, 38, tzinfo=tzutc()), 'source': 'driftsdata.stattnet.no', 'sortedZoneKeys': 'NO-NO3->SE'}
    ---------------------
    took 0.09s
    min returned datetime: 2018-07-03T14:38:00+00:00 UTC
    max returned datetime: 2018-07-03T14:38:00+00:00 UTC  -- OK, <2h from now :) (now=2018-07-03T14:39:16.274194+00:00 UTC)

    $ python test_parser.py FR production
    parser result:
    [... long stuff ...]
    ---------------------
    took 5.38s
    min returned datetime: 2018-07-01T22:00:00+00:00 UTC
    max returned datetime: 2018-07-03T14:30:00+00:00 UTC  -- OK, <2h from now :) (now=2018-07-03T14:43:35.501375+00:00 UTC)
    """
    if target_datetime:
        target_datetime = arrow.get(target_datetime).datetime

    start = time.time()
    parser = PARSER_KEY_TO_DICT[data_type][zone]
    # Exchange parsers take the two zone keys as separate positional args.
    if data_type in ('exchange', 'exchangeForecast'):
        args = zone.split('->')
    else:
        args = [zone]
    res = parser(*args, target_datetime=target_datetime)
    elapsed_time = time.time() - start

    if not res:
        print('Error: parser returned nothing ({})'.format(res))
        return

    # A parser may return a single event or a sequence of events; normalise
    # to a list so the checks below handle both shapes.
    res_list = list(res) if isinstance(res, (list, tuple)) else [res]

    try:
        dts = [e['datetime'] for e in res_list]
    except (KeyError, TypeError):
        # KeyError: an event has no 'datetime' key.
        # TypeError: an event cannot be indexed by a string key (e.g. None).
        # Anything else (including Ctrl-C) is deliberately left to propagate.
        print('Parser output lacks `datetime` key for at least some of the '
              'ouput. Full ouput: \n\n{}\n'.format(res))
        return

    # Exact type check: subclasses and datetime-like wrappers are rejected.
    assert all(type(dt) is datetime.datetime for dt in dts), \
        'Datetimes must be returned as native datetime.datetime objects'

    last_dt = arrow.get(max(dts)).to('UTC')
    first_dt = arrow.get(min(dts)).to('UTC')

    max_dt_warning = ''
    if not target_datetime:
        # Freshness is only checked for live data; read the clock once so the
        # comparison and the printed "now" refer to the same instant.
        now = arrow.utcnow()
        max_dt_warning = (
            ' :( >2h from now !!!'
            if (now - last_dt).total_seconds() > 2 * 3600
            else ' -- OK, <2h from now :) (now={} UTC)'.format(now))

    print('\n'.join([
        'parser result:',
        str(res),
        '---------------------',
        'took {:.2f}s'.format(elapsed_time),
        'min returned datetime: {} UTC'.format(first_dt),
        'max returned datetime: {} UTC {}'.format(last_dt, max_dt_warning),
    ]))
if __name__ == '__main__':
    # test_parser is a click command: invoked without arguments it parses
    # sys.argv and, in click's default standalone mode, finishes by calling
    # sys.exit(), so wrapping the call in print() had no effect.
    test_parser()