Skip to content

Commit

Permalink
Include index freshness in catalog, fixes #335, #361
Browse files Browse the repository at this point in the history
  • Loading branch information
pudo committed Nov 21, 2023
1 parent 77ef5f9 commit 36db979
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 28 deletions.
1 change: 1 addition & 0 deletions tests/fixtures/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ datasets:
- name: parteispenden
title: German political party donations
path: tests/fixtures/donations.ijson
version: "100"
17 changes: 16 additions & 1 deletion tests/unit/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ def test_algorithms():
assert len(data["algorithms"]) > 3


def test_catalog():
res = client.get("/catalog")
assert res.status_code == 200, res
data = res.json()
assert "datasets" in data
datasets = {d["name"]: d for d in data["datasets"]}
assert datasets["us_ofac_sdn"]["index_current"] is False
assert datasets["eu_fsf"]["index_current"] is True
donations = datasets["parteispenden"]
assert donations["load"] is True
assert donations["index_current"] is True
assert donations["index_version"] == "100"
assert donations["version"] == "100"


def test_updatez_get():
res = client.get("/updatez")
assert res.status_code == 405, res.text
Expand All @@ -47,7 +62,7 @@ def test_updatez_no_token_configured():


def test_updatez_no_token():
res = client.post(f"/updatez?sync=true")
res = client.post("/updatez?sync=true")
assert res.status_code == 403, res.text


Expand Down
26 changes: 26 additions & 0 deletions yente/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import time
import aiocron # type: ignore
from uuid import uuid4
from contextlib import asynccontextmanager
from elasticsearch import ApiError, TransportError
from fastapi import FastAPI
from fastapi import Request, Response
Expand All @@ -11,10 +13,33 @@
from yente import settings
from yente.logs import get_logger
from yente.routers import reconcile, search, match, admin
from yente.data import refresh_catalog
from yente.search.base import close_es
from yente.search.indexer import update_index_threaded

log = get_logger("yente")


async def cron_task() -> None:
await refresh_catalog()
if settings.AUTO_REINDEX:
update_index_threaded()


@asynccontextmanager
async def lifespan(app: FastAPI):
log.info(
"Setting up background refresh",
crontab=settings.CRONTAB,
auto_reindex=settings.AUTO_REINDEX,
)
settings.CRON = aiocron.crontab(settings.CRONTAB, func=cron_task)
if settings.AUTO_REINDEX:
update_index_threaded()
yield
await close_es()


async def request_middleware(
request: Request, call_next: RequestResponseEndpoint
) -> Response:
Expand Down Expand Up @@ -67,6 +92,7 @@ def create_app() -> FastAPI:
contact=settings.CONTACT,
openapi_tags=settings.TAGS,
redoc_url="/",
lifespan=lifespan,
)
app.middleware("http")(request_middleware)
app.add_middleware(
Expand Down
2 changes: 2 additions & 0 deletions yente/data/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class DatasetModel(BaseModel):
load: bool
entities_url: Optional[str] = None
version: str
index_version: Optional[str] = None
index_current: bool = False
children: List[str]


Expand Down
6 changes: 5 additions & 1 deletion yente/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, catalog: DataCatalog["Dataset"], data: Dict[str, Any]):
self.entities_url = self._get_entities_url(data)
namespace = as_bool(data.get("namespace"), False)
self.ns = Namespace(self.name) if namespace else None
self.index_version: Optional[str] = None

def _get_entities_url(self, data: Dict[str, Any]) -> Optional[str]:
if "entities_url" in data:
Expand All @@ -53,7 +54,10 @@ def _get_entities_url(self, data: Dict[str, Any]) -> Optional[str]:
def to_dict(self) -> Dict[str, Any]:
data = super().to_dict()
data["load"] = self.load
data["entities_url"] = self.entities_url
if self.load:
data["entities_url"] = self.entities_url
data["index_version"] = self.index_version
data["index_current"] = self.index_version == self.version
if self.ns is not None:
data["namespace"] = True
if "children" not in data:
Expand Down
29 changes: 3 additions & 26 deletions yente/routers/admin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import aiocron # type: ignore
from typing import List
from fastapi import APIRouter, Query
from fastapi import HTTPException
Expand All @@ -8,40 +7,17 @@

from yente import settings
from yente.logs import get_logger
from yente.data import get_catalog, refresh_catalog
from yente.data import get_catalog
from yente.data.common import ErrorResponse, StatusResponse
from yente.data.common import DataCatalogModel, AlgorithmResponse, Algorithm
from yente.search.search import get_index_status
from yente.search.indexer import update_index, update_index_threaded
from yente.search.base import close_es
from yente.search.status import sync_dataset_versions

log = get_logger(__name__)
router = APIRouter()


async def cron_task() -> None:
await refresh_catalog()
if settings.AUTO_REINDEX:
update_index_threaded()


@router.on_event("startup")
async def startup_event() -> None:
log.info(
"Setting up background refresh",
crontab=settings.CRONTAB,
auto_reindex=settings.AUTO_REINDEX,
)
settings.CRON = aiocron.crontab(settings.CRONTAB, func=cron_task)
if settings.AUTO_REINDEX:
update_index_threaded()


@router.on_event("shutdown")
async def shutdown_event() -> None:
await close_es()


@router.get(
"/healthz",
summary="Health check",
Expand Down Expand Up @@ -89,6 +65,7 @@ async def catalog() -> DataCatalogModel:
data sources are included, and how often they should be loaded.
"""
catalog = await get_catalog()
await sync_dataset_versions(catalog)
return DataCatalogModel.model_validate(catalog.to_dict())


Expand Down
27 changes: 27 additions & 0 deletions yente/search/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from yente import settings
from yente.logs import get_logger
from yente.search.base import get_es, close_es
from yente.data.manifest import Catalog

log = get_logger(__name__)


async def sync_dataset_versions(catalog: Catalog) -> None:
es = await get_es()
res = await es.indices.get_alias(name=settings.ENTITY_INDEX)
for aliased_index in res.body.keys():
aliased_end: str = aliased_index[len(settings.ENTITY_INDEX) + 1 :]
dataset_name, index_version = aliased_end.split("-", 1)
version = index_version[len(settings.INDEX_VERSION) :]
dataset = catalog.get(dataset_name)
if dataset is None:
log.warn("Dataset has index but no metadata: %s" % dataset_name)
continue
if version != dataset.version:
log.info(
"Dataset %s is outdated" % dataset_name,
indexed=version,
available=dataset.version,
)
dataset.index_version = version
await close_es()

0 comments on commit 36db979

Please sign in to comment.