forked from ayushsharma82/AsyncElegantOTA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ota_update.py
109 lines (86 loc) · 2.94 KB
/
ota_update.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import requests
import argparse
import pathlib
import hashlib
import sys
import os
from zeroconf import ServiceBrowser, Zeroconf
from concurrent.futures import Future
from requests.auth import HTTPDigestAuth
class ESPFinder:
    """Zeroconf service listener that resolves the address of one ESP device.

    An instance is passed as the listener of a zeroconf ``ServiceBrowser``.
    The first announced service that matches ``espid`` has its first parsed
    address stored in ``self.future``; ``self.wait(timeout)`` blocks until
    that happens.

    A service matches when any of the following holds:

    * its TXT property ``espid`` equals ``espid``;
    * its instance name ends with ``-<espid>``;
    * its TXT property ``name`` ends with ``-<espid>``.
    """

    def __init__(self, espid):
        """Create a finder for the device identified by ``espid``."""
        self.espid = espid
        self.expected_suffix = f'-{espid}'
        self.future = Future()
        # Alias so callers can write ``finder.wait(timeout)``.
        self.wait = self.future.result

    def remove_service(self, zeroconf, type, name):
        """Ignore service removals; only discovery matters here."""
        pass

    def update_service(self, zeroconf, type, name):
        """Re-evaluate a service whose records changed.

        Addresses or TXT data may only become available after the initial
        announcement, so an update is treated like a fresh announcement.
        Defining this callback also satisfies newer zeroconf releases, which
        expect listeners to provide it.
        """
        self.add_service(zeroconf, type, name)

    def decode(self, value):
        """Decode a UTF-8 ``bytes`` value to ``str``, dropping invalid bytes."""
        return value.decode(errors='ignore', encoding='utf-8')

    def _matches(self, info, properties):
        """Return True when the resolved service belongs to the wanted ESP."""
        if properties.get('espid') == self.espid:
            return True
        instance_name = info.get_name()
        if instance_name and instance_name.endswith(self.expected_suffix):
            return True
        txt_name = properties.get('name')
        return bool(txt_name and txt_name.endswith(self.expected_suffix))

    def add_service(self, zeroconf, type, name):
        """Handle a service announcement and resolve the future on a match.

        Does nothing once a device has already been found, when the service
        cannot be resolved, or when it advertises no addresses.
        """
        if self.future.done():
            return
        info = zeroconf.get_service_info(type, name)
        # get_service_info() yields None when the service could not be
        # resolved in time; there is nothing to inspect in that case.
        if info is None:
            return
        addresses = info.parsed_addresses()
        if not addresses:
            return
        # TXT entries without a value are not bytes (e.g. None); skip them
        # instead of crashing in decode().
        properties = {
            self.decode(key): self.decode(value)
            for key, value in (info.properties or {}).items()
            if isinstance(value, bytes)
        }
        if self._matches(info, properties):
            self.future.set_result(addresses[0])
def find_esp(espid, timeout=10):
    """Discover an ESP device via mDNS and return its address.

    Browses ``_http._tcp.local.`` services and waits until an ``ESPFinder``
    listener reports a match for ``espid``.

    Args:
        espid: Identifier of the device to look for.
        timeout: Maximum number of seconds to wait for a match.

    Returns:
        The first parsed address of the matching service, as a string.

    Raises:
        concurrent.futures.TimeoutError: No matching device was announced
            within ``timeout`` seconds.
    """
    zeroconf = Zeroconf()
    try:
        listener = ESPFinder(espid)
        # Keep a reference to the browser for as long as we are waiting.
        browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener)
        return listener.wait(timeout)
    finally:
        # Always release the mDNS sockets and threads, including on timeout.
        zeroconf.close()
def parse_args(argv=None):
    """Parse the command-line options of the OTA update script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads the process arguments from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with the attributes ``firmware``, ``espid``,
        ``host``, ``user`` and ``password``. ``user`` and ``password`` default
        to the ``OTA_USER`` / ``OTA_PASSWORD`` environment variables, falling
        back to ``'ota'`` and the empty string.
    """
    parser = argparse.ArgumentParser(
        description='Upload a firmware image to an ESP device over HTTP.',
    )
    parser.add_argument('-f', '--firmware', help='path to the firmware image to upload')
    parser.add_argument('--espid', help='ID of the device; used for discovery and verification')
    parser.add_argument('--host', help='address of the device; skips discovery')
    parser.add_argument(
        '--user',
        default=os.environ.get('OTA_USER', 'ota'),
        help='HTTP digest user name (default: OTA_USER environment variable or "ota")',
    )
    parser.add_argument(
        '--password',
        default=os.environ.get('OTA_PASSWORD', ''),
        help='HTTP digest password (default: OTA_PASSWORD environment variable)',
    )
    return parser.parse_args(argv)
def main():
    """Verify the target device and upload a firmware image to it.

    Steps:
        1. Determine the device address, either from ``--host`` or by mDNS
           discovery of ``--espid``.
        2. Query ``/update/identity`` and, if ``--espid`` was given, check
           that the reported ``id`` matches it.
        3. Upload the firmware file together with its MD5 digest to
           ``/update`` using HTTP digest authentication.

    Returns:
        ``None`` on success, otherwise a non-zero exit code:
        ``1`` when neither host nor espid was given, discovery timed out, or
        the device ID does not match; ``2`` when no firmware file was given;
        ``3`` when the device rejected the upload.

    Raises:
        requests.HTTPError: The identity request returned an error status.
    """
    # Function-scope import: only discovery needs this exception type.
    from concurrent.futures import TimeoutError as DiscoveryTimeout

    args = parse_args()
    if not args.host:
        if not args.espid:
            print('host or espid must be specified')
            return 1
        try:
            args.host = find_esp(args.espid)
        except DiscoveryTimeout:
            print(f'could not find {args.espid} on the network')
            return 1
        print(f'address of {args.espid} is {args.host}')

    auth = HTTPDigestAuth(args.user, args.password)
    url = f'http://{args.host}/update'

    # Without a timeout, requests would wait forever on an unresponsive device.
    r = requests.get(f'{url}/identity', auth=auth, timeout=10)
    r.raise_for_status()
    device_id = r.json()['id']
    if args.espid and args.espid != device_id:
        print(f'ESP ID mismatch: wanted {args.espid}, got {device_id}')
        return 1
    print(f'ESP ID verified: {device_id}')

    if not args.firmware:
        print('no firmware file specified')
        return 2

    fwdata = pathlib.Path(args.firmware).read_bytes()
    # The digest is sent alongside the image as the "MD5" form field.
    fwhash = hashlib.md5(fwdata).hexdigest()
    print(f'firmware hash is md5={fwhash}')

    r = requests.post(
        url,
        auth=auth,
        # (connect, read) timeouts; the read timeout is generous because the
        # device only answers after it has processed the whole image.
        timeout=(10, 120),
        files={
            "MD5": (None, fwhash),
            "firmware": ("firmware", fwdata, "application/octet-stream"),
        },
    )
    print(r)
    print(r.text)
    # Report a rejected upload through the exit code instead of exiting 0.
    if not r.ok:
        return 3
if __name__ == '__main__':
    # Run the updater; a truthy result from main() becomes the process exit
    # status, while a falsy one lets the script end normally.
    exit_status = main()
    if exit_status:
        sys.exit(exit_status)