-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
192 lines (164 loc) · 5.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import base64
import io
import json
from cmd import Cmd
from concurrent.futures import ThreadPoolExecutor
from ipaddress import IPv4Network
from typing import Any, Tuple, List, Optional
from urllib.parse import urlencode
import requests
class Device:
    """A single Tasmota device reachable over HTTP on port 80.

    Two devices are considered equal when their addresses match, so instances
    can be de-duplicated with ``in`` checks, sets and dict keys.

    Attributes:
        address: Host address of the device, always stored as a string.
        data: Per-device configuration as saved to / loaded from disk. After a
            successful backup it is ``{'config': <base64 text>}``.
    """

    # Seconds to wait for a short command/status request.
    COMMAND_TIMEOUT = 5
    # Seconds to wait when downloading or uploading a configuration dump.
    TRANSFER_TIMEOUT = 30

    def __init__(self, address, data):
        # Accept str or ipaddress objects alike; normalise to str so that
        # equality, hashing and URL building behave consistently.
        self.address = str(address)
        self.data = data

    def __str__(self):
        return f'({self.address})'

    def __repr__(self):
        return f'{type(self).__name__}({self.address!r})'

    def __eq__(self, other):
        if not isinstance(other, Device):
            # Don't attempt to compare against unrelated types.
            return NotImplemented
        return self.address == other.address

    def __hash__(self):
        # Must agree with __eq__ so instances behave sanely in dicts and sets.
        return hash((self.address,))

    def send_command(self, command: str) -> Tuple["Device", Any]:
        """Send a console command to the device and return its JSON reply.

        Args:
            command: The command text, passed as the ``cmnd`` query parameter.

        Returns:
            A ``(device, reply)`` tuple. ``reply`` is the decoded JSON body, or
            ``None`` when the request failed or the body was not valid JSON.
        """
        query = urlencode({'cmnd': command})
        url = f'http://{self.address}:80/cm?{query}'
        try:
            resp = requests.get(url=url, timeout=self.COMMAND_TIMEOUT)
            return self, resp.json()
        except (requests.RequestException, ValueError):
            # Unreachable host, timeout, or a non-JSON answer: report "no reply".
            return self, None

    def backup(self) -> bool:
        """Download the device configuration and keep it base64-encoded in ``data``.

        Returns:
            True when a non-empty configuration was downloaded and stored,
            False otherwise (``data`` is left unchanged on failure).
        """
        url = f'http://{self.address}:80/dl'
        try:
            resp = requests.get(url=url, timeout=self.TRANSFER_TIMEOUT)
            # Do not store an HTTP error page as if it were a configuration.
            resp.raise_for_status()
        except requests.RequestException as error:
            print(f'{self}: backup failed: {error}')
            return False

        # ``Response.content`` holds the body bytes; ``Response.raw`` is a
        # stream object and is not callable.
        config = resp.content
        if not config:
            print(f'{self}: backup failed: empty response')
            return False
        self.data = {'config': base64.b64encode(config).decode('utf-8')}
        return True

    def restore(self) -> bool:
        """Upload the stored configuration back to the device.

        Returns:
            True when the device answered the upload with a body containing
            ``Successful``; False when there is no stored configuration, the
            stored value cannot be decoded, or any request fails.
        """
        stored = self.data if isinstance(self.data, dict) else {}
        encoded = stored.get('config')
        if not encoded:
            return False

        try:
            # Decode first so that a corrupt backup never leaves the device
            # waiting for an upload that is not going to arrive.
            payload = io.BytesIO(base64.b64decode(encoded))

            # Sets internal tasmota state
            requests.get(f'http://{self.address}:80/rs?',
                         timeout=self.COMMAND_TIMEOUT)

            # Upload the config
            response = requests.post(f'http://{self.address}:80/u2',
                                     files={'u2': payload},
                                     timeout=self.TRANSFER_TIMEOUT)
        except (requests.RequestException, ValueError) as error:
            print(error)
            return False

        if response and b"Successful" in response.content:
            return True

        # Show what the device actually answered to help diagnose the failure.
        print(response)
        print(response.content)
        return False

    def get_config(self):
        """Return the configuration data to persist for this device."""
        return self.data
