Skip to content

Commit

Permalink
Status events (#70)
Browse files Browse the repository at this point in the history
* store status changes with timestamps, show run time based on those timestamps

* fix some bugs

* fix starttime check in components.tsx
  • Loading branch information
eriktaubeneck authored Jul 12, 2024
1 parent 62f5e23 commit 8b6b38d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 40 deletions.
49 changes: 35 additions & 14 deletions server/app/query/servers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ export const initialStatsByRemoteServer: StatsByRemoteServer =
Object.values(RemoteServerNames).map((serverName) => [[serverName], []]),
);

export type RunTimeByRemoteServer = {
export type StartTimeByRemoteServer = {
[key in RemoteServerNames]: number | null;
};

export const initialRunTimeByRemoteServer: RunTimeByRemoteServer =
export type EndTimeByRemoteServer = {
[key in RemoteServerNames]: number | null;
};

export const initialStartTimeByRemoteServer: StartTimeByRemoteServer =
Object.fromEntries(
Object.values(RemoteServerNames).map((serverName) => [[serverName], null]),
);

export const initialEndTimeByRemoteServer: StartTimeByRemoteServer =
Object.fromEntries(
Object.values(RemoteServerNames).map((serverName) => [[serverName], null]),
);
Expand Down Expand Up @@ -179,6 +188,8 @@ export class RemoteServer {
openStatusSocket(
id: string,
setStatus: React.Dispatch<React.SetStateAction<StatusByRemoteServer>>,
setStartTime: React.Dispatch<React.SetStateAction<StartTimeByRemoteServer>>,
setEndTime: React.Dispatch<React.SetStateAction<EndTimeByRemoteServer>>,
): WebSocket {
const ws = this.statusSocket(id);

Expand All @@ -189,8 +200,29 @@ export class RemoteServer {
}));
};

const updateStartTime = (runTime: number) => {
setStartTime((prevStartTime) => {
return {
...prevStartTime,
[this.remoteServerName]: runTime,
};
});
};

const updateEndTime = (runTime: number) => {
setEndTime((prevEndTime) => {
return {
...prevEndTime,
[this.remoteServerName]: runTime,
};
});
};

ws.onmessage = (event) => {
const statusString: string = JSON.parse(event.data).status;
const eventData = JSON.parse(event.data);
updateStartTime(eventData.start_time);
updateEndTime(eventData.end_time ?? null);
const statusString: string = eventData.status;
const status = getStatusFromString(statusString);
updateStatus(status);
};
Expand All @@ -207,7 +239,6 @@ export class RemoteServer {
openStatsSocket(
id: string,
setStats: React.Dispatch<React.SetStateAction<StatsByRemoteServer>>,
setRunTime: React.Dispatch<React.SetStateAction<RunTimeByRemoteServer>>,
): WebSocket {
const ws = this.statsSocket(id);

Expand All @@ -221,18 +252,8 @@ export class RemoteServer {
});
};

const updateRunTime = (runTime: number) => {
setRunTime((prevRunTime) => {
return {
...prevRunTime,
[this.remoteServerName]: runTime,
};
});
};

