forked from CrowdStrike/zscaler-FalconX-integration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lookup.py
69 lines (61 loc) · 2.27 KB
/
lookup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from app._util.logger import Logger
from requests.exceptions import HTTPError
import config as config
import requests
import json
import time
class LookUp():
def __init__(self, auth):
self.auth = auth
self.token = auth.get_token()
self.logger = Logger()
self.hostname = config.zs_hostname
self.lookup_url = self.hostname + "/api/v1/urlLookup"
self.headers = {
'content-type': "application/json",
'cache-control': "no-cache",
'cookie': "JSESSIONID=" + str(self.token)
}
def auth_refresh(self):
self.auth.refresh_token()
self.token = self.auth.get_token()
self.headers = {
'content-type': "application/json",
'cache-control': "no-cache",
'cookie': "JSESSIONID=" + str(self.token)
}
def url_model(self, classified_urls_json):
urls = []
db_categorized_urls = []
for url in classified_urls_json:
if url['urlClassificationsWithSecurityAlert']:
pass
elif 'urlClassifications' not in url:
urls.append(url['url'])
elif 'MISCELLANEOUS_OR_UNKNOWN' in url['urlClassifications']:
urls.append(url['url'])
else:
db_categorized_urls.append(url['url'])
ingestable_model = {
'urls': urls,
"dbCategorizedUrls": db_categorized_urls
}
return ingestable_model
def url_look_up(self, url_list):
payload = url_list
response = requests.request(
"POST", self.lookup_url, headers=self.headers, data=json.dumps(payload))
if response.status_code == 401:
self.logger.error("401 at " + self.lookup_url +
"; Attempting to reauthenticate")
self.auth_refresh()
return self.url_look_up(url_list)
else:
classified_urls_json = response.json()
# URL Look Up rate limit exceeded
# This shouldn't happen unless debugging
if 'Retry-After' in classified_urls_json:
return classified_urls_json
time.sleep(1)
ingestable_model = self.url_model(classified_urls_json)
return ingestable_model