Skip to content

Commit

Permalink
Various VDBE enhancements (#522)
Browse files Browse the repository at this point in the history
- Record metrics per VDBE and handle them appropriately
- Validate table accesses
  • Loading branch information
geoffxy authored Jan 27, 2025
1 parent f2c8979 commit 00cc7bc
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 42 deletions.
6 changes: 3 additions & 3 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,6 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
if isinstance(message, MetricsReport):
self._monitor.handle_metric_report(message)

elif isinstance(message, VdbeMetricsReport):
self._monitor.handle_vdbe_metric_report(message)

elif isinstance(message, InternalCommandRequest):
task = asyncio.create_task(
self._run_internal_command_request_response(message)
Expand Down Expand Up @@ -575,6 +572,9 @@ async def _read_vdbe_messages(self, vdbe_process: "_VdbeFrontEndProcess") -> Non
)
vdbe_process.mailbox.on_new_message((None,))

elif isinstance(message, VdbeMetricsReport):
self._monitor.handle_vdbe_metric_report(message)

else:
logger.debug(
"Received unexpected message from front end %d: %s",
Expand Down
4 changes: 4 additions & 0 deletions src/brad/daemon/vdbe_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ def _metrics_logger(self) -> Optional[MetricsLogger]:

def handle_metric_report(self, report: VdbeMetricsReport) -> None:
now = universal_now()
logger.debug("Handling VDBE metrics report: (ts: %s)", now)
for vdbe_id, sketch in report.query_latency_sketches():
p90 = sketch.get_quantile_value(0.9)
logger.debug("Has sketch for VDBE %d. p90: %f", vdbe_id, p90)

for vdbe_id, sketch in report.query_latency_sketches():
if vdbe_id not in self._sketch_front_end_metrics:
Expand Down
59 changes: 40 additions & 19 deletions src/brad/front_end/vdbe/vdbe_front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
from brad.front_end.errors import QueryError
from brad.front_end.session import SessionManager, SessionId
from brad.front_end.watchdog import Watchdog
from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager
from brad.provisioning.directory import Directory
from brad.row_list import RowList
from brad.utils import log_verbose, create_custom_logger
from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff
from brad.utils.run_time_reservoir import RunTimeReservoir
from brad.utils.time_periods import universal_now
from brad.query_rep import QueryRep
from brad.vdbe.manager import VdbeFrontEndManager
from brad.vdbe.models import VirtualInfrastructure
from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -94,7 +95,8 @@ def __init__(
)
self._daemon_messages_task: Optional[asyncio.Task[None]] = None

self._reset_latency_sketches()
# Stored per VDBE.
self._query_latency_sketches: Dict[int, DDSketch] = {}
self._brad_metrics_reporting_task: Optional[asyncio.Task[None]] = None

# Used to re-establish engine connections.
Expand Down Expand Up @@ -247,9 +249,17 @@ async def _run_query_impl(
# semicolon if it exists.
# NOTE: BRAD does not yet support having multiple
# semicolon-separated queries in one request.
query = self._clean_query_str(query)
query_rep = QueryRep(self._clean_query_str(query))

# Verify that the query is not accessing tables that are not part of
# the VDBE.
for table_name in query_rep.tables():
if table_name not in vdbe.table_names_set:
raise QueryError(
f"Table '{table_name}' not found in VDBE '{vdbe.name}'",
is_transient=False,
)

# TODO: Validate table accesses.
engine_to_use = vdbe.mapped_to

log_verbose(
Expand All @@ -268,10 +278,10 @@ async def _run_query_impl(
# HACK: To work around dialect differences between
# Athena/Aurora/Redshift for now. This should be replaced by
# a more robust translation layer.
if engine_to_use == Engine.Athena and "ascii" in query:
translated_query = query.replace("ascii", "codepoint")
if engine_to_use == Engine.Athena and "ascii" in query_rep.raw_query:
translated_query = query_rep.raw_query.replace("ascii", "codepoint")
else:
translated_query = query
translated_query = query_rep.raw_query
start = universal_now()
await cursor.execute(translated_query)
end = universal_now()
Expand Down Expand Up @@ -299,11 +309,14 @@ async def _run_query_impl(
# Error when executing the query.
raise QueryError.from_exception(ex, is_transient_error)

# Decide whether to log the query.
# Record the run time for later reporting.
run_time_s = end - start
run_time_s_float = run_time_s.total_seconds()
# TODO: Should be per VDBE.
self._query_latency_sketch.add(run_time_s_float)
try:
self._query_latency_sketches[vdbe_id].add(run_time_s_float)
except KeyError:
self._query_latency_sketches[vdbe_id] = self._get_empty_sketch()
self._query_latency_sketches[vdbe_id].add(run_time_s_float)

# Extract and return the results, if any.
try:
Expand Down Expand Up @@ -431,17 +444,23 @@ async def _report_metrics_to_daemon(self) -> None:
self._config.front_end_metrics_reporting_period_seconds
)

report_data = []
for vdbe_id, sketch in self._query_latency_sketches.items():
report_data.append((vdbe_id, sketch))
query_p90 = sketch.get_quantile_value(0.9)
if query_p90 is not None:
logger.debug(
"VDBE %d Query latency p90 (s): %.4f", vdbe_id, query_p90
)
logger.info(
"Sending VDBE metrics report for %d VDBEs", len(report_data)
)

# If the input queue is full, we just drop this message.
metrics_report = VdbeMetricsReport.from_data(
self.NUMERIC_IDENTIFIER,
[(0, self._query_latency_sketch)],
self.NUMERIC_IDENTIFIER, report_data
)
self._output_queue.put_nowait(metrics_report)

query_p90 = self._query_latency_sketch.get_quantile_value(0.9)
if query_p90 is not None:
logger.debug("Query latency p90 (s): %.4f", query_p90)

self._reset_latency_sketches()

except Exception as ex:
Expand Down Expand Up @@ -551,9 +570,11 @@ async def _do_reestablish_connections(self) -> None:
self._reestablish_connections_task = None

def _reset_latency_sketches(self) -> None:
# TODO: Store per VDBE.
self._query_latency_sketches.clear()

def _get_empty_sketch(self) -> DDSketch:
sketch_rel_accuracy = 0.01
self._query_latency_sketch = DDSketch(relative_accuracy=sketch_rel_accuracy)
return DDSketch(relative_accuracy=sketch_rel_accuracy)


async def _orchestrate_shutdown(fe: BradVdbeFrontEnd) -> None:
Expand Down
7 changes: 6 additions & 1 deletion src/brad/vdbe/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
from typing import List, Optional
from functools import cached_property
from typing import List, Optional, Set
from pydantic import BaseModel

from brad.config.engine import Engine
Expand Down Expand Up @@ -33,6 +34,10 @@ class VirtualEngine(BaseModel):
mapped_to: Engine
endpoint: Optional[str] = None

@cached_property
def table_names_set(self) -> Set[str]:
return {table.name for table in self.tables}


class VirtualInfrastructure(BaseModel):
schema_name: str
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/OverallInfraView.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ function OverallInfraView({
</ConfirmDialog>
<Snackbar
open={showVdbeChangeSuccess}
autoHideDuration={3000}
autoHideDuration={2000}
message="VDBE changes successfully saved."
onClose={handleSnackbarClose}
/>
Expand Down
33 changes: 16 additions & 17 deletions ui/src/components/PerfView.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,19 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) {
windowSizeMinutes: 10,
metrics: {},
});

if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) {
changeDisplayMetricsWindow(windowSizeMinutes);
}
const changeDisplayMetricsWindow = useCallback(
(windowSizeMinutes) => {
const metricsManager = getMetricsManager();
setDisplayMetricsData({
windowSizeMinutes,
metrics: metricsManager.getMetricsInWindow(
windowSizeMinutes,
/*extendForward=*/ true,
),
});
},
[getMetricsManager, setDisplayMetricsData],
);

const refreshMetrics = useCallback(async () => {
const rawMetrics = await fetchMetrics(60, /*useGenerated=*/ false);
Expand Down Expand Up @@ -101,19 +110,9 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) {
};
}, [refreshMetrics]);

const changeDisplayMetricsWindow = useCallback(
(windowSizeMinutes) => {
const metricsManager = getMetricsManager();
setDisplayMetricsData({
windowSizeMinutes,
metrics: metricsManager.getMetricsInWindow(
windowSizeMinutes,
/*extendForward=*/ true,
),
});
},
[getMetricsManager, setDisplayMetricsData],
);
if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) {
changeDisplayMetricsWindow(windowSizeMinutes);
}

const columnStyle = {
flexGrow: 2,
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/VdbeView.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ function VdbeView({
</div>
<Snackbar
open={showSnackbar}
autoHideDuration={3000}
autoHideDuration={2000}
message="Endpoint copied to clipboard"
onClose={handleClose}
/>
Expand Down

0 comments on commit 00cc7bc

Please sign in to comment.