ws.onmessage = (event) => {
const eventData = JSON.parse(event.data);
updateRunTime(eventData.run_time);
const statsDataPoint: StatsDataPoint = {
timestamp: eventData.timestamp,
memoryRSSUsage: eventData.memory_rss_usage,
Expand Down
30 changes: 28 additions & 2 deletions server/app/query/view/[id]/components.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { useEffect, useState, useRef } from "react";
import { Source_Code_Pro } from "next/font/google";
import clsx from "clsx";
import { ChevronDownIcon, ChevronRightIcon } from "@heroicons/react/24/solid";
Expand Down Expand Up @@ -65,12 +66,37 @@ function secondsToTime(e: number) {

export function RunTimePill({
status,
runTime,
startTime,
endTime,
}: {
status: Status;
runTime: number | null;
startTime: number | null;
endTime: number | null;
}) {
const [runTime, setRunTime] = useState<number | null>(null);
const runTimeStr = runTime ? secondsToTime(runTime) : "N/A";
const intervalId = useRef<ReturnType<typeof setTimeout> | null>(null);

useEffect(() => {
if (intervalId.current !== null) {
// any time startTime or endTime change, we remove the old setInterval
// which runs the timer. if a new one is needed, it's created.
clearTimeout(intervalId.current);
}
if (startTime === null) {
setRunTime(null);
} else {
if (endTime !== null) {
setRunTime(endTime - startTime);
} else {
let newIntervalId = setInterval(() => {
setRunTime(Date.now() / 1000 - startTime);
}, 1000);
intervalId.current = newIntervalId;
}
}
}, [startTime, endTime]);

return (
<div className={clsx(`rounded-full px-2`, StatusClassNameMixins[status])}>
{runTimeStr}
Expand Down
28 changes: 20 additions & 8 deletions server/app/query/view/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import {
IPARemoteServers, //hack until the queryId is stored in a DB
StatusByRemoteServer,
StatsByRemoteServer,
RunTimeByRemoteServer,
StartTimeByRemoteServer,
EndTimeByRemoteServer,
initialStatusByRemoteServer,
initialStatsByRemoteServer,
initialRunTimeByRemoteServer,
initialStartTimeByRemoteServer,
initialEndTimeByRemoteServer,
} from "@/app/query/servers";
import { StatsComponent } from "@/app/query/view/[id]/charts";
import { JSONSafeParse } from "@/app/utils";
Expand Down Expand Up @@ -48,8 +50,10 @@ export default function QueryPage({ params }: { params: { id: string } }) {
useState<StatusByRemoteServer>(initialStatusByRemoteServer);
const [statsByRemoteServer, setStatsByRemoteServer] =
useState<StatsByRemoteServer>(initialStatsByRemoteServer);
const [runTimeByRemoteServer, setRunTimeByRemoteServer] =
useState<RunTimeByRemoteServer>(initialRunTimeByRemoteServer);
const [startTimeByRemoteServer, setStartTimeByRemoteServer] =
useState<StartTimeByRemoteServer>(initialStartTimeByRemoteServer);
const [endTimeByRemoteServer, setEndTimeByRemoteServer] =
useState<EndTimeByRemoteServer>(initialEndTimeByRemoteServer);

function flipLogsHidden() {
setLogsHidden(!logsHidden);
Expand Down Expand Up @@ -108,11 +112,12 @@ export default function QueryPage({ params }: { params: { id: string } }) {
const statusWs = remoteServer.openStatusSocket(
query.uuid,
setStatusByRemoteServer,
setStartTimeByRemoteServer,
setEndTimeByRemoteServer,
);
const statsWs = remoteServer.openStatsSocket(
query.uuid,
setStatsByRemoteServer,
setRunTimeByRemoteServer,
);
webSockets = [...webSockets, loggingWs, statusWs, statsWs];
}
Expand Down Expand Up @@ -212,8 +217,11 @@ export default function QueryPage({ params }: { params: { id: string } }) {
<dl className="grid grid-cols-1 gap-2 sm:grid-cols-2 lg:grid-cols-4">
{Object.values(IPARemoteServers).map(
(remoteServer: RemoteServer) => {
const runTime =
runTimeByRemoteServer[remoteServer.remoteServerName];
const startTime =
startTimeByRemoteServer[remoteServer.remoteServerName];
const endTime =
endTimeByRemoteServer[remoteServer.remoteServerName];

const status =
statusByRemoteServer[remoteServer.remoteServerName] ??
Status.UNKNOWN;
Expand All @@ -227,7 +235,11 @@ export default function QueryPage({ params }: { params: { id: string } }) {
{remoteServer.toString()} Run Time
</dt>
<dd>
<RunTimePill status={status} runTime={runTime} />
<RunTimePill
status={status}
startTime={startTime}
endTime={endTime}
/>
</dd>
</div>
);
Expand Down
62 changes: 50 additions & 12 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import time
from collections import namedtuple
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -22,19 +23,24 @@ class QueryExistsError(Exception):
pass


StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"])


@dataclass
class Query:
# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
_status: Status = field(init=False, default=Status.UNKNOWN)
start_time: Optional[float] = field(init=False, default=None)
end_time: Optional[float] = field(init=False, default=None)
stopped: bool = field(init=False, default=False)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
step_classes: ClassVar[list[type[Step]]] = []
_log_dir: Path = settings.root_path / Path("logs")
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)
_status_dir: Path = settings.root_path / Path("status_semaphore")

def __post_init__(self):
Expand Down Expand Up @@ -77,23 +83,55 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]:
if query:
return query
raise e
if query.status_file_path.exists():
with query.status_file_path.open("r") as f:
status_str = f.readline()
query.status = Status[status_str]
return query
return None
query.load_history_from_file()
if query.status == Status.UNKNOWN:
return None
return query

def load_history_from_file(self):
if self.status_file_path.exists():
self.logger.debug(
f"Loading query {self.query_id} status history "
f"from file {self.status_file_path}"
)
with self.status_file_path.open("r") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def _last_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def status_event_json(self):
status_event = {
"status": self._last_status_event.status.name,
"start_time": self._last_status_event.timestamp,
}
if self.status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self._last_status_event.timestamp
return status_event

@property
def status(self) -> Status:
return self._status
return self._last_status_event.status

@status.setter
def status(self, status: Status):
self._status = status
with self.status_file_path.open("w") as f:
self.logger.debug(f"setting status: {status=}")
f.write(str(status.name))
if self.status <= Status.COMPLETE:
now = time.time()
self._status_history.append(StatusChangeEvent(status=status, timestamp=now))
with self.status_file_path.open("a") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{now}\n")

@property
def running(self):
Expand Down
9 changes: 5 additions & 4 deletions sidecar/app/routes/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ async def status_websocket(websocket: WebSocket, query_id: str):
async with use_websocket(websocket) as websocket:
if query is None:
logger.warning(f"{query_id=} Status: {Status.NOT_FOUND.name}")
await websocket.send_json({"status": Status.NOT_FOUND.name})
await websocket.send_json(
{"status": Status.NOT_FOUND.name, "start_time": time.time()}
)
else:
while query.running:
logger.debug(f"{query_id=} Status: {query.status.name}")
await websocket.send_json({"status": query.status.name})
await websocket.send_json(query.status_event_json)
await asyncio.sleep(1)

logger.debug(f"{query_id=} Status: {query.status.name}")
await websocket.send_json({"status": query.status.name})
await websocket.send_json(query.status_event_json)


@router.websocket("/logs/{query_id}")
Expand Down Expand Up @@ -81,7 +83,6 @@ async def stats_websocket(websocket: WebSocket, query_id: str):
while query.running:
await websocket.send_json(
{
"run_time": query.run_time,
"cpu_percent": query.cpu_usage_percent,
"memory_rss_usage": query.memory_rss_usage,
"timestamp": time.time(),
Expand Down

0 comments on commit 8b6b38d

Please sign in to comment.