Skip to content

Commit 4b05608

Browse files
committed
global: support Jinja templating for job args
* Closes #36.
1 parent 531bc14 commit 4b05608

File tree

3 files changed

+73
-16
lines changed

3 files changed

+73
-16
lines changed

invenio_jobs/models.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from sqlalchemy_utils.types import ChoiceType, JSONType, UUIDType
2424
from werkzeug.utils import cached_property
2525

26+
from .utils import eval_cfg_value, walk_dict_values
27+
2628
JSON = (
2729
db.JSON()
2830
.with_variant(postgresql.JSONB(none_as_null=True), "postgresql")
@@ -44,12 +46,17 @@ class Job(db.Model, Timestamp):
4446
default_args = db.Column(JSON, default=lambda: dict(), nullable=True)
4547
schedule = db.Column(JSON, nullable=True)
4648

47-
# TODO: See if we move this to an API class
4849
@property
4950
def last_run(self):
5051
"""Last run of the job."""
5152
return self.runs.order_by(Run.created.desc()).first()
5253

54+
def last_run_with_status(self, status):
55+
"""Last run of the job with a specific status."""
56+
return (
57+
self.runs.filter(Run.status == status).order_by(Run.created.desc()).first()
58+
)
59+
5360
@property
5461
def parsed_schedule(self):
5562
"""Return schedule parsed as crontab or timedelta."""
@@ -63,6 +70,13 @@ def parsed_schedule(self):
6370
elif stype == "interval":
6471
return timedelta(**schedule)
6572

73+
def render_args(self, **ctx):
74+
"""Render job arguments."""
75+
args = deepcopy(self.default_args)
76+
ctx.setdefault("job", self)
77+
walk_dict_values(args, lambda val: eval_cfg_value(val, ctx))
78+
return args
79+
6680

6781
class RunStatusEnum(enum.Enum):
6882
"""Enumeration of a run's possible states."""

invenio_jobs/services/scheduler.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
import traceback
1111
import uuid
12-
from typing import Any
1312

1413
from celery.beat import ScheduleEntry, Scheduler, logger
1514
from invenio_db import db
16-
from sqlalchemy import and_
1715

18-
from invenio_jobs.models import Job, Run, Task
16+
from invenio_jobs.models import Job, Run
1917
from invenio_jobs.tasks import execute_run
2018

2119

@@ -49,27 +47,23 @@ class RunScheduler(Scheduler):
4947
Entry = JobEntry
5048
entries = {}
5149

52-
def __init__(self, *args: Any, **kwargs: Any) -> None:
53-
"""Initialize the database scheduler."""
54-
super().__init__(*args, **kwargs)
55-
5650
@property
5751
def schedule(self):
5852
"""Get currently scheduled entries."""
5953
return self.entries
6054

61-
# Celery internal override
55+
#
56+
# Celery overrides
57+
#
6258
def setup_schedule(self):
6359
"""Setup schedule."""
6460
self.sync()
6561

66-
# Celery internal override
6762
def reserve(self, entry):
6863
"""Update entry to next run execution time."""
6964
new_entry = self.schedule[entry.job.id] = next(entry)
7065
return new_entry
7166

72-
# Celery internal override
7367
def apply_entry(self, entry, producer=None):
7468
"""Create and apply a JobEntry."""
7569
with self.app.flask_app.app_context():
@@ -93,24 +87,27 @@ def apply_entry(self, entry, producer=None):
9387
else:
9488
logger.debug("%s sent.", entry.task)
9589

96-
# Celery internal override
9790
def sync(self):
9891
"""Sync Jobs from db to the scheduler."""
9992
# TODO Should we also have a cleaup task for runs? "stale" run (status running, starttime > hour, Run pending for > 1 hr)
10093
with self.app.flask_app.app_context():
10194
jobs = Job.query.filter(
102-
and_(Job.active == True, Job.schedule != None)
103-
).all()
95+
Job.active.is_(True),
96+
Job.schedule.isnot(None),
97+
)
10498
self.entries = {} # because some jobs might be deactivated
10599
for job in jobs:
106100
self.entries[job.id] = JobEntry.from_job(job)
107101

102+
#
103+
# Helpers
104+
#
108105
def create_run(self, entry):
109106
"""Create run from a JobEntry."""
110-
job = Job.query.filter_by(id=entry.job.id).one()
107+
job = Job.query.get(entry.job.id)
111108
run = Run(
112109
job=job,
113-
args=job.default_args,
110+
args=job.render_args(),
114111
queue=job.default_queue,
115112
task_id=uuid.uuid4(),
116113
)

invenio_jobs/utils.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2024 CERN.
4+
#
5+
# Invenio-Jobs is free software; you can redistribute it and/or modify it
6+
# under the terms of the MIT License; see LICENSE file for more details.
7+
8+
"""Utilities."""
9+
10+
import ast
11+
12+
from jinja2.sandbox import SandboxedEnvironment
13+
14+
jinja_env = SandboxedEnvironment()
15+
16+
17+
def eval_cfg_value(val, ctx):
18+
tpl = jinja_env.from_string(val)
19+
res = tpl.render(**ctx)
20+
21+
try:
22+
res = ast.literal_eval(res)
23+
except Exception:
24+
pass
25+
26+
return res
27+
28+
29+
def walk_dict_values(obj, transform_fn):
30+
for key, val in obj.items():
31+
if isinstance(val, str):
32+
try:
33+
obj[key] = transform_fn(val)
34+
except Exception:
35+
pass
36+
elif isinstance(val, dict):
37+
walk_dict_values(val, transform_fn)
38+
elif isinstance(val, list):
39+
for i, v in enumerate(val):
40+
if isinstance(v, dict):
41+
walk_dict_values(v, transform_fn)
42+
elif isinstance(v, str):
43+
try:
44+
val[i] = transform_fn(v)
45+
except Exception:
46+
pass

0 commit comments

Comments
 (0)