Skip to content

Commit

Permalink
Merge pull request #26 from dule1322/CTX-4079
Browse files Browse the repository at this point in the history
CTX-4079: Redesigned terminate condition for experiment worker process
  • Loading branch information
igorperic17 authored Jul 11, 2023
2 parents 5ae9dbc + 5284968 commit a56f872
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions coretex/project/experiment_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
from multiprocessing.connection import Connection

import time
import os
import logging

from ..coretex import MetricType
import psutil

from ..coretex import MetricType, Experiment
from ..networking import networkManager, NetworkRequestError
from ..coretex.experiment import Experiment
from ..logging import initializeLogger, LogSeverity
from ..folder_management import FolderManager
from ..coretex.experiment.metrics.metric_factory import createMetric


Expand Down Expand Up @@ -65,8 +70,10 @@ def setupGPUMetrics() -> None:
createMetric("gpu_usage", "time (s)", MetricType.interval, "usage (%)", MetricType.percent, None, [0, 100]),
createMetric("gpu_temperature", "time (s)", MetricType.interval, "usage (%)", MetricType.percent)
])

logging.getLogger("coretexpylib").debug(">> [Coretex] Initialized GPU metrics")
except:
pass
logging.getLogger("coretexpylib").debug(">> [Coretex] Failed to initialize GPU metrics")


def _isAlive(output: Connection) -> bool:
Expand Down Expand Up @@ -101,29 +108,51 @@ def _uploadMetrics(experiment: Experiment) -> None:
experiment.submitMetrics(metricValues)


def experimentWorker(outputStream: Connection, refreshToken: str, experimentId: int) -> None:
def experimentWorker(output: Connection, refreshToken: str, experimentId: int) -> None:
workerLogPath = FolderManager.instance().logs / f"experiment_worker_{experimentId}.log"
initializeLogger(LogSeverity.debug, workerLogPath)

currentProcess = psutil.Process(os.getpid())

setupGPUMetrics()

response = networkManager.authenticateWithRefreshToken(refreshToken)
if response.hasFailed():
sendFailure(outputStream, "Failed to authenticate with refresh token")
sendFailure(output, "Failed to authenticate with refresh token")
return

try:
experiment: Experiment = Experiment.fetchById(experimentId)
except NetworkRequestError:
sendFailure(outputStream, f"Failed to fetch experiment with id \"{experimentId}\"")
sendFailure(output, f"Failed to fetch experiment with id \"{experimentId}\"")
return

try:
experiment.createMetrics(METRICS)
except NetworkRequestError:
sendFailure(outputStream, "Failed to create metrics")
sendFailure(output, "Failed to create metrics")

sendSuccess(outputStream, "Experiment worker succcessfully started")
sendSuccess(output, "Experiment worker succcessfully started")

while _isAlive(outputStream):
while (parent := currentProcess.parent()) is not None:
logging.getLogger("coretexpylib").debug(f">> [Coretex] Worker process id {currentProcess.pid}, parent process id {parent.pid}...")

# If parent process ID is set to 1 then that means that the parent process has terminated
# the process (this is only true for Unix-based systems), but since we run the Node
# from the docker container which uses Linux as a base then it is safe to use.
#
# In docker container the pid of the Node process is 1, but we are safe to chech since the
# node should never be a parent of this process for metric upload, only the experiment
# process can be the parent.
if parent.pid == 1 or not parent.is_running() or not _isAlive(output):
logging.getLogger("coretexpylib").debug(">> [Coretex] Terminating worker process...")
break

logging.getLogger("coretexpylib").debug(">> [Coretex] Heartbeat")
_heartbeat(experiment)

logging.getLogger("coretexpylib").debug(">> [Coretex] Uploading metrics")
_uploadMetrics(experiment)

logging.getLogger("coretexpylib").debug(">> [Coretex] Sleeping for 5s")
time.sleep(5) # delay between sending generic metrics

0 comments on commit a56f872

Please sign in to comment.