Skip to content
This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Commit

Permalink
ci/models.py: bring pending function to TestJob
Browse files Browse the repository at this point in the history
When a ProjectStatus is computed, there's a call to the Build.finished
method that determines whether or not the build is done. Part of this
check counts the number of testjobs with fetched=False. If the count is
zero, the build is finished.

Due to the nature of SQUAD, testjobs are processed by multiple parallel
workers and race conditions might happen. A mitigation mechanism SQUAD does
have in place is to acquire a database lock on a testjob being fetched
while it attempts to fetch it from its backend. Once results are retrieved,
SQUAD marks the testjob with fetched=True and continues processing it. The
processing includes result parsing and plugins. Some testjobs can get pretty
slow depending on the volume of results.

There is a problem with this approach though. If a build has 2 jobs, and
one of them finishes faster than the other (fetching + processing), while
the slow one is already marked as fetched but not yet done with its results,
it will appear that the build is already finished.

With this patch, I change TestJob.job_status to 'Fetching' while still holding
the database lock and only remove it after plugin processing takes place. Finally,
I move the pending check of testjobs from the Build.finished method into the
TestJobManager class and add an extra condition which checks if a job is marked
as fetched but still has job_status=Fetching.

This should guarantee that when builds are marked as finished, all testjob
results will actually have been processed within SQUAD.

Signed-off-by: Charles Oliveira <[email protected]>
  • Loading branch information
chaws committed Oct 19, 2023
1 parent 3cf14a6 commit a6bec00
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
31 changes: 30 additions & 1 deletion squad/ci/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import yaml
from io import StringIO
from django.db import models, transaction, DatabaseError
from django.db.models import Q
from django.utils import timezone
from dateutil.relativedelta import relativedelta

Expand Down Expand Up @@ -66,6 +67,19 @@ def poll(self):
yield test_job

def fetch(self, job_id):
# Job statuses can be one of:
# * None
# * Submitted
# * Scheduling
# * Scheduled
# * Running
# * Complete
# * Incomplete
# * Canceled
# * Fetching
# Only jobs in 'Complete', 'Canceled' and 'Incomplete' are eligible for fetching

job_status = None
with transaction.atomic():
try:
test_job = TestJob.objects.select_for_update(nowait=True).get(pk=job_id)
Expand All @@ -91,6 +105,8 @@ def fetch(self, job_id):
test_job.save()
return

job_status = test_job.job_status
test_job.job_status = 'Fetching'
test_job.fetched = True
test_job.fetched_at = timezone.now()
test_job.save()
Expand Down Expand Up @@ -130,10 +146,16 @@ def fetch(self, job_id):
except DuplicatedTestJob as exception:
logger.error('Failed to fetch test_job(%d): "%s"' % (test_job.id, str(exception)))

if test_job.testrun:
self.__postprocess_testjob__(test_job)

# Remove the 'Fetching' job_status only after eventual plugins
# are finished; this guarantees extra tests and metadata to
# be in SQUAD before the build is considered finished
test_job.job_status = job_status
test_job.save()

if test_job.testrun:
self.__postprocess_testjob__(test_job)
UpdateProjectStatus()(test_job.testrun)

def __postprocess_testjob__(self, test_job):
Expand Down Expand Up @@ -177,9 +199,16 @@ def __str__(self):
return '%s (%s)' % (self.name, self.implementation_type)


class TestJobManager(models.Manager):
    """Manager for TestJob that knows which jobs still have work outstanding."""

    def pending(self):
        """
        Return the test jobs that are not done yet: either they have not
        been fetched, or their job_status is still 'Fetching'.
        """
        not_fetched_yet = Q(fetched=False)
        still_fetching = Q(job_status='Fetching')
        return self.filter(not_fetched_yet | still_fetching)


class TestJob(models.Model):

__test__ = False
objects = TestJobManager()

