Configurable max rows per streaming request #237

Open · wants to merge 2 commits into master
@@ -264,7 +264,8 @@ public void put(Collection<SinkRecord> records) {
recordConverter);
} else {
TableWriter.Builder simpleTableWriterBuilder =
-          new TableWriter.Builder(bigQueryWriter, table, recordConverter);
+          new TableWriter.Builder(bigQueryWriter, table, recordConverter,
+              config.getInt(BigQuerySinkConfig.BQ_STREAMING_MAX_ROWS_PER_REQUEST_CONFIG));
if (upsertDelete) {
simpleTableWriterBuilder.onFinish(rows ->
mergeBatches.onRowWrites(table.getBaseTableId(), rows));
@@ -93,6 +93,18 @@ public class BigQuerySinkConfig extends AbstractConfig {
"The interval, in seconds, in which to attempt to run GCS to BQ load jobs. Only relevant "
+ "if enableBatchLoad is configured.";

+  public static final String BQ_STREAMING_MAX_ROWS_PER_REQUEST_CONFIG = "bqStreamingMaxRowsPerRequest";
Review comment (Member): Can we rename this to `maxRowsPerRequest`?

+  private static final ConfigDef.Type BQ_STREAMING_MAX_ROWS_PER_REQUEST_TYPE = ConfigDef.Type.INT;
+  private static final Integer BQ_STREAMING_MAX_ROWS_PER_REQUEST_DEFAULT = 50000;
Review comment (Member): Let's keep the default behaviour the same as before. We can use -1 to mean the feature is disabled and make that the default.
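A minimal sketch of how that sentinel could be interpreted on the writer side, assuming -1 (or any non-positive value) means "no cap"; the helper name is hypothetical and not part of this PR:

    // Hypothetical helper: -1 (or any non-positive value) disables the cap,
    // falling back to the pre-PR behaviour of sending all buffered rows at once.
    static int effectiveBatchCap(int configuredMaxRowsPerRequest, int bufferedRows) {
      return configuredMaxRowsPerRequest <= 0
          ? bufferedRows
          : Math.min(configuredMaxRowsPerRequest, bufferedRows);
    }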

+  private static final ConfigDef.Importance BQ_STREAMING_MAX_ROWS_PER_REQUEST_IMPORTANCE = ConfigDef.Importance.LOW;
+  private static final String BQ_STREAMING_MAX_ROWS_PER_REQUEST_DOC =
Review comment (Member): Suggested doc wording: "The maximum number of rows to be sent in one batch in the request payload to BigQuery. This can reduce the number of failed calls due to Request Too Large errors when the payload exceeds BigQuery's quota limits (https://cloud.google.com/bigquery/quotas#write-api-limits). Setting it to a low value can result in degraded connector performance."

"Due to BQ streaming put limitations, the max request size is 10MB. " +
"Hence, considering that in average 1 record takes at least 20 bytes, " +
"if we have big batches (e.g. 500000) we might need to run against BigQuery multiple requests " +
"that would return a `Request Too Large` before finding the right size. " +
"This config allows starting from a lower value altogether and reduce the amount of failed requests. " +
"Only works with simple TableWriter (no GCS)";

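For scale, using the doc string's own numbers: at roughly 20 bytes per row, a 500,000-row batch is already about 10 MB, i.e. right at the streaming request limit, so rows of any realistic size push a batch that large over the quota on the first attempt.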
Review comment (Member): Let's add a validator as well with the minimum and maximum values allowed:
-1 -> default (disabled)
1 -> min
50,000 -> max (https://cloud.google.com/bigquery/quotas#write-api-limits)
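A plain ConfigDef.Range.between(-1, 50000) would also admit 0, so expressing "-1 or 1..50,000" needs a custom validator. A minimal sketch of how the suggested bounds could be wired in, assuming Kafka's ConfigDef and ConfigException APIs; the validator constant and its placement are hypothetical, not part of this PR:

    // Hypothetical validator: accept -1 (disabled) or a value between 1 and 50,000,
    // the per-request row limit documented on BigQuery's quotas page.
    private static final ConfigDef.Validator BQ_STREAMING_MAX_ROWS_PER_REQUEST_VALIDATOR =
        (name, value) -> {
          int rows = (Integer) value;
          if (rows != -1 && (rows < 1 || rows > 50_000)) {
            throw new ConfigException(name, value, "Must be -1 (disabled) or between 1 and 50000");
          }
        };

    // ...and passed to the existing define() chain via the overload that takes a validator:
    // .define(BQ_STREAMING_MAX_ROWS_PER_REQUEST_CONFIG,
    //         BQ_STREAMING_MAX_ROWS_PER_REQUEST_TYPE,
    //         -1,                                        // suggested default: disabled
    //         BQ_STREAMING_MAX_ROWS_PER_REQUEST_VALIDATOR,
    //         BQ_STREAMING_MAX_ROWS_PER_REQUEST_IMPORTANCE,
    //         BQ_STREAMING_MAX_ROWS_PER_REQUEST_DOC)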

public static final String GCS_BUCKET_NAME_CONFIG = "gcsBucketName";
private static final ConfigDef.Type GCS_BUCKET_NAME_TYPE = ConfigDef.Type.STRING;
private static final Object GCS_BUCKET_NAME_DEFAULT = "";
@@ -518,6 +530,12 @@ public static ConfigDef getConfig() {
GCS_FOLDER_NAME_DEFAULT,
GCS_FOLDER_NAME_IMPORTANCE,
GCS_FOLDER_NAME_DOC
+        ).define(
+            BQ_STREAMING_MAX_ROWS_PER_REQUEST_CONFIG,
+            BQ_STREAMING_MAX_ROWS_PER_REQUEST_TYPE,
+            BQ_STREAMING_MAX_ROWS_PER_REQUEST_DEFAULT,
+            BQ_STREAMING_MAX_ROWS_PER_REQUEST_IMPORTANCE,
+            BQ_STREAMING_MAX_ROWS_PER_REQUEST_DOC
).define(
PROJECT_CONFIG,
PROJECT_TYPE,
@@ -55,28 +55,32 @@ public class TableWriter implements Runnable {
private final PartitionedTableId table;
private final SortedMap<SinkRecord, RowToInsert> rows;
private final Consumer<Collection<RowToInsert>> onFinish;
+  private int bqStreamingMaxRowsPerRequest;

/**
-   * @param writer the {@link BigQueryWriter} to use.
-   * @param table the BigQuery table to write to.
-   * @param rows the rows to write.
-   * @param onFinish a callback to invoke after all rows have been written successfully, which is
-   *                 called with all the rows written by the writer
+   * @param writer the {@link BigQueryWriter} to use.
+   * @param table the BigQuery table to write to.
+   * @param rows the rows to write.
+   * @param onFinish a callback to invoke after all rows have been written successfully, which is
+   *                 called with all the rows written by the writer
+   * @param bqStreamingMaxRowsPerRequest max rows per InsertAll request
*/
public TableWriter(BigQueryWriter writer,
PartitionedTableId table,
SortedMap<SinkRecord, RowToInsert> rows,
-                     Consumer<Collection<RowToInsert>> onFinish) {
+                     Consumer<Collection<RowToInsert>> onFinish,
+                     int bqStreamingMaxRowsPerRequest) {
this.writer = writer;
this.table = table;
this.rows = rows;
this.onFinish = onFinish;
+    this.bqStreamingMaxRowsPerRequest = bqStreamingMaxRowsPerRequest;
}

@Override
public void run() {
int currentIndex = 0;
-    int currentBatchSize = rows.size();
+    int currentBatchSize = Math.min(bqStreamingMaxRowsPerRequest, rows.size());
int successCount = 0;
int failureCount = 0;

Expand All @@ -95,9 +99,9 @@ public void run() {
successCount++;
} catch (BigQueryException err) {
logger.warn(
"Could not write batch of size {} to BigQuery. "
"Could not write batch of size {} to BigQuery table `{}`. "
+ "Error code: {}, underlying error (if present): {}",
currentBatchList.size(), err.getCode(), err.getError(), err);
currentBatchList.size(), table, err.getCode(), err.getError(), err);
if (isBatchSizeError(err)) {
failureCount++;
currentBatchSize = getNewBatchSize(currentBatchSize, err);
Expand All @@ -114,11 +118,11 @@ public void run() {
// Common case is 1 successful call and 0 failed calls:
// Write to info if uncommon case,
// Write to debug if common case
-    String logMessage = "Wrote {} rows over {} successful calls and {} failed calls.";
+    String logMessage = "Wrote {} rows over {} successful calls and {} failed calls for table {}.";
if (successCount + failureCount > 1) {
-      logger.info(logMessage, rows.size(), successCount, failureCount);
+      logger.info(logMessage, rows.size(), successCount, failureCount, table);
} else {
-      logger.debug(logMessage, rows.size(), successCount, failureCount);
+      logger.debug(logMessage, rows.size(), successCount, failureCount, table);
}

onFinish.accept(rows.values());
@@ -170,6 +174,7 @@ private static boolean isBatchSizeError(BigQueryException exception) {
public static class Builder implements TableWriterBuilder {
private final BigQueryWriter writer;
private final PartitionedTableId table;
+    private final int bqStreamingMaxRowsPerRequest;

private SortedMap<SinkRecord, RowToInsert> rows;
private SinkRecordConverter recordConverter;
Expand All @@ -180,9 +185,11 @@ public static class Builder implements TableWriterBuilder {
* @param table the BigQuery table to write to.
* @param recordConverter the record converter used to convert records to rows
*/
-    public Builder(BigQueryWriter writer, PartitionedTableId table, SinkRecordConverter recordConverter) {
+    public Builder(BigQueryWriter writer, PartitionedTableId table, SinkRecordConverter recordConverter,
+                   int bqStreamingMaxRowsPerRequest) {
this.writer = writer;
this.table = table;
+      this.bqStreamingMaxRowsPerRequest = bqStreamingMaxRowsPerRequest;

this.rows = new TreeMap<>(Comparator.comparing(SinkRecord::kafkaPartition)
.thenComparing(SinkRecord::kafkaOffset));
@@ -211,7 +218,7 @@ public void onFinish(Consumer<Collection<RowToInsert>> onFinish) {

@Override
public TableWriter build() {
-      return new TableWriter(writer, table, rows, onFinish != null ? onFinish : n -> { });
+      return new TableWriter(writer, table, rows, onFinish != null ? onFinish : n -> { }, bqStreamingMaxRowsPerRequest);
}
}
}
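For readers skimming the diff, here is a self-contained sketch of the retry behaviour this cap interacts with: the first request is limited to the configured row count, and a batch rejected as too large is halved and retried. This is an illustration of the approach under those assumptions, not the connector's actual code; all names below are hypothetical.

    import java.util.List;

    // Illustrative only: cap the first batch at maxRowsPerRequest, then halve the
    // batch whenever the backend rejects a request for being too large.
    final class AdaptiveBatchingSketch {

      interface BatchSender<T> {
        void send(List<T> batch) throws RequestTooLargeException;
      }

      static final class RequestTooLargeException extends Exception { }

      /** Writes all rows; returns the number of "too large" rejections encountered. */
      static <T> int writeAll(List<T> rows, int maxRowsPerRequest, BatchSender<T> sender) {
        int index = 0;
        int batchSize = Math.min(maxRowsPerRequest, rows.size());
        int failures = 0;
        while (index < rows.size()) {
          List<T> batch = rows.subList(index, Math.min(index + batchSize, rows.size()));
          try {
            sender.send(batch);
            index += batch.size();                  // advance only on success
          } catch (RequestTooLargeException e) {
            failures++;
            batchSize = Math.max(batchSize / 2, 1); // shrink and retry the same slice
          }
        }
        return failures;
      }
    }

With a smaller starting cap, the first send is less likely to exceed the 10 MB streaming request limit, at the cost of more round trips when rows are small.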