Skip to content

Commit

Permalink
Merge pull request #21 from vinted/fix/timeout-configuration
Browse files Browse the repository at this point in the history
fix: add callback timeout config
  • Loading branch information
gintarasm authored Mar 6, 2024
2 parents 4b02e8e + 8d89378 commit 4df9abb
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
Expand Down Expand Up @@ -53,7 +51,7 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();

JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setMaxInflightBytes(this.writerSettings.getMaxInflightBytes())
.setMaxRetryDuration(this.writerSettings.getMaxRetryDuration())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setChannelProvider(BigQueryWriteSettings.defaultTransportChannelProvider())
.setExecutorProvider(executorProvider)
.setLocation(table.getProject())
.setWriterSchema(protoSchema);

StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public interface BigQueryStreamWriter<T> extends AutoCloseable {
String getStreamName();

String getWriterId();

long getInflightWaitSeconds();
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
}
}

@Override
public long getInflightWaitSeconds() {
return writer.getInflightWaitSeconds();
}

@Override
public String getStreamName() {
return writer.getStreamName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
return writer.append(prows, offset);
}

@Override
public long getInflightWaitSeconds() {
return writer.getInflightWaitSeconds();
}

@Override
public String getStreamName() {
return writer.getStreamName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class BigQueryStreamMetrics {
private double batchSizeInMb = 0.0;
private long splitBatchCount = 0;

private int timeoutCount = 0;

public BigQueryStreamMetrics(String streamName) {
this.streamName = streamName;
}
Expand Down Expand Up @@ -42,4 +44,12 @@ public double getBatchSizeInMb() {
public long getSplitBatchCount() {
return splitBatchCount;
}

public int getTimeoutCount() {
return timeoutCount;
}

public void incrementTimeoutCount() {
this.timeoutCount++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class WriterSettings implements Serializable {
private Long maxInflightRequests;
private Long maxInflightBytes;
private Duration maxRetryDuration;

private Duration maxRequestWaitCallbackTime;
private Boolean enableConnectionPool;

public int getStreamsPerTable() {
Expand Down Expand Up @@ -69,6 +71,14 @@ public static WriterSettingsBuilder newBuilder() {
return new WriterSettingsBuilder();
}

public Duration getMaxRequestWaitCallbackTime() {
return maxRequestWaitCallbackTime;
}

public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
}

public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
Expand All @@ -78,6 +88,7 @@ public static final class WriterSettingsBuilder implements Serializable {
private Long maxInflightRequests = 1000L;
private Long maxInflightBytes = 100L * 1024L * 1024L; // 100Mb.
private Duration maxRetryDuration = Duration.ofMinutes(5);
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;

private WriterSettingsBuilder() {
Expand Down Expand Up @@ -123,6 +134,11 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
return this;
}

public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRetryDuration;
return this;
}

public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
Expand All @@ -139,6 +155,7 @@ public WriterSettings build() {
writerSettings.maxInflightRequests = this.maxInflightRequests;
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
return writerSettings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,22 @@ public BigQuerySinkWriter(

}

private void registerInflightMetric(String streamName, BigQueryStreamWriter<A> writer) {
var group = metricGroup
.addGroup("stream", streamName)
.addGroup("writer_id", writer.getWriterId());

group.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds);
}

protected final BigQueryStreamWriter<A> streamWriter(String traceId, String streamName, TableId table) {
var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next());
return streamMap.computeIfAbsent(streamWithIndex, name -> {
logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex);
// Stream name can't contain index
return this.clientProvider.getWriter(streamName, table, rowSerializer);
var writer = this.clientProvider.getWriter(streamName, table, rowSerializer);
registerInflightMetric(streamName, writer);
return writer;
});
}

Expand All @@ -91,6 +101,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
}
newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer);
registerInflightMetric(streamName, writer);
}
return newWriter;
});
Expand All @@ -111,6 +122,7 @@ public void write(Rows<A> rows, Context context) {
group.gauge("batch_count", metric::getBatchCount);
group.gauge("batch_size_mb", metric::getBatchSizeInMb);
group.gauge("split_batch_count", metric::getSplitBatchCount);
group.gauge("callback_timeouts", metric::getTimeoutCount);

return metric;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ public void onFailure(Throwable t) {
case UNKNOWN:
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
.ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
} else {
Expand Down

0 comments on commit 4df9abb

Please sign in to comment.