-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathutils.py
53 lines (44 loc) · 1.57 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import json
import datetime
import decimal
import requests
from constant import SLACK_HOOK
# Keys that Utils.clean_row removes from every row before it converts the
# remaining values.
IGNORED_LIST = ['row_id', 'row_created', 'row_updated']
class Utils:
    """Helpers for Slack alerting, row clean-up and URL normalisation."""

    # Upper bound (seconds) for the Slack webhook call; without a timeout
    # ``requests`` may block indefinitely on an unresponsive endpoint.
    SLACK_TIMEOUT_SECONDS = 10

    def __init__(self):
        # Maps the numeric message type accepted by ``report_slack`` to the
        # text prefix placed in front of the Slack message.
        self.msg_type = {
            0: 'info:: ',
            1: 'err:: ',
        }

    def report_slack(self, type, slack_msg):
        """Post ``slack_msg`` to the Slack webhook configured in ``SLACK_HOOK``.

        The response status code and body are printed to stdout.

        Args:
            type: Key into ``self.msg_type`` (0 -> 'info:: ', 1 -> 'err:: ').
                An unknown key yields no prefix.  The name shadows the
                builtin but is kept for caller compatibility.
            slack_msg: Message text to send.

        Raises:
            requests.exceptions.RequestException: If the HTTP call fails or
                exceeds ``SLACK_TIMEOUT_SECONDS``.
        """
        # The webhook URL is a credential, so it is deliberately not printed.
        url = SLACK_HOOK['hostname'] + SLACK_HOOK['path']
        prefix = self.msg_type.get(type, "")
        payload = {
            "channel": "#contract-index-alerts",
            "username": "webhookbot",
            "text": prefix + slack_msg,
            "icon_emoji": ":ghost:",
        }
        resp = requests.post(
            url=url,
            data=json.dumps(payload),
            timeout=self.SLACK_TIMEOUT_SECONDS,
        )
        print(resp.status_code, resp.text)

    def clean(self, value_list):
        """Apply ``clean_row`` to every row of ``value_list`` in place.

        Args:
            value_list: Iterable of mutable mappings (rows).

        Returns:
            None; the rows are modified in place.
        """
        for value in value_list:
            self.clean_row(value)

    def clean_row(self, row):
        """Drop ignored keys from ``row`` and convert its values in place.

        ``Decimal`` and ``datetime`` values become strings; the single-byte
        values ``b'\\x01'`` / ``b'\\x00'`` become the integers 1 / 0.

        Args:
            row: Mutable mapping to clean.

        Returns:
            The same ``row`` object, after modification.

        Raises:
            ValueError: If a value is a ``bytes`` object other than
                ``b'\\x01'`` or ``b'\\x00'``.
        """
        for item in IGNORED_LIST:
            # pop() instead of del: a row that lacks one of these keys (or
            # that was already cleaned) must not raise KeyError.
            row.pop(item, None)
        # Only values of existing keys are reassigned, so the mapping's size
        # does not change while it is being iterated.
        for key, value in row.items():
            if isinstance(value, (decimal.Decimal, datetime.datetime)):
                row[key] = str(value)
            elif isinstance(value, bytes):
                if value == b'\x01':
                    row[key] = 1
                elif value == b'\x00':
                    row[key] = 0
                else:
                    raise ValueError("Unsupported bytes object. Key " + str(key) + " value " + str(value))
        return row

    def remove_http_https_prefix(self, url):
        """Return ``url`` without a leading ``https://`` or ``http://``.

        Only a scheme at the very start of the string is removed (at most
        one); occurrences later in the string, e.g. a URL embedded in a
        query parameter, are left untouched.  Matching is case-sensitive.

        Args:
            url: URL string.

        Returns:
            The URL with its leading scheme stripped, or ``url`` unchanged
            when it starts with neither scheme.
        """
        for scheme in ("https://", "http://"):
            if url.startswith(scheme):
                return url[len(scheme):]
        return url