Skip to content

Commit

Permalink
Add client methods for waiting for data to land in ConsDB
Browse files Browse the repository at this point in the history
  • Loading branch information
mfisherlevine committed Oct 3, 2024
1 parent f901ca4 commit a83fd44
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions python/lsst/summit/utils/consdbClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import os
import time
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote, urlparse
Expand Down Expand Up @@ -607,6 +608,130 @@ def query(self, query: str) -> Table:
return Table(rows=[])
return Table(rows=result["data"], names=result["columns"])

def wait_for_row_to_exist(self, query: str, timeout: float, poll_frequency_hz: float = 2) -> Table:
"""Returns a row once it exists, or an empty table if it times out.
The supplied ``query`` must be expected to return exactly a single row,
(once it exists), e.g. it should be something like
'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
or similar. If the query were like
'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
2024100200542)', then the query would return multiple rows and an error
would be raised. The user is expected to check that the query meets
this criterion, because if 2024100200541 existed but 2024100200542 was
about to be created the error would not be raised, and downstream
beaviour would be undefined.
Parameters
----------
query : `str`
A SQL query (currently) to the database.
timeout : `float`
Maximum time to wait for a non-empty result, in seconds.
poll_frequency_hz : `float`, optional
Frequency to poll the database for results, in Hz.
Returns
-------
result : `Table`
An ``astropy.Table`` containing the query results, or an empty
if the row was not inserted before the timeout.
Raises
------
requests.RequestException
Raised if any kind of connection error occurs.
requests.HTTPError
Raised if a non-successful status is returned.
ValueError
Raised if the query returns more than one row.
"""
sleep_duration = 1 / poll_frequency_hz
t0 = time.time()
while time.time() - t0 < timeout:
result = self.query(query)
if len(result) > 1:
raise ValueError(f"Query {query} returned more than one row")
elif len(result) == 1:
return result
time.sleep(sleep_duration)

logger.warning(f"Query {query} did not return any results within {timeout}s")
return Table(rows=[])

def wait_for_item_in_row(
self, query: str, item: str, timeout: float, poll_frequency_hz: float = 2
) -> Table:
"""Returns the value of an item in a row once it exists, or ``None``
if it times out.
If the item is not in the schema of the table, an error will be raised.
The supplied ``query`` must be expected to return exactly a single row,
(once it exists), e.g. it should be something like
'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
or similar. If the query were like
'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
2024100200542)', then the query would return multiple rows and an error
would be raised. The user is expected to check that the query meets
this criterion, because if 2024100200541 existed but 2024100200542 was
about to be created the error would not be raised, and downstream
beaviour would be undefined.
Parameters
----------
query : `str`
A SQL query (currently) to the database.
item : `str`
The item to check for in the query results.
timeout : `float`
Maximum time to wait for a non-empty result, in seconds.
poll_frequency_hz : `float`, optional
Frequency to poll the database for results, in Hz.
Returns
-------
value : `Any`
The corresponding value of the item in the row in the table, or
``None`` if the item was not found before the timeout.
Raises
------
requests.RequestException
Raised if any kind of connection error occurs.
requests.HTTPError
Raised if a non-successful status is returned.
ValueError
Raised if the query returns more than one row, or if the requested
item is not in the schema of the table.
"""

row = self.wait_for_row_to_exist(query, timeout, poll_frequency_hz)
if len(row) == 0:
# wait_for_row_to_exist already logged a warning if table is empty
return None

# we know the row now exists but the required item may not be there yet
sleep_duration = 1 / poll_frequency_hz
t0 = time.time()
while time.time() - t0 < timeout:
result = self.query(query)
if len(result) > 1:
raise ValueError(f"Query {query} returned more than one row")
assert len(result) == 1, "Somehow no rows came back, which should be impossible"
row = result[0]
if item not in row.columns:
raise ValueError(f"Query {query} did not return a column named {item} - check the schema")
value = result[0][item]
if value is not None:
return value
time.sleep(sleep_duration)

logger.warning(
f"The row returned by {query} did not end up containing a value for {item} within {timeout}s"
)
return None

def schema(
self, instrument: str | None = None, table: str | None = None
) -> dict[str, tuple[str, str]] | list[str]:
Expand Down

0 comments on commit a83fd44

Please sign in to comment.