Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal.batch import service as batch_service
from clusterfuzz._internal.k8s import service as k8s_service
from clusterfuzz._internal.swarming.service import SwarmingService


class RemoteTaskAdapters(Enum):
Expand All @@ -37,6 +38,8 @@ class RemoteTaskAdapters(Enum):
feature_flags.FeatureFlags.K8S_JOBS_FREQUENCY, 0.0)
GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService,
feature_flags.FeatureFlags.GCP_BATCH_JOBS_FREQUENCY, 1.0)
SWARMING = ('swarming', SwarmingService,
feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION, 0.0)

def __init__(self, adapter_id, service, feature_flag, default_weight):
self.id = adapter_id
Expand Down
47 changes: 35 additions & 12 deletions src/clusterfuzz/_internal/remote_task/remote_task_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import collections
import random

from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.remote_task import remote_task_adapters
from clusterfuzz._internal.remote_task import remote_task_types
Expand Down Expand Up @@ -106,6 +107,12 @@ def get_job_frequency(self):

  def create_utask_main_job(self, module, job_type, input_download_url):
    """Creates a single remote task, selecting a backend dynamically.

    When the SWARMING_REMOTE_EXECUTION feature flag is enabled, the task is
    first offered to the Swarming service. The Swarming service returns None
    when it scheduled the task itself, and returns the task back when it did
    not schedule it (e.g. a non-swarming task, or a scheduling failure); in
    the latter case the task falls through to the weighted adapter selection.

    Args:
      module: The uworker module for the task.
      job_type: The job type of the task.
      input_download_url: Download URL for the uworker input.

    Returns:
      None if Swarming scheduled the task; otherwise, whatever the selected
      backend service's create_utask_main_job returns.
    """
    if feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
      result = self._service_map['swarming'].create_utask_main_job(
          module, job_type, input_download_url)
      # None means Swarming scheduled the task; nothing more to do here.
      if result is None:
        return None

    # Swarming is disabled, or it declined/failed to take the task: pick a
    # backend adapter (weighted selection) and create the job there.
    adapter_id = self._get_adapter()
    service = self._service_map[adapter_id]
    return service.create_utask_main_job(module, job_type, input_download_url)
Expand All @@ -114,21 +121,38 @@ def create_utask_main_jobs(self,
remote_tasks: list[remote_task_types.RemoteTask]):
"""Creates a batch of remote tasks, distributing them across backends.

This method handles two cases:
1. If there is only one task, it uses a weighted random choice to select
a backend, similar to `create_utask_main_job`.
2. If there are multiple tasks, it distributes them deterministically
across the available backends based on their configured frequencies.
This ensures that a batch of 100 tasks with a 70/30 split sends
exactly 70 tasks to one backend and 30 to the other.
This method manages the distribution of tasks in two stages:

1. Swarming Interception (Optional): If the `SWARMING_REMOTE_EXECUTION`
feature flag is enabled, all tasks are first passed to the Swarming
service. The service schedules swarming-eligible tasks and returns
any tasks that it didn't schedule (e.g., non-swarming tasks or those
it failed to schedule).

2. Weighted Distribution: Any remaining tasks are then distributed across
the available remote adapters based on their configured frequencies:
- If only one task remains, it is assigned using a weighted random
choice to maintain overall distribution over time.
- If multiple tasks remain, they are distributed deterministically
according to their frequencies. This ensures precise adherence to
the distribution ratios (e.g., exactly 70/30 split for 100 tasks).
"""
tasks_by_adapter = collections.defaultdict(list)

if len(remote_tasks) == 1:
unscheduled_tasks = []

if feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could keep this function cleaner if we could move this logic to an auxiliary function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I see an auxiliary function with this single line. I guess I probably do not understand very well what would we encapsulate in that auxiliary function. Can you clarify?

# TODO(jardondiego): SwarmingService
# returns all tasks that weren't scheduled, which includes both
# failed swarming tasks AND non-swarming tasks.
remote_tasks = self._service_map['swarming'].create_utask_main_jobs(
remote_tasks)

if not remote_tasks:
pass
elif len(remote_tasks) == 1:
# For a single task, use a random distribution.
adapter_id = self._get_adapter()
tasks_by_adapter[adapter_id].extend(remote_tasks)
unscheduled_tasks = []
else:
# For multiple tasks, use deterministic slicing to ensure the
# distribution precisely matches the frequency configuration.
Expand All @@ -147,9 +171,8 @@ def create_utask_main_jobs(self,
for i, task in enumerate(remaining_tasks):
adapter_id = list(frequencies.keys())[i % len(frequencies)]
tasks_by_adapter[adapter_id].append(task)
unscheduled_tasks = []
else:
unscheduled_tasks = list(remaining_tasks)
unscheduled_tasks.extend(remaining_tasks)

for adapter_id, tasks in tasks_by_adapter.items():
if tasks:
Expand Down
2 changes: 0 additions & 2 deletions src/clusterfuzz/_internal/swarming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ def _get_new_task_spec(command: str, job_name: str,
swarming_pb2.StringPair(key=var['key'], value=var['value'])) # pylint: disable=no-member
swarming_bot_environment.append(_env_vars_to_json(default_task_environment))
swarming_bot_environment.extend(default_task_environment)

dimensions = instance_spec.get('dimensions', [])

cas_input_root = instance_spec.get('cas_input_root', {})

new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member
Expand Down
58 changes: 58 additions & 0 deletions src/clusterfuzz/_internal/swarming/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Swarming service."""

from clusterfuzz._internal import swarming
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.remote_task import remote_task_types


class SwarmingService(remote_task_types.RemoteTaskInterface):
  """Remote task service implementation for Swarming."""

  def create_utask_main_job(self, module: str, job_type: str,
                            input_download_url: str):
    """Creates a single swarming task for a uworker main task.

    Wraps the batch API: builds one RemoteTask and delegates to
    create_utask_main_jobs.

    Returns:
      None when the task was scheduled on Swarming; otherwise the
      RemoteTask that could not be scheduled.
    """
    command = task_utils.get_command_from_module(module)
    task = remote_task_types.RemoteTask(command, job_type, input_download_url)
    unscheduled = self.create_utask_main_jobs([task])
    # An empty result means the single task was scheduled successfully.
    return unscheduled[0] if unscheduled else None

  def create_utask_main_jobs(self,
                             remote_tasks: list[remote_task_types.RemoteTask]
                            ) -> list[remote_task_types.RemoteTask]:
    """Creates many remote tasks for uworker main tasks.
    Returns the tasks that couldn't be created.
    """
    unscheduled_tasks = []
    for task in remote_tasks:
      try:
        if swarming.is_swarming_task(task.command, task.job_type):
          # Best-effort push; failures are collected below and handed back
          # to the caller for scheduling on another backend.
          swarming.push_swarming_task(task.command, task.input_download_url,
                                      task.job_type)
        else:
          # Not a swarming-eligible task: return it untouched.
          unscheduled_tasks.append(task)
      except Exception:  # pylint: disable=broad-except
        logs.error(
            f'Failed to push task to Swarming: {task.command}, {task.job_type}.'
        )
        unscheduled_tasks.append(task)
    return unscheduled_tasks
Loading
Loading