From ccae6e0287ba6cf3c14f15a907b2106b11ef1fdc Mon Sep 17 00:00:00 2001
From: Knut Olav Løite
Date: Mon, 2 Dec 2024 11:33:24 +0100
Subject: [PATCH] perf: optimize ResultSet decoding (#1244)

* perf: optimize ResultSet decoding

ResultSet decoding went through a long if-elif-else construct for every
row and every column to determine how to decode that specific cell. This
caused large result sets to see significantly higher decoding times than
necessary, because how to decode a column only needs to be determined
once for the entire ResultSet.

This change therefore collects the decoders once, before any rows are
decoded. It does this by:
1. Iterating over the columns in the ResultSet and getting a decoder for
   the specific type of each column.
2. Storing those decoders as function references in an array.
3. Picking the appropriate function directly from this array each time a
   column needs to be decoded.
A rough sketch of this pattern is shown below.

Selecting and decoding a query result with 100 rows consisting of 24
columns (one for each supported data type) takes ~35-40ms without this
change, and ~18-20ms with this change.

The following benchmarks were executed locally against an in-memory mock
Spanner server running in Java. A Java server was chosen because:
1. We have a random ResultSet generator in Java that can be used for
   this.
2. Having the mock Spanner server run in a separate process and in
   another programming language reduces the chance that the mock server
   itself affects the differences that we see between the runs.
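The collect-then-apply pattern looks roughly like this (a minimal,
self-contained sketch with hypothetical names and type codes; the actual
implementation is `_get_type_decoder`/`_parse_nullable` in the diff
below):

```
# Hypothetical decoders keyed by type code; the real change derives them
# from the ResultSet metadata instead.
_DECODERS = {
    "INT64": lambda value: int(value),
    "STRING": lambda value: str(value),
}


def decode_rows(column_types, rows):
    # Resolve the decoder for every column exactly once per result set...
    decoders = [_DECODERS[type_code] for type_code in column_types]
    # ...then apply the stored function references to each cell, instead
    # of walking an if-elif-else chain for every single cell.
    return [
        [decoder(cell) for decoder, cell in zip(decoders, row)]
        for row in rows
    ]


print(decode_rows(["INT64", "STRING"], [["1", "a"], ["2", "b"]]))
```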
Results without this change (100 iterations):
```
Elapsed: 43.5490608215332 ms
Elapsed: 39.53838348388672 ms
Elapsed: 38.68389129638672 ms
Elapsed: 38.26117515563965 ms
Elapsed: 38.28692436218262 ms
Elapsed: 38.12098503112793 ms
Elapsed: 39.016008377075195 ms
Elapsed: 38.15174102783203 ms
Elapsed: 38.3448600769043 ms
Elapsed: 38.00082206726074 ms
Elapsed: 38.0091667175293 ms
Elapsed: 38.02800178527832 ms
Elapsed: 38.03110122680664 ms
Elapsed: 38.42306137084961 ms
Elapsed: 38.535356521606445 ms
Elapsed: 38.86699676513672 ms
Elapsed: 38.702964782714844 ms
Elapsed: 38.881778717041016 ms
Elapsed: 38.08116912841797 ms
Elapsed: 38.084983825683594 ms
Elapsed: 38.04278373718262 ms
Elapsed: 38.74492645263672 ms
Elapsed: 38.57111930847168 ms
Elapsed: 38.17009925842285 ms
Elapsed: 38.64407539367676 ms
Elapsed: 38.00559043884277 ms
Elapsed: 38.06161880493164 ms
Elapsed: 38.233280181884766 ms
Elapsed: 38.48695755004883 ms
Elapsed: 38.71011734008789 ms
Elapsed: 37.92428970336914 ms
Elapsed: 38.8491153717041 ms
Elapsed: 38.90705108642578 ms
Elapsed: 38.20919990539551 ms
Elapsed: 38.07401657104492 ms
Elapsed: 38.30099105834961 ms
Elapsed: 38.07377815246582 ms
Elapsed: 38.61117362976074 ms
Elapsed: 39.58392143249512 ms
Elapsed: 39.69216346740723 ms
Elapsed: 38.27810287475586 ms
Elapsed: 37.88185119628906 ms
Elapsed: 38.763999938964844 ms
Elapsed: 39.05320167541504 ms
Elapsed: 38.82408142089844 ms
Elapsed: 38.47217559814453 ms
Elapsed: 38.024187088012695 ms
Elapsed: 38.07687759399414 ms
Elapsed: 38.11931610107422 ms
Elapsed: 37.9488468170166 ms
Elapsed: 38.04421424865723 ms
Elapsed: 38.57421875 ms
Elapsed: 39.543867111206055 ms
Elapsed: 38.4981632232666 ms
Elapsed: 37.89806365966797 ms
Elapsed: 38.0861759185791 ms
Elapsed: 38.72990608215332 ms
Elapsed: 38.47217559814453 ms
Elapsed: 38.71774673461914 ms
Elapsed: 38.27619552612305 ms
Elapsed: 38.08403015136719 ms
Elapsed: 38.6350154876709 ms
Elapsed: 38.03229331970215 ms
Elapsed: 39.01100158691406 ms
Elapsed: 38.4981632232666 ms
Elapsed: 38.25807571411133 ms
Elapsed: 38.59400749206543 ms
Elapsed: 38.83624076843262 ms
Elapsed: 38.584232330322266 ms
Elapsed: 39.54625129699707 ms
Elapsed: 38.268089294433594 ms
Elapsed: 39.3218994140625 ms
Elapsed: 37.9948616027832 ms
Elapsed: 38.05804252624512 ms
Elapsed: 38.88821601867676 ms
Elapsed: 38.08021545410156 ms
Elapsed: 38.22588920593262 ms
Elapsed: 37.97507286071777 ms
Elapsed: 38.03110122680664 ms
Elapsed: 37.91308403015137 ms
Elapsed: 38.00201416015625 ms
Elapsed: 38.529157638549805 ms
Elapsed: 38.44308853149414 ms
Elapsed: 38.87534141540527 ms
Elapsed: 38.85912895202637 ms
Elapsed: 38.48695755004883 ms
Elapsed: 38.41686248779297 ms
Elapsed: 38.10882568359375 ms
Elapsed: 37.98198699951172 ms
Elapsed: 38.50507736206055 ms
Elapsed: 38.16986083984375 ms
Elapsed: 38.07711601257324 ms
Elapsed: 37.92715072631836 ms
Elapsed: 37.93692588806152 ms
Elapsed: 38.04588317871094 ms
Elapsed: 38.62190246582031 ms
Elapsed: 38.5129451751709 ms
Elapsed: 37.960052490234375 ms
Elapsed: 37.99295425415039 ms
Elapsed: 38.45930099487305 ms
```

Results with this change:
```
Elapsed: 21.09503746032715 ms
Elapsed: 17.00878143310547 ms
Elapsed: 17.43626594543457 ms
Elapsed: 16.201019287109375 ms
Elapsed: 16.66712760925293 ms
Elapsed: 15.926837921142578 ms
Elapsed: 16.408205032348633 ms
Elapsed: 16.13783836364746 ms
Elapsed: 16.27206802368164 ms
Elapsed: 17.15087890625 ms
Elapsed: 16.06607437133789 ms
Elapsed: 16.852855682373047 ms
Elapsed: 23.713111877441406 ms
Elapsed: 17.20905303955078 ms
Elapsed: 16.60609245300293 ms
Elapsed: 16.30997657775879 ms
Elapsed: 15.933990478515625 ms
Elapsed: 15.688180923461914 ms
Elapsed: 16.228914260864258 ms
Elapsed: 16.252994537353516 ms
Elapsed: 16.33000373840332 ms
Elapsed: 15.842676162719727 ms
Elapsed: 16.328096389770508 ms
Elapsed: 16.4949893951416 ms
Elapsed: 16.47210121154785 ms
Elapsed: 16.674041748046875 ms
Elapsed: 15.768766403198242 ms
Elapsed: 16.48569107055664 ms
Elapsed: 15.876054763793945 ms
Elapsed: 16.852140426635742 ms
Elapsed: 16.035079956054688 ms
Elapsed: 16.407012939453125 ms
Elapsed: 15.882015228271484 ms
Elapsed: 16.71886444091797 ms
Elapsed: 15.86294174194336 ms
Elapsed: 16.566038131713867 ms
Elapsed: 15.904903411865234 ms
Elapsed: 16.289234161376953 ms
Elapsed: 16.14999771118164 ms
Elapsed: 16.31784439086914 ms
Elapsed: 16.106843948364258 ms
Elapsed: 16.581058502197266 ms
Elapsed: 16.435861587524414 ms
Elapsed: 15.904903411865234 ms
Elapsed: 16.408205032348633 ms
Elapsed: 16.062021255493164 ms
Elapsed: 16.256093978881836 ms
Elapsed: 15.87367057800293 ms
Elapsed: 16.23702049255371 ms
Elapsed: 16.745805740356445 ms
Elapsed: 15.92707633972168 ms
Elapsed: 16.142845153808594 ms
Elapsed: 16.492843627929688 ms
Elapsed: 21.553754806518555 ms
Elapsed: 17.05002784729004 ms
Elapsed: 16.932964324951172 ms
Elapsed: 16.810894012451172 ms
Elapsed: 16.577720642089844 ms
Elapsed: 15.714168548583984 ms
Elapsed: 16.2351131439209 ms
Elapsed: 16.072988510131836 ms
Elapsed: 16.038894653320312 ms
Elapsed: 16.055822372436523 ms
Elapsed: 16.378164291381836 ms
Elapsed: 15.806913375854492 ms
Elapsed: 15.5792236328125 ms
Elapsed: 15.954732894897461 ms
Elapsed: 15.566825866699219 ms
Elapsed: 15.707969665527344 ms
Elapsed: 15.514135360717773 ms
Elapsed: 15.43116569519043 ms
Elapsed: 15.332937240600586 ms
Elapsed: 15.470027923583984 ms
Elapsed: 15.269756317138672 ms
Elapsed: 15.250921249389648 ms
Elapsed: 15.47694206237793 ms
Elapsed: 15.306949615478516 ms
Elapsed: 15.72728157043457 ms
Elapsed: 15.938043594360352 ms
Elapsed: 16.324996948242188 ms
Elapsed: 16.198158264160156 ms
Elapsed: 15.982627868652344 ms
Elapsed: 16.308069229125977 ms
Elapsed: 17.843246459960938 ms
Elapsed: 15.820026397705078 ms
Elapsed: 16.428232192993164 ms
Elapsed: 15.978097915649414 ms
Elapsed: 16.347885131835938 ms
Elapsed: 16.026020050048828 ms
Elapsed: 16.362905502319336 ms
Elapsed: 16.900062561035156 ms
Elapsed: 17.3337459564209 ms
Elapsed: 17.65608787536621 ms
Elapsed: 20.101070404052734 ms
Elapsed: 18.137216567993164 ms
Elapsed: 16.952991485595703 ms
Elapsed: 16.7691707611084 ms
Elapsed: 16.71290397644043 ms
Elapsed: 16.3421630859375 ms
Elapsed: 16.36195182800293 ms
```

* chore: remove unused field_types variable
---
 google/cloud/spanner_v1/_helpers.py | 178 ++++++++++++++++++++--------
 google/cloud/spanner_v1/snapshot.py |  54 ++++++++-
 google/cloud/spanner_v1/streamed.py |  65 ++++++++--
 tests/system/test_session_api.py    |  42 ++++---
 4 files changed, 260 insertions(+), 79 deletions(-)

diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py
index a1d6a60cb0..a4d66fc20f 100644
--- a/google/cloud/spanner_v1/_helpers.py
+++ b/google/cloud/spanner_v1/_helpers.py
@@ -266,66 +266,69 @@ def _parse_value_pb(value_pb, field_type, field_name, column_info=None):
     :returns: value extracted from value_pb
     :raises ValueError: if unknown type is passed
     """
+    decoder = _get_type_decoder(field_type, field_name, column_info)
+    return _parse_nullable(value_pb, decoder)
+
+
+def _get_type_decoder(field_type, field_name, column_info=None):
+    """Returns a function that converts a Value protobuf to cell data.
+
+    :type field_type: :class:`~google.cloud.spanner_v1.types.Type`
+    :param field_type: type code for the value
+
+    :type field_name: str
+    :param field_name: column name
+
+    :type column_info: dict
+    :param column_info: (Optional) dict of column name and column information.
+        A dict with column names as keys and custom deserializer objects as
+        values. It is specifically useful for data types like protobuf, where
+        the deserialization logic lives in user code. When provided, the
+        custom object enables deserialization of backend-received column data.
+        If not provided, data remains serialized as bytes for Proto Messages
+        and integer for Proto Enums.
+
+    :rtype: a function that takes a single protobuf value as an input argument
+    :returns: a function that can be used to extract a value from a protobuf value
+    :raises ValueError: if unknown type is passed
+    """
+
     type_code = field_type.code
-    if value_pb.HasField("null_value"):
-        return None
     if type_code == TypeCode.STRING:
-        return value_pb.string_value
+        return _parse_string
     elif type_code == TypeCode.BYTES:
-        return value_pb.string_value.encode("utf8")
+        return _parse_bytes
     elif type_code == TypeCode.BOOL:
-        return value_pb.bool_value
+        return _parse_bool
     elif type_code == TypeCode.INT64:
-        return int(value_pb.string_value)
+        return _parse_int64
     elif type_code == TypeCode.FLOAT64:
-        if value_pb.HasField("string_value"):
-            return float(value_pb.string_value)
-        else:
-            return value_pb.number_value
+        return _parse_float
     elif type_code == TypeCode.FLOAT32:
-        if value_pb.HasField("string_value"):
-            return float(value_pb.string_value)
-        else:
-            return value_pb.number_value
+        return _parse_float
     elif type_code == TypeCode.DATE:
-        return _date_from_iso8601_date(value_pb.string_value)
+        return _parse_date
     elif type_code == TypeCode.TIMESTAMP:
-        DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
-        return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)
-    elif type_code == TypeCode.ARRAY:
-        return [
-            _parse_value_pb(
-                item_pb, field_type.array_element_type, field_name, column_info
-            )
-            for item_pb in value_pb.list_value.values
-        ]
-    elif type_code == TypeCode.STRUCT:
-        return [
-            _parse_value_pb(
-                item_pb, field_type.struct_type.fields[i].type_, field_name, column_info
-            )
-            for (i, item_pb) in enumerate(value_pb.list_value.values)
-        ]
+        return _parse_timestamp
     elif type_code == TypeCode.NUMERIC:
-        return decimal.Decimal(value_pb.string_value)
+        return _parse_numeric
     elif type_code == TypeCode.JSON:
-        return JsonObject.from_str(value_pb.string_value)
+        return _parse_json
     elif type_code == TypeCode.PROTO:
-        bytes_value = base64.b64decode(value_pb.string_value)
-        if column_info is not None and column_info.get(field_name) is not None:
-            default_proto_message = column_info.get(field_name)
-            if isinstance(default_proto_message, Message):
-                proto_message = type(default_proto_message)()
-                proto_message.ParseFromString(bytes_value)
-                return proto_message
-        return bytes_value
+        return lambda value_pb: _parse_proto(value_pb, column_info, field_name)
     elif type_code == TypeCode.ENUM:
-        int_value = int(value_pb.string_value)
-        if column_info is not None and column_info.get(field_name) is not None:
-            proto_enum = column_info.get(field_name)
-            if isinstance(proto_enum, EnumTypeWrapper):
-                return proto_enum.Name(int_value)
-        return int_value
+        return lambda value_pb: _parse_proto_enum(value_pb, column_info, field_name)
+    elif type_code == TypeCode.ARRAY:
+        element_decoder = _get_type_decoder(
+            field_type.array_element_type, field_name, column_info
+        )
+        return lambda value_pb: _parse_array(value_pb, element_decoder)
+    elif type_code == TypeCode.STRUCT:
+        element_decoders = [
+            _get_type_decoder(item_field.type_, field_name, column_info)
+            for item_field in field_type.struct_type.fields
+        ]
+        return lambda value_pb: _parse_struct(value_pb, element_decoders)
     else:
         raise ValueError("Unknown type: %s" % (field_type,))
@@ -351,6 +354,87 @@ def _parse_list_value_pbs(rows, row_type):
     return result
 
 
+def _parse_string(value_pb) -> str:
+    return value_pb.string_value
+
+
+def _parse_bytes(value_pb):
+    return value_pb.string_value.encode("utf8")
+
+
+def _parse_bool(value_pb) -> bool:
+    return value_pb.bool_value
+
+
+def _parse_int64(value_pb) -> int:
+    return int(value_pb.string_value)
+
+
+def _parse_float(value_pb) -> float:
+    if value_pb.HasField("string_value"):
+        return float(value_pb.string_value)
+    else:
+        return value_pb.number_value
+
+
+def _parse_date(value_pb):
+    return _date_from_iso8601_date(value_pb.string_value)
+
+
+def _parse_timestamp(value_pb):
+    DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
+    return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)
+
+
+def _parse_numeric(value_pb):
+    return decimal.Decimal(value_pb.string_value)
+
+
+def _parse_json(value_pb):
+    return JsonObject.from_str(value_pb.string_value)
+
+
+def _parse_proto(value_pb, column_info, field_name):
+    bytes_value = base64.b64decode(value_pb.string_value)
+    if column_info is not None and column_info.get(field_name) is not None:
+        default_proto_message = column_info.get(field_name)
+        if isinstance(default_proto_message, Message):
+            proto_message = type(default_proto_message)()
+            proto_message.ParseFromString(bytes_value)
+            return proto_message
+    return bytes_value
+
+
+def _parse_proto_enum(value_pb, column_info, field_name):
+    int_value = int(value_pb.string_value)
+    if column_info is not None and column_info.get(field_name) is not None:
+        proto_enum = column_info.get(field_name)
+        if isinstance(proto_enum, EnumTypeWrapper):
+            return proto_enum.Name(int_value)
+    return int_value
+
+
+def _parse_array(value_pb, element_decoder) -> list:
+    return [
+        _parse_nullable(item_pb, element_decoder)
+        for item_pb in value_pb.list_value.values
+    ]
+
+
+def _parse_struct(value_pb, element_decoders):
+    return [
+        _parse_nullable(item_pb, element_decoders[i])
+        for (i, item_pb) in enumerate(value_pb.list_value.values)
+    ]
+
+
+def _parse_nullable(value_pb, decoder):
+    if value_pb.HasField("null_value"):
+        return None
+    else:
+        return decoder(value_pb)
+
+
 class _SessionWrapper(object):
     """Base class for objects wrapping a session.
 
diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py
index a02776b27c..143e17c503 100644
--- a/google/cloud/spanner_v1/snapshot.py
+++ b/google/cloud/spanner_v1/snapshot.py
@@ -192,6 +192,7 @@ def read(
         retry=gapic_v1.method.DEFAULT,
         timeout=gapic_v1.method.DEFAULT,
         column_info=None,
+        lazy_decode=False,
     ):
         """Perform a ``StreamingRead`` API request for rows in a table.
 
@@ -255,6 +256,18 @@ def read(
             If not provided, data remains serialized as bytes for Proto Messages
             and integer for Proto Enums.
 
+        :type lazy_decode: bool
+        :param lazy_decode:
+            (Optional) If this argument is set to ``True``, the iterator
+            returns the underlying protobuf values instead of decoded Python
+            objects. This reduces the time that is needed to iterate through
+            large result sets. The application is responsible for decoding
+            the data that it needs. The returned row iterator contains two
+            functions that can be used for this. ``iterator.decode_row(row)``
+            decodes all the columns in the given row to an array of Python
+            objects. ``iterator.decode_column(row, column_index)`` decodes one
+            specific column in the given row.
+
         :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
         :returns:
             a result set instance which can be used to consume rows.
@@ -330,10 +343,15 @@ def read(
             self._read_request_count += 1
             if self._multi_use:
                 return StreamedResultSet(
-                    iterator, source=self, column_info=column_info
+                    iterator,
+                    source=self,
+                    column_info=column_info,
+                    lazy_decode=lazy_decode,
                 )
             else:
-                return StreamedResultSet(iterator, column_info=column_info)
+                return StreamedResultSet(
+                    iterator, column_info=column_info, lazy_decode=lazy_decode
+                )
         else:
             iterator = _restart_on_unavailable(
                 restart,
@@ -348,9 +366,13 @@ def read(
             self._read_request_count += 1
 
             if self._multi_use:
-                return StreamedResultSet(iterator, source=self, column_info=column_info)
+                return StreamedResultSet(
+                    iterator, source=self, column_info=column_info, lazy_decode=lazy_decode
+                )
             else:
-                return StreamedResultSet(iterator, column_info=column_info)
+                return StreamedResultSet(
+                    iterator, column_info=column_info, lazy_decode=lazy_decode
+                )
 
     def execute_sql(
         self,
@@ -366,6 +388,7 @@ def execute_sql(
         data_boost_enabled=False,
         directed_read_options=None,
         column_info=None,
+        lazy_decode=False,
     ):
         """Perform an ``ExecuteStreamingSql`` API request.
 
@@ -438,6 +461,18 @@ def execute_sql(
             If not provided, data remains serialized as bytes for Proto Messages
             and integer for Proto Enums.
 
+        :type lazy_decode: bool
+        :param lazy_decode:
+            (Optional) If this argument is set to ``True``, the iterator
+            returns the underlying protobuf values instead of decoded Python
+            objects. This reduces the time that is needed to iterate through
+            large result sets. The application is responsible for decoding
+            the data that it needs. The returned row iterator contains two
+            functions that can be used for this. ``iterator.decode_row(row)``
+            decodes all the columns in the given row to an array of Python
+            objects. ``iterator.decode_column(row, column_index)`` decodes one
+            specific column in the given row.
+
         :raises ValueError:
             for reuse of single-use snapshots, or if a transaction ID is
             already pending for multiple-use snapshots.
@@ -517,6 +552,7 @@ def execute_sql(
                 trace_attributes,
                 column_info,
                 observability_options,
+                lazy_decode=lazy_decode,
             )
         else:
             return self._get_streamed_result_set(
@@ -525,6 +561,7 @@ def execute_sql(
                 trace_attributes,
                 column_info,
                 observability_options,
+                lazy_decode=lazy_decode,
             )
 
     def _get_streamed_result_set(
@@ -534,6 +571,7 @@ def _get_streamed_result_set(
         trace_attributes,
         column_info,
         observability_options=None,
+        lazy_decode=False,
     ):
         iterator = _restart_on_unavailable(
             restart,
@@ -548,9 +586,13 @@ def _get_streamed_result_set(
         self._execute_sql_count += 1
 
         if self._multi_use:
-            return StreamedResultSet(iterator, source=self, column_info=column_info)
+            return StreamedResultSet(
+                iterator, source=self, column_info=column_info, lazy_decode=lazy_decode
+            )
         else:
-            return StreamedResultSet(iterator, column_info=column_info)
+            return StreamedResultSet(
+                iterator, column_info=column_info, lazy_decode=lazy_decode
+            )
 
     def partition_read(
         self,
diff --git a/google/cloud/spanner_v1/streamed.py b/google/cloud/spanner_v1/streamed.py
index 89bde0e334..7c067e97b6 100644
--- a/google/cloud/spanner_v1/streamed.py
+++ b/google/cloud/spanner_v1/streamed.py
@@ -21,7 +21,7 @@
 from google.cloud.spanner_v1 import PartialResultSet
 from google.cloud.spanner_v1 import ResultSetMetadata
 from google.cloud.spanner_v1 import TypeCode
-from google.cloud.spanner_v1._helpers import _parse_value_pb
+from google.cloud.spanner_v1._helpers import _get_type_decoder, _parse_nullable
 
 
 class StreamedResultSet(object):
@@ -37,7 +37,13 @@ class StreamedResultSet(object):
     :param source: Snapshot from which the result set was fetched.
     """
 
-    def __init__(self, response_iterator, source=None, column_info=None):
+    def __init__(
+        self,
+        response_iterator,
+        source=None,
+        column_info=None,
+        lazy_decode: bool = False,
+    ):
         self._response_iterator = response_iterator
         self._rows = []  # Fully-processed rows
         self._metadata = None  # Until set from first PRS
@@ -46,6 +52,8 @@ def __init__(self, response_iterator, source=None, column_info=None):
         self._pending_chunk = None  # Incomplete value
         self._source = source  # Source snapshot
         self._column_info = column_info  # Column information
+        self._field_decoders = None
+        self._lazy_decode = lazy_decode  # Return protobuf values
 
     @property
     def fields(self):
@@ -77,6 +85,17 @@ def stats(self):
         """
         return self._stats
 
+    @property
+    def _decoders(self):
+        if self._field_decoders is None:
+            if self._metadata is None:
+                raise ValueError("iterator not started")
+            self._field_decoders = [
+                _get_type_decoder(field.type_, field.name, self._column_info)
+                for field in self.fields
+            ]
+        return self._field_decoders
+
     def _merge_chunk(self, value):
         """Merge pending chunk with next value.
 
@@ -99,16 +118,14 @@ def _merge_values(self, values):
         :type values: list of :class:`~google.protobuf.struct_pb2.Value`
         :param values: non-chunked values from partial result set.
""" - field_types = [field.type_ for field in self.fields] - field_names = [field.name for field in self.fields] - width = len(field_types) + decoders = self._decoders + width = len(self.fields) index = len(self._current_row) for value in values: - self._current_row.append( - _parse_value_pb( - value, field_types[index], field_names[index], self._column_info - ) - ) + if self._lazy_decode: + self._current_row.append(value) + else: + self._current_row.append(_parse_nullable(value, decoders[index])) index += 1 if index == width: self._rows.append(self._current_row) @@ -152,6 +169,34 @@ def __iter__(self): except StopIteration: return + def decode_row(self, row: []) -> []: + """Decodes a row from protobuf values to Python objects. This function + should only be called for result sets that use ``lazy_decoding=True``. + The array that is returned by this function is the same as the array + that would have been returned by the rows iterator if ``lazy_decoding=False``. + + :returns: an array containing the decoded values of all the columns in the given row + """ + if not hasattr(row, "__len__"): + raise TypeError("row", "row must be an array of protobuf values") + decoders = self._decoders + return [ + _parse_nullable(row[index], decoders[index]) for index in range(len(row)) + ] + + def decode_column(self, row: [], column_index: int): + """Decodes a column from a protobuf value to a Python object. This function + should only be called for result sets that use ``lazy_decoding=True``. + The object that is returned by this function is the same as the object + that would have been returned by the rows iterator if ``lazy_decoding=False``. + + :returns: the decoded column value + """ + if not hasattr(row, "__len__"): + raise TypeError("row", "row must be an array of protobuf values") + decoders = self._decoders + return _parse_nullable(row[column_index], decoders[column_index]) + def one(self): """Return exactly one result, or raise an exception. 
diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py
index 5322527d12..b7337cb258 100644
--- a/tests/system/test_session_api.py
+++ b/tests/system/test_session_api.py
@@ -2018,17 +2018,20 @@ def test_execute_sql_w_manual_consume(sessions_database):
     row_count = 3000
     committed = _set_up_table(sessions_database, row_count)
 
-    with sessions_database.snapshot(read_timestamp=committed) as snapshot:
-        streamed = snapshot.execute_sql(sd.SQL)
+    for lazy_decode in [False, True]:
+        with sessions_database.snapshot(read_timestamp=committed) as snapshot:
+            streamed = snapshot.execute_sql(sd.SQL, lazy_decode=lazy_decode)
 
-    keyset = spanner_v1.KeySet(all_=True)
+        keyset = spanner_v1.KeySet(all_=True)
 
-    with sessions_database.snapshot(read_timestamp=committed) as snapshot:
-        rows = list(snapshot.read(sd.TABLE, sd.COLUMNS, keyset))
+        with sessions_database.snapshot(read_timestamp=committed) as snapshot:
+            rows = list(
+                snapshot.read(sd.TABLE, sd.COLUMNS, keyset, lazy_decode=lazy_decode)
+            )
 
-    assert list(streamed) == rows
-    assert streamed._current_row == []
-    assert streamed._pending_chunk is None
+        assert list(streamed) == rows
+        assert streamed._current_row == []
+        assert streamed._pending_chunk is None
 
 
 def test_execute_sql_w_to_dict_list(sessions_database):
@@ -2057,16 +2060,23 @@ def _check_sql_results(
     if order and "ORDER" not in sql:
         sql += " ORDER BY pkey"
 
-    with database.snapshot() as snapshot:
-        rows = list(
-            snapshot.execute_sql(
-                sql, params=params, param_types=param_types, column_info=column_info
+    for lazy_decode in [False, True]:
+        with database.snapshot() as snapshot:
+            iterator = snapshot.execute_sql(
+                sql,
+                params=params,
+                param_types=param_types,
+                column_info=column_info,
+                lazy_decode=lazy_decode,
             )
-        )
+            rows = list(iterator)
+            if lazy_decode:
+                for index, row in enumerate(rows):
+                    rows[index] = iterator.decode_row(row)
 
-    _sample_data._check_rows_data(
-        rows, expected=expected, recurse_into_lists=recurse_into_lists
-    )
+            _sample_data._check_rows_data(
+                rows, expected=expected, recurse_into_lists=recurse_into_lists
+            )
 
 
 def test_multiuse_snapshot_execute_sql_isolation_strong(sessions_database):
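For reviewers trying out the change, a minimal usage sketch of the new
``lazy_decode`` flag (illustrative only; ``database`` is assumed to be an
existing ``google.cloud.spanner_v1`` Database instance, and the table and
column names are placeholders):

```
with database.snapshot() as snapshot:
    results = snapshot.execute_sql(
        "SELECT SingerId, FirstName FROM Singers",
        lazy_decode=True,
    )
    for row in results:
        # With lazy_decode=True the iterator yields raw protobuf values,
        # so decode only the columns that are actually needed...
        first_name = results.decode_column(row, 1)
        # ...or decode the entire row at once:
        singer_id, first_name = results.decode_row(row)
```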