Skip to content

Commit

Permalink
Improve worker error reporting and allow more RAM.
Browse files Browse the repository at this point in the history
By changing the way containers are started, we can now better report on
the cause of a failure.

RAM allowance is increased, because it sometimes caused a 137 (OOM)
which would kill the container.
  • Loading branch information
GJFR committed Dec 4, 2024
1 parent 95358d5 commit eff772b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
39 changes: 29 additions & 10 deletions bci/distribution/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def __run_container(self, params: WorkerParameters, blocking_wait=True) -> None:
container_name = f'bh_worker_{container_id}'

def start_container_thread():
if (host_pwd := os.getenv('HOST_PWD', None)) is None:
raise AttributeError('Could not find HOST_PWD environment var')
try:
# Sometimes, it takes a while for Docker to remove the container
while True:
Expand All @@ -60,17 +62,20 @@ def start_container_thread():
for container in active_containers:
logger.info(f'Removing old container \'{container.attrs["Name"]}\' to start new one')
container.remove(force=True)
if (host_pwd := os.getenv('HOST_PWD', None)) is None:
raise AttributeError('Could not find HOST_PWD environment var')
self.client.containers.run(
except docker.errors.APIError:
logger.error("Could not consult list of active containers", exc_info=True)

container = None
try:
container = self.client.containers.run(
f'bughog/worker:{Global.get_tag()}',
name=container_name,
hostname=container_name,
shm_size='2gb',
network='bh_net',
mem_limit='1g', # To prevent one container from consuming multiple gigs of memory (was the case for a Firefox evaluation)
detach=False,
remove=True,
mem_limit='4g', # To prevent one container from consuming multiple gigs of memory (was the case for a Firefox evaluation)
mem_reservation='2g',
detach=True,
labels=['bh_worker'],
command=[params.serialize()],
volumes=[
Expand All @@ -86,20 +91,34 @@ def start_container_thread():
'/dev/shm:/dev/shm',
],
)
logger.debug(f"Container '{container_name}' finished experiments for '{params.state}'")
Clients.push_results_to_all()
result = container.wait()
if result["StatusCode"] != 0:
logger.error(
f"'{container_name}' exited unexpectedly with status code {result['StatusCode']}. "
"Check the worker logs in ./logs/ for more information."
)
else:
logger.debug(f"Container '{container_name}' finished experiments for '{params.state}'")
Clients.push_results_to_all()
except docker.errors.APIError:
logger.error("Received a docker error", exc_info=True)
except docker.errors.ContainerError:
logger.error(
f"Could not run container '{container_name}' or container was unexpectedly removed", exc_info=True
)
if container is not None:
container_info = container.attrs["State"]
logger.error(f"'{container_name}' exited unexpectedly with {container_info}", exc_info=True)
finally:
if container is not None:
container.remove()
self.container_id_pool.put(container_id)

thread = threading.Thread(target=start_container_thread)
thread.start()
logger.info(f"Container '{container_name}' started experiments for '{params.state}'")
# To avoid race-condition where more than max containers are started
time.sleep(3)
# Sleep to avoid all workers downloading browser binaries at once, clogging up all IO.
time.sleep(5)

def get_nb_of_running_worker_containers(self):
return len(self.get_runnning_containers())
Expand Down
9 changes: 5 additions & 4 deletions bci/evaluations/evaluation_framework.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import os
import re
import traceback
from abc import ABC, abstractmethod

from bci.browser.configuration.browser import Browser
Expand All @@ -17,7 +16,7 @@ class EvaluationFramework(ABC):
def __init__(self):
self.should_stop = False

def evaluate(self, worker_params: WorkerParameters):
def evaluate(self, worker_params: WorkerParameters, is_worker=False):
test_params = worker_params.create_test_params()

if MongoDB().has_result(test_params):
Expand All @@ -42,8 +41,10 @@ def evaluate(self, worker_params: WorkerParameters):
logger.info(f'Test finalized: {test_params}')
except Exception as e:
state.condition = StateCondition.FAILED
logger.error('An error occurred during evaluation', exc_info=True)
traceback.print_exc()
if is_worker:
raise e
else:
logger.error('An error occurred during evaluation', exc_info=True)
finally:
browser.post_test_cleanup()

Expand Down
8 changes: 7 additions & 1 deletion bci/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ def run(params: WorkerParameters):
evaluation_framework = CustomEvaluationFramework()
# browser_build, repo_state = get_browser_build_and_repo_state(params)

evaluation_framework.evaluate(params)
try:
evaluation_framework.evaluate(params, is_worker=True)
except Exception:
logger.fatal("An exception occurred during evaluation", exc_info=True)
logging.shutdown()
os._exit(1)


if __name__ == '__main__':
Expand All @@ -35,4 +40,5 @@ def run(params: WorkerParameters):
logger.info('Worker started')
run(params)
logger.info('Worker finished, exiting...')
logging.shutdown()
os._exit(0)

0 comments on commit eff772b

Please sign in to comment.