-
Notifications
You must be signed in to change notification settings - Fork 601
Add Swarming service for Remote Task Gate #5206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
da2d5ed
d1281ed
7200eda
4a3a30b
bd63f6e
3c9f9c9
5b1690f
267c584
39c1ea4
4d35f03
e266779
b79d077
150e132
451afe1
618107d
4b81e69
d4e44d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import collections | ||
| import random | ||
|
|
||
| from clusterfuzz._internal.base import feature_flags | ||
| from clusterfuzz._internal.metrics import logs | ||
| from clusterfuzz._internal.remote_task import remote_task_adapters | ||
| from clusterfuzz._internal.remote_task import remote_task_types | ||
|
|
@@ -106,6 +107,12 @@ def get_job_frequency(self): | |
|
|
||
| def create_utask_main_job(self, module, job_type, input_download_url): | ||
| """Creates a single remote task, selecting a backend dynamically.""" | ||
| if feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: | ||
| result = self._service_map['swarming'].create_utask_main_job( | ||
| module, job_type, input_download_url) | ||
| if result is None: | ||
| return None | ||
|
|
||
| adapter_id = self._get_adapter() | ||
| service = self._service_map[adapter_id] | ||
| return service.create_utask_main_job(module, job_type, input_download_url) | ||
|
|
@@ -114,21 +121,38 @@ def create_utask_main_jobs(self, | |
| remote_tasks: list[remote_task_types.RemoteTask]): | ||
| """Creates a batch of remote tasks, distributing them across backends. | ||
|
|
||
| This method handles two cases: | ||
| 1. If there is only one task, it uses a weighted random choice to select | ||
| a backend, similar to `create_utask_main_job`. | ||
| 2. If there are multiple tasks, it distributes them deterministically | ||
| across the available backends based on their configured frequencies. | ||
| This ensures that a batch of 100 tasks with a 70/30 split sends | ||
| exactly 70 tasks to one backend and 30 to the other. | ||
| This method manages the distribution of tasks in two stages: | ||
|
|
||
| 1. Swarming Interception (Optional): If the `SWARMING_REMOTE_EXECUTION` | ||
| feature flag is enabled, all tasks are first passed to the Swarming | ||
| service. The service schedules swarming-eligible tasks and returns | ||
| any tasks that it didn't schedule (e.g., non-swarming tasks or those | ||
| it failed to schedule). | ||
|
|
||
| 2. Weighted Distribution: Any remaining tasks are then distributed across | ||
| the available remote adapters based on their configured frequencies: | ||
| - If only one task remains, it is assigned using a weighted random | ||
| choice to maintain overall distribution over time. | ||
| - If multiple tasks remain, they are distributed deterministically | ||
| according to their frequencies. This ensures precise adherence to | ||
| the distribution ratios (e.g., exactly 70/30 split for 100 tasks). | ||
| """ | ||
| tasks_by_adapter = collections.defaultdict(list) | ||
|
|
||
| if len(remote_tasks) == 1: | ||
| unscheduled_tasks = [] | ||
|
|
||
| if feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could keep this function cleaner if we could move this logic to an auxiliary function.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I see an auxiliary function with this single line. I guess I probably do not understand very well what would we encapsulate in that auxiliary function. Can you clarify? |
||
| # TODO(jardondiego): SwarmingService | ||
| # returns all tasks that weren't scheduled, which includes both | ||
| # failed swarming tasks AND non-swarming tasks. | ||
| remote_tasks = self._service_map['swarming'].create_utask_main_jobs( | ||
| remote_tasks) | ||
|
|
||
| if not remote_tasks: | ||
| pass | ||
| elif len(remote_tasks) == 1: | ||
jardondiego marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # For a single task, use a random distribution. | ||
| adapter_id = self._get_adapter() | ||
| tasks_by_adapter[adapter_id].extend(remote_tasks) | ||
| unscheduled_tasks = [] | ||
| else: | ||
| # For multiple tasks, use deterministic slicing to ensure the | ||
| # distribution precisely matches the frequency configuration. | ||
|
|
@@ -147,9 +171,8 @@ def create_utask_main_jobs(self, | |
| for i, task in enumerate(remaining_tasks): | ||
| adapter_id = list(frequencies.keys())[i % len(frequencies)] | ||
| tasks_by_adapter[adapter_id].append(task) | ||
| unscheduled_tasks = [] | ||
| else: | ||
| unscheduled_tasks = list(remaining_tasks) | ||
| unscheduled_tasks.extend(remaining_tasks) | ||
|
|
||
| for adapter_id, tasks in tasks_by_adapter.items(): | ||
| if tasks: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| # Copyright 2026 Google LLC | ||
jardondiego marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """Swarming service.""" | ||
|
|
||
| from clusterfuzz._internal import swarming | ||
| from clusterfuzz._internal.base.tasks import task_utils | ||
| from clusterfuzz._internal.metrics import logs | ||
| from clusterfuzz._internal.remote_task import remote_task_types | ||
|
|
||
|
|
||
| class SwarmingService(remote_task_types.RemoteTaskInterface): | ||
| """Remote task service implementation for Swarming.""" | ||
|
|
||
| def create_utask_main_job(self, module: str, job_type: str, | ||
| input_download_url: str): | ||
| """Creates a single swarming task for a uworker main task.""" | ||
| command = task_utils.get_command_from_module(module) | ||
| swarming_task = remote_task_types.RemoteTask(command, job_type, | ||
| input_download_url) | ||
| result = self.create_utask_main_jobs([swarming_task]) | ||
jardondiego marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if not result: | ||
| return None | ||
|
|
||
| return result[0] | ||
jardondiego marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def create_utask_main_jobs(self, | ||
| remote_tasks: list[remote_task_types.RemoteTask] | ||
| ) -> list[remote_task_types.RemoteTask]: | ||
| """Creates many remote tasks for uworker main tasks. | ||
| Returns the tasks that couldn't be created. | ||
| """ | ||
| unscheduled_tasks = [] | ||
| for task in remote_tasks: | ||
| try: | ||
| if not swarming.is_swarming_task(task.command, task.job_type): | ||
jardondiego marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| unscheduled_tasks.append(task) | ||
| continue | ||
|
|
||
| swarming.push_swarming_task(task.command, task.input_download_url, | ||
| task.job_type) | ||
| except Exception: # pylint: disable=broad-except | ||
jardondiego marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| logs.error( | ||
| f'Failed to push task to Swarming: {task.command}, {task.job_type}.' | ||
| ) | ||
| unscheduled_tasks.append(task) | ||
| return unscheduled_tasks | ||
Uh oh!
There was an error while loading. Please reload this page.