def wait_for_row_to_exist(self, query: str, timeout: float, poll_frequency_hz: float = 2) -> Table:
    """Return the single row matched by ``query`` once it exists.

    The database is polled until the query returns exactly one row or the
    timeout expires. The query is always issued at least once, so a
    ``timeout`` of zero means "check now and do not wait".

    The supplied ``query`` must be expected to return exactly a single row
    (once it exists), e.g. it should be something like
    'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
    or similar. If the query were like
    'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
    2024100200542)', then the query would return multiple rows and an error
    would be raised. The user is expected to check that the query meets
    this criterion, because if 2024100200541 existed but 2024100200542 was
    about to be created the error would not be raised, and downstream
    behaviour would be undefined.

    Parameters
    ----------
    query : `str`
        A SQL query (currently) to the database.
    timeout : `float`
        Maximum time to wait for a non-empty result, in seconds.
    poll_frequency_hz : `float`, optional
        Frequency to poll the database for results, in Hz. Must be
        positive.

    Returns
    -------
    result : `Table`
        The table returned by ``self.query`` containing the single row, or
        an empty table if no row was returned before the timeout.

    Raises
    ------
    requests.RequestException
        Raised if any kind of connection error occurs.
    requests.HTTPError
        Raised if a non-successful status is returned.
    ValueError
        Raised if the query returns more than one row, or if
        ``poll_frequency_hz`` is not positive.
    """
    if poll_frequency_hz <= 0:
        # Fail with a clear message rather than a ZeroDivisionError (or a
        # negative sleep) further down.
        raise ValueError(f"poll_frequency_hz must be positive, got {poll_frequency_hz}")

    sleep_duration = 1 / poll_frequency_hz
    # Use the monotonic clock so that wall-clock adjustments cannot stretch
    # or shorten the wait.
    deadline = time.monotonic() + timeout
    while True:
        result = self.query(query)
        n_rows = len(result)
        if n_rows > 1:
            raise ValueError(f"Query {query} returned more than one row")
        if n_rows == 1:
            return result

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(sleep_duration, remaining))

    logger.warning("Query %s did not return any results within %ss", query, timeout)
    return Table(rows=[])


def wait_for_item_in_row(
    self, query: str, item: str, timeout: float, poll_frequency_hz: float = 2
) -> Any:
    """Return the value of an item in a row once it is populated.

    This first waits for the row to exist (via ``wait_for_row_to_exist``)
    and then polls until the ``item`` column of that row is not ``None``.
    ``timeout`` is applied to each of these two phases separately, so the
    call can block for up to roughly twice ``timeout``.

    If the item is not in the schema of the table, an error will be raised.

    The supplied ``query`` must be expected to return exactly a single row
    (once it exists), e.g. it should be something like
    'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
    or similar. If the query were like
    'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
    2024100200542)', then the query would return multiple rows and an error
    would be raised. The user is expected to check that the query meets
    this criterion, because if 2024100200541 existed but 2024100200542 was
    about to be created the error would not be raised, and downstream
    behaviour would be undefined.

    Parameters
    ----------
    query : `str`
        A SQL query (currently) to the database.
    item : `str`
        The item (column name) to check for in the query results.
    timeout : `float`
        Maximum time to wait in each phase (row existing, then item being
        populated), in seconds.
    poll_frequency_hz : `float`, optional
        Frequency to poll the database for results, in Hz. Must be
        positive.

    Returns
    -------
    value : `Any`
        The corresponding value of the item in the row in the table, or
        ``None`` if the row or the item was not found before the timeout.

    Raises
    ------
    requests.RequestException
        Raised if any kind of connection error occurs.
    requests.HTTPError
        Raised if a non-successful status is returned.
    ValueError
        Raised if the query returns more than one row, if the requested
        item is not in the schema of the table, or if
        ``poll_frequency_hz`` is not positive.
    RuntimeError
        Raised if the row stops being returned after it has been seen.
    """
    # Phase 1: wait for the row itself. This also validates
    # poll_frequency_hz, so the division below is safe.
    result = self.wait_for_row_to_exist(query, timeout, poll_frequency_hz)
    if len(result) == 0:
        # wait_for_row_to_exist already logged a warning if table is empty
        return None

    # Phase 2: the row exists but the required item may not be there yet.
    # Inspect the row we already hold before querying again.
    sleep_duration = 1 / poll_frequency_hz
    deadline = time.monotonic() + timeout
    while True:
        n_rows = len(result)
        if n_rows > 1:
            raise ValueError(f"Query {query} returned more than one row")
        if n_rows != 1:
            # An explicit raise (not an assert) so the guard survives -O.
            raise RuntimeError("Somehow no rows came back, which should be impossible")

        row = result[0]
        if item not in row.columns:
            raise ValueError(f"Query {query} did not return a column named {item} - check the schema")

        # NOTE(review): this treats only ``None`` as "not yet populated"; if
        # the table represents SQL NULLs as masked values instead, they
        # would be returned immediately - confirm against ``query``.
        value = row[item]
        if value is not None:
            return value

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(sleep_duration, remaining))
        result = self.query(query)

    logger.warning(
        "The row returned by %s did not end up containing a value for %s within %ss",
        query,
        item,
        timeout,
    )
    return None