
Commit

mibios/query: Teach QuerySet.iterate() to return rows when indexes are not sorted by PK

This is part 2 of 3 of the ChainedQuerySet series.
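
For illustration, a hedged usage sketch of the new parameter (the model, manager, and field names below are assumptions, not taken from the mibios codebase):

    # Hypothetical usage -- assumes a Django model `Sequence` whose default
    # manager returns the mibios QuerySet, and an indexed `sample_id` column.
    from myapp.models import Sequence

    # Scan the sample_id index instead of the primary key, in 10k-row
    # chunks, auto-populating FK-related objects via an FKCache.
    for obj in Sequence.objects.all().iterate(cache=True,
                                              chunk_size=10000,
                                              sortkey='sample_id'):
        handle(obj)  # caller-defined processing, e.g. writing an export row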
robert102 committed Nov 22, 2024
1 parent 8a38c1f commit 40cc2f8
Showing 1 changed file with 49 additions and 38 deletions.
87 changes: 49 additions & 38 deletions mibios/query.py
@@ -483,12 +483,17 @@ class BaseIterable:
once. Results are always ordered by primary key.
"""
DEFAULT_CHUNK_SIZE = 20000
DEFAULT_SORT_KEY = 'pk'

def __init__(self, queryset, chunk_size=None):
def __init__(self, queryset, chunk_size=None, sortkey=None):
if chunk_size is None:
chunk_size = self.DEFAULT_CHUNK_SIZE
elif chunk_size <= 0:
raise ValueError('chunk size must be positive')
if sortkey is None:
self.sortkey = self.DEFAULT_SORT_KEY
else:
self.sortkey = sortkey
self.queryset = queryset
self.chunk_size = chunk_size
self._it = None
@@ -521,22 +526,23 @@ def _iter(self):

class ModelIterable(BaseIterable):
""" Iterable over regular model-based querysets """
def __init__(self, queryset, chunk_size, cache):
super().__init__(queryset, chunk_size)
def __init__(self, queryset, cache, chunk_size=None, sortkey=None):
super().__init__(queryset, chunk_size, sortkey)
if cache is True:
# auto-caching-mode
# TODO: pick up fields from only()?
cache = FKCache(self.queryset.model, fk_fields=None)
self.cache = cache
self.queryset = self.queryset.order_by('pk')
self.queryset = self.queryset.order_by(self.sortkey)

def _iter(self):
qs = self.queryset
chunk_size = self.chunk_size
sortkey = self.sortkey
cache = self.cache
last_pk = 0
last_sk = 0
while True:
chunk = qs.filter(pk__gt=last_pk)[:chunk_size]
chunk = qs.filter(**{sortkey + '__gt': last_sk})[:chunk_size]

if cache:
# update in-place
@@ -548,7 +554,7 @@ def _iter(self):
# no further results
break

last_pk = chunk[len(chunk) - 1].pk
last_sk = getattr(chunk[len(chunk) - 1], sortkey)

# so debug() can display this if we're closed
self._final_iter_vars = pformat(locals())
@@ -557,21 +563,21 @@
class ValuesListIterable(BaseIterable):
""" Iterable over values-listing querysets """

def __init__(self, queryset, chunk_size, cache):
super().__init__(queryset, chunk_size)
def __init__(self, queryset, cache, chunk_size=None, sortkey=None):
super().__init__(queryset, chunk_size, sortkey)
qs = self.queryset
outnames = qs.get_output_field_names()
hide_pk = False
hide_sk = False
try:
pk_pos = outnames.index(qs.model._meta.pk.name)
sk_pos = outnames.index(self.sortkey)
except ValueError:
if 'pk' in outnames:
pk_pos = outnames.index('pk')
if self.sortkey in outnames:
sk_pos = outnames.index(self.sortkey)
else:
# we have to get PK to make chunking work
qs = qs.values_list('pk', *outnames)
pk_pos = 0
hide_pk = True
qs = qs.values_list(self.sortkey, *outnames)
sk_pos = 0
hide_sk = True

if cache is True:
# auto-caching-mode
@@ -583,45 +589,46 @@ def __init__(self, queryset, chunk_size, cache):
fk_fields.append(i)
cache = FKCache(qs.model, fk_fields=fk_fields)

qs = qs.order_by('pk')
qs = qs.order_by(self.sortkey)

self.queryset = qs
self.cache = cache
self.pk_pos = pk_pos
self.hide_pk = hide_pk
self.sk_pos = sk_pos
self.hide_sk = hide_sk

@staticmethod
def _rm_pk(row):
""" helper to remove PK from a list row """
del row[0] # PK is always first elem if we have to remove it
def _rm_sk(row):
""" helper to remove sortkey from a list row """
del row[0] # sortkey is always first elem if we have to remove it
return row

def _iter(self):
qs = self.queryset
cache = self.cache
chunk_size = self.chunk_size
pk_pos = self.pk_pos
hide_pk = self.hide_pk
rm_pk = self._rm_pk
sortkey = self.sortkey
sk_pos = self.sk_pos
hide_sk = self.hide_sk
rm_sk = self._rm_sk

last_pk = 0
last_sk = 0
while True:
chunk = qs.filter(pk__gt=last_pk)[:chunk_size]
chunk = qs.filter(**{sortkey + '__gt': last_sk})[:chunk_size]

# For non-empty chunk get last PK before they are removed. Must
# also avoid negative indexing in no-cache case where chunk is
# For non-empty chunk get last sortkey before they are removed.
# Must also avoid negative indexing in no-cache case where chunk is
# queryset, so calculate last row via length.
if chunk_length := len(chunk):
last_pk = chunk[chunk_length - 1][pk_pos]
last_sk = chunk[chunk_length - 1][sk_pos]

if cache:
# chunk is replaced, type is list now (was tuple)
chunk = cache.update_values_list(chunk)
if hide_pk:
# rm PK from list
chunk = ((rm_pk(row) for row in chunk))
elif hide_pk:
# rm PK from tuple (get new tuple via slicing)
if hide_sk:
# rm sortkey from list
chunk = ((rm_sk(row) for row in chunk))
elif hide_sk:
# rm sortkey from tuple (get new tuple via slicing)
chunk = ((row[slice(1, None)] for row in chunk))

yield from chunk
@@ -1027,7 +1034,7 @@ def get_output_field_names(self):
*self.query.annotation_select,
]

def iterate(self, chunk_size=None, cache=None):
def iterate(self, cache=None, chunk_size=None, sortkey=None):
"""
Alternative iterator implementation for large table data export
@@ -1038,12 +1045,16 @@ def iterate(self, chunk_size=None, cache=None):
An optional FKCache object to be used to populate FK related
objects. If this is None or an empty dict then no such cache will
be used. If True, then an FKCache will be used automatically.
sortkey:
Name of the indexed field whose index will be scanned. By default
the primary key is used. A poorly chosen value here will be paid
for with bad performance.
"""
if self._fields is None:
# normal model-instance queryset
return ModelIterable(self, chunk_size, cache)
return ModelIterable(self, cache, chunk_size, sortkey)
else:
return ValuesListIterable(self, chunk_size, cache)
return ValuesListIterable(self, cache, chunk_size, sortkey)

def split_by_fk(self, fk_field, subquery):
return ChainedQuerySet(self, fk_field, subquery)
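
Distilled to its essentials, the chunking strategy that this diff generalizes is keyset pagination over an arbitrary sort key. A minimal standalone sketch (the function name and defaults are assumptions; it omits the FK caching and values_list handling that the real iterables add):

    # Minimal sketch of keyset pagination over an arbitrary sort key.
    # Each chunk resumes after the last seen key value rather than using
    # OFFSET, so memory use stays bounded regardless of table size.
    def iterate_by_key(queryset, sortkey='pk', chunk_size=20000):
        qs = queryset.order_by(sortkey)
        last = None
        while True:
            page = qs if last is None else qs.filter(**{sortkey + '__gt': last})
            chunk = list(page[:chunk_size])
            if not chunk:
                break
            yield from chunk
            last = getattr(chunk[-1], sortkey)

Note that this pattern relies on the sort key being unique, as the primary key is; with a non-unique key, rows sharing a chunk's boundary value can be skipped by the __gt filter.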
