run_scrapers.py
#!/usr/bin/python
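"""Run the QMonit JSON/web scrapers and dump their results as JSON files under data/.

Typical invocations (based on the argparse options defined below):
    python run_scrapers.py -interval 10m [--debug]
    python run_scrapers.py -interval 1h
"""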
from __future__ import print_function
import time
import argparse
import ConfigParser
import pprint
from scrapers.agis import AGIS # EOL is near
from scrapers.rebus import REBUS # EOL is near
from scrapers.cric import CRIC
from scrapers.grafana import Grafana
from scrapers.elasticsearch import ElasticSearch
from maps import PQ_names_map as pq_map
import logging
from commonHelpers.logger import logger
from commonHelpers import notifications

# Do some configuration
config = ConfigParser.ConfigParser()
config.read("config.cfg")
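# config.cfg is expected to provide the credential sections used below:
# [credentials_monit_grafana] (url, token) and [credentials_adcmon] (password).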
logger = logger.getChild("mephisto")

parser = argparse.ArgumentParser(description="Run a set of JSON/web scrapers")
parser.add_argument("--debug", action="store_true", help="print debug messages")
parser.add_argument(
    "-interval", default="10m", help="interval of scrapers to run (10m or 1h)"
)
args = parser.parse_args()

if args.debug:
    logger.setLevel(logging.DEBUG)


def run():
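    """Run the scrapers for the requested interval ("10m" or "1h").

    Results are written as JSON files under data/.
    """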
    # Each time the scrapers are run, we update the PQ map
    pqs = pq_map.PQ_names_map(file="data/map_PQ_names.json")
    if not pqs.update(
        ifile="data/scraped_cric_pandaqueue.json",
        ofile="data/map_PQ_names.json",
        key="panda_resource",
    ):
        logger.warning("PQ map is not available")

    if args.interval == "10m":
        # Now run all the scrapers that should run in 10-minute intervals
        # First the PQ CRIC information
        cric = CRIC()
        raw_data = cric.download(
            url="https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json"
        )
        json_data = cric.convert(data=raw_data, sort_field="panda_resource")
        if cric.save(file="data/scraped_cric_pandaqueue.json", data=json_data):
            logger.info("Scraped PQ CRIC")
        else:
            logger.error("Problem scraping PQ CRIC")
    elif args.interval == "1h":
        # Run all the scrapers that only need to run once per hour
        # (their data doesn't change very often)
        # Next the ATLAS sites CRIC information
        cric = CRIC()
        raw_data = cric.download(
            url="https://atlas-cric.cern.ch/api/atlas/site/query/?json"
        )
        json_data = cric.convert(data=raw_data, sort_field="name")
        if cric.save(file="data/scraped_cric_sites.json", data=json_data):
            logger.info("Scraped sites CRIC")
        else:
            logger.error("Problem scraping sites CRIC")

        # Now the DDM info from CRIC
        raw_data = cric.download(
            url="https://atlas-cric.cern.ch/api/atlas/ddmendpoint/query/?json"
        )
        json_data = cric.convert(data=raw_data, sort_field="site")
        if cric.save(file="data/scraped_cric_ddm.json", data=json_data):
            logger.info("Scraped DDM CRIC")
        else:
            logger.error("Problem scraping DDM CRIC")

        # Next up is REBUS, start with the actual federation map
        rebus = REBUS()
        raw_data = rebus.download(
            url="https://wlcg-cric.cern.ch/api/core/federation/query/?json"
        )
        json_data = rebus.convert(data=raw_data, sort_field="rcsites")
        if rebus.save(file="data/scraped_rebus_federations.json", data=json_data):
            logger.info("Scraped federations CRIC")
        else:
            logger.error("Problem scraping federations CRIC")

        # then the pledges
        # we can reuse the same raw JSON data as before
        json_data = rebus.convert(
            data=raw_data, sort_field="accounting_name", append_mode=True
        )
        if rebus.save(file="data/scraped_rebus_pledges.json", data=json_data):
            logger.info("Scraped pledges CRIC")
        else:
            logger.error("Problem scraping pledges CRIC")

        # we also get datadisk information from monit Grafana
        url = config.get("credentials_monit_grafana", "url")
        token = config.get("credentials_monit_grafana", "token")
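        # Query a 24-hour window that ends 12 hours before now (all timestamps in
        # milliseconds); the offset presumably gives monit time to fill in the data.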
        now = int(round(time.time() * 1000))
        date_to = now - 12 * 60 * 60 * 1000
        date_from = date_to - 24 * 60 * 60 * 1000
        period = """"gte":{0},"lte":{1}""".format(date_from, date_to)
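        # The request body is an Elasticsearch multi-search (msearch) payload against
        # the monit_prod_rucioacc_enr_site* index: it sums data.files and data.bytes
        # per RSE, restricted to the ATLASDATADISK/ATLASSCRATCHDISK tokens.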
        data = (
            """{"search_type":"query_then_fetch","ignore_unavailable":true,"index":["monit_prod_rucioacc_enr_site*"]}\n{"size":0,"query":{"bool":{"filter":[{"range":{"metadata.timestamp":{"""
            + period
            + ""","format":"epoch_millis"}}},{"query_string":{"analyze_wildcard":true,"query":"data.account:* AND data.campaign:* AND data.country:* AND data.cloud:* AND data.datatype:* AND data.datatype_grouped:* AND data.prod_step:* AND data.provenance:* AND data.rse:* AND data.scope:* AND data.experiment_site:* AND data.stream_name:* AND data.tier:* AND data.token:(\\\"ATLASDATADISK\\\" OR \\\"ATLASSCRATCHDISK\\\") AND data.tombstone:(\\\"primary\\\" OR \\\"secondary\\\") AND NOT(data.tombstone:UNKNOWN) AND data.rse:/.*().*/ AND NOT data.rse:/.*(none).*/"}}]}},"aggs":{"4":{"terms":{"field":"data.rse","size":500,"order":{"_term":"desc"},"min_doc_count":1},"aggs":{"1":{"sum":{"field":"data.files"}},"3":{"sum":{"field":"data.bytes"}}}}}}\n"""
        )
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": "Bearer %s" % token,
        }
        grafana = Grafana(url=url, request=data, headers=headers)
        raw_data = grafana.download()
        pprint.pprint(raw_data)
        json_data = grafana.convert(data=raw_data.json())
        if grafana.save(file="data/scraped_grafana_datadisk.json", data=json_data):
            logger.info("Scraped datadisks from monit grafana")
        else:
            logger.error("Problem scraping datadisks from monit grafana")

        # TODO: not running the ES scraper for now since the benchmark jobs are no longer being run
        # # get credentials
        # password = config.get("credentials_elasticsearch", "password")
        # username = config.get("credentials_elasticsearch", "username")
        # host = config.get("credentials_elasticsearch", "host")
        # arg = ([{'host': host, 'port': 9200}])
        # elasticsearch = ElasticSearch(arg, **{'http_auth': (username, password)})
        # kwargs = {
        #     'index': "benchmarks-*",
        #     'body': {
        #         "size": 10000,
        #         "query": {"match_all": {}},
        #         "collapse": {
        #             "field": "metadata.PanDAQueue",
        #             "inner_hits": {"name": "most_recent", "size": 50, "sort": [{"timestamp": "desc"}]},
        #         },
        #     },
        #     'filter_path': [""]
        # }
        # raw_data = elasticsearch.download(**kwargs)
        # json_data = elasticsearch.convert(data=raw_data)
        #
        # if elasticsearch.save(file='data/scraped_elasticsearch_benchmark.json', data=json_data):
        #     logger.info('Scraped benchmark results from ES')
        # else:
        #     logger.error('Problem scraping benchmark results from ES')
    else:
        # Nothing to do otherwise
        print("Dropping out")


if __name__ == "__main__":
    try:
        run()
    except Exception as e:
        logger.error("Got error while running scrapers. " + str(e))
        msg = "QMonit failed to run a scraper job.\n\nError:\n" + str(e)
        subj = "[QMonit error] InfluxDB"
        notifications.send_email(
            message=msg,
            subject=subj,
            **{"password": config.get("credentials_adcmon", "password")}
        )