-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor_domains_sni.py
82 lines (63 loc) · 1.83 KB
/
monitor_domains_sni.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from collections import defaultdict
# Two-level lookup tables returned by getIPQueryMappingShort() and
# getIPQueryMapping(). Accessing a missing outer key creates an empty dict
# for it on the spot.
# NOTE(review): the names suggest IP -> query mappings (short-form and full
# domain names respectively); nothing in this file populates them -- confirm
# against the writers.
ip2queryShort = defaultdict(dict)
ip2query = defaultdict(dict)
def getIPQueryMappingShort():
    """Return the module-level ``ip2queryShort`` mapping itself (not a copy).

    Callers receive the live object, so any mutation they make is visible to
    every other user of this module.
    """
    return ip2queryShort
def getIPQueryMapping():
    """Return the module-level ``ip2query`` mapping itself (not a copy).

    Callers receive the live object, so any mutation they make is visible to
    every other user of this module.
    """
    return ip2query
def sortDomainsByLastResolved(domains, reverse=False):
    """Sort ``domains`` in place by their recorded resolve times.

    The sort key is ``(full-name time, short-name time, domain)``. A domain
    with no (or a falsy) recorded time gets a placeholder slightly in the
    future -- ``now + 2`` for the full name, ``now + 1`` for the short name --
    so in ascending order it lands behind every domain resolved before now.

    Args:
        domains: List of domain names; reordered in place.
        reverse: Passed straight through to ``list.sort``.

    Returns:
        None.
    """
    now = time.time()

    def resolve_order(name):
        full_time = lastDomainResolveTime(name) or now + 2
        short_time = lastDomainResolveTime(name, True) or now + 1
        return (full_time, short_time, name)

    domains.sort(key=resolve_order, reverse=reverse)
# Resolve records read by wasResolved() and lastDomainResolveTime():
# one table keyed by the domain as given, one keyed by ip2dns.shorten(domain).
# NOTE(review): values are presumably resolve timestamps (they are compared
# against time.time() when sorting); nothing in this file writes to either
# table -- confirm where they are filled.
resolveTimeList: dict = {}
resolveShortTimeList: dict = {}
def wasResolved(domain, short=False):
    """Tell whether a domain (or its short form) was resolved this session.

    Args:
        domain: Domain name to look up.
        short: When true, the name is first passed through
            ``ip2dns.shorten`` and checked against ``resolveShortTimeList``;
            otherwise it is checked as given against ``resolveTimeList``.

    Returns:
        ``True`` if the table holds a truthy entry for the name, else
        ``False``.
    """
    if not short:
        return bool(resolveTimeList.get(domain, False))
    short_name = ip2dns.shorten(domain)
    return bool(resolveShortTimeList.get(short_name, False))
def wasResolvedBy(domain, ip=False, short=False):
    """Unimplemented stub: every call raises ``NotImplementedError``.

    The arguments are accepted but not looked at.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("todo")
def filterOnlyResolvedDomains(domains, short=False):
    """Keep only the domains that were resolved during this session.

    Args:
        domains: Iterable of domain names to check.
        short: Forwarded to ``wasResolved``; when true the comparison uses
            the short form of each name.

    Returns:
        A list of the matching domains in their original order, or ``False``
        (not an empty list) when none of them match.
    """
    resolved = [name for name in domains if wasResolved(name, short)]
    return resolved if resolved else False
# For prioritizing shown domains
def lastDomainResolveTime(domain, short=False):
    """Fetch the stored resolve-time entry for a domain.

    Args:
        domain: Domain name to look up.
        short: When true, look up ``ip2dns.shorten(domain)`` in
            ``resolveShortTimeList`` instead of ``domain`` in
            ``resolveTimeList``.

    Returns:
        The stored value, or ``None`` when the name has no entry.
    """
    if short:
        return resolveShortTimeList.get(ip2dns.shorten(domain))
    return resolveTimeList.get(domain)
import time
import ip2dns
import ipaddress
import config
def onData(entry):
    """Placeholder hook: takes an entry and currently does nothing with it."""
    return None
import select
import subprocess
import sys
# Launch the external `sni-sniffer` tool in sniff mode on the interface named
# by config.internal_interface, capturing its stdout through a pipe.
process = subprocess.Popen(["sni-sniffer","--sniff",config.internal_interface], stdout=subprocess.PIPE)
# Relay the child's output to our own stdout one byte at a time; iter() stops
# as soon as read(1) returns b"" (end of stream).
# NOTE(review): these are module-level statements, so the loop runs whenever
# the module is loaded and holds up everything after it until the child's
# stdout closes -- confirm that is intended.
for c in iter(lambda: process.stdout.read(1), b""):
    sys.stdout.buffer.write(c)
def monitor(f=None):
    """Print every record of a newline-delimited JSON stream.

    Args:
        f: Iterable of text or bytes lines holding one JSON document per
            line. When omitted, the module-level ``process.stdout`` (the
            sniffer's piped output) is read.
            NOTE(review): the previous body read from a name ``f`` that was
            never defined; the sniffer pipe is the only stream this module
            opens, so it is assumed to be the intended source -- confirm.

    Returns:
        None. Returns normally at end of stream or when interrupted with
        Ctrl-C.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Local import: stdlib json stands in for the `ndjson` package, which
    # this file never imported, so every call used to fail with NameError.
    import json

    if f is None:
        f = process.stdout
    try:
        for line in f:
            line = line.strip()
            if not line:
                continue  # blank lines between records carry no data
            print(json.loads(line))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a foreground monitor run.
        return
import threading
def start_monitoring():
    """Start ``monitor`` on a background daemon thread named 'monitorthread'.

    Returns immediately after the thread is started; the thread object is not
    handed back to the caller.
    """
    worker = threading.Thread(target=monitor, name='monitorthread', daemon=True)
    worker.start()
# When executed as a script (rather than imported), run the monitor loop in
# the foreground.
if __name__ == "__main__":
    monitor()