Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event history refactor #71

Merged
merged 11 commits into from
Jul 12, 2024
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:

- name: Install dependencies
run: |
pip install .
pip install -e .
- name: Setup node.js
uses: actions/setup-node@v4
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ tmp/
IGNORE-ME*
.pyre/*
.draft
.coverage*

# local env files
.env*.local
Expand Down
14 changes: 14 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ repos:
[
"-rn", # Only display messages
]
- id: pytest
name: pytest
language: python
entry: pytest
types: [python]
pass_filenames: false
args: [--cov=sidecar]
- id : pytest-coverage
name: coverage
language: python
entry: coverage report
types: [python]
pass_filenames: false
args: [--fail-under=9] # increase this over time
- id: pyre-check
name: pyre-check
entry: pyre check
Expand Down
3 changes: 2 additions & 1 deletion .pyre_configuration
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"site_package_search_strategy": "pep561",
"source_directories": [
"sidecar"
{"import_root": ".", "source": "sidecar"}
]

}
11 changes: 11 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies=[
"pre-commit",
"cryptography",
"httpx",
"pytest",
"pytest-cov",
]

[project.scripts]
Expand All @@ -50,6 +52,15 @@ disable = [
# "R0913",
]

[tool.pylint.main]
source-roots = ["sidecar"]

[tool.black]
target-version = ["py311", ]
include = '\.pyi?$'

[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]
pythonpath = "sidecar"
112 changes: 32 additions & 80 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# pylint: disable=R0801
from __future__ import annotations

import time
from collections import namedtuple
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -13,7 +10,8 @@
from ..helpers import Role
from ..logger import logger
from ..settings import settings
from .step import Status, Step
from .status import Status, StatusHistory
from .step import Step

# Dictionary to store queries
queries: dict[str, "Query"] = {}
Expand All @@ -28,25 +26,21 @@ class QueryExistsError(Exception):

@dataclass
class Query:
# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
start_time: Optional[float] = field(init=False, default=None)
end_time: Optional[float] = field(init=False, default=None)
stopped: bool = field(init=False, default=False)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
_status_history: StatusHistory = field(init=False, repr=True)
step_classes: ClassVar[list[type[Step]]] = []
_log_dir: Path = settings.root_path / Path("logs")
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)
_status_dir: Path = settings.root_path / Path("status_semaphore")

def __post_init__(self):
self.logger = logger.bind(task=self.query_id)
status_dir = settings.root_path / Path("status")
status_dir.mkdir(exist_ok=True)
status_file_path = status_dir / Path(f"{self.query_id}")
self._status_history = StatusHistory(file_path=status_file_path, logger=logger)

self._log_dir.mkdir(exist_ok=True)
self._status_dir.mkdir(exist_ok=True)
self._logger_id = logger.add(
self.log_file_path,
serialize=True,
Expand All @@ -58,17 +52,21 @@ def __post_init__(self):
raise QueryExistsError(f"{self.query_id} already exists")
queries[self.query_id] = self

@property
def _log_dir(self) -> Path:
return settings.root_path / Path("logs")

@property
def role(self) -> Role:
return settings.role

@property
def started(self) -> bool:
return self.start_time is not None
return self.status >= Status.STARTING

@property
def finished(self) -> bool:
return self.end_time is not None
return self.status >= Status.COMPLETE

@classmethod
def get_from_query_id(cls, query_id) -> Optional["Query"]:
Expand All @@ -83,55 +81,22 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]:
if query:
return query
raise e
query.load_history_from_file()
if query.status == Status.UNKNOWN:
return None
return query

def load_history_from_file(self):
if self.status_file_path.exists():
self.logger.debug(
f"Loading query {self.query_id} status history "
f"from file {self.status_file_path}"
)
with self.status_file_path.open("r") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def _last_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def status_event_json(self):
status_event = {
"status": self._last_status_event.status.name,
"start_time": self._last_status_event.timestamp,
}
if self.status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self._last_status_event.timestamp
return status_event

@property
def status(self) -> Status:
return self._last_status_event.status
return self._status_history.current_status

@status.setter
def status(self, status: Status):
if self.status <= Status.COMPLETE:
now = time.time()
self._status_history.append(StatusChangeEvent(status=status, timestamp=now))
with self.status_file_path.open("a") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{now}\n")
if self.status != status and self.status <= Status.COMPLETE:
self._status_history.add(status)

@property
def status_event_json(self):
return self._status_history.status_event_json

@property
def running(self):
Expand All @@ -141,18 +106,12 @@ def running(self):
def log_file_path(self) -> Path:
return self._log_dir / Path(f"{self.query_id}.log")

@property
def status_file_path(self) -> Path:
return self._status_dir / Path(f"{self.query_id}")

@property
def steps(self) -> Iterable[Step]:
for step_class in self.step_classes:
if not self.stopped:
yield step_class.build_from_query(self)
yield step_class.build_from_query(self)

def start(self):
self.start_time = time.time()
try:
for step in self.steps:
if self.finished:
Expand Down Expand Up @@ -180,37 +139,30 @@ def finish(self):
self._cleanup()

def kill(self):
self.status = Status.KILLED
self.logger.info(f"Killing: {self=}")
if self.current_step:
self.current_step.terminate()
if self.running:
self.status = Status.KILLED
self.logger.info(f"Killing: {self=}")
if self.current_step:
self.current_step.terminate()
self._cleanup()

def crash(self):
self.status = Status.CRASHED
self.logger.info(f"CRASHING! {self=}")
if self.current_step:
self.current_step.kill()
if self.running:
self.status = Status.CRASHED
self.logger.info(f"CRASHING! {self=}")
if self.current_step:
self.current_step.kill()
self._cleanup()

def _cleanup(self):
self.current_step = None
self.end_time = time.time()
try:
logger.remove(self._logger_id)
except ValueError:
pass
if queries.get(self.query_id) is not None:
del queries[self.query_id]

@property
def run_time(self):
if not self.start_time:
return 0
if not self.end_time:
return time.time() - self.start_time
return self.end_time - self.start_time

@property
def cpu_usage_percent(self) -> float:
if self.current_step:
Expand Down
83 changes: 83 additions & 0 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

import time
from dataclasses import dataclass, field
from enum import IntEnum, auto
from pathlib import Path
from typing import NamedTuple

import loguru


class Status(IntEnum):
UNKNOWN = auto()
NOT_FOUND = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
CRASHED = auto()


StatusChangeEvent = NamedTuple(
"StatusChangeEvent", [("status", Status), ("timestamp", float)]
)


@dataclass
class StatusHistory:
file_path: Path = field(init=True, repr=False)
logger: loguru.Logger = field(init=True, repr=False, compare=False)
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)

def __post_init__(self):
if self.file_path.exists():
self.logger.debug(f"Loading status history from file {self.file_path}")
with self.file_path.open("r", encoding="utf8") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def locking_status(self):
"""Cannot add to history after this or higher status is reached"""
return Status.COMPLETE

def add(self, status: Status, timestamp: float = time.time()):
assert status > self.current_status
assert self.current_status < self.locking_status
self._status_history.append(
StatusChangeEvent(status=status, timestamp=timestamp)
)
with self.file_path.open("a", encoding="utf8") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{timestamp}\n")

@property
def current_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def current_status(self):
return self.current_status_event.status

@property
def status_event_json(self):
status_event = {
"status": self.current_status_event.status.name,
"start_time": self.current_status_event.timestamp,
}
if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self.current_status_event.timestamp
return status_event
14 changes: 1 addition & 13 deletions sidecar/app/query/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,17 @@
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import IntEnum, auto
from typing import TYPE_CHECKING, ClassVar, Optional

import loguru

from .command import Command
from .status import Status

if TYPE_CHECKING:
from .base import QueryTypeT


class Status(IntEnum):
UNKNOWN = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
NOT_FOUND = auto()
CRASHED = auto()


@dataclass(kw_only=True)
class Step(ABC):
skip: bool = field(init=False, default=False)
Expand Down
Loading
Loading