From 6af84f2f5a9b690198a774d23c870e170d4453a1 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Mon, 8 Jul 2024 18:55:35 -0700 Subject: [PATCH 1/3] store status changes with timestamps, show run time based on those timestamps --- server/app/query/servers.tsx | 49 +++++++++++++++------- server/app/query/view/[id]/components.tsx | 27 +++++++++++- server/app/query/view/[id]/page.tsx | 28 +++++++++---- sidecar/app/query/base.py | 51 +++++++++++++++++++---- sidecar/app/routes/websockets.py | 9 ++-- 5 files changed, 127 insertions(+), 37 deletions(-) diff --git a/server/app/query/servers.tsx b/server/app/query/servers.tsx index 40494ef..610c3f7 100644 --- a/server/app/query/servers.tsx +++ b/server/app/query/servers.tsx @@ -59,11 +59,20 @@ export const initialStatsByRemoteServer: StatsByRemoteServer = Object.values(RemoteServerNames).map((serverName) => [[serverName], []]), ); -export type RunTimeByRemoteServer = { +export type StartTimeByRemoteServer = { [key in RemoteServerNames]: number | null; }; -export const initialRunTimeByRemoteServer: RunTimeByRemoteServer = +export type EndTimeByRemoteServer = { + [key in RemoteServerNames]: number | null; +}; + +export const initialStartTimeByRemoteServer: StartTimeByRemoteServer = + Object.fromEntries( + Object.values(RemoteServerNames).map((serverName) => [[serverName], null]), + ); + +export const initialEndTimeByRemoteServer: StartTimeByRemoteServer = Object.fromEntries( Object.values(RemoteServerNames).map((serverName) => [[serverName], null]), ); @@ -179,6 +188,8 @@ export class RemoteServer { openStatusSocket( id: string, setStatus: React.Dispatch>, + setStartTime: React.Dispatch>, + setEndTime: React.Dispatch>, ): WebSocket { const ws = this.statusSocket(id); @@ -189,8 +200,29 @@ export class RemoteServer { })); }; + const updateStartTime = (runTime: number) => { + setStartTime((prevStartTime) => { + return { + ...prevStartTime, + [this.remoteServerName]: runTime, + }; + }); + }; + + const updateEndTime = (runTime: number) => { + setEndTime((prevEndTime) => { + return { + ...prevEndTime, + [this.remoteServerName]: runTime, + }; + }); + }; + ws.onmessage = (event) => { - const statusString: string = JSON.parse(event.data).status; + const eventData = JSON.parse(event.data); + updateStartTime(eventData.start_time); + updateEndTime(eventData.end_time ?? null); + const statusString: string = eventData.status; const status = getStatusFromString(statusString); updateStatus(status); }; @@ -207,7 +239,6 @@ export class RemoteServer { openStatsSocket( id: string, setStats: React.Dispatch>, - setRunTime: React.Dispatch>, ): WebSocket { const ws = this.statsSocket(id); @@ -221,18 +252,8 @@ export class RemoteServer { }); }; - const updateRunTime = (runTime: number) => { - setRunTime((prevRunTime) => { - return { - ...prevRunTime, - [this.remoteServerName]: runTime, - }; - }); - }; - ws.onmessage = (event) => { const eventData = JSON.parse(event.data); - updateRunTime(eventData.run_time); const statsDataPoint: StatsDataPoint = { timestamp: eventData.timestamp, memoryRSSUsage: eventData.memory_rss_usage, diff --git a/server/app/query/view/[id]/components.tsx b/server/app/query/view/[id]/components.tsx index 73c6927..7e3e1b8 100644 --- a/server/app/query/view/[id]/components.tsx +++ b/server/app/query/view/[id]/components.tsx @@ -1,3 +1,4 @@ +import { useEffect, useState, useRef } from "react"; import { Source_Code_Pro } from "next/font/google"; import clsx from "clsx"; import { ChevronDownIcon, ChevronRightIcon } from "@heroicons/react/24/solid"; @@ -65,12 +66,34 @@ function secondsToTime(e: number) { export function RunTimePill({ status, - runTime, + startTime, + endTime, }: { status: Status; - runTime: number | null; + startTime: number | null; + endTime: number | null; }) { + const [runTime, setRunTime] = useState(0); const runTimeStr = runTime ? secondsToTime(runTime) : "N/A"; + const intervalId = useRef | null>(null); + + useEffect(() => { + if (startTime !== null) { + if (endTime !== null && startTime !== null) { + setRunTime(endTime - startTime); + } else { + if (intervalId.current !== null) { + clearTimeout(intervalId.current); + } + + let newIntervalId = setInterval(() => { + setRunTime(Date.now() / 1000 - startTime); + }, 1000); + intervalId.current = newIntervalId; + } + } + }, [startTime, endTime]); + return (
{runTimeStr} diff --git a/server/app/query/view/[id]/page.tsx b/server/app/query/view/[id]/page.tsx index 166c50f..083d5f1 100644 --- a/server/app/query/view/[id]/page.tsx +++ b/server/app/query/view/[id]/page.tsx @@ -16,10 +16,12 @@ import { IPARemoteServers, //hack until the queryId is stored in a DB StatusByRemoteServer, StatsByRemoteServer, - RunTimeByRemoteServer, + StartTimeByRemoteServer, + EndTimeByRemoteServer, initialStatusByRemoteServer, initialStatsByRemoteServer, - initialRunTimeByRemoteServer, + initialStartTimeByRemoteServer, + initialEndTimeByRemoteServer, } from "@/app/query/servers"; import { StatsComponent } from "@/app/query/view/[id]/charts"; import { JSONSafeParse } from "@/app/utils"; @@ -48,8 +50,10 @@ export default function QueryPage({ params }: { params: { id: string } }) { useState(initialStatusByRemoteServer); const [statsByRemoteServer, setStatsByRemoteServer] = useState(initialStatsByRemoteServer); - const [runTimeByRemoteServer, setRunTimeByRemoteServer] = - useState(initialRunTimeByRemoteServer); + const [startTimeByRemoteServer, setStartTimeByRemoteServer] = + useState(initialStartTimeByRemoteServer); + const [endTimeByRemoteServer, setEndTimeByRemoteServer] = + useState(initialEndTimeByRemoteServer); function flipLogsHidden() { setLogsHidden(!logsHidden); @@ -108,11 +112,12 @@ export default function QueryPage({ params }: { params: { id: string } }) { const statusWs = remoteServer.openStatusSocket( query.uuid, setStatusByRemoteServer, + setStartTimeByRemoteServer, + setEndTimeByRemoteServer, ); const statsWs = remoteServer.openStatsSocket( query.uuid, setStatsByRemoteServer, - setRunTimeByRemoteServer, ); webSockets = [...webSockets, loggingWs, statusWs, statsWs]; } @@ -212,8 +217,11 @@ export default function QueryPage({ params }: { params: { id: string } }) {
{Object.values(IPARemoteServers).map( (remoteServer: RemoteServer) => { - const runTime = - runTimeByRemoteServer[remoteServer.remoteServerName]; + const startTime = + startTimeByRemoteServer[remoteServer.remoteServerName]; + const endTime = + endTimeByRemoteServer[remoteServer.remoteServerName]; + const status = statusByRemoteServer[remoteServer.remoteServerName] ?? Status.UNKNOWN; @@ -227,7 +235,11 @@ export default function QueryPage({ params }: { params: { id: string } }) { {remoteServer.toString()} Run Time
- +
); diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index f3b1332..ff416d3 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -2,6 +2,7 @@ from __future__ import annotations import time +from collections import namedtuple from collections.abc import Iterable from dataclasses import dataclass, field from pathlib import Path @@ -22,12 +23,14 @@ class QueryExistsError(Exception): pass +StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"]) + + @dataclass class Query: # pylint: disable=too-many-instance-attributes query_id: str current_step: Optional[Step] = field(init=False, default=None, repr=True) - _status: Status = field(init=False, default=Status.UNKNOWN) start_time: Optional[float] = field(init=False, default=None) end_time: Optional[float] = field(init=False, default=None) stopped: bool = field(init=False, default=False) @@ -35,6 +38,9 @@ class Query: _logger_id: int = field(init=False, repr=False) step_classes: ClassVar[list[type[Step]]] = [] _log_dir: Path = settings.root_path / Path("logs") + _status_history: list[StatusChangeEvent] = field( + init=False, default_factory=list, repr=True + ) _status_dir: Path = settings.root_path / Path("status_semaphore") def __post_init__(self): @@ -79,21 +85,48 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]: raise e if query.status_file_path.exists(): with query.status_file_path.open("r") as f: - status_str = f.readline() - query.status = Status[status_str] - return query + for line in f: + status_str, timestamp = line.split(",") + query.add_status_event( + status=Status[status_str], timestamp=float(timestamp) + ) + return query return None + @property + def _last_status_event(self): + if not self._status_history: + return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time()) + return self._status_history[-1] + + @property + def status_event_json(self): + status_event = { + "status": self._last_status_event.status.name, + "start_time": self._last_status_event.timestamp, + } + if self.status >= Status.COMPLETE and len(self._status_history) >= 2: + status_event["start_time"] = self._status_history[-2].timestamp + status_event["end_time"] = self._last_status_event.timestamp + return status_event + @property def status(self) -> Status: - return self._status + return self._last_status_event.status @status.setter def status(self, status: Status): - self._status = status - with self.status_file_path.open("w") as f: - self.logger.debug(f"setting status: {status=}") - f.write(str(status.name)) + if self.status <= Status.COMPLETE: + now = time.time() + self.add_status_event(status, now) + + def add_status_event(self, status: Status, timestamp: float): + self._status_history.append( + StatusChangeEvent(status=status, timestamp=timestamp) + ) + with self.status_file_path.open("a") as f: + self.logger.debug(f"updating status: {status=}") + f.write(f"{status.name},{timestamp}\n") @property def running(self): diff --git a/sidecar/app/routes/websockets.py b/sidecar/app/routes/websockets.py index fb5db78..c915085 100644 --- a/sidecar/app/routes/websockets.py +++ b/sidecar/app/routes/websockets.py @@ -34,15 +34,17 @@ async def status_websocket(websocket: WebSocket, query_id: str): async with use_websocket(websocket) as websocket: if query is None: logger.warning(f"{query_id=} Status: {Status.NOT_FOUND.name}") - await websocket.send_json({"status": Status.NOT_FOUND.name}) + await websocket.send_json( + {"status": Status.NOT_FOUND.name, "start_time": time.time()} + ) else: while query.running: logger.debug(f"{query_id=} Status: {query.status.name}") - await websocket.send_json({"status": query.status.name}) + await websocket.send_json(query.status_event_json) await asyncio.sleep(1) logger.debug(f"{query_id=} Status: {query.status.name}") - await websocket.send_json({"status": query.status.name}) + await websocket.send_json(query.status_event_json) @router.websocket("/logs/{query_id}") @@ -81,7 +83,6 @@ async def stats_websocket(websocket: WebSocket, query_id: str): while query.running: await websocket.send_json( { - "run_time": query.run_time, "cpu_percent": query.cpu_usage_percent, "memory_rss_usage": query.memory_rss_usage, "timestamp": time.time(), From 5857e716b022d6a1f96e72a183b1c3f44b917e1c Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Tue, 9 Jul 2024 12:50:12 -0700 Subject: [PATCH 2/3] fix some bugs --- server/app/query/view/[id]/components.tsx | 15 ++++++---- sidecar/app/query/base.py | 35 +++++++++++++---------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/server/app/query/view/[id]/components.tsx b/server/app/query/view/[id]/components.tsx index 7e3e1b8..2cacdc8 100644 --- a/server/app/query/view/[id]/components.tsx +++ b/server/app/query/view/[id]/components.tsx @@ -73,19 +73,22 @@ export function RunTimePill({ startTime: number | null; endTime: number | null; }) { - const [runTime, setRunTime] = useState(0); + const [runTime, setRunTime] = useState(null); const runTimeStr = runTime ? secondsToTime(runTime) : "N/A"; const intervalId = useRef | null>(null); useEffect(() => { - if (startTime !== null) { + if (intervalId.current !== null) { + // any time startTime or endTime change, we remove the old setInterval + // which runs the timer. if a new one is needed, it's created. + clearTimeout(intervalId.current); + } + if (startTime === null) { + setRunTime(null); + } else { if (endTime !== null && startTime !== null) { setRunTime(endTime - startTime); } else { - if (intervalId.current !== null) { - clearTimeout(intervalId.current); - } - let newIntervalId = setInterval(() => { setRunTime(Date.now() / 1000 - startTime); }, 1000); diff --git a/sidecar/app/query/base.py b/sidecar/app/query/base.py index ff416d3..cf63149 100644 --- a/sidecar/app/query/base.py +++ b/sidecar/app/query/base.py @@ -83,15 +83,25 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]: if query: return query raise e - if query.status_file_path.exists(): - with query.status_file_path.open("r") as f: + query.load_history_from_file() + if query.status == Status.UNKNOWN: + return None + return query + + def load_history_from_file(self): + if self.status_file_path.exists(): + self.logger.debug( + f"Loading query {self.query_id} status history " + f"from file {self.status_file_path}" + ) + with self.status_file_path.open("r") as f: for line in f: status_str, timestamp = line.split(",") - query.add_status_event( - status=Status[status_str], timestamp=float(timestamp) + self._status_history.append( + StatusChangeEvent( + status=Status[status_str], timestamp=float(timestamp) + ) ) - return query - return None @property def _last_status_event(self): @@ -118,15 +128,10 @@ def status(self) -> Status: def status(self, status: Status): if self.status <= Status.COMPLETE: now = time.time() - self.add_status_event(status, now) - - def add_status_event(self, status: Status, timestamp: float): - self._status_history.append( - StatusChangeEvent(status=status, timestamp=timestamp) - ) - with self.status_file_path.open("a") as f: - self.logger.debug(f"updating status: {status=}") - f.write(f"{status.name},{timestamp}\n") + self._status_history.append(StatusChangeEvent(status=status, timestamp=now)) + with self.status_file_path.open("a") as f: + self.logger.debug(f"updating status: {status=}") + f.write(f"{status.name},{now}\n") @property def running(self): From b83ed1963b2e722f157072f0edcc5c4a3da56cda Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Fri, 12 Jul 2024 15:43:15 -0700 Subject: [PATCH 3/3] fix starttime check in components.tsx --- server/app/query/view/[id]/components.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/app/query/view/[id]/components.tsx b/server/app/query/view/[id]/components.tsx index 2cacdc8..5aaf35a 100644 --- a/server/app/query/view/[id]/components.tsx +++ b/server/app/query/view/[id]/components.tsx @@ -86,7 +86,7 @@ export function RunTimePill({ if (startTime === null) { setRunTime(null); } else { - if (endTime !== null && startTime !== null) { + if (endTime !== null) { setRunTime(endTime - startTime); } else { let newIntervalId = setInterval(() => {