InstanceId TimeSeries #1825
.gitignore

@@ -59,3 +59,4 @@ playground.py
 scripts/tmp/
 my_file.txt
 .venv/
+alpha.env
cognite/client/_api/datapoints.py

@@ -57,7 +57,8 @@
     unpack_items_in_payload,
 )
 from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks
-from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
+from cognite.client.utils._experimental import FeaturePreviewWarning
+from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore, InstanceId
 from cognite.client.utils._importing import import_as_completed, local_import
 from cognite.client.utils._time import (
     ZoneInfo,
@@ -89,18 +90,20 @@
 _TResLst = TypeVar("_TResLst", DatapointsList, DatapointsArrayList)


-def select_dps_fetch_strategy(dps_client: DatapointsAPI, full_query: _FullDatapointsQuery) -> DpsFetchStrategy:
+def select_dps_fetch_strategy(
+    dps_client: DatapointsAPI, full_query: _FullDatapointsQuery, cdf_version: str | None = None
+) -> DpsFetchStrategy:
     all_queries = full_query.parse_into_queries()
     full_query.validate(all_queries, dps_limit_raw=dps_client._DPS_LIMIT_RAW, dps_limit_agg=dps_client._DPS_LIMIT_AGG)
     agg_queries, raw_queries = split_queries_into_raw_and_aggs(all_queries)

     # Running mode is decided based on how many time series are requested VS. number of workers:
     if len(all_queries) <= (max_workers := dps_client._config.max_workers):
         # Start shooting requests from the hip immediately:
-        return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
+        return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)
     # Fetch a smaller, chunked batch of dps from all time series - which allows us to do some rudimentary
     # guesstimation of dps density - then chunk away:
-    return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
+    return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)


 def split_queries_into_raw_and_aggs(all_queries: _TSQueryList) -> tuple[_TSQueryList, _TSQueryList]:
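The dispatch rule above is unchanged by this PR, only extended to pass `cdf_version` through to both fetchers. A minimal sketch of the decision, with the fetcher classes stubbed out as strings (not the SDK's actual API):

```python
# Sketch of the strategy dispatch: one worker per query means all requests
# can be fired immediately ("eager"); otherwise probe density first and chunk.
def pick_strategy(n_queries: int, max_workers: int) -> str:
    if n_queries <= max_workers:
        return "eager"  # EagerDpsFetcher
    return "chunking"  # ChunkingDpsFetcher

assert pick_strategy(n_queries=5, max_workers=10) == "eager"
assert pick_strategy(n_queries=500, max_workers=10) == "chunking"
```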
@@ -118,13 +121,15 @@ def __init__(
         agg_queries: _TSQueryList,
         raw_queries: _TSQueryList,
         max_workers: int,
+        cdf_version: str | None = None,
     ) -> None:
         self.dps_client = dps_client
         self.all_queries = all_queries
         self.agg_queries = agg_queries
         self.raw_queries = raw_queries
         self.max_workers = max_workers
         self.n_queries = len(all_queries)
+        self.cdf_version = cdf_version

     def fetch_all_datapoints(self) -> DatapointsList:
         pool = ConcurrencySettings.get_executor(max_workers=self.max_workers)
@@ -141,13 +146,18 @@ def fetch_all_datapoints_numpy(self) -> DatapointsArrayList:
         )

     def _request_datapoints(self, payload: _DatapointsPayload) -> Sequence[DataPointListItem]:
+        headers: dict | None = None
+        if self.cdf_version:
+            headers = {"cdf-version": self.cdf_version}
+
         (res := DataPointListResponse()).MergeFromString(
             self.dps_client._do_request(
                 json=payload,
                 method="POST",
                 url_path=f"{self.dps_client._RESOURCE_PATH}/list",
                 accept="application/protobuf",
                 timeout=self.dps_client._config.timeout,
+                headers=headers,
             ).content
         )
         return res.items
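The same optional-header pattern recurs in `_request_datapoints`, `_delete_datapoints_ranges`, and `DatapointsPoster._insert_datapoints`. A standalone sketch of the idea (helper name invented for illustration):

```python
# Build the extra header only when a preview API version is requested;
# None means the client sends its default headers untouched.
def build_cdf_version_header(cdf_version: str | None) -> dict[str, str] | None:
    if cdf_version:
        return {"cdf-version": cdf_version}
    return None

assert build_cdf_version_header("alpha") == {"cdf-version": "alpha"}
assert build_cdf_version_header(None) is None
```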
@@ -531,6 +541,7 @@ def retrieve(
         | DatapointsQuery
         | dict[str, Any]
         | SequenceNotStr[str | DatapointsQuery | dict[str, Any]] = None,
+        instance_id: None | InstanceId | Sequence[InstanceId] = None,
         start: int | str | datetime.datetime | None = None,
         end: int | str | datetime.datetime | None = None,
         aggregates: Aggregate | str | list[Aggregate | str] | None = None,
@@ -562,6 +573,7 @@ def retrieve(
         Args:
             id (None | int | DatapointsQuery | dict[str, Any] | Sequence[int | DatapointsQuery | dict[str, Any]]): Id, dict (with id) or (mixed) sequence of these. See examples below.
             external_id (None | str | DatapointsQuery | dict[str, Any] | SequenceNotStr[str | DatapointsQuery | dict[str, Any]]): External id, dict (with external id) or (mixed) sequence of these. See examples below.
+            instance_id (None | InstanceId | Sequence[InstanceId]): Instance id or sequence of instance ids. If provided, the `id` and `external_id` arguments are ignored.
             start (int | str | datetime.datetime | None): Inclusive start. Default: 1970-01-01 UTC.
             end (int | str | datetime.datetime | None): Exclusive end. Default: "now"
             aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``,
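A hedged usage example of the new parameter (client setup elided; the space and external id are invented, and `InstanceId(space, external_id)` is assumed to match the constructor exported from `cognite.client.utils._identifier`):

```python
from cognite.client.utils._identifier import InstanceId

# id/external_id are ignored when instance_id is given. This is an alpha
# feature: it emits a FeaturePreviewWarning and sends "cdf-version: alpha".
dps = client.time_series.data.retrieve(
    instance_id=InstanceId("my-space", "my-ts-xid"),
    start="2w-ago",
    end="now",
)
```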
@@ -741,6 +753,7 @@ def retrieve(
             end=end,
             id=id,
             external_id=external_id,
+            instance_id=instance_id,
             aggregates=aggregates,
             granularity=granularity,
             timezone=timezone,
@@ -753,7 +766,12 @@ def retrieve(
             ignore_bad_datapoints=ignore_bad_datapoints,
             treat_uncertain_as_bad=treat_uncertain_as_bad,
         )
-        fetcher = select_dps_fetch_strategy(self, full_query=query)
+        cdf_version: str | None = None
+        if instance_id:
+            cdf_version = "alpha"
+            self._use_instance_api()
+
+        fetcher = select_dps_fetch_strategy(self, full_query=query, cdf_version=cdf_version)

         dps_lst = fetcher.fetch_all_datapoints()
         if not query.is_single_identifier:
@@ -1311,6 +1329,7 @@ def insert(
         | Sequence[tuple[int | float | datetime.datetime, int | float | str]],
         id: int | None = None,
         external_id: str | None = None,
+        instance_id: InstanceId | None = None,
     ) -> None:
         """Insert datapoints into a time series

@@ -1324,6 +1343,7 @@ def insert(
             datapoints (Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime.datetime]] | Sequence[tuple[int | float | datetime.datetime, int | float | str]]): The datapoints you wish to insert. Can either be a list of tuples, a list of dictionaries, a Datapoints object or a DatapointsArray object. See examples below.
             id (int | None): Id of time series to insert datapoints into.
             external_id (str | None): External id of time series to insert datapoint into.
+            instance_id (InstanceId | None): (Alpha) Instance ID of time series to insert datapoints into.

         Note:
             All datapoints inserted without a status code (or symbol) is assumed to be good (code 0). To mark a value, pass
@@ -1388,9 +1408,19 @@ def insert(
             ...     )
             >>> client.time_series.data.insert(data, external_id="foo")
         """
-        post_dps_object = Identifier.of_either(id, external_id).as_dict()
+        post_dps_object = Identifier.of_either(id, external_id, instance_id).as_dict()
         post_dps_object["datapoints"] = datapoints
-        DatapointsPoster(self).insert([post_dps_object])
+        cdf_version: str | None = None
+        if instance_id is not None:
+            self._use_instance_api()
+            cdf_version = "alpha"
+        DatapointsPoster(self, cdf_version).insert([post_dps_object])
+
+    def _use_instance_api(self) -> None:
+        FeaturePreviewWarning(
+            api_maturity="alpha", feature_name="Datapoint with Instance API", sdk_maturity="alpha"
+        ).warn()

     def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) -> None:
         """`Insert datapoints into multiple time series <https://developer.cognite.com/api#tag/Time-series/operation/postMultiTimeSeriesDatapoints>`_
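A hedged example of inserting through the new identifier (values invented):

```python
from datetime import datetime, timezone
from cognite.client.utils._identifier import InstanceId

# Inserting by instance_id routes through _use_instance_api(), which warns
# that the feature is alpha before the request is sent:
client.time_series.data.insert(
    [(datetime(2024, 1, 1, tzinfo=timezone.utc), 42.0)],
    instance_id=InstanceId("my-space", "my-ts-xid"),
)
```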
@@ -1451,14 +1481,27 @@ def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) -> None:
             >>> to_insert.append({"external_id": "bar-clone", "datapoints": data_to_clone})
             >>> client.time_series.data.insert_multiple(to_insert)
         """
-        DatapointsPoster(self).insert(datapoints)
+        if not isinstance(datapoints, Sequence):
+            raise ValueError("Input must be a list of dictionaries")
+        cdf_version: str | None = None
+        if any("instance_id" in d or "instanceId" in d for d in datapoints):
+            self._use_instance_api()
+            cdf_version = "alpha"
+            for d in datapoints:
+                if "instance_id" in d and isinstance(d["instance_id"], InstanceId):
+                    d["instance_id"] = d["instance_id"].dump(include_instance_type=False)  # type: ignore[assignment]
+                elif "instanceId" in d and isinstance(d["instanceId"], InstanceId):
+                    d["instanceId"] = d["instanceId"].dump(include_instance_type=False)  # type: ignore[assignment]
Review discussion:
- "Why handle both instance_id and instanceId? I say just use the camelCased version. I find that it's easier for both users and developers when there's only one way to do a particular thing."
- "Also seems like this logic rather should live in […]"
- "Actually, it is not necessary. It is already handled in […]"
+
+        DatapointsPoster(self, cdf_version).insert(datapoints)

     def delete_range(
         self,
         start: int | str | datetime.datetime,
         end: int | str | datetime.datetime,
         id: int | None = None,
         external_id: str | None = None,
+        instance_id: InstanceId | None = None,
     ) -> None:
         """Delete a range of datapoints from a time series.
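A sketch of mixing classic and instance-id identifiers in one `insert_multiple` call (identifiers and values invented); per the diff above, any `InstanceId` objects are dumped to plain `{"space": ..., "externalId": ...}` dicts before posting:

```python
from cognite.client.utils._identifier import InstanceId

to_insert = [
    {"external_id": "foo", "datapoints": [(150000000000, 1000.0)]},
    {"instance_id": InstanceId("my-space", "my-ts-xid"), "datapoints": [(160000000000, 2000.0)]},
]
# The presence of any instance id switches the whole request to the alpha API version:
client.time_series.data.insert_multiple(to_insert)
```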
@@ -1467,6 +1510,7 @@ def delete_range(
             end (int | str | datetime.datetime): Exclusive end of delete range
             id (int | None): Id of time series to delete data from
             external_id (str | None): External id of time series to delete data from
+            instance_id (InstanceId | None): (Alpha) Instance ID of time series to delete data from

         Examples:
@@ -1481,9 +1525,13 @@ def delete_range(
         if end_ms <= start_ms:
             raise ValueError(f"{end=} must be larger than {start=}")

-        identifier = Identifier.of_either(id, external_id).as_dict()
+        identifier = Identifier.of_either(id, external_id, instance_id).as_dict()
+        cdf_version: str | None = None
+        if instance_id is not None:
+            cdf_version = "alpha"
+            self._use_instance_api()
         delete_dps_object = {**identifier, "inclusiveBegin": start_ms, "exclusiveEnd": end_ms}
-        self._delete_datapoints_ranges([delete_dps_object])
+        self._delete_datapoints_ranges([delete_dps_object], cdf_version=cdf_version)

     def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
         """`Delete a range of datapoints from multiple time series. <https://developer.cognite.com/api#tag/Time-series/operation/deleteDatapoints>`_
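A hedged example of the instance-id variant of `delete_range` (identifiers invented):

```python
from cognite.client.utils._identifier import InstanceId

# Deletes datapoints in [start, end) for the time series behind the instance id:
client.time_series.data.delete_range(
    start="1w-ago",
    end="now",
    instance_id=InstanceId("my-space", "my-ts-xid"),
)
```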
@@ -1511,8 +1559,11 @@ def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
             valid_ranges.append(valid_range)
         self._delete_datapoints_ranges(valid_ranges)

-    def _delete_datapoints_ranges(self, delete_range_objects: list[dict]) -> None:
-        self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects})
+    def _delete_datapoints_ranges(self, delete_range_objects: list[dict], cdf_version: str | None = None) -> None:
+        headers: dict | None = None
+        if cdf_version:
+            headers = {"cdf-version": cdf_version}
+        self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects}, headers=headers)

     def insert_dataframe(self, df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) -> None:
         """Insert a dataframe (columns must be unique).
@@ -1592,11 +1643,12 @@ def dump(self) -> dict[str, Any]:


 class DatapointsPoster:
-    def __init__(self, dps_client: DatapointsAPI) -> None:
+    def __init__(self, dps_client: DatapointsAPI, cdf_version: str | None = None) -> None:
         self.dps_client = dps_client
         self.dps_limit = self.dps_client._DPS_INSERT_LIMIT
         self.ts_limit = self.dps_client._POST_DPS_OBJECTS_LIMIT
         self.max_workers = self.dps_client._config.max_workers
+        self.cdf_version = cdf_version

     def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:
         to_insert = self._verify_and_prepare_dps_objects(dps_object_lst)
@@ -1615,18 +1667,23 @@ def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:

     def _verify_and_prepare_dps_objects(
         self, dps_object_lst: list[dict[str, Any]]
-    ) -> list[tuple[tuple[str, int], list[_InsertDatapoint]]]:
-        dps_to_insert = defaultdict(list)
+    ) -> list[tuple[tuple[str, int | str | dict], list[_InsertDatapoint]]]:
+        dps_to_insert: dict[tuple[str, int | str | tuple[str, str]], list[_InsertDatapoint]] = defaultdict(list)
         for obj in dps_object_lst:
             validated: dict[str, Any] = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"})
             validated_dps = self._parse_and_validate_dps(obj["datapoints"])

             # Concatenate datapoints using identifier as key:
             if (xid := validated.get("externalId")) is not None:
                 dps_to_insert["externalId", xid].extend(validated_dps)
+            elif (instance_id := validated.get("instanceId")) is not None:
+                dps_to_insert["instanceId", (instance_id["space"], instance_id["externalId"])].extend(validated_dps)
             else:
                 dps_to_insert["id", validated["id"]].extend(validated_dps)
-        return list(dps_to_insert.items())
+        return [
+            ((id_name, {"space": id_[0], "externalId": id_[1]} if id_name == "instanceId" else id_), data)  # type: ignore[index, misc]
+            for (id_name, id_), data in dps_to_insert.items()
+        ]
Review discussion:
- "Here you're transforming from a dict to a tuple and then back to a dict - presumably because the dict is not hashable? Might require less back-and-forth and be simpler if validate_user_input_dict_with_identifier returns […]"
- "Yes, you are right, simplified it. Not sure what made me go for this complex solution."

     def _parse_and_validate_dps(self, dps: Datapoints | DatapointsArray | list[tuple | dict]) -> list[_InsertDatapoint]:
         if not dps:
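For context on the review thread above: the tuple round-trip exists because a dict cannot be part of a `defaultdict` key. A minimal demonstration (values invented):

```python
from collections import defaultdict

# The instanceId identifier arrives as a dict, but dicts are unhashable and
# cannot appear inside a dictionary key; a (space, externalId) tuple can.
ident = {"space": "my-space", "externalId": "my-ts-xid"}
grouped: dict[tuple, list] = defaultdict(list)
try:
    grouped[("instanceId", ident)].append("dp")  # raises: unhashable type 'dict'
except TypeError:
    # Hashable stand-in while grouping; rebuilt into a dict for the payload later.
    grouped[("instanceId", (ident["space"], ident["externalId"]))].append("dp")
```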
@@ -1694,7 +1751,11 @@ def _insert_datapoints(self, payload: list[dict[str, Any]]) -> None:
         # Convert to memory intensive format as late as possible (and clean up after insert)
         for dct in payload:
             dct["datapoints"] = [dp.dump() for dp in dct["datapoints"]]
-        self.dps_client._post(url_path=self.dps_client._RESOURCE_PATH, json={"items": payload})
+        headers: dict[str, str] | None = None
+        if self.cdf_version:
+            headers = {"cdf-version": self.cdf_version}
+
+        self.dps_client._post(url_path=self.dps_client._RESOURCE_PATH, json={"items": payload}, headers=headers)
         for dct in payload:
             dct["datapoints"].clear()
cognite/client/_version.py

@@ -1,4 +1,4 @@
 from __future__ import annotations

-__version__ = "7.51.1"
+__version__ = "7.51.2"
 __api_subversion__ = "20230101"
Review comment:
- "The Windows tests failed without this one."