-
Notifications
You must be signed in to change notification settings - Fork 17
/
app.py
196 lines (174 loc) · 6.66 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from quart import Quart, jsonify, render_template, request, Response, redirect, send_from_directory, jsonify
import re, yaml, json, typing
import lostmediafinder
# The Quart application object that every route below registers on.
app = Quart(__name__)

# Load the site configuration once at import time.  The encoding is pinned
# so the result does not depend on the platform's default locale encoding.
with open('config.yml', 'r', encoding='utf-8') as file:
    config_yml = yaml.safe_load(file)
@app.route("/robots.txt")
async def robots():
    """
    Serves robots.txt out of the static directory.
    """
    robots_file = await send_from_directory("static", "robots.txt")
    return robots_file
@app.route("/find/<id>")
async def youtubev2(id):
    """
    Provides backwards compatibility for the old endpoint.

    Generates a response for the given video ID, coerces it to API
    version 2 and returns it as JSON.  If the ID is rejected with
    InvalidVideoIdError, a "bad.id" JSON payload is returned with a 400
    status instead of letting the exception surface as a server error.
    """
    headers = {"Content-Type": "application/json"}
    try:
        response = await lostmediafinder.YouTubeResponse.generate(id)
    except lostmediafinder.types.InvalidVideoIdError:
        # Same payload shape that wrapperYT uses for an invalid ID.
        return json.dumps({"status": "bad.id", "id": None}), 400, headers
    return response.coerce_to_api_version(2).json(), headers
async def wrapperYT(id, includeRaw):
    """
    Wrapper for generate

    Returns whatever YouTubeResponse.generate returns, or a plain
    "bad.id" dict when the video ID is rejected as invalid.
    """
    # NOTE(review): the fallback is a plain dict, not a response object, so
    # callers must not assume response methods exist on the return value.
    try:
        result = await lostmediafinder.YouTubeResponse.generate(id, includeRaw)
    except lostmediafinder.types.InvalidVideoIdError:
        result = {"status": "bad.id", "id": None}
    return result
async def wrapperYTS(id, includeRaw):
    """
    Wrapper for generateStream

    Awaits YouTubeResponse.generateStream and hands its result back
    unchanged.
    """
    streamed = await lostmediafinder.YouTubeResponse.generateStream(id, includeRaw)
    return streamed
@app.route("/api/v<int:v>/<site>/<id>")
@app.route("/api/v<int:v>/<id>")
async def youtube(v, id, site="youtube", jsn=True):
    """
    Main API endpoint: looks up a video and returns the result.

    Args:
        v: API version from the URL. Version 1 answers 410; anything other
            than 2, 3 or 4 answers 404.
        id: Video ID from the URL.
        site: Site name from the URL; only "youtube" is recognised.
        jsn: When true (the default) the result is serialised to JSON for
            an HTTP response. When false the coerced response object is
            returned as-is, for internal callers.

    Query parameters (API version 4 and higher only):
        stream: if present, results are streamed as newline-delimited JSON.
        includeRaw: if present, raw data is included in the response.

    An invalid video ID yields the "bad.id" payload with a 400 status
    (or the bare dict when jsn is false).
    """
    json_headers = {"Content-Type": "application/json"}

    if v == 1:
        return "This API version is no longer supported.", 410
    if v not in (2, 3, 4):
        return "Unrecognised API version", 404
    if site != "youtube":
        return "Unrecognised site", 404

    # Versions 4 and higher only stream, and only provide `rawraw`, if you
    # ask for it; older versions never stream and always include raw data.
    stream = v >= 4 and "stream" in request.args
    includeRaw = v < 4 or "includeRaw" in request.args

    if stream:
        async def run():
            r = (await wrapperYTS(id, includeRaw=includeRaw)).coerce_to_api_version(v)
            async for item in r:
                # Plain dicts and None are dumped directly; anything else
                # is expected to serialise itself via .json().
                if isinstance(item, dict) or item is None:
                    yield json.dumps(item) + "\n"
                else:
                    yield item.json() + "\n"
        return run(), json_headers

    r = await wrapperYT(id, includeRaw=includeRaw)
    if isinstance(r, dict):
        # wrapperYT reports an invalid video ID as a plain dict, which has
        # no coerce_to_api_version(); answer with it directly.
        if jsn:
            return json.dumps(r), 400, json_headers
        return r

    r = r.coerce_to_api_version(v)
    if jsn:
        return r.json(), json_headers
    return r
