-
Notifications
You must be signed in to change notification settings - Fork 0
/
lutron-bridge-server.py
159 lines (131 loc) · 5.01 KB
/
lutron-bridge-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import asyncio
import sys
from types import FunctionType
from pylutron_caseta.smartbridge import Smartbridge
import logging
import socket
import threading
import select
import optparse
from pathlib import Path
from utils.setup import generateAndSignCertificate, generatePrivateKey, pingLeapHost
# Module-level configuration and shared state for the bridge server.

# When true the root logger is set to DEBUG, otherwise INFO.
debug_log = True

# Address the TCP server binds to.
bind_ip = 'localhost'
bind_port = 5000

# Placeholders for objects created later at runtime.
server = None
bridge: Smartbridge = None

# File names of the TLS material and their locations, resolved against the
# working directory at import time.
lutron_client_key_name = "caseta.key"
lutron_client_cert_name = "caseta.crt"
lutron_bridge_ca_cert_name = "caseta-bridge.crt"
lutron_client_key_path = Path.cwd() / lutron_client_key_name
lutron_client_cert_path = Path.cwd() / lutron_client_cert_name
lutron_bridge_ca_cert_path = Path.cwd() / lutron_bridge_ca_cert_name

# Event loop installed as the current loop for the importing thread.
lutron_loop = asyncio.new_event_loop()
asyncio.set_event_loop(lutron_loop)

# Log to both "bridge.log" and the console.
logging.basicConfig(
    level=logging.DEBUG if debug_log else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("bridge.log"),
        logging.StreamHandler(),
    ],
)
async def lutron_command(id: str, parameter: int):
    """Run a single command against the module-level ``bridge``.

    Args:
        id: Identifier forwarded unchanged to the bridge call.
        parameter: Selects the action. A value greater than 3 requests the
            LED status; any other value is forwarded to the simulated
            button press.

    Returns:
        The value of ``bridge.get_led_status(id)`` when ``parameter > 3``;
        otherwise ``-1`` after ``bridge.sim_press_button(id, parameter)``
        has been awaited.
    """
    # Status query: hand back whatever the bridge reports.
    if parameter > 3:
        return await bridge.get_led_status(id)

    # Button press: there is no state to report, signalled with -1.
    await bridge.sim_press_button(id, parameter)
    return -1
def handle_client_connection(client_socket: socket.socket, stop: FunctionType):
    """Serve one client connection until it closes or ``stop()`` is true.

    Each received chunk is decoded as UTF-8 and split on commas. Field 1
    (left-stripped) and field 2 (as an int) are passed to ``lutron_command``.
    If the returned state is non-negative, ``~DEVICE,<field 1>,<state>`` is
    sent back to the client. A failure while processing one request is logged
    and does not end the connection.

    Args:
        client_socket: Connected socket for this client. It is always closed
            before the function returns.
        stop: Zero-argument callable polled roughly once per second; a true
            result ends the loop.
    """
    try:
        while not stop():
            # Wait at most one second so the stop flag is re-checked regularly.
            readable, _, _ = select.select((client_socket,), [], [], 1)
            if not readable:
                continue

            request = client_socket.recv(1024)
            if not request:
                # Socket has been closed by the peer.
                break
            logging.info(f'Received request: {request}')

            # Bound before the try so the finally clause never sees an
            # unassigned name when decoding or parsing fails early.
            loop = None
            try:
                elements = request.decode("utf-8").split(",")
                device_id = elements[1].lstrip()
                parameter = int(elements[2])

                # A fresh loop per request, as this runs in a worker thread.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                state = loop.run_until_complete(
                    lutron_command(device_id, parameter))
                if state >= 0:
                    response = f"~DEVICE,{elements[1]},{state}"
                    logging.info(f'Response to Savant: {response}')
                    # sendall retries until the whole response is written.
                    client_socket.sendall(response.encode())
            except Exception as err:
                logging.error(f'Error occured during lutron processing: {err}')
            finally:
                if loop is not None:
                    loop.close()
    finally:
        # Release the socket even if select/recv raised.
        client_socket.close()
