Skip to content

Commit 9702d8d

Browse files
author
Diego Jardon
committed
Include a Swarming check in Remote Task Gate
1 parent d1281ed commit 9702d8d

2 files changed

Lines changed: 93 additions & 19 deletions

File tree

src/clusterfuzz/_internal/remote_task/remote_task_gate.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import collections
2323
import random
2424

25+
from clusterfuzz._internal.base.tasks import task_utils
2526
from clusterfuzz._internal.metrics import logs
2627
from clusterfuzz._internal.remote_task import remote_task_adapters
2728
from clusterfuzz._internal.remote_task import remote_task_types
29+
import clusterfuzz._internal.swarming as swarming
2830

2931

3032
class RemoteTaskGate(remote_task_types.RemoteTaskInterface):
@@ -106,6 +108,11 @@ def get_job_frequency(self):
106108

107109
def create_utask_main_job(self, module, job_type, input_download_url):
108110
"""Creates a single remote task, selecting a backend dynamically."""
111+
command = task_utils.get_command_from_module(module)
112+
if swarming.is_swarming_task(command, job_type):
113+
return self._service_map['swarming'].create_utask_main_job(
114+
module, job_type, input_download_url)
115+
109116
adapter_id = self._get_adapter()
110117
service = self._service_map[adapter_id]
111118
return service.create_utask_main_job(module, job_type, input_download_url)

src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from clusterfuzz._internal.remote_task import remote_task_adapters
2424
from clusterfuzz._internal.remote_task import remote_task_gate
2525
from clusterfuzz._internal.remote_task import remote_task_types
26+
from clusterfuzz._internal.swarming.remote_task_service import \
27+
RemoteTaskSwarmingService
2628

2729

2830
class RemoteTaskGateTest(unittest.TestCase):
@@ -32,9 +34,11 @@ def setUp(self):
3234
super().setUp()
3335
self.mock_k8s_service = mock.Mock(spec=KubernetesService)
3436
self.mock_gcp_batch_service = mock.Mock(spec=GcpBatchService)
37+
self.mock_swarming_service = mock.Mock(spec=RemoteTaskSwarmingService)
3538

3639
self.mock_k8s_service.create_utask_main_jobs.return_value = []
3740
self.mock_gcp_batch_service.create_utask_main_jobs.return_value = []
41+
self.mock_swarming_service.create_utask_main_jobs.return_value = []
3842

3943
# Patch RemoteTaskAdapters to return our mock services
4044
self.patcher = mock.patch.dict(
@@ -51,6 +55,12 @@ def setUp(self):
5155
service=mock.Mock(return_value=self.mock_gcp_batch_service),
5256
feature_flag=None,
5357
default_weight=1.0),
58+
'SWARMING':
59+
mock.Mock(
60+
id='swarming',
61+
service=mock.Mock(return_value=self.mock_swarming_service),
62+
feature_flag=None,
63+
default_weight=0.0),
5464
})
5565
self.patcher.start()
5666
self.addCleanup(self.patcher.stop)
@@ -61,41 +71,76 @@ def test_init(self):
6171
gate = remote_task_gate.RemoteTaskGate()
6272
self.assertIn('kubernetes', gate._service_map)
6373
self.assertIn('gcp_batch', gate._service_map)
74+
self.assertIn('swarming', gate._service_map)
6475
self.assertEqual(gate._service_map['kubernetes'], self.mock_k8s_service)
6576
self.assertEqual(gate._service_map['gcp_batch'],
6677
self.mock_gcp_batch_service)
78+
self.assertEqual(gate._service_map['swarming'], self.mock_swarming_service)
6779

6880
@mock.patch('random.choices')
6981
@mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency')
7082
def test_get_adapter(self, mock_get_job_frequency, mock_random_choices):
7183
"""Tests that _get_adapter returns the correct adapter based on
7284
job_frequency."""
73-
mock_get_job_frequency.return_value = {'kubernetes': 0.3, 'gcp_batch': 0.7}
85+
mock_get_job_frequency.return_value = {
86+
'kubernetes': 0.3,
87+
'gcp_batch': 0.7,
88+
'swarming': 0.0
89+
}
7490
mock_random_choices.return_value = ['gcp_batch']
7591

7692
gate = remote_task_gate.RemoteTaskGate()
7793
selected_adapter = gate._get_adapter()
7894

