Skip to content

Commit

Permalink
Merge pull request #18 from vinted/feat/handle-closed-streams
Browse files Browse the repository at this point in the history
feat: handle when writer is closed
  • Loading branch information
gintarasm authored Feb 8, 2024
2 parents f562e73 + 3443e8d commit 7a8d911
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
case ABORTED: {
logger.warn(createLogMessage.apply("Recoverable error. Retrying.., "), error);
try {
Thread.sleep(clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (retryCount > 0) {
writeWithRetry(errorTraceId, errorRows, retryCount - 1);
} else {
throw error;
}
break;
case UNAVAILABLE:
case ABORTED: {
if (retryCount > 0) {
writeWithRetry(errorTraceId, errorRows, retryCount - 1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ public BigQueryJsonBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValue
protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
var rowArray = new JSONArray();
rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
writer = streamWriter(traceId, rows.getStream(), rows.getTable());
}

try {
return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset());
return writer.append(rowArray, rows.getOffset());
} catch (Throwable t) {
return ApiFutures.immediateFailedFuture(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
writer = streamWriter(traceId, rows.getStream(), rows.getTable());
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors;
Expand All @@ -28,6 +29,13 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
var rowArray = new JSONArray();
rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
writer = streamWriter(traceId, rows.getStream(), rows.getTable());
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {

var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
writer = streamWriter(traceId, rows.getStream(), rows.getTable());
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
try {
return writer.append(prows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ public void onFailure(Throwable t) {
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
case ABORTED:
case CANCELLED:
case FAILED_PRECONDITION:
case DEADLINE_EXCEEDED:
doPauseBeforeRetry();
retryWrite(t, retryCount - 1);
break;
case ABORTED:
case UNAVAILABLE: {
this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
retryWrite(t, retryCount - 1);
Expand Down

0 comments on commit 7a8d911

Please sign in to comment.