forked from Jrohy/multi-v2ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
204 lines (166 loc) · 5.85 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import urllib.request
import os
import re
from enum import Enum, unique
from OpenSSL import crypto
class ColorStr:
    """ANSI colour escape sequences and helpers that colourise a string.

    Each public classmethod returns its argument prefixed with the matching
    colour sequence and followed by ``RESET``.
    """

    RED = '\033[31m'      # red
    GREEN = '\033[32m'    # green
    YELLOW = '\033[33m'   # yellow
    BLUE = '\033[34m'     # blue
    FUCHSIA = '\033[35m'  # fuchsia
    CYAN = '\033[36m'     # cyan
    WHITE = '\033[37m'    # white

    #: no color -- switches the terminal back to its default colour
    RESET = '\033[0m'

    @classmethod
    def _paint(cls, code, s):
        """Return ``s`` wrapped between ``code`` and ``RESET``."""
        return code + s + cls.RESET

    @classmethod
    def red(cls, s):
        """Return ``s`` wrapped in the RED sequence."""
        return cls._paint(cls.RED, s)

    @classmethod
    def green(cls, s):
        """Return ``s`` wrapped in the GREEN sequence."""
        return cls._paint(cls.GREEN, s)

    @classmethod
    def yellow(cls, s):
        """Return ``s`` wrapped in the YELLOW sequence."""
        return cls._paint(cls.YELLOW, s)

    @classmethod
    def blue(cls, s):
        """Return ``s`` wrapped in the BLUE sequence."""
        return cls._paint(cls.BLUE, s)

    @classmethod
    def cyan(cls, s):
        """Return ``s`` wrapped in the CYAN sequence."""
        return cls._paint(cls.CYAN, s)

    @classmethod
    def fuchsia(cls, s):
        """Return ``s`` wrapped in the FUCHSIA sequence."""
        return cls._paint(cls.FUCHSIA, s)

    @classmethod
    def white(cls, s):
        """Return ``s`` wrapped in the WHITE sequence."""
        return cls._paint(cls.WHITE, s)
@unique
class StreamType(Enum):
    """Closed set of stream identifiers; ``@unique`` forbids duplicate values.

    Member order is part of the contract because iterating the enum yields
    members in definition order.
    """

    # TCP-based entries
    TCP = 'tcp'
    TCP_HOST = 'tcp_host'

    # Other protocol entries
    SOCKS = 'socks'
    SS = 'ss'
    MTPROTO = 'mtproto'

    # Additional transports
    H2 = 'h2'
    WS = 'ws'
    QUIC = 'quic'

    # KCP and its KCP_* variants
    # NOTE(review): presumably the KCP_* values name header types -- confirm.
    KCP = 'kcp'
    KCP_UTP = 'utp'
    KCP_SRTP = 'srtp'
    KCP_DTLS = 'dtls'
    KCP_WECHAT = 'wechat'
    KCP_WG = 'wireguard'
def stream_list():
    """Return a fresh list of eight ``StreamType`` members.

    The list holds five ``KCP_*`` members (WG, DTLS, WECHAT, UTP, SRTP)
    followed by MTPROTO, SOCKS and SS, in that order.
    """
    kcp_variants = [
        StreamType.KCP_WG,
        StreamType.KCP_DTLS,
        StreamType.KCP_WECHAT,
        StreamType.KCP_UTP,
        StreamType.KCP_SRTP,
    ]
    other_streams = [
        StreamType.MTPROTO,
        StreamType.SOCKS,
        StreamType.SS,
    ]
    return kcp_variants + other_streams
def header_type_list():
    """Return the tuple of header type names, starting with ``"none"``."""
    header_types = (
        "none",
        "srtp",
        "utp",
        "wechat-video",
        "dtls",
        "wireguard",
    )
    return header_types
def ss_method():
    """Return the tuple of cipher method names, in a fixed order."""
    methods = (
        "aes-256-cfb",
        "aes-128-cfb",
        "chacha20",
        "chacha20-ietf",
        "aes-256-gcm",
        "aes-128-gcm",
        "chacha20-poly1305",
    )
    return methods
def get_ip():
    """Return the IP address that http://api.ipify.org reports for this host.

    The address is obtained from an external HTTP service, so it is the
    address that service sees for the request rather than one read from a
    local interface.

    Returns:
        The response body decoded as text.

    Raises:
        Whatever ``urllib.request.urlopen`` raises when the request fails
        (for example ``urllib.error.URLError``).
    """
    # The context manager closes the HTTP response (and its socket) even if
    # reading or decoding fails; previously the response was never closed.
    with urllib.request.urlopen('http://api.ipify.org') as response:
        return response.read().decode()