7995
mock_get_job_frequency.assert_called_once()
80-
mock_random_choices.assert_called_once_with(['kubernetes', 'gcp_batch'],
81-
[0.3, 0.7])
96+
mock_random_choices.assert_called_once_with(
97+
['kubernetes', 'gcp_batch', 'swarming'], [0.3, 0.7, 0.0])
8298
self.assertEqual(selected_adapter, 'gcp_batch')
8399

100+
@mock.patch('clusterfuzz._internal.swarming.is_swarming_task')
101+
@mock.patch('clusterfuzz._internal.base.tasks.task_utils.get_command_from_module')
102+
def test_create_utask_main_job_swarming_priority(self, mock_get_command,
103+
mock_is_swarming):
104+
"""Tests that create_utask_main_job prioritizes Swarming tasks."""
105+
mock_get_command.return_value = 'fuzz'
106+
mock_is_swarming.return_value = True
107+
108+
gate = remote_task_gate.RemoteTaskGate()
109+
gate.create_utask_main_job('module', 'job', 'url')
110+
111+
self.mock_swarming_service.create_utask_main_job.assert_called_once_with(
112+
'module', 'job', 'url')
113+
self.mock_k8s_service.create_utask_main_job.assert_not_called()
114+
self.mock_gcp_batch_service.create_utask_main_job.assert_not_called()
115+
116+
@mock.patch('clusterfuzz._internal.swarming.is_swarming_task')
117+
@mock.patch('clusterfuzz._internal.base.tasks.task_utils.get_command_from_module')
84118
@mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter')
85-
def test_create_utask_main_job_kubernetes(self, mock_get_adapter):
119+
def test_create_utask_main_job_kubernetes(self, mock_get_adapter,
120+
mock_get_command,
121+
mock_is_swarming):
86122
"""Tests that create_utask_main_job calls the Kubernetes service
87-
when kubernetes adapter is chosen."""
123+
when kubernetes adapter is chosen and it's not a swarming task."""
124+
mock_get_command.return_value = 'fuzz'
125+
mock_is_swarming.return_value = False
88126
mock_get_adapter.return_value = 'kubernetes'
89127
gate = remote_task_gate.RemoteTaskGate()
90128
gate.create_utask_main_job('module', 'job', 'url')
91129
self.mock_k8s_service.create_utask_main_job.assert_called_once_with(
92130
'module', 'job', 'url')
93131
self.mock_gcp_batch_service.create_utask_main_job.assert_not_called()
132+
self.mock_swarming_service.create_utask_main_job.assert_not_called()
94133

134+
@mock.patch('clusterfuzz._internal.swarming.is_swarming_task')
135+
@mock.patch('clusterfuzz._internal.base.tasks.task_utils.get_command_from_module')
95136
@mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter')
96-
def test_create_utask_main_job_gcp_batch(self, mock_get_adapter):
137+
def test_create_utask_main_job_gcp_batch(self, mock_get_adapter,
138+
mock_get_command,
139+
mock_is_swarming):
97140
"""Tests that create_utask_main_job calls the GCP Batch service
98-
when gcp_batch adapter is chosen."""
141+
when gcp_batch adapter is chosen and it's not a swarming task."""
142+
mock_get_command.return_value = 'fuzz'
143+
mock_is_swarming.return_value = False
99144
mock_get_adapter.return_value = 'gcp_batch'
100145
gate = remote_task_gate.RemoteTaskGate()
101146
gate.create_utask_main_job('module', 'job', 'url')
@@ -105,6 +150,7 @@ def test_create_utask_main_job_gcp_batch(self, mock_get_adapter):
105150
'url',
106151
)
107152
self.mock_k8s_service.create_utask_main_job.assert_not_called()
153+
self.mock_swarming_service.create_utask_main_job.assert_not_called()
108154

109155
@mock.patch.object(remote_task_gate.RemoteTaskGate, '_get_adapter')
110156
def test_create_utask_main_jobs_single_task(self, mock_get_adapter):
@@ -133,7 +179,11 @@ def test_create_utask_main_jobs_multiple_tasks_slicing(
133179
]
134180

135181
# 50% split
136-
mock_get_job_frequency.return_value = {'kubernetes': 0.5, 'gcp_batch': 0.5}
182+
mock_get_job_frequency.return_value = {
183+
'kubernetes': 0.5,
184+
'gcp_batch': 0.5,
185+
'swarming': 0.0
186+
}
137187

