From 6a0d9a27cf97a3acab61865db1f8ed3401464c8c Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Thu, 9 Feb 2023 12:28:01 -0500 Subject: [PATCH 1/4] dont use take with arrow --- packages/vaex-core/vaex/column.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/vaex-core/vaex/column.py b/packages/vaex-core/vaex/column.py index 40de5aab72..84a50e5837 100644 --- a/packages/vaex-core/vaex/column.py +++ b/packages/vaex-core/vaex/column.py @@ -382,8 +382,17 @@ def __getitem__(self, slice): # arrow and numpy do not like the negative indices, so we set them to 0 take_indices = indices.copy() take_indices[mask] = 0 - if isinstance(ar_unfiltered, supported_arrow_array_types): - ar = ar_unfiltered.take(vaex.array_types.to_arrow(take_indices)) + # Don't use .take in arrow anymore + # https://issues.apache.org/jira/browse/ARROW-9773 + # slice is zero-copy + if isinstance(ar_unfiltered, pa.Array): + ar = pa.concat_arrays( + [ar_unfiltered.slice(i, 1) for i in take_indices] + ) + elif isinstance(ar_unfiltered, pa.ChunkedArray): + ar = pa.concat_arrays( + [ar_unfiltered.slice(i, 1).combine_chunks() for i in take_indices] + ) else: ar = ar_unfiltered[take_indices] assert not np.ma.isMaskedArray(indices) @@ -825,4 +834,4 @@ def get_mask(self): return self.string_sequence.mask() def astype(self, type): - return self.to_numpy().astype(type) \ No newline at end of file + return self.to_numpy().astype(type) From 5d7ea6044d351934827372e105f6005df54a2cf2 Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Mon, 13 Feb 2023 13:44:05 -0500 Subject: [PATCH 2/4] array_types take updated --- packages/vaex-core/vaex/array_types.py | 12 +++++++++++- packages/vaex-core/vaex/column.py | 18 +++--------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/packages/vaex-core/vaex/array_types.py b/packages/vaex-core/vaex/array_types.py index 3e10f3189c..4da642bf09 100644 --- a/packages/vaex-core/vaex/array_types.py +++ 
b/packages/vaex-core/vaex/array_types.py @@ -49,7 +49,17 @@ def filter(ar, boolean_mask): def take(ar, indices): - return ar.take(indices) + if isinstance(ar, pa.ChunkedArray): + # Don't use .take in arrow for chunked arrays + # https://issues.apache.org/jira/browse/ARROW-9773 + # slice is zero-copy + return pa.concat_arrays( + [ar.slice(i, 1).combine_chunks() for i in indices] + ) + elif isinstance(ar, pa.lib.Array): + return ar.take(vaex.array_types.to_arrow(indices)) + else: + return ar[indices] def slice(ar, offset, length=None): diff --git a/packages/vaex-core/vaex/column.py b/packages/vaex-core/vaex/column.py index 84a50e5837..c11dd208c8 100644 --- a/packages/vaex-core/vaex/column.py +++ b/packages/vaex-core/vaex/column.py @@ -11,7 +11,7 @@ import vaex import vaex.utils import vaex.cache -from .array_types import supported_array_types, supported_arrow_array_types, string_types, is_string_type +from .array_types import supported_array_types, supported_arrow_array_types, string_types, is_string_type, take if vaex.utils.has_c_extension: @@ -382,19 +382,7 @@ def __getitem__(self, slice): # arrow and numpy do not like the negative indices, so we set them to 0 take_indices = indices.copy() take_indices[mask] = 0 - # Don't use .take in arrow anymore - # https://issues.apache.org/jira/browse/ARROW-9773 - # slice is zero-copy - if isinstance(ar_unfiltered, pa.Array): - ar = pa.concat_arrays( - [ar_unfiltered.slice(i, 1) for i in take_indices] - ) - elif isinstance(ar_unfiltered, pa.ChunkedArray): - ar = pa.concat_arrays( - [ar_unfiltered.slice(i, 1).combine_chunks() for i in take_indices] - ) - else: - ar = ar_unfiltered[take_indices] + ar = take(ar_unfiltered, take_indices) assert not np.ma.isMaskedArray(indices) if self.masked: # TODO: we probably want to keep this as arrow array if it originally was @@ -603,7 +591,7 @@ def _is_stringy(x): def _to_string_sequence(x, force=True): if isinstance(x, pa.DictionaryArray): - x = x.dictionary.take(x.indices) # equivalent to 
PyArrow 5.0.0's dictionary_decode() but backwards compatible + x = take(x.dictionary, x.indices) # equivalent to PyArrow 5.0.0's dictionary_decode() but backwards compatible if isinstance(x, pa.ChunkedArray): # turn into pa.Array, TODO: do we want this, this may result in a big mem copy table = pa.Table.from_arrays([x], ["single"]) From 4b21a6430e4e85c585df3cbef248ae85a63d423f Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Mon, 13 Feb 2023 13:49:15 -0500 Subject: [PATCH 3/4] in module already --- packages/vaex-core/vaex/array_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/vaex-core/vaex/array_types.py b/packages/vaex-core/vaex/array_types.py index 4da642bf09..d64a07199c 100644 --- a/packages/vaex-core/vaex/array_types.py +++ b/packages/vaex-core/vaex/array_types.py @@ -57,7 +57,7 @@ def take(ar, indices): [ar.slice(i, 1).combine_chunks() for i in indices] ) elif isinstance(ar, pa.lib.Array): - return ar.take(vaex.array_types.to_arrow(indices)) + return ar.take(to_arrow(indices)) else: return ar[indices] From 0a47d13ae017c33e37885408c8a44b0b0555dc34 Mon Sep 17 00:00:00 2001 From: "Maarten A. Breddels" Date: Tue, 14 Feb 2023 19:57:30 +0100 Subject: [PATCH 4/4] more takes --- packages/vaex-core/vaex/cpu.py | 4 ++-- packages/vaex-core/vaex/dataframe.py | 4 ++-- packages/vaex-core/vaex/functions.py | 2 +- packages/vaex-core/vaex/groupby.py | 10 +++++----- packages/vaex-core/vaex/hash.py | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/vaex-core/vaex/cpu.py b/packages/vaex-core/vaex/cpu.py index 3c6240d453..361a9dfff6 100644 --- a/packages/vaex-core/vaex/cpu.py +++ b/packages/vaex-core/vaex/cpu.py @@ -251,7 +251,7 @@ def reduce(self, others: List["TaskPartValueCounts"]): deletes.append(counter.nan_index) if vaex.array_types.is_arrow_array(keys): indices = np.delete(np.arange(len(keys)), deletes) - keys = keys.take(indices) + keys = vaex.array_types.take(keys, 
indices) else: keys = np.delete(keys, deletes) if not self.dropmissing and counter.has_null: @@ -264,7 +264,7 @@ def reduce(self, others: List["TaskPartValueCounts"]): if not self.ascending: order = order[::-1] counts = counts[order] - keys = keys.take(order) + keys = vaex.array_types.take(keys, order) keys = keys.tolist() if None in keys: diff --git a/packages/vaex-core/vaex/dataframe.py b/packages/vaex-core/vaex/dataframe.py index febf8e78af..13fb4522f4 100644 --- a/packages/vaex-core/vaex/dataframe.py +++ b/packages/vaex-core/vaex/dataframe.py @@ -599,7 +599,7 @@ def unique(self, expression, return_inverse=False, dropna=False, dropnan=False, keys = pa.array(self.category_labels(expression)) @delayed def encode(codes): - used_keys = keys.take(codes) + used_keys = vaex.array_types.take(keys, codes) return vaex.array_types.convert(used_keys, array_type) codes = self[expression].index_values().unique(delay=True) return self._delay(delay, encode(codes)) @@ -659,7 +659,7 @@ def reduce(a, b): if isinstance(keys, (vaex.strings.StringList32, vaex.strings.StringList64)): keys = vaex.strings.to_arrow(keys) indices = np.delete(np.arange(len(keys)), deletes) - keys = keys.take(indices) + keys = vaex.array_types.take(keys, indices) else: keys = np.delete(keys, deletes) if not dropmissing and hash_map_unique.has_null: diff --git a/packages/vaex-core/vaex/functions.py b/packages/vaex-core/vaex/functions.py index fcfc69e6ee..09d4ac61cd 100644 --- a/packages/vaex-core/vaex/functions.py +++ b/packages/vaex-core/vaex/functions.py @@ -2493,7 +2493,7 @@ def _map(ar, value_to_index, choices, default_value=None, use_missing=False, axi ar = vaex.array_types.to_numpy(ar) indices = value_to_index.map(ar) + 1 - values = choices.take(indices) + values = vaex.array_types.take(choices, indices) if np.ma.isMaskedArray(ar): mask = np.ma.getmaskarray(ar).copy() # also mask out the missing (which had -1 and was moved to 0) diff --git a/packages/vaex-core/vaex/groupby.py 
b/packages/vaex-core/vaex/groupby.py index 4eab91d3b0..a34a5e4836 100644 --- a/packages/vaex-core/vaex/groupby.py +++ b/packages/vaex-core/vaex/groupby.py @@ -284,7 +284,7 @@ def process(hashmap_unique: vaex.hash.HashMapUnique): indices = pa.compute.sort_indices(self.bin_values, sort_keys=[("x", "ascending" if ascending else "descending")]) self.sort_indices = vaex.array_types.to_numpy(indices) # the bin_values will still be pre sorted, maybe that is confusing (implementation detail) - self.bin_values = pa.compute.take(self.bin_values, self.sort_indices) + self.bin_values = vaex.array_types.take(self.bin_values, self.sort_indices) else: self.sort_indices = None self.hashmap_unique = hashmap_unique @@ -377,10 +377,10 @@ def compress(ar): if dtype.is_struct: # collapse parent struct into our flat struct for field, ar in zip(parent.bin_values.type, parent.bin_values.flatten()): - bin_values[field.name] = ar.take(indices) + bin_values[field.name] = vaex.array_types.take(ar, indices) # bin_values[field.name] = pa.DictionaryArray.from_arrays(indices, ar) else: - bin_values[parent.label] = parent.bin_values.take(indices) + bin_values[parent.label] = vaex.array_types.take(parent.bin_values, indices) # bin_values[parent.label] = pa.DictionaryArray.from_arrays(indices, parent.bin_values) logger.info(f"extracing labels of parent groupers done") return pa.StructArray.from_arrays(bin_values.values(), bin_values.keys()) @@ -418,7 +418,7 @@ def __init__(self, expression, df=None, sort=False, ascending=True, row_limit=No if self.sort: # not pre-sorting is faster sort_indices = pa.compute.sort_indices(self.bin_values, sort_keys=[("x", "ascending" if ascending else "descending")]) - self.bin_values = pa.compute.take(self.bin_values, sort_indices) + self.bin_values = vaex.array_types.take(self.bin_values, sort_indices) if self.pre_sort: # we will map from int to int sort_indices = vaex.array_types.to_numpy(sort_indices) @@ -481,7 +481,7 @@ def __init__(self, expression, values, 
keep_other=True, other_value=None, sort=F values = pa.concat_arrays(values.chunks) if sort: indices = pa.compute.sort_indices(values, sort_keys=[("x", "ascending" if ascending else "descending")]) - values = pa.compute.take(values, indices) + values = vaex.array_types.take(values, indices) if self.keep_other: self.bin_values = pa.array(vaex.array_types.tolist(values) + [other_value]) diff --git a/packages/vaex-core/vaex/hash.py b/packages/vaex-core/vaex/hash.py index 5442073da3..77ef48af0d 100644 --- a/packages/vaex-core/vaex/hash.py +++ b/packages/vaex-core/vaex/hash.py @@ -254,7 +254,7 @@ def sorted(self, keys=None, ascending=True, indices=None, return_keys=False): indices = pa.compute.sort_indices(keys, sort_keys=[('x', "ascending" if ascending else "descending")]) if indices is None else indices # arrow sorts with null last null_index = -1 if not self.has_null else len(keys)-1 - keys = pa.compute.take(keys, indices) + keys = vaex.array_types.take(keys, indices) fingerprint = self._internal.fingerprint + "-sorted" if self.dtype_item.is_string: # TODO: supported 32 bit in hashmap