class CommandParser(Cmd):
    """Interactive shell for managing a collection of Tasmota devices.

    Known devices are kept in ``self.devices`` and are loaded from the default
    device file on start-up. Any input that is not a built-in shell command is
    forwarded to every device as a device command.
    """

    prompt = 'tasmotaManager> '

    # File used by ``save``/``load`` when no file name is given.
    DEFAULT_FILE = 'devices.json'
    # Upper bound on concurrent probes during a scan.
    MAX_SCAN_WORKERS = 1024

    def __init__(self):
        super().__init__()
        self.devices: List["Device"] = []
        self.do_load()

    def _for_each_device(self, action):
        """Run ``action(device)`` concurrently for every known device.

        Returns the results as a list in the same order as ``self.devices``.
        """
        if not self.devices:
            return []
        with ThreadPoolExecutor(max_workers=len(self.devices)) as executor:
            return list(executor.map(action, self.devices))

    @staticmethod
    def scan_address(address) -> Optional["Device"]:
        """Probe one address and return a Device if it identifies as Tasmota.

        The address is asked for ``Status 2``; it is accepted when the reply
        carries a ``StatusFWR.Version`` string containing ``tasmota``. Any
        other reply (or none at all) yields ``None``.
        """
        device = Device(address, {})
        _, response = device.send_command('Status 2')

        # Other hosts may answer with arbitrary JSON, so check the shape of
        # the reply instead of indexing into it blindly.
        if not isinstance(response, dict):
            return None
        firmware = response.get('StatusFWR')
        version = firmware.get('Version') if isinstance(firmware, dict) else None
        if isinstance(version, str) and 'tasmota' in version:
            print(f'Found device: {address}')
            return device
        return None

    def do_scan(self, address_range: str):
        """
        Scan an address range for devices.
        :param address_range: The address range to scan, e.g. 192.168.1.0/24
        """
        try:
            # strict=False also accepts a host address with a prefix length,
            # e.g. 192.168.1.5/24, and scans the network it belongs to.
            network = IPv4Network(address_range.strip(), strict=False)
        except ValueError as error:
            print(f'Invalid address range {address_range!r}: {error}')
            return

        workers = min(self.MAX_SCAN_WORKERS, network.num_addresses)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            found = list(executor.map(self.scan_address, network))

        for device in found:
            if device is not None and device not in self.devices:
                self.devices.append(device)

    def do_cmd(self, line):
        """
        Execute a command on all devices
        :arg line: Command to send
        """
        results = self._for_each_device(
            lambda device: device.send_command(line))
        for device, reply in results:
            print(f"{device}: {reply}")
            print("\n")

    def default(self, line):
        # Anything that is not a shell command is treated as a device command.
        self.do_cmd(line)

    def do_save(self, file="devices.json"):
        """
        Save devices to file
        :param file: Default file is devices.json
        """
        # The shell passes an empty string when no argument was typed.
        path = file or self.DEFAULT_FILE
        config = {str(device.address): device.get_config()
                  for device in self.devices}
        try:
            # ensure_ascii=False may emit non-ASCII text, so fix the encoding.
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(config, f, ensure_ascii=False, indent=4)
        except OSError as error:
            print(f'Could not save {path}: {error}')

    def do_load(self, file="devices.json"):
        """
        Load devices from file
        :param file: The file to load device config from
        """
        # The shell passes an empty string when no argument was typed.
        path = file or self.DEFAULT_FILE
        try:
            with open(path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except FileNotFoundError:
            # A missing default file is normal on first start; stay quiet.
            if path != self.DEFAULT_FILE:
                print(f'No such file: {path}')
            return
        except (OSError, ValueError) as error:
            print(f'Could not load {path}: {error}')
            return

        if not isinstance(config, dict):
            print(f'Could not load {path}: expected a JSON object')
            return

        for address, data in config.items():
            device = Device(address, data=data)
            if device in self.devices:
                # Already known: refresh its data instead of duplicating it.
                self.devices[self.devices.index(device)].data = data
            else:
                self.devices.append(device)

    def do_print(self, _):
        """
        Print all devices
        """
        for device in self.devices:
            print(device)

    def do_backup(self, _):
        """
        Backup all devices
        """
        results = self._for_each_device(lambda device: device.backup())
        print(results)

    def do_restore(self, _):
        """
        Restore all devices
        """
        results = self._for_each_device(lambda device: device.restore())
        print(results)

    def do_quit(self, _):
        """
        Quit!
        """
        return True

    def do_EOF(self, _):
        """
        Quit on end of input (Ctrl-D)
        """
        # Without this handler "EOF" would be sent to every device as a
        # command, and the loop would never terminate on a closed stdin.
        print()
        return True
def main():
    """Start the interactive device-manager shell and run it until it exits."""
    CommandParser().cmdloop()


# Only start the shell when executed as a script, not when imported.
if __name__ == '__main__':
    main()