diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
index 1a7badd..c233837 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
@@ -65,7 +65,6 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
- case ABORTED: {
logger.warn(createLogMessage.apply("Recoverable error. Retrying.., "), error);
try {
Thread.sleep(clientProvider.writeSettings().getRetryPause().toMillis());
@@ -73,6 +72,14 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro
throw new RuntimeException(e);
}
+ if (retryCount > 0) {
+ writeWithRetry(errorTraceId, errorRows, retryCount - 1);
+ } else {
+ throw error;
+ }
+ break;
+ case UNAVAILABLE:
+ case ABORTED: {
if (retryCount > 0) {
writeWithRetry(errorTraceId, errorRows, retryCount - 1);
} else {
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java
index 781bd2f..929d384 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java
@@ -24,9 +24,15 @@ public BigQueryJsonBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValue
protected ApiFuture append(String traceId, Rows rows) {
var rowArray = new JSONArray();
rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
+ var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+
+ if (writer.isClosed() || writer.isUserClosed()) {
+ recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
+ writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+ }
try {
- return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset());
+ return writer.append(rowArray, rows.getOffset());
} catch (Throwable t) {
return ApiFutures.immediateFailedFuture(t);
}
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java
index a45ec4b..db41a64 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java
@@ -34,6 +34,12 @@ protected ApiFuture append(String traceId, Rows rows) {
Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+ if (writer.isClosed() || writer.isUserClosed()) {
+ logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
+ recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
+ writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+ }
+
logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
try {
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java
index fb3b408..e011d51 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java
@@ -2,6 +2,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
+import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors;
@@ -28,6 +29,13 @@ protected ApiFuture append(String traceId, Rows rows) {
var rowArray = new JSONArray();
rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+
+ if (writer.isClosed() || writer.isUserClosed()) {
+ logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
+ recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
+ writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+ }
+
logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
try {
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
index aa156ed..f124e7f 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
@@ -41,6 +41,12 @@ protected ApiFuture append(String traceId, Rows rows) {
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+ if (writer.isClosed() || writer.isUserClosed()) {
+ logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
+ recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
+ writer = streamWriter(traceId, rows.getStream(), rows.getTable());
+ }
+
logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
try {
return writer.append(prows);
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index 5f53a3f..45899b3 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -130,13 +130,13 @@ public void onFailure(Throwable t) {
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
- case ABORTED:
case CANCELLED:
case FAILED_PRECONDITION:
case DEADLINE_EXCEEDED:
doPauseBeforeRetry();
retryWrite(t, retryCount - 1);
break;
+ case ABORTED:
case UNAVAILABLE: {
this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
retryWrite(t, retryCount - 1);