def port_is_use(port):
    """Report whether ``lsof -i:<port>`` prints anything on stdout.

    Args:
        port: Port to check; it is converted with ``str()`` and appended to
            ``-i:``.

    Returns:
        ``True`` when ``lsof`` wrote any output, ``False`` when it wrote
        nothing or could not be started.
    """
    import subprocess

    # An argument list (no shell) means the port text can never be
    # interpreted as shell syntax, whatever it contains.
    try:
        process = subprocess.Popen(
            ["lsof", "-i:" + str(port)],
            stdout=subprocess.PIPE,
        )
    except OSError:
        # lsof is missing or not executable. The shell-based version produced
        # no output in that situation, i.e. it reported "not in use".
        return False
    # communicate() drains and closes the pipe and reaps the child process.
    output, _ = process.communicate()
    return bool(output)
def is_email(email):
    """Check whether ``email`` has the e-mail shape accepted by this module.

    Accepted shape: a local part and a domain part separated by ``@``, each
    made of 1 to 5 dot-separated labels containing only ASCII letters,
    digits, ``_`` and ``-``.

    Args:
        email: The string to check.

    Returns:
        A ``re.Match`` object (truthy) when the whole string matches,
        otherwise ``None``.
    """
    # Named ``pattern`` rather than ``str`` so the builtin is not shadowed.
    pattern = r'^[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}$'
    # fullmatch() requires the entire string to be consumed. With match(),
    # ``$`` also succeeds just before a trailing newline, so "a@b.com\n"
    # used to be accepted.
    return re.fullmatch(pattern, email)
def get_domain_by_crt_file(crt_path):
    """Return the subject Common Name (CN) of a PEM certificate file.

    Args:
        crt_path: Path of the certificate file.

    Returns:
        The ``CN`` attribute of the certificate subject, or ``None`` when the
        file cannot be read or cannot be loaded as a PEM certificate.
    """
    try:
        # Read inside ``with`` so the file handle is always closed.
        with open(crt_path) as crt_file:
            pem_text = crt_file.read()
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_text)
    except Exception:
        # Deliberately broad: any read or parse failure means "no domain".
        # ``Exception`` (unlike a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        return None
    return cert.get_subject().CN
def bytes_2_human_readable(number_of_bytes, precision=1):
    """Convert a byte count into a string using bytes, KB, MB, GB or TB.

    Args:
        number_of_bytes: Non-negative amount of bytes.
        precision: Number of decimals passed to ``round``.

    Returns:
        ``"<rounded value> <unit>"``, for example ``"1.5 KB"``. The value is
        divided by 1024 at most four times, so anything of 1024 TB or more is
        still expressed in TB.

    Raises:
        ValueError: If ``number_of_bytes`` is negative.
    """
    if number_of_bytes < 0:
        raise ValueError("!!! number_of_bytes can't be smaller than 0 !!!")

    step = 1024.
    amount = float(number_of_bytes)
    label = 'bytes'

    # Move up one unit at a time for as long as the value still reaches at
    # least 1 in the next larger unit.
    for larger_label in ('KB', 'MB', 'GB', 'TB'):
        if not (amount / step) >= 1:
            break
        amount /= step
        label = larger_label

    return str(round(amount, precision)) + ' ' + label
def gen_cert(domain):
    """Issue an EC-256 certificate for ``domain`` with acme.sh standalone mode.

    Steps, in order:
      1. Install acme.sh when ``/root/.acme.sh/acme.sh`` does not exist.
      2. Stop the services v2ray, nginx, httpd and apache2.
      3. Run ``acme.sh --issue -d <domain> --standalone --keylength ec-256``.
      4. Start the same services again.

    Exit statuses of the shell commands are not inspected.

    Args:
        domain: Domain name passed to ``acme.sh -d``.
    """
    import shlex

    service_name = ["v2ray", "nginx", "httpd", "apache2"]
    start_cmd = "service {} start >/dev/null 2>&1"
    stop_cmd = "service {} stop >/dev/null 2>&1"

    if not os.path.exists("/root/.acme.sh/acme.sh"):
        # NOTE(review): this pipes a downloaded script straight into sh;
        # kept as-is, but worth pinning or verifying the installer.
        os.system("curl https://get.acme.sh | sh")

    # The domain ends up in a shell command line, so quote it: an ordinary
    # host name is left unchanged, while shell metacharacters lose their
    # special meaning.
    get_ssl_cmd = (
        "bash /root/.acme.sh/acme.sh --issue -d "
        + shlex.quote(domain)
        + " --standalone --keylength ec-256"
    )

    # NOTE(review): presumably the services are stopped so that acme.sh's
    # standalone server can bind its port -- confirm.
    for name in service_name:
        os.system(stop_cmd.format(name))
    os.system(get_ssl_cmd)
    for name in service_name:
        os.system(start_cmd.format(name))