@app.route("/noscript_init.html")
async def noscript_init():
    """
    Entry page for the no-JavaScript flow.

    With a non-empty `d` query parameter, redirects to the loading page
    and passes the value along; otherwise renders the initial form.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlencode

    if d := request.args.get("d"):
        # `d` is user input (often a full URL); encode it so characters
        # such as '&', '#', '+' or '%' survive as part of the one parameter.
        return redirect("/noscript_load.html?" + urlencode({"d": d}))
    return await render_template("noscript/init.j2")
# A bare video ID: ten URL-safe base64 characters followed by one character
# from a restricted set.  \Z (not $) so a trailing newline is never accepted.
ID_PATTERN = re.compile(r'^[A-Za-z0-9_-]{10}[AEIMQUYcgkosw048]\Z')

# URL shapes a video ID can be extracted from; group 1 is the ID.  Dots in
# host names are escaped so they only match a literal '.'.
PATTERNS = [
    re.compile(r'(?:https?://)?(?:\w+\.)?youtube\.com/watch/?\?v=([A-Za-z0-9_-]{10}[AEIMQUYcgkosw048])(?:[\/&].*)?', re.IGNORECASE),
    re.compile(r'(?:https?://)?(?:\w+\.)?youtube\.com/(?:v|embed|shorts|video)/([A-Za-z0-9_-]{10}[AEIMQUYcgkosw048])(?:[\/&].*)?', re.IGNORECASE),
    re.compile(r'(?:https?://)?youtu\.be/([A-Za-z0-9_-]{10}[AEIMQUYcgkosw048])(?:\?.*)?', re.IGNORECASE),
    re.compile(r'(?:https?://)?filmot\.com/video/([A-Za-z0-9_-]{10}[AEIMQUYcgkosw048])(?:\?.*)?', re.IGNORECASE),
]


def coerce_to_id(vid):
    """
    Extracts a video ID from a bare ID or a supported URL.

    Args:
        vid: The user-supplied string (may be None or empty).

    Returns:
        The 11-character video ID, or None if the input is empty or does
        not match a bare ID or one of PATTERNS in its entirety.
    """
    if not vid:
        return None
    # Tolerate whitespace around pasted input; it is never part of an ID.
    vid = vid.strip()
    if ID_PATTERN.fullmatch(vid):
        return vid
    for pattern in PATTERNS:
        match = pattern.fullmatch(vid)
        # The URL patterns are case-insensitive (for the host name), so the
        # captured ID is re-checked against the case-sensitive ID pattern.
        if match and ID_PATTERN.fullmatch(match.group(1)):
            return match.group(1)
    return None
def get_enabled_methods():
    """
    Lists the titles of every method marked as enabled in the config.

    Reads the "methods" section of config_yml and returns the "title" of
    each entry whose "enabled" value is truthy, in config order.
    """
    methods = config_yml["methods"]
    return [
        methods[name]["title"]
        for name in methods
        if methods[name]["enabled"]
    ]
@app.route("/noscript_load.html")
async def noscript_load():
    """
    Loading page for the no-JavaScript flow.

    Requires a `d` query parameter holding a video ID or URL. Answers 400
    if it is missing or no video ID can be extracted from it; otherwise
    renders the loading template and returns it with a 302 status and a
    FinUrl header.
    """
    raw_input = request.args.get("d")
    if not raw_input:
        return "No d param provided - It should be the video id or url", 400

    video_id = coerce_to_id(raw_input)
    if not video_id:
        error_page = await render_template("noscript/error.j2", inp=raw_input)
        return error_page, 400

    # NOTE(review): FinUrl points at ".j2" while the route in this file is
    # "/noscript_load_thing.html" - confirm which one is intended.
    fin_headers = (("FinUrl", f"/noscript_load_thing.j2?id={video_id}"),)
    loading_page = await render_template("noscript/loading.j2", id=video_id)
    return Response(loading_page, headers=fin_headers), 302
@app.route("/api/coerce_to_id")
async def coerce_to_id_endpoint():
    """
    API wrapper around coerce_to_id.

    Reads the `d` query parameter and returns {"data": <video id>} on
    success, or a JSON string message with a 400 status when the
    parameter is missing or no ID can be extracted.
    """
    raw_input = request.args.get("d")
    if not raw_input:
        return '"No d param provided"', 400

    video_id = coerce_to_id(raw_input)
    if video_id:
        return {"data": video_id}
    return '"Unable to find a video ID"', 400
@app.route("/noscript_load_thing.html")
async def load_thing():
    """
    Results page for the no-JavaScript flow.

    Requires an `id` query parameter (400 otherwise), runs the API
    version 3 lookup without JSON serialisation and renders the result.
    """
    video_id = request.args.get("id")
    if not video_id:
        return "Missing id parameter", 400

    lookup = await youtube(3, video_id, "youtube", jsn=False)
    return await render_template("noscript/fid.j2", resp=lookup)
@app.route("/")
async def index():
    """
    Shows the landing page

    The optional `q` query parameter pre-fills the page; the video ID
    extracted from it (if any) is passed along as well.
    """
    query = request.args.get("q") or ""
    context = {
        "default": query,
        "default_id": coerce_to_id(query) or "",
        "methods": get_enabled_methods(),
    }
    return await render_template("index.j2", **context)
# NOTE: The code below builds the API docs page by parsing lostmediafinder
# docstrings; it is fragile and should be replaced.
def parse_changelog(changelog):
    """
    Parses a changelog out of a lostmediafinder docstring

    The text is cut at every "API VERSION " marker. For each piece, the
    remainder of its first line becomes the key and the lines after it
    (unstripped) become the value. Text before the first marker lands
    under whatever its first line is (the empty string if there is none).
    """
    sections = {}
    for chunk in changelog.split("API VERSION "):
        heading, *body = chunk.split("\n")
        sections[heading] = body
    return sections
class E:
    """
    A documented field: its name and its type, as text.

    Both values are stripped of surrounding whitespace, and any trailing
    ")" characters are then removed from the type.
    """
    name: str
    type: str

    def __init__(self, name, type):
        self.name = name.strip()
        # Strip whitespace first, then drop the closing parenthesis.
        self.type = type.strip().rstrip(")")
async def parse_lines(lines: list[str]) -> dict[E, str]:
    """
    Parses lines into a mapping

    Each non-blank line is expected to look like "name (type): description".
    The part before the first ":" is split on " (" to build an E; the part
    after it, stripped, is the description.

    Args:
        lines: Attribute lines to parse. Blank or whitespace-only lines
            are skipped.

    Returns:
        A dict mapping each E to its description, in input order.

    Raises:
        TypeError: a non-blank line has no " (" type part (or several).
        IndexError: a non-blank line has no ":" separator.
    """
    parsed = {}
    for raw in lines:
        line = raw.strip()
        if not line:
            # An empty attribute list or a spacer line is not an error.
            continue
        parts = line.split(":", 1)
        field = E(*parts[0].split(" ("))
        parsed[field] = parts[1].strip()
    return parsed
@app.route("/api")
async def api():
    """
    API docs

    Builds the documentation page from the docstrings of
    lostmediafinder.YouTubeResponse and lostmediafinder.Service: the text
    after "=Changelog=" (if any) becomes the changelog, and the lines after
    "Attributes:" in the text before it become the field listing.
    """
    response_parts = lostmediafinder.YouTubeResponse.__doc__.split("=Changelog=")
    service_parts = lostmediafinder.Service.__doc__.split("=Changelog=")

    # Slot 0 is the response changelog, slot 1 the service changelog; a
    # docstring without a changelog section leaves its slot empty.
    changelog = [{}, {}]
    for slot, parts in enumerate((response_parts, service_parts)):
        if len(parts) > 1:
            changelog[slot] = parse_changelog(parts[1].strip())

    # Parse the attributes list
    response_attrs = response_parts[0].split("Attributes:\n")[1].strip()
    fields = await parse_lines(response_attrs.split("\n"))
    service_attrs = service_parts[0].split("Attributes:\n")[1].strip()
    services = await parse_lines(service_attrs.split("\n"))

    return await render_template("api.j2", fields=fields, services=services, changelog=changelog)