From a6bec0089cad8de2e49e57187a57911785f80680 Mon Sep 17 00:00:00 2001 From: Charles Oliveira Date: Thu, 19 Oct 2023 05:44:39 -0300 Subject: [PATCH] ci/models.py: bring pending function to TestJob When ProjectStatus's are computed, there's a call to Build.finished method that determines whether or not such build is done. Part of this check counts the number of testjobs with fetched=False. If the count is zero the build id finished. Due to the nature of SQUAD, testjobs are processed by multiple parallel workers and race conditions might happen. A mitigation mechanism SQUAD does have in place is to acquire a database lock in a testjob being fetched while it attempts to fetch it from its backend. Once results are retrieved SQUAD marks the testjob with fetched=True and continue processing it. The processing includes result parsing and plugins. Some testjobs can get pretty slow depending on volume of results. There is a problem with this approach though. If a build has 2 jobs, and one of them finishes faster than the other (fetching + processing), while the slow one is already marked as fetched but not yet done with its results, it will appear that the build is already finished. With this patch, I change TestJob.job_status to 'Fetching' while still on database lock and only remove it after plugin processing takes place. Finally I move the pending check of testjobs from Build.finished method to within TestJobManager class and add an extra condition which checks if a job is marked as fetched but still have job_status=Fetching. This should guarantee that when builds are marked as finished, all testjobs results will actually be processed within SQUAD. Signed-off-by: Charles Oliveira --- squad/ci/models.py | 31 ++++++++++++++++- squad/core/models.py | 2 +- test/ci/test_tasks.py | 80 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/squad/ci/models.py b/squad/ci/models.py index 22bd2f4ca..acd86ced7 100644 --- a/squad/ci/models.py +++ b/squad/ci/models.py @@ -4,6 +4,7 @@ import yaml from io import StringIO from django.db import models, transaction, DatabaseError +from django.db.models import Q from django.utils import timezone from dateutil.relativedelta import relativedelta @@ -66,6 +67,19 @@ def poll(self): yield test_job def fetch(self, job_id): + # Job statuses can be one of: + # * None + # * Submitted + # * Scheduling + # * Scheduled + # * Running + # * Complete + # * Incomplete + # * Canceled + # * Fetching + # Only jobs in 'Complete', 'Canceled' and 'Incomplete' are eligible for fetching + + job_status = None with transaction.atomic(): try: test_job = TestJob.objects.select_for_update(nowait=True).get(pk=job_id) @@ -91,6 +105,8 @@ def fetch(self, job_id): test_job.save() return + job_status = test_job.job_status + test_job.job_status = 'Fetching' test_job.fetched = True test_job.fetched_at = timezone.now() test_job.save() @@ -130,10 +146,16 @@ def fetch(self, job_id): except DuplicatedTestJob as exception: logger.error('Failed to fetch test_job(%d): "%s"' % (test_job.id, str(exception))) + if test_job.testrun: + self.__postprocess_testjob__(test_job) + + # Removed the 'Fetching' job_status only after eventual plugins + # are finished, this garantees extra tests and metadata to + # be in SQUAD before the build is considered finished + test_job.job_status = job_status test_job.save() if test_job.testrun: - self.__postprocess_testjob__(test_job) UpdateProjectStatus()(test_job.testrun) def __postprocess_testjob__(self, test_job): @@ -177,9 +199,16 @@ def __str__(self): return '%s (%s)' % (self.name, self.implementation_type) +class TestJobManager(models.Manager): + + def pending(self): + return self.filter(Q(fetched=False) | Q(job_status='Fetching')) + + class TestJob(models.Model): __test__ = False + objects = TestJobManager() # input - internal backend = models.ForeignKey(Backend, related_name='test_jobs', on_delete=models.CASCADE) diff --git a/squad/core/models.py b/squad/core/models.py index 80ef1f95a..7a6576501 100644 --- a/squad/core/models.py +++ b/squad/core/models.py @@ -620,7 +620,7 @@ def finished(self): # dependency on squad.ci, what in theory violates our architecture. testjobs = self.test_jobs if testjobs.count() > 0: - if testjobs.filter(fetched=False).count() > 0: + if testjobs.pending().count() > 0: # a build that has pending CI jobs is NOT finished reasons.append("There are unfinished CI jobs") else: diff --git a/test/ci/test_tasks.py b/test/ci/test_tasks.py index 5cff82803..19c10a8c7 100644 --- a/test/ci/test_tasks.py +++ b/test/ci/test_tasks.py @@ -10,6 +10,7 @@ from squad.ci import models from squad.core import models as core_models +from squad.core.tasks import ReceiveTestRun from squad.ci.tasks import poll, fetch, submit from squad.ci.utils import task_id from squad.ci.exceptions import SubmissionIssue, TemporarySubmissionIssue @@ -164,6 +165,85 @@ def thread(testjob_id): self.assertEqual(1, fetch_method.call_count) +__sleeping__ = False + + +class FetchTestRaceConditionWaitAllJobsToBeFetched(TransactionTestCase): + """ + If another testjob for this build is finished, it'll trigger UpdateProjectStatus + which will invoke Build.finished and will see that all testjobs for this build + are finished. Except that if this current test job is still running plugins, like + VTS/CTS which take long time, the build will be considered to be finished + and finishing events will be triggered such as email reports and callbacks + """ + + def setUp(self): + group = core_models.Group.objects.create(slug='test') + project = group.projects.create(slug='test') + self.build = project.builds.create(version='test-build') + backend = models.Backend.objects.create() + self.testjob1 = models.TestJob.objects.create( + backend=backend, + target=project, + target_build=self.build, + job_id='job-1', + ) + self.testjob2 = models.TestJob.objects.create( + backend=backend, + target=project, + target_build=self.build, + job_id='job-2', + ) + + def mock_backend_fetch(test_job): + status = '' + completed = True + metadata = {} + tests = {} + metrics = {} + logs = '' + return status, completed, metadata, tests, metrics, logs + + def mock_receive_testrun(target, update_project_status): + global __sleeping__ + # Let's present job1 takes a bit longer + if __sleeping__ is False: + time.sleep(2) + __sleeping__ = True + return ReceiveTestRun(target, update_project_status=update_project_status) + + @tag('skip_sqlite') + @patch('squad.ci.backend.null.Backend.job_url') + @patch('squad.ci.models.ReceiveTestRun', side_effect=mock_receive_testrun) + @patch('squad.ci.backend.null.Backend.fetch', side_effect=mock_backend_fetch) + def test_race_condition_on_fetch(self, fetch_method, mock_receive, mock_url): + mock_url.return_value = "job-url" + + def thread(testjob_id): + fetch(testjob_id) + connection.close() + + parallel_task_1 = threading.Thread(target=thread, args=(self.testjob1.id,)) + parallel_task_2 = threading.Thread(target=thread, args=(self.testjob2.id,)) + + parallel_task_1.start() + parallel_task_2.start() + + time.sleep(1) + + self.testjob1.refresh_from_db() + self.testjob2.refresh_from_db() + + finished, _ = self.build.finished + self.assertFalse(finished) + + parallel_task_1.join() + parallel_task_2.join() + + finished, _ = self.build.finished + self.assertTrue(finished) + + class SubmitTest(TestCase): def setUp(self):