Skip to content

Commit

Permalink
Collect and show VDBE-specific metrics (#521)
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy authored Jan 27, 2025
1 parent 30d5f61 commit f2c8979
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 116 deletions.
11 changes: 10 additions & 1 deletion src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ShutdownFrontEnd,
Sentinel,
MetricsReport,
VdbeMetricsReport,
InternalCommandRequest,
InternalCommandResponse,
NewBlueprint,
Expand Down Expand Up @@ -117,7 +118,6 @@ def __init__(
self._blueprint_mgr = BlueprintManager(
self._config, self._assets, self._schema_name
)
self._monitor = Monitor(self._config, self._blueprint_mgr)
self._estimator_provider = _EstimatorProvider()
self._providers: Optional[BlueprintProviders] = None
self._planner: Optional[BlueprintPlanner] = None
Expand Down Expand Up @@ -145,6 +145,12 @@ def __init__(
self._vdbe_manager = None
self._vdbe_process: Optional[_VdbeFrontEndProcess] = None

self._monitor = Monitor(
self._config,
self._blueprint_mgr,
create_vdbe_metrics=self._vdbe_manager is not None,
)

# This is used to hold references to internal command tasks we create.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
self._internal_command_tasks: Set[asyncio.Task] = set()
Expand Down Expand Up @@ -459,6 +465,9 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
if isinstance(message, MetricsReport):
self._monitor.handle_metric_report(message)

elif isinstance(message, VdbeMetricsReport):
self._monitor.handle_vdbe_metric_report(message)

elif isinstance(message, InternalCommandRequest):
task = asyncio.create_task(
self._run_internal_command_request_response(message)
Expand Down
27 changes: 24 additions & 3 deletions src/brad/daemon/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

from brad.blueprint.manager import BlueprintManager
from brad.config.file import ConfigFile
from brad.daemon.messages import MetricsReport
from brad.daemon.messages import MetricsReport, VdbeMetricsReport
from brad.daemon.metrics_source import MetricsSourceWithForecasting
from brad.daemon.aurora_metrics import AuroraMetrics
from brad.daemon.front_end_metrics import FrontEndMetrics
from brad.daemon.redshift_metrics import RedshiftMetrics
from brad.daemon.vdbe_metrics import VdbeMetrics

logger = logging.getLogger(__name__)

Expand All @@ -21,6 +22,7 @@ def __init__(
blueprint_mgr: BlueprintManager,
forecasting_method: str = "constant", # {constant, moving_average, linear}
forecasting_window_size: int = 5, # (Up to) how many past samples to base the forecast on
create_vdbe_metrics: bool = False,
) -> None:
self._config = config
self._blueprint_mgr = blueprint_mgr
Expand All @@ -35,6 +37,12 @@ def __init__(
self._front_end_metrics = FrontEndMetrics(
config, forecasting_method, forecasting_window_size
)
if create_vdbe_metrics:
self._vdbe_metrics: Optional[VdbeMetrics] = VdbeMetrics(
config, forecasting_method, forecasting_window_size
)
else:
self._vdbe_metrics = None

def set_up_metrics_sources(self) -> None:
"""
Expand Down Expand Up @@ -120,12 +128,15 @@ async def fetch_latest(self) -> None:
"""
logger.debug("Fetching latest metrics...")
futures = []
for source in chain(
chain_parts = [
[self._aurora_writer_metrics],
self._aurora_reader_metrics,
[self._redshift_metrics],
[self._front_end_metrics],
):
]
if self._vdbe_metrics is not None:
chain_parts.append([self._vdbe_metrics])
for source in chain(*chain_parts): # type: ignore
if source is None:
continue
futures.append(source.fetch_latest())
Expand All @@ -146,6 +157,13 @@ def handle_metric_report(self, report: MetricsReport) -> None:
"""
self._front_end_metrics.handle_metric_report(report)

def handle_vdbe_metric_report(self, report: VdbeMetricsReport) -> None:
"""
Used to pass on VDBE metrics to the underlying metrics source.
"""
if self._vdbe_metrics is not None:
self._vdbe_metrics.handle_metric_report(report)

def _print_key_metrics(self) -> None:
# Used for debug purposes.
if logger.level > logging.DEBUG:
Expand Down Expand Up @@ -190,3 +208,6 @@ def redshift_metrics(self) -> MetricsSourceWithForecasting:
def front_end_metrics(self) -> MetricsSourceWithForecasting:
assert self._front_end_metrics is not None
return self._front_end_metrics

def vdbe_metrics(self) -> Optional[MetricsSourceWithForecasting]:
return self._vdbe_metrics
132 changes: 132 additions & 0 deletions src/brad/daemon/vdbe_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import math
import logging
import pandas as pd
import pytz
import copy
from typing import Dict, List, Optional
from datetime import datetime
from ddsketch import DDSketch

from .metrics_source import MetricsSourceWithForecasting
from brad.config.file import ConfigFile
from brad.daemon.messages import VdbeMetricsReport
from brad.daemon.metrics_logger import MetricsLogger
from brad.utils.streaming_metric import StreamingMetric
from brad.utils import log_verbose
from brad.utils.time_periods import universal_now

logger = logging.getLogger(__name__)


class VdbeMetrics(MetricsSourceWithForecasting):
def __init__(
self,
config: ConfigFile,
forecasting_method: str,
forecasting_window_size: int,
) -> None:
self._config = config
self._epoch_length = self._config.epoch_length
samples_per_epoch = (
self._epoch_length.total_seconds()
/ self._config.front_end_metrics_reporting_period_seconds
)
self._sm_window_size = math.ceil(200 * samples_per_epoch)

# (vdbe_id -> metric)
self._sketch_front_end_metrics: Dict[int, StreamingMetric[DDSketch]] = {}
# All known VDBE IDs.
self._ordered_metrics: List[int] = []
self._values_df = pd.DataFrame([])
self._logger = MetricsLogger.create_from_config(
self._config, "brad_vdbe_metrics_front_end.log"
)

super().__init__(
self._epoch_length, forecasting_method, forecasting_window_size
)

async def fetch_latest(self) -> None:
now = universal_now()
num_epochs = 5
end_time = (
now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % self._epoch_length
)
start_time = end_time - num_epochs * self._epoch_length

timestamps = []
data_cols: Dict[str, List[float]] = {
str(metric_name): [] for metric_name in self._ordered_metrics
}

for offset in range(num_epochs):
window_start = start_time + offset * self._epoch_length
window_end = window_start + self._epoch_length

logger.debug(
"Loading front end metrics for %s -- %s", window_start, window_end
)

for vdbe_id, sketches in self._sketch_front_end_metrics.items():
merged = None
num_matching = 0
min_ts = None
max_ts = None

for sketch, ts in sketches.window_iterator(window_start, window_end):
# These stats are for debug logging.
num_matching += 1
if min_ts is not None:
min_ts = min(min_ts, ts)
else:
min_ts = ts
if max_ts is not None:
max_ts = max(max_ts, ts)
else:
max_ts = ts

if merged is not None:
merged.merge(sketch)
else:
# DDSketch.merge() is an inplace method. We want
# to avoid modifying the stored sketches so we
# make a copy.
merged = copy.deepcopy(sketch)

if merged is None:
logger.warning("Missing latency sketch values for VDBE %d", vdbe_id)
p90_val = 0.0
else:
p90_val_cand = merged.get_quantile_value(0.9)
p90_val = p90_val_cand if p90_val_cand is not None else 0.0

data_cols[str(vdbe_id)].append(p90_val)

timestamps.append(window_end)

new_metrics = pd.DataFrame(data_cols, index=timestamps)
self._values_df = self._get_updated_metrics(new_metrics)
await super().fetch_latest()

def _metrics_values(self) -> pd.DataFrame:
return self._values_df

def _metrics_logger(self) -> Optional[MetricsLogger]:
return self._logger

def handle_metric_report(self, report: VdbeMetricsReport) -> None:
now = universal_now()

for vdbe_id, sketch in report.query_latency_sketches():
if vdbe_id not in self._sketch_front_end_metrics:
self._sketch_front_end_metrics[vdbe_id] = StreamingMetric(
self._sm_window_size
)
self._ordered_metrics.append(vdbe_id)
self._sketch_front_end_metrics[vdbe_id].add_sample(sketch, now)

log_verbose(
logger,
"Received VDBE metrics report: (ts: %s)",
now,
)
30 changes: 30 additions & 0 deletions src/brad/ui/manager_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,46 @@ def get_metrics(num_values: int = 3, use_generated: bool = False) -> MetricsData
tlat = metrics[FrontEndMetric.TxnLatencySecondP90.value]
tlat_tm = TimestampedMetrics(timestamps=list(tlat.index), values=list(tlat))

vdbe_metrics = manager.monitor.vdbe_metrics()
assert vdbe_metrics is not None
vdbe_metrics_values = vdbe_metrics.read_k_most_recent(k=num_values)
vdbes = list(vdbe_metrics_values.columns)
vdbe_latency_dict = {}
for vdbe_id in vdbes:
vdbe_tm = TimestampedMetrics(
timestamps=list(vdbe_metrics_values.index),
values=list(vdbe_metrics_values[vdbe_id]),
)
vdbe_latency_dict[f"vdbe:{vdbe_id}"] = vdbe_tm

if use_generated:
qlat_gen = np.random.normal(loc=15.0, scale=5.0, size=len(qlat))
tlat_gen = np.random.normal(loc=0.015, scale=0.005, size=len(tlat))
qlat_tm.values = list(qlat_gen)
tlat_tm.values = list(tlat_gen)

# When VDBEs are newly created, we have no historical metric data. We fill
# in zeros so that the dashboard can display something other than an empty
# graph. We use the qlat/tlat metrics to determine the timestamps as our
# monitor fills them in with zeros when missing data.
known_vdbes = manager.vdbe_mgr.engines()
if len(vdbe_latency_dict) != len(known_vdbes):
timestamps = list(qlat.index)
zeros = [0.0] * len(timestamps)
for vdbe in known_vdbes:
metric_key = f"vdbe:{vdbe.internal_id}"
if metric_key in vdbe_latency_dict:
continue
vdbe_latency_dict[metric_key] = TimestampedMetrics(
timestamps=timestamps,
values=zeros,
)

return MetricsData(
named_metrics={
FrontEndMetric.QueryLatencySecondP90.value: qlat_tm,
FrontEndMetric.TxnLatencySecondP90.value: tlat_tm,
**vdbe_latency_dict,
}
)

Expand Down
51 changes: 38 additions & 13 deletions ui/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { useCallback, useState, useEffect } from "react";
import Header from "./components/Header";
import PerfView from "./components/PerfView";
import OverallInfraView from "./components/OverallInfraView";
import SystemConfig from "./components/SystemConfig";
import { fetchSystemState } from "./api";

import "./App.css";
Expand All @@ -26,35 +27,45 @@ function App() {
shownVdbe: null,
},
});
const [config, setConfig] = useState({
showVdbeSpecificMetrics: true,
});
const [showConfigModal, setShowConfigModal] = useState(false);

const refreshData = useCallback(async () => {
// Used for system state refresh.
const refreshSystemState = useCallback(async () => {
const newSystemState = await fetchSystemState();
// Not the best way to check for equality.
if (JSON.stringify(systemState) !== JSON.stringify(newSystemState)) {
setSystemState(newSystemState);
}
}, [systemState, setSystemState]);

// Fetch updated system state periodically.
// Periodically refresh system state.
useEffect(() => {
refreshData();
const intervalId = setInterval(refreshData, REFRESH_INTERVAL_MS);
refreshSystemState();
const intervalId = setInterval(refreshSystemState, REFRESH_INTERVAL_MS);
return () => {
if (intervalId === null) {
return;
}
clearInterval(intervalId);
};
}, [refreshData]);
}, [refreshSystemState]);

// Bind keyboard shortcut for internal config menu.
const handleKeyPress = useCallback((event) => {
if (document.activeElement !== document.body) {
// We only want to handle key presses when no input is focused.
return;
}
// Currently a no-op.
}, []);
const handleKeyPress = useCallback(
(event) => {
if (document.activeElement !== document.body) {
// We only want to handle key presses when no input is focused.
return;
}
if (event.key === "c") {
setShowConfigModal(true);
}
},
[setShowConfigModal],
);

useEffect(() => {
document.addEventListener("keyup", handleKeyPress);
Expand Down Expand Up @@ -104,6 +115,13 @@ function App() {
setAppState({ ...appState, vdbeForm: { open: false, shownVdbe: null } });
};

const changeConfig = useCallback(
(changes) => {
setConfig({ ...config, ...changes });
},
[config, setConfig],
);

return (
<>
<Header
Expand All @@ -119,13 +137,20 @@ function App() {
openVdbeForm={openVdbeForm}
closeVdbeForm={closeVdbeForm}
setPreviewBlueprint={setPreviewBlueprint}
refreshData={refreshData}
refreshData={refreshSystemState}
/>
<PerfView
virtualInfra={systemState.virtual_infra}
showingPreview={previewForm.shownPreviewBlueprint != null}
showVdbeSpecificMetrics={config.showVdbeSpecificMetrics}
/>
</div>
<SystemConfig
open={showConfigModal}
onCloseClick={() => setShowConfigModal(false)}
config={config}
onConfigChange={changeConfig}
/>
</>
);
}
Expand Down
Loading

0 comments on commit f2c8979

Please sign in to comment.