138188
gate = remote_task_gate.RemoteTaskGate()
139189
gate.create_utask_main_jobs(tasks)
@@ -156,13 +206,16 @@ def test_create_utask_main_jobs_remainder_distribution(
156206
]
157207

158208
# 50/50 split - one task will be a remainder
159-
mock_get_job_frequency.return_value = {'kubernetes': 0.5, 'gcp_batch': 0.5}
209+
mock_get_job_frequency.return_value = {
210+
'kubernetes': 0.5,
211+
'gcp_batch': 0.5,
212+
'swarming': 0.0
213+
}
160214

161215
gate = remote_task_gate.RemoteTaskGate()
162216
gate.create_utask_main_jobs(tasks)
163217

164218
# Expect 1 for k8s, 1 for gcp_batch, and 1 remainder distributed round robin.
165-
# In this case, first k8s gets 1, then gcp_batch gets 1, then k8s gets the last one.
166219
self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(
167220
[tasks[0], tasks[2]])
168221
self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with(
@@ -182,22 +235,18 @@ def test_create_utask_main_jobs_unscheduled(self, mock_get_job_frequency):
182235
# 0.25 each. Sum 0.5.
183236
mock_get_job_frequency.return_value = {
184237
'kubernetes': 0.25,
185-
'gcp_batch': 0.25
238+
'gcp_batch': 0.25,
239+
'swarming': 0.0
186240
}
187241

188242
gate = remote_task_gate.RemoteTaskGate()
189243
result = gate.create_utask_main_jobs(tasks)
190244

191-
# 4 * 0.25 = 1 task each.
192-
# Total assigned = 2.
193-
# Total unscheduled = 2.
194-
195245
self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(
196246
[tasks[0]])
197247
self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with(
198248
[tasks[1]])
199249

200-
# Result should contain unscheduled (tasks[2], tasks[3]).
201250
self.assertEqual(result, [tasks[2], tasks[3]])
202251

203252
@mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency')
@@ -207,7 +256,11 @@ def test_create_utask_main_jobs_full_kubernetes(self, mock_get_job_frequency):
207256
remote_task_types.RemoteTask('c', 'j', 'u1'),
208257
remote_task_types.RemoteTask('c', 'j', 'u2'),
209258
]
210-
mock_get_job_frequency.return_value = {'kubernetes': 1.0, 'gcp_batch': 0.0}
259+
mock_get_job_frequency.return_value = {
260+
'kubernetes': 1.0,
261+
'gcp_batch': 0.0,
262+
'swarming': 0.0
263+
}
211264
gate = remote_task_gate.RemoteTaskGate()
212265
gate.create_utask_main_jobs(tasks)
213266
self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks)
@@ -220,7 +273,11 @@ def test_create_utask_main_jobs_full_gcp_batch(self, mock_get_job_frequency):
220273
remote_task_types.RemoteTask('c', 'j', 'u1'),
221274
remote_task_types.RemoteTask('c', 'j', 'u2'),
222275
]
223-
mock_get_job_frequency.return_value = {'kubernetes': 0.0, 'gcp_batch': 1.0}
276+
mock_get_job_frequency.return_value = {
277+
'kubernetes': 0.0,
278+
'gcp_batch': 1.0,
279+
'swarming': 0.0
280+
}
224281
gate = remote_task_gate.RemoteTaskGate()
225282
gate.create_utask_main_jobs(tasks)
226283
self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with(
@@ -238,7 +295,11 @@ def test_create_utask_main_jobs_returns_unscheduled_tasks(
238295
remote_task_types.RemoteTask('c', 'j', 'u1'),
239296
]
240297

241-
mock_get_job_frequency.return_value = {'kubernetes': 1.0, 'gcp_batch': 0.0}
298+
mock_get_job_frequency.return_value = {
299+
'kubernetes': 1.0,
300+
'gcp_batch': 0.0,
301+
'swarming': 0.0
302+
}
242303
self.mock_k8s_service.create_utask_main_jobs.return_value = unscheduled_tasks
243304

244305
gate = remote_task_gate.RemoteTaskGate()
@@ -268,6 +329,12 @@ def setUp(self):
268329
service=mock.Mock(),
269330
feature_flag=None,
270331
default_weight=1.0),
332+
'SWARMING':
333+
mock.Mock(
334+
id='swarming',
335+
service=mock.Mock(),
336+
feature_flag=None,
337+
default_weight=0.0),
271338
})
272339
self.patcher.start()
273340
self.addCleanup(self.patcher.stop)

0 commit comments

Comments
 (0)