InstanceId TimeSeries #1825

Merged: 27 commits into master from instance-id-timeseries on Jun 26, 2024.

Changes shown are from 19 of the 27 commits.

Commits
5b2a7ea
refactor: instance_id for .insert
doctrino Jun 21, 2024
07eca7e
refactor: move back nodes and edges
doctrino Jun 21, 2024
586c5d3
style: happy mypy
doctrino Jun 21, 2024
717ce90
tests: updated tests
doctrino Jun 21, 2024
b9adc58
tests: updated test data
doctrino Jun 21, 2024
f81916e
refactor: instance_id in Datapoints
doctrino Jun 21, 2024
ef0e1ed
feat: added instance_id to Datapoints as well
doctrino Jun 21, 2024
c15ab73
tests: Setup test
doctrino Jun 21, 2024
a2510b0
fix: all validation
doctrino Jun 21, 2024
c5bcd98
refactor: added alpha feature
doctrino Jun 21, 2024
2577642
refactor: implemented retrieve
doctrino Jun 21, 2024
6912c1b
feat: implemented instance id delete range
doctrino Jun 21, 2024
8f7940a
feat: implement multiple
doctrino Jun 21, 2024
e13f8ae
refactor: moved alpha client
doctrino Jun 21, 2024
2ef1547
refactor: update tests
doctrino Jun 21, 2024
2af865f
ci: fix
doctrino Jun 21, 2024
2545a51
style: linting
doctrino Jun 21, 2024
4589c0d
build: changelog
doctrino Jun 21, 2024
859607b
style: linting
doctrino Jun 21, 2024
42213cc
build: trigger tests
doctrino Jun 25, 2024
33d9c60
refactor: nodeId and not InstanceId
doctrino Jun 25, 2024
28c6c98
Merge remote-tracking branch 'origin/master' into instance-id-timeseries
doctrino Jun 25, 2024
824900f
refactor: review feedback
doctrino Jun 25, 2024
caf2863
Merge remote-tracking branch 'origin/master' into instance-id-timeseries
doctrino Jun 26, 2024
613096b
build: bump
doctrino Jun 26, 2024
0e47249
Small simplification in datapoints api
erlendvollset Jun 26, 2024
8fe3752
Improve identifier tests and fix Identifier.as_tuple()
erlendvollset Jun 26, 2024
2 changes: 1 addition & 1 deletion .github/actions/setup/action.yml
@@ -31,6 +31,6 @@ runs:
- name: Install dependencies
shell: bash
run: |
pip install --upgrade pip poetry
python -m pip install --upgrade pip poetry
Comment from doctrino (Contributor, Author): The Windows build failed without this one.

poetry config virtualenvs.create false
poetry install --no-interaction --no-ansi ${{ inputs.extras }}
1 change: 1 addition & 0 deletions .gitignore
@@ -59,3 +59,4 @@ playground.py
scripts/tmp/
my_file.txt
.venv/
alpha.env
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.51.2] - 2024-06-24
### Added

- Alpha feature, in `client.time_series.data` support for `instance_id` in `.insert`, `insert_multiple`,
`.delete`, and `.retrieve` methods. This is an experimental feature and may change without warning.

## [7.51.1] - 2024-06-18
### Fixed

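To make the [7.51.2] changelog entry above concrete, here is a minimal usage sketch of the alpha feature. It assumes the instance identifier is passed as a NodeId from cognite.client.data_classes.data_modeling (the diff below types the parameter with the internal InstanceId class, and a later commit renames toward NodeId), so the exact public type and import path should be checked against the released version.

from datetime import datetime, timezone

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import NodeId  # assumed public import path

client = CogniteClient()
ts = NodeId(space="my-space", external_id="my-timeseries")  # hypothetical identifiers

# Insert two datapoints into the time series behind this node (alpha feature).
client.time_series.data.insert(
    datapoints=[
        (datetime(2024, 6, 1, tzinfo=timezone.utc), 42.0),
        (datetime(2024, 6, 2, tzinfo=timezone.utc), 43.5),
    ],
    instance_id=ts,
)

# Retrieve the datapoints back, then delete the range, using the same identifier.
dps = client.time_series.data.retrieve(instance_id=ts, start="30d-ago", end="now")
client.time_series.data.delete_range(start="30d-ago", end="now", instance_id=ts)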
16 changes: 13 additions & 3 deletions cognite/client/_api/datapoint_tasks.py
@@ -46,6 +46,7 @@
_DatapointsPayloadItem,
)
from cognite.client.utils._auxiliary import is_unlimited
from cognite.client.utils._identifier import InstanceId
from cognite.client.utils._text import convert_all_keys_to_snake_case, to_snake_case
from cognite.client.utils._time import (
ZoneInfo,
@@ -87,6 +88,7 @@
DatapointsExternalId = Union[
str, DatapointsQuery, Dict[str, Any], SequenceNotStr[Union[str, DatapointsQuery, Dict[str, Any]]]
]
DatapointsInstanceId = Union[InstanceId, Sequence[InstanceId]]


@dataclass
@@ -100,6 +102,7 @@ class _FullDatapointsQuery:
end: int | str | datetime.datetime | None = None
id: DatapointsId | None = None
external_id: DatapointsExternalId | None = None
instance_id: InstanceId | Sequence[InstanceId] | None = None
aggregates: Aggregate | str | list[Aggregate | str] | None = None
granularity: str | None = None
timezone: str | datetime.timezone | ZoneInfo | None = None
@@ -117,9 +120,11 @@ def is_single_identifier(self) -> bool:
# No lists given and exactly one of id/xid was given:
return (
isinstance(self.id, (dict, DatapointsQuery, numbers.Integral))
and self.external_id is None
and (self.external_id is None and self.instance_id is None)
or isinstance(self.external_id, (dict, DatapointsQuery, str))
and self.id is None
and (self.id is None and self.instance_id is None)
or isinstance(self.instance_id, InstanceId)
and (self.id is None and self.external_id is None)
)

@cached_property
@@ -146,12 +151,17 @@ def parse_into_queries(self) -> list[DatapointsQuery]:
queries.extend(self._parse(id_, arg_name="id", exp_type=numbers.Integral))
if (xid := self.external_id) is not None:
queries.extend(self._parse(xid, arg_name="external_id", exp_type=str))
if (iid := self.instance_id) is not None:
queries.extend(self._parse(iid, arg_name="instance_id", exp_type=InstanceId))
if queries:
return queries
raise ValueError("Pass at least one time series `id` or `external_id`!")

def _parse(
self, id_or_xid: DatapointsId | DatapointsExternalId, arg_name: Literal["id", "external_id"], exp_type: type
self,
id_or_xid: DatapointsId | DatapointsExternalId | DatapointsInstanceId,
arg_name: Literal["id", "external_id", "instance_id"],
exp_type: type,
) -> list[DatapointsQuery]:
user_queries: SequenceNotStr[int | str | DatapointsQuery | dict[str, Any]]
if isinstance(id_or_xid, (dict, DatapointsQuery, exp_type)):
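The is_single_identifier change in this file leans on operator precedence (and binds tighter than or). A simplified restatement with explicit parentheses, assuming the same DatapointsQuery and InstanceId types imported in the module above, may make the intent easier to scan. This is not the SDK's code, only a sketch.

# Not the SDK's code: a query targets a single time series exactly when one
# identifier field holds a single (non-sequence) value and the other two are None.
def is_single_identifier(id_, external_id, instance_id) -> bool:
    return (
        (isinstance(id_, (dict, DatapointsQuery, int)) and external_id is None and instance_id is None)
        or (isinstance(external_id, (dict, DatapointsQuery, str)) and id_ is None and instance_id is None)
        or (isinstance(instance_id, InstanceId) and id_ is None and external_id is None)
    )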
95 changes: 78 additions & 17 deletions cognite/client/_api/datapoints.py
@@ -57,7 +57,8 @@
unpack_items_in_payload,
)
from cognite.client.utils._concurrency import ConcurrencySettings, execute_tasks
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore, InstanceId
from cognite.client.utils._importing import import_as_completed, local_import
from cognite.client.utils._time import (
ZoneInfo,
@@ -89,18 +90,20 @@
_TResLst = TypeVar("_TResLst", DatapointsList, DatapointsArrayList)


def select_dps_fetch_strategy(dps_client: DatapointsAPI, full_query: _FullDatapointsQuery) -> DpsFetchStrategy:
def select_dps_fetch_strategy(
dps_client: DatapointsAPI, full_query: _FullDatapointsQuery, cdf_version: str | None = None
) -> DpsFetchStrategy:
all_queries = full_query.parse_into_queries()
full_query.validate(all_queries, dps_limit_raw=dps_client._DPS_LIMIT_RAW, dps_limit_agg=dps_client._DPS_LIMIT_AGG)
agg_queries, raw_queries = split_queries_into_raw_and_aggs(all_queries)

# Running mode is decided based on how many time series are requested VS. number of workers:
if len(all_queries) <= (max_workers := dps_client._config.max_workers):
# Start shooting requests from the hip immediately:
return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
return EagerDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)
# Fetch a smaller, chunked batch of dps from all time series - which allows us to do some rudimentary
# guesstimation of dps density - then chunk away:
return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers)
return ChunkingDpsFetcher(dps_client, all_queries, agg_queries, raw_queries, max_workers, cdf_version)


def split_queries_into_raw_and_aggs(all_queries: _TSQueryList) -> tuple[_TSQueryList, _TSQueryList]:
@@ -118,13 +121,15 @@ def __init__(
agg_queries: _TSQueryList,
raw_queries: _TSQueryList,
max_workers: int,
cdf_version: str | None = None,
) -> None:
self.dps_client = dps_client
self.all_queries = all_queries
self.agg_queries = agg_queries
self.raw_queries = raw_queries
self.max_workers = max_workers
self.n_queries = len(all_queries)
self.cdf_version = cdf_version

def fetch_all_datapoints(self) -> DatapointsList:
pool = ConcurrencySettings.get_executor(max_workers=self.max_workers)
@@ -141,13 +146,18 @@ def fetch_all_datapoints_numpy(self) -> DatapointsArrayList:
)

def _request_datapoints(self, payload: _DatapointsPayload) -> Sequence[DataPointListItem]:
headers: dict | None = None
if self.cdf_version:
headers = {"cdf-version": self.cdf_version}

(res := DataPointListResponse()).MergeFromString(
self.dps_client._do_request(
json=payload,
method="POST",
url_path=f"{self.dps_client._RESOURCE_PATH}/list",
accept="application/protobuf",
timeout=self.dps_client._config.timeout,
headers=headers,
).content
)
return res.items
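The same convention recurs throughout this PR: an optional cdf_version is threaded down to the HTTP call and a header is attached only when it is set. A condensed sketch of that pattern, assuming the alpha instances-aware datapoints endpoints are selected with a "cdf-version: alpha" header:

def _headers_for(cdf_version: str | None) -> dict[str, str] | None:
    # Only override the API version when explicitly requested (e.g. "alpha");
    # returning None leaves the client's default headers untouched.
    return {"cdf-version": cdf_version} if cdf_version else None

# e.g. self.dps_client._do_request(..., headers=_headers_for(self.cdf_version))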
@@ -531,6 +541,7 @@ def retrieve(
| DatapointsQuery
| dict[str, Any]
| SequenceNotStr[str | DatapointsQuery | dict[str, Any]] = None,
instance_id: None | InstanceId | Sequence[InstanceId] = None,
doctrino marked this conversation as resolved.
start: int | str | datetime.datetime | None = None,
end: int | str | datetime.datetime | None = None,
aggregates: Aggregate | str | list[Aggregate | str] | None = None,
@@ -562,6 +573,7 @@
Args:
id (None | int | DatapointsQuery | dict[str, Any] | Sequence[int | DatapointsQuery | dict[str, Any]]): Id, dict (with id) or (mixed) sequence of these. See examples below.
external_id (None | str | DatapointsQuery | dict[str, Any] | SequenceNotStr[str | DatapointsQuery | dict[str, Any]]): External id, dict (with external id) or (mixed) sequence of these. See examples below.
instance_id (None | InstanceId | Sequence[InstanceId]): Instance id or sequence of instance ids. If provided, the `id` and `external_id` arguments are ignored.
start (int | str | datetime.datetime | None): Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime.datetime | None): Exclusive end. Default: "now"
aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``,
@@ -741,6 +753,7 @@ def retrieve(
end=end,
id=id,
external_id=external_id,
instance_id=instance_id,
aggregates=aggregates,
granularity=granularity,
timezone=timezone,
@@ -753,7 +766,12 @@
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
fetcher = select_dps_fetch_strategy(self, full_query=query)
cdf_version: str | None = None
if instance_id:
cdf_version = "alpha"
self._use_instance_api()

fetcher = select_dps_fetch_strategy(self, full_query=query, cdf_version=cdf_version)

dps_lst = fetcher.fetch_all_datapoints()
if not query.is_single_identifier:
@@ -1311,6 +1329,7 @@ def insert(
| Sequence[tuple[int | float | datetime.datetime, int | float | str]],
id: int | None = None,
external_id: str | None = None,
instance_id: InstanceId | None = None,
doctrino marked this conversation as resolved.
) -> None:
"""Insert datapoints into a time series

@@ -1324,6 +1343,7 @@
datapoints (Datapoints | DatapointsArray | Sequence[dict[str, int | float | str | datetime.datetime]] | Sequence[tuple[int | float | datetime.datetime, int | float | str]]): The datapoints you wish to insert. Can either be a list of tuples, a list of dictionaries, a Datapoints object or a DatapointsArray object. See examples below.
id (int | None): Id of time series to insert datapoints into.
external_id (str | None): External id of time series to insert datapoint into.
instance_id (InstanceId | None): (Alpha) Instance ID of time series to insert datapoints into.

Note:
All datapoints inserted without a status code (or symbol) is assumed to be good (code 0). To mark a value, pass
@@ -1388,9 +1408,19 @@
... )
>>> client.time_series.data.insert(data, external_id="foo")
"""
post_dps_object = Identifier.of_either(id, external_id).as_dict()

post_dps_object = Identifier.of_either(id, external_id, instance_id).as_dict()
post_dps_object["datapoints"] = datapoints
DatapointsPoster(self).insert([post_dps_object])
cdf_version: str | None = None
if instance_id is not None:
self._use_instance_api()
cdf_version = "alpha"
DatapointsPoster(self, cdf_version).insert([post_dps_object])

def _use_instance_api(self) -> None:
FeaturePreviewWarning(
api_maturity="alpha", feature_name="Datapoint with Instance API", sdk_maturity="alpha"
).warn()

def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) -> None:
"""`Insert datapoints into multiple time series <https://developer.cognite.com/api#tag/Time-series/operation/postMultiTimeSeriesDatapoints>`_
@@ -1451,14 +1481,27 @@ def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoin
>>> to_insert.append({"external_id": "bar-clone", "datapoints": data_to_clone})
>>> client.time_series.data.insert_multiple(to_insert)
"""
DatapointsPoster(self).insert(datapoints)
if not isinstance(datapoints, Sequence):
raise ValueError("Input must be a list of dictionaries")
cdf_version: str | None = None
if any("instance_id" in d or "instanceId" in d for d in datapoints):
self._use_instance_api()
cdf_version = "alpha"
for d in datapoints:
if "instance_id" in d and isinstance(d["instance_id"], InstanceId):
d["instance_id"] = d["instance_id"].dump(include_instance_type=False) # type: ignore[assignment]
elif "instanceId" in d and isinstance(d["instanceId"], InstanceId):
d["instanceId"] = d["instanceId"].dump(include_instance_type=False) # type: ignore[assignment]
Reviewer (Collaborator) commented:
Why handle both instance_id and instanceId? I say just use the camelCased version. I find that it's easier for both users and developers when there's only one way to do a particular thing.

Reviewer (Collaborator) commented:
Also seems like this logic rather should live in _verify_and_prepare_dps_objects

doctrino (Contributor, Author) replied:
Actually, it is not necessary. It is already handled in _verify_and_prepare_dps_objects 🤦


DatapointsPoster(self, cdf_version).insert(datapoints)
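A hedged usage sketch of insert_multiple with a mixed batch, assuming the snake_cased instance_id key with a NodeId value is accepted as the validation above suggests; the identifiers and import path are illustrative only.

from datetime import datetime, timezone
from cognite.client.data_classes.data_modeling import NodeId  # assumed import path

ts = datetime(2024, 6, 1, tzinfo=timezone.utc)
to_insert = [
    {"id": 123, "datapoints": [(ts, 1.0)]},
    {"external_id": "foo", "datapoints": [(ts, 2.0)]},
    {"instance_id": NodeId(space="my-space", external_id="my-ts"), "datapoints": [(ts, 3.0)]},
]
client.time_series.data.insert_multiple(to_insert)  # client: CogniteClient, as in the earlier sketch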

def delete_range(
self,
start: int | str | datetime.datetime,
end: int | str | datetime.datetime,
id: int | None = None,
external_id: str | None = None,
instance_id: InstanceId | None = None,
doctrino marked this conversation as resolved.
) -> None:
"""Delete a range of datapoints from a time series.

Expand All @@ -1467,6 +1510,7 @@ def delete_range(
end (int | str | datetime.datetime): Exclusive end of delete range
id (int | None): Id of time series to delete data from
external_id (str | None): External id of time series to delete data from
instance_id (InstanceId | None): (Alpha) Instance ID of time series to delete data from

Examples:

@@ -1481,9 +1525,13 @@
if end_ms <= start_ms:
raise ValueError(f"{end=} must be larger than {start=}")

identifier = Identifier.of_either(id, external_id).as_dict()
identifier = Identifier.of_either(id, external_id, instance_id).as_dict()
cdf_version: str | None = None
if instance_id is not None:
cdf_version = "alpha"
self._use_instance_api()
delete_dps_object = {**identifier, "inclusiveBegin": start_ms, "exclusiveEnd": end_ms}
self._delete_datapoints_ranges([delete_dps_object])
self._delete_datapoints_ranges([delete_dps_object], cdf_version=cdf_version)

def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
"""`Delete a range of datapoints from multiple time series. <https://developer.cognite.com/api#tag/Time-series/operation/deleteDatapoints>`_
@@ -1511,8 +1559,11 @@ def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
valid_ranges.append(valid_range)
self._delete_datapoints_ranges(valid_ranges)

def _delete_datapoints_ranges(self, delete_range_objects: list[dict]) -> None:
self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects})
def _delete_datapoints_ranges(self, delete_range_objects: list[dict], cdf_version: str | None = None) -> None:
headers: dict | None = None
if cdf_version:
headers = {"cdf-version": cdf_version}
self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects}, headers=headers)

def insert_dataframe(self, df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) -> None:
"""Insert a dataframe (columns must be unique).
@@ -1592,11 +1643,12 @@ def dump(self) -> dict[str, Any]:


class DatapointsPoster:
def __init__(self, dps_client: DatapointsAPI) -> None:
def __init__(self, dps_client: DatapointsAPI, cdf_version: str | None = None) -> None:
self.dps_client = dps_client
self.dps_limit = self.dps_client._DPS_INSERT_LIMIT
self.ts_limit = self.dps_client._POST_DPS_OBJECTS_LIMIT
self.max_workers = self.dps_client._config.max_workers
self.cdf_version = cdf_version

def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:
to_insert = self._verify_and_prepare_dps_objects(dps_object_lst)
@@ -1615,18 +1667,23 @@ def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:

def _verify_and_prepare_dps_objects(
self, dps_object_lst: list[dict[str, Any]]
) -> list[tuple[tuple[str, int], list[_InsertDatapoint]]]:
dps_to_insert = defaultdict(list)
) -> list[tuple[tuple[str, int | str | dict], list[_InsertDatapoint]]]:
dps_to_insert: dict[tuple[str, int | str | tuple[str, str]], list[_InsertDatapoint]] = defaultdict(list)
for obj in dps_object_lst:
validated: dict[str, Any] = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"})
validated_dps = self._parse_and_validate_dps(obj["datapoints"])

# Concatenate datapoints using identifier as key:
if (xid := validated.get("externalId")) is not None:
dps_to_insert["externalId", xid].extend(validated_dps)
elif (instance_id := validated.get("instanceId")) is not None:
dps_to_insert["instanceId", (instance_id["space"], instance_id["externalId"])].extend(validated_dps)
else:
dps_to_insert["id", validated["id"]].extend(validated_dps)
return list(dps_to_insert.items())
return [
((id_name, {"space": id_[0], "externalId": id_[1]} if id_name == "instanceId" else id_), data) # type: ignore[index, misc]
for (id_name, id_), data in dps_to_insert.items()
]
Reviewer (Collaborator) commented:
Here you're transforming from a dict to a tuple and then back to a dict - presumably because the dict is not hashable? Might require less back-and-forth and be simpler if validate_user_input_dict_with_identifier returns Identifier objects - which should be hashable.

doctrino (Contributor, Author) replied:
Yes, you are right, simplified it. Not sure what made me go for this complex solution.
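The simplification discussed here can be illustrated in isolation: with a hashable key (below, a frozen dataclass standing in for the SDK's identifier classes), the grouping dict can be keyed directly and the dict-to-tuple-and-back round trip disappears. A minimal sketch, not the SDK's actual code:

from collections import defaultdict
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)  # frozen => hashable, so instances can key a dict
class Ident:
    name: str   # "id", "externalId" or "instanceId"
    value: Any  # int, str, or a (space, external_id) tuple; must itself be hashable

def group_datapoints(dps_objects: list[dict[str, Any]]) -> list[tuple[Ident, list]]:
    grouped: dict[Ident, list] = defaultdict(list)
    for obj in dps_objects:
        if "externalId" in obj:
            key = Ident("externalId", obj["externalId"])
        elif "instanceId" in obj:
            iid = obj["instanceId"]
            key = Ident("instanceId", (iid["space"], iid["externalId"]))
        else:
            key = Ident("id", obj["id"])
        grouped[key].extend(obj["datapoints"])
    return list(grouped.items())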


def _parse_and_validate_dps(self, dps: Datapoints | DatapointsArray | list[tuple | dict]) -> list[_InsertDatapoint]:
if not dps:
@@ -1694,7 +1751,11 @@ def _insert_datapoints(self, payload: list[dict[str, Any]]) -> None:
# Convert to memory intensive format as late as possible (and clean up after insert)
for dct in payload:
dct["datapoints"] = [dp.dump() for dp in dct["datapoints"]]
self.dps_client._post(url_path=self.dps_client._RESOURCE_PATH, json={"items": payload})
headers: dict[str, str] | None = None
if self.cdf_version:
headers = {"cdf-version": self.cdf_version}

self.dps_client._post(url_path=self.dps_client._RESOURCE_PATH, json={"items": payload}, headers=headers)
for dct in payload:
dct["datapoints"].clear()

2 changes: 1 addition & 1 deletion cognite/client/_version.py
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.51.1"
__version__ = "7.51.2"
__api_subversion__ = "20230101"