-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathwifi_receiver.py
executable file
·155 lines (138 loc) · 5.49 KB
/
wifi_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
# (c) 2024 B.Kerler
import os
import sys
from subprocess import Popen, PIPE, STDOUT
import argparse
import json
from Library.utils import search_interfaces, get_iw_interfaces, extract_wifi_if_details, enable_monitor_mode, \
set_interface_channel, cexec, enable_managed_mode
from OpenDroneID.wifi_parser import oui_to_parser
from scapy.all import *
from scapy.layers.dot11 import Dot11EltVendorSpecific, Dot11, Dot11Elt
import zmq
# Set to True by main() when -v/--verbose is given; filter_frames() then
# prints decoded messages even while a zmq socket is active.
verbose = False
# Process-wide zmq context; main() creates the publishing socket from it.
context = zmq.Context()
# XPUB socket bound by main() when --zmq is given. It stays None otherwise,
# in which case filter_frames() prints to stdout instead of publishing.
socket = None
def pcapng_parser(filename: str):
    """Replay a capture file through ``filter_frames`` until interrupted.

    The capture is re-read from its first packet every time its end is
    reached, so this function only returns once the user presses Ctrl-C.

    Args:
        filename: Path of the pcap/pcapng capture handed to scapy's
            ``PcapReader``.
    """
    try:
        while True:
            # The context manager closes the capture after every pass
            # instead of leaking one open file handle per replay.
            with PcapReader(filename) as reader:
                for packet in reader:
                    try:
                        filter_frames(packet)
                    except Exception as err:
                        # Best effort: one undecodable frame must not stop
                        # the replay, but the reason is reported on request.
                        if verbose:
                            print(f"Skipping frame: {err!r}", file=sys.stderr)
    except KeyboardInterrupt:
        # Ctrl-C ends the whole replay, not only the current pass.
        return
def filter_frames(packet: Packet) -> None:
    """Decode OpenDroneID data from one captured frame and emit it as JSON.

    The frame's vendor-specific information elements are handed to
    ``oui_to_parser`` one after another. The first element that yields a
    parser is published as ``{"DroneID": {<transmitter mac>: <message>}}``:
    sent on the module-level zmq ``socket`` when one is set, and printed to
    stdout when there is no socket or ``verbose`` is enabled. Frames without
    a Dot11 layer, with another subtype, or without a recognised element are
    ignored.

    Args:
        packet: Frame as delivered by scapy (live sniffer or capture file).
    """
    dot11 = packet.getlayer(Dot11)
    # Accepted 802.11 subtypes: 0x0, 0x8 = Beacon, 0xD (13) = Action.
    # The subtype field is four bits wide, so Action has to be written as 13.
    # NAN Service Discovery Frames are Action frames and contain DRI Info
    # NAN Synchronization Beacons are Beacon frames but don't contain DRI Info
    # Broadcast Message can only happen on channel 6 and contains DRI Info
    if dot11 is None or dot11.subtype not in (0x0, 0x8, 0xD):
        return

    # Vendor-specific information elements carry element ID 221.
    vendor_spec = packet.getlayer(Dot11EltVendorSpecific)
    if vendor_spec is None:
        return

    # Read the transmitter address from the Dot11 layer itself so the lookup
    # does not depend on how many headers precede it in the capture.
    mac = dot11.addr2
    macdb = {"DroneID": {mac: []}}

    while vendor_spec is not None:
        parser = oui_to_parser(vendor_spec.oui, vendor_spec.info)
        if parser is None:
            # Not an element the parser table handles; a frame can carry
            # several vendor elements, so move on to the next one.
            vendor_spec = vendor_spec.payload.getlayer(Dot11EltVendorSpecific)
            continue

        if "DRI" in parser.msg:
            macdb["DroneID"][mac] = parser.msg["DRI"]
        elif "Beacon" in parser.msg:
            macdb["DroneID"][mac] = parser.msg["Beacon"]

        # Serialise once and reuse the text for both sinks.
        payload = json.dumps(macdb)
        if socket:
            socket.send_string(payload)
        if not socket or verbose:
            print(payload)
        break
