diff --git a/src/clusterfuzz/_internal/remote_task/remote_task_adapters.py b/src/clusterfuzz/_internal/remote_task/remote_task_adapters.py index 3d42143fe85..b79740977bd 100644 --- a/src/clusterfuzz/_internal/remote_task/remote_task_adapters.py +++ b/src/clusterfuzz/_internal/remote_task/remote_task_adapters.py @@ -18,6 +18,7 @@ from clusterfuzz._internal.base import feature_flags from clusterfuzz._internal.batch import service as batch_service from clusterfuzz._internal.k8s import service as k8s_service +from clusterfuzz._internal.swarming.service import SwarmingService class RemoteTaskAdapters(Enum): @@ -37,6 +38,8 @@ class RemoteTaskAdapters(Enum): feature_flags.FeatureFlags.K8S_JOBS_FREQUENCY, 0.0) GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService, feature_flags.FeatureFlags.GCP_BATCH_JOBS_FREQUENCY, 1.0) + SWARMING = ('swarming', SwarmingService, + feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION, 0.0) def __init__(self, adapter_id, service, feature_flag, default_weight): self.id = adapter_id diff --git a/src/clusterfuzz/_internal/remote_task/remote_task_gate.py b/src/clusterfuzz/_internal/remote_task/remote_task_gate.py index e0740aeefcb..ab563533a93 100644 --- a/src/clusterfuzz/_internal/remote_task/remote_task_gate.py +++ b/src/clusterfuzz/_internal/remote_task/remote_task_gate.py @@ -22,6 +22,9 @@ import collections import random +from clusterfuzz._internal import swarming +from clusterfuzz._internal.base import feature_flags +from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.remote_task import remote_task_adapters from clusterfuzz._internal.remote_task import remote_task_types @@ -55,6 +58,21 @@ def _get_adapter(self) -> str: weights = list(frequencies.values()) return random.choices(population, weights)[0] + def _is_swarming_applicable(self): + return feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled + + def _is_swarming_task(self, module, job_type): + 
return swarming.is_swarming_task( + task_utils.get_command_from_module(module), job_type) + + def _handle_swarming_job(self, module, job_type, input_download_url): + return self._service_map['swarming'].create_utask_main_job( + module, job_type, input_download_url) + + def _handle_swarming_jobs(self, + remote_tasks: list[remote_task_types.RemoteTask]): + return self._service_map['swarming'].create_utask_main_jobs(remote_tasks) + def get_job_frequency(self): """Returns the frequency distribution for all remote task adapters. @@ -106,6 +124,10 @@ def get_job_frequency(self): def create_utask_main_job(self, module, job_type, input_download_url): """Creates a single remote task, selecting a backend dynamically.""" + if self._is_swarming_applicable() and self._is_swarming_task( + module, job_type): + return self._handle_swarming_job(module, job_type, input_download_url) + adapter_id = self._get_adapter() service = self._service_map[adapter_id] return service.create_utask_main_job(module, job_type, input_download_url) @@ -114,21 +136,34 @@ def create_utask_main_jobs(self, remote_tasks: list[remote_task_types.RemoteTask]): """Creates a batch of remote tasks, distributing them across backends. - This method handles two cases: - 1. If there is only one task, it uses a weighted random choice to select - a backend, similar to `create_utask_main_job`. - 2. If there are multiple tasks, it distributes them deterministically - across the available backends based on their configured frequencies. - This ensures that a batch of 100 tasks with a 70/30 split sends - exactly 70 tasks to one backend and 30 to the other. + This method manages the distribution of tasks in two stages: + + 1. Swarming Interception (Optional): If the `SWARMING_REMOTE_EXECUTION` + feature flag is enabled, all tasks are first passed to the Swarming + service. 
The service schedules swarming-eligible tasks and returns + any tasks that it didn't schedule (e.g., non-swarming tasks or those + it failed to schedule). + + 2. Weighted Distribution: Any remaining tasks are then distributed across + the available remote adapters based on their configured frequencies: + - If only one task remains, it is assigned using a weighted random + choice to maintain overall distribution over time. + - If multiple tasks remain, they are distributed deterministically + according to their frequencies. This ensures precise adherence to + the distribution ratios (e.g., exactly 70/30 split for 100 tasks). """ tasks_by_adapter = collections.defaultdict(list) + unscheduled_tasks = [] + + if self._is_swarming_applicable(): + remote_tasks = self._handle_swarming_jobs(remote_tasks) - if len(remote_tasks) == 1: + if not remote_tasks: + pass + elif len(remote_tasks) == 1: # For a single task, use a random distribution. adapter_id = self._get_adapter() tasks_by_adapter[adapter_id].extend(remote_tasks) - unscheduled_tasks = [] else: # For multiple tasks, use deterministic slicing to ensure the # distribution precisely matches the frequency configuration. 
@@ -147,9 +182,8 @@ def create_utask_main_jobs(self, for i, task in enumerate(remaining_tasks): adapter_id = list(frequencies.keys())[i % len(frequencies)] tasks_by_adapter[adapter_id].append(task) - unscheduled_tasks = [] else: - unscheduled_tasks = list(remaining_tasks) + unscheduled_tasks.extend(remaining_tasks) for adapter_id, tasks in tasks_by_adapter.items(): if tasks: diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index cdcaea64313..4e03ab3bf6c 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -149,9 +149,7 @@ def _get_new_task_spec(command: str, job_name: str, swarming_pb2.StringPair(key=var['key'], value=var['value'])) # pylint: disable=no-member swarming_bot_environment.append(_env_vars_to_json(default_task_environment)) swarming_bot_environment.extend(default_task_environment) - dimensions = instance_spec.get('dimensions', []) - cas_input_root = instance_spec.get('cas_input_root', {}) new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member diff --git a/src/clusterfuzz/_internal/swarming/service.py b/src/clusterfuzz/_internal/swarming/service.py new file mode 100644 index 00000000000..88c1169d285 --- /dev/null +++ b/src/clusterfuzz/_internal/swarming/service.py @@ -0,0 +1,58 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Swarming service.""" + +from clusterfuzz._internal import swarming +from clusterfuzz._internal.base.tasks import task_utils +from clusterfuzz._internal.metrics import logs +from clusterfuzz._internal.remote_task import remote_task_types + + +class SwarmingService(remote_task_types.RemoteTaskInterface): + """Remote task service implementation for Swarming.""" + + def create_utask_main_job(self, module: str, job_type: str, + input_download_url: str): + """Creates a single swarming task for a uworker main task.""" + command = task_utils.get_command_from_module(module) + swarming_task = remote_task_types.RemoteTask(command, job_type, + input_download_url) + result = self.create_utask_main_jobs([swarming_task]) + + if not result: + return None + + return result[0] + + def create_utask_main_jobs(self, + remote_tasks: list[remote_task_types.RemoteTask] + ) -> list[remote_task_types.RemoteTask]: + """Creates many remote tasks for uworker main tasks. + Returns the tasks that couldn't be created. + """ + unscheduled_tasks = [] + for task in remote_tasks: + try: + if not swarming.is_swarming_task(task.command, task.job_type): + unscheduled_tasks.append(task) + continue + + swarming.push_swarming_task(task.command, task.input_download_url, + task.job_type) + except Exception: # pylint: disable=broad-except + logs.error( + f'Failed to push task to Swarming: {task.command}, {task.job_type}.' 
+ ) + unscheduled_tasks.append(task) + return unscheduled_tasks diff --git a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py index 965b48f1bfc..e9c21331268 100644 --- a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py +++ b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py @@ -23,6 +23,7 @@ from clusterfuzz._internal.remote_task import remote_task_adapters from clusterfuzz._internal.remote_task import remote_task_gate from clusterfuzz._internal.remote_task import remote_task_types +from clusterfuzz._internal.swarming.service import SwarmingService class RemoteTaskGateTest(unittest.TestCase): @@ -32,9 +33,11 @@ def setUp(self): super().setUp() self.mock_k8s_service = mock.Mock(spec=KubernetesService) self.mock_gcp_batch_service = mock.Mock(spec=GcpBatchService) + self.mock_swarming_service = mock.Mock(spec=SwarmingService) self.mock_k8s_service.create_utask_main_jobs.return_value = [] self.mock_gcp_batch_service.create_utask_main_jobs.return_value = [] + self.mock_swarming_service.create_utask_main_jobs.return_value = [] # Patch RemoteTaskAdapters to return our mock services self.patcher = mock.patch.dict( @@ -51,6 +54,12 @@ def setUp(self): service=mock.Mock(return_value=self.mock_gcp_batch_service), feature_flag=None, default_weight=1.0), + 'SWARMING': + mock.Mock( + id='swarming', + service=mock.Mock(return_value=self.mock_swarming_service), + feature_flag=None, + default_weight=0.0), }) self.patcher.start() self.addCleanup(self.patcher.stop) @@ -61,44 +70,87 @@ def test_init(self): gate = remote_task_gate.RemoteTaskGate() self.assertIn('kubernetes', gate._service_map) self.assertIn('gcp_batch', gate._service_map) + self.assertIn('swarming', gate._service_map) self.assertEqual(gate._service_map['kubernetes'], self.mock_k8s_service) self.assertEqual(gate._service_map['gcp_batch'], self.mock_gcp_batch_service) + 
self.assertEqual(gate._service_map['swarming'], self.mock_swarming_service) @mock.patch('random.choices') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') def test_get_adapter(self, mock_get_job_frequency, mock_random_choices): """Tests that _get_adapter returns the correct adapter based on job_frequency.""" - mock_get_job_frequency.return_value = {'kubernetes': 0.3, 'gcp_batch': 0.7} + mock_get_job_frequency.return_value = { + 'kubernetes': 0.3, + 'gcp_batch': 0.7, + 'swarming': 0.0 + } mock_random_choices.return_value = ['gcp_batch'] gate = remote_task_gate.RemoteTaskGate() selected_adapter = gate._get_adapter() mock_get_job_frequency.assert_called_once() - mock_random_choices.assert_called_once_with(['kubernetes', 'gcp_batch'], - [0.3, 0.7]) + mock_random_choices.assert_called_once_with( + ['kubernetes', 'gcp_batch', 'swarming'], [0.3, 0.7, 0.0]) self.assertEqual(selected_adapter, 'gcp_batch') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_task') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_handle_swarming_job') + def test_create_utask_main_job_swarming_priority( + self, mock_handle_swarming_job, mock_is_swarming_task, + mock_is_swarming_applicable): + """Tests that create_utask_main_job prioritizes Swarming tasks when + flag is enabled and it is a swarming task.""" + mock_is_swarming_applicable.return_value = True + mock_is_swarming_task.return_value = True + mock_handle_swarming_job.return_value = mock.Mock() + + gate = remote_task_gate.RemoteTaskGate() + gate.create_utask_main_job('module', 'job', 'url') + + mock_handle_swarming_job.assert_called_once_with('module', 'job', 'url') + self.mock_k8s_service.create_utask_main_job.assert_not_called() + self.mock_gcp_batch_service.create_utask_main_job.assert_not_called() + + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + 
@mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_task') @mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter') - def test_create_utask_main_job_kubernetes(self, mock_get_adapter): + def test_create_utask_main_job_kubernetes(self, mock_get_adapter, + mock_is_swarming_task, + mock_is_swarming_applicable): """Tests that create_utask_main_job calls the Kubernetes service - when kubernetes adapter is chosen.""" + when it is NOT a swarming task.""" + mock_is_swarming_applicable.return_value = True + mock_is_swarming_task.return_value = False + mock_get_adapter.return_value = 'kubernetes' gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_job('module', 'job', 'url') + + self.mock_swarming_service.create_utask_main_job.assert_not_called() self.mock_k8s_service.create_utask_main_job.assert_called_once_with( 'module', 'job', 'url') self.mock_gcp_batch_service.create_utask_main_job.assert_not_called() + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_task') @mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter') - def test_create_utask_main_job_gcp_batch(self, mock_get_adapter): + def test_create_utask_main_job_gcp_batch(self, mock_get_adapter, + mock_is_swarming_task, + mock_is_swarming_applicable): """Tests that create_utask_main_job calls the GCP Batch service - when gcp_batch adapter is chosen.""" + when it is NOT a swarming task.""" + mock_is_swarming_applicable.return_value = True + mock_is_swarming_task.return_value = False + mock_get_adapter.return_value = 'gcp_batch' gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_job('module', 'job', 'url') + + self.mock_swarming_service.create_utask_main_job.assert_not_called() self.mock_gcp_batch_service.create_utask_main_job.assert_called_once_with( 'module', 'job', @@ -106,13 +158,35 @@ def test_create_utask_main_job_gcp_batch(self, mock_get_adapter): ) 
self.mock_k8s_service.create_utask_main_job.assert_not_called() + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_task') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter') + def test_create_utask_main_job_swarming_disabled(self, mock_get_adapter, + mock_is_swarming_task, + mock_is_swarming_applicable): + """Tests that create_utask_main_job does NOT call Swarming when flag + is disabled.""" + mock_is_swarming_applicable.return_value = False + mock_is_swarming_task.return_value = True + mock_get_adapter.return_value = 'kubernetes' + + gate = remote_task_gate.RemoteTaskGate() + gate.create_utask_main_job('module', 'job', 'url') + + self.mock_swarming_service.create_utask_main_job.assert_not_called() + self.mock_k8s_service.create_utask_main_job.assert_called_once_with( + 'module', 'job', 'url') + + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter') - def test_create_utask_main_jobs_single_task(self, mock_get_adapter): + def test_create_utask_main_jobs_single_task(self, mock_get_adapter, + mock_is_swarming_applicable): """Tests that create_utask_main_jobs correctly routes a single task based on _get_adapter.""" tasks = [ remote_task_types.RemoteTask('command1', 'job1', 'url1'), ] + mock_is_swarming_applicable.return_value = False mock_get_adapter.return_value = 'kubernetes' gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_jobs(tasks) @@ -120,9 +194,10 @@ def test_create_utask_main_jobs_single_task(self, mock_get_adapter): self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks) self.mock_gcp_batch_service.create_utask_main_jobs.assert_not_called() + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') def 
test_create_utask_main_jobs_multiple_tasks_slicing( - self, mock_get_job_frequency): + self, mock_get_job_frequency, mock_is_swarming_applicable): """Tests that create_utask_main_jobs correctly routes multiple tasks using deterministic slicing.""" tasks = [ @@ -131,9 +206,14 @@ def test_create_utask_main_jobs_multiple_tasks_slicing( remote_task_types.RemoteTask('command', 'job1', 'url3'), remote_task_types.RemoteTask('command', 'job1', 'url4'), ] + mock_is_swarming_applicable.return_value = False # 50% split - mock_get_job_frequency.return_value = {'kubernetes': 0.5, 'gcp_batch': 0.5} + mock_get_job_frequency.return_value = { + 'kubernetes': 0.5, + 'gcp_batch': 0.5, + 'swarming': 0.0 + } gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_jobs(tasks) @@ -144,9 +224,10 @@ def test_create_utask_main_jobs_multiple_tasks_slicing( self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with( tasks[2:]) + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') def test_create_utask_main_jobs_remainder_distribution( - self, mock_get_job_frequency): + self, mock_get_job_frequency, mock_is_swarming_applicable): """Tests that create_utask_main_jobs correctly distributes remainder tasks.""" tasks = [ @@ -154,22 +235,28 @@ def test_create_utask_main_jobs_remainder_distribution( remote_task_types.RemoteTask('c', 'j', 'u2'), remote_task_types.RemoteTask('c', 'j', 'u3'), ] + mock_is_swarming_applicable.return_value = False # 50/50 split - one task will be a remainder - mock_get_job_frequency.return_value = {'kubernetes': 0.5, 'gcp_batch': 0.5} + mock_get_job_frequency.return_value = { + 'kubernetes': 0.5, + 'gcp_batch': 0.5, + 'swarming': 0.0 + } gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_jobs(tasks) # Expect 1 for k8s, 1 for gcp_batch, and 1 remainder distributed round robin. 
- # In this case, first k8s gets 1, then gcp_batch gets 1, then k8s gets the last one. self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with( [tasks[0], tasks[2]]) self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with( [tasks[1]]) + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') - def test_create_utask_main_jobs_unscheduled(self, mock_get_job_frequency): + def test_create_utask_main_jobs_unscheduled(self, mock_get_job_frequency, + mock_is_swarming_applicable): """Tests that create_utask_main_jobs returns remainder as unscheduled when sum < 1.0.""" tasks = [ @@ -178,58 +265,70 @@ def test_create_utask_main_jobs_unscheduled(self, mock_get_job_frequency): remote_task_types.RemoteTask('c', 'j', 'u3'), remote_task_types.RemoteTask('c', 'j', 'u4'), ] + mock_is_swarming_applicable.return_value = False # 0.25 each. Sum 0.5. mock_get_job_frequency.return_value = { 'kubernetes': 0.25, - 'gcp_batch': 0.25 + 'gcp_batch': 0.25, + 'swarming': 0.0 } gate = remote_task_gate.RemoteTaskGate() result = gate.create_utask_main_jobs(tasks) - # 4 * 0.25 = 1 task each. - # Total assigned = 2. - # Total unscheduled = 2. - self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with( [tasks[0]]) self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with( [tasks[1]]) - # Result should contain unscheduled (tasks[2], tasks[3]). 
self.assertEqual(result, [tasks[2], tasks[3]]) + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') - def test_create_utask_main_jobs_full_kubernetes(self, mock_get_job_frequency): + def test_create_utask_main_jobs_full_kubernetes(self, mock_get_job_frequency, + mock_is_swarming_applicable): """Tests that all tasks are routed to Kubernetes when frequency is 1.0.""" tasks = [ remote_task_types.RemoteTask('c', 'j', 'u1'), remote_task_types.RemoteTask('c', 'j', 'u2'), ] - mock_get_job_frequency.return_value = {'kubernetes': 1.0, 'gcp_batch': 0.0} + mock_is_swarming_applicable.return_value = False + mock_get_job_frequency.return_value = { + 'kubernetes': 1.0, + 'gcp_batch': 0.0, + 'swarming': 0.0 + } gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_jobs(tasks) self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks) self.mock_gcp_batch_service.create_utask_main_jobs.assert_not_called() + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') - def test_create_utask_main_jobs_full_gcp_batch(self, mock_get_job_frequency): + def test_create_utask_main_jobs_full_gcp_batch(self, mock_get_job_frequency, + mock_is_swarming_applicable): """Tests that all tasks are routed to GCP Batch when frequency is 1.0.""" tasks = [ remote_task_types.RemoteTask('c', 'j', 'u1'), remote_task_types.RemoteTask('c', 'j', 'u2'), ] - mock_get_job_frequency.return_value = {'kubernetes': 0.0, 'gcp_batch': 1.0} + mock_is_swarming_applicable.return_value = False + mock_get_job_frequency.return_value = { + 'kubernetes': 0.0, + 'gcp_batch': 1.0, + 'swarming': 0.0 + } gate = remote_task_gate.RemoteTaskGate() gate.create_utask_main_jobs(tasks) self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with( tasks) 
self.mock_k8s_service.create_utask_main_jobs.assert_not_called() + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') def test_create_utask_main_jobs_returns_unscheduled_tasks( - self, mock_get_job_frequency): + self, mock_get_job_frequency, mock_is_swarming_applicable): """Tests that create_utask_main_jobs returns unscheduled tasks directly.""" tasks = [ remote_task_types.RemoteTask('c', 'j', 'u1'), @@ -237,8 +336,12 @@ def test_create_utask_main_jobs_returns_unscheduled_tasks( unscheduled_tasks = [ remote_task_types.RemoteTask('c', 'j', 'u1'), ] - - mock_get_job_frequency.return_value = {'kubernetes': 1.0, 'gcp_batch': 0.0} + mock_is_swarming_applicable.return_value = False + mock_get_job_frequency.return_value = { + 'kubernetes': 1.0, + 'gcp_batch': 0.0, + 'swarming': 0.0 + } self.mock_k8s_service.create_utask_main_jobs.return_value = unscheduled_tasks gate = remote_task_gate.RemoteTaskGate() @@ -247,12 +350,137 @@ def test_create_utask_main_jobs_returns_unscheduled_tasks( self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks) self.assertEqual(result, unscheduled_tasks) + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_handle_swarming_jobs') + @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') + def test_create_utask_main_jobs_swarming_remote_execution_enabled( + self, mock_get_job_frequency, mock_handle_swarming_jobs, + mock_is_swarming_applicable): + """Tests that create_utask_main_jobs passes tasks to swarming service when the + feature flag is enabled.""" + tasks = [ + remote_task_types.RemoteTask('swarming_cmd', 'job1', 'url1'), + remote_task_types.RemoteTask('regular_cmd', 'job2', 'url2'), + ] + + mock_is_swarming_applicable.return_value = True + # _handle_swarming_jobs should process the swarming task and return the regular 
task. + mock_handle_swarming_jobs.return_value = [tasks[1]] + + mock_get_job_frequency.return_value = { + 'kubernetes': 1.0, + 'gcp_batch': 0.0, + 'swarming': 0.0 + } + + gate = remote_task_gate.RemoteTaskGate() + result = gate.create_utask_main_jobs(tasks) + + # ALL tasks should be sent to SwarmingService initially via _handle_swarming_jobs. + mock_handle_swarming_jobs.assert_called_once_with(tasks) + + # The regular task returned by _handle_swarming_jobs should be routed to Kubernetes. + self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with( + [tasks[1]]) + + # No tasks should be unscheduled. + self.assertEqual(result, []) + + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_handle_swarming_jobs') + def test_create_utask_main_jobs_swarming_remote_execution_all_swarming( + self, mock_handle_swarming_jobs, mock_is_swarming_applicable): + """Tests that create_utask_main_jobs handles the case where all tasks + are swarming tasks.""" + tasks = [ + remote_task_types.RemoteTask('swarming_cmd', 'job1', 'url1'), + remote_task_types.RemoteTask('swarming_cmd', 'job2', 'url2'), + ] + + mock_is_swarming_applicable.return_value = True + # All tasks successfully scheduled as swarming. + mock_handle_swarming_jobs.return_value = [] + + gate = remote_task_gate.RemoteTaskGate() + result = gate.create_utask_main_jobs(tasks) + + # Both tasks should be sent to _handle_swarming_jobs. + mock_handle_swarming_jobs.assert_called_once_with(tasks) + self.mock_k8s_service.create_utask_main_jobs.assert_not_called() + self.mock_gcp_batch_service.create_utask_main_jobs.assert_not_called() + + # No tasks should be unscheduled. 
+ self.assertEqual(result, []) + + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_handle_swarming_jobs') + @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') + def test_create_utask_main_jobs_swarming_failure_preservation( + self, mock_get_job_frequency, mock_handle_swarming_jobs, + mock_is_swarming_applicable): + """Tests that failed swarming tasks are correctly included in + unscheduled_tasks.""" + tasks = [ + remote_task_types.RemoteTask('swarming_cmd1', 'job1', 'url1'), + remote_task_types.RemoteTask('swarming_cmd2', 'job2', 'url2'), + remote_task_types.RemoteTask('regular_cmd', 'job3', 'url3'), + ] + + mock_is_swarming_applicable.return_value = True + # Mock one success, one failure (by returning it as unscheduled) and one regular task. + mock_handle_swarming_jobs.return_value = [tasks[1], tasks[2]] + + mock_get_job_frequency.return_value = { + 'kubernetes': 1.0, + 'gcp_batch': 0.0, + 'swarming': 0.0 + } + + gate = remote_task_gate.RemoteTaskGate() + result = gate.create_utask_main_jobs(tasks) + + # All tasks sent to _handle_swarming_jobs. + mock_handle_swarming_jobs.assert_called_once_with(tasks) + + # The tasks returned by _handle_swarming_jobs (failed swarming + regular) sent to k8s. + self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with( + [tasks[1], tasks[2]]) + + # Both should be successfully processed by K8s (mocked to return [] by default). 
+ self.assertEqual(result, []) + + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') + @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') + def test_create_utask_main_jobs_swarming_remote_execution_disabled( + self, mock_get_job_frequency, mock_is_swarming_applicable): + """Tests that swarming tasks are NOT intercepted when the flag is disabled.""" + tasks = [ + remote_task_types.RemoteTask('swarming_cmd', 'job1', 'url1'), + ] + + mock_is_swarming_applicable.return_value = False + mock_get_job_frequency.return_value = { + 'kubernetes': 1.0, + 'gcp_batch': 0.0, + 'swarming': 0.0 + } + + gate = remote_task_gate.RemoteTaskGate() + gate.create_utask_main_jobs(tasks) + + # Flag disabled: should NOT call swarming service. + self.mock_swarming_service.create_utask_main_jobs.assert_not_called() + + # Should be routed normally to Kubernetes. + self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks) + class RemoteTaskGateProcessingTest(unittest.TestCase): """Tests for logic in RemoteTaskGate that doesn't require full service mocking.""" def setUp(self): super().setUp() + self.mock_swarming_service = mock.Mock(spec=SwarmingService) # Mock adapters to avoid real service instantiation self.patcher = mock.patch.dict( remote_task_adapters.RemoteTaskAdapters._member_map_, { @@ -268,7 +496,47 @@ def setUp(self): service=mock.Mock(), feature_flag=None, default_weight=1.0), + 'SWARMING': + mock.Mock( + id='swarming', + service=mock.Mock(return_value=self.mock_swarming_service), + feature_flag=None, + default_weight=0.0), }) self.patcher.start() self.addCleanup(self.patcher.stop) self.gate = remote_task_gate.RemoteTaskGate() + + @mock.patch( + 'clusterfuzz._internal.base.feature_flags.FeatureFlags.enabled', + new_callable=mock.PropertyMock) + def test_is_swarming_applicable(self, mock_swarming_flag): + """Tests _is_swarming_applicable.""" + mock_swarming_flag.return_value = True + 
self.assertTrue(self.gate._is_swarming_applicable()) + + mock_swarming_flag.return_value = False + self.assertFalse(self.gate._is_swarming_applicable()) + + @mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.task_utils') + @mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.swarming') + def test_is_swarming_task(self, mock_swarming, mock_task_utils): + """Tests _is_swarming_task.""" + mock_task_utils.get_command_from_module.return_value = 'fuzz' + mock_swarming.is_swarming_task.return_value = True + + self.assertTrue(self.gate._is_swarming_task('module', 'job')) + mock_swarming.is_swarming_task.assert_called_once_with('fuzz', 'job') + + def test_handle_swarming_job(self): + """Tests _handle_swarming_job.""" + self.gate._handle_swarming_job('module', 'job', 'url') + self.mock_swarming_service.create_utask_main_job.assert_called_once_with( + 'module', 'job', 'url') + + def test_handle_swarming_jobs(self): + """Tests _handle_swarming_jobs.""" + tasks = [mock.Mock()] + self.gate._handle_swarming_jobs(tasks) + self.mock_swarming_service.create_utask_main_jobs.assert_called_once_with( + tasks) diff --git a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py index c5b3d455e57..1a01c1758bb 100644 --- a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py +++ b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py @@ -49,14 +49,17 @@ def setUp(self): self.gate = remote_task_gate.RemoteTaskGate() + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') @mock.patch.object(k8s_service.KubernetesService, 'create_utask_main_jobs') @mock.patch( 'clusterfuzz._internal.batch.service.GcpBatchService.create_utask_main_jobs' ) def test_create_utask_main_jobs_k8s_limit_reached( - self, mock_gcp_create, mock_k8s_create, mock_get_frequency): + self, 
mock_gcp_create, mock_k8s_create, mock_get_frequency, + mock_is_swarming_applicable): """Test delegation when K8s limit is reached (handled by service).""" + mock_is_swarming_applicable.return_value = False # Setup tasks to go to Kubernetes mock_get_frequency.return_value = {'kubernetes': 1.0} @@ -76,14 +79,17 @@ def test_create_utask_main_jobs_k8s_limit_reached( # Verify result is empty list self.assertEqual(result, []) + @mock.patch.object(remote_task_gate.RemoteTaskGate, '_is_swarming_applicable') @mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency') @mock.patch.object(k8s_service.KubernetesService, 'create_utask_main_jobs') @mock.patch( 'clusterfuzz._internal.batch.service.GcpBatchService.create_utask_main_jobs' ) def test_create_utask_main_jobs_success(self, _, mock_k8s_create, - mock_get_frequency): + mock_get_frequency, + mock_is_swarming_applicable): """Test successful creation.""" + mock_is_swarming_applicable.return_value = False mock_get_frequency.return_value = {'kubernetes': 1.0} mock_pubsub_task = mock.Mock() task = remote_task_types.RemoteTask( diff --git a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py new file mode 100644 index 00000000000..09c485c4267 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py @@ -0,0 +1,131 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Unit tests covering the Swarming remote task service."""

