-
-
Notifications
You must be signed in to change notification settings - Fork 25
/
example.py
72 lines (57 loc) · 1.42 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import asyncio
from pathlib import Path as FilePath
from typing_extensions import Annotated
from kui.asgi import (
HTTPException,
HttpRoute,
Kui,
OpenAPI,
Path,
SocketRoute,
allow_cors,
required_method,
websocket,
)
async def homepage():
    """
    Homepage

    Respond with the application's name as a plain string.
    """
    greeting = "Kuí"
    return greeting
async def message():
    """
    Message

    For testing server send event response. Returns an async generator
    that produces five events; before each one it sleeps for the number of
    seconds currently stored in ``app.state.wait_time``.
    """

    async def event_stream():
        for event_id in range(5):
            # Read the delay on every iteration so a change to
            # app.state.wait_time takes effect mid-stream.
            delay = app.state.wait_time
            await asyncio.sleep(delay)
            yield dict(id=event_id, data="hello")

    stream = event_stream()
    return stream
async def sources(filepath: Annotated[str, Path()]):
    """
    Return source files

    Serve a regular file located under the current working directory.

    The requested ``filepath`` comes straight from the URL and is therefore
    untrusted. It is resolved to an absolute path and must stay inside the
    working directory; anything that escapes it (for example via ``..``
    segments or a symlink pointing elsewhere) is treated as not found.

    Returns the resolved ``pathlib.Path`` of the file.
    Raises ``HTTPException(404)`` when the path is invalid, lies outside the
    working directory, or does not name an existing regular file.
    """
    base_dir = FilePath(".").resolve()
    try:
        # lstrip("./") drops any leading run of '.' and '/' characters so the
        # join below can never be replaced by an absolute path.
        candidate = (base_dir / filepath.lstrip("./")).resolve()
        # relative_to raises ValueError when candidate is not under base_dir;
        # resolve() raises ValueError for embedded NUL bytes. Both mean 404.
        candidate.relative_to(base_dir)
    except ValueError:
        raise HTTPException(404) from None

    # is_file() is already False for missing paths, so no separate exists().
    if not candidate.is_file():
        raise HTTPException(404)
    return candidate
async def ws():
    """
    Accept the WebSocket, then keep sending a small JSON payload, pausing
    ``app.state.wait_time`` seconds between sends, until the connection is
    reported as disconnected; finally close the socket.
    """
    await websocket.accept()
    while True:
        # Check the connection state before every send.
        if await websocket.is_disconnected():
            break
        await websocket.send_json({"data": "(^_^)"})
        await asyncio.sleep(app.state.wait_time)
    await websocket.close()
# Route table: three HTTP endpoints plus one WebSocket endpoint at "/".
_routes = [
    HttpRoute("/", homepage),
    HttpRoute("/message", message),
    # The required_method("GET") wrapper is applied to this route only.
    HttpRoute("/sources/{filepath:any}", sources) @ required_method("GET"),
    SocketRoute("/", ws),
]

# Middlewares applied to HTTP requests.
_http_middlewares = [allow_cors()]

app = Kui(routes=_routes, http_middlewares=_http_middlewares)

# Add the OpenAPI routes (redoc template) under the "/docs" prefix.
app.router <<= "/docs" // OpenAPI(template_name="redoc").routes

# Delay, in seconds, used by message() and ws() between sends.
app.state.wait_time = 1