Skip to content

Commit

Permalink
Merge pull request #4 from watchdogpolska/task_log
Browse files Browse the repository at this point in the history
v1.3.01 - completed task result
  • Loading branch information
PiotrIw authored Jun 19, 2024
2 parents 56ee3fd + 3aad2e2 commit 43da1ba
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 1.2.10
current_version = 1.3.01
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?
serialize =
{major}.{minor}.{patch}-{release}
Expand Down
27 changes: 27 additions & 0 deletions .github/workflows/module.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Python package

on:
push:
branches:
- master
pull_request:

jobs:
module_tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
lfs: true
- name: Build application
run: docker-compose build app
env:
PYTHON_VERSION: "3.12"
DJANGO_VERSION: "4.2"
- name: Show installed dependencies
run: docker-compose run app pip list
- name: Test application
run: docker-compose run app tox
- name: Show docker process
run: docker ps
if: always()
23 changes: 23 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "tox tests debug",
"type": "debugpy",
"request": "launch",
"program": "/usr/local/bin/tox",
"args": [
"exec",
"-e",
"py312-django42",
"--",
"python",
"./runtests.py"
], // Add any tox arguments here
"console": "integratedTerminal"
}
]
}
2 changes: 1 addition & 1 deletion background_task/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
__version__ = "1.2.10"
__version__ = "1.3.01"

default_app_config = "background_task.apps.BackgroundTasksAppConfig"

Expand Down
18 changes: 18 additions & 0 deletions background_task/migrations/0005_completedtask_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.13 on 2024-06-19 15:09

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('background_task', '0004_auto_20220202_1721'),
]

operations = [
migrations.AddField(
model_name='completedtask',
name='result',
field=models.TextField(blank=True, null=True),
),
]
141 changes: 81 additions & 60 deletions background_task/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from django.db import models
from django.db.models import Q
from django.utils import timezone
from six import python_2_unicode_compatible

from background_task.exceptions import InvalidTaskError
from background_task.settings import app_settings
Expand Down Expand Up @@ -50,14 +49,17 @@ def find_available(self, queue=None):
if queue:
qs = qs.filter(queue=queue)
ready = qs.filter(run_at__lte=now, failed_at=None)
_priority_ordering = '{}priority'.format(app_settings.BACKGROUND_TASK_PRIORITY_ORDERING)
ready = ready.order_by(_priority_ordering, 'run_at')
_priority_ordering = "{}priority".format(
app_settings.BACKGROUND_TASK_PRIORITY_ORDERING
)
ready = ready.order_by(_priority_ordering, "run_at")

if app_settings.BACKGROUND_TASK_RUN_ASYNC:
currently_failed = self.failed().count()
currently_locked = self.locked(now).count()
count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - \
(currently_locked - currently_failed)
count = app_settings.BACKGROUND_TASK_ASYNC_THREADS - (
currently_locked - currently_failed
)
if count > 0:
ready = ready[:count]
else:
Expand Down Expand Up @@ -86,10 +88,20 @@ def failed(self):
qs = self.get_queryset()
return qs.filter(failed_at__isnull=False)

