-
Notifications
You must be signed in to change notification settings - Fork 0
/
key_generator.py
executable file
·132 lines (119 loc) · 5.34 KB
/
key_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python
# Author: Sawood Alam <[email protected]>
#
# This class selects appropriate methods for generating keys based on the policy.
import re
import time
import tldextract
from urlparse import urlparse
from surt import surt
class KeyGenerator(object):
    """Select a URL key-generation method according to a profiling policy.

    Recognized policies are ``H<m>P<n>`` (``<m>`` and ``<n>`` are each a
    run of digits or the letter ``x``, where ``x`` means "no limit") and the
    fixed names ``DDom``, ``DSub``, ``DPth``, ``DQry`` and ``DIni``. After
    construction, ``generate_key`` is bound to the method that implements
    the chosen policy; it takes a URL string and returns a key string, or
    ``None`` when no key can be derived.
    """

    # Parametrized host/path policy, e.g. "H3P1" or "HxP0".
    _HMPN_PATTERN = re.compile(r"^H(?P<host>\d+|x)P(?P<path>\d+|x)$")

    def __init__(self, policy="H1P0"):
        """Bind ``generate_key`` to the method matching ``policy``.

        Args:
            policy: Policy name (see the class docstring).

        Raises:
            ValueError: If ``policy`` is not a recognized policy.
        """
        print("{0} => Initializing the profiler with policy {1}".format(time.strftime("%Y-%m-%d %H:%M:%S"), policy))
        self.policy_map = {
            "HmPn": self._hmpn,
            "DDom": self._ddom,
            "DSub": self._dsub,
            "DPth": self._dpth,
            "DQry": self._dqry,
            "DIni": self._dini
        }
        self.policy = policy
        self.options = {}
        match = self._HMPN_PATTERN.match(policy)
        if match:
            self.options["max_host_segments"] = self._segment_limit(match.group("host"))
            self.options["max_path_segments"] = self._segment_limit(match.group("path"))
            # Every H<m>P<n> variant is served by the single "HmPn" handler.
            policy = "HmPn"
        try:
            self.generate_key = self.policy_map[policy]
        except KeyError:
            raise ValueError("Unrecognized profiling policy: {0}".format(self.policy))

    @staticmethod
    def _segment_limit(token):
        """Turn a policy token into a limit: ``"x"`` -> None, digits -> int."""
        return None if token == "x" else int(token)

    @staticmethod
    def _count_segments(text, separator, strip_chars=""):
        """Count ``separator``-delimited segments of ``text``.

        Returns 0 when ``text`` is empty. Otherwise ``strip_chars`` (if any)
        are stripped from both ends and the result is the number of
        separators left plus one, so a non-empty text that strips down to
        nothing (e.g. a path of "/") still counts as one segment.
        """
        if not text:
            return 0
        if strip_chars:
            text = text.strip(strip_chars)
        return text.count(separator) + 1

    def _registered_domain(self, url):
        """Return ``(ext, reg_dom)`` for ``url``.

        ``ext`` is the ``tldextract.extract`` result and ``reg_dom`` is the
        SURT form of its registered domain, or ``None`` when that SURT does
        not start with a letter or does not contain ")/".
        """
        ext = tldextract.extract(url)
        reg_dom = surt(ext.registered_domain)
        # Slice instead of indexing so an empty string cannot raise IndexError.
        if reg_dom[:1].isalpha() and ")/" in reg_dom:
            return ext, reg_dom
        return ext, None

    def _hmpn(self, url):
        """Return the SURT of ``url`` truncated per the H<m>P<n> limits."""
        # .get() keeps a literal "HmPn" policy (which sets no options) usable:
        # missing limits are treated as "no limit".
        return self._suburi(
            surt(url),
            max_host_segments=self.options.get("max_host_segments"),
            max_path_segments=self.options.get("max_path_segments"),
        )

    def _ddom(self, url):
        """Return the SURT of the registered domain of ``url`` (or None)."""
        _, reg_dom = self._registered_domain(url)
        return reg_dom

    def _dsub(self, url):
        """Return registered-domain SURT followed by the subdomain label count."""
        ext, reg_dom = self._registered_domain(url)
        if reg_dom is None:
            return None
        subdom_len = self._count_segments(ext.subdomain, ".")
        return "{0}{1}".format(reg_dom, subdom_len)

    def _dpth(self, url):
        """Return registered-domain SURT plus subdomain and path segment counts."""
        ext, reg_dom = self._registered_domain(url)
        if reg_dom is None:
            return None
        # The input carries no scheme; one is prepended so urlparse splits it.
        urlseg = urlparse("http://" + url)
        subdom_len = self._count_segments(ext.subdomain, ".")
        path_len = self._count_segments(urlseg.path, "/", "\n\r/")
        return "{0}{1}/{2}".format(reg_dom, subdom_len, path_len)

    def _dqry(self, url):
        """Return registered-domain SURT plus subdomain, path and query counts."""
        ext, reg_dom = self._registered_domain(url)
        if reg_dom is None:
            return None
        urlseg = urlparse("http://" + url)
        subdom_len = self._count_segments(ext.subdomain, ".")
        path_len = self._count_segments(urlseg.path, "/", "\n\r/")
        query_len = self._count_segments(urlseg.query, "&", "?&")
        return "{0}{1}/{2}/{3}".format(reg_dom, subdom_len, path_len, query_len)

    def _dini(self, url):
        """Like ``_dqry`` with the first character of the path appended.

        The appended character is "-" when the stripped path is empty or its
        first character is not alphanumeric.
        """
        ext, reg_dom = self._registered_domain(url)
        if reg_dom is None:
            return None
        urlseg = urlparse("http://" + url)
        subdom_len = self._count_segments(ext.subdomain, ".")
        path_len = self._count_segments(urlseg.path, "/", "\n\r/")
        query_len = self._count_segments(urlseg.query, "&", "?&")
        path_init = urlseg.path.strip("\n\r/")[:1]
        if not path_init.isalnum():
            path_init = "-"
        return "{0}{1}/{2}/{3}/{4}".format(reg_dom, subdom_len, path_len, query_len, path_init)

    def _suburi(self, surt, max_host_segments=None, max_path_segments=None):
        """Truncate a SURT string to a limited number of host/path segments.

        Args:
            surt: SURT string such as "com,example)/a/b?q". Anything from the
                first "?" onwards is ignored.
            max_host_segments: Maximum number of comma-separated host
                segments to keep; a non-int value (e.g. None) means no limit.
            max_path_segments: Maximum number of slash-separated path
                segments to keep; a non-int value (e.g. None) means no limit.

        Returns:
            The truncated key. When the host has more segments than allowed,
            only the leading host segments followed by ")/" are returned and
            the path is dropped. Returns None when ``surt`` does not start
            with two letters, does not contain ")/", or has no ")" before
            the query part.
        """
        if not (surt[:2].isalpha() and ")/" in surt):
            return None
        # partition() instead of split()+unpack: a SURT whose only ")" sits
        # inside the query string must yield None rather than a ValueError.
        host, separator, path = surt.split("?")[0].partition(")")
        if not separator:
            return None
        host_segments = host.strip(",").split(",")
        if not isinstance(max_host_segments, int):
            max_host_segments = len(host_segments)
        if len(host_segments) > max_host_segments:
            return ",".join(host_segments[0:max_host_segments]) + ")/"
        path_segments = path.strip("/").split("/")
        if not isinstance(max_path_segments, int):
            max_path_segments = len(path_segments)
        return host + ")/" + "/".join(path_segments[0:max_path_segments])

    def generate_keys_from_files(self, inputs, output):
        """Write one key per input line to ``output``.

        Args:
            inputs: Iterable of file paths to read. The text before the
                first space on each line is passed to ``generate_key``.
            output: Object with a ``write`` method; each non-empty key is
                written followed by a newline. Lines yielding no key are
                skipped.
        """
        for inp in inputs:
            print("{0} => Generating keys from {1}".format(time.strftime("%Y-%m-%d %H:%M:%S"), inp))
            with open(inp) as f:
                for line in f:
                    k = self.generate_key(line.split(" ")[0])
                    if k:
                        output.write(k + "\n")
if __name__ == "__main__":
    # Running the module directly does no work; it only prints a usage hint.
    usage_hint = "Pass a policy parameter to create an object of KeyGenerator class then call generate_key or generate_keys_from_files methods on it."
    print(usage_hint)