Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add BatchWrite API #1011

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions google/cloud/spanner_v1/gapic_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
"batch_create_sessions"
]
},
"BatchWrite": {
"methods": [
"batch_write"
]
},
"BeginTransaction": {
"methods": [
"begin_transaction"
Expand Down Expand Up @@ -95,6 +100,11 @@
"batch_create_sessions"
]
},
"BatchWrite": {
"methods": [
"batch_write"
]
},
"BeginTransaction": {
"methods": [
"begin_transaction"
Expand Down Expand Up @@ -175,6 +185,11 @@
"batch_create_sessions"
]
},
"BatchWrite": {
"methods": [
"batch_write"
]
},
"BeginTransaction": {
"methods": [
"begin_transaction"
Expand Down
137 changes: 137 additions & 0 deletions google/cloud/spanner_v1/services/spanner/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,143 @@ async def sample_partition_read():
# Done; return the response.
return response

def batch_write(
self,
request: Optional[Union[spanner.BatchWriteRequest, dict]] = None,
*,
session: Optional[str] = None,
mutation_groups: Optional[
MutableSequence[spanner.BatchWriteRequest.MutationGroup]
] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> Awaitable[AsyncIterable[spanner.BatchWriteResponse]]:
r"""Batches the supplied mutation groups in a collection
of efficient transactions. All mutations in a group are
committed atomically. However, mutations across groups
can be committed non-atomically in an unspecified order
and thus, they must be independent of each other.
Partial failure is possible, i.e., some groups may have
been committed successfully, while some may have failed.
The results of individual batches are streamed into the
response as the batches are applied.

BatchWrite requests are not replay protected, meaning
that each mutation group may be applied more than once.
Replays of non-idempotent mutations may have undesirable
effects. For example, replays of an insert mutation may
produce an already exists error or if you use generated
or commit timestamp-based keys, it may result in
additional rows being added to the mutation's table. We
recommend structuring your mutation groups to be
idempotent to avoid this issue.

.. code-block:: python

# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in:
# https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import spanner_v1

async def sample_batch_write():
# Create a client
client = spanner_v1.SpannerAsyncClient()

# Initialize request argument(s)
mutation_groups = spanner_v1.MutationGroup()
mutation_groups.mutations.insert.table = "table_value"

request = spanner_v1.BatchWriteRequest(
session="session_value",
mutation_groups=mutation_groups,
)

# Make the request
stream = await client.batch_write(request=request)

# Handle the response
async for response in stream:
print(response)

Args:
request (Optional[Union[google.cloud.spanner_v1.types.BatchWriteRequest, dict]]):
The request object. The request for
[BatchWrite][google.spanner.v1.Spanner.BatchWrite].
session (:class:`str`):
Required. The session in which the
batch request is to be run.

This corresponds to the ``session`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
mutation_groups (:class:`MutableSequence[google.cloud.spanner_v1.types.BatchWriteRequest.MutationGroup]`):
Required. The groups of mutations to
be applied.

This corresponds to the ``mutation_groups`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.

Returns:
AsyncIterable[google.cloud.spanner_v1.types.BatchWriteResponse]:
The result of applying a batch of
mutations.

"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([session, mutation_groups])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

request = spanner.BatchWriteRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
if session is not None:
request.session = session
if mutation_groups:
request.mutation_groups.extend(mutation_groups)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.batch_write,
default_timeout=3600.0,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((("session", request.session),)),
)

# Send the request.
response = rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

async def __aenter__(self) -> "SpannerAsyncClient":
return self

Expand Down
137 changes: 137 additions & 0 deletions google/cloud/spanner_v1/services/spanner/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2119,6 +2119,143 @@ def sample_partition_read():
# Done; return the response.
return response

def batch_write(
self,
request: Optional[Union[spanner.BatchWriteRequest, dict]] = None,
*,
session: Optional[str] = None,
mutation_groups: Optional[
MutableSequence[spanner.BatchWriteRequest.MutationGroup]
] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> Iterable[spanner.BatchWriteResponse]:
r"""Batches the supplied mutation groups in a collection
of efficient transactions. All mutations in a group are
committed atomically. However, mutations across groups
can be committed non-atomically in an unspecified order
and thus, they must be independent of each other.
Partial failure is possible, i.e., some groups may have
been committed successfully, while some may have failed.
The results of individual batches are streamed into the
response as the batches are applied.

BatchWrite requests are not replay protected, meaning
that each mutation group may be applied more than once.
Replays of non-idempotent mutations may have undesirable
effects. For example, replays of an insert mutation may
produce an already exists error or if you use generated
or commit timestamp-based keys, it may result in
additional rows being added to the mutation's table. We
recommend structuring your mutation groups to be
idempotent to avoid this issue.

.. code-block:: python

# This snippet has been automatically generated and should be regarded as a
# code template only.
# It will require modifications to work:
# - It may require correct/in-range values for request initialization.
# - It may require specifying regional endpoints when creating the service
# client as shown in:
# https://googleapis.dev/python/google-api-core/latest/client_options.html
from google.cloud import spanner_v1

def sample_batch_write():
# Create a client
client = spanner_v1.SpannerClient()

# Initialize request argument(s)
mutation_groups = spanner_v1.MutationGroup()
mutation_groups.mutations.insert.table = "table_value"

request = spanner_v1.BatchWriteRequest(
session="session_value",
mutation_groups=mutation_groups,
)

# Make the request
stream = client.batch_write(request=request)

# Handle the response
for response in stream:
print(response)

Args:
request (Union[google.cloud.spanner_v1.types.BatchWriteRequest, dict]):
The request object. The request for
[BatchWrite][google.spanner.v1.Spanner.BatchWrite].
session (str):
Required. The session in which the
batch request is to be run.

This corresponds to the ``session`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
mutation_groups (MutableSequence[google.cloud.spanner_v1.types.BatchWriteRequest.MutationGroup]):
Required. The groups of mutations to
be applied.

This corresponds to the ``mutation_groups`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.

Returns:
Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]:
The result of applying a batch of
mutations.

"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([session, mutation_groups])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a spanner.BatchWriteRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
if not isinstance(request, spanner.BatchWriteRequest):
request = spanner.BatchWriteRequest(request)
# If we have keyword arguments corresponding to fields on the
# request, apply these.
if session is not None:
request.session = session
if mutation_groups is not None:
request.mutation_groups = mutation_groups

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.batch_write]

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata((("session", request.session),)),
)

# Send the request.
response = rpc(
request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# Done; return the response.
return response

def __enter__(self) -> "SpannerClient":
return self

Expand Down
14 changes: 14 additions & 0 deletions google/cloud/spanner_v1/services/spanner/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=30.0,
client_info=client_info,
),
self.batch_write: gapic_v1.method.wrap_method(
self.batch_write,
default_timeout=3600.0,
client_info=client_info,
),
}

def close(self):
Expand Down Expand Up @@ -473,6 +478,15 @@ def partition_read(
]:
raise NotImplementedError()

@property
def batch_write(
self,
) -> Callable[
[spanner.BatchWriteRequest],
Union[spanner.BatchWriteResponse, Awaitable[spanner.BatchWriteResponse]],
]:
raise NotImplementedError()

@property
def kind(self) -> str:
raise NotImplementedError()
Expand Down
Loading