-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Query manager #72
Query manager #72
Changes from 8 commits
74f9d04
ec73ca2
acf8270
cf26f7d
cd11441
64d4eb1
5f28cca
94a7d2e
875e519
73c9ad1
2fd072a
ba349b9
51aad4c
3e7257b
b3d260b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,54 +8,63 @@ | |
import loguru | ||
|
||
from ..helpers import Role | ||
from ..logger import logger | ||
from ..settings import settings | ||
from ..logger import get_logger | ||
from ..settings import get_settings | ||
from .status import Status, StatusHistory | ||
from .step import Step | ||
|
||
# Dictionary to store queries | ||
queries: dict[str, "Query"] = {} | ||
|
||
|
||
class QueryExistsError(Exception): | ||
pass | ||
|
||
|
||
def status_file_path(query_id: str) -> Path: | ||
settings = get_settings() | ||
return settings.status_dir_path / Path(query_id) | ||
akoshelev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def log_file_path(query_id: str) -> Path: | ||
settings = get_settings() | ||
return settings.log_dir_path / Path(query_id) | ||
|
||
|
||
@dataclass | ||
class Query: | ||
# pylint: disable=too-many-instance-attributes | ||
query_id: str | ||
current_step: Optional[Step] = field(init=False, default=None, repr=True) | ||
logger: loguru.Logger = field(init=False, repr=False) | ||
_logger_id: int = field(init=False, repr=False) | ||
logger: loguru.Logger = field(init=False, repr=False, compare=False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
_logger_id: int = field(init=False, repr=False, compare=False) | ||
role: Role = field(init=False, repr=True) | ||
_status_history: StatusHistory = field(init=False, repr=True) | ||
step_classes: ClassVar[list[type[Step]]] = [] | ||
|
||
def __post_init__(self): | ||
self.logger = logger.bind(task=self.query_id) | ||
status_dir = settings.root_path / Path("status") | ||
status_dir.mkdir(exist_ok=True) | ||
status_file_path = status_dir / Path(f"{self.query_id}") | ||
self._status_history = StatusHistory(file_path=status_file_path, logger=logger) | ||
|
||
self._log_dir.mkdir(exist_ok=True) | ||
self._logger_id = logger.add( | ||
settings = get_settings() | ||
_logger = get_logger() | ||
|
||
self.logger = _logger.bind(task=self.query_id) | ||
self.role = settings.role | ||
|
||
self._status_history = StatusHistory( | ||
file_path=self.status_file_path, logger=self.logger | ||
) | ||
|
||
self._logger_id = self.logger.add( | ||
self.log_file_path, | ||
serialize=True, | ||
filter=lambda record: record["extra"].get("task") == self.query_id, | ||
enqueue=True, | ||
) | ||
self.logger.debug(f"adding new Query {self}.") | ||
if queries.get(self.query_id) is not None: | ||
raise QueryExistsError(f"{self.query_id} already exists") | ||
queries[self.query_id] = self | ||
|
||
@property | ||
def _log_dir(self) -> Path: | ||
return settings.root_path / Path("logs") | ||
def status_file_path(self) -> Path: | ||
return status_file_path(self.query_id) | ||
|
||
@property | ||
def role(self) -> Role: | ||
return settings.role | ||
def log_file_path(self) -> Path: | ||
return log_file_path(self.query_id) | ||
|
||
@property | ||
def started(self) -> bool: | ||
|
@@ -65,23 +74,6 @@ def started(self) -> bool: | |
def finished(self) -> bool: | ||
return self.status >= Status.COMPLETE | ||
|
||
@classmethod | ||
def get_from_query_id(cls, query_id) -> Optional["Query"]: | ||
query = queries.get(query_id) | ||
if query: | ||
return query | ||
try: | ||
query = cls(query_id) | ||
except QueryExistsError as e: | ||
# avoid race condition on queries | ||
query = queries.get(query_id) | ||
if query: | ||
return query | ||
raise e | ||
if query.status == Status.UNKNOWN: | ||
return None | ||
return query | ||
|
||
@property | ||
def status(self) -> Status: | ||
return self._status_history.current_status | ||
|
@@ -99,10 +91,6 @@ def status_event_json(self): | |
def running(self): | ||
return self.started and not self.finished | ||
|
||
@property | ||
def log_file_path(self) -> Path: | ||
return self._log_dir / Path(f"{self.query_id}.log") | ||
|
||
@property | ||
def steps(self) -> Iterable[Step]: | ||
for step_class in self.step_classes: | ||
|
@@ -154,11 +142,9 @@ def crash(self): | |
def _cleanup(self): | ||
self.current_step = None | ||
try: | ||
logger.remove(self._logger_id) | ||
self.logger.remove(self._logger_id) | ||
except ValueError: | ||
pass | ||
if queries.get(self.query_id) is not None: | ||
del queries[self.query_id] | ||
|
||
@property | ||
def cpu_usage_percent(self) -> float: | ||
|
@@ -174,3 +160,37 @@ def memory_rss_usage(self) -> int: | |
|
||
|
||
QueryTypeT = TypeVar("QueryTypeT", bound=Query) | ||
|
||
|
||
class MaxQueriesRunningError(Exception): | ||
pass | ||
|
||
|
||
@dataclass | ||
class QueryManager: | ||
eriktaubeneck marked this conversation as resolved.
Show resolved
Hide resolved
|
||
max_parallel_queries: int = field(init=True, repr=False, default=1) | ||
running_queries: dict[str, Query] = field( | ||
init=False, repr=True, default_factory=dict | ||
) | ||
|
||
def get_from_query_id(self, cls, query_id: str) -> Optional[Query]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which are the different classes of Query? |
||
if query_id in self.running_queries: | ||
return self.running_queries[query_id] | ||
if status_file_path(query_id).exists(): | ||
return cls(query_id) | ||
return None | ||
|
||
def run_query(self, query: Query): | ||
print("run called") | ||
eriktaubeneck marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not self.capacity_available: | ||
raise MaxQueriesRunningError( | ||
f"Only {self.max_parallel_queries} allowed. Currently running {self}" | ||
) | ||
|
||
self.running_queries[query.query_id] = query | ||
query.start() | ||
del self.running_queries[query.query_id] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is If it is a blocking call, should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's blocking. It's defined here, and it's already wrapped in a |
||
|
||
@property | ||
def capacity_available(self): | ||
return len(self.running_queries) < self.max_parallel_queries |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how expensive is to reconfigure the logger each time this function is called? is it a mistake to call it more than once and is it also ok to call it from more than one thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unclear, and easy enough to fix. I can put the logger on settings, so it's only called once. commit coming.