Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(spanner): support transaction tags in partition DML #3473

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ If you are using Maven without the BOM, add this to your dependencies:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.81.0</version>
<version>6.81.1</version>
</dependency>

```
Expand Down Expand Up @@ -516,6 +516,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-spanner/tree/
| Create Instance Partition Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstancePartitionSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstancePartitionSample.java) |
| Create Instance With Autoscaling Config Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAutoscalingConfigExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAutoscalingConfigExample.java) |
| Create Instance With Processing Units Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithProcessingUnitsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithProcessingUnitsExample.java) |
| Create Instance Without Default Backup Schedules Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithoutDefaultBackupSchedulesExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithoutDefaultBackupSchedulesExample.java) |
| Create Sequence Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateSequenceSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateSequenceSample.java) |
| Create Table With Foreign Key Delete Cascade Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateTableWithForeignKeyDeleteCascadeSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateTableWithForeignKeyDeleteCascadeSample.java) |
| Custom Timeout And Retry Settings Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java) |
Expand Down Expand Up @@ -571,6 +572,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-spanner/tree/
| Update Database Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseSample.java) |
| Update Database With Default Leader Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseWithDefaultLeaderSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateDatabaseWithDefaultLeaderSample.java) |
| Update Instance Config Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateInstanceConfigSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateInstanceConfigSample.java) |
| Update Instance Default Backup Schedule Type Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateInstanceDefaultBackupScheduleTypeExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateInstanceDefaultBackupScheduleTypeExample.java) |
| Update Instance Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateInstanceExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateInstanceExample.java) |
| Update Json Data Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateJsonDataSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateJsonDataSample.java) |
| Update Jsonb Data Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/UpdateJsonbDataSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/UpdateJsonbDataSample.java) |
Expand Down
6 changes: 5 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -790,5 +790,9 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement, java.lang.String, com.google.cloud.spanner.Options$UpdateOption[])</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import javax.annotation.Nullable;

/**
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
Expand Down Expand Up @@ -56,4 +57,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
rahul2393 marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import javax.annotation.Nullable;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -601,4 +602,21 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt, UpdateOption... options);

/**
* Executes a Partitioned DML statement with the specified transaction tag.
*
* <p>This method has the same behavior as {@link #executePartitionedUpdate(Statement,
* UpdateOption...)} but allows specifying a transaction tag that will be applied to all
* partitioned operations.
*
* @param stmt The Partitioned DML statement to execute
* @param transactionTag The transaction tag to apply to all partitioned operations. The tag must
* be a printable string (ASCII 32-126) with maximum length of 50 characters.
* @param options The options to use for the update operation
* @return The total number of rows modified by the statement
* @throws SpannerException if the operation failed
*/
long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,21 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
return executePartitionedUpdateWithOptions(stmt, null, options);
}

@Override
public long executePartitionedUpdate(
final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) {
return executePartitionedUpdateWithOptions(stmt, transactionTag, options);
}

private long executePartitionedUpdateWithOptions(
final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
return runWithSessionRetry(
session -> session.executePartitionedUpdate(stmt, transactionTag, options));
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

Expand All @@ -54,13 +55,16 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
private final SessionImpl session;
private final SpannerRpc rpc;
private final Ticker ticker;
private final @Nullable String transactionTag;
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
private volatile boolean isValid = true;

PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
PartitionedDmlTransaction(
SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) {
this.session = session;
this.rpc = rpc;
this.ticker = ticker;
this.transactionTag = transactionTag;
this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
}

Expand Down Expand Up @@ -194,22 +198,29 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
if (options.hasTag()) {
requestOptionsBuilder.setRequestTag(options.tag());
}
if (transactionTag != null) {
requestOptionsBuilder.setTransactionTag(transactionTag);
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
return builder.build();
}

private ByteString initTransaction(final Options options) {
final BeginTransactionRequest request =
BeginTransactionRequest.Builder builder =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));

if (transactionTag != null) {
builder.setRequestOptions(
RequestOptions.newBuilder().setTransactionTag(transactionTag).build());
}
Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,18 @@ public DatabaseId getDatabaseId() {
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker());
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout(), options);
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(
this, spanner.getRpc(), Ticker.systemTicker(), transactionTag);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout(), options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,11 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt
default long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
return get().executePartitionedUpdate(stmt, options);
}

default long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
return get().executePartitionedUpdate(stmt, transactionTag, options);
}
}

class PooledSessionFutureWrapper implements SessionFutureWrapper<PooledSessionFuture> {
Expand Down Expand Up @@ -1494,6 +1499,16 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
}
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
try {
return get(true).executePartitionedUpdate(stmt, transactionTag, options);
} finally {
close();
}
}

@Override
public String getName() {
return get().getName();
Expand Down Expand Up @@ -1709,6 +1724,18 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
}
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options)
throws SpannerException {
try {
markUsed();
return delegate.executePartitionedUpdate(stmt, transactionTag, options);
} catch (SpannerException e) {
throw lastException = e;
}
}

@Override
public ReadContext singleUse() {
return delegate.singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,30 @@ public void testPartitionedDMLWithTag() {
assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
}

@Test
public void testPartitionedDMLWithTransactionTag() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
client.executePartitionedUpdate(
UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml"));

List<BeginTransactionRequest> beginTransactions =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
assertThat(beginTransactions).hasSize(1);
BeginTransactionRequest beginTransaction = beginTransactions.get(0);
assertNotNull(beginTransaction.getOptions());
assertTrue(beginTransaction.getOptions().hasPartitionedDml());
assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams());

List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
assertThat(requests).hasSize(1);
ExecuteSqlRequest request = requests.get(0);
assertNotNull(request.getRequestOptions());
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("testTransactionTag");
assertThat(request.getRequestOptions().getRequestTag())
.isEqualTo("app=spanner,env=test,action=dml");
}

@Test
public void testCommitWithTag() {
DatabaseClient client =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setup() {
when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)))
.thenReturn(Transaction.newBuilder().setId(txId).build());

tx = new PartitionedDmlTransaction(session, rpc, ticker);
tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
}

@Test
Expand Down Expand Up @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
Expand Down Expand Up @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
Expand Down Expand Up @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
SpannerException e =
assertThrows(
SpannerException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1729,10 +1729,13 @@ public void testSessionNotFoundPartitionedUpdate() {
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1");
final SessionImpl closedSession = mockSession();
when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound);
when(closedSession.executePartitionedUpdate(any(Statement.class))).thenThrow(sessionNotFound);
when(closedSession.executePartitionedUpdate(any(Statement.class), any(), any()))
.thenThrow(sessionNotFound);

final SessionImpl openSession = mockSession();
when(openSession.executePartitionedUpdate(statement)).thenReturn(1L);
when(openSession.executePartitionedUpdate(any(Statement.class))).thenReturn(1L);
when(openSession.executePartitionedUpdate(any(Statement.class), any(), any())).thenReturn(1L);
doAnswer(
invocation -> {
executor.submit(
Expand Down
Loading