Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data-3395: Add GetLatestTabularData #793

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/viam/app/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
Filter,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
GetLatestTabularDataRequest,
GetLatestTabularDataResponse,
Order,
RemoveBinaryDataFromDatasetByIDsRequest,
RemoveBoundingBoxFromImageByIDRequest,
Expand Down Expand Up @@ -306,6 +308,40 @@ async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes
response: TabularDataByMQLResponse = await self._data_client.TabularDataByMQL(request, metadata=self._metadata)
return [bson.decode(bson_bytes) for bson_bytes in response.raw_data]

async def get_latest_tabular_data(self, part_id: str, resource_name: str, resource_subtype: str, method_name: str) -> Optional[Tuple[datetime,datetime, Dict[str, ValueTypes]]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async def get_latest_tabular_data(self, part_id: str, resource_name: str, resource_subtype: str, method_name: str) -> Optional[Tuple[datetime,datetime, Dict[str, ValueTypes]]]:
async def get_latest_tabular_data(self, part_id: str, resource_name: str, resource_subtype: str, method_name: str) -> Optional[Tuple[datetime, datetime, Dict[str, ValueTypes]]]:

Just a simple whitespace thing

"""Gets the most recent tabular data captured from the specified data source, as long as it was synced within the last year.

::

time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add resource_subtype?

resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>"
)


Args:
part_id (str): The ID of the part that owns the data.
resource_name (str): The name of the requested resource that captured the data.
resource_subtype (str): The subtype of the requested resource that captured the data.
method_name (str): The data capture method name.

Returns:
Optional[Tuple[Dict[str, ValueTypes], datetime, datetime]: A tuple which is None data hasn't been synced yet for the data source, otherwise the tuple contains the following:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reorder this tuple to be the actual return order

datetime: The time captured,
datetime: The time synced,
Dict[str, ValueTypes]: The latest tabular data captured from the specified data source.
For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
"""

request = GetLatestTabularDataRequest(part_id=part_id, resource_name=resource_name, resource_subtype=resource_subtype, method_name=method_name)
response: GetLatestTabularDataResponse = await self._data_client.GetLatestTabularData(request, metadata=self._metadata)
if not response.payload:
return None
return response.time_captured.ToDatetime(), response.time_synced.ToDatetime(), struct_to_dict(response.payload)


async def binary_data_by_filter(
self,
filter: Optional[Filter] = None,
Expand Down
2 changes: 1 addition & 1 deletion src/viam/version_metadata.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.34.0"

API_VERSION = "v0.1.361"
API_VERSION = "v0.1.366"
SDK_VERSION = __version__
14 changes: 14 additions & 0 deletions tests/mocks/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@
DeleteTabularDataResponse,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
GetLatestTabularDataRequest,
GetLatestTabularDataResponse,
RemoveBinaryDataFromDatasetByIDsRequest,
RemoveBinaryDataFromDatasetByIDsResponse,
RemoveBoundingBoxFromImageByIDRequest,
Expand Down Expand Up @@ -1001,6 +1003,18 @@ async def TabularDataByMQL(self, stream: Stream[TabularDataByMQLRequest, Tabular
assert request is not None
await stream.send_message(TabularDataByMQLResponse(raw_data=[bson.encode(dict) for dict in self.tabular_query_response]))

async def GetLatestTabularData(self, stream: Stream[GetLatestTabularDataRequest, GetLatestTabularDataResponse]) -> None:
request = await stream.recv_message()
assert request is not None
self.part_id = request.part_id
self.resource_name = request.resource_name
self.resource_subtype = request.resource_subtype
self.method_name = request.method_name
timestamp = datetime_to_timestamp(datetime(2024, 12, 25))
data=dict_to_struct(self.tabular_response[0].data)
await stream.send_message(GetLatestTabularDataResponse(payload=data, time_captured=timestamp, time_synced=timestamp))


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this is one too many new lines added


class MockDataset(DatasetServiceBase):
def __init__(self, create_response: str, datasets_response: Sequence[Dataset]):
Expand Down
15 changes: 15 additions & 0 deletions tests/test_data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ async def test_tabular_data_by_mql(self, service: MockData):
assert isinstance(response[0]["key1"], datetime)
assert response == TABULAR_QUERY_RESPONSE

async def test_get_latest_tabular_data(self, service: MockData):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)
time = datetime(2024, 12, 25)
response = await client.get_latest_tabular_data(PART_ID, COMPONENT_NAME, COMPONENT_TYPE, METHOD)
assert response != None
time_captured, time_synced, payload = response
assert service.part_id == PART_ID
assert service.resource_name == COMPONENT_NAME
assert service.resource_subtype == COMPONENT_TYPE
assert service.method_name == METHOD
assert payload == TABULAR_DATA
assert time_captured == time
assert time_synced == time

async def test_binary_data_by_filter(self, service: MockData):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)
Expand Down
Loading