# input - internal
backend = models.ForeignKey(Backend, related_name='test_jobs', on_delete=models.CASCADE)
Expand Down
2 changes: 1 addition & 1 deletion squad/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ def finished(self):
# dependency on squad.ci, what in theory violates our architecture.
testjobs = self.test_jobs
if testjobs.count() > 0:
if testjobs.filter(fetched=False).count() > 0:
if testjobs.pending().count() > 0:
# a build that has pending CI jobs is NOT finished
reasons.append("There are unfinished CI jobs")
else:
Expand Down
80 changes: 80 additions & 0 deletions test/ci/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from squad.ci import models
from squad.core import models as core_models
from squad.core.tasks import ReceiveTestRun
from squad.ci.tasks import poll, fetch, submit
from squad.ci.utils import task_id
from squad.ci.exceptions import SubmissionIssue, TemporarySubmissionIssue
Expand Down Expand Up @@ -164,6 +165,85 @@ def thread(testjob_id):
self.assertEqual(1, fetch_method.call_count)


__sleeping__ = False


class FetchTestRaceConditionWaitAllJobsToBeFetched(TransactionTestCase):
    """
    Check that a build is not reported as finished while one of its test
    jobs is still being processed.

    If another testjob for this build is finished, it'll trigger UpdateProjectStatus
    which will invoke Build.finished and will see that all testjobs for this build
    are finished. Except that if this current test job is still running plugins, like
    VTS/CTS which take long time, the build will be considered to be finished
    and finishing events will be triggered such as email reports and callbacks
    """

    def setUp(self):
        # A single build with two test jobs sharing the same backend
        group = core_models.Group.objects.create(slug='test')
        project = group.projects.create(slug='test')
        self.build = project.builds.create(version='test-build')
        backend = models.Backend.objects.create()
        self.testjob1 = models.TestJob.objects.create(
            backend=backend,
            target=project,
            target_build=self.build,
            job_id='job-1',
        )
        self.testjob2 = models.TestJob.objects.create(
            backend=backend,
            target=project,
            target_build=self.build,
            job_id='job-2',
        )

    def mock_backend_fetch(test_job):
        # Stand-in for Backend.fetch: reports a completed job with empty results
        status = ''
        completed = True
        metadata = {}
        tests = {}
        metrics = {}
        logs = ''
        return status, completed, metadata, tests, metrics, logs

    def mock_receive_testrun(target, update_project_status):
        global __sleeping__
        # Let's pretend one of the jobs (the first one to get here) takes a
        # bit longer. The flag is flipped *before* sleeping: if it were set
        # only after the sleep, the second thread would still see it unset
        # during those 2 seconds and both jobs would end up being delayed.
        if __sleeping__ is False:
            __sleeping__ = True
            time.sleep(2)
        return ReceiveTestRun(target, update_project_status=update_project_status)

    @tag('skip_sqlite')
    @patch('squad.ci.backend.null.Backend.job_url')
    @patch('squad.ci.models.ReceiveTestRun', side_effect=mock_receive_testrun)
    @patch('squad.ci.backend.null.Backend.fetch', side_effect=mock_backend_fetch)
    def test_race_condition_on_fetch(self, fetch_method, mock_receive, mock_url):
        mock_url.return_value = "job-url"

        def thread(testjob_id):
            fetch(testjob_id)
            # Each thread closes the database connection once it is done
            connection.close()

        parallel_task_1 = threading.Thread(target=thread, args=(self.testjob1.id,))
        parallel_task_2 = threading.Thread(target=thread, args=(self.testjob2.id,))

        parallel_task_1.start()
        parallel_task_2.start()

        # Check halfway through the 2s delay injected by mock_receive_testrun
        time.sleep(1)

        self.testjob1.refresh_from_db()
        self.testjob2.refresh_from_db()

        # The delayed job is still being processed: the build must not be finished
        finished, _ = self.build.finished
        self.assertFalse(finished)

        parallel_task_1.join()
        parallel_task_2.join()

        # Both jobs are fully processed now
        finished, _ = self.build.finished
        self.assertTrue(finished)


class SubmitTest(TestCase):

def setUp(self):
Expand Down

0 comments on commit a6bec00

Please sign in to comment.