-
Notifications
You must be signed in to change notification settings - Fork 0
/
scanfordevices.py
128 lines (106 loc) · 5.19 KB
/
scanfordevices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# ==================================================================================
# File: scanfordevices.py
# Author: Larry W Jordan Jr ([email protected])
# Use: Scans for BLW Devices and updates the cache file
#
# Online: www.hackinmakin.com
#
# (c) 2020 Larouex Software Design LLC
# This code is licensed under MIT license (see LICENSE.txt for details)
# ==================================================================================
import time, logging, string, json, os, binascii, struct, threading, asyncio
from bluepy.btle import Scanner, DefaultDelegate, Peripheral
from classes.devicescache import DevicesCache
# -------------------------------------------------------------------------------
# Delegate Class to Handle Discovered Devices
# -------------------------------------------------------------------------------
class ScanDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
def HandleDiscovery(self,dev,new_dev,new_dat):
if new_dev:
pass
if new_dat:
pass
# -------------------------------------------------------------------------------
# ScanDevices Class
# -------------------------------------------------------------------------------
class ScanForDevices():
timer = None
timer_ran = False
dcm_value = None
def __init__(self, Log, IFace, ResetHCI, ScanSeconds):
self.logger = Log
self.data = []
self.new_devices = []
self.characteristics = []
self.iface = IFace
self.resethci = ResetHCI
self.scan_seconds = ScanSeconds
async def scan_for_devices(self):
self.logger.info("resethci: %s" % str(self.resethci))
self.logger.info("Bluetooth Interface: %s" % str(self.iface))
# Write flag
new_devices_discovered = False
# Load the Devices Cache File for any devices
# that have already been provisioned
devicescache = DevicesCache(self.logger)
# Make a working copy of the cache file
self.data = devicescache.data
if self.resethci:
try:
self.logger.info("hciconfig down..." )
os.system("sudo hciconfig hci%s down" % self.iface)
self.logger.info("hciconfig up..." )
os.system("sudo hciconfig hci%s up" % self.iface)
except:
self.logger.warning("We encountered an error executing OS command hciconfig" )
try:
# Scan the BLE's that are Advertising
self.logger.warning("Please Wait: Scanning for BLE Devices Advertising...(%s Seconds)" % self.scan_seconds)
scanner = Scanner(self.iface).withDelegate(ScanDelegate())
devices = scanner.scan(float(self.scan_seconds))
# Check the devices and add the ones that match the pattern indicated in
# the devicescache.json file element [DeviceNamePrefix]...
for device in devices:
devicename = str(device.getValueText(9))
if [x for x in self.data["Devices"] if x.get("Address")==device.addr]:
self.logger.warning("[ALREADY DISCOVERED, SKIPPING] %s" % devicename)
else:
# Check our Pattern for Matching Names
new_device = False
dcm_for_device = None
for name_dcm in self.data["DeviceCapabilityModels"]:
if devicename.startswith(name_dcm["DeviceNamePrefix"]):
self.logger.warning("[FOUND NEW DEVICE] %s" % devicename)
new_device = True
newDevice = {
"DeviceName": devicename,
"Address": str(device.addr),
"LastRSSI": "%s dB" % str(device.rssi),
"DCM": name_dcm["DCM"],
"DeviceInfoInterface": name_dcm["DeviceInfoInterface"],
"DeviceInfoInterfaceInstanceName": name_dcm["DeviceInfoInterfaceInstanceName"],
"NanoBLEInterface": name_dcm["NanoBLEInterface"],
"NanoBLEInterfaceInstanceName": name_dcm["NanoBLEInterfaceInstanceName"],
"LastProvisioned": None
}
if new_device:
new_devices_discovered = True
# Associate the IoTC Device Template
self.logger.info("[NEW DEVICE INFO] %s" % newDevice)
# Merge the Data with the New Devices, this is the
# one we commit to the devicescache.json when completed
self.data["Devices"].append(newDevice)
# working list of just the new devices in this session
self.new_devices.append(newDevice)
else:
pass
except Exception as ex:
self.logger.error("[ERROR] %s" % ex)
self.logger.error("[TERMINATING] We encountered an error scanning for BLE Devices" )
return
if new_devices_discovered:
# Update the Cache
devicescache.update_file(self.data)
return