Update paper index with populate_journal from last indexed date
Switch to using indexed timestamp as main scraping timestamp
Added rocketry as a dep, which requires pinning pydantic<2 because of Miksus/rocketry#210
sneakers-the-rat committed Jan 6, 2024
1 parent e2b7a28 commit 4c3c6e9
Showing 14 changed files with 295 additions and 413 deletions.
4 changes: 3 additions & 1 deletion .env.example
@@ -3,4 +3,6 @@ LOG_DIR=./logs
HOST="0.0.0.0"
PORT=8000
ENV=dev
CROSSREF_EMAIL=
CROSSREF_EMAIL=
REFRESH_SCHEDULE="0 3 * * *"
REFRESH_THREADS=12
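
These two variables correspond to the refresh_schedule and refresh_threads settings added to Config in src/paper_feeds/config.py later in this commit. A minimal sketch of how they surface at runtime (illustrative, not part of the diff; the values shown are the defaults):

    from paper_feeds.config import Config

    config = Config()                # reads .env and the environment
    config.refresh_schedule          # "0 3 * * *": refresh feeds daily at 03:00
    config.refresh_threads           # 12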
445 changes: 85 additions & 360 deletions poetry.lock

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions pyproject.toml
@@ -34,22 +34,23 @@ classifiers = [

[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.108.0"
sqlmodel = "^0.0.14"
pydantic-settings = "^2.1.0"
uvicorn = "^0.25.0"
jinja2 = "^3.1.2"
requests = "^2.31.0"
python-multipart = "^0.0.6"
fastapi-rss = "^0.2.2"
alembic = "^1.13.1"
fastapi = ">=0.108.0"
sqlmodel = ">=0.0.14"
#pydantic-settings = ">=2.1.0"
uvicorn = ">=0.25.0"
jinja2 = ">=3.1.2"
requests = ">=2.31.0"
python-multipart = ">=0.0.6"
fastapi-rss = ">=0.2.2"
alembic = ">=1.13.1"
rocketry = ">=2.5.1"
pydantic = {version = "<2", extras = ["dotenv"]} # until rocketry fixes pydantic 2 compatibility - https://github.com/Miksus/rocketry/issues/210
pytest = {version = "^7.4.4", optional = true}
requests-cache = {version = "^1.1.1", optional = true}
coveralls = {version = "^3.3.1", optional = true}
pytest-cov = {version = "^4.1.0", optional = true}
pytest-alembic = {version = "^0.10.7", optional = true}
pytest-timeout = {version = "^2.2.0", optional = true}
celery = "^5.3.6"

[tool.poetry.extras]
tests = ['pytest', 'requests-cache', 'pytest-cov', 'coveralls', 'pytest-alembic', 'pytest-timeout']
25 changes: 22 additions & 3 deletions src/paper_feeds/app.py
@@ -7,20 +7,22 @@
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from sqlmodel import select, desc
from sqlmodel import Session
from rocketry import Rocketry
from rocketry.conds import cron

from paper_feeds.config import Config
from paper_feeds.db import create_tables, get_engine, get_session
from paper_feeds.services import crossref
from paper_feeds.services import crossref, refresh
from paper_feeds.models.paper import PaperRead
from paper_feeds.models.rss import PaperRSSFeed
from paper_feeds import models

from fastapi_rss import RSSResponse


# --------------------------------------------------
# FastAPI Module-level objects (since that's how FastAPI likes to work i guess)

app = FastAPI()
config = Config()
@@ -32,10 +32,27 @@
    directory=(Path(__file__).parents[1] / 'templates').resolve()
)

# --------------------------------------------------
# Rocketry does this too...

scheduler = Rocketry(
execution="async"
)

# --------------------------------------------------
# Lifecycle and periodic tasks

@app.on_event("startup")
def on_startup():
    create_tables(engine)

@scheduler.task(cron(config.refresh_schedule))
def run_refresh():
    refresh.update_feeds()

# --------------------------------------------------
# Routes

@app.get('/', response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse('pages/index.html', {"request": request})
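
The scheduled run_refresh task above delegates to refresh.update_feeds(); the new src/paper_feeds/services/refresh.py is among the 14 changed files but is not rendered on this page. Purely as a hypothetical sketch of what a refresh service along those lines could look like (only crossref.populate_papers, Config.refresh_threads, and the imported modules are real names from this commit; the function body and the Journal field access are guesses):

    from concurrent.futures import ThreadPoolExecutor

    from sqlmodel import Session, select

    from paper_feeds.config import Config
    from paper_feeds.db import get_engine
    from paper_feeds.models.journal import Journal
    from paper_feeds.services import crossref

    def update_feeds() -> None:
        """Refresh every stored journal, fetching only newly indexed papers."""
        config = Config()
        with Session(get_engine()) as session:
            journals = session.exec(select(Journal)).all()
            issns = [journal.issn[0].value for journal in journals]  # relationship name is a guess
        # populate_papers defaults to fetching only papers indexed since the newest stored one
        with ThreadPoolExecutor(max_workers=config.refresh_threads) as pool:
            list(pool.map(crossref.populate_papers, issns))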
27 changes: 21 additions & 6 deletions src/paper_feeds/config.py
@@ -1,13 +1,23 @@
from typing import Optional, Literal
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field, DirectoryPath, computed_field, field_validator, model_validator, FieldValidationInfo
from importlib.metadata import version
if version('pydantic').startswith('2'):
    from pydantic_settings import BaseSettings, SettingsConfigDict
else:
    from pydantic import BaseSettings

class Config(BaseSettings):
    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        env_prefix="jrss_")
    if version('pydantic').startswith('2'):
        model_config = SettingsConfigDict(
            env_file='.env',
            env_file_encoding='utf-8',
            env_prefix="paperfeeds_")
    else:
        class Config:
            env_prefix = 'paperfeeds_'
            env_file_encoding='utf-8'
            env_file='.env'


    db: Optional[Path] = Path('./db.sqlite')
    """
@@ -23,6 +33,11 @@ class Config(BaseSettings):
    their API! https://github.com/CrossRef/rest-api-doc#good-manners--more-reliable-service
    """
    public_url: str = "http://localhost"
    refresh_schedule: str = "0 3 * * *"
    """
    Crontab expression to schedule when to refresh feeds. Default is every day at 3am
    """
    refresh_threads: int = 12


    @property
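
Since the settings prefix is now paperfeeds_ (previously jrss_), individual settings can be overridden through prefixed environment variables; matching is case-insensitive by default in both pydantic v1 and pydantic-settings. An illustrative sketch, not part of the diff:

    import os

    # Run the nightly refresh at 04:00 with 4 threads instead of the defaults
    os.environ["PAPERFEEDS_REFRESH_SCHEDULE"] = "0 4 * * *"
    os.environ["PAPERFEEDS_REFRESH_THREADS"] = "4"

    from paper_feeds.config import Config

    config = Config()
    assert config.refresh_schedule == "0 4 * * *"
    assert config.refresh_threads == 4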
5 changes: 5 additions & 0 deletions src/paper_feeds/exceptions.py
@@ -1,4 +1,9 @@
class DBMigrationError(RuntimeError):
    """
    Our database needs migrations!
    """

class FetchError(RuntimeError):
    """
    Something wrong with fetching data!
    """
12 changes: 12 additions & 0 deletions src/paper_feeds/main.py
@@ -2,6 +2,18 @@

from paper_feeds.config import Config

class Server(uvicorn.Server):
    """
    Uvicorn server that also shuts down rocketry
    References:
        - https://rocketry.readthedocs.io/en/stable/cookbook/fastapi.html
    """
    def handle_exit(self, sig: int, frame) -> None:
        app_rocketry.session.shut_down()
        return super().handle_exit(sig, frame)


def start():
    config = Config()
    uvicorn.run(
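
The Server subclass follows the Rocketry + FastAPI cookbook linked in its docstring: uvicorn and the scheduler share one event loop, and handle_exit shuts the Rocketry session down so stopping the server also stops the scheduler. The committed start() is only partially shown above; the sketch below is the cookbook pattern, not the committed code, and it assumes main.py binds app_rocketry to the scheduler defined in app.py:

    import asyncio

    import uvicorn

    from paper_feeds.app import app, scheduler as app_rocketry  # Rocketry scheduler from app.py

    async def serve_both(host: str = "0.0.0.0", port: int = 8000) -> None:
        # Serve the API and the Rocketry scheduler concurrently on one loop;
        # Server is the subclass defined above, so shutting down stops both.
        server = Server(config=uvicorn.Config(app, host=host, port=port, loop="asyncio"))
        api = asyncio.create_task(server.serve())
        sched = asyncio.create_task(app_rocketry.serve())
        await asyncio.wait([api, sched])

    asyncio.run(serve_both())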
5 changes: 5 additions & 0 deletions src/paper_feeds/models/journal.py
@@ -63,3 +63,8 @@ class ISSNRead(ISSNBase):

class ISSNCreate(ISSNBase):
    pass

from importlib import metadata
if not metadata.version('pydantic').startswith('2'):
    JournalCreate.update_forward_refs()
    JournalRead.update_forward_refs()
2 changes: 1 addition & 1 deletion src/paper_feeds/models/paper.py
@@ -67,7 +67,7 @@ class Paper(PaperBase, table=True):
    # update-to: Optional['Paper']

class PaperRead(PaperBase):
    pass
    id: int

class PaperCreate(PaperBase):

99 changes: 78 additions & 21 deletions src/paper_feeds/services/crossref.py
@@ -1,16 +1,17 @@
import pdb
from typing import Optional, List, Generator, overload, Literal
from datetime import datetime
from datetime import datetime, timezone

import requests
from sqlmodel import Session, select
from sqlmodel import Session, select, desc
from sqlalchemy.orm import selectinload

from paper_feeds import Config
from paper_feeds.models.journal import JournalCreate, Journal, JournalRead, ISSN
from paper_feeds.models.paper import PaperCreate, Paper
from paper_feeds.models.paper import PaperCreate, Paper, PaperRead
from paper_feeds.db import get_engine
from paper_feeds import init_logger
from paper_feeds.exceptions import FetchError


CROSSREF_API_URL = 'https://api.crossref.org/'
@@ -98,6 +99,8 @@ def load_journal(issn: str) -> JournalRead:
        ).join(ISSN
        ).where(ISSN.value == issn)
        db_journal = session.exec(read_statement).first()
        if db_journal is None:
            raise ValueError(f"Journal with issn {issn} is not in the database!")
        journal = JournalRead.model_validate(db_journal)
        return journal

@@ -110,7 +113,7 @@ def fetch_paper_page(
    issn:str,
    rows: int = 100,
    offset: int = 0,
    since_date: Optional[datetime] = None,
    from_index_date: Optional[datetime] = None,
    clean:bool = True,
    **kwargs
) -> list[PaperCreate] | dict:
@@ -120,36 +123,40 @@
        issn (str): ISSN of journal (any ISSN for a given journal gives the same results)
        rows (int): Number of items to fetch
        offset (int): Number of items to offset from the most recent (sorted by published date)
        since_date (:class:`datetime.datetime`): Optional: Get papers only published after this date
        from_index_date (:class:`datetime.datetime`): Optional: Get papers only indexed after this date
        clean (bool): If ``True`` (default), cast as :class:`.PaperCreate` before returning.
            Otherwise, return raw result from the `GET` request. Useful mostly for testing
    """
    params = {
        'sort': 'published',
        'sort': 'indexed',
        'order': 'desc',
        'rows': rows,
        'offset': offset,
        'select': ','.join(PaperCreate.crossref_select),
    }
    # explicitly passed kwargs override defaults
    params.update(kwargs)
    if since_date:
        params['from-pub-date'] = since_date.strftime('%y-%m-%d')
    if from_index_date:
        params['filter'] = 'from-index-date:' + from_index_date.strftime('%Y-%m-%d')

    res = crossref_get(
        f'journals/{issn}/works',
        params = params
    )
    res_json = res.json()
    if res_json['status'] != 'ok':
        raise FetchError(f"Error fetching from crossref! Got error message back:\n{res_json['message']}")

    if clean:
        return _clean_paper_page(res.json())
        return _clean_paper_page(res_json)
    else:
        return res.json()
        return res_json

def _clean_paper_page(res: dict) -> list[PaperCreate]:
    """Making a separate function in case we need to do some filtering here"""
    return [PaperCreate.from_crossref(item) for item in res['message']['items'] if item.get('type', None) in PAPER_TYPES]

def store_papers(papers: list[PaperCreate], issn: str) -> list[Paper]:
def store_papers(papers: list[PaperCreate], issn: str) -> list[PaperRead]:
    engine = get_engine()
    ret = []
    with Session(engine) as session:
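
With the switch to sorting by 'indexed', an incremental fetch only adds Crossref's from-index-date filter to the works query. Roughly, the request that fetch_paper_page builds looks like the following (the ISSN and date are made-up values; the select list comes from PaperCreate.crossref_select):

    import requests

    from paper_feeds.models.paper import PaperCreate

    # Illustrative parameters for an incremental page fetch
    params = {
        "sort": "indexed",
        "order": "desc",
        "rows": 100,
        "offset": 0,
        "select": ",".join(PaperCreate.crossref_select),
        "filter": "from-index-date:2024-01-01",
    }
    requests.get("https://api.crossref.org/journals/1234-5678/works", params=params)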
@@ -176,45 +183,95 @@ def store_papers(papers: list[PaperCreate], issn: str) -> list[Paper]:
            session.add(store_paper)
            session.commit()
            session.refresh(store_paper)
            ret.append(store_paper)
            # TODO: get the JournalRead object too
            read = PaperRead.model_validate(store_paper)
            ret.append(read)

    return ret

def fetch_papers(issn: str, limit: int = 1000, rows=100) -> Generator[list[Paper],None, None]:
def fetch_papers(issn: str, limit: int = 1000, rows:int=100, offset:int=0, **kwargs) -> Generator[list[Paper],None, None]:
    # get the most recent paper to subset paging
    # then get pages and write them as we get the pages
    # then return the completed sql models

    # TODO: Only get papers since the last time we updated
    got_papers = fetch_paper_page(issn, rows)
    # then return the completed sql models
    got_papers = fetch_paper_page(issn, rows, offset, **kwargs)
    stored_papers = store_papers(got_papers, issn)
    yield stored_papers

    n_papers = len(got_papers)
    while n_papers < limit and len(got_papers) == rows:
        get_rows = min(limit-n_papers, rows)
        got_papers = fetch_paper_page(issn, get_rows, n_papers)
        got_papers = fetch_paper_page(issn, get_rows, offset+n_papers, **kwargs)
        stored_papers = store_papers(got_papers, issn)
        n_papers += len(got_papers)
        yield stored_papers

def populate_papers(issn: str, limit: int = 1000, rows=100):
def populate_papers(
    issn: str,
    limit: int = 1000,
    rows=100,
    force:bool=False,
    return_papers:bool=False,
    **kwargs) -> Optional[list[Paper]]:
    """
    Background task for :func:`.fetch_papers`.
    Background task for :func:`.fetch_papers`
    By default, only get papers newer than the last indexed date for this issn
    Args:
        issn (str): ISSN of journal to fetch for
        limit (int): Total number of papers to fetch
        rows (int): Number of papers to fetch in each page
        force (bool): if ``True``, ignore previously stored papers and fetch everything
        return_papers (bool): If ``True``, return fetched papers. If ``False``, return ``None`` - mostly useful for testing,
            since everywhere else :func:`.fetch_papers` is more useful
        **kwargs: passed through to :func:`.fetch_papers` and on to :func:`.fetch_paper_page`
    Returns:
        The fetched papers if ``return_papers`` is ``True``, otherwise ``None``
    """
    logger = init_logger()
    logger.debug('fetching papers for ISSN %s', issn)
    fetcher = fetch_papers(issn, limit, rows)

    rows = min(rows, limit)

    all_papers = []
    if force:
        update_from = None
    else:
        update_from = last_indexed(issn)
    fetcher = fetch_papers(issn, limit, rows, from_index_date=update_from, **kwargs)
    fetched = 0

    while True:
        try:
            papers = next(fetcher)
            fetched += len(papers)
            if return_papers:
                all_papers.extend(papers)
            logger.debug('fetched %d papers', fetched)
        except StopIteration:
            break

    logger.debug('completed paper fetch for %s', issn)
    if return_papers:
        return all_papers


def last_indexed(issn:str) -> Optional[datetime]:
    """
    Get the last indexed timestamp from the most recent paper for a given issn.
    """
    # first try and get the most recent paper from this ISSN, if we have any
    engine = get_engine()
    with Session(engine) as session:
        journal = load_journal(issn)
        existing_statement = select(Paper).join(Journal
        ).where(Paper.journal_id == journal.id
        ).order_by(desc(Paper.indexed)
        ).limit(1)
        existing = session.exec(existing_statement).first()
        if existing is None:
            most_recent = None
        else:
            most_recent = existing.indexed
    return most_recent
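
Taken together, populate_papers now updates incrementally by default: last_indexed returns the indexed timestamp of the newest stored paper for the journal, and only records indexed after that date are requested unless force=True. A short usage sketch (the ISSN is illustrative):

    from paper_feeds.services.crossref import populate_papers

    # Incremental refresh: only papers indexed since the newest one already stored
    populate_papers("1234-5678")

    # Full re-fetch that ignores stored papers and returns what was fetched
    papers = populate_papers("1234-5678", force=True, return_papers=True)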
9 changes: 0 additions & 9 deletions src/paper_feeds/services/db.py

This file was deleted.