import unittest
from unittest import mock

from clusterfuzz._internal.remote_task import remote_task_types
from clusterfuzz._internal.swarming import service
from clusterfuzz._internal.tests.test_libs import helpers


class SwarmingServiceTest(unittest.TestCase):
  """Exercises SwarmingService single-task and batch scheduling calls."""

  def setUp(self):
    # Replace every collaborator the tests inspect with a mock; helpers.patch
    # exposes each one on self.mock under its final dotted component.
    patched_names = [
        'clusterfuzz._internal.swarming.is_swarming_task',
        'clusterfuzz._internal.swarming.push_swarming_task',
        'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module',
        'clusterfuzz._internal.metrics.logs.error',
    ]
    helpers.patch(self, patched_names)
    self.service = service.SwarmingService()

  def test_create_utask_main_job_success(self):
    """A single swarming-eligible task is pushed and None is returned."""
    self.mock.is_swarming_task.return_value = True
    self.mock.get_command_from_module.return_value = 'fuzz'

    outcome = self.service.create_utask_main_job('fuzz_task', 'job_type',
                                                 'http://url')

    # None signals success here (same convention as GcpBatchService).
    self.assertIsNone(outcome)

    push = self.mock.push_swarming_task
    push.assert_called_once_with('fuzz', 'http://url', 'job_type')

  def test_create_utask_main_job_failure(self):
    """A single task that is not swarming-eligible is handed back unpushed."""
    self.mock.is_swarming_task.return_value = False
    self.mock.get_command_from_module.return_value = 'fuzz'

    outcome = self.service.create_utask_main_job('fuzz_task', 'job_type',
                                                 'http://url')

    # The unscheduled task itself comes back to the caller.
    self.assertIsInstance(outcome, remote_task_types.RemoteTask)
    self.assertEqual(outcome.command, 'fuzz')
    self.mock.push_swarming_task.assert_not_called()

  def test_create_utask_main_jobs_mixed_results(self):
    """A batch with one ineligible task returns only that task."""
    job_specs = [('job1', 'url1'), ('job2', 'url2'), ('job3', 'url3')]
    batch = [
        remote_task_types.RemoteTask('fuzz', job, url)
        for job, url in job_specs
    ]

    # Eligibility per task, in order: job1 yes, job2 no, job3 yes.
    self.mock.is_swarming_task.side_effect = [True, False, True]

    leftover = self.service.create_utask_main_jobs(batch)

    self.assertEqual(len(leftover), 1)
    self.assertEqual(leftover[0].job_type, 'job2')

    push = self.mock.push_swarming_task
    self.assertEqual(push.call_count, 2)
    expected_pushes = [
        mock.call('fuzz', 'url1', 'job1'),
        mock.call('fuzz', 'url3', 'job3'),
    ]
    push.assert_has_calls(expected_pushes)

  def test_create_utask_main_jobs_all_success(self):
    """A fully eligible batch leaves nothing unscheduled."""
    job_specs = [('job1', 'url1'), ('job2', 'url2')]
    batch = [
        remote_task_types.RemoteTask('fuzz', job, url)
        for job, url in job_specs
    ]
    self.mock.is_swarming_task.return_value = True

    leftover = self.service.create_utask_main_jobs(batch)

    self.assertEqual(leftover, [])
    self.assertEqual(self.mock.push_swarming_task.call_count, 2)

  def test_create_utask_main_jobs_all_fail(self):
    """A fully ineligible batch is returned whole and nothing is pushed."""
    job_specs = [('job1', 'url1'), ('job2', 'url2')]
    batch = [
        remote_task_types.RemoteTask('fuzz', job, url)
        for job, url in job_specs
    ]
    self.mock.is_swarming_task.return_value = False

    leftover = self.service.create_utask_main_jobs(batch)

    self.assertEqual(leftover, batch)
    self.mock.push_swarming_task.assert_not_called()

  def test_create_utask_main_jobs_empty(self):
    """An empty batch yields an empty result and no pushes."""
    leftover = self.service.create_utask_main_jobs([])

    self.assertEqual(leftover, [])
    self.mock.push_swarming_task.assert_not_called()

  def test_create_utask_main_jobs_exception(self):
    """A push that raises leaves the task unscheduled and logs an error."""
    batch = [remote_task_types.RemoteTask('fuzz', 'job1', 'url1')]

    self.mock.is_swarming_task.return_value = True
    self.mock.push_swarming_task.side_effect = Exception('error')

    leftover = self.service.create_utask_main_jobs(batch)

    self.assertEqual(len(leftover), 1)
    self.assertEqual(leftover[0].job_type, 'job1')
    log_error = self.mock.error
    log_error.assert_called_once_with(
        'Failed to push task to Swarming: fuzz, job1.')