From 40cc2f85ca95508e5b06faf0577df8494e3631c9 Mon Sep 17 00:00:00 2001 From: Robert Date: Fri, 22 Nov 2024 15:44:57 -0500 Subject: [PATCH] mibioa/query: Teach QuerySet.iterate() to return rows when indexes are not sorted by PK This is part 2 of 3 of the ChainedQuerySet series. --- mibios/query.py | 87 ++++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/mibios/query.py b/mibios/query.py index 43d58bc..02f18af 100644 --- a/mibios/query.py +++ b/mibios/query.py @@ -483,12 +483,17 @@ class BaseIterable: once. Results are always ordered by primary key. """ DEFAULT_CHUNK_SIZE = 20000 + DEFAULT_SORT_KEY = 'pk' - def __init__(self, queryset, chunk_size=None): + def __init__(self, queryset, chunk_size=None, sortkey=None): if chunk_size is None: chunk_size = self.DEFAULT_CHUNK_SIZE elif chunk_size <= 0: raise ValueError('chunk size must be positive') + if sortkey is None: + self.sortkey = self.DEFAULT_SORT_KEY + else: + self.sortkey = sortkey self.queryset = queryset self.chunk_size = chunk_size self._it = None @@ -521,22 +526,23 @@ def _iter(self): class ModelIterable(BaseIterable): """ Iterable over regular model-based querysets """ - def __init__(self, queryset, chunk_size, cache): - super().__init__(queryset, chunk_size) + def __init__(self, queryset, cache, chunk_size=None, sortkey=None): + super().__init__(queryset, chunk_size, sortkey) if cache is True: # auto-caching-mode # TODO: pick up fields from only()? cache = FKCache(self.queryset.model, fk_fields=None) self.cache = cache - self.queryset = self.queryset.order_by('pk') + self.queryset = self.queryset.order_by(self.sortkey) def _iter(self): qs = self.queryset chunk_size = self.chunk_size + sortkey = self.sortkey cache = self.cache - last_pk = 0 + last_sk = 0 while True: - chunk = qs.filter(pk__gt=last_pk)[:chunk_size] + chunk = qs.filter(**{sortkey + '__gt': last_sk})[:chunk_size] if cache: # update in-place @@ -548,7 +554,7 @@ def _iter(self): # no further results break - last_pk = chunk[len(chunk) - 1].pk + last_sk = getattr(chunk[len(chunk) - 1], sortkey) # so debug() can display this if we're closed self._final_iter_vars = pformat(locals()) @@ -557,21 +563,21 @@ def _iter(self): class ValuesListIterable(BaseIterable): """ Iterable over values-listing querysets """ - def __init__(self, queryset, chunk_size, cache): - super().__init__(queryset, chunk_size) + def __init__(self, queryset, cache, chunk_size=None, sortkey=None): + super().__init__(queryset, chunk_size, sortkey) qs = self.queryset outnames = qs.get_output_field_names() - hide_pk = False + hide_sk = False try: - pk_pos = outnames.index(qs.model._meta.pk.name) + sk_pos = outnames.index(self.sortkey) except ValueError: - if 'pk' in outnames: - pk_pos = outnames.index('pk') + if self.sortkey in outnames: + sk_pos = outnames.index(self.sortkey) else: # we have to get PK to make chunking work - qs = qs.values_list('pk', *outnames) - pk_pos = 0 - hide_pk = True + qs = qs.values_list(self.sortkey, *outnames) + sk_pos = 0 + hide_sk = True if cache is True: # auto-caching-mode @@ -583,45 +589,46 @@ def __init__(self, queryset, chunk_size, cache): fk_fields.append(i) cache = FKCache(qs.model, fk_fields=fk_fields) - qs = qs.order_by('pk') + qs = qs.order_by(self.sortkey) self.queryset = qs self.cache = cache - self.pk_pos = pk_pos - self.hide_pk = hide_pk + self.sk_pos = sk_pos + self.hide_sk = hide_sk @staticmethod - def _rm_pk(row): - """ helper to remove PK from a list row """ - del row[0] # PK is always first elem if we have to remove it + def _rm_sk(row): + """ helper to remove sortkey from a list row """ + del row[0] # sortkey is always first elem if we have to remove it return row def _iter(self): qs = self.queryset cache = self.cache chunk_size = self.chunk_size - pk_pos = self.pk_pos - hide_pk = self.hide_pk - rm_pk = self._rm_pk + sortkey = self.sortkey + sk_pos = self.sk_pos + hide_sk = self.hide_sk + rm_sk = self._rm_sk - last_pk = 0 + last_sk = 0 while True: - chunk = qs.filter(pk__gt=last_pk)[:chunk_size] + chunk = qs.filter(**{sortkey + '__gt': last_sk})[:chunk_size] - # For non-empty chunk get last PK before they are removed. Must - # also avoid negative indexing in no-cache case where chunk is + # For non-empty chunk get last sortkey before they are removed. + # Must also avoid negative indexing in no-cache case where chunk is # queryset, so calculate last row via length. if chunk_length := len(chunk): - last_pk = chunk[chunk_length - 1][pk_pos] + last_sk = chunk[chunk_length - 1][sk_pos] if cache: # chunk is replaced, type is list now (was tuple) chunk = cache.update_values_list(chunk) - if hide_pk: - # rm PK from list - chunk = ((rm_pk(row) for row in chunk)) - elif hide_pk: - # rm PK from tuple (get new tuple via slicing) + if hide_sk: + # rm sortkey from list + chunk = ((rm_sk(row) for row in chunk)) + elif hide_sk: + # rm sortkey from tuple (get new tuple via slicing) chunk = ((row[slice(1, None)] for row in chunk)) yield from chunk @@ -1027,7 +1034,7 @@ def get_output_field_names(self): *self.query.annotation_select, ] - def iterate(self, chunk_size=None, cache=None): + def iterate(self, cache=None, chunk_size=None, sortkey=None): """ Alternative iterator implementation for large table data export @@ -1038,12 +1045,16 @@ def iterate(self, chunk_size=None, cache=None): An optional FKCache object to be used to populate FK related objects. If this is None or an empty dict then no such cache will be used. If True, then an FKCache will be used automatically. + sortkey: + Name of indexed field, of the scanned index. By default the + primary key is used. The wrong value here will be paid by bad + performance. """ if self._fields is None: # normal model-instance queryset - return ModelIterable(self, chunk_size, cache) + return ModelIterable(self, cache, chunk_size, sortkey) else: - return ValuesListIterable(self, chunk_size, cache) + return ValuesListIterable(self, cache, chunk_size, sortkey) def split_by_fk(self, fk_field, subquery): return ChainedQuerySet(self, fk_field, subquery)