perf: optimize ResultSet decoding (#1244)
* perf: optimize ResultSet decoding

ResultSet decoding went through a long if-elif-else construct for
every row and every column to determine how to decode that specific
cell. This made decoding large result sets significantly slower than
necessary, because how to decode a column only needs to be determined
once for the entire ResultSet.

This change therefore collects the decoders once before starting to
decode any rows. It does this by:
1. Iterating over the columns in the ResultSet and getting a decoder
   for the specific type of each column.
2. Storing those decoders as function references in an array.
3. Picking the appropriate function directly from this array each time
   a column needs to be decoded (see the sketch after this list).
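
A minimal, self-contained sketch of the approach (the names are illustrative
stand-ins, not the library code; the actual implementation is in the
`_helpers.py` diff further down):

```python
# Illustrative sketch, not the library code: resolve one decoder per column
# up front, then reuse those function references for every row.
def _toy_get_decoder(type_code):
    # Simplified stand-in for _get_type_decoder in _helpers.py.
    if type_code == "INT64":
        return lambda v: int(v)
    elif type_code == "BOOL":
        return lambda v: v == "true"
    return lambda v: v  # STRING and everything else


def decode_rows(rows, column_types):
    # Steps 1 + 2: determine the decoder for each column once and store the
    # function references in a list.
    decoders = [_toy_get_decoder(t) for t in column_types]
    # Step 3: index straight into that list for every cell, instead of
    # re-running an if-elif-else chain per cell.
    return [[decoders[i](cell) for i, cell in enumerate(row)] for row in rows]


# Example: two rows, three columns.
print(decode_rows([["42", "true", "a"], ["7", "false", "b"]],
                  ["INT64", "BOOL", "STRING"]))
# [[42, True, 'a'], [7, False, 'b']]
```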

Selecting and decoding a query result with 100 rows consisting of
24 columns (one for each supported data type) takes ~35-40ms without
this change, and ~18-20ms with this change. The following benchmarks
were executed locally against an in-memory mock Spanner server running
in Java (a sketch of a comparable timing loop follows the list below).
This setup was chosen because:
1. We have a random ResultSet generator in Java that can be used for
   this.
2. Running the mock Spanner server in a separate process and in another
   programming language reduces the chance that the mock server itself
   influences the differences we see between runs.
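
The benchmark script itself is not part of this commit; a loop of roughly
the following shape, assuming a `Database` handle connected to the mock
server and an illustrative query, would produce output in the format shown
below:

```python
# Hypothetical timing loop (not included in this commit). Assumes `database`
# is a google.cloud.spanner_v1 Database connected to the mock Spanner server,
# and that the queried table is one the mock server can generate rows for.
import time


def benchmark(database, iterations=100):
    for _ in range(iterations):
        start = time.perf_counter()
        with database.snapshot() as snapshot:
            results = snapshot.execute_sql("SELECT * FROM random_table")
            list(results)  # stream and decode all rows
        elapsed_ms = (time.perf_counter() - start) * 1000
        print("Elapsed: ", elapsed_ms, "ms")
```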

Results without this change (100 iterations):

```
Elapsed:  43.5490608215332 ms
Elapsed:  39.53838348388672 ms
Elapsed:  38.68389129638672 ms
Elapsed:  38.26117515563965 ms
Elapsed:  38.28692436218262 ms
Elapsed:  38.12098503112793 ms
Elapsed:  39.016008377075195 ms
Elapsed:  38.15174102783203 ms
Elapsed:  38.3448600769043 ms
Elapsed:  38.00082206726074 ms
Elapsed:  38.0091667175293 ms
Elapsed:  38.02800178527832 ms
Elapsed:  38.03110122680664 ms
Elapsed:  38.42306137084961 ms
Elapsed:  38.535356521606445 ms
Elapsed:  38.86699676513672 ms
Elapsed:  38.702964782714844 ms
Elapsed:  38.881778717041016 ms
Elapsed:  38.08116912841797 ms
Elapsed:  38.084983825683594 ms
Elapsed:  38.04278373718262 ms
Elapsed:  38.74492645263672 ms
Elapsed:  38.57111930847168 ms
Elapsed:  38.17009925842285 ms
Elapsed:  38.64407539367676 ms
Elapsed:  38.00559043884277 ms
Elapsed:  38.06161880493164 ms
Elapsed:  38.233280181884766 ms
Elapsed:  38.48695755004883 ms
Elapsed:  38.71011734008789 ms
Elapsed:  37.92428970336914 ms
Elapsed:  38.8491153717041 ms
Elapsed:  38.90705108642578 ms
Elapsed:  38.20919990539551 ms
Elapsed:  38.07401657104492 ms
Elapsed:  38.30099105834961 ms
Elapsed:  38.07377815246582 ms
Elapsed:  38.61117362976074 ms
Elapsed:  39.58392143249512 ms
Elapsed:  39.69216346740723 ms
Elapsed:  38.27810287475586 ms
Elapsed:  37.88185119628906 ms
Elapsed:  38.763999938964844 ms
Elapsed:  39.05320167541504 ms
Elapsed:  38.82408142089844 ms
Elapsed:  38.47217559814453 ms
Elapsed:  38.024187088012695 ms
Elapsed:  38.07687759399414 ms
Elapsed:  38.11931610107422 ms
Elapsed:  37.9488468170166 ms
Elapsed:  38.04421424865723 ms
Elapsed:  38.57421875 ms
Elapsed:  39.543867111206055 ms
Elapsed:  38.4981632232666 ms
Elapsed:  37.89806365966797 ms
Elapsed:  38.0861759185791 ms
Elapsed:  38.72990608215332 ms
Elapsed:  38.47217559814453 ms
Elapsed:  38.71774673461914 ms
Elapsed:  38.27619552612305 ms
Elapsed:  38.08403015136719 ms
Elapsed:  38.6350154876709 ms
Elapsed:  38.03229331970215 ms
Elapsed:  39.01100158691406 ms
Elapsed:  38.4981632232666 ms
Elapsed:  38.25807571411133 ms
Elapsed:  38.59400749206543 ms
Elapsed:  38.83624076843262 ms
Elapsed:  38.584232330322266 ms
Elapsed:  39.54625129699707 ms
Elapsed:  38.268089294433594 ms
Elapsed:  39.3218994140625 ms
Elapsed:  37.9948616027832 ms
Elapsed:  38.05804252624512 ms
Elapsed:  38.88821601867676 ms
Elapsed:  38.08021545410156 ms
Elapsed:  38.22588920593262 ms
Elapsed:  37.97507286071777 ms
Elapsed:  38.03110122680664 ms
Elapsed:  37.91308403015137 ms
Elapsed:  38.00201416015625 ms
Elapsed:  38.529157638549805 ms
Elapsed:  38.44308853149414 ms
Elapsed:  38.87534141540527 ms
Elapsed:  38.85912895202637 ms
Elapsed:  38.48695755004883 ms
Elapsed:  38.41686248779297 ms
Elapsed:  38.10882568359375 ms
Elapsed:  37.98198699951172 ms
Elapsed:  38.50507736206055 ms
Elapsed:  38.16986083984375 ms
Elapsed:  38.07711601257324 ms
Elapsed:  37.92715072631836 ms
Elapsed:  37.93692588806152 ms
Elapsed:  38.04588317871094 ms
Elapsed:  38.62190246582031 ms
Elapsed:  38.5129451751709 ms
Elapsed:  37.960052490234375 ms
Elapsed:  37.99295425415039 ms
Elapsed:  38.45930099487305 ms

```

Results with this change:

```
Elapsed:  21.09503746032715 ms
Elapsed:  17.00878143310547 ms
Elapsed:  17.43626594543457 ms
Elapsed:  16.201019287109375 ms
Elapsed:  16.66712760925293 ms
Elapsed:  15.926837921142578 ms
Elapsed:  16.408205032348633 ms
Elapsed:  16.13783836364746 ms
Elapsed:  16.27206802368164 ms
Elapsed:  17.15087890625 ms
Elapsed:  16.06607437133789 ms
Elapsed:  16.852855682373047 ms
Elapsed:  23.713111877441406 ms
Elapsed:  17.20905303955078 ms
Elapsed:  16.60609245300293 ms
Elapsed:  16.30997657775879 ms
Elapsed:  15.933990478515625 ms
Elapsed:  15.688180923461914 ms
Elapsed:  16.228914260864258 ms
Elapsed:  16.252994537353516 ms
Elapsed:  16.33000373840332 ms
Elapsed:  15.842676162719727 ms
Elapsed:  16.328096389770508 ms
Elapsed:  16.4949893951416 ms
Elapsed:  16.47210121154785 ms
Elapsed:  16.674041748046875 ms
Elapsed:  15.768766403198242 ms
Elapsed:  16.48569107055664 ms
Elapsed:  15.876054763793945 ms
Elapsed:  16.852140426635742 ms
Elapsed:  16.035079956054688 ms
Elapsed:  16.407012939453125 ms
Elapsed:  15.882015228271484 ms
Elapsed:  16.71886444091797 ms
Elapsed:  15.86294174194336 ms
Elapsed:  16.566038131713867 ms
Elapsed:  15.904903411865234 ms
Elapsed:  16.289234161376953 ms
Elapsed:  16.14999771118164 ms
Elapsed:  16.31784439086914 ms
Elapsed:  16.106843948364258 ms
Elapsed:  16.581058502197266 ms
Elapsed:  16.435861587524414 ms
Elapsed:  15.904903411865234 ms
Elapsed:  16.408205032348633 ms
Elapsed:  16.062021255493164 ms
Elapsed:  16.256093978881836 ms
Elapsed:  15.87367057800293 ms
Elapsed:  16.23702049255371 ms
Elapsed:  16.745805740356445 ms
Elapsed:  15.92707633972168 ms
Elapsed:  16.142845153808594 ms
Elapsed:  16.492843627929688 ms
Elapsed:  21.553754806518555 ms
Elapsed:  17.05002784729004 ms
Elapsed:  16.932964324951172 ms
Elapsed:  16.810894012451172 ms
Elapsed:  16.577720642089844 ms
Elapsed:  15.714168548583984 ms
Elapsed:  16.2351131439209 ms
Elapsed:  16.072988510131836 ms
Elapsed:  16.038894653320312 ms
Elapsed:  16.055822372436523 ms
Elapsed:  16.378164291381836 ms
Elapsed:  15.806913375854492 ms
Elapsed:  15.5792236328125 ms
Elapsed:  15.954732894897461 ms
Elapsed:  15.566825866699219 ms
Elapsed:  15.707969665527344 ms
Elapsed:  15.514135360717773 ms
Elapsed:  15.43116569519043 ms
Elapsed:  15.332937240600586 ms
Elapsed:  15.470027923583984 ms
Elapsed:  15.269756317138672 ms
Elapsed:  15.250921249389648 ms
Elapsed:  15.47694206237793 ms
Elapsed:  15.306949615478516 ms
Elapsed:  15.72728157043457 ms
Elapsed:  15.938043594360352 ms
Elapsed:  16.324996948242188 ms
Elapsed:  16.198158264160156 ms
Elapsed:  15.982627868652344 ms
Elapsed:  16.308069229125977 ms
Elapsed:  17.843246459960938 ms
Elapsed:  15.820026397705078 ms
Elapsed:  16.428232192993164 ms
Elapsed:  15.978097915649414 ms
Elapsed:  16.347885131835938 ms
Elapsed:  16.026020050048828 ms
Elapsed:  16.362905502319336 ms
Elapsed:  16.900062561035156 ms
Elapsed:  17.3337459564209 ms
Elapsed:  17.65608787536621 ms
Elapsed:  20.101070404052734 ms
Elapsed:  18.137216567993164 ms
Elapsed:  16.952991485595703 ms
Elapsed:  16.7691707611084 ms
Elapsed:  16.71290397644043 ms
Elapsed:  16.3421630859375 ms
Elapsed:  16.36195182800293 ms

```

* chore: remove unused field_types variable
olavloite authored Dec 2, 2024
1 parent 054a186 commit ccae6e0
Showing 4 changed files with 260 additions and 79 deletions.
178 changes: 131 additions & 47 deletions google/cloud/spanner_v1/_helpers.py
@@ -266,66 +266,69 @@ def _parse_value_pb(value_pb, field_type, field_name, column_info=None):
:returns: value extracted from value_pb
:raises ValueError: if unknown type is passed
"""
decoder = _get_type_decoder(field_type, field_name, column_info)
return _parse_nullable(value_pb, decoder)


def _get_type_decoder(field_type, field_name, column_info=None):
"""Returns a function that converts a Value protobuf to cell data.
:type field_type: :class:`~google.cloud.spanner_v1.types.Type`
:param field_type: type code for the value
:type field_name: str
:param field_name: column name
:type column_info: dict
:param column_info: (Optional) dict of column name and column information.
An object where column names as keys and custom objects as corresponding
values for deserialization. It's specifically useful for data types like
protobuf where deserialization logic is on user-specific code. When provided,
the custom object enables deserialization of backend-received column data.
If not provided, data remains serialized as bytes for Proto Messages and
integer for Proto Enums.
:rtype: a function that takes a single protobuf value as an input argument
:returns: a function that can be used to extract a value from a protobuf value
:raises ValueError: if unknown type is passed
"""

type_code = field_type.code
if value_pb.HasField("null_value"):
return None
if type_code == TypeCode.STRING:
return value_pb.string_value
return _parse_string
elif type_code == TypeCode.BYTES:
return value_pb.string_value.encode("utf8")
return _parse_bytes
elif type_code == TypeCode.BOOL:
return value_pb.bool_value
return _parse_bool
elif type_code == TypeCode.INT64:
return int(value_pb.string_value)
return _parse_int64
elif type_code == TypeCode.FLOAT64:
if value_pb.HasField("string_value"):
return float(value_pb.string_value)
else:
return value_pb.number_value
return _parse_float
elif type_code == TypeCode.FLOAT32:
if value_pb.HasField("string_value"):
return float(value_pb.string_value)
else:
return value_pb.number_value
return _parse_float
elif type_code == TypeCode.DATE:
return _date_from_iso8601_date(value_pb.string_value)
return _parse_date
elif type_code == TypeCode.TIMESTAMP:
DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)
elif type_code == TypeCode.ARRAY:
return [
_parse_value_pb(
item_pb, field_type.array_element_type, field_name, column_info
)
for item_pb in value_pb.list_value.values
]
elif type_code == TypeCode.STRUCT:
return [
_parse_value_pb(
item_pb, field_type.struct_type.fields[i].type_, field_name, column_info
)
for (i, item_pb) in enumerate(value_pb.list_value.values)
]
return _parse_timestamp
elif type_code == TypeCode.NUMERIC:
return decimal.Decimal(value_pb.string_value)
return _parse_numeric
elif type_code == TypeCode.JSON:
return JsonObject.from_str(value_pb.string_value)
return _parse_json
elif type_code == TypeCode.PROTO:
bytes_value = base64.b64decode(value_pb.string_value)
if column_info is not None and column_info.get(field_name) is not None:
default_proto_message = column_info.get(field_name)
if isinstance(default_proto_message, Message):
proto_message = type(default_proto_message)()
proto_message.ParseFromString(bytes_value)
return proto_message
return bytes_value
return lambda value_pb: _parse_proto(value_pb, column_info, field_name)
elif type_code == TypeCode.ENUM:
int_value = int(value_pb.string_value)
if column_info is not None and column_info.get(field_name) is not None:
proto_enum = column_info.get(field_name)
if isinstance(proto_enum, EnumTypeWrapper):
return proto_enum.Name(int_value)
return int_value
return lambda value_pb: _parse_proto_enum(value_pb, column_info, field_name)
elif type_code == TypeCode.ARRAY:
element_decoder = _get_type_decoder(
field_type.array_element_type, field_name, column_info
)
return lambda value_pb: _parse_array(value_pb, element_decoder)
elif type_code == TypeCode.STRUCT:
element_decoders = [
_get_type_decoder(item_field.type_, field_name, column_info)
for item_field in field_type.struct_type.fields
]
return lambda value_pb: _parse_struct(value_pb, element_decoders)
else:
raise ValueError("Unknown type: %s" % (field_type,))

@@ -351,6 +354,87 @@ def _parse_list_value_pbs(rows, row_type):
return result


def _parse_string(value_pb) -> str:
return value_pb.string_value


def _parse_bytes(value_pb):
return value_pb.string_value.encode("utf8")


def _parse_bool(value_pb) -> bool:
return value_pb.bool_value


def _parse_int64(value_pb) -> int:
return int(value_pb.string_value)


def _parse_float(value_pb) -> float:
if value_pb.HasField("string_value"):
return float(value_pb.string_value)
else:
return value_pb.number_value


def _parse_date(value_pb):
return _date_from_iso8601_date(value_pb.string_value)


def _parse_timestamp(value_pb):
DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)


def _parse_numeric(value_pb):
return decimal.Decimal(value_pb.string_value)


def _parse_json(value_pb):
return JsonObject.from_str(value_pb.string_value)


def _parse_proto(value_pb, column_info, field_name):
bytes_value = base64.b64decode(value_pb.string_value)
if column_info is not None and column_info.get(field_name) is not None:
default_proto_message = column_info.get(field_name)
if isinstance(default_proto_message, Message):
proto_message = type(default_proto_message)()
proto_message.ParseFromString(bytes_value)
return proto_message
return bytes_value


def _parse_proto_enum(value_pb, column_info, field_name):
int_value = int(value_pb.string_value)
if column_info is not None and column_info.get(field_name) is not None:
proto_enum = column_info.get(field_name)
if isinstance(proto_enum, EnumTypeWrapper):
return proto_enum.Name(int_value)
return int_value


def _parse_array(value_pb, element_decoder) -> []:
return [
_parse_nullable(item_pb, element_decoder)
for item_pb in value_pb.list_value.values
]


def _parse_struct(value_pb, element_decoders):
return [
_parse_nullable(item_pb, element_decoders[i])
for (i, item_pb) in enumerate(value_pb.list_value.values)
]


def _parse_nullable(value_pb, decoder):
if value_pb.HasField("null_value"):
return None
else:
return decoder(value_pb)


class _SessionWrapper(object):
"""Base class for objects wrapping a session.
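
Taken together, the new helpers let a caller resolve decoders from the result
set metadata once and then apply them per cell; the actual wiring lives in
`google/cloud/spanner_v1/streamed.py`, which is not shown in this excerpt.
A hedged sketch of that usage:

```python
# Illustrative use of the new helpers; not the code from streamed.py.
from google.cloud.spanner_v1._helpers import _get_type_decoder, _parse_nullable


def decode_result_set(fields, rows, column_info=None):
    # Resolve one decoder per column from the result set metadata
    # (``fields`` is ``metadata.row_type.fields``).
    decoders = [
        _get_type_decoder(field.type_, field.name, column_info)
        for field in fields
    ]
    # Apply the pre-selected decoder to every cell; _parse_nullable handles
    # NULL values centrally.
    return [
        [_parse_nullable(value_pb, decoders[i]) for i, value_pb in enumerate(row)]
        for row in rows
    ]
```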
54 changes: 48 additions & 6 deletions google/cloud/spanner_v1/snapshot.py
@@ -192,6 +192,7 @@ def read(
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
column_info=None,
lazy_decode=False,
):
"""Perform a ``StreamingRead`` API request for rows in a table.
@@ -255,6 +256,18 @@ def read(
If not provided, data remains serialized as bytes for Proto Messages and
integer for Proto Enums.
:type lazy_decode: bool
:param lazy_decode:
(Optional) If this argument is set to ``true``, the iterator
returns the underlying protobuf values instead of decoded Python
objects. This reduces the time that is needed to iterate through
large result sets. The application is responsible for decoding
the data that is needed. The returned row iterator contains two
functions that can be used for this. ``iterator.decode_row(row)``
decodes all the columns in the given row to an array of Python
objects. ``iterator.decode_column(row, column_index)`` decodes one
specific column in the given row.
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
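
A hedged usage sketch of the ``lazy_decode`` flag described above, assuming
an existing ``database`` handle and an illustrative table (this example is
not taken from the commit):

```python
# Illustrative only: iterate a large result set with lazy decoding and decode
# just the columns that are actually needed.
from google.cloud.spanner_v1 import KeySet

with database.snapshot() as snapshot:
    results = snapshot.read(
        table="my_table",                # hypothetical table
        columns=["id", "payload"],
        keyset=KeySet(all_=True),
        lazy_decode=True,
    )
    for row in results:
        # ``row`` holds undecoded protobuf values; decode selectively.
        row_id = results.decode_column(row, 0)   # decode only column 0
        if row_id % 100 == 0:
            full_row = results.decode_row(row)   # decode all columns
```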
@@ -330,10 +343,15 @@ def read(
self._read_request_count += 1
if self._multi_use:
return StreamedResultSet(
iterator, source=self, column_info=column_info
iterator,
source=self,
column_info=column_info,
lazy_decode=lazy_decode,
)
else:
return StreamedResultSet(iterator, column_info=column_info)
return StreamedResultSet(
iterator, column_info=column_info, lazy_decode=lazy_decode
)
else:
iterator = _restart_on_unavailable(
restart,
@@ -348,9 +366,13 @@ def read(
self._read_request_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self, column_info=column_info)
return StreamedResultSet(
iterator, source=self, column_info=column_info, lazy_decode=lazy_decode
)
else:
return StreamedResultSet(iterator, column_info=column_info)
return StreamedResultSet(
iterator, column_info=column_info, lazy_decode=lazy_decode
)

def execute_sql(
self,
Expand All @@ -366,6 +388,7 @@ def execute_sql(
data_boost_enabled=False,
directed_read_options=None,
column_info=None,
lazy_decode=False,
):
"""Perform an ``ExecuteStreamingSql`` API request.
@@ -438,6 +461,18 @@ def execute_sql(
If not provided, data remains serialized as bytes for Proto Messages and
integer for Proto Enums.
:type lazy_decode: bool
:param lazy_decode:
(Optional) If this argument is set to ``true``, the iterator
returns the underlying protobuf values instead of decoded Python
objects. This reduces the time that is needed to iterate through
large result sets. The application is responsible for decoding
the data that is needed. The returned row iterator contains two
functions that can be used for this. ``iterator.decode_row(row)``
decodes all the columns in the given row to an array of Python
objects. ``iterator.decode_column(row, column_index)`` decodes one
specific column in the given row.
:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.
Expand Down Expand Up @@ -517,6 +552,7 @@ def execute_sql(
trace_attributes,
column_info,
observability_options,
lazy_decode=lazy_decode,
)
else:
return self._get_streamed_result_set(
@@ -525,6 +561,7 @@ def execute_sql(
trace_attributes,
column_info,
observability_options,
lazy_decode=lazy_decode,
)

def _get_streamed_result_set(
@@ -534,6 +571,7 @@ def _get_streamed_result_set(
trace_attributes,
column_info,
observability_options=None,
lazy_decode=False,
):
iterator = _restart_on_unavailable(
restart,
@@ -548,9 +586,13 @@ def _get_streamed_result_set(
self._execute_sql_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self, column_info=column_info)
return StreamedResultSet(
iterator, source=self, column_info=column_info, lazy_decode=lazy_decode
)
else:
return StreamedResultSet(iterator, column_info=column_info)
return StreamedResultSet(
iterator, column_info=column_info, lazy_decode=lazy_decode
)

def partition_read(
self,