InstanceId TimeSeries #1825

Merged
merged 27 commits into from
Jun 26, 2024

Changes from all commits
27 commits
5b2a7ea
refactor: instance_id for .insert
doctrino Jun 21, 2024
07eca7e
refactor: move back nodes and edges
doctrino Jun 21, 2024
586c5d3
style: happy mypy
doctrino Jun 21, 2024
717ce90
tests: updated tests
doctrino Jun 21, 2024
b9adc58
tests: updated test data
doctrino Jun 21, 2024
f81916e
Ãrefactor: instance_idà n Datapoints
doctrino Jun 21, 2024
ef0e1ed
feat: added instance_id to Datapoints as well
doctrino Jun 21, 2024
c15ab73
tests: Setup test
doctrino Jun 21, 2024
a2510b0
fix: all validation
doctrino Jun 21, 2024
c5bcd98
refactor: added alpha feature
doctrino Jun 21, 2024
2577642
refactor: implemented retrieve
doctrino Jun 21, 2024
6912c1b
feat: implemented instance id delete range
doctrino Jun 21, 2024
8f7940a
feat: implement multiple
doctrino Jun 21, 2024
e13f8ae
refactor: moved alpha client
doctrino Jun 21, 2024
2ef1547
refactor: update tests
doctrino Jun 21, 2024
2af865f
ci: fix
doctrino Jun 21, 2024
2545a51
style: linting
doctrino Jun 21, 2024
4589c0d
build; changelog
doctrino Jun 21, 2024
859607b
style: lintin
doctrino Jun 21, 2024
42213cc
build: trigger tests
doctrino Jun 25, 2024
33d9c60
refactor: nodeId and not InstanceId
doctrino Jun 25, 2024
28c6c98
Merge remote-tracking branch 'origin/master' into instance-id-timeseries
doctrino Jun 25, 2024
824900f
refactor: review feedback
doctrino Jun 25, 2024
caf2863
Merge remote-tracking branch 'origin/master' into instance-id-timeseries
doctrino Jun 26, 2024
613096b
build; bump
doctrino Jun 26, 2024
0e47249
Small simplification in datapoints api
erlendvollset Jun 26, 2024
8fe3752
Improve identifier tests and fix Identifier.as_tuple()
erlendvollset Jun 26, 2024
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -17,7 +17,13 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.52.1] - 2024-06-18
## [7.52.2] - 2024-06-26
### Added

- Alpha feature: `client.time_series.data` now supports `instance_id` in the `.insert`, `.insert_multiple`,
  `.delete`, and `.retrieve` methods. This is an experimental feature and may change without warning.

## [7.52.1] - 2024-06-26
### Fixed

- Calling `.extend` on a `NodeListWithCursor` or `EdgeListWithCursor` would raise a `TypeError`. This is now fixed.
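To make the changelog entry above concrete, here is a minimal usage sketch of the alpha `instance_id` support (the `NodeId` values and client configuration are hypothetical; the method signatures follow the `datapoints.py` diff further down):

```python
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling.ids import NodeId

client = CogniteClient()  # assumes credentials are configured via environment/config

# Hypothetical node that identifies a time series in a data-modeling space
ts_node = NodeId(space="my-space", external_id="my-timeseries")

# Insert datapoints by instance id (alpha: the SDK attaches a "cdf-version: alpha" header)
client.time_series.data.insert([(1718668800000, 42.0)], instance_id=ts_node)

# Retrieve and delete by instance id
dps = client.time_series.data.retrieve(instance_id=ts_node, start="30d-ago", end="now")
client.time_series.data.delete_range(start="30d-ago", end="now", instance_id=ts_node)
```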
16 changes: 13 additions & 3 deletions cognite/client/_api/datapoint_tasks.py
@@ -46,6 +46,7 @@
_DatapointsPayloadItem,
)
from cognite.client.utils._auxiliary import is_unlimited
from cognite.client.utils._identifier import InstanceId
from cognite.client.utils._text import convert_all_keys_to_snake_case, to_snake_case
from cognite.client.utils._time import (
ZoneInfo,
@@ -87,6 +88,7 @@
DatapointsExternalId = Union[
str, DatapointsQuery, Dict[str, Any], SequenceNotStr[Union[str, DatapointsQuery, Dict[str, Any]]]
]
DatapointsInstanceId = Union[InstanceId, Sequence[InstanceId]]


@dataclass
@@ -100,6 +102,7 @@ class _FullDatapointsQuery:
end: int | str | datetime.datetime | None = None
id: DatapointsId | None = None
external_id: DatapointsExternalId | None = None
instance_id: InstanceId | Sequence[InstanceId] | None = None
aggregates: Aggregate | str | list[Aggregate | str] | None = None
granularity: str | None = None
timezone: str | datetime.timezone | ZoneInfo | None = None
@@ -117,9 +120,11 @@ def is_single_identifier(self) -> bool:
# No lists given and exactly one of id/xid was given:
return (
isinstance(self.id, (dict, DatapointsQuery, numbers.Integral))
and self.external_id is None
and (self.external_id is None and self.instance_id is None)
or isinstance(self.external_id, (dict, DatapointsQuery, str))
and self.id is None
and (self.id is None and self.instance_id is None)
or isinstance(self.instance_id, InstanceId)
and (self.id is None and self.external_id is None)
)

@cached_property
Expand All @@ -146,12 +151,17 @@ def parse_into_queries(self) -> list[DatapointsQuery]:
queries.extend(self._parse(id_, arg_name="id", exp_type=numbers.Integral))
if (xid := self.external_id) is not None:
queries.extend(self._parse(xid, arg_name="external_id", exp_type=str))
if (iid := self.instance_id) is not None:
queries.extend(self._parse(iid, arg_name="instance_id", exp_type=InstanceId))
if queries:
return queries
raise ValueError("Pass at least one time series `id` or `external_id`!")

def _parse(
self, id_or_xid: DatapointsId | DatapointsExternalId, arg_name: Literal["id", "external_id"], exp_type: type
self,
id_or_xid: DatapointsId | DatapointsExternalId | DatapointsInstanceId,
arg_name: Literal["id", "external_id", "instance_id"],
exp_type: type,
) -> list[DatapointsQuery]:
user_queries: SequenceNotStr[int | str | DatapointsQuery | dict[str, Any]]
if isinstance(id_or_xid, (dict, DatapointsQuery, exp_type)):
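For orientation, a simplified standalone restatement of the rule that the updated `is_single_identifier` property encodes: exactly one of `id`, `external_id`, `instance_id` is given, and as a single value rather than a sequence. The `InstanceId` stand-in below is hypothetical and not the SDK class:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class InstanceId:  # stand-in for cognite.client.utils._identifier.InstanceId
    space: str
    external_id: str

def is_single_identifier(id_, external_id, instance_id) -> bool:
    # True only when exactly one identifier kind is set, and it is a single (non-sequence) value
    return (
        isinstance(id_, int) and external_id is None and instance_id is None
        or isinstance(external_id, str) and id_ is None and instance_id is None
        or isinstance(instance_id, InstanceId) and id_ is None and external_id is None
    )

assert is_single_identifier(123, None, None)
assert is_single_identifier(None, None, InstanceId("my-space", "my-ts"))
assert not is_single_identifier(123, "also-xid", None)  # more than one kind -> not single
```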
102 changes: 75 additions & 27 deletions cognite/client/_api/datapoints.py
@@ -46,6 +46,7 @@
DatapointsQuery,
LatestDatapointQuery,
)
from cognite.client.data_classes.data_modeling.ids import NodeId
from cognite.client.data_classes.datapoints import Aggregate, _DatapointsPayload, _DatapointsPayloadItem
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.client.utils import _json
@@ -57,6 +58,7 @@
unpack_items_in_payload,
)
from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._importing import import_as_completed, local_import
from cognite.client.utils._time import (
@@ -89,18 +91,20 @@
_TResLst = TypeVar("_TResLst", DatapointsList, DatapointsArrayList)


def select_dps_fetch_strategy(dps_client: DatapointsAPI, full_query: _FullDatapointsQuery) -> DpsFetchStrategy:
def select_dps_fetch_strategy(
dps_client: DatapointsAPI, full_query: _FullDatapointsQuery, cdf_version: str | None = None
) -> DpsFetchStrategy:
all_queries = full_query.parse_into_queries()
full_query.validate(all_queries, dps_limit_raw=dps_client._DPS_LIMIT_RAW, dps_limit_agg=dps_client._DPS_LIMIT_AGG)
agg_queries, raw_queries = split_queries_into_raw_and_aggs(all_queries)

# Running mode is decided based on how many time series are requested VS. number of workers:
if len(all_queries) <= (max_workers := dps_client._config.max_workers):
# Start shooting requests from the hip immediately:
return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)
# Fetch a smaller, chunked batch of dps from all time series - which allows us to do some rudimentary
# guesstimation of dps density - then chunk away:
return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)


def split_queries_into_raw_and_aggs(all_queries: _TSQueryList) -> tuple[_TSQueryList, _TSQueryList]:
@@ -118,13 +122,15 @@ def __init__(
agg_queries: _TSQueryList,
raw_queries: _TSQueryList,
max_workers: int,
cdf_version: str | None = None,
) -> None:
self.dps_client = dps_client
self.all_queries = all_queries
self.agg_queries = agg_queries
self.raw_queries = raw_queries
self.max_workers = max_workers
self.n_queries = len(all_queries)
self.cdf_version = cdf_version

def fetch_all_datapoints(self) -> DatapointsList:
pool = ConcurrencySettings.get_executor(max_workers=self.max_workers)
@@ -141,13 +147,18 @@ def fetch_all_datapoints_numpy(self) -> DatapointsArrayList:
)

def _request_datapoints(self, payload: _DatapointsPayload) -> Sequence[DataPointListItem]:
headers: dict | None = None
if self.cdf_version:
headers = {"cdf-version": self.cdf_version}

(res := DataPointListResponse()).MergeFromString(
self.dps_client._do_request(
json=payload,
method="POST",
url_path=f"{self.dps_client._RESOURCE_PATH}/list",
accept="application/protobuf",
timeout=self.dps_client._config.timeout,
headers=headers,
).content
)
return res.items
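The alpha behaviour is routed purely through an HTTP header. A tiny sketch of the conditional-header pattern that this hunk and the later ones repeat (only the `cdf-version` header name comes from the diff; the helper is illustrative):

```python
from __future__ import annotations

def alpha_headers(cdf_version: str | None) -> dict[str, str] | None:
    # Attach the API-versioning header only when an alpha/beta version is requested,
    # so default (GA) requests stay identical to before.
    return {"cdf-version": cdf_version} if cdf_version else None

assert alpha_headers(None) is None
assert alpha_headers("alpha") == {"cdf-version": "alpha"}
```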
@@ -531,6 +542,7 @@ def retrieve(
| DatapointsQuery
| dict[str, Any]
| SequenceNotStr[str | DatapointsQuery | dict[str, Any]] = None,
instance_id: None | NodeId | Sequence[NodeId] = None,
start: int | str | datetime.datetime | None = None,
end: int | str | datetime.datetime | None = None,
aggregates: Aggregate | str | list[Aggregate | str] | None = None,
@@ -562,6 +574,7 @@
Args:
id (None | int | DatapointsQuery | dict[str, Any] | Sequence[int | DatapointsQuery | dict[str, Any]]): Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | DatapointsQuery | dict[str, Any] | SequenceNotStr[str | DatapointsQuery | dict[str, Any]]): External id, dict (with external id) or (mixed) sequence of these. See examples below.
instance_id (None | NodeId | Sequence[NodeId]): Instance id or sequence of instance ids. If provided, the `id` and `external_id` arguments are ignored.
start (int | str | datetime.datetime | None): Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime.datetime | None): Exclusive end. Default: "now"
aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``,
@@ -741,6 +754,7 @@ def retrieve(
end=end,
id=id,
external_id=external_id,
instance_id=instance_id,
aggregates=aggregates,
granularity=granularity,
timezone=timezone,
Expand All @@ -753,7 +767,12 @@ def retrieve(
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
fetcher = select_dps_fetch_strategy(self, full_query=query)
cdf_version: str | None = None
if instance_id:
cdf_version = "alpha"
self._use_instance_api()

fetcher = select_dps_fetch_strategy(self, full_query=query, cdf_version=cdf_version)

dps_lst = fetcher.fetch_all_datapoints()
if not query.is_single_identifier:
@@ -1311,6 +1330,7 @@ def insert(
| Sequence[tuple[int | float | datetime.datetime, int | float | str]],
id: int | None = None,
external_id: str | None = None,
instance_id: NodeId | None = None,
) -> None:
"""Insert datapoints into a time series

@@ -1324,6 +1344,7 @@
datapoints (Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime.datetime]] | Sequence[tuple[int | float | datetime.datetime, int | float | str]]): The datapoints you wish to insert. Can either be a list of tuples, a list of dictionaries, a Datapoints object or a DatapointsArray object. See examples below.
id (int | None): Id of time series to insert datapoints into.
external_id (str | None): External id of time series to insert datapoint into.
instance_id (NodeId | None): (Alpha) Instance ID of time series to insert datapoints into.

Note:
All datapoints inserted without a status code (or symbol) is assumed to be good (code 0). To mark a value, pass
@@ -1388,9 +1409,19 @@ def insert(
... )
>>> client.time_series.data.insert(data, external_id="foo")
"""
post_dps_object = Identifier.of_either(id, external_id).as_dict()

post_dps_object = Identifier.of_either(id, external_id, instance_id).as_dict()
post_dps_object["datapoints"] = datapoints
DatapointsPoster(self).insert([post_dps_object])
cdf_version: str | None = None
if instance_id is not None:
self._use_instance_api()
cdf_version = "alpha"
DatapointsPoster(self, cdf_version).insert([post_dps_object])

def _use_instance_api(self) -> None:
FeaturePreviewWarning(
api_maturity="alpha", feature_name="Datapoint with Instance API", sdk_maturity="alpha"
).warn()

def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) -> None:
"""`Insert datapoints into multiple time series <https://developer.cognite.com/api#tag/Time-series/operation/postMultiTimeSeriesDatapoints>`_
@@ -1451,14 +1482,22 @@ def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoin
>>> to_insert.append({"external_id": "bar-clone", "datapoints": data_to_clone})
>>> client.time_series.data.insert_multiple(to_insert)
"""
DatapointsPoster(self).insert(datapoints)
if not isinstance(datapoints, Sequence):
raise ValueError("Input must be a list of dictionaries")
cdf_version: str | None = None
if any("instance_id" in d or "instanceId" in d for d in datapoints):
self._use_instance_api()
cdf_version = "alpha"

DatapointsPoster(self, cdf_version).insert(datapoints)
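A hedged example of calling `insert_multiple` with the new identifier key. The values are hypothetical, and it assumes the identifier validation accepts the `instance_id` key that the alpha check above looks for:

```python
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling.ids import NodeId

client = CogniteClient()
to_insert = [
    {"external_id": "existing-ts", "datapoints": [(1718668800000, 1.0)]},
    # New in this PR (alpha): address the target time series by its data-modeling node
    {"instance_id": NodeId("my-space", "my-ts-node"), "datapoints": [(1718668800000, 2.0)]},
]
client.time_series.data.insert_multiple(to_insert)
```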

def delete_range(
self,
start: int | str | datetime.datetime,
end: int | str | datetime.datetime,
id: int | None = None,
external_id: str | None = None,
instance_id: NodeId | None = None,
) -> None:
"""Delete a range of datapoints from a time series.

@@ -1467,6 +1506,7 @@ def delete_range(
end (int | str | datetime.datetime): Exclusive end of delete range
id (int | None): Id of time series to delete data from
external_id (str | None): External id of time series to delete data from
instance_id (NodeId | None): (Alpha) Instance ID of time series to delete data from

Examples:

@@ -1481,9 +1521,13 @@
if end_ms <= start_ms:
raise ValueError(f"{end=} must be larger than {start=}")

identifier = Identifier.of_either(id, external_id).as_dict()
identifier = Identifier.of_either(id, external_id, instance_id).as_dict()
cdf_version: str | None = None
if instance_id is not None:
cdf_version = "alpha"
self._use_instance_api()
delete_dps_object = {**identifier, "inclusiveBegin": start_ms, "exclusiveEnd": end_ms}
self._delete_datapoints_ranges([delete_dps_object])
self._delete_datapoints_ranges([delete_dps_object], cdf_version=cdf_version)

def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
"""`Delete a range of datapoints from multiple time series. <https://developer.cognite.com/api#tag/Time-series/operation/deleteDatapoints>`_
@@ -1503,16 +1547,20 @@ def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
"""
valid_ranges = []
for time_range in ranges:
valid_range = validate_user_input_dict_with_identifier(time_range, required_keys={"start", "end"})
valid_range.update(
identifier = validate_user_input_dict_with_identifier(time_range, required_keys={"start", "end"})
valid_range = dict(
**identifier.as_dict(),
inclusiveBegin=timestamp_to_ms(time_range["start"]),
exclusiveEnd=timestamp_to_ms(time_range["end"]),
)
valid_ranges.append(valid_range)
self._delete_datapoints_ranges(valid_ranges)

def _delete_datapoints_ranges(self, delete_range_objects: list[dict]) -> None:
self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects})
def _delete_datapoints_ranges(self, delete_range_objects: list[dict], cdf_version: str | None = None) -> None:
headers: dict | None = None
if cdf_version:
headers = {"cdf-version": cdf_version}
self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects}, headers=headers)

def insert_dataframe(self, df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) -> None:
"""Insert a dataframe (columns must be unique).
@@ -1592,11 +1640,12 @@ def dump(self) -> dict[str, Any]:


class DatapointsPoster:
def __init__(self, dps_client: DatapointsAPI) -> None:
def __init__(self, dps_client: DatapointsAPI, cdf_version: str | None = None) -> None:
self.dps_client = dps_client
self.dps_limit = self.dps_client._DPS_INSERT_LIMIT
self.ts_limit = self.dps_client._POST_DPS_OBJECTS_LIMIT
self.max_workers = self.dps_client._config.max_workers
self.cdf_version = cdf_version

def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:
to_insert = self._verify_and_prepare_dps_objects(dps_object_lst)
@@ -1615,17 +1664,12 @@ def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:

def _verify_and_prepare_dps_objects(
self, dps_object_lst: list[dict[str, Any]]
) -> list[tuple[tuple[str, int], list[_InsertDatapoint]]]:
dps_to_insert = defaultdict(list)
) -> list[tuple[Identifier, list[_InsertDatapoint]]]:
dps_to_insert: dict[Identifier, list[_InsertDatapoint]] = defaultdict(list)
for obj in dps_object_lst:
validated: dict[str, Any] = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"})
identifier = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"})
validated_dps = self._parse_and_validate_dps(obj["datapoints"])

# Concatenate datapoints using identifier as key:
if (xid := validated.get("externalId")) is not None:
dps_to_insert["externalId", xid].extend(validated_dps)
else:
dps_to_insert["id", validated["id"]].extend(validated_dps)
dps_to_insert[identifier].extend(validated_dps)
return list(dps_to_insert.items())
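The grouping refactor above keys the accumulator by `Identifier` objects instead of `(name, value)` tuples, which requires `Identifier` to be hashable (compare the `Identifier.as_tuple()` fix in the final commit). A standalone sketch of the same pattern with a hypothetical minimal identifier type:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)  # frozen -> __hash__ is generated, so instances can be dict keys
class MiniIdentifier:
    name: str     # "id", "externalId" or "instanceId"
    value: object

    def as_dict(self) -> dict:
        return {self.name: self.value}

groups: dict[MiniIdentifier, list] = defaultdict(list)
for obj in (
    {"externalId": "foo", "datapoints": [(0, 1.0)]},
    {"externalId": "foo", "datapoints": [(1, 2.0)]},  # same series -> same key
):
    groups[MiniIdentifier("externalId", obj["externalId"])].extend(obj["datapoints"])

# Datapoints addressed to the same time series end up concatenated under one key
assert groups[MiniIdentifier("externalId", "foo")] == [(0, 1.0), (1, 2.0)]
```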

def _parse_and_validate_dps(self, dps: Datapoints | DatapointsArray | list[tuple | dict]) -> list[_InsertDatapoint]:
@@ -1674,13 +1718,13 @@ def _dps_are_dicts(dps: list[Any]) -> TypeGuard[list[dict]]:
return isinstance(dps[0], dict)

def _create_payload_tasks(
self, post_dps_objects: list[tuple[tuple[str, int], list[_InsertDatapoint]]]
self, post_dps_objects: list[tuple[Identifier, list[_InsertDatapoint]]]
) -> Iterator[list[dict[str, Any]]]:
payload = []
n_left = self.dps_limit
for (id_name, ident), dps in post_dps_objects:
for identifier, dps in post_dps_objects:
for next_chunk, is_full in self._split_datapoints(dps, n_left, self.dps_limit):
payload.append({id_name: ident, "datapoints": next_chunk})
payload.append({**identifier.as_dict(), "datapoints": next_chunk})
if is_full:
yield payload
payload = []
@@ -1694,7 +1738,11 @@ def _insert_datapoints(self, payload: list[dict[str, Any]]) -> None:
# Convert to memory intensive format as late as possible (and clean up after insert)
for dct in payload:
dct["datapoints"] = [dp.dump() for dp in dct["datapoints"]]
self.dps_client._post(url_path=self.dps_client._RESOURCE_PATH, json={"items": payload})
headers: dict[str, str] | None = None
if self.cdf_version:
headers = {"cdf-version": self.cdf_version}

self.dps_client._post(url_path=self.dps_client._RESOURCE_PATH, json={"items": payload}, headers=headers)
for dct in payload:
dct["datapoints"].clear()

2 changes: 1 addition & 1 deletion cognite/client/_version.py
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.52.1"
__version__ = "7.52.2"
__api_subversion__ = "20230101"