Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-46527: Add client methods for waiting for data to land in ConsDB #122

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions python/lsst/summit/utils/consdbClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import os
import time
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote, urlparse
Expand Down Expand Up @@ -607,6 +608,130 @@ def query(self, query: str) -> Table:
return Table(rows=[])
return Table(rows=result["data"], names=result["columns"])

def wait_for_row_to_exist(self, query: str, timeout: float, poll_frequency_hz: float = 2) -> Table:
"""Returns a row once it exists, or an empty table if it times out.
The supplied ``query`` must be expected to return exactly a single row,
(once it exists), e.g. it should be something like
'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
or similar. If the query were like
'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
2024100200542)', then the query would return multiple rows and an error
would be raised. The user is expected to check that the query meets
this criterion, because if 2024100200541 existed but 2024100200542 was
about to be created the error would not be raised, and downstream
beaviour would be undefined.
Parameters
----------
query : `str`
A SQL query (currently) to the database.
timeout : `float`
Maximum time to wait for a non-empty result, in seconds.
poll_frequency_hz : `float`, optional
Frequency to poll the database for results, in Hz.
Returns
-------
result : `Table`
An ``astropy.Table`` containing the query results, or an empty
if the row was not inserted before the timeout.
Raises
------
requests.RequestException
Raised if any kind of connection error occurs.
requests.HTTPError
Raised if a non-successful status is returned.
ValueError
Raised if the query returns more than one row.
"""
sleep_duration = 1 / poll_frequency_hz
t0 = time.time()
while time.time() - t0 < timeout:
result = self.query(query)
if len(result) > 1:
raise ValueError(f"Query {query} returned more than one row")
elif len(result) == 1:
return result
time.sleep(sleep_duration)

logger.warning(f"Query {query} did not return any results within {timeout}s")
return Table(rows=[])

def wait_for_item_in_row(
self, query: str, item: str, timeout: float, poll_frequency_hz: float = 2
) -> Table:
"""Returns the value of an item in a row once it exists, or ``None``
if it times out.
If the item is not in the schema of the table, an error will be raised.
The supplied ``query`` must be expected to return exactly a single row,
(once it exists), e.g. it should be something like
'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
or similar. If the query were like
'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
2024100200542)', then the query would return multiple rows and an error
would be raised. The user is expected to check that the query meets
this criterion, because if 2024100200541 existed but 2024100200542 was
about to be created the error would not be raised, and downstream
beaviour would be undefined.
Parameters
----------
query : `str`
A SQL query (currently) to the database.
item : `str`
The item to check for in the query results.
timeout : `float`
Maximum time to wait for a non-empty result, in seconds.
poll_frequency_hz : `float`, optional
Frequency to poll the database for results, in Hz.
Returns
-------
value : `Any`
The corresponding value of the item in the row in the table, or
``None`` if the item was not found before the timeout.
Raises
------
requests.RequestException
Raised if any kind of connection error occurs.
requests.HTTPError
Raised if a non-successful status is returned.
ValueError
Raised if the query returns more than one row, or if the requested
item is not in the schema of the table.
"""

row = self.wait_for_row_to_exist(query, timeout, poll_frequency_hz)
if len(row) == 0:
# wait_for_row_to_exist already logged a warning if table is empty
return None

# we know the row now exists but the required item may not be there yet
sleep_duration = 1 / poll_frequency_hz
t0 = time.time()
while time.time() - t0 < timeout:
result = self.query(query)
if len(result) > 1:
raise ValueError(f"Query {query} returned more than one row")
assert len(result) == 1, "Somehow no rows came back, which should be impossible"
row = result[0]
if item not in row.columns:
raise ValueError(f"Query {query} did not return a column named {item} - check the schema")
value = result[0][item]
if value is not None:
return value
time.sleep(sleep_duration)

logger.warning(
f"The row returned by {query} did not end up containing a value for {item} within {timeout}s"
)
return None

def schema(
self, instrument: str | None = None, table: str | None = None
) -> dict[str, tuple[str, str]] | list[str]:
Expand Down
Loading