def setup_bridge(host: str):
    """Create the client key and request a signed certificate from ``host``.

    Calls ``generatePrivateKey`` with the client key path, then
    ``generateAndSignCertificate`` with the ``logging`` module, the host, the
    generated key, and the client certificate and bridge CA certificate paths.
    NOTE(review): both helpers live in ``utils.setup``; presumably they write
    the key and certificates to the given paths -- confirm there.

    Args:
        host: Hostname or IP address of the Lutron QSX processor.
    """
    for message in (
        "Lutron QSX processor has not been paired to this program yet",
        "Generating private key",
    ):
        logging.info(message)
    private_key = generatePrivateKey(lutron_client_key_path)

    logging.info("Asking QSX processor to accept our key")
    generateAndSignCertificate(
        logging,
        host,
        private_key,
        lutron_client_cert_path,
        lutron_bridge_ca_cert_path,
    )
def start_bridge(host: str):
    """Accept TCP clients on ``bind_ip:bind_port`` until interrupted.

    Every accepted connection is served by ``handle_client_connection`` in its
    own thread. On Ctrl-C the handler threads are told to stop and joined,
    ``lutron_loop`` is closed and the process exits with status 0. The
    listening socket is closed on every exit path.

    Args:
        host: Currently unused; kept so existing callers keep working.
    """
    stop_client_sockets = False
    client_socket_threads = []

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((bind_ip, bind_port))
        server.listen(5)  # max backlog of connections
        logging.info('Listening on {}:{}'.format(bind_ip, bind_port))

        while True:
            client_sock, address = server.accept()
            logging.info('Accepted connection from {}:{}'.format(
                address[0], address[1]))

            # Forget handlers that already finished so the list does not grow
            # for the whole lifetime of the server.
            client_socket_threads = [
                thread for thread in client_socket_threads if thread.is_alive()
            ]

            client_handler = threading.Thread(
                target=handle_client_connection,
                # The lambda reads the flag at call time, so flipping it
                # below is seen by every running handler.
                args=(client_sock, lambda: stop_client_sockets)
            )
            client_socket_threads.append(client_handler)
            client_handler.start()
    except KeyboardInterrupt:
        # CTRL-C pressed
        logging.info("Shutting down bridge")
        stop_client_sockets = True
        for thread in client_socket_threads:
            thread.join()
        lutron_loop.close()
        sys.exit(0)
    finally:
        # Runs for Ctrl-C (before SystemExit propagates) and for any error
        # raised by bind/listen/accept.
        server.close()
async def connect_to_lutron(host: str):
    """Create the shared ``Smartbridge`` and connect it to ``host``.

    The instance is stored in the module-level ``bridge`` so that
    ``lutron_command`` can use it; without the ``global`` declaration the
    assignment would only create a local and ``bridge`` would stay ``None``.

    Args:
        host: Hostname or IP address of the Lutron QSX processor.
    """
    global bridge
    bridge = Smartbridge.create_tls(
        host, lutron_client_key_name, lutron_client_cert_name, lutron_bridge_ca_cert_name,
        systemType_str="homeworks"
    )
    await bridge.connect()
# Command-line options that must be supplied.
required_options = ["host"]

if __name__ == '__main__':
    # Script entry point: parse arguments, pair if needed, verify, then serve.
    parser = optparse.OptionParser()
    parser.add_option(
        '--host',
        dest='host',
        help='Hostname/IP Address of the Lutron QSX processor',
    )
    options, args = parser.parse_args()

    # Abort on the first required option that was not given.
    parsed_values = vars(options)
    for name in required_options:
        if parsed_values[name] is None:
            print(f'Parameter {name} is required')
            parser.print_help()
            sys.exit(1)

    host = options.host

    # Pair with the processor when any of the three TLS files is missing.
    tls_files = (
        lutron_client_key_path,
        lutron_client_cert_path,
        lutron_bridge_ca_cert_path,
    )
    if not all(path.exists() for path in tls_files):
        setup_bridge(host)

    # NOTE(review): the original indentation was lost; verification is
    # assumed to run on every start, not only after pairing -- confirm.
    logging.info("Verifying connection to QSX processor")
    # NOTE(review): the client certificate path is passed twice; the first
    # one looks like it may have been meant to be lutron_client_key_path --
    # confirm against pingLeapHost's signature in utils.setup.
    ping_res = pingLeapHost(
        host,
        lutron_client_cert_path,
        lutron_client_cert_path,
        lutron_bridge_ca_cert_path,
    )
    if not ping_res:
        logging.error("Failed to connect to QSX processor. Please try restarting this application and/or the lutron QSX processor. Otherwise delete all the certificate and start fresh")
        sys.exit(1)

    logging.info("Connecting to QSX processor")
    lutron_loop.run_until_complete(connect_to_lutron(host))
    start_bridge(host)