Skip to content

Commit

Permalink
Expose VDBE endpoints for connections, hook into API (#520)
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy authored Jan 26, 2025
1 parent 5a0476a commit 30d5f61
Show file tree
Hide file tree
Showing 12 changed files with 1,084 additions and 35 deletions.
1 change: 1 addition & 0 deletions config/system_config_demo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ std_datasets:
path: workloads/IMDB_100GB/adhoc_test/

bootstrap_vdbe_path: config/vdbe_demo/imdb_extended_vdbes.json
# bootstrap_vdbe_path: config/vdbe_demo/imdb_editable_vdbes.json

aurora_max_query_factor: 4.0
aurora_max_query_factor_replace: 10000.0
Expand Down
8 changes: 8 additions & 0 deletions src/brad/connection/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ async def connect_to(
if config.stub_mode_path() is not None:
return cls.connect_to_stub(config)

# HACK: Schema aliasing for convenience.
if schema_name is not None and schema_name == "imdb_editable_100g":
schema_name = "imdb_extended_100g"

connection_details = config.get_connection_details(engine)
if engine == Engine.Redshift:
cluster = directory.redshift_cluster()
Expand Down Expand Up @@ -153,6 +157,10 @@ async def connect_to_sidecar(
if config.stub_mode_path() is not None:
return cls.connect_to_stub(config)

# HACK: Schema aliasing for convenience.
if schema_name is not None and schema_name == "imdb_editable_100g":
schema_name = "imdb_extended_100g"

connection_details = config.get_sidecar_db_details()
if (
_USE_PSYCOPG_KEY in connection_details
Expand Down
166 changes: 162 additions & 4 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
InternalCommandResponse,
NewBlueprint,
NewBlueprintAck,
ReconcileVirtualInfrastructure,
ReconcileVirtualInfrastructureAck,
)
from brad.daemon.monitor import Monitor
from brad.daemon.system_event_logger import SystemEventLogger
Expand All @@ -37,7 +39,8 @@
from brad.data_stats.postgres_estimator import PostgresEstimator
from brad.data_stats.stub_estimator import StubEstimator
from brad.data_sync.execution.executor import DataSyncExecutor
from brad.front_end.start_front_end import start_front_end
from brad.front_end.start_front_end import start_front_end, start_vdbe_front_end
from brad.front_end.vdbe.vdbe_front_end import BradVdbeFrontEnd
from brad.planner.abstract import BlueprintPlanner
from brad.planner.compare.provider import (
BlueprintComparatorProvider,
Expand Down Expand Up @@ -69,8 +72,10 @@
from brad.routing.tree_based.forest_policy import ForestPolicy
from brad.row_list import RowList
from brad.utils.time_periods import period_start, universal_now
from brad.utils.mailbox import Mailbox
from brad.ui.manager import UiManager
from brad.vdbe.manager import VdbeManager
from brad.vdbe.models import VirtualInfrastructure

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,9 +139,11 @@ def __init__(
self._vdbe_manager: Optional[VdbeManager] = VdbeManager.load_from(
load_vdbe_path,
starting_port=9876,
apply_infra=self._apply_virtual_infra,
)
else:
self._vdbe_manager = None
self._vdbe_process: Optional[_VdbeFrontEndProcess] = None

# This is used to hold references to internal command tasks we create.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
Expand Down Expand Up @@ -343,6 +350,31 @@ async def _run_setup(self) -> None:
for fe in self._front_ends:
fe.process.start()

if self._vdbe_manager is not None:
v_input_queue = self._process_manager.Queue()
v_output_queue = self._process_manager.Queue()
process = mp.Process(
target=start_vdbe_front_end,
args=(
self._config,
self._schema_name,
self._path_to_system_config,
self._debug_mode,
self._blueprint_mgr.get_directory(),
self._vdbe_manager.infra(),
v_input_queue,
v_output_queue,
),
)
self._vdbe_process = _VdbeFrontEndProcess(
process, v_input_queue, v_output_queue
)
reader_task = asyncio.create_task(
self._read_vdbe_messages(self._vdbe_process)
)
self._vdbe_process.message_reader_task = reader_task
self._vdbe_process.process.start()

if (
self._config.routing_policy == RoutingPolicy.ForestTableSelectivity
or self._config.routing_policy == RoutingPolicy.Default
Expand Down Expand Up @@ -375,6 +407,15 @@ async def _run_teardown(self) -> None:
if fe.message_reader_task is not None:
fe.output_queue.put(Sentinel(fe_index))

if self._vdbe_process is not None:
logger.info("Telling the VDBE front end to shut down...")
self._vdbe_process.input_queue.put(
ShutdownFrontEnd(BradVdbeFrontEnd.NUMERIC_IDENTIFIER)
)
self._vdbe_process.output_queue.put(
Sentinel(BradVdbeFrontEnd.NUMERIC_IDENTIFIER)
)

if self._timed_sync_task is not None:
self._timed_sync_task.cancel()
self._timed_sync_task = None
Expand All @@ -394,6 +435,11 @@ async def _run_teardown(self) -> None:
fe.process.join()
self._front_ends.clear()

if self._vdbe_process is not None:
logger.info("Waiting for the VDBE front end to shut down...")
self._vdbe_process.process.join()
self._vdbe_process = None

async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
"""
Waits for messages from the specified front end process and processes them.
Expand Down Expand Up @@ -465,6 +511,83 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
front_end.fe_index,
)

async def _read_vdbe_messages(self, vdbe_process: "_VdbeFrontEndProcess") -> None:
loop = asyncio.get_running_loop()
while True:
try:
message = await loop.run_in_executor(
None, vdbe_process.output_queue.get
)
if message.fe_index != BradVdbeFrontEnd.NUMERIC_IDENTIFIER:
logger.warning(
"Received message with invalid front end index. Expected %d. Received %d.",
BradVdbeFrontEnd.NUMERIC_IDENTIFIER,
message.fe_index,
)
continue

if isinstance(message, NewBlueprintAck):
if self._transition_orchestrator is None:
logger.error(
"Received blueprint ack message but no transition is in progress. Version: %d, Front end: %d",
message.version,
message.fe_index,
)
continue

# Sanity check.
next_version = self._transition_orchestrator.next_version()
if next_version != message.version:
logger.error(
"Received a blueprint ack for a mismatched version. Received %d, Expected %d",
message.version,
next_version,
)
continue

logger.info(
"Received blueprint ack. Version: %d, Front end: %d",
message.version,
message.fe_index,
)

self._transition_orchestrator.decrement_waiting_for_front_ends()
if self._transition_orchestrator.waiting_for_front_ends() == 0:
# Schedule the second half of the transition.
self._transition_task = asyncio.create_task(
self._run_transition_part_two()
)

elif isinstance(message, ReconcileVirtualInfrastructureAck):
logger.info(
"Received reconcile ack from VDBE front end. Added %d, Removed %d",
message.num_added,
message.num_removed,
)
vdbe_process.mailbox.on_new_message((None,))

else:
logger.debug(
"Received unexpected message from front end %d: %s",
BradVdbeFrontEnd.NUMERIC_IDENTIFIER,
str(message),
)

except Exception as ex:
if not isinstance(ex, asyncio.CancelledError):
logger.exception(
"Unexpected error when handling front end message. Front end: %d",
BradVdbeFrontEnd.NUMERIC_IDENTIFIER,
)

async def _apply_virtual_infra(self, virtual_infra: VirtualInfrastructure) -> None:
"""
Used by the VDBE manager to apply a change to the virtual infrastructure
on the VDBE front end. This returns after the change is applied.
"""
assert self._vdbe_process is not None
await self._vdbe_process.mailbox.send_recv(virtual_infra)

async def _handle_new_blueprint(
self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger]
) -> None:
Expand Down Expand Up @@ -857,9 +980,6 @@ def update_monitor_sources():
"Notifying %d front ends about the new blueprint.",
len(self._front_ends),
)
self._transition_orchestrator.set_waiting_for_front_ends(
len(self._front_ends)
)
for fe in self._front_ends:
fe.input_queue.put(
NewBlueprint(
Expand All @@ -869,6 +989,18 @@ def update_monitor_sources():
)
)

total_wait = len(self._front_ends)
if self._vdbe_process is not None:
self._vdbe_process.input_queue.put(
NewBlueprint(
BradVdbeFrontEnd.NUMERIC_IDENTIFIER,
tm.next_version,
self._blueprint_mgr.get_directory(),
)
)
total_wait += 1

self._transition_orchestrator.set_waiting_for_front_ends(total_wait)
self._transition_task = None

# We finish the transition after all front ends acknowledge that they
Expand Down Expand Up @@ -999,3 +1131,29 @@ def __init__(
self.input_queue = input_queue
self.output_queue = output_queue
self.message_reader_task: Optional[asyncio.Task] = None


class _VdbeFrontEndProcess:
"""
Used to manage state associated with the VDBE front end process.
"""

def __init__(
self,
process: mp.Process,
input_queue: queue.Queue,
output_queue: queue.Queue,
) -> None:
self.process = process
self.input_queue = input_queue
self.output_queue = output_queue
self.message_reader_task: Optional[asyncio.Task] = None
self.mailbox: Mailbox[VirtualInfrastructure, Tuple] = Mailbox(
do_send_msg=self._send_message
)

async def _send_message(self, infra: VirtualInfrastructure) -> None:
logger.debug("Sending reconcile VDBE IPC message")
msg = ReconcileVirtualInfrastructure(BradVdbeFrontEnd.NUMERIC_IDENTIFIER, infra)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.input_queue.put, msg)
57 changes: 57 additions & 0 deletions src/brad/daemon/messages.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Tuple, List
from ddsketch import DDSketch
from ddsketch.pb.proto import DDSketchProto, pb as ddspb

from brad.provisioning.directory import Directory
from brad.row_list import RowList
from brad.vdbe.models import VirtualInfrastructure


class IpcMessage:
Expand Down Expand Up @@ -88,6 +90,40 @@ def query_latency_sketch(self) -> DDSketch:
return DDSketchProto.from_proto(pb_sketch)


class VdbeMetricsReport(IpcMessage):
"""
Sent from the VDBE front end to the daemon to report BRAD's client-side metrics.
"""

@classmethod
def from_data(
cls,
fe_index: int,
latency_sketches: List[Tuple[int, DDSketch]],
) -> "VdbeMetricsReport":
serialized_sketches = [
(vdbe_id, DDSketchProto.to_proto(sketch).SerializeToString())
for vdbe_id, sketch in latency_sketches
]
return cls(fe_index, latency_sketches=serialized_sketches)

def __init__(
self,
fe_index: int,
latency_sketches: List[Tuple[int, bytes]],
) -> None:
super().__init__(fe_index)
self.serialized_latency_sketches = latency_sketches

def query_latency_sketches(self) -> List[Tuple[int, DDSketch]]:
results = []
for vdbe_id, serialized_sketch in self.serialized_latency_sketches:
pb_sketch = ddspb.DDSketch()
pb_sketch.ParseFromString(serialized_sketch)
results.append((vdbe_id, DDSketchProto.from_proto(pb_sketch)))
return results


class InternalCommandRequest(IpcMessage):
"""
Sent from the front end to the daemon to handle an internal command.
Expand All @@ -108,6 +144,27 @@ def __init__(self, fe_index: int, response: RowList) -> None:
self.response = response


class ReconcileVirtualInfrastructure(IpcMessage):
"""
Sent from the daemon to the VDBE front end to update its virtual infrastructure.
"""

def __init__(self, fe_index: int, virtual_infra: VirtualInfrastructure) -> None:
super().__init__(fe_index)
self.virtual_infra = virtual_infra


class ReconcileVirtualInfrastructureAck(IpcMessage):
"""
Sent from the VDBE front end back to the daemon to acknowledge the virtual infrastructure update.
"""

def __init__(self, fe_index: int, num_added: int, num_removed: int) -> None:
super().__init__(fe_index)
self.num_added = num_added
self.num_removed = num_removed


class ShutdownFrontEnd(IpcMessage):
"""
Sent from the daemon to the front end indicating that it should shut down.
Expand Down
Loading

0 comments on commit 30d5f61

Please sign in to comment.