Skip to content

Commit

Permalink
update and simplify cognite/client/data_classes/datapoints.py
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt committed Oct 19, 2023
1 parent 24d2910 commit 20774dc
Showing 1 changed file with 21 additions and 59 deletions.
80 changes: 21 additions & 59 deletions cognite/client/data_classes/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,65 +657,45 @@ def _repr_html_(self) -> str:
return notebook_display_with_fallback(self, include_errors=is_synthetic_dps)


class DatapointsArrayList(CogniteResourceList[DatapointsArray]):
_RESOURCE = DatapointsArray

class DuplicatedIdsMixin:
def __init__(self, resources: Collection[Any], cognite_client: CogniteClient | None = None) -> None:
super().__init__(resources, cognite_client)

# Fix what happens for duplicated identifiers:
ids = [dps.id for dps in self if dps.id is not None]
xids = [dps.external_id for dps in self if dps.external_id is not None]
dupe_ids, id_dct = find_duplicates(ids), defaultdict(list)
dupe_xids, xid_dct = find_duplicates(xids), defaultdict(list)
idents_to_update = defaultdict(list)
dupe_ids, dupe_xids = find_duplicates(ids), find_duplicates(xids)

for dps in self:
if (id_ := dps.id) is not None and id_ in dupe_ids:
id_dct[id_].append(dps)
idents_to_update[("id", id_)].append(dps)
if (xid := dps.external_id) is not None and xid in dupe_xids:
xid_dct[xid].append(dps)
idents_to_update[("external_id", xid)].append(dps)

self._id_to_item.update(id_dct)
self._external_id_to_item.update(xid_dct)
self._identifier_to_items.update(idents_to_update)

def concat_duplicate_ids(self) -> None:
"""
Concatenates all arrays with duplicated IDs.

Arrays with the same ids are stacked in chronological order.
class DatapointsArrayList(DuplicatedIdsMixin, CogniteResourceList[DatapointsArray]):
_RESOURCE = DatapointsArray

**Caveat** This method is not guaranteed to preserve the order of the list.
"""
# Rebuilt list instead of removing duplicated one at a time at the cost of O(n).
def concat_duplicate_ids(self) -> None:
"""Concatenates all arrays with duplicated IDs in chronological order."""
old_data = self.data[:]
self.data.clear()

# This implementation takes advantage of the ordering of the duplicated in the __init__ method
has_external_ids = set()
for ext_id, items in self._external_id_to_item.items():
if not isinstance(items, list):
self.data.append(items)
if items.id is not None:
has_external_ids.add(items.id)
for item in old_data:
if item.id is None:
raise ValueError("'concat_duplicate_ids' requires 'id' to be set on all elements")
if not isinstance(item, list):
self.data.append(item)
continue
concatenated = DatapointsArray.create_from_arrays(*items)
self._external_id_to_item[ext_id] = concatenated
if concatenated.id is not None:
has_external_ids.add(concatenated.id)
self._id_to_item[concatenated.id] = concatenated
self.data.append(concatenated)

if not (only_ids := set(self._id_to_item) - has_external_ids):
return

for id_, items in self._id_to_item.items():
if id_ not in only_ids:
continue
if not isinstance(items, list):
self.data.append(items)
continue
concatenated = DatapointsArray.create_from_arrays(*items)
self._id_to_item[id_] = concatenated
concatenated = DatapointsArray.create_from_arrays(*item)
self.data.append(concatenated)
self._identifier_to_items[("id", concatenated.id)] = concatenated.id
if concatenated.external_id is not None:
self._identifier_to_items[("external_id", concatenated.external_id)] = concatenated.external_id

def get( # type: ignore [override]
self,
Expand Down Expand Up @@ -775,27 +755,9 @@ def dump(self, camel_case: bool = False, convert_timestamps: bool = False) -> li
return [dps.dump(camel_case, convert_timestamps) for dps in self]


class DatapointsList(CogniteResourceList[Datapoints]):
class DatapointsList(DuplicatedIdsMixin, CogniteResourceList[Datapoints]):
_RESOURCE = Datapoints

def __init__(self, resources: Collection[Any], cognite_client: CogniteClient | None = None) -> None:
super().__init__(resources, cognite_client)

# Fix what happens for duplicated identifiers:
ids = [dps.id for dps in self if dps.id is not None]
xids = [dps.external_id for dps in self if dps.external_id is not None]
dupe_ids, id_dct = find_duplicates(ids), defaultdict(list)
dupe_xids, xid_dct = find_duplicates(xids), defaultdict(list)

for dps in self:
if (id_ := dps.id) is not None and id_ in dupe_ids:
id_dct[id_].append(dps)
if (xid := dps.external_id) is not None and xid in dupe_xids:
xid_dct[xid].append(dps)

self._id_to_item.update(id_dct)
self._external_id_to_item.update(xid_dct)

def get( # type: ignore [override]
self,
id: int | None = None,
Expand Down

0 comments on commit 20774dc

Please sign in to comment.