From c4210b28466cfd88fffe546140a005a8e0a1af23 Mon Sep 17 00:00:00 2001 From: Astha Mohta <35952883+asthamohta@users.noreply.github.com> Date: Tue, 9 Jan 2024 16:31:05 +0530 Subject: [PATCH] feat: Add support for Directed Reads (#1000) * changes * changes * docs * docs * linting * feat(spanner): remove client side validations for directed read options * feat(spanner): update the auto_failover_disabled field * feat(spanner): update unit tests * feat(spanner): update test * feat(spanner): update documentation * feat(spanner): add system test to validate exception in case of RW transaction * feat(spanner): update unit test * feat(spanner): add dro for batchsnapshot and update system tests * feat(spanner): fix unit tests for batchsnapshot * feat(spanner): add unit tests for partition read and query * feat(spanner): lint fixes * feat(spanner): code refactor remove TransactionType * feat(spanner): comment refactor * feat(spanner): remove comments --------- Co-authored-by: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Co-authored-by: Sri Harsha CH --- google/cloud/spanner_v1/__init__.py | 2 + google/cloud/spanner_v1/client.py | 30 +++++ google/cloud/spanner_v1/database.py | 17 +++ google/cloud/spanner_v1/snapshot.py | 26 ++++ samples/samples/snippets.py | 76 +++++++++++ samples/samples/snippets_test.py | 7 + tests/system/test_database_api.py | 79 +++++++++++ tests/unit/spanner_dbapi/test_connection.py | 3 +- tests/unit/test_client.py | 25 ++++ tests/unit/test_database.py | 141 +++++++++++++++++++- tests/unit/test_instance.py | 1 + tests/unit/test_snapshot.py | 88 +++++++++++- tests/unit/test_spanner.py | 75 ++++++++++- tests/unit/test_transaction.py | 2 + 14 files changed, 564 insertions(+), 8 deletions(-) diff --git a/google/cloud/spanner_v1/__init__.py b/google/cloud/spanner_v1/__init__.py index 3b59bb3ef0c..47805d4ebc1 100644 --- a/google/cloud/spanner_v1/__init__.py +++ b/google/cloud/spanner_v1/__init__.py @@ -40,6 +40,7 @@ from .types.spanner import CommitRequest from .types.spanner import CreateSessionRequest from .types.spanner import DeleteSessionRequest +from .types.spanner import DirectedReadOptions from .types.spanner import ExecuteBatchDmlRequest from .types.spanner import ExecuteBatchDmlResponse from .types.spanner import ExecuteSqlRequest @@ -108,6 +109,7 @@ "CommitResponse", "CreateSessionRequest", "DeleteSessionRequest", + "DirectedReadOptions", "ExecuteBatchDmlRequest", "ExecuteBatchDmlResponse", "ExecuteSqlRequest", diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index a0e848228be..f8f3fdb72c2 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -120,6 +120,12 @@ class Client(ClientWithProject): disable leader aware routing. Disabling leader aware routing would route all requests in RW/PDML transactions to the closest region. + :type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Client options used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests that indicates which replicas + or regions should be used for non-transactional reads or queries. 
+ :raises: :class:`ValueError ` if both ``read_only`` and ``admin`` are :data:`True` """ @@ -139,6 +145,7 @@ def __init__( client_options=None, query_options=None, route_to_leader_enabled=True, + directed_read_options=None, ): self._emulator_host = _get_spanner_emulator_host() @@ -179,6 +186,7 @@ def __init__( warnings.warn(_EMULATOR_HOST_HTTP_SCHEME) self._route_to_leader_enabled = route_to_leader_enabled + self._directed_read_options = directed_read_options @property def credentials(self): @@ -260,6 +268,17 @@ def route_to_leader_enabled(self): """ return self._route_to_leader_enabled + @property + def directed_read_options(self): + """Getter for directed_read_options. + + :rtype: + :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :returns: The directed_read_options for the client. + """ + return self._directed_read_options + def copy(self): """Make a copy of this client. @@ -383,3 +402,14 @@ def list_instances(self, filter_="", page_size=None): request=request, metadata=metadata ) return page_iter + + @directed_read_options.setter + def directed_read_options(self, directed_read_options): + """Sets directed_read_options for the client + :type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: Client options used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests that indicates which replicas + or regions should be used for non-transactional reads or queries. + """ + self._directed_read_options = directed_read_options diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 758547cf86d..e5f00c8ebdb 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -167,6 +167,7 @@ def __init__( self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled self._enable_drop_protection = enable_drop_protection self._reconciling = False + self._directed_read_options = self._instance._client.directed_read_options if pool is None: pool = BurstyPool(database_role=database_role) @@ -1226,6 +1227,7 @@ def generate_read_batches( partition_size_bytes=None, max_partitions=None, data_boost_enabled=False, + directed_read_options=None, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1265,6 +1267,12 @@ def generate_read_batches( (Optional) If this is for a partitioned read and this field is set ``true``, the request will be executed via offline access. + :type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Request level option used to set the directed_read_options + for ReadRequests that indicates which replicas + or regions should be used for non-transactional reads. + :type retry: :class:`~google.api_core.retry.Retry` :param retry: (Optional) The retry settings for this request. 
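Illustrative note (not part of the patch): a minimal sketch of how the request-level directed_read_options argument added to generate_read_batches above might be used from a BatchSnapshot. The instance ID, database ID, table, and columns below are placeholder assumptions, not values from this patch.

    # Sketch only: partitioned read routed to READ_ONLY replicas via the new
    # directed_read_options argument. All identifiers are placeholders.
    from google.cloud import spanner
    from google.cloud.spanner_v1 import DirectedReadOptions, KeySet

    directed_read_options = {
        "include_replicas": {
            "replica_selections": [
                {"type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY},
            ],
            "auto_failover_disabled": True,
        },
    }

    client = spanner.Client()
    database = client.instance("my-instance").database("my-database")
    batch_txn = database.batch_snapshot()

    for batch in batch_txn.generate_read_batches(
        table="Singers",
        columns=("SingerId", "FirstName"),
        keyset=KeySet(all_=True),
        directed_read_options=directed_read_options,
    ):
        # Each batch carries the directed_read_options in its "read" info and
        # is processed against the same snapshot.
        for row in batch_txn.process_read_batch(batch):
            print(row)

    batch_txn.close()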
@@ -1293,6 +1301,7 @@ def generate_read_batches( "keyset": keyset._to_dict(), "index": index, "data_boost_enabled": data_boost_enabled, + "directed_read_options": directed_read_options, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1337,6 +1346,7 @@ def generate_query_batches( max_partitions=None, query_options=None, data_boost_enabled=False, + directed_read_options=None, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1388,6 +1398,12 @@ def generate_query_batches( (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. + :type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Request level option used to set the directed_read_options + for ExecuteSqlRequests that indicates which replicas + or regions should be used for non-transactional queries. + :type retry: :class:`~google.api_core.retry.Retry` :param retry: (Optional) The retry settings for this request. @@ -1412,6 +1428,7 @@ def generate_query_batches( query_info = { "sql": sql, "data_boost_enabled": data_boost_enabled, + "directed_read_options": directed_read_options, } if params: query_info["params"] = params diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 1e515bd8e69..37bed11d7e3 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -173,6 +173,7 @@ def read( partition=None, request_options=None, data_boost_enabled=False, + directed_read_options=None, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -224,6 +225,12 @@ def read( ``partition_token``, the API will return an ``INVALID_ARGUMENT`` error. + :type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Request level option used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests that indicates which replicas + or regions should be used for non-transactional reads or queries. + :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet` :returns: a result set instance which can be used to consume rows. @@ -253,6 +260,11 @@ def read( if self._read_only: # Transaction tags are not supported for read only transactions. request_options.transaction_tag = None + if ( + directed_read_options is None + and database._directed_read_options is not None + ): + directed_read_options = database._directed_read_options elif self.transaction_tag is not None: request_options.transaction_tag = self.transaction_tag @@ -266,6 +278,7 @@ def read( partition_token=partition, request_options=request_options, data_boost_enabled=data_boost_enabled, + directed_read_options=directed_read_options, ) restart = functools.partial( api.streaming_read, @@ -322,6 +335,7 @@ def execute_sql( retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, data_boost_enabled=False, + directed_read_options=None, ): """Perform an ``ExecuteStreamingSql`` API request. @@ -379,6 +393,12 @@ def execute_sql( ``partition_token``, the API will return an ``INVALID_ARGUMENT`` error. 
+ :type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Request level option used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests that indicates which replicas + or regions should be used for non-transactional reads or queries. + :raises ValueError: for reuse of single-use snapshots, or if a transaction ID is already pending for multiple-use snapshots. @@ -419,6 +439,11 @@ def execute_sql( if self._read_only: # Transaction tags are not supported for read only transactions. request_options.transaction_tag = None + if ( + directed_read_options is None + and database._directed_read_options is not None + ): + directed_read_options = database._directed_read_options elif self.transaction_tag is not None: request_options.transaction_tag = self.transaction_tag @@ -433,6 +458,7 @@ def execute_sql( query_options=query_options, request_options=request_options, data_boost_enabled=data_boost_enabled, + directed_read_options=directed_read_options, ) restart = functools.partial( api.execute_streaming_sql, diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index f7c403cfc41..3ffd579f4ab 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -31,6 +31,7 @@ from google.cloud import spanner from google.cloud.spanner_admin_instance_v1.types import spanner_instance_admin from google.cloud.spanner_v1 import param_types +from google.cloud.spanner_v1 import DirectedReadOptions from google.type import expr_pb2 from google.iam.v1 import policy_pb2 from google.cloud.spanner_v1.data_types import JsonObject @@ -2723,6 +2724,78 @@ def drop_sequence(instance_id, database_id): # [END spanner_drop_sequence] + +def directed_read_options( + instance_id, + database_id, +): + """ + Shows how to run an execute sql request with directed read options. + Only one of exclude_replicas or include_replicas can be set + Each accepts a list of replicaSelections which contains location and type + * `location` - The location must be one of the regions within the + multi-region configuration of your database. + * `type_` - The type of the replica + Some examples of using replica_selectors are: + * `location:us-east1` --> The "us-east1" replica(s) of any available type + will be used to process the request. + * `type:READ_ONLY` --> The "READ_ONLY" type replica(s) in nearest + available location will be used to process the + request. + * `location:us-east1 type:READ_ONLY` --> The "READ_ONLY" type replica(s) + in location "us-east1" will be used to process + the request. + include_replicas also contains an option for auto_failover_disabled which when set + Spanner will not route requests to a replica outside the + include_replicas list when all the specified replicas are unavailable + or unhealthy. 
The default value is `false` + """ + # [START spanner_directed_read] + # instance_id = "your-spanner-instance" + # database_id = "your-spanner-db-id" + + directed_read_options_for_client = { + "exclude_replicas": { + "replica_selections": [ + { + "location": "us-east4", + }, + ], + }, + } + + # directed_read_options can be set at client level and will be used in all + # read-only transaction requests + spanner_client = spanner.Client( + directed_read_options=directed_read_options_for_client + ) + instance = spanner_client.instance(instance_id) + database = instance.database(database_id) + + directed_read_options_for_request = { + "include_replicas": { + "replica_selections": [ + { + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, + }, + ], + "auto_failover_disabled": True, + }, + } + + with database.snapshot() as snapshot: + # Read rows while passing directed_read_options directly to the query. + # These will override the options passed at Client level. + results = snapshot.execute_sql( + "SELECT SingerId, AlbumId, AlbumTitle FROM Albums", + directed_read_options=directed_read_options_for_request, + ) + + for row in results: + print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row)) + # [END spanner_directed_read] + + if __name__ == "__main__": # noqa: C901 parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter @@ -2862,6 +2935,7 @@ def drop_sequence(instance_id, database_id): "--database_role", default="new_parent" ) enable_fine_grained_access_parser.add_argument("--title", default="condition title") + subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__) args = parser.parse_args() @@ -2993,3 +3067,5 @@ def drop_sequence(instance_id, database_id): args.database_role, args.title, ) + elif args.command == "directed_read_options": + directed_read_options(args.instance_id, args.database_id) diff --git a/samples/samples/snippets_test.py b/samples/samples/snippets_test.py index 85999363bbd..a49a4ee4801 100644 --- a/samples/samples/snippets_test.py +++ b/samples/samples/snippets_test.py @@ -852,3 +852,10 @@ def test_drop_sequence(capsys, instance_id, bit_reverse_sequence_database): "Altered Customers table to drop DEFAULT from CustomerId column and dropped the Seq sequence on database" in out ) + + +@pytest.mark.dependency(depends=["insert_data"]) +def test_directed_read_options(capsys, instance_id, sample_database): + snippets.directed_read_options(instance_id, sample_database.database_id) + out, _ = capsys.readouterr() + assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out diff --git a/tests/system/test_database_api.py b/tests/system/test_database_api.py index 153567810a4..052e6281888 100644 --- a/tests/system/test_database_api.py +++ b/tests/system/test_database_api.py @@ -22,6 +22,7 @@ from google.cloud import spanner_v1 from google.cloud.spanner_v1.pool import FixedSizePool, PingingPool from google.cloud.spanner_admin_database_v1 import DatabaseDialect +from google.cloud.spanner_v1 import DirectedReadOptions from google.type import expr_pb2 from . import _helpers from . 
import _sample_data @@ -31,6 +32,17 @@ FKADC_CUSTOMERS_COLUMNS = ("CustomerId", "CustomerName") FKADC_SHOPPING_CARTS_COLUMNS = ("CartId", "CustomerId", "CustomerName") ALL_KEYSET = spanner_v1.KeySet(all_=True) +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, + }, + ], + "auto_failover_disabled": True, + }, +} @pytest.fixture(scope="module") @@ -740,3 +752,70 @@ def test_update_database_invalid(not_emulator, shared_database): # Empty `fields` is not supported. with pytest.raises(exceptions.InvalidArgument): shared_database.update([]) + + +def test_snapshot_read_w_directed_read_options( + shared_database, not_postgres, not_emulator +): + _helpers.retry_has_all_dll(shared_database.reload)() + table = "users_history" + columns = ["id", "commit_ts", "name", "email", "deleted"] + user_id = 1234 + name = "phred" + email = "phred@example.com" + row_data = [[user_id, spanner_v1.COMMIT_TIMESTAMP, name, email, False]] + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(table, sd.ALL) + batch.insert(table, columns, row_data) + + with shared_database.snapshot() as snapshot: + rows = list( + snapshot.read( + table, columns, sd.ALL, directed_read_options=DIRECTED_READ_OPTIONS + ) + ) + + assert len(rows) == 1 + + +def test_execute_sql_w_directed_read_options( + shared_database, not_postgres, not_emulator +): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(sd.TABLE, sd.ALL) + + def _unit_of_work(transaction, test): + transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) + + shared_database.run_in_transaction(_unit_of_work, test=sd) + + with shared_database.snapshot() as snapshot: + rows = list( + snapshot.execute_sql(sd.SQL, directed_read_options=DIRECTED_READ_OPTIONS) + ) + sd._check_rows_data(rows) + + +def test_readwrite_transaction_w_directed_read_options_w_error( + shared_database, not_emulator, not_postgres +): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + def _transaction_read(transaction): + list( + transaction.read( + sd.TABLE, + sd.COLUMNS, + sd.ALL, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + ) + + with pytest.raises(exceptions.InvalidArgument): + shared_database.run_in_transaction(_transaction_read) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index de028c32062..8996a06ce6f 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -63,7 +63,8 @@ def _make_connection(self, **kwargs): from google.cloud.spanner_v1.client import Client # We don't need a real Client object to test the constructor - instance = Instance(INSTANCE, client=Client) + client = Client() + instance = Instance(INSTANCE, client=client) database = instance.database(DATABASE) return Connection(instance, database, **kwargs) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 049ee1124fc..8fb5b13a9ab 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -15,6 +15,7 @@ import unittest import mock +from google.cloud.spanner_v1 import DirectedReadOptions def _make_credentials(): @@ -40,6 +41,17 @@ class TestClient(unittest.TestCase): LABELS = {"test": "true"} TIMEOUT_SECONDS = 80 LEADER_OPTIONS = ["leader1", "leader2"] + DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { 
+ "location": "us-west1", + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, + }, + ], + "auto_failover_disabled": True, + }, + } def _get_target_class(self): from google.cloud import spanner @@ -59,6 +71,7 @@ def _constructor_test_helper( query_options=None, expected_query_options=None, route_to_leader_enabled=True, + directed_read_options=None, ): import google.api_core.client_options from google.cloud.spanner_v1 import client as MUT @@ -84,6 +97,7 @@ def _constructor_test_helper( project=self.PROJECT, credentials=creds, query_options=query_options, + directed_read_options=directed_read_options, **kwargs ) @@ -112,6 +126,8 @@ def _constructor_test_helper( self.assertEqual(client.route_to_leader_enabled, route_to_leader_enabled) else: self.assertFalse(client.route_to_leader_enabled) + if directed_read_options is not None: + self.assertEqual(client.directed_read_options, directed_read_options) @mock.patch("google.cloud.spanner_v1.client._get_spanner_emulator_host") @mock.patch("warnings.warn") @@ -225,6 +241,15 @@ def test_constructor_custom_query_options_env_config(self, mock_ver, mock_stats) expected_query_options=expected_query_options, ) + def test_constructor_w_directed_read_options(self): + from google.cloud.spanner_v1 import client as MUT + + expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,) + creds = _make_credentials() + self._constructor_test_helper( + expected_scopes, creds, directed_read_options=self.DIRECTED_READ_OPTIONS + ) + def test_constructor_route_to_leader_disbled(self): from google.cloud.spanner_v1 import client as MUT diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index cac45a26acc..5f563773bc9 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -22,7 +22,7 @@ from google.api_core.retry import Retry from google.protobuf.field_mask_pb2 import FieldMask -from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1 import RequestOptions, DirectedReadOptions DML_WO_PARAM = """ DELETE FROM citizens @@ -35,6 +35,17 @@ PARAMS = {"age": 30} PARAM_TYPES = {"age": INT64} MODE = 2 # PROFILE +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, + }, + ], + "auto_failover_disabled": True, + }, +} def _make_credentials(): # pragma: NO COVER @@ -196,6 +207,16 @@ def test_ctor_w_encryption_config(self): self.assertIs(database._instance, instance) self.assertEqual(database._encryption_config, encryption_config) + def test_ctor_w_directed_read_options(self): + client = _Client(directed_read_options=DIRECTED_READ_OPTIONS) + instance = _Instance(self.INSTANCE_NAME, client=client) + database = self._make_one( + self.DATABASE_ID, instance, database_role=self.DATABASE_ROLE + ) + self.assertEqual(database.database_id, self.DATABASE_ID) + self.assertIs(database._instance, instance) + self.assertEqual(database._directed_read_options, DIRECTED_READ_OPTIONS) + def test_from_pb_bad_database_name(self): from google.cloud.spanner_admin_database_v1 import Database @@ -2193,6 +2214,7 @@ def test_generate_read_batches_w_max_partitions(self): "keyset": {"all": True}, "index": "", "data_boost_enabled": False, + "directed_read_options": None, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2235,6 +2257,7 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): "keyset": {"all": True}, "index": "", "data_boost_enabled": False, + 
"directed_read_options": None, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2276,6 +2299,7 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): "keyset": {"all": True}, "index": self.INDEX, "data_boost_enabled": False, + "directed_read_options": None, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2317,6 +2341,48 @@ def test_generate_read_batches_w_data_boost_enabled(self): "keyset": {"all": True}, "index": self.INDEX, "data_boost_enabled": True, + "directed_read_options": None, + } + self.assertEqual(len(batches), len(self.TOKENS)) + for batch, token in zip(batches, self.TOKENS): + self.assertEqual(batch["partition"], token) + self.assertEqual(batch["read"], expected_read) + + snapshot.partition_read.assert_called_once_with( + table=self.TABLE, + columns=self.COLUMNS, + keyset=keyset, + index=self.INDEX, + partition_size_bytes=None, + max_partitions=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ) + + def test_generate_read_batches_w_directed_read_options(self): + keyset = self._make_keyset() + database = self._make_database() + batch_txn = self._make_one(database) + snapshot = batch_txn._snapshot = self._make_snapshot() + snapshot.partition_read.return_value = self.TOKENS + + batches = list( + batch_txn.generate_read_batches( + self.TABLE, + self.COLUMNS, + keyset, + index=self.INDEX, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + ) + + expected_read = { + "table": self.TABLE, + "columns": self.COLUMNS, + "keyset": {"all": True}, + "index": self.INDEX, + "data_boost_enabled": False, + "directed_read_options": DIRECTED_READ_OPTIONS, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2414,6 +2480,7 @@ def test_generate_query_batches_w_max_partitions(self): "sql": sql, "data_boost_enabled": False, "query_options": client._query_options, + "directed_read_options": None, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2456,6 +2523,7 @@ def test_generate_query_batches_w_params_w_partition_size_bytes(self): "params": params, "param_types": param_types, "query_options": client._query_options, + "directed_read_options": None, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2503,6 +2571,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): "params": params, "param_types": param_types, "query_options": client._query_options, + "directed_read_options": None, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2534,6 +2603,43 @@ def test_generate_query_batches_w_data_boost_enabled(self): "sql": sql, "data_boost_enabled": True, "query_options": client._query_options, + "directed_read_options": None, + } + self.assertEqual(len(batches), len(self.TOKENS)) + for batch, token in zip(batches, self.TOKENS): + self.assertEqual(batch["partition"], token) + self.assertEqual(batch["query"], expected_query) + + snapshot.partition_query.assert_called_once_with( + sql=sql, + params=None, + param_types=None, + partition_size_bytes=None, + max_partitions=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ) + + def test_generate_query_batches_w_directed_read_options(self): + sql = "SELECT COUNT(*) FROM table_name" + client = _Client(self.PROJECT_ID) + instance = _Instance(self.INSTANCE_NAME, client=client) + 
database = _Database(self.DATABASE_NAME, instance=instance) + batch_txn = self._make_one(database) + snapshot = batch_txn._snapshot = self._make_snapshot() + snapshot.partition_query.return_value = self.TOKENS + + batches = list( + batch_txn.generate_query_batches( + sql, directed_read_options=DIRECTED_READ_OPTIONS + ) + ) + + expected_query = { + "sql": sql, + "data_boost_enabled": False, + "query_options": client._query_options, + "directed_read_options": DIRECTED_READ_OPTIONS, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2608,6 +2714,30 @@ def test_process_query_batch_w_retry_timeout(self): timeout=2.0, ) + def test_process_query_batch_w_directed_read_options(self): + sql = "SELECT first_name, last_name, email FROM citizens" + token = b"TOKEN" + batch = { + "partition": token, + "query": {"sql": sql, "directed_read_options": DIRECTED_READ_OPTIONS}, + } + database = self._make_database() + batch_txn = self._make_one(database) + snapshot = batch_txn._snapshot = self._make_snapshot() + expected = snapshot.execute_sql.return_value = object() + + found = batch_txn.process_query_batch(batch) + + self.assertIs(found, expected) + + snapshot.execute_sql.assert_called_once_with( + sql=sql, + partition=token, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + def test_close_wo_session(self): database = self._make_database() batch_txn = self._make_one(database) @@ -2873,7 +3003,12 @@ def _make_instance_api(): class _Client(object): - def __init__(self, project=TestDatabase.PROJECT_ID, route_to_leader_enabled=True): + def __init__( + self, + project=TestDatabase.PROJECT_ID, + route_to_leader_enabled=True, + directed_read_options=None, + ): from google.cloud.spanner_v1 import ExecuteSqlRequest self.project = project @@ -2884,6 +3019,7 @@ def __init__(self, project=TestDatabase.PROJECT_ID, route_to_leader_enabled=True self._client_options = mock.Mock() self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.route_to_leader_enabled = route_to_leader_enabled + self.directed_read_options = directed_read_options class _Instance(object): @@ -2910,6 +3046,7 @@ def __init__(self, name, instance=None): from logging import Logger self.logger = mock.create_autospec(Logger, instance=True) + self._directed_read_options = None class _Pool(object): diff --git a/tests/unit/test_instance.py b/tests/unit/test_instance.py index 20064e7e884..2313ee31310 100644 --- a/tests/unit/test_instance.py +++ b/tests/unit/test_instance.py @@ -1015,6 +1015,7 @@ def __init__(self, project, timeout_seconds=None): self.project_name = "projects/" + self.project self.timeout_seconds = timeout_seconds self.route_to_leader_enabled = True + self.directed_read_options = None def copy(self): from copy import deepcopy diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index a2799262dc2..aec20c2f54d 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -16,7 +16,7 @@ from google.api_core import gapic_v1 import mock -from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1 import RequestOptions, DirectedReadOptions from tests._helpers import ( OpenTelemetryBase, StatusCode, @@ -46,6 +46,26 @@ "db.instance": "testing", "net.host.name": "spanner.googleapis.com", } +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, 
+ }, + ], + "auto_failover_disabled": True, + }, +} +DIRECTED_READ_OPTIONS_FOR_CLIENT = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-east1", + }, + ], + }, +} def _makeTimestamp(): @@ -607,6 +627,8 @@ def _read_helper( timeout=gapic_v1.method.DEFAULT, retry=gapic_v1.method.DEFAULT, request_options=None, + directed_read_options=None, + directed_read_options_at_client_level=None, ): from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import ( @@ -646,7 +668,9 @@ def _read_helper( keyset = KeySet(keys=KEYS) INDEX = "email-address-index" LIMIT = 20 - database = _Database() + database = _Database( + directed_read_options=directed_read_options_at_client_level + ) api = database.spanner_api = self._make_spanner_api() api.streaming_read.return_value = _MockIterator(*result_sets) session = _Session(database) @@ -671,6 +695,7 @@ def _read_helper( retry=retry, timeout=timeout, request_options=request_options, + directed_read_options=directed_read_options, ) else: result_set = derived.read( @@ -682,6 +707,7 @@ def _read_helper( retry=retry, timeout=timeout, request_options=request_options, + directed_read_options=directed_read_options, ) self.assertEqual(derived._read_request_count, count + 1) @@ -716,6 +742,12 @@ def _read_helper( expected_request_options = request_options expected_request_options.transaction_tag = None + expected_directed_read_options = ( + directed_read_options + if directed_read_options is not None + else directed_read_options_at_client_level + ) + expected_request = ReadRequest( session=self.SESSION_NAME, table=TABLE_NAME, @@ -726,6 +758,7 @@ def _read_helper( limit=expected_limit, partition_token=partition, request_options=expected_request_options, + directed_read_options=expected_directed_read_options, ) api.streaming_read.assert_called_once_with( request=expected_request, @@ -801,6 +834,22 @@ def test_read_w_timeout_and_retry_params(self): multi_use=True, first=False, retry=Retry(deadline=60), timeout=2.0 ) + def test_read_w_directed_read_options(self): + self._read_helper(multi_use=False, directed_read_options=DIRECTED_READ_OPTIONS) + + def test_read_w_directed_read_options_at_client_level(self): + self._read_helper( + multi_use=False, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + + def test_read_w_directed_read_options_override(self): + self._read_helper( + multi_use=False, + directed_read_options=DIRECTED_READ_OPTIONS, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + def test_execute_sql_other_error(self): database = _Database() database.spanner_api = self._make_spanner_api() @@ -840,6 +889,8 @@ def _execute_sql_helper( request_options=None, timeout=gapic_v1.method.DEFAULT, retry=gapic_v1.method.DEFAULT, + directed_read_options=None, + directed_read_options_at_client_level=None, ): from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import ( @@ -880,7 +931,9 @@ def _execute_sql_helper( for i in range(len(result_sets)): result_sets[i].values.extend(VALUE_PBS[i]) iterator = _MockIterator(*result_sets) - database = _Database() + database = _Database( + directed_read_options=directed_read_options_at_client_level + ) api = database.spanner_api = self._make_spanner_api() api.execute_streaming_sql.return_value = iterator session = _Session(database) @@ -906,6 +959,7 @@ def _execute_sql_helper( partition=partition, retry=retry, timeout=timeout, + directed_read_options=directed_read_options, ) self.assertEqual(derived._read_request_count, count 
+ 1) @@ -946,6 +1000,12 @@ def _execute_sql_helper( expected_request_options = request_options expected_request_options.transaction_tag = None + expected_directed_read_options = ( + directed_read_options + if directed_read_options is not None + else directed_read_options_at_client_level + ) + expected_request = ExecuteSqlRequest( session=self.SESSION_NAME, sql=SQL_QUERY_WITH_PARAM, @@ -957,6 +1017,7 @@ def _execute_sql_helper( request_options=expected_request_options, partition_token=partition, seqno=sql_count, + directed_read_options=expected_directed_read_options, ) api.execute_streaming_sql.assert_called_once_with( request=expected_request, @@ -1043,6 +1104,24 @@ def test_execute_sql_w_incorrect_tag_dictionary_error(self): with self.assertRaises(ValueError): self._execute_sql_helper(multi_use=False, request_options=request_options) + def test_execute_sql_w_directed_read_options(self): + self._execute_sql_helper( + multi_use=False, directed_read_options=DIRECTED_READ_OPTIONS + ) + + def test_execute_sql_w_directed_read_options_at_client_level(self): + self._execute_sql_helper( + multi_use=False, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + + def test_execute_sql_w_directed_read_options_override(self): + self._execute_sql_helper( + multi_use=False, + directed_read_options=DIRECTED_READ_OPTIONS, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + def _partition_read_helper( self, multi_use, @@ -1748,10 +1827,11 @@ def __init__(self): class _Database(object): - def __init__(self): + def __init__(self, directed_read_options=None): self.name = "testing" self._instance = _Instance() self._route_to_leader_enabled = True + self._directed_read_options = directed_read_options class _Session(object): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index 314b964fa6b..3663d8bdc9b 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -28,6 +28,7 @@ StructType, TransactionOptions, TransactionSelector, + DirectedReadOptions, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, param_types, @@ -73,6 +74,17 @@ MODE = 2 RETRY = gapic_v1.method.DEFAULT TIMEOUT = gapic_v1.method.DEFAULT +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, + }, + ], + "auto_failover_disabled": True, + }, +} insert_dml = "INSERT INTO table(pkey, desc) VALUES (%pkey, %desc)" insert_params = {"pkey": 12345, "desc": "DESCRIPTION"} insert_param_types = {"pkey": param_types.INT64, "desc": param_types.STRING} @@ -191,6 +203,7 @@ def _execute_sql_helper( partition=None, sql_count=0, query_options=None, + directed_read_options=None, ): VALUES = [["bharney", "rhubbyl", 31], ["phred", "phlyntstone", 32]] VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] @@ -229,6 +242,7 @@ def _execute_sql_helper( partition=partition, retry=RETRY, timeout=TIMEOUT, + directed_read_options=directed_read_options, ) self.assertEqual(transaction._read_request_count, count + 1) @@ -246,6 +260,7 @@ def _execute_sql_expected_request( begin=True, sql_count=0, transaction_tag=False, + directed_read_options=None, ): if begin is True: expected_transaction = TransactionSelector( @@ -282,6 +297,7 @@ def _execute_sql_expected_request( request_options=expected_request_options, partition_token=partition, seqno=sql_count, + directed_read_options=directed_read_options, ) return expected_request @@ -292,6 +308,7 @@ def _read_helper( 
api, count=0, partition=None, + directed_read_options=None, ): VALUES = [["bharney", 31], ["phred", 32]] VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] @@ -330,6 +347,7 @@ def _read_helper( retry=RETRY, timeout=TIMEOUT, request_options=RequestOptions(), + directed_read_options=directed_read_options, ) else: result_set = transaction.read( @@ -341,6 +359,7 @@ def _read_helper( retry=RETRY, timeout=TIMEOUT, request_options=RequestOptions(), + directed_read_options=directed_read_options, ) self.assertEqual(transaction._read_request_count, count + 1) @@ -352,7 +371,12 @@ def _read_helper( self.assertEqual(result_set.stats, stats_pb) def _read_helper_expected_request( - self, partition=None, begin=True, count=0, transaction_tag=False + self, + partition=None, + begin=True, + count=0, + transaction_tag=False, + directed_read_options=None, ): if begin is True: expected_transaction = TransactionSelector( @@ -384,6 +408,7 @@ def _read_helper_expected_request( limit=expected_limit, partition_token=partition, request_options=expected_request_options, + directed_read_options=directed_read_options, ) return expected_request @@ -621,6 +646,52 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): ], ) + def test_transaction_execute_sql_w_directed_read_options(self): + database = _Database() + session = _Session(database) + api = database.spanner_api = self._make_spanner_api() + transaction = self._make_one(session) + + self._execute_sql_helper( + transaction=transaction, + api=api, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + api.execute_streaming_sql.assert_called_once_with( + request=self._execute_sql_expected_request( + database=database, directed_read_options=DIRECTED_READ_OPTIONS + ), + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ) + + def test_transaction_streaming_read_w_directed_read_options(self): + database = _Database() + session = _Session(database) + api = database.spanner_api = self._make_spanner_api() + transaction = self._make_one(session) + + self._read_helper( + transaction=transaction, + api=api, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + api.streaming_read.assert_called_once_with( + request=self._read_helper_expected_request( + directed_read_options=DIRECTED_READ_OPTIONS + ), + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + retry=RETRY, + timeout=TIMEOUT, + ) + def test_transaction_should_use_transaction_id_returned_by_first_read(self): database = _Database() session = _Session(database) @@ -941,6 +1012,7 @@ def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self.directed_read_options = None class _Instance(object): @@ -953,6 +1025,7 @@ def __init__(self): self.name = "testing" self._instance = _Instance() self._route_to_leader_enabled = True + self._directed_read_options = None class _Session(object): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index ffcffa115e4..2d2f208424f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -894,6 +894,7 @@ def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self.directed_read_options = None class _Instance(object): 
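Illustrative note (not part of the patch): the unit tests above verify that the option is passed through unchanged even for read-write transactions (client-side validation was removed in this change), while the system test earlier in the patch checks that the backend rejects it. A sketch of that failure mode follows, with placeholder identifiers.

    # Sketch only: passing directed_read_options inside a read-write
    # transaction is rejected by the backend with INVALID_ARGUMENT.
    from google.api_core import exceptions
    from google.cloud import spanner
    from google.cloud.spanner_v1 import DirectedReadOptions, KeySet

    directed_read_options = {
        "include_replicas": {
            "replica_selections": [
                {"type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY},
            ],
        },
    }

    client = spanner.Client()
    database = client.instance("my-instance").database("my-database")

    def _read_in_rw_txn(transaction):
        # The client library forwards the option as-is; Spanner rejects it
        # because directed reads apply only to read-only requests.
        list(
            transaction.read(
                "Singers",
                ("SingerId", "FirstName"),
                KeySet(all_=True),
                directed_read_options=directed_read_options,
            )
        )

    try:
        database.run_in_transaction(_read_in_rw_txn)
    except exceptions.InvalidArgument as exc:
        print("Directed reads are only supported for read-only requests:", exc)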
@@ -906,6 +907,7 @@ def __init__(self): self.name = "testing" self._instance = _Instance() self._route_to_leader_enabled = True + self._directed_read_options = None class _Session(object):
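Illustrative note (not part of the patch): a sketch of the client-level API introduced in client.py, showing both the constructor argument and the new directed_read_options setter. Region names and identifiers are placeholders. A Database reads the client-level default when it is constructed, so a later assignment only affects Database objects created afterwards.

    # Sketch only: client-level defaults for directed reads.
    from google.cloud import spanner
    from google.cloud.spanner_v1 import DirectedReadOptions

    # Exclude a specific region for all read-only requests by default.
    client = spanner.Client(
        directed_read_options={
            "exclude_replicas": {
                "replica_selections": [{"location": "us-east4"}],
            },
        }
    )

    # The setter added in this patch replaces the client-level default;
    # databases created after this point inherit the new value.
    client.directed_read_options = {
        "include_replicas": {
            "replica_selections": [
                {
                    "location": "us-west1",
                    "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY,
                },
            ],
            "auto_failover_disabled": True,
        },
    }

    database = client.instance("my-instance").database("my-database")
    with database.snapshot() as snapshot:
        # Routed using the client-level include_replicas default, since no
        # request-level directed_read_options is supplied here.
        for row in snapshot.execute_sql("SELECT 1"):
            print(row)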