-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqr_code_receiver.py
46 lines (35 loc) · 1.32 KB
/
qr_code_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from sanic import Sanic, Request, Websocket, text
from sanic_ext import render
import json
app = Sanic("TecDaysQRReceiver")
app.ctx.open_web_sockets = {}
app.ctx.current_counter = 0
@app.get("/qr/counter_inc")
async def inc_counter(request: Request):
app.ctx.current_counter = app.ctx.current_counter + 1
to_remove = []
for ws in app.ctx.open_web_sockets.values():
try:
await ws.send(json.dumps({
'counter': app.ctx.current_counter,
'user-agent': request.headers.get('User-Agent'),
'remote-ip': request.headers.get('X-Forwarded-For')
}))
except Exception:
to_remove.append(ws)
for ws in to_remove:
del app.ctx.open_web_sockets[ws]
return await render("counter_inc.html", context={'counter': app.ctx.current_counter - 1}, status=200)
@app.get("/qr/counter_reset")
async def reset_counter(request):
app.ctx.current_counter = 0
return text("OK counter reset")
@app.websocket("/qr/feed")
async def feed(request: Request, ws: Websocket):
app.ctx.open_web_sockets[ws] = ws
async for msg in ws:
await ws.send(msg)
@app.route('/qr')
async def handle_request(request):
return await render("index.html", context={'counter': app.ctx.current_counter}, status=200)
app.run(host="0.0.0.0", port=8090)