Simplify trigger_benchmark coordinator workflow
Remove the usage of groups and chains and simply trigger the jobs one
after another.

The chain approach led to issues where the filename argument would get
duplicated when multiple groups were triggered: a Celery chain passes each
task's return value on to the next task in the chain as an extra argument,
so arguments leaked between the chained groups (see the sketch below).

Signed-off-by: Colin Wilk <[email protected]>
kliwniloc committed Jul 22, 2024
1 parent bb60db1 commit 0fda3ad
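
For context on the duplication described in the commit message, here is a
minimal sketch of how Celery chains pass arguments; the app, broker, and
tasks are illustrative stand-ins, not code from this repository:

# Minimal sketch of Celery chain argument passing (illustrative app/tasks,
# not from this repository).
from celery import Celery, chain

app = Celery("sketch", broker="memory://", backend="cache+memory://")

@app.task
def produce(filename):
    # Stands in for a benchmark task that returns a result.
    return filename

@app.task
def consume(prev_result, filename):
    # In a chain, prev_result is filled in with the previous task's return
    # value; only filename comes from the signature bound below.
    return prev_result, filename

# consume ends up invoked as consume("a.json", "a.json"): the first value is
# produce's return value, the second is the argument bound in the signature.
# When the chained elements are groups, the list of the previous group's
# results is forwarded in the same way, which is how the filename argument
# could arrive twice.
workflow = chain(produce.s("a.json"), consume.s("a.json"))

# Immutable signatures, e.g. consume.si("a.json"), ignore the parent result
# and would sidestep the duplication.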
Showing 2 changed files with 18 additions and 26 deletions.
32 changes: 16 additions & 16 deletions src/benchmarking_tool/coordinator.py
@@ -24,10 +24,10 @@
 import datetime
 import time
 import uuid
+from itertools import product
 from typing import Any, NoReturn, Self
 
 import schedule
-from celery import chain, group
 from loguru import logger as l
 
 from .celery_app import run_benchmark, worker
@@ -89,27 +89,27 @@ def __schedule_every_2_hours(self) -> None:
     def __init__(self) -> None:
         pass
 
-    def trigger_benchmark(self) -> list:
+    def trigger_benchmark(self) -> None:
         trigger_time: datetime.datetime = datetime.datetime.now()
         wave_id: str = str(uuid.uuid4())
         l.info("Triggering Benchmark")
-        all_results: list = []
-        for filename in self.filenames:
-            groups: list = []
-            for worker_group in self.worker_groups:
-                workers: set[Any] = worker.get_workers(worker_group)
-                grouped_tasks = group(
-                    run_benchmark.s(
-                        filename=filename, wave_id=wave_id, timestamp=trigger_time
-                    ).set(queue=worker.decode())
-                    for worker in workers
-                )
-                groups.append(grouped_tasks)
-
-            chained_tasks = chain(*groups)
-            result = chained_tasks.apply_async()
-            all_results.append(result)
-        return all_results
+        for worker_group, filename in product(self.worker_groups, self.filenames):
+            workers: set[Any] = worker.get_workers(worker_group)
+            for worker_instance in workers:
+                queue_name = worker_instance.decode()
+                print(f"Queue: {queue_name}, Worker: {worker_instance}, Filename: {filename}")
+                task = (
+                    run_benchmark.s(
+                        filename=filename, wave_id=wave_id, timestamp=trigger_time
+                    )
+                    .set(queue=queue_name)
+                    .apply_async()
+                )
+                l.info(
+                    f"Created task for group {worker_group}, "
+                    + f"worker {worker_instance}, queue {queue_name}: {task}"
+                )
+                task.get()  # Wait until the task is done
 
     def run(self) -> NoReturn:
         self.__schedule_every_2_hours()
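
The rewritten loop above relies on itertools.product to flatten the two
nested loops and on task.get() to serialize execution. A small sketch of the
iteration order, with illustrative values:

# itertools.product yields every (worker_group, filename) pair, replacing
# the old nested filename/worker-group loops; values here are illustrative.
from itertools import product

worker_groups = ["gpu", "cpu"]
filenames = ["small.json", "large.json"]

for worker_group, filename in product(worker_groups, filenames):
    print(worker_group, filename)
# gpu small.json
# gpu large.json
# cpu small.json
# cpu large.json

Because each dispatched task is awaited with task.get() before the next one
is sent, the wave now runs strictly one benchmark at a time: the group/chain
parallelism is traded away for predictable argument passing.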
12 changes: 2 additions & 10 deletions src/benchmarking_tool/main.py
@@ -153,12 +153,6 @@
     help="Trigger a Job immediately and then exit. Useful for testing",
     action="store_true",
 )
-parser_coordinator.add_argument(
-    "-w",
-    "--wait",
-    help="Wait for all Jobs to finish before exiting.",
-    action="store_true",
-)
 
 if __name__ == "__main__":
     args = parser.parse_args()
@@ -185,9 +179,7 @@
         Coordinator().set_worker_groups(args.groups).set_filenames(args.filenames)
     )
     if args.trigger:
-        all_results: list = coordinator.trigger_benchmark()
-        if args.wait:
-            for result in all_results:
-                result.get()
+        coordinator.trigger_benchmark()
+
     else:
         coordinator.run()
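
Since trigger_benchmark now blocks on task.get() after sending each task, a
--trigger run returns only once every benchmark has finished; the --wait
flag and the result-collection loop it guarded became redundant, which is
why both are removed here.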
