-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Status events #70
Status events #70
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
from __future__ import annotations | ||
|
||
import time | ||
from collections import namedtuple | ||
from collections.abc import Iterable | ||
from dataclasses import dataclass, field | ||
from pathlib import Path | ||
|
@@ -22,19 +23,24 @@ class QueryExistsError(Exception): | |
pass | ||
|
||
|
||
StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"]) | ||
|
||
|
||
@dataclass | ||
class Query: | ||
# pylint: disable=too-many-instance-attributes | ||
query_id: str | ||
current_step: Optional[Step] = field(init=False, default=None, repr=True) | ||
_status: Status = field(init=False, default=Status.UNKNOWN) | ||
start_time: Optional[float] = field(init=False, default=None) | ||
end_time: Optional[float] = field(init=False, default=None) | ||
stopped: bool = field(init=False, default=False) | ||
logger: loguru.Logger = field(init=False, repr=False) | ||
_logger_id: int = field(init=False, repr=False) | ||
step_classes: ClassVar[list[type[Step]]] = [] | ||
_log_dir: Path = settings.root_path / Path("logs") | ||
_status_history: list[StatusChangeEvent] = field( | ||
init=False, default_factory=list, repr=True | ||
) | ||
_status_dir: Path = settings.root_path / Path("status_semaphore") | ||
|
||
def __post_init__(self): | ||
|
@@ -77,23 +83,55 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]: | |
if query: | ||
return query | ||
raise e | ||
if query.status_file_path.exists(): | ||
with query.status_file_path.open("r") as f: | ||
status_str = f.readline() | ||
query.status = Status[status_str] | ||
return query | ||
return None | ||
query.load_history_from_file() | ||
if query.status == Status.UNKNOWN: | ||
return None | ||
return query | ||
|
||
def load_history_from_file(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest to add a comment showing an example how log file looks like, so we can compare and check that it is doing the right thing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's also a test that does this in the next PR |
||
if self.status_file_path.exists(): | ||
self.logger.debug( | ||
f"Loading query {self.query_id} status history " | ||
f"from file {self.status_file_path}" | ||
) | ||
with self.status_file_path.open("r") as f: | ||
for line in f: | ||
status_str, timestamp = line.split(",") | ||
self._status_history.append( | ||
StatusChangeEvent( | ||
status=Status[status_str], timestamp=float(timestamp) | ||
) | ||
) | ||
|
||
@property | ||
def _last_status_event(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it expected to be public with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is intended to be private. |
||
if not self._status_history: | ||
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) | ||
return self._status_history[-1] | ||
|
||
@property | ||
def status_event_json(self): | ||
status_event = { | ||
"status": self._last_status_event.status.name, | ||
"start_time": self._last_status_event.timestamp, | ||
} | ||
if self.status >= Status.COMPLETE and len(self._status_history) >= 2: | ||
status_event["start_time"] = self._status_history[-2].timestamp | ||
status_event["end_time"] = self._last_status_event.timestamp | ||
return status_event | ||
|
||
@property | ||
def status(self) -> Status: | ||
return self._status | ||
return self._last_status_event.status | ||
|
||
@status.setter | ||
def status(self, status: Status): | ||
self._status = status | ||
with self.status_file_path.open("w") as f: | ||
self.logger.debug(f"setting status: {status=}") | ||
f.write(str(status.name)) | ||
if self.status <= Status.COMPLETE: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't see the enum definition, but does this check include failed queries as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's defined here as class Status(IntEnum):
UNKNOWN = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
NOT_FOUND = auto()
CRASHED = auto() so this prevents changing the status once it's COMPLETE or higher (including KILLED and CRASHED). |
||
now = time.time() | ||
self._status_history.append(StatusChangeEvent(status=status, timestamp=now)) | ||
with self.status_file_path.open("a") as f: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be better to have a class that manages the history and flushes it to the backing storage, so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, that's exactly what #71 does 🙂 |
||
self.logger.debug(f"updating status: {status=}") | ||
f.write(f"{status.name},{now}\n") | ||
|
||
@property | ||
def running(self): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't you end up here with
startTime == null
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be covered above on line 86
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, is there a reason for checking
startTime
again on the line 89?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, nope! just a vestige of a refactor.