From 6805375948b033378dad6dac6c8118ae68aa47f8 Mon Sep 17 00:00:00 2001 From: Charles Oliveira Date: Wed, 6 Dec 2023 09:24:41 -0300 Subject: [PATCH] ci/tasks.py: offload testjob post processing to its own task The reason for having this is for deployments of SQUAD on auto-scalable systems such as Kubernetes. When the load in SQUAD is high, Kubernetes creates new replicas of workers to consume from the queue. When the load drops back down, Kubernetes starts trimming workers that are no longer being used. There is a very specific corner case with this approach though. When Kubernetes trims a worker, it sends SIGTERM to it and waits 30s by default for the worker to self-terminate. In Linaro's deployment of SQUAD, there is a particular kind of test job that comes from Android CTS/VTS. They are huge and take a lot more than 30s to finish. If the worker is not finished by the 30s mark, Kubernetes sends SIGKILL to it and it dies abruptly, causing inconsistencies. Yes, we can increase the 30s timeout, but if SQUAD is under heavy load, increasing the timeout might still cause inconsistency if the worker doesn't self-terminate within that timeout. The solution to this problem is the creation of a new queue called 'ci_fetch_postprocess'. Deployments under heavy load should then create a different kind of worker that never dies and does not auto-scale, thus eliminating the problem completely. Tasks in 'ci_fetch_postprocess' are the plugin ones, which are the culprit of the issue. 
--- squad/ci/models.py | 21 +++++++++++---------- squad/ci/tasks.py | 22 ++++++++++++++++++++++ squad/settings.py | 1 + test/ci/test_models.py | 4 ++-- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/squad/ci/models.py b/squad/ci/models.py index 702952ce..78a2b947 100644 --- a/squad/ci/models.py +++ b/squad/ci/models.py @@ -143,17 +143,18 @@ def fetch(self, job_id): except DuplicatedTestJob as exception: logger.error('Failed to fetch test_job(%d): "%s"' % (test_job.id, str(exception))) - if test_job.testrun: - self.__postprocess_testjob__(test_job) - - # Remove the 'Fetching' job_status only after eventual plugins - # are finished, this garantees extra tests and metadata to - # be in SQUAD before the build is considered finished - test_job.job_status = status - test_job.save() + if test_job.testrun and test_job.target.enabled_plugins and any(test_job.target.enabled_plugins): + # Avoids cyclic import errors + from squad.ci.tasks import postprocess_testjob + test_job.save() + postprocess_testjob.delay(test_job.id, status) + else: + # Remove the 'Fetching' job_status only after all work is done + test_job.job_status = status + test_job.save() - if test_job.testrun: - UpdateProjectStatus()(test_job.testrun) + if test_job.testrun: + UpdateProjectStatus()(test_job.testrun) def __postprocess_testjob__(self, test_job): project = test_job.target diff --git a/squad/ci/tasks.py b/squad/ci/tasks.py index 0640859c..b7d73aa3 100644 --- a/squad/ci/tasks.py +++ b/squad/ci/tasks.py @@ -1,4 +1,5 @@ from squad.celery import app as celery +from squad.core.tasks import UpdateProjectStatus from squad.ci.models import Backend, TestJob from squad.ci.exceptions import SubmissionIssue from squad.ci.utils import task_id @@ -89,3 +90,24 @@ def send_testjob_resubmit_admin_email(job_id, resubmitted_job_id): if test_job.target.html_mail: message.attach_alternative(html_message, "text/html") message.send() + + +@celery.task +def postprocess_testjob(job_id, status): + 
logger.info("postprocessing %s" % job_id) + testjob = TestJob.objects.get(pk=job_id) + backend = testjob.backend + backend.__postprocess_testjob__(testjob) + + # Some plugins, like tradefed, can still trigger sub-tasks creating tests, + # ending the current thread, thus triggering build-finished events + # we need a way to tell the plugins to wait for all sub-tasks to be finished + + # Remove the 'Fetching' job_status only after eventual plugins + # are finished, this garantees extra tests and metadata to + # be in SQUAD before the build is considered finished + testjob.job_status = status + testjob.save() + + if testjob.testrun: + UpdateProjectStatus()(testjob.testrun) diff --git a/squad/settings.py b/squad/settings.py index 2c6646e1..1ebcc848 100644 --- a/squad/settings.py +++ b/squad/settings.py @@ -384,6 +384,7 @@ 'squad.core.tasks.notification.*': {'queue': 'core_notification'}, 'squad.ci.tasks.poll': {'queue': 'ci_poll'}, 'squad.ci.tasks.fetch': {'queue': 'ci_fetch'}, + 'squad.ci.tasks.postprocess_testjob': {'queue': 'ci_fetch_postprocess'}, 'squad.ci.tasks.submit': {'queue': 'ci_quick'}, 'squad.ci.tasks.send_testjob_resubmit_admin_email': {'queue': 'ci_quick'}, } diff --git a/test/ci/test_models.py b/test/ci/test_models.py index ddc8623f..1d0f342f 100644 --- a/test/ci/test_models.py +++ b/test/ci/test_models.py @@ -35,7 +35,7 @@ class BackendTestBase(TestCase): def setUp(self): self.group = core_models.Group.objects.create(slug='mygroup') - self.project = self.group.projects.create(slug='myproject') + self.project = self.group.projects.create(slug='myproject', enabled_plugins_list=['linux-log-parser']) self.backend = models.Backend.objects.create() self.build = self.project.builds.create(version='1') @@ -430,7 +430,7 @@ def test_fetch_sets_fetched_at(self, receive, backend_fetch, backend_job_url): test_job.refresh_from_db() self.assertIsNotNone(test_job.fetched_at) - @patch.object(models.Backend, '__postprocess_testjob__') + 
@patch('squad.ci.models.Backend.__postprocess_testjob__') @patch('squad.ci.backend.null.Backend.job_url', return_value="http://example.com/123") @patch('squad.ci.backend.null.Backend.fetch') @patch('squad.ci.models.ReceiveTestRun.__call__')