train rf model in background thread #1178

Open · wants to merge 21 commits into base: main

Commits (21)
f5d8822 (Dec 4, 2024): - train rf model in the background while worker processes use older v…
642d8fc (Dec 10, 2024): work in progress: replace threads by multiprocessing, use shared memo…
7492c48 (Dec 11, 2024): work in progress: keep handle to shared memory until change instead o…
d9c3867 (Dec 11, 2024): refactor: encapsulation: move multiprocessing implementation to dedic…
6a449b9 (Dec 11, 2024): delete metadata
413e905 (Dec 11, 2024): - refactor: encapsulation of some code in growing array class
984c59f (Dec 11, 2024): - refactor: encapsulation: move training data init where it can be us…
75eb34e (Dec 11, 2024): minor fixes in sync semantics
ed50624 (Dec 11, 2024): minor fixes in sync
65edb8d (Dec 12, 2024): add flag to allow user switch to emulating the old behavior, i.e., to…
818ffd0 (Dec 12, 2024): - fix bugs in arrays backed by shared memory
cc89e16 (Dec 12, 2024): disable resource tracking for shared memory instances to avoid warnings
880e0cc (Dec 12, 2024): use uuids instead of incremental ids to avoid collisions when running…
8458d41 (Dec 12, 2024): minor aesthetic refactors
ae9b821 (Dec 12, 2024): more minor aesthetic refactors
f05d0bf (Dec 13, 2024): - add cleanup semantics in hierarchy of objects containing random for…
cb76f8c (Dec 13, 2024): - improve sync between optimization and training loops
6198d8f (Dec 13, 2024): add flag to switch off (concurrent) background training entirely (def…
d956bfb (Dec 13, 2024): fix typo that caused bug
396d2f5 (Dec 17, 2024): default enable background training to run tests
4b30167 (Dec 18, 2024): use setproctitle, if available to rename rf trainer process, so it's …
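The last commit adds an optional dependency on setproctitle so the trainer shows up under a readable name in process listings. A minimal sketch of that pattern; the entry-point function and the process name below are illustrative, not the PR's actual call site:

try:
    from setproctitle import setproctitle  # optional dependency
except ImportError:
    setproctitle = None


def rf_trainer_main() -> None:  # hypothetical entry point of the trainer process
    if setproctitle is not None:
        setproctitle('smac-rf-trainer')  # easy to spot in ps/top output
    ...  # training loop goes here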
Files changed
7 changes: 7 additions & 0 deletions smac/acquisition/function/abstract_acquisition_function.py
@@ -23,6 +23,13 @@ class AbstractAcquisitionFunction:
     def __init__(self) -> None:
         self._model: AbstractModel | None = None

+    def close(self) -> None:
+        if self._model:
+            self._model.close()
+
+    def __del__(self) -> None:
+        self.close()
+
     @property
     def name(self) -> str:
         """Returns the full name of the acquisition function."""
7 changes: 7 additions & 0 deletions smac/acquisition/maximizer/abstract_acqusition_maximizer.py
@@ -46,6 +46,13 @@ def __init__(
         self._seed = seed
         self._rng = np.random.RandomState(seed=seed)

+    def close(self) -> None:
+        if self.acquisition_function:
+            self.acquisition_function.close()
+
+    def __del__(self) -> None:
+        self.close()
+
     @property
     def acquisition_function(self) -> AbstractAcquisitionFunction | None:
         """The acquisition function used for maximization."""
13 changes: 13 additions & 0 deletions smac/facade/abstract_facade.py
@@ -230,6 +230,19 @@ def __init__(
         # every time new information is available
         self._optimizer.register_callback(self._intensifier.get_callback(), index=0)

+    def close(self) -> None:
+        if self._model:
+            self._model.close()
+        if self._acquisition_function:
+            self._acquisition_function.close()
+        if self._acquisition_maximizer:
+            self._acquisition_maximizer.close()
+        if self._config_selector:
+            self._config_selector.close()
+
+    def __del__(self) -> None:
+        self.close()
+
     @property
     def scenario(self) -> Scenario:
         """The scenario object which holds all environment information."""
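The close()/__del__ pairs added above and below implement the cleanup cascade from commit f05d0bf: closing the facade forwards to the config selector, model, and acquisition objects, while __del__ is only a safety net, since finalizer timing under garbage collection is not guaranteed. A hedged usage sketch; `scenario` and `train` below are stand-ins for a configured Scenario and target function:

from smac import HyperparameterOptimizationFacade, Scenario

smac = HyperparameterOptimizationFacade(scenario, train)
try:
    incumbent = smac.optimize()
finally:
    smac.close()  # cascades down to the model, which owns the shared memory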
9 changes: 8 additions & 1 deletion smac/intensifier/abstract_intensifier.py
@@ -62,7 +62,7 @@ def __init__(
     ):
         self._scenario = scenario
         self._config_selector: ConfigSelector | None = None
-        self._config_generator: Iterator[ConfigSelector] | None = None
+        self._config_generator: Iterator[Configuration] | None = None
         self._runhistory: RunHistory | None = None

         if seed is None:
@@ -80,6 +80,13 @@
         # Reset everything
         self.reset()

+    def close(self) -> None:
+        if self._config_selector:
+            self._config_selector.close()
+
+    def __del__(self) -> None:
+        self.close()
+
     def reset(self) -> None:
         """Reset the internal variables of the intensifier."""
         self._tf_seeds: list[int] = []
11 changes: 11 additions & 0 deletions smac/main/config_selector.py
@@ -82,6 +82,17 @@ def __init__(
         # Processed configurations should be stored here; this is important to not return the same configuration twice
         self._processed_configs: list[Configuration] = []

+    def close(self) -> None:
+        if self._model:
+            self._model.close()
+        if self._acquisition_maximizer:
+            self._acquisition_maximizer.close()
+        if self._acquisition_function:
+            self._acquisition_function.close()
+
+    def __del__(self) -> None:
+        self.close()
+
     def _set_components(
         self,
         initial_design: AbstractInitialDesign,
6 changes: 6 additions & 0 deletions smac/model/abstract_model.py
@@ -82,6 +82,12 @@ def __init__(
         # Initial types array which is used to reset the type array at every call to `self.train()`
         self._initial_types = copy.deepcopy(self._types)

+    def close(self) -> None:
+        pass
+
+    def __del__(self) -> None:
+        self.close()
+
     @property
     def meta(self) -> dict[str, Any]:
         """Returns the meta data of the created object."""
156 changes: 156 additions & 0 deletions smac/model/random_forest/multiproc_util/GrowingSharedArray.py
@@ -0,0 +1,156 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional

import math
from multiprocessing import Lock

import uuid
import numpy as np
from numpy import typing as npt

# from multiprocessing.shared_memory import SharedMemory
from .SharedMemory import SharedMemory as UntrackableSharedMemory


def SharedMemory(*args, **kwargs) -> UntrackableSharedMemory:
    return UntrackableSharedMemory(*args, track=False, **kwargs)
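# NOTE on track=False (commit cc89e16): with tracking on, Python's
# resource_tracker reports "leaked" shared_memory objects at exit and can
# unlink segments still mapped by the other process; see
# https://github.com/python/cpython/issues/82300 for background.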


def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool:
    return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2)
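# e.g. dtypes_are_equal(np.dtype('<f8'), np.dtype('>f8')) is True (byte order
# is ignored), while dtypes_are_equal(np.float64, np.floating) is False,
# because np.issubdtype() only holds in one direction there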


class GrowingSharedArrayReaderView:
    basename_X: str = 'X'
    basename_y: str = 'y'

    def __init__(self, lock: Lock):
        self.lock = lock
        self.shm_id: Optional[int] = None
        self.shm_X: Optional[SharedMemory] = None
        self.shm_y: Optional[SharedMemory] = None
        self.size: Optional[int] = None

    def open(self, shm_id: int, size: int) -> None:
        # re-attach only when the writer has re-allocated the segments (i.e.,
        # shm_id changed); attaching by name is what lets this view live in a
        # different process than the writer
        if shm_id != self.shm_id:
            self.close()
            self.shm_X = SharedMemory(f'{self.basename_X}_{shm_id}')
            self.shm_y = SharedMemory(f'{self.basename_y}_{shm_id}')
            self.shm_id = shm_id
        self.size = size

    def close_shm(self, unlink: bool = False) -> None:
        if self.shm_X is not None:
            self.shm_X.close()
            if unlink:
                self.shm_X.unlink()
            del self.shm_X
            self.shm_X = None
        if self.shm_y is not None:
            self.shm_y.close()
            if unlink:
                self.shm_y.unlink()
            del self.shm_y
            self.shm_y = None
        self.shm_id = None
        self.size = None

    def close(self) -> None:
        self.close_shm()

    def __del__(self) -> None:
        self.close()

    @property
    def capacity(self) -> int:
        if self.shm_y is None:
            return 0
        assert self.shm_y.size % np.dtype(np.float64).itemsize == 0
        return self.shm_y.size // np.dtype(np.float64).itemsize

    @property
    def row_size(self) -> Optional[int]:
        if self.shm_X is None:
            return None
        if self.shm_X.size == 0:
            return None
        assert self.shm_X.size % self.shm_y.size == 0
        return self.shm_X.size // self.shm_y.size

    @property
    def X(self) -> npt.NDArray[np.float64]:
        X = np.ndarray(shape=(self.capacity, self.row_size), dtype=np.float64, buffer=self.shm_X.buf)
        return X[:self.size]

    @property
    def y(self) -> npt.NDArray[np.float64]:
        y = np.ndarray(shape=(self.capacity,), dtype=np.float64, buffer=self.shm_y.buf)
        return y[:self.size]

    def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
        with self.lock:
            self.open(shm_id, size)
            # make copies, then release the lock to minimize the critical section
            X, y = np.array(self.X), np.array(self.y)

        return X, y


class GrowingSharedArray(GrowingSharedArrayReaderView):
    def __init__(self) -> None:
        self.growth_rate = 1.5
        super().__init__(lock=Lock())

    def close(self) -> None:
        # unlike the reader view, the writer owns the segments, so it unlinks them
        self.close_shm(unlink=True)

    def __del__(self) -> None:
        self.close()

    def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None:
        assert len(X) == len(y)
        assert X.ndim == 2
        assert y.ndim == 1
        assert dtypes_are_equal(X.dtype, np.float64)
        assert dtypes_are_equal(y.dtype, np.float64)
        assert X.dtype.itemsize == 8
        assert y.dtype.itemsize == 8

        size = len(y)
        grow = size > self.capacity
        if grow:
            if self.capacity:
                n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate))
                capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth))
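                # worked example: capacity 4 growing to hold size 9 gives
                # n_growth = ceil(log(9/4, 1.5)) = ceil(2.0) = 2, so the new
                # capacity = ceil(4 * 1.5**2) = ceil(9.0) = 9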
            else:
                assert self.shm_X is None
                assert self.shm_y is None
                capacity = size

            shm_id = uuid.uuid4().int  # self.shm_id + 1 if self.shm_id else 0

            row_size = X.shape[1]
            if self.row_size is not None:
                assert row_size == self.row_size
            shm_X = SharedMemory(f'{self.basename_X}_{shm_id}', create=True,
                                 size=capacity * row_size * X.dtype.itemsize)
            shm_y = SharedMemory(f'{self.basename_y}_{shm_id}', create=True, size=capacity * y.dtype.itemsize)

        with self.lock:
            if grow:
                if self.capacity:
                    # here, before reallocating, we unlink the underlying shared memory without making sure
                    # that the training loop process has had a chance to close() it first, so this might
                    # lead to some warnings; references:
                    # - https://stackoverflow.com/a/63004750/2447427
                    # - https://github.com/python/cpython/issues/84140
                    # - https://github.com/python/cpython/issues/82300
                    #   - comment that provides a fix which turns off tracking:
                    #     https://github.com/python/cpython/issues/82300#issuecomment-2169035092
                    self.close()
                self.shm_X = shm_X
                self.shm_y = shm_y
                self.shm_id = shm_id
            self.size = size
            self.X[...] = X
            self.y[...] = y
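Taken together, the writer (set_data) and the reader view (get_data) form a small cross-process hand-off: the writer publishes (shm_id, size) after each update, and the reader re-attaches only when shm_id changes. A minimal end-to-end sketch under these assumptions: the lock is inherited at Process construction, the id/size pair travels over a Queue, and trainer() plus all names below are illustrative rather than code from this PR.

from multiprocessing import Process, Queue

import numpy as np

from smac.model.random_forest.multiproc_util.GrowingSharedArray import (
    GrowingSharedArray,
    GrowingSharedArrayReaderView,
)


def trainer(lock, msg_queue: Queue) -> None:
    view = GrowingSharedArrayReaderView(lock)
    while True:
        msg = msg_queue.get()
        if msg is None:  # sentinel: shut down
            break
        shm_id, size = msg
        X, y = view.get_data(shm_id, size)  # copies, so fitting runs lock-free
        # ... fit the random forest on (X, y) here ...
    view.close()  # detach without unlinking; the writer owns the segments


if __name__ == '__main__':
    shared = GrowingSharedArray()
    msg_queue: Queue = Queue()
    proc = Process(target=trainer, args=(shared.lock, msg_queue))
    proc.start()

    X = np.random.rand(10, 3)
    y = np.random.rand(10)
    shared.set_data(X, y)  # allocates (or grows) the shared segments
    msg_queue.put((shared.shm_id, shared.size))

    msg_queue.put(None)
    proc.join()
    shared.close()  # the writer unlinks the shared memory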