Skip to content

Commit 6d05c24

Browse files
committed
build fast api fur aql dashboard
1 parent 5eb084c commit 6d05c24

File tree

1 file changed

+393
-0
lines changed

1 file changed

+393
-0
lines changed

archive_query_log/api/main.py

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
from datetime import datetime
2+
from pathlib import Path
3+
from typing import NamedTuple, Type, Union
4+
5+
from elasticsearch_dsl.query import Exists, Query, Term
6+
from expiringdict import ExpiringDict
7+
from fastapi import FastAPI
8+
from fastapi.middleware.cors import CORSMiddleware
9+
from mergedeep import Strategy, merge
10+
from pydantic import BaseModel
11+
from yaml import safe_load
12+
13+
from archive_query_log.config import Config
14+
from archive_query_log.orm import Archive, Provider, Source, Capture, \
15+
BaseDocument, Serp, Result, UrlQueryParser, UrlPageParser, \
16+
UrlOffsetParser, WarcQueryParser, WarcSnippetsParser
17+
from archive_query_log.utils.time import utc_now
18+
19+
_CACHE_SECONDS_STATISTICS = 60 * 5 # 5 minutes
20+
_CACHE_SECONDS_PROGRESS = 60 * 10 # 10 minutes
21+
22+
_DEFAULT_CONFIG_PATH = Path("../../config.yml")
23+
_DEFAULT_CONFIG_OVERRIDE_PATH = Path("../../config.override.yml")
24+
_DEFAULT_CONFIG_PATHS = []
25+
if _DEFAULT_CONFIG_PATH.exists():
26+
_DEFAULT_CONFIG_PATHS.append(_DEFAULT_CONFIG_PATH)
27+
if _DEFAULT_CONFIG_OVERRIDE_PATH.exists():
28+
_DEFAULT_CONFIG_PATHS.append(_DEFAULT_CONFIG_OVERRIDE_PATH)
29+
30+
31+
class Statistics(BaseModel):
32+
name: str
33+
description: str
34+
total: int
35+
disk_size: str | None
36+
last_modified: datetime | None
37+
38+
39+
class Progress(BaseModel):
40+
input_name: str
41+
output_name: str
42+
description: str
43+
total: int
44+
current: int
45+
46+
47+
DocumentType = Type[BaseDocument]
48+
49+
_statistics_cache: dict[
50+
tuple[DocumentType, str],
51+
Statistics,
52+
] = ExpiringDict(
53+
max_len=100,
54+
max_age_seconds=_CACHE_SECONDS_STATISTICS,
55+
)
56+
57+
58+
def _convert_bytes(bytes_count: int) -> str:
59+
step_unit = 1000.0
60+
bytes_count_decimal: float = bytes_count
61+
for unit in ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB", "RB"]:
62+
if bytes_count_decimal < step_unit:
63+
return f"{bytes_count_decimal:3.1f}{unit}"
64+
bytes_count_decimal /= step_unit
65+
return f"{bytes_count_decimal:3.1f} QB"
66+
67+
68+
def _get_statistics(
69+
config: Config,
70+
name: str,
71+
description: str,
72+
document: DocumentType,
73+
filter_query: Query | None = None,
74+
) -> Statistics:
75+
key = (document, repr(filter_query))
76+
if key in _statistics_cache:
77+
return _statistics_cache[key]
78+
79+
document.index().refresh(using=config.es.client)
80+
stats = document.index().stats(using=config.es.client)
81+
search = document.search(using=config.es.client)
82+
if filter_query is not None:
83+
search = search.filter(filter_query)
84+
total = search.count()
85+
last_modified_response = (
86+
search
87+
.query(Exists(field="last_modified"))
88+
.sort("-last_modified")
89+
.extra(size=1)
90+
.execute()
91+
)
92+
if last_modified_response.hits.total.value == 0:
93+
last_modified = None
94+
else:
95+
last_modified = last_modified_response.hits[0].last_modified
96+
97+
statistics = Statistics(
98+
name=name,
99+
description=description,
100+
total=total,
101+
disk_size=str(
102+
_convert_bytes(stats["_all"]["total"]["store"]["size_in_bytes"])
103+
if filter_query is None else None
104+
),
105+
last_modified=last_modified,
106+
)
107+
_statistics_cache[key] = statistics
108+
return statistics
109+
110+
111+
_progress_cache: dict[
112+
tuple[DocumentType, str, str],
113+
Progress,
114+
] = ExpiringDict(
115+
max_len=100,
116+
max_age_seconds=_CACHE_SECONDS_PROGRESS,
117+
)
118+
119+
120+
def _get_processed_progress(
121+
config: Config,
122+
input_name: str,
123+
output_name: str,
124+
description: str,
125+
document: DocumentType,
126+
status_field: str,
127+
filter_query: Query | None = None,
128+
) -> Progress:
129+
key = (document, repr(filter_query), status_field)
130+
if key in _progress_cache:
131+
return _progress_cache[key]
132+
133+
document.index().refresh(using=config.es.client)
134+
search = document.search(using=config.es.client)
135+
if filter_query is not None:
136+
search = search.filter(filter_query)
137+
total = search.count()
138+
search_processed = search.filter(Term(**{status_field: False}))
139+
total_processed = search_processed.count()
140+
progress = Progress(
141+
input_name=input_name,
142+
output_name=output_name,
143+
description=description,
144+
total=total,
145+
current=total_processed,
146+
)
147+
_progress_cache[key] = progress
148+
return progress
149+
150+
151+
app = FastAPI()
152+
153+
# CORS settings
154+
origins = [
155+
"http://localhost:3000",
156+
]
157+
158+
app.add_middleware(
159+
CORSMiddleware,
160+
allow_origins=origins,
161+
allow_credentials=True,
162+
allow_methods=["*"],
163+
allow_headers=["*"],
164+
)
165+
166+
@app.get("/statistics")
167+
def get_statistics():
168+
169+
if len(_DEFAULT_CONFIG_PATHS ) == 0:
170+
raise RuntimeError("No config file specified.")
171+
config_dict: dict = {}
172+
for config_path in _DEFAULT_CONFIG_PATHS :
173+
with config_path.open("rb") as config_file:
174+
next_config_dict = safe_load(config_file)
175+
merge(config_dict, next_config_dict, strategy=Strategy.REPLACE)
176+
config: Config = Config.from_dict(config_dict)
177+
178+
statistics_list = [
179+
_get_statistics(
180+
config=config,
181+
name="Archives",
182+
description="Web archiving services that offer CDX and Memento APIs.",
183+
document=Archive,
184+
),
185+
_get_statistics(
186+
config=config,
187+
name="Providers",
188+
description="Search providers, i.e., websites that offer a search functionality.",
189+
document=Provider,
190+
),
191+
_get_statistics(
192+
config=config,
193+
name="Sources",
194+
description="The cross product of all archives and the provider's domains and URL prefixes.",
195+
document=Source,
196+
),
197+
_get_statistics(
198+
config=config,
199+
name="Captures",
200+
description="Captures matching from the archives that match domain and URL prefixes.",
201+
document=Capture,
202+
),
203+
_get_statistics(
204+
config=config,
205+
name="SERPs",
206+
description="Search engine result pages that have been identified among the captures.",
207+
document=Serp,
208+
),
209+
_get_statistics(
210+
config=config,
211+
name="+ URL query",
212+
description="SERPs for which the query has been parsed from the URL.",
213+
document=Serp,
214+
filter_query=Exists(field="url_query"),
215+
),
216+
_get_statistics(
217+
config=config,
218+
name="+ URL page",
219+
description="SERPs for which the page has been parsed from the URL.",
220+
document=Serp,
221+
filter_query=Exists(field="url_page"),
222+
),
223+
_get_statistics(
224+
config=config,
225+
name="+ URL offset",
226+
description="SERPs for which the offset has been parsed from the URL.",
227+
document=Serp,
228+
filter_query=Exists(field="url_offset"),
229+
),
230+
_get_statistics(
231+
config=config,
232+
name="+ WARC",
233+
description="SERPs for which the WARC has been downloaded.",
234+
document=Serp,
235+
filter_query=Exists(field="warc_location"),
236+
),
237+
_get_statistics(
238+
config=config,
239+
name="+ WARC query",
240+
description="SERPs for which the query has been parsed from the WARC.",
241+
document=Serp,
242+
filter_query=Exists(field="warc_query"),
243+
),
244+
_get_statistics(
245+
config=config,
246+
name="+ WARC snippets",
247+
description="SERPs for which the snippets have been parsed from the WARC.",
248+
document=Serp,
249+
filter_query=Exists(field="warc_snippets_parser.id"),
250+
),
251+
_get_statistics(
252+
config=config,
253+
name="Results",
254+
description="Search result from the SERPs.",
255+
document=Result,
256+
),
257+
_get_statistics(
258+
config=config,
259+
name="URL query parsers",
260+
description="Parser to get the query from a SERP's URL.",
261+
document=UrlQueryParser,
262+
),
263+
_get_statistics(
264+
config=config,
265+
name="URL page parsers",
266+
description="Parser to get the page from a SERP's URL.",
267+
document=UrlPageParser,
268+
),
269+
_get_statistics(
270+
config=config,
271+
name="URL offset parsers",
272+
description="Parser to get the offset from a SERP's URL.",
273+
document=UrlOffsetParser,
274+
),
275+
_get_statistics(
276+
config=config,
277+
name="WARC query parsers",
278+
description="Parser to get the query from a SERP's WARC contents.",
279+
document=WarcQueryParser,
280+
),
281+
_get_statistics(
282+
config=config,
283+
name="WARC snippets parsers",
284+
description="Parser to get the snippets from a SERP's WARC contents.",
285+
document=WarcSnippetsParser,
286+
),
287+
]
288+
289+
return statistics_list
290+
291+
292+
@app.get("/progress")
293+
def get_progress():
294+
295+
if len(_DEFAULT_CONFIG_PATHS ) == 0:
296+
raise RuntimeError("No config file specified.")
297+
config_dict: dict = {}
298+
for config_path in _DEFAULT_CONFIG_PATHS :
299+
with config_path.open("rb") as config_file:
300+
next_config_dict = safe_load(config_file)
301+
merge(config_dict, next_config_dict, strategy=Strategy.REPLACE)
302+
config: Config = Config.from_dict(config_dict)
303+
304+
progress_list = [
305+
_get_processed_progress(
306+
config=config,
307+
input_name="Archives",
308+
output_name="Sources",
309+
description="Build sources for all archives.",
310+
document=Archive,
311+
filter_query=~Exists(field="exclusion_reason"),
312+
status_field="should_build_sources",
313+
),
314+
_get_processed_progress(
315+
config=config,
316+
input_name="Providers",
317+
output_name="Sources",
318+
description="Build sources for all search providers.",
319+
document=Provider,
320+
filter_query=~Exists(field="exclusion_reason"),
321+
status_field="should_build_sources",
322+
),
323+
_get_processed_progress(
324+
config=config,
325+
input_name="Sources",
326+
output_name="Captures",
327+
description="Fetch CDX captures for all domains and prefixes in the sources.",
328+
document=Source,
329+
status_field="should_fetch_captures",
330+
),
331+
_get_processed_progress(
332+
config=config,
333+
input_name="Captures",
334+
output_name="SERPs",
335+
description="Parse queries from capture URLs.",
336+
document=Capture,
337+
status_field="url_query_parser.should_parse",
338+
),
339+
_get_processed_progress(
340+
config=config,
341+
input_name="SERPs",
342+
output_name="SERPs",
343+
description="Parse page from SERP URLs.",
344+
document=Serp,
345+
status_field="url_page_parser.should_parse",
346+
),
347+
_get_processed_progress(
348+
config=config,
349+
input_name="SERPs",
350+
output_name="SERPs",
351+
description="Parse offset from SERP URLs.",
352+
document=Serp,
353+
status_field="url_offset_parser.should_parse",
354+
),
355+
_get_processed_progress(
356+
config=config,
357+
input_name="SERPs",
358+
output_name="SERPs",
359+
description="Download WARCs.",
360+
document=Serp,
361+
filter_query=Term(capture__status_code=200),
362+
status_field="warc_downloader.should_download",
363+
),
364+
_get_processed_progress(
365+
config=config,
366+
input_name="SERPs",
367+
output_name="SERPs",
368+
description="Parse query from WARC contents.",
369+
document=Serp,
370+
filter_query=Exists(field="warc_location"),
371+
status_field="warc_query_parser.should_parse",
372+
),
373+
_get_processed_progress(
374+
config=config,
375+
input_name="SERPs",
376+
output_name="SERPs",
377+
description="Parse snippets from WARC contents.",
378+
document=Serp,
379+
filter_query=Exists(field="warc_location"),
380+
status_field="warc_snippets_parser.should_parse",
381+
),
382+
_get_processed_progress(
383+
config=config,
384+
input_name="Results",
385+
output_name="Results",
386+
description="Download WARCs.",
387+
document=Result,
388+
filter_query=Exists(field="snippet.url"),
389+
status_field="warc_downloader.should_download",
390+
),
391+
]
392+
393+
return progress_list

0 commit comments

Comments
 (0)