|
| 1 | +import socket |
| 2 | +from time import sleep |
| 3 | +import json |
| 4 | +try: |
| 5 | + import requests |
| 6 | +except: |
| 7 | + import urequests as requests |
| 8 | + |
| 9 | +# UPnP SSDP Search request header |
| 10 | +HEADER = b"""M-SEARCH * HTTP/1.1\r |
| 11 | +HOST: 239.255.255.250:1900\r |
| 12 | +MAN: "ssdp:discover"\r |
| 13 | +ST: ssdp:all\r |
| 14 | +MX: 3\r |
| 15 | +\r |
| 16 | +""" |
| 17 | + |
| 18 | +class Bridge: |
| 19 | + """Provides methods for connecting to and using Hue Bridge. Supports |
| 20 | + Micropython, Python 2, and 3.""" |
| 21 | + |
| 22 | + def __init__(self,autosetup=True, debug=1): |
| 23 | + self.debug = debug #0=no prints, 1=messages, 2=debug |
| 24 | + self.IP = None |
| 25 | + self.username = None |
| 26 | + if autosetup: |
| 27 | + self.setup() |
| 28 | + |
| 29 | + |
| 30 | + def show(self,str,level=1): |
| 31 | + """ Show debug output. """ |
| 32 | + if self.debug >= level: |
| 33 | + print(str) |
| 34 | + |
| 35 | + |
| 36 | + def setup(self): |
| 37 | + """ Loads bridge settings or attempts to establish them, if needed.""" |
| 38 | + success = self.loadSettings() |
| 39 | + if success: |
| 40 | + # verify bridge settings work |
| 41 | + try: |
| 42 | + self.idLights() |
| 43 | + success = True |
| 44 | + except: |
| 45 | + success = False |
| 46 | + if not success: |
| 47 | + if self.discover(): |
| 48 | + self.show('Bridge located at {}'.format(self.IP)) |
| 49 | + self.show('>>> Press link button on Hue bridge to register <<<') |
| 50 | + if self.getUsername(): |
| 51 | + success = self.saveSettings() |
| 52 | + else: |
| 53 | + self.show("Couldn't get username from bridge.") |
| 54 | + else: |
| 55 | + self.show("Couldn't find bridge on LAN.") |
| 56 | + return success |
| 57 | + |
| 58 | + |
| 59 | + def discover(self): |
| 60 | + """ Locate Hue Bridge IP using UPnP SSDP search. Discovery will return |
| 61 | + when bridge is found or 3 seconds after last device response. Returns IP |
| 62 | + address or None.""" |
| 63 | + #On ESP8266, disable AP WLAN to force use of STA interface |
| 64 | + #import network |
| 65 | + #ap = network.WLAN(network.AP_IF) |
| 66 | + #ap.active(False) |
| 67 | + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 68 | + s.sendto(HEADER, ('239.255.255.250',1900)) #UPnP Multicast |
| 69 | + s.settimeout(3) |
| 70 | + |
| 71 | + IP = None |
| 72 | + while IP == None: |
| 73 | + data, addr = s.recvfrom(1024) |
| 74 | + self.show(str(data),2) |
| 75 | + lines = data.split(b'\r\n') |
| 76 | + for l in lines: |
| 77 | + tokens = l.split(b' ') |
| 78 | + if tokens[0] == b'SERVER:': |
| 79 | + product = tokens[3].split(b'/') |
| 80 | + if product[0] == b'IpBridge': |
| 81 | + IP = str(addr[0]) |
| 82 | + break |
| 83 | + |
| 84 | + s.close() |
| 85 | + self.IP = IP |
| 86 | + return IP |
| 87 | + |
| 88 | + |
| 89 | + def getUsername(self): |
| 90 | + """ Get a developer API username from bridge. |
| 91 | + Requires that the bridge link button be pressed sometime while polling. |
| 92 | + Polls for 20 seconds (20 attempts at 1 second intervals). |
| 93 | + Can timeout with error if bridge is non-responsive. |
| 94 | + Returns username on success or None on failure.""" |
| 95 | + url = 'http://{}/api'.format(self.IP) |
| 96 | + data = '{"devicetype":"TapLight#mydevice"}' |
| 97 | + username = None |
| 98 | + count = 20 |
| 99 | + while count > 0 and username == None: |
| 100 | + resp = requests.post(url,data=data) |
| 101 | + if resp.status_code == 200: |
| 102 | + j = resp.json()[0] |
| 103 | + self.show(j,2) |
| 104 | + if j.get('success'): |
| 105 | + username = str(j['success']['username']) |
| 106 | + self.username = username |
| 107 | + sleep(1) |
| 108 | + count -= 1 |
| 109 | + return username |
| 110 | + |
| 111 | + |
| 112 | + def saveSettings(self): |
| 113 | + """ Save bridge IP and username to bridge.dat file. |
| 114 | + Returns True on success.""" |
| 115 | + if self.IP and self.username: |
| 116 | + f=open('bridge.dat','w') |
| 117 | + f.write(json.dumps([self.IP,self.username])) |
| 118 | + f.close() |
| 119 | + return True |
| 120 | + else: |
| 121 | + return None |
| 122 | + |
| 123 | + |
| 124 | + def loadSettings(self): |
| 125 | + """ Load bridge IP and username from bridge.dat file and set base URL. |
| 126 | + Returns True on success. """ |
| 127 | + try: |
| 128 | + f=open('bridge.dat') |
| 129 | + except: |
| 130 | + return None |
| 131 | + l = json.load(f) |
| 132 | + f.close() |
| 133 | + self.IP = str(l[0]) |
| 134 | + self.username = str(l[1]) |
| 135 | + self.show('Loaded settings {} {}'.format(self.IP,self.username),2) |
| 136 | + return True |
| 137 | + |
| 138 | + |
| 139 | + def resetSettings(self): |
| 140 | + """Delete current saved bridge settings and reinitiate.""" |
| 141 | + from os import remove |
| 142 | + remove('bridge.dat') |
| 143 | + self.IP = None |
| 144 | + self.username = None |
| 145 | + self.setup() |
| 146 | + |
| 147 | + |
| 148 | + def url(self,path): |
| 149 | + """Return url for API calls.""" |
| 150 | + return 'http://{}/api/{}/{}'.format(self.IP,self.username,path) |
| 151 | + |
| 152 | + |
| 153 | + def get(self, path): |
| 154 | + """Perform GET request and return json result.""" |
| 155 | + url = self.url(path) |
| 156 | + self.show(url,2) |
| 157 | + resp = requests.get(url).json() |
| 158 | + self.show(resp,2) |
| 159 | + return resp |
| 160 | + |
| 161 | + |
| 162 | + def put(self, path, data): |
| 163 | + """Perform PUT request and return response.""" |
| 164 | + url = self.url(path) |
| 165 | + self.show(url,2) |
| 166 | + data = json.dumps(data) |
| 167 | + self.show(data,2) |
| 168 | + resp = requests.put(url, data=data).json() |
| 169 | + self.show(resp,2) |
| 170 | + return resp |
| 171 | + |
| 172 | + |
| 173 | + def allLights(self): |
| 174 | + """Returns dictionary containing all lights, with detail.""" |
| 175 | + """Large return set, not ideal for controllers with limited RAM.""" |
| 176 | + return self.get('lights') |
| 177 | + |
| 178 | + |
| 179 | + def idLights(self): |
| 180 | + """Returns list of all light IDs.""" |
| 181 | + ids = self.get('groups/0')['lights'] |
| 182 | + for i in range(len(ids)): |
| 183 | + ids[i] = int(ids[i]) |
| 184 | + return ids |
| 185 | + |
| 186 | + |
| 187 | + def getLight(self,id): |
| 188 | + """Returns dictionary of light details for given ID.""" |
| 189 | + return self.get('lights/{}'.format(str(id))) |
| 190 | + |
| 191 | + |
| 192 | + def getLights(self): |
| 193 | + """Iterates through each light to build and return a dictionary |
| 194 | + of light IDs and names.""" |
| 195 | + dict = {} |
| 196 | + for i in self.idLights(): |
| 197 | + dict[i] = str(self.getLight(i)['name']) |
| 198 | + return dict |
| 199 | + |
| 200 | + |
| 201 | + def setLight(self,id,**kwargs): |
| 202 | + """Set one or more states of a light. |
| 203 | + Ex: setLight(1,on=True,bri=254,hue=50000,sat=254)""" |
| 204 | + self.put('lights/{}/state'.format(str(id)),kwargs) |
| 205 | + |
| 206 | + |
| 207 | + def allGroups(self): |
| 208 | + """Returns dictionary containing all groups, with detail.""" |
| 209 | + return self.get('groups') |
| 210 | + |
| 211 | + |
| 212 | + def getGroup(self,id): |
| 213 | + """Returns dictionary of group details.""" |
| 214 | + return self.get('groups/{}'.format(str(id))) |
| 215 | + |
| 216 | + |
| 217 | + def getGroups(self): |
| 218 | + """Returns dictionary of group IDs and names.""" |
| 219 | + dict = {} |
| 220 | + groups = self.allGroups() |
| 221 | + for g in groups: |
| 222 | + dict[int(g)] = str(groups[g]['name']) |
| 223 | + return dict |
| 224 | + |
| 225 | + |
| 226 | + def setGroup(self,id,**kwargs): |
| 227 | + """Set one or more states of a group. |
| 228 | + Ex: setGroup(1,bri_inc=100,transitiontime=40)""" |
| 229 | + self.put('groups/{}/action'.format(str(id)),kwargs) |
| 230 | + |
| 231 | + |
0 commit comments