From 6af84f2f5a9b690198a774d23c870e170d4453a1 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Mon, 8 Jul 2024 18:55:35 -0700 Subject: [PATCH 01/10] store status changes with timestamps, show run time based on those timestamps --- server/app/query/servers.tsx | 49 +++++++++++++++------- server/app/query/view/[id]/components.tsx | 27 +++++++++++- server/app/query/view/[id]/page.tsx | 28 +++++++++---- sidecar/app/query/base.py | 51 +++++++++++++++++++---- sidecar/app/routes/websockets.py | 9 ++-- 5 files changed, 127 insertions(+), 37 deletions(-) diff --git a/server/app/query/servers.tsx b/server/app/query/servers.tsx index 40494ef..610c3f7 100644 --- a/server/app/query/servers.tsx +++ b/server/app/query/servers.tsx @@ -59,11 +59,20 @@ export const initialStatsByRemoteServer: StatsByRemoteServer = Object.values(RemoteServerNames).map((serverName) => [[serverName], []]), ); -export type RunTimeByRemoteServer = { +export type StartTimeByRemoteServer = { [key in RemoteServerNames]: number | null; }; -export const initialRunTimeByRemoteServer: RunTimeByRemoteServer = +export type EndTimeByRemoteServer = { + [key in RemoteServerNames]: number | null; +}; + +export const initialStartTimeByRemoteServer: StartTimeByRemoteServer = + Object.fromEntries( + Object.values(RemoteServerNames).map((serverName) => [[serverName], null]), + ); + +export const initialEndTimeByRemoteServer: StartTimeByRemoteServer = Object.fromEntries( Object.values(RemoteServerNames).map((serverName) => [[serverName], null]), ); @@ -179,6 +188,8 @@ export class RemoteServer { openStatusSocket( id: string, setStatus: React.Dispatch>, + setStartTime: React.Dispatch>, + setEndTime: React.Dispatch>, ): WebSocket { const ws = this.statusSocket(id); @@ -189,8 +200,29 @@ export class RemoteServer { })); }; + const updateStartTime = (runTime: number) => { + setStartTime((prevStartTime) => { + return { + ...prevStartTime, + [this.remoteServerName]: runTime, + }; + }); + }; + + const updateEndTime = (runTime: number) => { + setEndTime((prevEndTime) => { + return { + ...prevEndTime, + [this.remoteServerName]: runTime, + }; + }); + }; + ws.onmessage = (event) => { - const statusString: string = JSON.parse(event.data).status; + const eventData = JSON.parse(event.data); + updateStartTime(eventData.start_time); + updateEndTime(eventData.end_time ?? null); + const statusString: string = eventData.status; const status = getStatusFromString(statusString); updateStatus(status); }; @@ -207,7 +239,6 @@ export class RemoteServer { openStatsSocket( id: string, setStats: React.Dispatch>, - setRunTime: React.Dispatch>, ): WebSocket { const ws = this.statsSocket(id); @@ -221,18 +252,8 @@ export class RemoteServer { }); }; - const updateRunTime = (runTime: number) => { - setRunTime((prevRunTime) => { - return { - ...prevRunTime, - [this.remoteServerName]: runTime, - }; - }); - }; - ws.onmessage = (event) => { const eventData = JSON.parse(event.data); - updateRunTime(eventData.run_time); const statsDataPoint: StatsDataPoint = { timestamp: eventData.timestamp, memoryRSSUsage: eventData.memory_rss_usage, diff --git a/server/app/query/view/[id]/components.tsx b/server/app/query/view/[id]/components.tsx index 73c6927..7e3e1b8 100644 --- a/server/app/query/view/[id]/components.tsx +++ b/server/app/query/view/[id]/components.tsx @@ -1,3 +1,4 @@ +import { useEffect, useState, useRef } from "react"; import { Source_Code_Pro } from "next/font/google"; import clsx from "clsx"; import { ChevronDownIcon, ChevronRightIcon } from "@heroicons/react/24/solid"; @@ -65,12 +66,34 @@ function secondsToTime(e: number) { export function RunTimePill({ status, - runTime, + startTime, + endTime, }: { status: Status; - runTime: number | null; + startTime: number | null; + endTime: number | null; }) { + const [runTime, setRunTime] = useState(0); const runTimeStr = runTime ? secondsToTime(runTime) : "N/A"; + const intervalId = useRef | null>(null); + + useEffect(() => { + if (startTime !== null) { + if (endTime !== null && startTime !== null) { + setRunTime(endTime - startTime); + } else { + if (intervalId.current !== null) { + clearTimeout(intervalId.current); + } + + let newIntervalId = setInterval(() => { + setRunTime(Date.now() / 1000 - startTime); + }, 1000); + intervalId.current = newIntervalId; + } + } + }, [startTime, endTime]); + return (
{runTimeStr} diff --git a/server/app/query/view/[id]/page.tsx b/server/app/query/view/[id]/page.tsx index 166c50f..083d5f1 100644 --- a/server/app/query/view/[id]/page.tsx +++ b/server/app/query/view/[id]/page.tsx @@ -16,10 +16,12 @@ import { IPARemoteServers, //hack until the queryId is stored in a DB StatusByRemoteServer, StatsByRemoteServer, - RunTimeByRemoteServer, + StartTimeByRemoteServer, + EndTimeByRemoteServer, initialStatusByRemoteServer, initialStatsByRemoteServer, - initialRunTimeByRemoteServer, + initialStartTimeByRemoteServer, + initialEndTimeByRemoteServer, } from "@/app/query/servers"; import { StatsComponent } from "@/app/query/view/[id]/charts"; import { JSONSafeParse } from "@/app/utils"; @@ -48,8 +50,10 @@ export default function QueryPage({ params }: { params: { id: string } }) { useState(initialStatusByRemoteServer); const [statsByRemoteServer, setStatsByRemoteServer] = useState(initialStatsByRemoteServer); - const [runTimeByRemoteServer, setRunTimeByRemoteServer] = - useState(initialRunTimeByRemoteServer); + const [startTimeByRemoteServer, setStartTimeByRemoteServer] = + useState(initialStartTimeByRemoteServer); + const [endTimeByRemoteServer, setEndTimeByRemoteServer] = + useState(initialEndTimeByRemoteServer); function flipLogsHidden() { setLogsHidden(!logsHidden); @@ -108,11 +112,12 @@ export default function QueryPage({ params }: { params: { id: string } }) { const statusWs = remoteServer.openStatusSocket( query.uuid, setStatusByRemoteServer, + setStartTimeByRemoteServer, + setEndTimeByRemoteServer, ); const statsWs = remoteServer.openStatsSocket( query.uuid, setStatsByRemoteServer, - setRunTimeByRemoteServer, ); webSockets = [...webSockets, loggingWs, statusWs, statsWs]; } @@ -212,8 +217,11 @@ export default function QueryPage({ params }: { params: { id: string } }) {
{Object.values(IPARemoteServers).map( (remoteServer: RemoteServer) => { - const runTime = - runTimeByRemoteServer[remoteServer.remoteServerName]; + const startTime = + startTimeByRemoteServer[remoteServer.remoteServerName]; + const endTime = + endTimeByRemoteServer[remoteServer.remoteServerName]; + const status = statusByRemoteServer[remoteServer.remoteServerName] ?? Status.UNKNOWN; @@ -227,7 +235,11 @@ export default function QueryPage({ params }: { params: { id: string } }) { {remoteServer.toString()} Run Time
- +
); diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index f3b1332..ff416d3 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -2,6 +2,7 @@ from __future__ import annotations import time +from collections import namedtuple from collections.abc import Iterable from dataclasses import dataclass, field from pathlib import Path @@ -22,12 +23,14 @@ class QueryExistsError(Exception): pass +StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"]) + + @dataclass class Query: # pylint: disable=too-many-instance-attributes query_id: str current_step: Optional[Step] = field(init=False, default=None, repr=True) - _status: Status = field(init=False, default=Status.UNKNOWN) start_time: Optional[float] = field(init=False, default=None) end_time: Optional[float] = field(init=False, default=None) stopped: bool = field(init=False, default=False) @@ -35,6 +38,9 @@ class Query: _logger_id: int = field(init=False, repr=False) step_classes: ClassVar[list[type[Step]]] = [] _log_dir: Path = settings.root_path / Path("logs") + _status_history: list[StatusChangeEvent] = field( + init=False, default_factory=list, repr=True + ) _status_dir: Path = settings.root_path / Path("status_semaphore") def __post_init__(self): @@ -79,21 +85,48 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]: raise e if query.status_file_path.exists(): with query.status_file_path.open("r") as f: - status_str = f.readline() - query.status = Status[status_str] - return query + for line in f: + status_str, timestamp = line.split(",") + query.add_status_event( + status=Status[status_str], timestamp=float(timestamp) + ) + return query return None + @property + def _last_status_event(self): + if not self._status_history: + return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) + return self._status_history[-1] + + @property + def status_event_json(self): + status_event = { + "status": self._last_status_event.status.name, + "start_time": self._last_status_event.timestamp, + } + if self.status >= Status.COMPLETE and len(self._status_history) >= 2: + status_event["start_time"] = self._status_history[-2].timestamp + status_event["end_time"] = self._last_status_event.timestamp + return status_event + @property def status(self) -> Status: - return self._status + return self._last_status_event.status @status.setter def status(self, status: Status): - self._status = status - with self.status_file_path.open("w") as f: - self.logger.debug(f"setting status: {status=}") - f.write(str(status.name)) + if self.status <= Status.COMPLETE: + now = time.time() + self.add_status_event(status, now) + + def add_status_event(self, status: Status, timestamp: float): + self._status_history.append( + StatusChangeEvent(status=status, timestamp=timestamp) + ) + with self.status_file_path.open("a") as f: + self.logger.debug(f"updating status: {status=}") + f.write(f"{status.name},{timestamp}\n") @property def running(self): diff --git a/sidecar/app/routes/websockets.py b/sidecar/app/routes/websockets.py index fb5db78..c915085 100644 --- a/sidecar/app/routes/websockets.py +++ b/sidecar/app/routes/websockets.py @@ -34,15 +34,17 @@ async def status_websocket(websocket: WebSocket, query_id: str): async with use_websocket(websocket) as websocket: if query is None: logger.warning(f"{query_id=} Status: {Status.NOT_FOUND.name}") - await websocket.send_json({"status": Status.NOT_FOUND.name}) + await websocket.send_json( + {"status": Status.NOT_FOUND.name, "start_time": time.time()} + ) else: while query.running: logger.debug(f"{query_id=} Status: {query.status.name}") - await websocket.send_json({"status": query.status.name}) + await websocket.send_json(query.status_event_json) await asyncio.sleep(1) logger.debug(f"{query_id=} Status: {query.status.name}") - await websocket.send_json({"status": query.status.name}) + await websocket.send_json(query.status_event_json) @router.websocket("/logs/{query_id}") @@ -81,7 +83,6 @@ async def stats_websocket(websocket: WebSocket, query_id: str): while query.running: await websocket.send_json( { - "run_time": query.run_time, "cpu_percent": query.cpu_usage_percent, "memory_rss_usage": query.memory_rss_usage, "timestamp": time.time(), From 5857e716b022d6a1f96e72a183b1c3f44b917e1c Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Tue, 9 Jul 2024 12:50:12 -0700 Subject: [PATCH 02/10] fix some bugs --- server/app/query/view/[id]/components.tsx | 15 ++++++---- sidecar/app/query/base.py | 35 +++++++++++++---------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/server/app/query/view/[id]/components.tsx b/server/app/query/view/[id]/components.tsx index 7e3e1b8..2cacdc8 100644 --- a/server/app/query/view/[id]/components.tsx +++ b/server/app/query/view/[id]/components.tsx @@ -73,19 +73,22 @@ export function RunTimePill({ startTime: number | null; endTime: number | null; }) { - const [runTime, setRunTime] = useState(0); + const [runTime, setRunTime] = useState(null); const runTimeStr = runTime ? secondsToTime(runTime) : "N/A"; const intervalId = useRef | null>(null); useEffect(() => { - if (startTime !== null) { + if (intervalId.current !== null) { + // any time startTime or endTime change, we remove the old setInterval + // which runs the timer. if a new one is needed, it's created. + clearTimeout(intervalId.current); + } + if (startTime === null) { + setRunTime(null); + } else { if (endTime !== null && startTime !== null) { setRunTime(endTime - startTime); } else { - if (intervalId.current !== null) { - clearTimeout(intervalId.current); - } - let newIntervalId = setInterval(() => { setRunTime(Date.now() / 1000 - startTime); }, 1000); diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index ff416d3..cf63149 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -83,15 +83,25 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]: if query: return query raise e - if query.status_file_path.exists(): - with query.status_file_path.open("r") as f: + query.load_history_from_file() + if query.status == Status.UNKNOWN: + return None + return query + + def load_history_from_file(self): + if self.status_file_path.exists(): + self.logger.debug( + f"Loading query {self.query_id} status history " + f"from file {self.status_file_path}" + ) + with self.status_file_path.open("r") as f: for line in f: status_str, timestamp = line.split(",") - query.add_status_event( - status=Status[status_str], timestamp=float(timestamp) + self._status_history.append( + StatusChangeEvent( + status=Status[status_str], timestamp=float(timestamp) + ) ) - return query - return None @property def _last_status_event(self): @@ -118,15 +128,10 @@ def status(self) -> Status: def status(self, status: Status): if self.status <= Status.COMPLETE: now = time.time() - self.add_status_event(status, now) - - def add_status_event(self, status: Status, timestamp: float): - self._status_history.append( - StatusChangeEvent(status=status, timestamp=timestamp) - ) - with self.status_file_path.open("a") as f: - self.logger.debug(f"updating status: {status=}") - f.write(f"{status.name},{timestamp}\n") + self._status_history.append(StatusChangeEvent(status=status, timestamp=now)) + with self.status_file_path.open("a") as f: + self.logger.debug(f"updating status: {status=}") + f.write(f"{status.name},{now}\n") @property def running(self): From b93d672f86a26473b096df6c171c15434a27c623 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Wed, 10 Jul 2024 10:21:23 -0700 Subject: [PATCH 03/10] refactor StatusHistory into it's own dataclass --- sidecar/app/query/base.py | 158 +++++++++++++++++++------------------- 1 file changed, 81 insertions(+), 77 deletions(-) diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index cf63149..ca2248b 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -2,11 +2,10 @@ from __future__ import annotations import time -from collections import namedtuple from collections.abc import Iterable from dataclasses import dataclass, field from pathlib import Path -from typing import ClassVar, Optional, TypeVar +from typing import ClassVar, NamedTuple, Optional, TypeVar import loguru @@ -23,7 +22,59 @@ class QueryExistsError(Exception): pass -StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"]) +StatusChangeEvent = NamedTuple( + "StatusChangeEvent", [("status", Status), ("timestamp", float)] +) + + +@dataclass +class StatusHistory: + file_path: Path = field(init=True, repr=False) + logger: loguru.Logger = field(init=True, repr=False) + _status_history: list[StatusChangeEvent] = field( + init=False, default_factory=list, repr=True + ) + + def __post_init__(self): + if self.file_path.exists(): + self.logger.debug(f"Loading status history from file {self.file_path}") + with self.file_path.open("r", encoding="utf8") as f: + for line in f: + status_str, timestamp = line.split(",") + self._status_history.append( + StatusChangeEvent( + status=Status[status_str], timestamp=float(timestamp) + ) + ) + + def add(self, status: Status, timestamp: float = time.time()): + self._status_history.append( + StatusChangeEvent(status=status, timestamp=timestamp) + ) + with self.file_path.open("a", encoding="utf8") as f: + self.logger.debug(f"updating status: {status=}") + f.write(f"{status.name},{timestamp}\n") + + @property + def last_status_event(self): + if not self._status_history: + return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) + return self._status_history[-1] + + @property + def current_status(self): + return self.last_status_event.status + + @property + def status_event_json(self): + status_event = { + "status": self.last_status_event.status.name, + "start_time": self.last_status_event.timestamp, + } + if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2: + status_event["start_time"] = self._status_history[-2].timestamp + status_event["end_time"] = self.last_status_event.timestamp + return status_event @dataclass @@ -31,22 +82,21 @@ class Query: # pylint: disable=too-many-instance-attributes query_id: str current_step: Optional[Step] = field(init=False, default=None, repr=True) - start_time: Optional[float] = field(init=False, default=None) - end_time: Optional[float] = field(init=False, default=None) - stopped: bool = field(init=False, default=False) logger: loguru.Logger = field(init=False, repr=False) _logger_id: int = field(init=False, repr=False) - step_classes: ClassVar[list[type[Step]]] = [] _log_dir: Path = settings.root_path / Path("logs") - _status_history: list[StatusChangeEvent] = field( - init=False, default_factory=list, repr=True - ) + _status_history: StatusHistory = field(init=False, repr=True) _status_dir: Path = settings.root_path / Path("status_semaphore") + step_classes: ClassVar[list[type[Step]]] = [] def __post_init__(self): self.logger = logger.bind(task=self.query_id) + status_dir = settings.root_path / Path("status_semaphore") + status_dir.mkdir(exist_ok=True) + status_file_path = status_dir / Path(f"{self.query_id}") + self._status_history = StatusHistory(file_path=status_file_path, logger=logger) + self._log_dir.mkdir(exist_ok=True) - self._status_dir.mkdir(exist_ok=True) self._logger_id = logger.add( self.log_file_path, serialize=True, @@ -64,11 +114,11 @@ def role(self) -> Role: @property def started(self) -> bool: - return self.start_time is not None + return self.status >= Status.STARTING @property def finished(self) -> bool: - return self.end_time is not None + return self.status >= Status.COMPLETE @classmethod def get_from_query_id(cls, query_id) -> Optional["Query"]: @@ -83,55 +133,22 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]: if query: return query raise e - query.load_history_from_file() if query.status == Status.UNKNOWN: return None return query - def load_history_from_file(self): - if self.status_file_path.exists(): - self.logger.debug( - f"Loading query {self.query_id} status history " - f"from file {self.status_file_path}" - ) - with self.status_file_path.open("r") as f: - for line in f: - status_str, timestamp = line.split(",") - self._status_history.append( - StatusChangeEvent( - status=Status[status_str], timestamp=float(timestamp) - ) - ) - - @property - def _last_status_event(self): - if not self._status_history: - return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) - return self._status_history[-1] - - @property - def status_event_json(self): - status_event = { - "status": self._last_status_event.status.name, - "start_time": self._last_status_event.timestamp, - } - if self.status >= Status.COMPLETE and len(self._status_history) >= 2: - status_event["start_time"] = self._status_history[-2].timestamp - status_event["end_time"] = self._last_status_event.timestamp - return status_event - @property def status(self) -> Status: - return self._last_status_event.status + return self._status_history.current_status @status.setter def status(self, status: Status): - if self.status <= Status.COMPLETE: - now = time.time() - self._status_history.append(StatusChangeEvent(status=status, timestamp=now)) - with self.status_file_path.open("a") as f: - self.logger.debug(f"updating status: {status=}") - f.write(f"{status.name},{now}\n") + if self.status != status and self.status <= Status.COMPLETE: + self._status_history.add(status) + + @property + def status_event_json(self): + return self._status_history.status_event_json @property def running(self): @@ -141,18 +158,12 @@ def running(self): def log_file_path(self) -> Path: return self._log_dir / Path(f"{self.query_id}.log") - @property - def status_file_path(self) -> Path: - return self._status_dir / Path(f"{self.query_id}") - @property def steps(self) -> Iterable[Step]: for step_class in self.step_classes: - if not self.stopped: - yield step_class.build_from_query(self) + yield step_class.build_from_query(self) def start(self): - self.start_time = time.time() try: for step in self.steps: if self.finished: @@ -180,22 +191,23 @@ def finish(self): self._cleanup() def kill(self): - self.status = Status.KILLED - self.logger.info(f"Killing: {self=}") - if self.current_step: - self.current_step.terminate() + if self.running: + self.status = Status.KILLED + self.logger.info(f"Killing: {self=}") + if self.current_step: + self.current_step.terminate() self._cleanup() def crash(self): - self.status = Status.CRASHED - self.logger.info(f"CRASHING! {self=}") - if self.current_step: - self.current_step.kill() + if self.running: + self.status = Status.CRASHED + self.logger.info(f"CRASHING! {self=}") + if self.current_step: + self.current_step.kill() self._cleanup() def _cleanup(self): self.current_step = None - self.end_time = time.time() try: logger.remove(self._logger_id) except ValueError: @@ -203,14 +215,6 @@ def _cleanup(self): if queries.get(self.query_id) is not None: del queries[self.query_id] - @property - def run_time(self): - if not self.start_time: - return 0 - if not self.end_time: - return time.time() - self.start_time - return self.end_time - self.start_time - @property def cpu_usage_percent(self) -> float: if self.current_step: From b45abd1aaca48ecc40377e57fcac769e69deaa37 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Wed, 10 Jul 2024 13:10:54 -0700 Subject: [PATCH 04/10] create new file specific to status --- sidecar/app/query/base.py | 71 ++++------------------------- sidecar/app/query/status.py | 76 ++++++++++++++++++++++++++++++++ sidecar/app/query/step.py | 14 +----- sidecar/app/routes/start.py | 2 +- sidecar/app/routes/stop.py | 2 +- sidecar/app/routes/websockets.py | 2 +- 6 files changed, 88 insertions(+), 79 deletions(-) create mode 100644 sidecar/app/query/status.py diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index ca2248b..20c3c2d 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -1,18 +1,17 @@ -# pylint: disable=R0801 from __future__ import annotations -import time from collections.abc import Iterable from dataclasses import dataclass, field from pathlib import Path -from typing import ClassVar, NamedTuple, Optional, TypeVar +from typing import ClassVar, Optional, TypeVar import loguru from ..helpers import Role from ..logger import logger from ..settings import settings -from .step import Status, Step +from .status import Status, StatusHistory +from .step import Step # Dictionary to store queries queries: dict[str, "Query"] = {} @@ -22,76 +21,18 @@ class QueryExistsError(Exception): pass -StatusChangeEvent = NamedTuple( - "StatusChangeEvent", [("status", Status), ("timestamp", float)] -) - - -@dataclass -class StatusHistory: - file_path: Path = field(init=True, repr=False) - logger: loguru.Logger = field(init=True, repr=False) - _status_history: list[StatusChangeEvent] = field( - init=False, default_factory=list, repr=True - ) - - def __post_init__(self): - if self.file_path.exists(): - self.logger.debug(f"Loading status history from file {self.file_path}") - with self.file_path.open("r", encoding="utf8") as f: - for line in f: - status_str, timestamp = line.split(",") - self._status_history.append( - StatusChangeEvent( - status=Status[status_str], timestamp=float(timestamp) - ) - ) - - def add(self, status: Status, timestamp: float = time.time()): - self._status_history.append( - StatusChangeEvent(status=status, timestamp=timestamp) - ) - with self.file_path.open("a", encoding="utf8") as f: - self.logger.debug(f"updating status: {status=}") - f.write(f"{status.name},{timestamp}\n") - - @property - def last_status_event(self): - if not self._status_history: - return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) - return self._status_history[-1] - - @property - def current_status(self): - return self.last_status_event.status - - @property - def status_event_json(self): - status_event = { - "status": self.last_status_event.status.name, - "start_time": self.last_status_event.timestamp, - } - if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2: - status_event["start_time"] = self._status_history[-2].timestamp - status_event["end_time"] = self.last_status_event.timestamp - return status_event - - @dataclass class Query: - # pylint: disable=too-many-instance-attributes query_id: str current_step: Optional[Step] = field(init=False, default=None, repr=True) logger: loguru.Logger = field(init=False, repr=False) _logger_id: int = field(init=False, repr=False) - _log_dir: Path = settings.root_path / Path("logs") _status_history: StatusHistory = field(init=False, repr=True) - _status_dir: Path = settings.root_path / Path("status_semaphore") step_classes: ClassVar[list[type[Step]]] = [] def __post_init__(self): self.logger = logger.bind(task=self.query_id) - status_dir = settings.root_path / Path("status_semaphore") + status_dir = settings.root_path / Path("status") status_dir.mkdir(exist_ok=True) status_file_path = status_dir / Path(f"{self.query_id}") self._status_history = StatusHistory(file_path=status_file_path, logger=logger) @@ -108,6 +49,10 @@ def __post_init__(self): raise QueryExistsError(f"{self.query_id} already exists") queries[self.query_id] = self + @property + def _log_dir(self) -> Path: + return settings.root_path / Path("logs") + @property def role(self) -> Role: return settings.role diff --git a/sidecar/app/query/status.py b/sidecar/app/query/status.py new file mode 100644 index 0000000..8937798 --- /dev/null +++ b/sidecar/app/query/status.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from enum import IntEnum, auto +from pathlib import Path +from typing import NamedTuple + +import loguru + + +class Status(IntEnum): + UNKNOWN = auto() + STARTING = auto() + COMPILING = auto() + WAITING_TO_START = auto() + IN_PROGRESS = auto() + COMPLETE = auto() + KILLED = auto() + NOT_FOUND = auto() + CRASHED = auto() + + +StatusChangeEvent = NamedTuple( + "StatusChangeEvent", [("status", Status), ("timestamp", float)] +) + + +@dataclass +class StatusHistory: + file_path: Path = field(init=True, repr=False) + logger: loguru.Logger = field(init=True, repr=False) + _status_history: list[StatusChangeEvent] = field( + init=False, default_factory=list, repr=True + ) + + def __post_init__(self): + if self.file_path.exists(): + self.logger.debug(f"Loading status history from file {self.file_path}") + with self.file_path.open("r", encoding="utf8") as f: + for line in f: + status_str, timestamp = line.split(",") + self._status_history.append( + StatusChangeEvent( + status=Status[status_str], timestamp=float(timestamp) + ) + ) + + def add(self, status: Status, timestamp: float = time.time()): + self._status_history.append( + StatusChangeEvent(status=status, timestamp=timestamp) + ) + with self.file_path.open("a", encoding="utf8") as f: + self.logger.debug(f"updating status: {status=}") + f.write(f"{status.name},{timestamp}\n") + + @property + def last_status_event(self): + if not self._status_history: + return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) + return self._status_history[-1] + + @property + def current_status(self): + return self.last_status_event.status + + @property + def status_event_json(self): + status_event = { + "status": self.last_status_event.status.name, + "start_time": self.last_status_event.timestamp, + } + if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2: + status_event["start_time"] = self._status_history[-2].timestamp + status_event["end_time"] = self.last_status_event.timestamp + return status_event diff --git a/sidecar/app/query/step.py b/sidecar/app/query/step.py index 3ff58b9..dc0ea19 100644 --- a/sidecar/app/query/step.py +++ b/sidecar/app/query/step.py @@ -3,29 +3,17 @@ import os from abc import ABC, abstractmethod from dataclasses import dataclass, field -from enum import IntEnum, auto from typing import TYPE_CHECKING, ClassVar, Optional import loguru from .command import Command +from .status import Status if TYPE_CHECKING: from .base import QueryTypeT -class Status(IntEnum): - UNKNOWN = auto() - STARTING = auto() - COMPILING = auto() - WAITING_TO_START = auto() - IN_PROGRESS = auto() - COMPLETE = auto() - KILLED = auto() - NOT_FOUND = auto() - CRASHED = auto() - - @dataclass(kw_only=True) class Step(ABC): skip: bool = field(init=False, default=False) diff --git a/sidecar/app/routes/start.py b/sidecar/app/routes/start.py index 4ee7b2e..168e630 100644 --- a/sidecar/app/routes/start.py +++ b/sidecar/app/routes/start.py @@ -10,7 +10,7 @@ from ..query.base import Query from ..query.demo_logger import DemoLoggerQuery from ..query.ipa import GateType, IPACoordinatorQuery, IPAHelperQuery -from ..query.step import Status +from ..query.status import Status from ..settings import settings router = APIRouter( diff --git a/sidecar/app/routes/stop.py b/sidecar/app/routes/stop.py index 4dde4d8..c9f2ebb 100644 --- a/sidecar/app/routes/stop.py +++ b/sidecar/app/routes/stop.py @@ -2,7 +2,7 @@ from ..logger import logger from ..query.base import Query -from ..query.step import Status +from ..query.status import Status router = APIRouter( prefix="/stop", diff --git a/sidecar/app/routes/websockets.py b/sidecar/app/routes/websockets.py index c915085..6f4f2aa 100644 --- a/sidecar/app/routes/websockets.py +++ b/sidecar/app/routes/websockets.py @@ -7,7 +7,7 @@ from ..logger import logger from ..query.base import Query -from ..query.step import Status +from ..query.status import Status router = APIRouter( prefix="/ws", From 14a60743e1c9949bf3ce146f3b4921c361a3a7af Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Wed, 10 Jul 2024 15:32:49 -0700 Subject: [PATCH 05/10] add tests for StatusHistory --- .gitignore | 1 + pyproject.toml | 11 +++ sidecar/app/query/status.py | 21 +++-- sidecar/tests/__init__.py | 0 sidecar/tests/app/query/test_status.py | 114 +++++++++++++++++++++++++ sidecar/tests/demo_test.py | 6 ++ 6 files changed, 146 insertions(+), 7 deletions(-) create mode 100644 sidecar/tests/__init__.py create mode 100644 sidecar/tests/app/query/test_status.py create mode 100644 sidecar/tests/demo_test.py diff --git a/.gitignore b/.gitignore index ca32a63..094cfc7 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ tmp/ IGNORE-ME* .pyre/* .draft +.coverage # local env files .env*.local diff --git a/pyproject.toml b/pyproject.toml index 9677318..3e240fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ dependencies=[ "pre-commit", "cryptography", "httpx", + "pytest", + "pytest-cov", ] [project.scripts] @@ -50,6 +52,15 @@ disable = [ # "R0913", ] +[tool.pylint.main] +source-roots = ["sidecar"] + [tool.black] target-version = ["py311", ] include = '\.pyi?$' + +[tool.pytest.ini_options] +addopts = [ + "--import-mode=importlib", +] +pythonpath = "sidecar" diff --git a/sidecar/app/query/status.py b/sidecar/app/query/status.py index 8937798..ebb112b 100644 --- a/sidecar/app/query/status.py +++ b/sidecar/app/query/status.py @@ -11,13 +11,13 @@ class Status(IntEnum): UNKNOWN = auto() + NOT_FOUND = auto() STARTING = auto() COMPILING = auto() WAITING_TO_START = auto() IN_PROGRESS = auto() COMPLETE = auto() KILLED = auto() - NOT_FOUND = auto() CRASHED = auto() @@ -29,7 +29,7 @@ class Status(IntEnum): @dataclass class StatusHistory: file_path: Path = field(init=True, repr=False) - logger: loguru.Logger = field(init=True, repr=False) + logger: loguru.Logger = field(init=True, repr=False, compare=False) _status_history: list[StatusChangeEvent] = field( init=False, default_factory=list, repr=True ) @@ -46,7 +46,14 @@ def __post_init__(self): ) ) + @property + def locking_status(self): + """Cannot add to history after this or higher status is reached""" + return Status.COMPLETE + def add(self, status: Status, timestamp: float = time.time()): + assert status > self.current_status + assert self.current_status < self.locking_status self._status_history.append( StatusChangeEvent(status=status, timestamp=timestamp) ) @@ -55,22 +62,22 @@ def add(self, status: Status, timestamp: float = time.time()): f.write(f"{status.name},{timestamp}\n") @property - def last_status_event(self): + def current_status_event(self): if not self._status_history: return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) return self._status_history[-1] @property def current_status(self): - return self.last_status_event.status + return self.current_status_event.status @property def status_event_json(self): status_event = { - "status": self.last_status_event.status.name, - "start_time": self.last_status_event.timestamp, + "status": self.current_status_event.status.name, + "start_time": self.current_status_event.timestamp, } if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2: status_event["start_time"] = self._status_history[-2].timestamp - status_event["end_time"] = self.last_status_event.timestamp + status_event["end_time"] = self.current_status_event.timestamp return status_event diff --git a/sidecar/tests/__init__.py b/sidecar/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sidecar/tests/app/query/test_status.py b/sidecar/tests/app/query/test_status.py new file mode 100644 index 0000000..511bf44 --- /dev/null +++ b/sidecar/tests/app/query/test_status.py @@ -0,0 +1,114 @@ +import time +from pathlib import Path + +import loguru +import pytest +from app.query.status import Status, StatusChangeEvent, StatusHistory + + +@pytest.fixture(name="status_history_fixture") +def _status_history_fixture(tmp_path): + status_history = StatusHistory( + file_path=tmp_path / Path("status"), + logger=loguru.logger, + ) + + return status_history + + +@pytest.fixture(name="full_status_history_fixture") +def _full_status_history_fixture(status_history_fixture): + status_events = [ + (Status.STARTING, 1.0), + (Status.COMPILING, 2.0), + (Status.WAITING_TO_START, 3.0), + (Status.IN_PROGRESS, 4.0), + (Status.COMPLETE, 5.0), + ] + + for status, timestamp in status_events: + status_history_fixture.add(status, timestamp) + + return status_history_fixture + + +def test_status_history_add(status_history_fixture): + now = time.time() + status_history_fixture.add(Status.COMPILING, now) + assert status_history_fixture.current_status_event == StatusChangeEvent( + Status.COMPILING, now + ) + now = time.time() + status_history_fixture.add(Status.IN_PROGRESS, now) + assert status_history_fixture.current_status_event == StatusChangeEvent( + Status.IN_PROGRESS, now + ) + + +def test_status_history_add_write_to_file(status_history_fixture): + status_history_fixture.add(Status.COMPILING, 1.0) + status_history_fixture.add(Status.IN_PROGRESS, 2.0) + with status_history_fixture.file_path.open("r", encoding="utf-8") as f: + assert f.readline() == "COMPILING,1.0\n" + assert f.readline() == "IN_PROGRESS,2.0\n" + + +def test_status_history_add_load_from_file(tmp_path, full_status_history_fixture): + status_history = StatusHistory( + file_path=tmp_path / Path("status"), + logger=loguru.logger, + ) + assert status_history == full_status_history_fixture + + +def test_status_history_cannot_add_when_locked(full_status_history_fixture): + with pytest.raises(AssertionError): + now = time.time() + full_status_history_fixture.add(Status.KILLED, now) + + +def test_status_history_cannot_add_lower_status(status_history_fixture): + now = time.time() + status_history_fixture.add(Status.IN_PROGRESS, now) + assert status_history_fixture.current_status_event == StatusChangeEvent( + Status.IN_PROGRESS, now + ) + with pytest.raises(AssertionError): + now = time.time() + status_history_fixture.add(Status.COMPILING, now) + + +def test_status_history_current_status_event(full_status_history_fixture): + assert full_status_history_fixture.current_status_event == StatusChangeEvent( + Status.COMPLETE, 5.0 + ) + + +def test_status_history_current_status(full_status_history_fixture): + assert full_status_history_fixture.current_status == Status.COMPLETE + + +def test_status_history_status_event_json( + status_history_fixture, +): + now = time.time() + status_history_fixture.add(Status.COMPILING, now) + assert status_history_fixture.status_event_json == { + "status": Status.COMPILING.name, + "start_time": now, + } + + now = time.time() + status_history_fixture.add(Status.IN_PROGRESS, now) + assert status_history_fixture.status_event_json == { + "status": Status.IN_PROGRESS.name, + "start_time": now, + } + + now2 = time.time() + status_history_fixture.add(Status.COMPLETE, now2) + assert status_history_fixture.status_event_json == { + "status": Status.COMPLETE.name, + "start_time": now, + "end_time": now2, + } diff --git a/sidecar/tests/demo_test.py b/sidecar/tests/demo_test.py new file mode 100644 index 0000000..1030a2e --- /dev/null +++ b/sidecar/tests/demo_test.py @@ -0,0 +1,6 @@ +def func(x): + return x + 1 + + +def test_answer(): + assert func(3) == 4 From d3f0a85936b06c6091a398987c6752b378d6f1cd Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Wed, 10 Jul 2024 16:02:02 -0700 Subject: [PATCH 06/10] add pytest to pre-commit --- .pre-commit-config.yaml | 13 +++++++++++++ .pyre_configuration | 3 ++- sidecar/cli/cli.py | 4 ++-- sidecar/tests/app/query/test_status.py | 3 ++- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 877f29c..70d1064 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,6 +19,19 @@ repos: [ "-rn", # Only display messages ] + - id: pytest + name: pytest + language: python + entry: pytest + types: [python] + pass_filenames: false + - id : pytest-coverage + name: coverage + language: python + entry: coverage report + types: [python] + pass_filenames: false + args: [--fail-under=9] # increase this over time - id: pyre-check name: pyre-check entry: pyre check diff --git a/.pyre_configuration b/.pyre_configuration index f4023d7..caaf31e 100644 --- a/.pyre_configuration +++ b/.pyre_configuration @@ -1,6 +1,7 @@ { "site_package_search_strategy": "pep561", "source_directories": [ - "sidecar" + {"import_root": ".", "source": "sidecar"} ] + } diff --git a/sidecar/cli/cli.py b/sidecar/cli/cli.py index 91dc1c0..7f89aa4 100644 --- a/sidecar/cli/cli.py +++ b/sidecar/cli/cli.py @@ -9,8 +9,8 @@ import click import click_pathlib -from ..app.command import Command, start_commands_parallel -from ..app.helpers import Role +from sidecar.app.command import Command, start_commands_parallel +from sidecar.app.helpers import Role @click.group() diff --git a/sidecar/tests/app/query/test_status.py b/sidecar/tests/app/query/test_status.py index 511bf44..4a35c4e 100644 --- a/sidecar/tests/app/query/test_status.py +++ b/sidecar/tests/app/query/test_status.py @@ -3,7 +3,8 @@ import loguru import pytest -from app.query.status import Status, StatusChangeEvent, StatusHistory + +from sidecar.app.query.status import Status, StatusChangeEvent, StatusHistory @pytest.fixture(name="status_history_fixture") From 0a8205f95f74c1c854818eb8cb3b88a16b6f989e Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 11 Jul 2024 09:34:25 -0700 Subject: [PATCH 07/10] attempt to fix github runner --- .github/workflows/pre-commit.yaml | 2 +- .pre-commit-config.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml index 79dc69f..08bc35c 100644 --- a/.github/workflows/pre-commit.yaml +++ b/.github/workflows/pre-commit.yaml @@ -24,7 +24,7 @@ jobs: - name: Install dependencies run: | - pip install . + pip install -e . - name: Setup node.js uses: actions/setup-node@v4 with: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 70d1064..368c543 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,6 +25,7 @@ repos: entry: pytest types: [python] pass_filenames: false + args: [--cov=sidecar] - id : pytest-coverage name: coverage language: python From 18d359bd9d217bc33aa839081eba6edf7c5b3e08 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 11 Jul 2024 09:46:25 -0700 Subject: [PATCH 08/10] remove demo_test, accidentally added --- sidecar/tests/demo_test.py | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 sidecar/tests/demo_test.py diff --git a/sidecar/tests/demo_test.py b/sidecar/tests/demo_test.py deleted file mode 100644 index 1030a2e..0000000 --- a/sidecar/tests/demo_test.py +++ /dev/null @@ -1,6 +0,0 @@ -def func(x): - return x + 1 - - -def test_answer(): - assert func(3) == 4 From 0c02afaacdf242515422718e8e468ef61c56cec5 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 11 Jul 2024 13:11:44 -0700 Subject: [PATCH 09/10] update .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 094cfc7..3059798 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ tmp/ IGNORE-ME* .pyre/* .draft -.coverage +.coverage* # local env files .env*.local From 9328cdedd15f5d7ffec0ed4f1420b5fea18fda2e Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Fri, 12 Jul 2024 16:03:33 -0700 Subject: [PATCH 10/10] clean up mere commit --- sidecar/app/query/base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index 25fb50a..20c3c2d 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -21,9 +21,6 @@ class QueryExistsError(Exception): pass -StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"]) - - @dataclass class Query: query_id: str