def calcul_iptables_traffic(port):
    """Build a coloured traffic summary line for ``port``.

    Runs ``/usr/local/multi-v2ray/global_setting/calcul_traffic.sh <port>``
    and reads the first line of its output. The first three
    whitespace-separated fields are parsed as integers and rendered with
    ``bytes_2_human_readable`` (2 decimals) as upload, download and total.

    Args:
        port: Port passed to the script and shown in the summary.

    Returns:
        The formatted summary string, or ``None`` when the script printed
        nothing.
    """
    port_text = str(port)
    command = "bash /usr/local/multi-v2ray/global_setting/calcul_traffic.sh {}".format(port_text)
    output_lines = os.popen(command).readlines()
    if not output_lines:
        return None

    fields = output_lines[0].split()
    upload_text = bytes_2_human_readable(int(fields[0]), 2)
    download_text = bytes_2_human_readable(int(fields[1]), 2)
    total_text = bytes_2_human_readable(int(fields[2]), 2)

    summary = "{0}: upload:{1} download:{2} total:{3}"
    return summary.format(
        ColorStr.green(port_text),
        ColorStr.cyan(upload_text),
        ColorStr.cyan(download_text),
        ColorStr.cyan(total_text),
    )
def clean_iptables(port):
    """Delete the INPUT and OUTPUT iptables rules whose listing mentions ``port``.

    For each chain, ``iptables -nvL <chain> --line-number`` is filtered with
    ``grep -w <port>`` and the first column (the rule number) of every
    matching line is collected. The rules are then removed with
    ``iptables -D <chain> <number>``.

    Args:
        port: Port whose rules should be removed; converted with ``str()``.
    """
    clean_cmd = "iptables -D {0} {1}"
    check_cmd = "iptables -nvL %s --line-number|grep -w \"%s\"|awk '{print $1}'"

    for chain in ("INPUT", "OUTPUT"):
        # ``with`` closes the pipe and reaps the shell once it has been read.
        with os.popen(check_cmd % (chain, str(port))) as listing:
            # Keep only numeric first columns; anything else is not a rule
            # number and must not be handed to ``iptables -D``.
            rule_numbers = [int(line) for line in listing if line.strip().isdigit()]

        # Delete from the highest rule number down. Removing a rule
        # renumbers every rule after it, so the order must be numerically
        # descending. The previous ``sort -r`` compared text, which put "9"
        # before "10" and could delete the wrong rule.
        for number in sorted(rule_numbers, reverse=True):
            os.system(clean_cmd.format(chain, number))
def open_port():
    """Insert iptables rules for every distinct port in the loaded profile.

    Ports are taken from ``Loader().profile.group_list``. A port is skipped
    when ``iptables -nvL --line-number | grep -w <port>`` prints anything.
    Otherwise four rules are inserted, in this order: INPUT tcp, INPUT udp
    (``--dport``, ``-j ACCEPT``), OUTPUT tcp, OUTPUT udp (``--sport``, no
    target).
    """
    from loader import Loader

    input_cmd = "iptables -I INPUT -p {0} --dport {1} -j ACCEPT"
    output_cmd = "iptables -I OUTPUT -p {0} --sport {1}"
    check_cmd = "iptables -nvL --line-number|grep -w \"%s\""

    profile_groups = Loader().profile.group_list
    distinct_ports = {group.port for group in profile_groups}

    for port in distinct_ports:
        port_text = str(port)
        matching_lines = os.popen(check_cmd % port_text).readlines()
        if matching_lines:
            # The port already shows up in the current listing.
            continue
        # NOTE(review): the OUTPUT rules carry no -j target; presumably they
        # exist only so traffic can be counted -- confirm.
        for rule_template in (input_cmd, output_cmd):
            for protocol in ("tcp", "udp"):
                os.system(rule_template.format(protocol, port_text))