
Commit 44683a9

Add sleeps to ensure workers are started before starting tasks
Signed-off-by: Andrew Brain <[email protected]>
1 parent: 81d7a46 · commit: 44683a9

1 file changed: +9 -1 lines changed


augur/application/cli/backend.py

Lines changed: 9 additions & 1 deletion
@@ -86,7 +86,7 @@ def start(disable_collection, development, port):
     logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}')

     processes = start_celery_worker_processes(float(worker_vmem_cap), disable_collection)
-    time.sleep(10)
+
     if os.path.exists("celerybeat-schedule.db"):
         logger.info("Deleting old task schedule")
         os.remove("celerybeat-schedule.db")

@@ -150,6 +150,7 @@ def start_celery_worker_processes(vmem_cap_ratio, disable_collection=False):
     available_memory_in_bytes = psutil.virtual_memory().total * vmem_cap_ratio
     available_memory_in_megabytes = available_memory_in_bytes / (1024 ** 2)
     max_process_estimate = available_memory_in_megabytes // 500
+    sleep_time = 0

     #Get a subset of the maximum procesess available using a ratio, not exceeding a maximum value
     def determine_worker_processes(ratio,maximum):

@@ -158,32 +159,39 @@ def determine_worker_processes(ratio,maximum):
     frontend_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n frontend:{uuid.uuid4().hex}@%h -Q frontend"
     max_process_estimate -= 1
     process_list.append(subprocess.Popen(frontend_worker.split(" ")))
+    sleep_time += 6

     if not disable_collection:

         #2 processes are always reserved as a baseline.
         scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=2 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling"
         max_process_estimate -= 2
         process_list.append(subprocess.Popen(scheduling_worker.split(" ")))
+        sleep_time += 6

         #60% of estimate, Maximum value of 45
         core_num_processes = determine_worker_processes(.6, 45)
         logger.info(f"Starting core worker processes with concurrency={core_num_processes}")
         core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={core_num_processes} -n core:{uuid.uuid4().hex}@%h"
         process_list.append(subprocess.Popen(core_worker.split(" ")))
+        sleep_time += 6

         #20% of estimate, Maximum value of 25
         secondary_num_processes = determine_worker_processes(.2, 25)
         logger.info(f"Starting secondary worker processes with concurrency={secondary_num_processes}")
         secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={secondary_num_processes} -n secondary:{uuid.uuid4().hex}@%h -Q secondary"
         process_list.append(subprocess.Popen(secondary_worker.split(" ")))
+        sleep_time += 6

         #15% of estimate, Maximum value of 20
         facade_num_processes = determine_worker_processes(.2, 20)
         logger.info(f"Starting facade worker processes with concurrency={facade_num_processes}")
         facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={facade_num_processes} -n facade:{uuid.uuid4().hex}@%h -Q facade"

         process_list.append(subprocess.Popen(facade_worker.split(" ")))
+        sleep_time += 6
+
+    time.sleep(sleep_time)

     return process_list
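In short, the commit replaces the fixed time.sleep(10) in start() with a wait that scales with the number of celery workers actually launched: every subprocess.Popen call adds 6 seconds to an accumulated sleep_time, and the total is slept once before start_celery_worker_processes returns. The following is a minimal, self-contained sketch of that start-then-wait pattern, not the actual Augur function; the helper name start_worker_processes, its parameters, and the placeholder command list are illustrative, while the per-worker 6-second figure and the subprocess.Popen / time.sleep calls come from the diff above.

import subprocess
import time

def start_worker_processes(worker_commands, startup_seconds_per_worker=6):
    """Launch each worker command, then wait long enough for them to come up.

    Sketch of the pattern in this commit: every spawned worker adds a fixed
    delay to an accumulated sleep_time, and the total is slept once after all
    workers have been launched, before the caller schedules any tasks.
    """
    process_list = []
    sleep_time = 0

    for command in worker_commands:
        # Popen returns immediately; the worker boots in the background.
        process_list.append(subprocess.Popen(command.split(" ")))
        sleep_time += startup_seconds_per_worker

    # Give all workers time to register before tasks are started.
    time.sleep(sleep_time)

    return process_list

With the worker commands from the diff, this amounts to a 6-second wait when collection is disabled (frontend worker only) and a 30-second wait when all five workers (frontend, scheduling, core, secondary, facade) are started, in place of the previous fixed 10-second sleep in start().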