Skip to content

Commit

Permalink
mibios/query: Add ChainedQuerySet container/wrapper
Browse files Browse the repository at this point in the history
The ChainedQuerySet hooks into the QuerySet.iterate() machinery via the
new QuerySet.split_by_fk() method. It is meant to be used as an
alternative to filter(relation__in=subquery) when the subquery prevents
an index scan. (The first application will be data export of
ReadAbundance for a selection of samples.)

This is part 1 of 3 in a series of commits to further address slow data
export of very large tables.
  • Loading branch information
robert102 committed Nov 22, 2024
1 parent d9dbc4e commit 8a38c1f
Showing 1 changed file with 108 additions and 0 deletions.
108 changes: 108 additions & 0 deletions mibios/query.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import OrderedDict
from decimal import Decimal
from inspect import getgeneratorstate, getgeneratorlocals
from itertools import chain
import json
from logging import getLogger
from operator import attrgetter, itemgetter
Expand Down Expand Up @@ -1043,3 +1044,110 @@ def iterate(self, chunk_size=None, cache=None):
return ModelIterable(self, chunk_size, cache)
else:
return ValuesListIterable(self, chunk_size, cache)

def split_by_fk(self, fk_field, subquery):
    """
    Wrap this queryset in a ChainedQuerySet split along a foreign key.

    fk_field: the foreign key field (or its name) to split by
    subquery: queryset supplying the related objects to split over

    Both arguments are handed to ChainedQuerySet unchanged, together with
    this queryset.
    """
    chained = ChainedQuerySet(
        queryset=self,
        fk_field=fk_field,
        subqueryset=subquery,
    )
    return chained


class ChainedQuerySet:
    """
    Chain querysets, split along a relation to avoid WHERE w/subquery.

    This is a wrapper/container for querysets.  It replaces a QuerySet that
    had filter(relation__in=subquery) applied with a chain of querysets, one
    per object in the subquery, each filtered on the relation's key for that
    object.  The resulting object can, with limitations, be used like the
    original queryset.

    It is primarily meant to have iterate() applied, further splitting the
    queries in order to take advantage of indexes and to avoid sequence
    scans on very large tables.
    """
    def __init__(self, queryset, fk_field, subqueryset):
        """
        Set up the container.

        queryset: the queryset that is to be split
        fk_field: many-to-one field of the queryset's model, given either
            as field instance or by name
        subqueryset: queryset over the model that fk_field relates to, its
            primary keys determine the members of the chain

        Raises ValueError if a field instance does not belong to the
        queryset's model, if the field is not many-to-one, or if the
        field's related model is not the subqueryset's model.  When the
        field is given by name, then any error raised by the model's
        get_field() lookup is passed on unchanged.
        """
        model = queryset.model
        if isinstance(fk_field, str):
            fk_field = model._meta.get_field(fk_field)
        else:
            # a field instance was passed, make sure it is really the
            # model's own field and not just one with the same name
            try:
                _fk_field = model._meta.get_field(fk_field.name)
            except FieldDoesNotExist as e:
                raise ValueError(
                    'field does not belong to model of queryset'
                ) from e
            else:
                if _fk_field is not fk_field:
                    raise ValueError('not a field of model (but name match?)')

        if not fk_field.many_to_one:
            raise ValueError('only ForeignKey fields are admitted here')

        if fk_field.related_model is not subqueryset.model:
            raise ValueError('related model does not match subquery\'s model')

        self.model = model
        self.base_qs = queryset
        self.fk_field = fk_field
        # default for the sortkey argument of iterate(), not used if None
        self.iterate_sortkey = None
        self.subqueryset = subqueryset

    def _clone(self):
        """
        Make a copy of us ready for other operations on our querysets
        """
        obj = self.__class__(
            self.base_qs._chain(),
            self.fk_field,
            self.subqueryset._chain(),
        )
        obj.iterate_sortkey = self.iterate_sortkey
        return obj

    def _apply_queryset_method(self, meth, *args, **kwargs):
        """
        Apply a queryset method to all members of the chain.

        meth: str, name of queryset method

        The args and kwargs are passed to the method.  Only methods that
        also return a QuerySet should be used, else the result is unlikely
        to do any good.  Returns a new instance.
        """
        obj = self._clone()
        # The method is applied to the base queryset only, the members of
        # the chain get derived from that later in get_split_querysets().
        qs_meth = getattr(obj.base_qs, meth)
        obj.base_qs = qs_meth(*args, **kwargs)
        return obj

    def get_split_querysets(self):
        """
        Generate the member querysets of the chain.

        Yields the base queryset filtered by the foreign key, once for each
        primary key in the subqueryset.
        """
        for pk in self.subqueryset.values_list('pk', flat=True):
            yield self.base_qs.filter(**{self.fk_field.attname: pk})

    # Below we implement a few of the usual queryset methods

    def iterate(self, **kwargs):
        """
        Chain QuerySet.iterate() calls along the split querysets.

        kwargs are passed to each QuerySet.iterate().  Our iterate_sortkey
        gets passed as sortkey unless the caller supplies a sortkey.
        Returns a lazy iterator over the results of all member querysets.
        """
        # Only forward the sortkey if there is one.  Passing sortkey=None
        # unconditionally raises TypeError if iterate() of the underlying
        # queryset does not take a sortkey parameter.
        if 'sortkey' not in kwargs and self.iterate_sortkey is not None:
            kwargs['sortkey'] = self.iterate_sortkey
        return chain.from_iterable(
            qs.iterate(**kwargs) for qs in self.get_split_querysets()
        )

    def values_list(self, *args, **kwargs):
        """ Apply values_list() to the chain, returns a new instance """
        return self._apply_queryset_method('values_list', *args, **kwargs)

    def prefetch_related(self, *args, **kwargs):
        """ Apply prefetch_related() to the chain, returns a new instance """
        return self._apply_queryset_method('prefetch_related', *args, **kwargs)

    def count(self):
        """
        Not implemented but declared here to make class pass django_table2's
        TableQuerySetData.validate().
        """
        raise NotImplementedError

    def order_by(self, *field_names):
        """
        Not implemented but declared here to make class pass django_table2's
        TableQuerySetData.validate().
        """
        raise NotImplementedError

0 comments on commit 8a38c1f

Please sign in to comment.