Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CTX-4079: Redesigned terminate condition for experiment worker process #26

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions coretex/project/experiment_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
from multiprocessing.connection import Connection

import time
import os
import logging

from ..coretex import MetricType
import psutil

from ..coretex import MetricType, Experiment
from ..networking import networkManager, NetworkRequestError
from ..coretex.experiment import Experiment
from ..logging import initializeLogger, LogSeverity
from ..folder_management import FolderManager
from ..coretex.experiment.metrics.metric_factory import createMetric


Expand Down Expand Up @@ -65,8 +70,10 @@ def setupGPUMetrics() -> None:
createMetric("gpu_usage", "time (s)", MetricType.interval, "usage (%)", MetricType.percent, None, [0, 100]),
createMetric("gpu_temperature", "time (s)", MetricType.interval, "usage (%)", MetricType.percent)
])

logging.getLogger("coretexpylib").debug(">> [Coretex] Initialized GPU metrics")
except:
pass
logging.getLogger("coretexpylib").debug(">> [Coretex] Failed to initialize GPU metrics")


def _isAlive(output: Connection) -> bool:
Expand Down Expand Up @@ -101,29 +108,51 @@ def _uploadMetrics(experiment: Experiment) -> None:
experiment.submitMetrics(metricValues)


def experimentWorker(outputStream: Connection, refreshToken: str, experimentId: int) -> None:
def experimentWorker(output: Connection, refreshToken: str, experimentId: int) -> None:
workerLogPath = FolderManager.instance().logs / f"experiment_worker_{experimentId}.log"
initializeLogger(LogSeverity.debug, workerLogPath)

currentProcess = psutil.Process(os.getpid())

setupGPUMetrics()

response = networkManager.authenticateWithRefreshToken(refreshToken)
if response.hasFailed():
sendFailure(outputStream, "Failed to authenticate with refresh token")
sendFailure(output, "Failed to authenticate with refresh token")
return

try:
experiment: Experiment = Experiment.fetchById(experimentId)
except NetworkRequestError:
sendFailure(outputStream, f"Failed to fetch experiment with id \"{experimentId}\"")
sendFailure(output, f"Failed to fetch experiment with id \"{experimentId}\"")
return

try:
experiment.createMetrics(METRICS)
except NetworkRequestError:
sendFailure(outputStream, "Failed to create metrics")
sendFailure(output, "Failed to create metrics")

sendSuccess(outputStream, "Experiment worker succcessfully started")
sendSuccess(output, "Experiment worker succcessfully started")

while _isAlive(outputStream):
while (parent := currentProcess.parent()) is not None:
logging.getLogger("coretexpylib").debug(f">> [Coretex] Worker process id {currentProcess.pid}, parent process id {parent.pid}...")

# If parent process ID is set to 1 then that means that the parent process has terminated
# the process (this is only true for Unix-based systems), but since we run the Node
# from the docker container which uses Linux as a base then it is safe to use.
#
# In docker container the pid of the Node process is 1, but we are safe to chech since the
# node should never be a parent of this process for metric upload, only the experiment
# process can be the parent.
if parent.pid == 1 or not parent.is_running() or not _isAlive(output):
logging.getLogger("coretexpylib").debug(">> [Coretex] Terminating worker process...")
break

logging.getLogger("coretexpylib").debug(">> [Coretex] Heartbeat")
_heartbeat(experiment)

logging.getLogger("coretexpylib").debug(">> [Coretex] Uploading metrics")
_uploadMetrics(experiment)

logging.getLogger("coretexpylib").debug(">> [Coretex] Sleeping for 5s")
time.sleep(5) # delay between sending generic metrics