Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status events #70

Merged
merged 3 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions server/app/query/servers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ export const initialStatsByRemoteServer: StatsByRemoteServer =
Object.values(RemoteServerNames).map((serverName) => [[serverName], []]),
);

export type RunTimeByRemoteServer = {
export type StartTimeByRemoteServer = {
[key in RemoteServerNames]: number | null;
};

export const initialRunTimeByRemoteServer: RunTimeByRemoteServer =
export type EndTimeByRemoteServer = {
[key in RemoteServerNames]: number | null;
};

export const initialStartTimeByRemoteServer: StartTimeByRemoteServer =
Object.fromEntries(
Object.values(RemoteServerNames).map((serverName) => [[serverName], null]),
);

export const initialEndTimeByRemoteServer: StartTimeByRemoteServer =
Object.fromEntries(
Object.values(RemoteServerNames).map((serverName) => [[serverName], null]),
);
Expand Down Expand Up @@ -179,6 +188,8 @@ export class RemoteServer {
openStatusSocket(
id: string,
setStatus: React.Dispatch<React.SetStateAction<StatusByRemoteServer>>,
setStartTime: React.Dispatch<React.SetStateAction<StartTimeByRemoteServer>>,
setEndTime: React.Dispatch<React.SetStateAction<EndTimeByRemoteServer>>,
): WebSocket {
const ws = this.statusSocket(id);

Expand All @@ -189,8 +200,29 @@ export class RemoteServer {
}));
};

const updateStartTime = (runTime: number) => {
setStartTime((prevStartTime) => {
return {
...prevStartTime,
[this.remoteServerName]: runTime,
};
});
};

const updateEndTime = (runTime: number) => {
setEndTime((prevEndTime) => {
return {
...prevEndTime,
[this.remoteServerName]: runTime,
};
});
};

ws.onmessage = (event) => {
const statusString: string = JSON.parse(event.data).status;
const eventData = JSON.parse(event.data);
updateStartTime(eventData.start_time);
updateEndTime(eventData.end_time ?? null);
const statusString: string = eventData.status;
const status = getStatusFromString(statusString);
updateStatus(status);
};
Expand All @@ -207,7 +239,6 @@ export class RemoteServer {
openStatsSocket(
id: string,
setStats: React.Dispatch<React.SetStateAction<StatsByRemoteServer>>,
setRunTime: React.Dispatch<React.SetStateAction<RunTimeByRemoteServer>>,
): WebSocket {
const ws = this.statsSocket(id);

Expand All @@ -221,18 +252,8 @@ export class RemoteServer {
});
};

const updateRunTime = (runTime: number) => {
setRunTime((prevRunTime) => {
return {
...prevRunTime,
[this.remoteServerName]: runTime,
};
});
};

