Skip to content

Commit d126d9b

Browse files
authored
RSDK-3634 - Data wrappers (#337)
1 parent 016b4d8 commit d126d9b

File tree

1 file changed

+150
-13
lines changed

1 file changed

+150
-13
lines changed

src/viam/app/data/client.py

Lines changed: 150 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,24 @@
33
from grpclib.client import Channel
44

55
from viam import logging
6-
from viam.proto.app.data import DataServiceStub, Filter
6+
from viam.proto.app.data import (
7+
BinaryDataByFilterRequest,
8+
BinaryDataByFilterResponse,
9+
BinaryDataByIDsRequest,
10+
BinaryDataByIDsResponse,
11+
BinaryID,
12+
DataServiceStub,
13+
DataRequest,
14+
DeleteBinaryDataByIDsRequest,
15+
DeleteBinaryDataByIDsResponse,
16+
DeleteBinaryDataByFilterRequest,
17+
DeleteBinaryDataByFilterResponse,
18+
DeleteTabularDataByFilterRequest,
19+
DeleteTabularDataByFilterResponse,
20+
Filter,
21+
TabularDataByFilterRequest,
22+
TabularDataByFilterResponse,
23+
)
724
from viam.proto.app.datasync import DataSyncServiceStub, FileData, SensorData, UploadMetadata
825

926
LOGGER = logging.getLogger(__name__)
@@ -31,31 +48,151 @@ def __init__(self, channel: Channel, metadata: str):
3148
_data_sync_client: DataSyncServiceStub
3249
_metadata: str
3350

34-
async def tabular_data_by_filter(self, filter: Optional[Filter], dest: Optional[str]) -> List[Mapping[str, Any]]:
35-
raise NotImplementedError()
51+
async def tabular_data_by_filter(
52+
self,
53+
filter: Optional[Filter] = None,
54+
dest: Optional[str] = None,
55+
) -> List[Mapping[str, Any]]:
56+
"""Filter and download tabular data
3657
37-
async def binary_data_by_filter(self, data_request: Optional[Filter], dest: Optional[str]) -> List[bytes]:
38-
raise NotImplementedError()
58+
Args:
59+
filter (viam.app.data.Filter): When supplied, the tabular data will be
60+
filtered based on the provided constraints. Otherwise, all data is returned.
61+
dest (str): When supplied, the tabular data will be saved to the provided file path.
3962
40-
async def binary_data_by_ids(self, file_ids: Optional[List[str]]) -> List[bytes]:
41-
raise NotImplementedError()
63+
Returns:
64+
List[Mapping[str, Any]]: A list of tabular data
65+
"""
66+
filter = filter if filter else Filter()
67+
last = ""
68+
data = []
69+
70+
# `DataRequest`s are limited to 100 pieces of data, so we loop through calls until
71+
# we are certain we've received everything.
72+
while True:
73+
data_request = DataRequest(filter=filter, limit=100, last=last)
74+
request = TabularDataByFilterRequest(data_request=data_request, count_only=False)
75+
response: TabularDataByFilterResponse = await self._data_client.TabularDataByFilter(request, metadata=self._metadata)
76+
if not response.data or len(response.data) == 0:
77+
break
78+
data += [struct.data for struct in response.data]
79+
last = response.last
80+
81+
if dest:
82+
try:
83+
file = open(dest, 'w')
84+
file.write(f"{data}")
85+
except Exception as e:
86+
LOGGER.error(f"Failed to write tabular data to file {dest}", exc_info=e)
87+
return data
88+
89+
async def binary_data_by_filter(
90+
self,
91+
filter: Optional[Filter] = None,
92+
dest: Optional[str] = None,
93+
) -> List[bytes]:
94+
"""Filter and download binary data
95+
96+
Args:
97+
filter (viam.app.data.Filter): When supplied, the binary data will be
98+
filtered based on the provided constraints. Otherwise, all data is returned.
99+
dest (str): When supplied, the binary data will be saved to the provided file path
100+
101+
Returns:
102+
List[bytes]: The binary data
103+
"""
104+
filter = filter if filter else Filter()
105+
last = ""
106+
data = []
107+
108+
# `DataRequest`s are limited to 100 pieces of data, so we loop through calls until
109+
# we are certain we've received everything.
110+
while True:
111+
data_request = DataRequest(filter=filter, limit=100, last=last)
112+
request = BinaryDataByFilterRequest(data_request=data_request, count_only=False)
113+
response: BinaryDataByFilterResponse = await self._data_client.BinaryDataByFilter(request, metadata=self._metadata)
114+
if not response.data or len(response.data) == 0:
115+
break
116+
data += list(response.data)
117+
last = response.last
118+
119+
if dest:
120+
try:
121+
file = open(dest, 'w')
122+
file.write(f"{data}")
123+
except Exception as e:
124+
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
125+
126+
return data
127+
128+
async def binary_data_by_ids(
129+
self,
130+
binary_ids: List[BinaryID],
131+
dest: Optional[str] = None,
132+
) -> List[bytes]:
133+
"""Filter and download binary data
134+
135+
Args:
136+
binary_ids (List[viam.proto.app.BinaryID]): IDs of the desired data. Must be non-empty.
137+
dest (str): When supplied, the binary data will be saved to the provided file path.
138+
139+
Returns:
140+
List[bytes]: The binary data.
141+
142+
Raises:
143+
GRPCError: if no binary_ids are provided.
144+
"""
145+
request = BinaryDataByIDsRequest(binary_ids=binary_ids, include_binary=True)
146+
response: BinaryDataByIDsResponse = await self._data_client.BinaryDataByIDs(request, metadata=self._metadata)
147+
if dest:
148+
try:
149+
file = open(dest, 'w')
150+
file.write(f"{response.data}")
151+
except Exception as e:
152+
LOGGER.error(f"Failed to write binary data to file {dest}", exc_info=e)
153+
return response.data
42154

43155
async def delete_tabular_data_by_filter(self, filter: Optional[Filter]) -> None:
44-
raise NotImplementedError()
156+
"""Delete tabular data
157+
158+
Args:
159+
filter (viam.app.data.Filter): When supplied, the tabular data to delete will be filtered based on the provided constraints.
160+
If not provided, all data will be deleted. Exercise caution before using this option.
161+
"""
162+
filter = filter if filter else Filter()
163+
request = DeleteTabularDataByFilterRequest(filter=filter)
164+
_: DeleteTabularDataByFilterResponse = await self._data_client.DeleteTabularDataByFilter(request, metadata=self._metadata)
45165

46166
async def delete_binary_data_by_filter(self, filter: Optional[Filter]) -> None:
47-
raise NotImplementedError()
167+
"""Delete binary data
48168
49-
async def delete_binary_data_by_ids(self, file_ids: Optional[List[str]]) -> None:
50-
raise NotImplementedError()
169+
Args:
170+
filter (viam.app.data.Filter): When supplied, the binary data to delete will be filtered based on the provided constraints.
171+
If not provided, all data will be deleted. Exercise caution before using this option.
172+
"""
173+
filter = filter if filter else Filter()
174+
request = DeleteBinaryDataByFilterRequest(filter=filter)
175+
_: DeleteBinaryDataByFilterResponse = await self._data_client.DeleteBinaryDataByFilter(request, metadata=self._metadata)
176+
177+
async def delete_binary_data_by_ids(self, binary_ids: List[BinaryID]) -> None:
178+
"""Delete binary data
179+
180+
Args:
181+
binary_ids (List[viam.proto.app.BinaryID]): The binary IDs of the data to be deleted. Must be non-empty.
182+
183+
Raises:
184+
GRPCError: if no binary_ids are provided.
185+
"""
186+
request = DeleteBinaryDataByIDsRequest(binary_ids=binary_ids)
187+
_: DeleteBinaryDataByIDsResponse = await self._data_client.DeleteBinaryDataByIDs(request, metadata=self._metadata)
51188

52-
async def add_tags_to_binary_data_by_file_ids(self, file_ids: Optional[List[str]], tags: Optional[List[str]]) -> None:
189+
async def add_tags_to_binary_data_by_binary_ids(self, binary_ids: Optional[List[str]], tags: Optional[List[str]]) -> None:
53190
raise NotImplementedError()
54191

55192
async def add_tags_to_binary_data_by_filter(self, filter: Optional[Filter], tags: Optional[List[str]]) -> None:
56193
raise NotImplementedError()
57194

58-
async def remove_tags_from_binary_data_by_file_ids(self, file_ids: Optional[List[str]], tags: Optional[List[str]]) -> None:
195+
async def remove_tags_from_binary_data_by_binary_ids(self, binary_ids: Optional[List[str]], tags: Optional[List[str]]) -> None:
59196
raise NotImplementedError()
60197

61198
async def remove_tags_from_binary_data_by_filter(self, filter: Optional[Filter], tags: Optional[List[str]]) -> None:

0 commit comments

Comments
 (0)