def new_task(self, task_name, args=None, kwargs=None,
run_at=None, priority=0, queue=None, verbose_name=None,
creator=None, repeat=None, repeat_until=None,
remove_existing_tasks=False):
def new_task(
self,
task_name,
args=None,
kwargs=None,
run_at=None,
priority=0,
queue=None,
verbose_name=None,
creator=None,
repeat=None,
repeat_until=None,
remove_existing_tasks=False,
):
"""
If `remove_existing_tasks` is True, all unlocked tasks with the identical task hash will be removed.
The attributes `repeat` and `repeat_until` are not supported at the moment.
Expand All @@ -100,35 +112,35 @@ def new_task(self, task_name, args=None, kwargs=None,
run_at = timezone.now()
task_params = json.dumps((args, kwargs), sort_keys=True)
s = "%s%s" % (task_name, task_params)
task_hash = sha1(s.encode('utf-8')).hexdigest()
task_hash = sha1(s.encode("utf-8")).hexdigest()
if remove_existing_tasks:
Task.objects.filter(task_hash=task_hash, locked_at__isnull=True).delete()
return Task(task_name=task_name,
task_params=task_params,
task_hash=task_hash,
priority=priority,
run_at=run_at,
queue=queue,
verbose_name=verbose_name,
creator=creator,
repeat=repeat or Task.NEVER,
repeat_until=repeat_until,
)
return Task(
task_name=task_name,
task_params=task_params,
task_hash=task_hash,
priority=priority,
run_at=run_at,
queue=queue,
verbose_name=verbose_name,
creator=creator,
repeat=repeat or Task.NEVER,
repeat_until=repeat_until,
)

def get_task(self, task_name, args=None, kwargs=None):
args = args or ()
kwargs = kwargs or {}
task_params = json.dumps((args, kwargs), sort_keys=True)
s = "%s%s" % (task_name, task_params)
task_hash = sha1(s.encode('utf-8')).hexdigest()
task_hash = sha1(s.encode("utf-8")).hexdigest()
qs = self.get_queryset()
return qs.filter(task_hash=task_hash)

def drop_task(self, task_name, args=None, kwargs=None):
return self.get_task(task_name, args, kwargs).delete()


@python_2_unicode_compatible
class Task(models.Model):
# the "name" of the task/function to be run
task_name = models.CharField(max_length=190, db_index=True)
Expand All @@ -153,19 +165,18 @@ class Task(models.Model):
EVERY_4_WEEKS = 4 * WEEKLY
NEVER = 0
REPEAT_CHOICES = (
(HOURLY, 'hourly'),
(DAILY, 'daily'),
(WEEKLY, 'weekly'),
(EVERY_2_WEEKS, 'every 2 weeks'),
(EVERY_4_WEEKS, 'every 4 weeks'),
(NEVER, 'never'),
(HOURLY, "hourly"),
(DAILY, "daily"),
(WEEKLY, "weekly"),
(EVERY_2_WEEKS, "every 2 weeks"),
(EVERY_4_WEEKS, "every 4 weeks"),
(NEVER, "never"),
)
repeat = models.BigIntegerField(choices=REPEAT_CHOICES, default=NEVER)
repeat_until = models.DateTimeField(null=True, blank=True)

# the "name" of the queue this is to be run on
queue = models.CharField(max_length=190, db_index=True,
null=True, blank=True)
queue = models.CharField(max_length=190, db_index=True, null=True, blank=True)

# how many times the task has been tried
attempts = models.IntegerField(default=0, db_index=True)
Expand All @@ -175,16 +186,18 @@ class Task(models.Model):
last_error = models.TextField(blank=True)

# details of who's trying to run the task at the moment
locked_by = models.CharField(max_length=64, db_index=True,
null=True, blank=True)
locked_by = models.CharField(max_length=64, db_index=True, null=True, blank=True)
locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

creator_content_type = models.ForeignKey(
ContentType, null=True, blank=True,
related_name='background_task', on_delete=models.CASCADE
ContentType,
null=True,
blank=True,
related_name="background_task",
on_delete=models.CASCADE,
)
creator_object_id = models.PositiveIntegerField(null=True, blank=True)
creator = GenericForeignKey('creator_content_type', 'creator_object_id')
creator = GenericForeignKey("creator_content_type", "creator_object_id")

objects = TaskManager()

Expand All @@ -201,13 +214,15 @@ def locked_by_pid_running(self):
return False
else:
return None

locked_by_pid_running.boolean = True

def has_error(self):
"""
Check if the last_error field is empty.
"""
return bool(self.last_error)

has_error.boolean = True

def params(self):
Expand Down Expand Up @@ -241,33 +256,36 @@ def is_repeating_task(self):
return self.repeat > self.NEVER

def reschedule(self, type, err, traceback):
'''
"""
Set a new time to run the task in the future, or create a CompletedTask and delete the Task
if it has reached the maximum of allowed attempts
'''
"""
self.last_error = self._extract_error(type, err, traceback)
self.increment_attempts()
if self.has_reached_max_attempts() or isinstance(err, InvalidTaskError):
self.failed_at = timezone.now()
logger.warning('Marking task %s as failed\n', self)
logger.warning("Marking task %s as failed\n", self)
completed = self.create_completed_task()
task_failed.send(sender=self.__class__, task_id=self.id, completed_task=completed)
task_failed.send(
sender=self.__class__, task_id=self.id, completed_task=completed
)
self.delete()
else:
backoff_multiplier = app_settings.BACKGROUND_TASK_BACKOFF_MULTIPLIER
backoff = timedelta(seconds=int(self.attempts ** backoff_multiplier) + 5)
backoff = timedelta(seconds=int(self.attempts**backoff_multiplier) + 5)
self.run_at = timezone.now() + backoff
logger.warning('Rescheduling task %s for %s later at %s\n', self,
backoff, self.run_at)
logger.warning(
"Rescheduling task %s for %s later at %s\n", self, backoff, self.run_at
)
task_rescheduled.send(sender=self.__class__, task=self)
self.locked_by = None
self.locked_at = None
self.save()

def create_completed_task(self):
'''
def create_completed_task(self, result=None):
"""
Returns a new CompletedTask instance with the same values
'''
"""
completed_task = CompletedTask(
task_name=self.task_name,
task_params=self.task_params,
Expand All @@ -278,6 +296,7 @@ def create_completed_task(self):
attempts=self.attempts,
failed_at=self.failed_at,
last_error=self.last_error,
result=result,
locked_by=self.locked_by,
locked_at=self.locked_at,
verbose_name=self.verbose_name,
Expand Down Expand Up @@ -325,13 +344,10 @@ def save(self, *arg, **kw):
return super(Task, self).save(*arg, **kw)

def __str__(self):
return u'{}'.format(self.verbose_name or self.task_name)
return "{}".format(self.verbose_name or self.task_name)

class Meta:
db_table = 'background_task'



db_table = "background_task"


class CompletedTaskQuerySet(models.QuerySet):
Expand Down Expand Up @@ -374,7 +390,6 @@ def succeeded(self, within=None):
return qs


@python_2_unicode_compatible
class CompletedTask(models.Model):
# the "name" of the task/function to be run
task_name = models.CharField(max_length=190, db_index=True)
Expand All @@ -394,8 +409,7 @@ class CompletedTask(models.Model):
repeat_until = models.DateTimeField(null=True, blank=True)

# the "name" of the queue this is to be run on
queue = models.CharField(max_length=190, db_index=True,
null=True, blank=True)
queue = models.CharField(max_length=190, db_index=True, null=True, blank=True)

# how many times the task has been tried
attempts = models.IntegerField(default=0, db_index=True)
Expand All @@ -404,17 +418,22 @@ class CompletedTask(models.Model):
# details of the error that occurred
last_error = models.TextField(blank=True)

# task resut
result = models.TextField(blank=True, null=True)

# details of who's trying to run the task at the moment
locked_by = models.CharField(max_length=64, db_index=True,
null=True, blank=True)
locked_by = models.CharField(max_length=64, db_index=True, null=True, blank=True)
locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

creator_content_type = models.ForeignKey(
ContentType, null=True, blank=True,
related_name='completed_background_task', on_delete=models.CASCADE
ContentType,
null=True,
blank=True,
related_name="completed_background_task",
on_delete=models.CASCADE,
)
creator_object_id = models.PositiveIntegerField(null=True, blank=True)
creator = GenericForeignKey('creator_content_type', 'creator_object_id')
creator = GenericForeignKey("creator_content_type", "creator_object_id")

objects = CompletedTaskQuerySet.as_manager()

Expand All @@ -431,17 +450,19 @@ def locked_by_pid_running(self):
return False
else:
return None

locked_by_pid_running.boolean = True

def has_error(self):
"""
Check if the last_error field is empty.
"""
return bool(self.last_error)

has_error.boolean = True

def __str__(self):
return u'{} - {}'.format(
return "{} - {}".format(
self.verbose_name or self.task_name,
self.run_at,
)
Loading

0 comments on commit 43da1ba

Please sign in to comment.