-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathripe-lg-graph.py
312 lines (228 loc) · 9.55 KB
/
ripe-lg-graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import dns.resolver
import pydot
import ipaddress
import os
import random
import requests
import shutil
import sys
import typing
from datetime import datetime
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from types import TracebackType
looking_glass_url = "https://stat.ripe.net/data/looking-glass/data.json"
# See: https://stat.ripe.net/docs/data_api#RulesOfUsage
sourceapp_name = "ripe-lg-graph_py"
# DON'T TOUCH IF YOU DON'T KNOW WHAT YOU'RE DOING
target_folder = datetime.now().strftime("%Y-%m-%d %H%M%S")
target = ""
class AddressOrPrefixNotFoundError(Exception):
"""Exception raised for errors in the input.
Attributes:
message -- explanation of the error
"""
def __init__(self, message):
super().__init__(message)
def is_valid(address_prefix:str) -> bool:
try:
ipaddress.ip_address(address_prefix)
return True
except ValueError:
try:
ipaddress.ip_network(address_prefix, strict=True)
return True
except ValueError:
return False
def process_rrc_options(raw_rrc_list_str:str) -> typing.Union[str, typing.List[str]]:
returning_list = []
if raw_rrc_list_str.isdigit():
rrc_id = int(raw_rrc_list_str)
returning_list.append(f"{rrc_id:02}")
elif "," in raw_rrc_list_str:
raw_list = raw_rrc_list_str.split(",")
for rrc_id_str in raw_list:
if not rrc_id_str.isdigit():
continue
rrc_id = int(rrc_id_str)
returning_list.append(f"{rrc_id:02}")
if len(returning_list) != 0:
return returning_list
else:
return ""
def form_params(resource_param:str="") -> typing.Dict[str, str]:
params = {
"sourceapp": sourceapp_name,
"soft_limit": "ignore"
}
if (resource_param != ""):
params["resource"] = resource_param
return params
def get_rrc_data(address_prefix:str, rrc_list:typing.Union[str, typing.List[str]]="") -> typing.Dict[str, typing.Dict[str, typing.Union[str, typing.List[str]]]]:
parted_url = list(urlparse(looking_glass_url))
query = dict(parse_qsl(parted_url[4]))
query.update(form_params(address_prefix))
parted_url[4] = urlencode(query)
final_url = urlunparse(parted_url)
print("Contacting RIPE NCC RIS looking glass...")
r = requests.get(final_url)
r.raise_for_status()
data = r.json()
if (data["messages"]):
for message_array in data["messages"]:
if (message_array[0].lower() == "error"):
raise Exception(message_array[1])
else:
print(f"RIPE {message_array[0]}: {message_array[1]}")
if (len(data["data"]["rrcs"]) == 0):
raise AddressOrPrefixNotFoundError("Prefix or address is not found on RIPE NCC's RIS.")
global target
target = data["data"]["parameters"]["resource"]
raw_returning_data = {}
for rrc_dict in data["data"]["rrcs"]:
rrc_name = rrc_dict['rrc']
if (rrc_name not in raw_returning_data):
raw_returning_data[rrc_name] = {
"location": rrc_dict["location"],
"paths": []
}
for rrc_peer in rrc_dict["peers"]:
as_path = rrc_peer["as_path"]
as_path_list = []
# this is done to strip prepends
for as_number in as_path.split(" "):
if ("".join(as_path_list[-1:]) == as_number):
continue
as_path_list.append(as_number)
raw_returning_data[rrc_name]["paths"].append(" ".join(as_path_list))
if rrc_list == "":
print("Processing all available RRCs...")
returning_data = dict(sorted(raw_returning_data.items()))
else:
pre_proc_returning_data = {}
for rrc in rrc_list:
rrc_name = f"RRC{rrc}"
if rrc_name not in raw_returning_data:
print(f"{rrc_name} is either invalid or not found, skipping...")
continue
pre_proc_returning_data[rrc_name] = raw_returning_data[rrc_name]
if len(pre_proc_returning_data) == 0:
print("None of the specified RRCs are in the result list, passing all RRCs...")
pre_proc_returning_data = raw_returning_data
returning_data = dict(sorted(pre_proc_returning_data.items()))
return returning_data
def query_asn_info(asn:str) -> str:
try:
data = dns.resolver.resolve(f"AS{asn}.asn.cymru.com", "TXT").response.answer[0][0].to_text().replace("'","").replace('"','')
except:
return " "*5
return [ field.strip() for field in data.split("|") ]
def get_as_name(_as:str) -> str:
if not _as:
return "AS?????"
if not _as.isdigit():
return _as.strip()
name = query_asn_info(_as)[-1].replace(" ","\r",1)
return f"AS{_as} | {name}"
def make_bgpmap(rrc:str, rrc_data_dict:typing.Dict[str, typing.Union[str, typing.List[str]]]) -> True:
rrc_full = f"{rrc} - {rrc_data_dict['location']}"
print(f"Now processing: {rrc_full}")
graph = pydot.Dot('BGPMAP', graph_type='digraph')
nodes = {}
edges = {}
def escape(label):
label = label.replace("&", "&")
label = label.replace(">", ">")
label = label.replace("<", "<")
return label
def add_node(_as, **kwargs):
carriage_return = "\r"
if _as not in nodes:
kwargs["label"] = f"<<TABLE CELLBORDER=\"0\" BORDER=\"0\" CELLPADDING=\"0\" CELLSPACING=\"0\"><TR><TD ALIGN=\"CENTER\">{escape(kwargs.get('label', get_as_name(_as))).replace(carriage_return,'<BR/>')}</TD></TR></TABLE>>"
nodes[_as] = pydot.Node(_as, style="filled", fontsize="10", **kwargs)
graph.add_node(nodes[_as])
return nodes[_as]
def add_edge(_previous_as, _as, **kwargs):
kwargs["splines"] = "true"
force = kwargs.get("force", False)
edge_tuple = (_previous_as, _as)
if force or edge_tuple not in edges:
edge = pydot.Edge(*edge_tuple, **kwargs)
graph.add_edge(edge)
edges[edge_tuple] = edge
elif "label" in kwargs and kwargs["label"]:
e = edges[edge_tuple]
label_without_star = kwargs["label"].replace("*", "")
if e.get_label() is not None:
labels = e.get_label().split("\r")
else:
return edges[edge_tuple]
if "%s*" % label_without_star not in labels:
labels = [ kwargs["label"] ] + [ l for l in labels if not l.startswith(label_without_star) ]
labels = sorted(labels, key=lambda x: x.endswith("*") and -1 or 1)
label = escape("\r".join(labels))
e.set_label(label)
return edges[edge_tuple]
add_node(rrc_full, label=rrc_full, shape="box", fillcolor="#F5A9A9")
previous_as = None
first = True
for asmap in rrc_data_dict["paths"]:
previous_as = rrc_full
color = "#%x" % random.randint(0, 16777215)
hop = False
hop_label = ""
for _as in asmap.split(" "):
if not hop:
hop = True
hop_label = _as
if first:
hop_label = hop_label + "*"
if (_as == asmap[-1]):
add_node(_as, fillcolor="#F5A9A9", shape="box")
else:
add_node(_as, fillcolor=(first and "#F5A9A9" or "white"))
if hop_label:
edge = add_edge(nodes[previous_as], nodes[_as], label=hop_label, fontsize="7")
else:
edge = add_edge(nodes[previous_as], nodes[_as], fontsize="7")
hop_label = ""
if (first or _as == asmap[-1]):
edge.set_style("bold")
edge.set_color("red")
elif edge.get_style() != "bold":
edge.set_style("dashed")
edge.set_color(color)
previous_as = _as
first = False
add_node("Prefix", label=target, fillcolor="#F5A9A9", shape="box")
final_edge = add_edge(nodes[_as], nodes["Prefix"], fontsize="7")
final_edge.set_style("bold")
final_edge.set_color("red")
graph.write(f"./output/{target_folder}/png/{rrc_full}.png", format="png")
graph.write(f"./output/{target_folder}/svg/{rrc_full}.svg", format="svg")
return True
def except_clearence_hook(exctype:typing.Type[BaseException], value:BaseException, traceback:TracebackType) -> None:
if (os.path.exists(f"./output/{target_folder}")):
shutil.rmtree(f"./output/{target_folder}")
sys.__excepthook__(exctype, value, traceback)
sys.excepthook = except_clearence_hook
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Create a routing graph with data from RIPE NCC's RIS.")
parser.add_argument(
"--rrc", help="ID(s) of the RRC for graphing, process all if none specified (comma seperated if multiple)", type=str, required=False, default=""
)
parser.add_argument(
"address_prefix", help="IP prefix or address, will not search for the nearest announced object.", type=str
)
args = parser.parse_args()
if (is_valid(args.address_prefix)):
rrc_path_data = get_rrc_data(args.address_prefix, process_rrc_options(args.rrc))
os.makedirs(f"./output/{target_folder}/png")
os.makedirs(f"./output/{target_folder}/svg")
for rrc, rrc_data_dict in rrc_path_data.items():
make_bgpmap(rrc, rrc_data_dict)
print("\nDone!")
else:
raise AddressOrPrefixNotFoundError("Entered address or prefix is invalid.")