From f2c897966cd58f39e94738becacee16728d01a66 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 27 Jan 2025 11:47:16 -0500 Subject: [PATCH] Collect and show VDBE-specific metrics (#521) --- src/brad/daemon/daemon.py | 11 +- src/brad/daemon/monitor.py | 27 ++++- src/brad/daemon/vdbe_metrics.py | 132 ++++++++++++++++++++++ src/brad/ui/manager_impl.py | 30 +++++ ui/src/App.jsx | 51 ++++++--- ui/src/components/PerfView.jsx | 122 ++++++++++---------- ui/src/components/SystemConfig.jsx | 53 ++++----- ui/src/components/styles/SystemConfig.css | 11 -- ui/src/metrics_utils.js | 33 ++++++ 9 files changed, 354 insertions(+), 116 deletions(-) create mode 100644 src/brad/daemon/vdbe_metrics.py create mode 100644 ui/src/metrics_utils.js diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index 8b43223a..75e918cb 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -23,6 +23,7 @@ ShutdownFrontEnd, Sentinel, MetricsReport, + VdbeMetricsReport, InternalCommandRequest, InternalCommandResponse, NewBlueprint, @@ -117,7 +118,6 @@ def __init__( self._blueprint_mgr = BlueprintManager( self._config, self._assets, self._schema_name ) - self._monitor = Monitor(self._config, self._blueprint_mgr) self._estimator_provider = _EstimatorProvider() self._providers: Optional[BlueprintProviders] = None self._planner: Optional[BlueprintPlanner] = None @@ -145,6 +145,12 @@ def __init__( self._vdbe_manager = None self._vdbe_process: Optional[_VdbeFrontEndProcess] = None + self._monitor = Monitor( + self._config, + self._blueprint_mgr, + create_vdbe_metrics=self._vdbe_manager is not None, + ) + # This is used to hold references to internal command tasks we create. # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task self._internal_command_tasks: Set[asyncio.Task] = set() @@ -459,6 +465,9 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None: if isinstance(message, MetricsReport): self._monitor.handle_metric_report(message) + elif isinstance(message, VdbeMetricsReport): + self._monitor.handle_vdbe_metric_report(message) + elif isinstance(message, InternalCommandRequest): task = asyncio.create_task( self._run_internal_command_request_response(message) diff --git a/src/brad/daemon/monitor.py b/src/brad/daemon/monitor.py index fa422b36..724dc085 100644 --- a/src/brad/daemon/monitor.py +++ b/src/brad/daemon/monitor.py @@ -5,11 +5,12 @@ from brad.blueprint.manager import BlueprintManager from brad.config.file import ConfigFile -from brad.daemon.messages import MetricsReport +from brad.daemon.messages import MetricsReport, VdbeMetricsReport from brad.daemon.metrics_source import MetricsSourceWithForecasting from brad.daemon.aurora_metrics import AuroraMetrics from brad.daemon.front_end_metrics import FrontEndMetrics from brad.daemon.redshift_metrics import RedshiftMetrics +from brad.daemon.vdbe_metrics import VdbeMetrics logger = logging.getLogger(__name__) @@ -21,6 +22,7 @@ def __init__( blueprint_mgr: BlueprintManager, forecasting_method: str = "constant", # {constant, moving_average, linear} forecasting_window_size: int = 5, # (Up to) how many past samples to base the forecast on + create_vdbe_metrics: bool = False, ) -> None: self._config = config self._blueprint_mgr = blueprint_mgr @@ -35,6 +37,12 @@ def __init__( self._front_end_metrics = FrontEndMetrics( config, forecasting_method, forecasting_window_size ) + if create_vdbe_metrics: + self._vdbe_metrics: Optional[VdbeMetrics] = VdbeMetrics( + config, forecasting_method, forecasting_window_size + ) + else: + self._vdbe_metrics = None def set_up_metrics_sources(self) -> None: """ @@ -120,12 +128,15 @@ async def fetch_latest(self) -> None: """ logger.debug("Fetching latest metrics...") futures = [] - for source in chain( + chain_parts = [ [self._aurora_writer_metrics], self._aurora_reader_metrics, [self._redshift_metrics], [self._front_end_metrics], - ): + ] + if self._vdbe_metrics is not None: + chain_parts.append([self._vdbe_metrics]) + for source in chain(*chain_parts): # type: ignore if source is None: continue futures.append(source.fetch_latest()) @@ -146,6 +157,13 @@ def handle_metric_report(self, report: MetricsReport) -> None: """ self._front_end_metrics.handle_metric_report(report) + def handle_vdbe_metric_report(self, report: VdbeMetricsReport) -> None: + """ + Used to pass on VDBE metrics to the underlying metrics source. + """ + if self._vdbe_metrics is not None: + self._vdbe_metrics.handle_metric_report(report) + def _print_key_metrics(self) -> None: # Used for debug purposes. if logger.level > logging.DEBUG: @@ -190,3 +208,6 @@ def redshift_metrics(self) -> MetricsSourceWithForecasting: def front_end_metrics(self) -> MetricsSourceWithForecasting: assert self._front_end_metrics is not None return self._front_end_metrics + + def vdbe_metrics(self) -> Optional[MetricsSourceWithForecasting]: + return self._vdbe_metrics diff --git a/src/brad/daemon/vdbe_metrics.py b/src/brad/daemon/vdbe_metrics.py new file mode 100644 index 00000000..9535af88 --- /dev/null +++ b/src/brad/daemon/vdbe_metrics.py @@ -0,0 +1,132 @@ +import math +import logging +import pandas as pd +import pytz +import copy +from typing import Dict, List, Optional +from datetime import datetime +from ddsketch import DDSketch + +from .metrics_source import MetricsSourceWithForecasting +from brad.config.file import ConfigFile +from brad.daemon.messages import VdbeMetricsReport +from brad.daemon.metrics_logger import MetricsLogger +from brad.utils.streaming_metric import StreamingMetric +from brad.utils import log_verbose +from brad.utils.time_periods import universal_now + +logger = logging.getLogger(__name__) + + +class VdbeMetrics(MetricsSourceWithForecasting): + def __init__( + self, + config: ConfigFile, + forecasting_method: str, + forecasting_window_size: int, + ) -> None: + self._config = config + self._epoch_length = self._config.epoch_length + samples_per_epoch = ( + self._epoch_length.total_seconds() + / self._config.front_end_metrics_reporting_period_seconds + ) + self._sm_window_size = math.ceil(200 * samples_per_epoch) + + # (vdbe_id -> metric) + self._sketch_front_end_metrics: Dict[int, StreamingMetric[DDSketch]] = {} + # All known VDBE IDs. + self._ordered_metrics: List[int] = [] + self._values_df = pd.DataFrame([]) + self._logger = MetricsLogger.create_from_config( + self._config, "brad_vdbe_metrics_front_end.log" + ) + + super().__init__( + self._epoch_length, forecasting_method, forecasting_window_size + ) + + async def fetch_latest(self) -> None: + now = universal_now() + num_epochs = 5 + end_time = ( + now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % self._epoch_length + ) + start_time = end_time - num_epochs * self._epoch_length + + timestamps = [] + data_cols: Dict[str, List[float]] = { + str(metric_name): [] for metric_name in self._ordered_metrics + } + + for offset in range(num_epochs): + window_start = start_time + offset * self._epoch_length + window_end = window_start + self._epoch_length + + logger.debug( + "Loading front end metrics for %s -- %s", window_start, window_end + ) + + for vdbe_id, sketches in self._sketch_front_end_metrics.items(): + merged = None + num_matching = 0 + min_ts = None + max_ts = None + + for sketch, ts in sketches.window_iterator(window_start, window_end): + # These stats are for debug logging. + num_matching += 1 + if min_ts is not None: + min_ts = min(min_ts, ts) + else: + min_ts = ts + if max_ts is not None: + max_ts = max(max_ts, ts) + else: + max_ts = ts + + if merged is not None: + merged.merge(sketch) + else: + # DDSketch.merge() is an inplace method. We want + # to avoid modifying the stored sketches so we + # make a copy. + merged = copy.deepcopy(sketch) + + if merged is None: + logger.warning("Missing latency sketch values for VDBE %d", vdbe_id) + p90_val = 0.0 + else: + p90_val_cand = merged.get_quantile_value(0.9) + p90_val = p90_val_cand if p90_val_cand is not None else 0.0 + + data_cols[str(vdbe_id)].append(p90_val) + + timestamps.append(window_end) + + new_metrics = pd.DataFrame(data_cols, index=timestamps) + self._values_df = self._get_updated_metrics(new_metrics) + await super().fetch_latest() + + def _metrics_values(self) -> pd.DataFrame: + return self._values_df + + def _metrics_logger(self) -> Optional[MetricsLogger]: + return self._logger + + def handle_metric_report(self, report: VdbeMetricsReport) -> None: + now = universal_now() + + for vdbe_id, sketch in report.query_latency_sketches(): + if vdbe_id not in self._sketch_front_end_metrics: + self._sketch_front_end_metrics[vdbe_id] = StreamingMetric( + self._sm_window_size + ) + self._ordered_metrics.append(vdbe_id) + self._sketch_front_end_metrics[vdbe_id].add_sample(sketch, now) + + log_verbose( + logger, + "Received VDBE metrics report: (ts: %s)", + now, + ) diff --git a/src/brad/ui/manager_impl.py b/src/brad/ui/manager_impl.py index 4e08181b..9efb765d 100644 --- a/src/brad/ui/manager_impl.py +++ b/src/brad/ui/manager_impl.py @@ -89,16 +89,46 @@ def get_metrics(num_values: int = 3, use_generated: bool = False) -> MetricsData tlat = metrics[FrontEndMetric.TxnLatencySecondP90.value] tlat_tm = TimestampedMetrics(timestamps=list(tlat.index), values=list(tlat)) + vdbe_metrics = manager.monitor.vdbe_metrics() + assert vdbe_metrics is not None + vdbe_metrics_values = vdbe_metrics.read_k_most_recent(k=num_values) + vdbes = list(vdbe_metrics_values.columns) + vdbe_latency_dict = {} + for vdbe_id in vdbes: + vdbe_tm = TimestampedMetrics( + timestamps=list(vdbe_metrics_values.index), + values=list(vdbe_metrics_values[vdbe_id]), + ) + vdbe_latency_dict[f"vdbe:{vdbe_id}"] = vdbe_tm + if use_generated: qlat_gen = np.random.normal(loc=15.0, scale=5.0, size=len(qlat)) tlat_gen = np.random.normal(loc=0.015, scale=0.005, size=len(tlat)) qlat_tm.values = list(qlat_gen) tlat_tm.values = list(tlat_gen) + # When VDBEs are newly created, we have no historical metric data. We fill + # in zeros so that the dashboard can display something other than an empty + # graph. We use the qlat/tlat metrics to determine the timestamps as our + # monitor fills them in with zeros when missing data. + known_vdbes = manager.vdbe_mgr.engines() + if len(vdbe_latency_dict) != len(known_vdbes): + timestamps = list(qlat.index) + zeros = [0.0] * len(timestamps) + for vdbe in known_vdbes: + metric_key = f"vdbe:{vdbe.internal_id}" + if metric_key in vdbe_latency_dict: + continue + vdbe_latency_dict[metric_key] = TimestampedMetrics( + timestamps=timestamps, + values=zeros, + ) + return MetricsData( named_metrics={ FrontEndMetric.QueryLatencySecondP90.value: qlat_tm, FrontEndMetric.TxnLatencySecondP90.value: tlat_tm, + **vdbe_latency_dict, } ) diff --git a/ui/src/App.jsx b/ui/src/App.jsx index d1192bef..f1b8d6ca 100644 --- a/ui/src/App.jsx +++ b/ui/src/App.jsx @@ -2,6 +2,7 @@ import { useCallback, useState, useEffect } from "react"; import Header from "./components/Header"; import PerfView from "./components/PerfView"; import OverallInfraView from "./components/OverallInfraView"; +import SystemConfig from "./components/SystemConfig"; import { fetchSystemState } from "./api"; import "./App.css"; @@ -26,8 +27,13 @@ function App() { shownVdbe: null, }, }); + const [config, setConfig] = useState({ + showVdbeSpecificMetrics: true, + }); + const [showConfigModal, setShowConfigModal] = useState(false); - const refreshData = useCallback(async () => { + // Used for system state refresh. + const refreshSystemState = useCallback(async () => { const newSystemState = await fetchSystemState(); // Not the best way to check for equality. if (JSON.stringify(systemState) !== JSON.stringify(newSystemState)) { @@ -35,26 +41,31 @@ function App() { } }, [systemState, setSystemState]); - // Fetch updated system state periodically. + // Periodically refresh system state. useEffect(() => { - refreshData(); - const intervalId = setInterval(refreshData, REFRESH_INTERVAL_MS); + refreshSystemState(); + const intervalId = setInterval(refreshSystemState, REFRESH_INTERVAL_MS); return () => { if (intervalId === null) { return; } clearInterval(intervalId); }; - }, [refreshData]); + }, [refreshSystemState]); // Bind keyboard shortcut for internal config menu. - const handleKeyPress = useCallback((event) => { - if (document.activeElement !== document.body) { - // We only want to handle key presses when no input is focused. - return; - } - // Currently a no-op. - }, []); + const handleKeyPress = useCallback( + (event) => { + if (document.activeElement !== document.body) { + // We only want to handle key presses when no input is focused. + return; + } + if (event.key === "c") { + setShowConfigModal(true); + } + }, + [setShowConfigModal], + ); useEffect(() => { document.addEventListener("keyup", handleKeyPress); @@ -104,6 +115,13 @@ function App() { setAppState({ ...appState, vdbeForm: { open: false, shownVdbe: null } }); }; + const changeConfig = useCallback( + (changes) => { + setConfig({ ...config, ...changes }); + }, + [config, setConfig], + ); + return ( <>
+ setShowConfigModal(false)} + config={config} + onConfigChange={changeConfig} + /> ); } diff --git a/ui/src/components/PerfView.jsx b/ui/src/components/PerfView.jsx index 3e9a8547..fc6b9b0c 100644 --- a/ui/src/components/PerfView.jsx +++ b/ui/src/components/PerfView.jsx @@ -1,45 +1,14 @@ -import { useEffect, useState, useRef, useCallback } from "react"; -import { fetchMetrics } from "../api"; -import MetricsManager from "../metrics"; +import { useCallback, useState, useEffect, useRef } from "react"; import Panel from "./Panel"; import TroubleshootRoundedIcon from "@mui/icons-material/TroubleshootRounded"; import VdbeMetricsView from "./VdbeMetricsView"; +import { parseMetrics, extractMetrics } from "../metrics_utils"; +import MetricsManager from "../metrics"; +import { fetchMetrics } from "../api"; import "./styles/PerfView.css"; const REFRESH_INTERVAL_MS = 30 * 1000; -function extractMetrics({ metrics }, metricName, multiplier) { - if (multiplier == null) { - multiplier = 1.0; - } - if (!metrics.hasOwnProperty(metricName)) { - return { - x: [], - y: [], - }; - } else { - const innerMetrics = metrics[metricName]; - return { - x: innerMetrics.timestamps.map((val) => val.toLocaleTimeString("en-US")), - y: innerMetrics.values.map((val) => val * multiplier), - }; - } -} - -function parseMetrics({ named_metrics }) { - const result = {}; - Object.entries(named_metrics).forEach(([metricName, metricValues]) => { - const parsedTs = metricValues.timestamps.map( - (timestamp) => new Date(timestamp), - ); - result[metricName] = { - timestamps: parsedTs, - values: metricValues.values, - }; - }); - return result; -} - function WindowSelector({ windowSizeMinutes, onWindowSizeChange }) { function className(windowSizeOption) { return `perf-view-winsel-button ${windowSizeOption === windowSizeMinutes ? "selected" : ""}`; @@ -64,28 +33,48 @@ function WindowSelector({ windowSizeMinutes, onWindowSizeChange }) { ); } -function PerfView({ virtualInfra, showingPreview }) { - const [windowSizeMinutes, setWindowSizeMinutes] = useState(10); - const [metricsData, setMetricsData] = useState({ - windowSizeMinutes, - metrics: {}, +function vdbeWithMetrics(virtualInfra, displayMetricsData, showSpecific) { + return virtualInfra?.engines?.map((vdbe, idx) => { + let metrics; + if (showSpecific) { + metrics = extractMetrics(displayMetricsData, `vdbe:${vdbe.internal_id}`); + } else { + metrics = extractMetrics( + displayMetricsData, + idx === 0 ? "txn_latency_s_p90" : "query_latency_s_p90", + ); + } + return { vdbe, metrics }; }); +} +function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) { const metricsManagerRef = useRef(null); - function getMetricsManager() { + const getMetricsManager = useCallback(() => { if (metricsManagerRef.current == null) { metricsManagerRef.current = new MetricsManager(); } return metricsManagerRef.current; + }, [metricsManagerRef]); + + const [windowSizeMinutes, setWindowSizeMinutes] = useState(10); + const [displayMetricsData, setDisplayMetricsData] = useState({ + windowSizeMinutes: 10, + metrics: {}, + }); + + if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) { + changeDisplayMetricsWindow(windowSizeMinutes); } - const refreshData = useCallback(async () => { + const refreshMetrics = useCallback(async () => { const rawMetrics = await fetchMetrics(60, /*useGenerated=*/ false); const fetchedMetrics = parseMetrics(rawMetrics); const metricsManager = getMetricsManager(); const addedNewMetrics = metricsManager.mergeInMetrics(fetchedMetrics); if (addedNewMetrics) { - setMetricsData({ + const { windowSizeMinutes } = displayMetricsData; + setDisplayMetricsData({ windowSizeMinutes, metrics: metricsManager.getMetricsInWindow( windowSizeMinutes, @@ -93,33 +82,38 @@ function PerfView({ virtualInfra, showingPreview }) { ), }); } - }, [metricsManagerRef, windowSizeMinutes, setMetricsData]); + }, [getMetricsManager, displayMetricsData, setDisplayMetricsData]); + + // Refresh metrics on load and when virtualInfra changes. + useEffect(() => { + refreshMetrics(); + }, [virtualInfra]); + // Set up an interval to refresh metrics every `REFRESH_INTERVAL_MS` + // milliseconds. useEffect(() => { - // Run first fetch immediately. - refreshData(); - const intervalId = setInterval(refreshData, REFRESH_INTERVAL_MS); + const intervalId = setInterval(refreshMetrics, REFRESH_INTERVAL_MS); return () => { if (intervalId === null) { return; } clearInterval(intervalId); }; - }, [refreshData]); + }, [refreshMetrics]); - if (metricsData.windowSizeMinutes !== windowSizeMinutes) { - const metricsManager = getMetricsManager(); - setMetricsData({ - windowSizeMinutes, - metrics: metricsManager.getMetricsInWindow( + const changeDisplayMetricsWindow = useCallback( + (windowSizeMinutes) => { + const metricsManager = getMetricsManager(); + setDisplayMetricsData({ windowSizeMinutes, - /*extendForward=*/ true, - ), - }); - } - - const queryLatMetrics = extractMetrics(metricsData, "query_latency_s_p90"); - const txnLatMetrics = extractMetrics(metricsData, "txn_latency_s_p90"); + metrics: metricsManager.getMetricsInWindow( + windowSizeMinutes, + /*extendForward=*/ true, + ), + }); + }, + [getMetricsManager, setDisplayMetricsData], + ); const columnStyle = { flexGrow: 2, @@ -143,11 +137,15 @@ function PerfView({ virtualInfra, showingPreview }) {
- {virtualInfra?.engines?.map((vdbe, idx) => ( + {vdbeWithMetrics( + virtualInfra, + displayMetricsData, + showVdbeSpecificMetrics, + )?.map(({ vdbe, metrics }) => ( ))}
diff --git a/ui/src/components/SystemConfig.jsx b/ui/src/components/SystemConfig.jsx index 2baee39c..0983d856 100644 --- a/ui/src/components/SystemConfig.jsx +++ b/ui/src/components/SystemConfig.jsx @@ -1,11 +1,18 @@ import TextField from "@mui/material/TextField"; -import Modal from "@mui/material/Modal"; import Button from "@mui/material/Button"; +import FormGroup from "@mui/material/FormGroup"; +import FormControlLabel from "@mui/material/FormControlLabel"; +import Switch from "@mui/material/Switch"; +import Dialog from "@mui/material/Dialog"; +import DialogTitle from "@mui/material/DialogTitle"; +import DialogContent from "@mui/material/DialogContent"; +import DialogActions from "@mui/material/DialogActions"; import "./styles/SystemConfig.css"; +// Currently unused. function EndpointInput({ name, host, port, onChange }) { return ( -
+ onChange({ host, port: +event.target.value })} /> -
+ ); } -function SystemConfig({ endpoints, open, onCloseClick, onChange }) { - const { workloadRunners } = endpoints; +function SystemConfig({ open, onCloseClick, config, onConfigChange }) { + const { showVdbeSpecificMetrics } = config; return ( - -
-

Dashboard Configuration

- {workloadRunners.map((endpoint, index) => ( - - onChange({ - field: "workloadRunners", - value: workloadRunners.map((innerEndpoint, innerIndex) => - innerIndex === index ? newEndpoint : innerEndpoint, - ), - }) + + Dashboard Configuration + + + } + label="Display VDBE-specific Metrics" + onChange={(event) => + onConfigChange({ showVdbeSpecificMetrics: event.target.checked }) } /> - ))} - -
-
+ + + + + + ); } diff --git a/ui/src/components/styles/SystemConfig.css b/ui/src/components/styles/SystemConfig.css index 90100b0e..50fd34f7 100644 --- a/ui/src/components/styles/SystemConfig.css +++ b/ui/src/components/styles/SystemConfig.css @@ -1,14 +1,3 @@ -.system-config-modal { - position: absolute; - top: 50%; - left: 50%; - transform: translate(-50%, -50%); - background-color: #fff; - border: 2px solid #000; - z-index: 100; - padding: 20px; -} - .system-config-modal .endpoint-input { margin: 20px 0; } diff --git a/ui/src/metrics_utils.js b/ui/src/metrics_utils.js new file mode 100644 index 00000000..49f8be75 --- /dev/null +++ b/ui/src/metrics_utils.js @@ -0,0 +1,33 @@ +function extractMetrics({ metrics }, metricName, multiplier) { + if (multiplier == null) { + multiplier = 1.0; + } + if (!Object.prototype.hasOwnProperty.call(metrics, metricName)) { + return { + x: [], + y: [], + }; + } else { + const innerMetrics = metrics[metricName]; + return { + x: innerMetrics.timestamps.map((val) => val.toLocaleTimeString("en-US")), + y: innerMetrics.values.map((val) => val * multiplier), + }; + } +} + +function parseMetrics({ named_metrics }) { + const result = {}; + Object.entries(named_metrics).forEach(([metricName, metricValues]) => { + const parsedTs = metricValues.timestamps.map( + (timestamp) => new Date(timestamp), + ); + result[metricName] = { + timestamps: parsedTs, + values: metricValues.values, + }; + }); + return result; +} + +export { extractMetrics, parseMetrics };