ws.onmessage = (event) => {
const eventData = JSON.parse(event.data);
updateRunTime(eventData.run_time);
const statsDataPoint: StatsDataPoint = {
timestamp: eventData.timestamp,
memoryRSSUsage: eventData.memory_rss_usage,
Expand Down
30 changes: 28 additions & 2 deletions server/app/query/view/[id]/components.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { useEffect, useState, useRef } from "react";
import { Source_Code_Pro } from "next/font/google";
import clsx from "clsx";
import { ChevronDownIcon, ChevronRightIcon } from "@heroicons/react/24/solid";
Expand Down Expand Up @@ -65,12 +66,37 @@ function secondsToTime(e: number) {

export function RunTimePill({
status,
runTime,
startTime,
endTime,
}: {
status: Status;
runTime: number | null;
startTime: number | null;
endTime: number | null;
}) {
const [runTime, setRunTime] = useState<number | null>(null);
const runTimeStr = runTime ? secondsToTime(runTime) : "N/A";
const intervalId = useRef<ReturnType<typeof setTimeout> | null>(null);

useEffect(() => {
if (intervalId.current !== null) {
// any time startTime or endTime change, we remove the old setInterval
// which runs the timer. if a new one is needed, it's created.
clearTimeout(intervalId.current);
}
if (startTime === null) {
setRunTime(null);
} else {
if (endTime !== null) {
setRunTime(endTime - startTime);
} else {
let newIntervalId = setInterval(() => {
setRunTime(Date.now() / 1000 - startTime);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you end up here with startTime == null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be covered above on line 86

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, is there a reason for checking startTime again on the line 89?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, nope! just a vestige of a refactor.

}, 1000);
intervalId.current = newIntervalId;
}
}
}, [startTime, endTime]);

return (
<div className={clsx(`rounded-full px-2`, StatusClassNameMixins[status])}>
{runTimeStr}
Expand Down
28 changes: 20 additions & 8 deletions server/app/query/view/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import {
IPARemoteServers, //hack until the queryId is stored in a DB
StatusByRemoteServer,
StatsByRemoteServer,
RunTimeByRemoteServer,
StartTimeByRemoteServer,
EndTimeByRemoteServer,
initialStatusByRemoteServer,
initialStatsByRemoteServer,
initialRunTimeByRemoteServer,
initialStartTimeByRemoteServer,
initialEndTimeByRemoteServer,
} from "@/app/query/servers";
import { StatsComponent } from "@/app/query/view/[id]/charts";
import { JSONSafeParse } from "@/app/utils";
Expand Down Expand Up @@ -48,8 +50,10 @@ export default function QueryPage({ params }: { params: { id: string } }) {
useState<StatusByRemoteServer>(initialStatusByRemoteServer);
const [statsByRemoteServer, setStatsByRemoteServer] =
useState<StatsByRemoteServer>(initialStatsByRemoteServer);
const [runTimeByRemoteServer, setRunTimeByRemoteServer] =
useState<RunTimeByRemoteServer>(initialRunTimeByRemoteServer);
const [startTimeByRemoteServer, setStartTimeByRemoteServer] =
useState<StartTimeByRemoteServer>(initialStartTimeByRemoteServer);
const [endTimeByRemoteServer, setEndTimeByRemoteServer] =
useState<EndTimeByRemoteServer>(initialEndTimeByRemoteServer);

function flipLogsHidden() {
setLogsHidden(!logsHidden);
Expand Down Expand Up @@ -108,11 +112,12 @@ export default function QueryPage({ params }: { params: { id: string } }) {
const statusWs = remoteServer.openStatusSocket(
query.uuid,
setStatusByRemoteServer,
setStartTimeByRemoteServer,
setEndTimeByRemoteServer,
);
const statsWs = remoteServer.openStatsSocket(
query.uuid,
setStatsByRemoteServer,
setRunTimeByRemoteServer,
);
webSockets = [...webSockets, loggingWs, statusWs, statsWs];
}
Expand Down Expand Up @@ -212,8 +217,11 @@ export default function QueryPage({ params }: { params: { id: string } }) {
<dl className="grid grid-cols-1 gap-2 sm:grid-cols-2 lg:grid-cols-4">
{Object.values(IPARemoteServers).map(
(remoteServer: RemoteServer) => {
const runTime =
runTimeByRemoteServer[remoteServer.remoteServerName];
const startTime =
startTimeByRemoteServer[remoteServer.remoteServerName];
const endTime =
endTimeByRemoteServer[remoteServer.remoteServerName];

const status =
statusByRemoteServer[remoteServer.remoteServerName] ??
Status.UNKNOWN;
Expand All @@ -227,7 +235,11 @@ export default function QueryPage({ params }: { params: { id: string } }) {
{remoteServer.toString()} Run Time
</dt>
<dd>
<RunTimePill status={status} runTime={runTime} />
<RunTimePill
status={status}
startTime={startTime}
endTime={endTime}
/>
</dd>
</div>
);
Expand Down
62 changes: 50 additions & 12 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import time
from collections import namedtuple
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -22,19 +23,24 @@ class QueryExistsError(Exception):
pass


StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"])


@dataclass
class Query:
# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
_status: Status = field(init=False, default=Status.UNKNOWN)
start_time: Optional[float] = field(init=False, default=None)
end_time: Optional[float] = field(init=False, default=None)
stopped: bool = field(init=False, default=False)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
step_classes: ClassVar[list[type[Step]]] = []
_log_dir: Path = settings.root_path / Path("logs")
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)
_status_dir: Path = settings.root_path / Path("status_semaphore")

def __post_init__(self):
Expand Down Expand Up @@ -77,23 +83,55 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]:
if query:
return query
raise e
if query.status_file_path.exists():
with query.status_file_path.open("r") as f:
status_str = f.readline()
query.status = Status[status_str]
return query
return None
query.load_history_from_file()
if query.status == Status.UNKNOWN:
return None
return query

def load_history_from_file(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to add a comment showing an example how log file looks like, so we can compare and check that it is doing the right thing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's also a test that does this in the next PR

if self.status_file_path.exists():
self.logger.debug(
f"Loading query {self.query_id} status history "
f"from file {self.status_file_path}"
)
with self.status_file_path.open("r") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def _last_status_event(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it expected to be public with _ or @Property does not work as I think it works?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is intended to be private. @property is just a helper that makes instance.method a shorthand for instance.method().

if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def status_event_json(self):
status_event = {
"status": self._last_status_event.status.name,
"start_time": self._last_status_event.timestamp,
}
if self.status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self._last_status_event.timestamp
return status_event

@property
def status(self) -> Status:
return self._status
return self._last_status_event.status

@status.setter
def status(self, status: Status):
self._status = status
with self.status_file_path.open("w") as f:
self.logger.debug(f"setting status: {status=}")
f.write(str(status.name))
if self.status <= Status.COMPLETE:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't see the enum definition, but does this check include failed queries as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's defined here as

class Status(IntEnum):
    UNKNOWN = auto()
    STARTING = auto()
    COMPILING = auto()
    WAITING_TO_START = auto()
    IN_PROGRESS = auto()
    COMPLETE = auto()
    KILLED = auto()
    NOT_FOUND = auto()
    CRASHED = auto()

so this prevents changing the status once it's COMPLETE or higher (including KILLED and CRASHED).

now = time.time()
self._status_history.append(StatusChangeEvent(status=status, timestamp=now))
with self.status_file_path.open("a") as f:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to have a class that manages the history and flushes it to the backing storage, so Query can take itself off this unrelated to its core functionality business

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, that's exactly what #71 does 🙂

self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{now}\n")

@property
def running(self):
Expand Down
9 changes: 5 additions & 4 deletions sidecar/app/routes/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ async def status_websocket(websocket: WebSocket, query_id: str):
async with use_websocket(websocket) as websocket:
if query is None:
logger.warning(f"{query_id=} Status: {Status.NOT_FOUND.name}")
await websocket.send_json({"status": Status.NOT_FOUND.name})
await websocket.send_json(
{"status": Status.NOT_FOUND.name, "start_time": time.time()}
)
else:
while query.running:
logger.debug(f"{query_id=} Status: {query.status.name}")
await websocket.send_json({"status": query.status.name})
await websocket.send_json(query.status_event_json)
await asyncio.sleep(1)

logger.debug(f"{query_id=} Status: {query.status.name}")
await websocket.send_json({"status": query.status.name})
await websocket.send_json(query.status_event_json)


@router.websocket("/logs/{query_id}")
Expand Down Expand Up @@ -81,7 +83,6 @@ async def stats_websocket(websocket: WebSocket, query_id: str):
while query.running:
await websocket.send_json(
{
"run_time": query.run_time,
"cpu_percent": query.cpu_usage_percent,
"memory_rss_usage": query.memory_rss_usage,
"timestamp": time.time(),
Expand Down
Loading