def main():
    """Command-line entry point of the OpenDroneID wifi receiver.

    Steps:
      1. Parse the command line.
      2. Check that the running Python binary has CAP_NET_ADMIN and
         CAP_NET_RAW, exiting with status 1 otherwise.
      3. Pick the source: ``--interface`` if given, else ``--pcap``, else the
         interface chosen by ``get_iw_interfaces``.
      4. For a live interface, enable monitor mode and tune to channel 6.
      5. With ``--zmq``, bind an XPUB socket and start a thread that logs
         subscribe/unsubscribe events.
      6. Feed frames to ``filter_frames`` until Ctrl-C.
      7. Always restore managed mode on a live interface and shut the zmq
         thread down cleanly.
    """
    global verbose
    global socket
    # Imported here so the function does not rely on ``time`` leaking into
    # the module namespace through the scapy star import.
    import time
    from threading import Thread

    info = "Host-side receiver for OpenDrone ID wifi (c) B.Kerler 2024"
    print(info)
    aparse = argparse.ArgumentParser(description=info)
    aparse.add_argument("-z", "--zmq", action="store_true", help="Enable zmq")
    aparse.add_argument("--zmqsetting", default="127.0.0.1:4223", help="Define zmq server settings")
    aparse.add_argument("--interface", help="Define wifi interface to sniff on")
    aparse.add_argument("--pcap", help="Use pcap file")
    aparse.add_argument("-v", "--verbose", action="store_true", help="Print messages")
    args = aparse.parse_args()

    # Resolve symlinks first: the capabilities are attached to the real binary.
    current_python_executable = cexec(["readlink", "-f", f"{sys.executable}"]).replace("\n", "")
    res = cexec(["getcap", f"{current_python_executable}"])
    if "cap_net_admin" not in res or "cap_net_raw" not in res:
        print(
            f"Please run: \"sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' {current_python_executable}\" before running this script.")
        sys.exit(1)

    interfaces = search_interfaces()
    if args.verbose:
        verbose = True

    # An explicit interface wins over a capture file; with neither given the
    # interface is auto-detected.
    if args.interface is not None:
        interface = args.interface
    elif args.pcap is not None:
        interface = None
    else:
        interface = get_iw_interfaces(interfaces)
        if interface is None:
            # NOTE(review): assumes get_iw_interfaces() returns None when it
            # finds nothing usable - confirm against Library.utils.
            print("--pcap [file.pcapng] or --interface [wifi_monitor_interface] needed")
            sys.exit(1)

    if interface is not None:
        i2d = extract_wifi_if_details(interface)
        if not enable_monitor_mode(i2d, interface):
            sys.stdout.flush()
            sys.exit(1)
        print("Setting wifi channel 6")
        set_interface_channel(interface, 6)

    zthread = None
    try:
        if args.zmq:
            socket = context.socket(zmq.XPUB)
            socket.setsockopt(zmq.XPUB_VERBOSE, True)
            url = f"tcp://{args.zmqsetting}"
            socket.bind(url)

            def log(*msg):
                """Write a timestamped message to stderr."""
                s = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                print("%s:" % s, *msg, end="\n", file=sys.stderr)

            def zmq_thread(socket):
                """Log subscription events until the zmq context is terminated."""
                try:
                    while True:
                        event = socket.recv()
                        # Event is one byte 0=unsub or 1=sub, followed by topic
                        if event[0] == 1:
                            log("new subscriber for", event[1:])
                        elif event[0] == 0:
                            log("unsubscribed", event[1:])
                except zmq.error.ContextTerminated:
                    # context.term() only returns once every socket is
                    # closed, so release the socket from this thread.
                    socket.close(linger=0)

            zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq')
            zthread.start()

        if interface is not None:
            sniffer = AsyncSniffer(
                iface=interface,
                # Frames without a Dot11 layer are dropped instead of raising
                # AttributeError on None inside the sniffer thread.
                lfilter=lambda s: s.haslayer(Dot11) and s.getlayer(Dot11).subtype == 0x8,
                prn=filter_frames,
            )
            sniffer.start()
            print(f"Starting sniffer on interface {interface}")
            while True:
                try:
                    sniffer.join()
                    time.sleep(1)
                except KeyboardInterrupt:
                    break
            print(f"Stopping sniffer on interface {interface}")
            # stop() raises when the sniffer has already ended on its own.
            if sniffer.running:
                sniffer.stop()
        else:
            pcapng_parser(args.pcap)
    finally:
        # Hand the interface back first: this is the part the user notices
        # if shutdown goes wrong.
        if interface is not None:
            i2d = extract_wifi_if_details(interface)
            enable_managed_mode(i2d, interface)
        # Terminating the context makes the blocked recv() in the zmq thread
        # raise ContextTerminated; without it join() would wait forever.
        if zthread is not None and zthread.is_alive():
            context.term()
            zthread.join()
# Run the receiver only when executed as a script, not when imported.
if __name__ == "__main__":
    main()