-
Notifications
You must be signed in to change notification settings - Fork 0
/
subsystem.py
87 lines (81 loc) · 2.72 KB
/
subsystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
import websockets
import json, os, time, signal, subprocess
import RPi.GPIO as GPIO
from smbus2 import SMBus
from mlx90614 import MLX90614
# Module-level flag; nothing else in the visible file reads or updates it.
shutdown = False

# Start the Node.js camera helper in its own session so the whole process
# group can later be signalled with os.killpg().  Its stdout is discarded:
# nothing in this script reads it, and an undrained PIPE would stall the
# child as soon as the OS pipe buffer filled up.
camera = subprocess.Popen(
    ["node", "camera"],
    stdout=subprocess.DEVNULL,
    start_new_session=True,
)

# Infrared thermometer at address 0x5A on I2C bus 1.
bus = SMBus(1)
sensor = MLX90614(bus, address=0x5A)

# Pin numbers below use the Broadcom (BCM) numbering scheme.
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
buzzer = 14
button = 4
red = 19
green = 26

GPIO.setup(buzzer, GPIO.OUT)
GPIO.setup(green, GPIO.OUT)
GPIO.setup(red, GPIO.OUT)
# Internal pull-up enabled: the input reads high unless something pulls it
# low (presumably the button is wired to ground - confirm against hardware).
GPIO.setup(button, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def beep(times, delay):
    """Pulse the buzzer pin ``times`` times.

    Each pulse drives the pin high for ``delay`` seconds and then low for
    ``delay`` seconds, so the call blocks the calling thread for roughly
    ``2 * times * delay`` seconds.
    """
    for _ in range(times):
        # One pulse = an "on" phase followed by an "off" phase of equal length.
        for level in (GPIO.HIGH, GPIO.LOW):
            GPIO.output(buzzer, level)
            time.sleep(delay)
def flash(times, delay, LED):
    """Blink the output pin ``LED`` ``times`` times.

    The pin is held high for ``delay`` seconds and then low for ``delay``
    seconds on every blink; the call blocks until the sequence is finished.
    """
    # Walk through the individual half-periods: even steps switch the pin
    # on, odd steps switch it off again.
    for phase in range(2 * times):
        GPIO.output(LED, GPIO.LOW if phase % 2 else GPIO.HIGH)
        time.sleep(delay)
# Readings at or above this value trigger the long buzzer/red-LED alarm.
_ALARM_TEMPERATURE = 38.5
# Readings at or above this value (but below the alarm level) get the short
# buzzer/green-LED confirmation; lower readings produce no signal at all.
_CONFIRM_TEMPERATURE = 35.5
# Constant added to every raw sensor reading (presumably a calibration
# offset for the measuring distance - confirm with the hardware owner).
_TEMPERATURE_OFFSET = 4


def _stop_peripherals():
    """Best-effort release of the I2C bus and the camera process group."""
    try:
        bus.close()
    except Exception as exc:
        print("Could not close the I2C bus:", exc)
    try:
        os.killpg(os.getpgid(camera.pid), signal.SIGTERM)
    except Exception as exc:
        print("Could not stop the camera process:", exc)


async def echo(websocket, path=None):
    """Serve one websocket client until it disconnects.

    Every incoming message is decoded as JSON and dispatched on its value:

    * ``"temperature"`` - if the button input reads low, release the
      peripherals, close the socket, run ``sudo shutdown`` and exit.
      Otherwise read the sensor, send ``{"temperature": value}`` and signal
      the result with the buzzer and LEDs.  A failing sensor read also
      closes the socket, runs ``sudo shutdown`` and exits.
    * ``"warn"`` - sound the long buzzer pattern.
    * ``"code"`` - send the contents of ``codes.txt`` as ``{"code": text}``.

    Messages that are not valid JSON, or decode to any other value, are
    ignored.

    Args:
        websocket: Connection object; must support ``async for``, ``send``
            and ``close``.
        path: Request path supplied by older websockets releases.  It is
            unused and optional, so the handler also works when the library
            calls it with the connection only.

    Raises:
        SystemExit: After a shutdown was requested, the sensor failed, or a
            KeyboardInterrupt arrived while a message was being handled.
    """
    async for message in websocket:
        try:
            try:
                request = json.loads(message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable bytes: one bad
                # frame should not tear down the whole connection handler.
                continue

            if request == "temperature":
                if not GPIO.input(button):
                    _stop_peripherals()
                    await websocket.close(1000)
                    os.system('sudo shutdown')
                    raise SystemExit

                try:
                    reading = sensor.get_object_1() + _TEMPERATURE_OFFSET
                    temperature = round(reading, 1)
                except Exception:
                    # An unreadable sensor is treated as fatal for the unit.
                    await websocket.close(1000)
                    os.system('sudo shutdown')
                    raise SystemExit

                await websocket.send(json.dumps({"temperature": temperature}))
                # NOTE: beep() and flash() sleep synchronously, so the event
                # loop is blocked while the signal pattern plays.
                if temperature >= _ALARM_TEMPERATURE:
                    beep(5, 0.5)
                    # The original call omitted the LED argument and raised
                    # TypeError on every reading in this range.
                    flash(5, 0.5, red)
                elif temperature >= _CONFIRM_TEMPERATURE:
                    beep(2, 0.2)
                    flash(2, 0.2, green)
            elif request == "warn":
                beep(5, 0.5)
            elif request == "code":
                # Read first so the file is closed even if send() fails.
                with open("codes.txt", "r") as code_file:
                    codes = code_file.read()
                await websocket.send(json.dumps({"code": codes}))
        except KeyboardInterrupt:
            _stop_peripherals()
            await websocket.close(1000)
            raise SystemExit
# Start the websocket endpoint on localhost:8081 and serve until interrupted.
starter = websockets.serve(echo, "localhost", 8081)
loop = asyncio.get_event_loop()
loop.run_until_complete(starter)
try:
    print("Server is running")
    loop.run_forever()
except KeyboardInterrupt:
    print("Server is stopping")
finally:
    # Runs on Ctrl-C and on SystemExit raised by the handler alike.
    # The camera helper was started in its own session, so it has to be
    # signalled explicitly or it keeps running after this script exits.
    try:
        os.killpg(os.getpgid(camera.pid), signal.SIGTERM)
    except OSError:
        pass  # process group already gone
    try:
        bus.close()
    except Exception as exc:
        print("Could not close the I2C bus:", exc)
    # Reset the pins this script configured so the buzzer and LEDs are not
    # left driven if the interrupt arrived in the middle of a pattern.
    GPIO.cleanup()