diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java index 2981adb8a..a8ecaf562 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java @@ -20,6 +20,8 @@ import com.google.cloud.bigquery.BigQuery; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; + +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; import com.wepay.kafka.connect.bigquery.utils.Version; @@ -106,12 +108,12 @@ public List> taskConfigs(int maxTasks) { logger.trace("connector.taskConfigs()"); List> taskConfigs = new ArrayList<>(); for (int i = 0; i < maxTasks; i++) { - // Copy configProperties so that tasks can't interfere with each others' configurations HashMap taskConfig = new HashMap<>(configProperties); if (i == 0 && !config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).isEmpty()) { // if batch loading is enabled, configure first task to do the GCS -> BQ loading taskConfig.put(GCS_BQ_TASK_CONFIG_KEY, "true"); } + taskConfig.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, Integer.toString(i)); taskConfigs.add(taskConfig); } return taskConfigs; diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index b338e27b0..96679b5f3 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -28,7 +28,6 @@ import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; -import com.wepay.kafka.connect.bigquery.convert.RecordConverter; import com.wepay.kafka.connect.bigquery.convert.SchemaConverter; import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; @@ -38,12 +37,14 @@ import com.wepay.kafka.connect.bigquery.utils.Version; import com.wepay.kafka.connect.bigquery.write.batch.GCSBatchTableWriter; import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor; +import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; import com.wepay.kafka.connect.bigquery.write.batch.TableWriter; import com.wepay.kafka.connect.bigquery.write.batch.TableWriterBuilder; import com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter; import com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter; import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter; import com.wepay.kafka.connect.bigquery.write.row.SimpleBigQueryWriter; +import com.wepay.kafka.connect.bigquery.write.row.UpsertDeleteBigQueryWriter; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; @@ -66,6 +67,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; /** * A {@link SinkTask} used to translate Kafka Connect {@link SinkRecord SinkRecords} into BigQuery @@ -74,6 
+76,8 @@ public class BigQuerySinkTask extends SinkTask { private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkTask.class); + private AtomicReference bigQuery; + private AtomicReference schemaManager; private SchemaRetriever schemaRetriever; private BigQueryWriter bigQueryWriter; private GCSToBQWriter gcsToBQWriter; @@ -83,6 +87,10 @@ public class BigQuerySinkTask extends SinkTask { private boolean useMessageTimeDatePartitioning; private boolean usePartitionDecorator; private boolean sanitize; + private boolean upsertDelete; + private MergeBatches mergeBatches; + private MergeQueries mergeQueries; + private long mergeRecordsThreshold; private TopicPartitionManager topicPartitionManager; @@ -94,7 +102,7 @@ public class BigQuerySinkTask extends SinkTask { private final SchemaManager testSchemaManager; private final UUID uuid = UUID.randomUUID(); - private ScheduledExecutorService gcsLoadExecutor; + private ScheduledExecutorService loadExecutor; /** * Create a new BigquerySinkTask. @@ -137,11 +145,23 @@ private void maybeEnsureExistingTable(TableId table) { BigQuery bigQuery = getBigQuery(); if (bigQuery.getTable(table) == null && !config.getBoolean(config.TABLE_CREATE_CONFIG)) { throw new BigQueryConnectException("Table '" + table + "' does not exist. " + - "You may want to enable auto table creation by setting " + config.TABLE_CREATE_CONFIG - + "=true in the properties file"); + "You may want to enable auto table creation by setting " + config.TABLE_CREATE_CONFIG + + "=true in the properties file"); } } + @Override + public Map preCommit(Map offsets) { + if (upsertDelete) { + Map result = mergeBatches.latestOffsets(); + checkQueueSize(); + return result; + } + + flush(offsets); + return offsets; + } + private PartitionedTableId getRecordTable(SinkRecord record) { String tableName; String dataset = config.getString(config.DEFAULT_DATASET_CONFIG); @@ -161,11 +181,16 @@ private PartitionedTableId getRecordTable(SinkRecord record) { tableName = FieldNameSanitizer.sanitizeName(tableName); } TableId baseTableId = TableId.of(dataset, tableName); - maybeEnsureExistingTable(baseTableId); + if (upsertDelete) { + TableId intermediateTableId = mergeBatches.intermediateTableFor(baseTableId); + // If upsert/delete is enabled, we want to stream into a non-partitioned intermediate table + return new PartitionedTableId.Builder(intermediateTableId).build(); + } else { + maybeEnsureExistingTable(baseTableId); + } PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId); if (usePartitionDecorator) { - if (useMessageTimeDatePartitioning) { if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) { throw new ConnectException( @@ -182,7 +207,12 @@ private PartitionedTableId getRecordTable(SinkRecord record) { @Override public void put(Collection records) { - logger.info("Putting {} records in the sink.", records.size()); + if (upsertDelete) { + // Periodically poll for errors here instead of doing a stop-the-world check in flush() + executor.maybeThrowEncounteredErrors(); + } + + logger.debug("Putting {} records in the sink.", records.size()); // create tableWriters Map tableWriterBuilders = new HashMap<>(); @@ -206,12 +236,17 @@ public void put(Collection records) { gcsBlobName, recordConverter); } else { - tableWriterBuilder = + TableWriter.Builder simpleTableWriterBuilder = new TableWriter.Builder(bigQueryWriter, table, recordConverter); + if (upsertDelete) { + simpleTableWriterBuilder.onFinish(rows -> + mergeBatches.onRowWrites(table.getBaseTableId(), rows)); 
+ } + tableWriterBuilder = simpleTableWriterBuilder; } tableWriterBuilders.put(table, tableWriterBuilder); } - tableWriterBuilders.get(table).addRow(record); + tableWriterBuilders.get(table).addRow(record, table.getBaseTableId()); } } @@ -221,6 +256,18 @@ public void put(Collection records) { } // check if we should pause topics + checkQueueSize(); + } + + // Important: this method is only safe to call during put(), flush(), or preCommit(); otherwise, + // a ConcurrentModificationException may be triggered if the Connect framework is in the middle of + // a method invocation on the consumer for this task. This becomes especially likely if all topics + // have been paused as the framework will most likely be in the middle of a poll for that consumer + // which, because all of its topics have been paused, will not return until it's time for the next + // offset commit. Invoking context.requestCommit() won't wake up the consumer in that case, so we + // really have no choice but to wait for the framework to call a method on this task that implies + // that it's safe to pause or resume partitions on the consumer. + private void checkQueueSize() { long queueSoftLimit = config.getLong(BigQuerySinkTaskConfig.QUEUE_SIZE_CONFIG); if (queueSoftLimit != -1) { int currentQueueSize = executor.getQueue().size(); @@ -237,16 +284,24 @@ private BigQuery getBigQuery() { if (testBigQuery != null) { return testBigQuery; } + return bigQuery.updateAndGet(bq -> bq != null ? bq : newBigQuery()); + } + + private BigQuery newBigQuery() { String projectName = config.getString(config.PROJECT_CONFIG); String keyFile = config.getKeyFile(); String keySource = config.getString(config.KEY_SOURCE_CONFIG); return new BigQueryHelper().setKeySource(keySource).connect(projectName, keyFile); } - private SchemaManager getSchemaManager(BigQuery bigQuery) { + private SchemaManager getSchemaManager() { if (testSchemaManager != null) { return testSchemaManager; } + return schemaManager.updateAndGet(sm -> sm != null ? 
sm : newSchemaManager()); + } + + private SchemaManager newSchemaManager() { schemaRetriever = config.getSchemaRetriever(); SchemaConverter schemaConverter = config.getSchemaConverter(); @@ -256,8 +311,10 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) { Optional> clusteringFieldName = config.getClusteringPartitionFieldName(); boolean allowNewBQFields = config.getBoolean(config.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG); boolean allowReqFieldRelaxation = config.getBoolean(config.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG); - return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, allowNewBQFields, allowReqFieldRelaxation, kafkaKeyFieldName, - kafkaDataFieldName, timestampPartitionFieldName, clusteringFieldName); + return new SchemaManager(schemaRetriever, schemaConverter, getBigQuery(), + allowNewBQFields, allowReqFieldRelaxation, + kafkaKeyFieldName, kafkaDataFieldName, + timestampPartitionFieldName, clusteringFieldName); } private BigQueryWriter getBigQueryWriter() { @@ -267,9 +324,16 @@ private BigQueryWriter getBigQueryWriter() { int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG); long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG); BigQuery bigQuery = getBigQuery(); - if (autoCreateTables || allowNewBigQueryFields || allowRequiredFieldRelaxation ) { + if (upsertDelete) { + return new UpsertDeleteBigQueryWriter(bigQuery, + getSchemaManager(), + retry, + retryWait, + autoCreateTables, + mergeBatches.intermediateToDestinationTables()); + } else if (autoCreateTables || allowNewBigQueryFields || allowRequiredFieldRelaxation) { return new AdaptiveBigQueryWriter(bigQuery, - getSchemaManager(bigQuery), + getSchemaManager(), retry, retryWait, autoCreateTables); @@ -296,7 +360,7 @@ private GCSToBQWriter getGcsWriter() { boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG); // schemaManager shall only be needed for creating table hence do not fetch instance if not // needed. - SchemaManager schemaManager = autoCreateTables ? getSchemaManager(bigQuery) : null; + SchemaManager schemaManager = autoCreateTables ? 
getSchemaManager() : null; return new GCSToBQWriter(getGcs(), bigQuery, schemaManager, @@ -305,11 +369,8 @@ private GCSToBQWriter getGcsWriter() { autoCreateTables); } - private SinkRecordConverter getConverter(BigQuerySinkConfig config) { - return new SinkRecordConverter(config.getRecordConverter(), - config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG), - config.getKafkaKeyFieldName(), - config.getKafkaDataFieldName()); + private SinkRecordConverter getConverter(BigQuerySinkTaskConfig config) { + return new SinkRecordConverter(config, mergeBatches, mergeQueries); } @Override @@ -325,6 +386,22 @@ public void start(Map properties) { err ); } + upsertDelete = config.getBoolean(config.UPSERT_ENABLED_CONFIG) + || config.getBoolean(config.DELETE_ENABLED_CONFIG); + + bigQuery = new AtomicReference<>(); + schemaManager = new AtomicReference<>(); + + if (upsertDelete) { + String intermediateTableSuffix = String.format("_%s_%d_%s_%d", + config.getString(config.INTERMEDIATE_TABLE_SUFFIX_CONFIG), + config.getInt(config.TASK_ID_CONFIG), + uuid, + Instant.now().toEpochMilli() + ); + mergeBatches = new MergeBatches(intermediateTableSuffix); + mergeRecordsThreshold = config.getLong(config.MERGE_RECORDS_THRESHOLD_CONFIG); + } bigQueryWriter = getBigQueryWriter(); gcsToBQWriter = getGcsWriter(); @@ -339,12 +416,16 @@ public void start(Map properties) { config.getBoolean(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG); if (hasGCSBQTask) { startGCSToBQLoadTask(); + } else if (upsertDelete) { + mergeQueries = + new MergeQueries(config, mergeBatches, executor, getBigQuery(), getSchemaManager(), context); + maybeStartMergeFlushTask(); } } private void startGCSToBQLoadTask() { logger.info("Attempting to start GCS Load Executor."); - gcsLoadExecutor = Executors.newScheduledThreadPool(1); + loadExecutor = Executors.newScheduledThreadPool(1); String bucketName = config.getString(config.GCS_BUCKET_NAME_CONFIG); Storage gcs = getGcs(); // get the bucket, or create it if it does not exist. 
@@ -362,29 +443,50 @@ private void startGCSToBQLoadTask() { GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket); int intervalSec = config.getInt(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG); - gcsLoadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS); + loadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS); + } + + private void maybeStartMergeFlushTask() { + long intervalMs = config.getLong(config.MERGE_INTERVAL_MS_CONFIG); + if (intervalMs == -1) { + logger.info("{} is set to -1; periodic merge flushes are disabled", config.MERGE_INTERVAL_MS_CONFIG); + return; + } + logger.info("Attempting to start upsert/delete load executor"); + loadExecutor = Executors.newScheduledThreadPool(1); + loadExecutor.scheduleAtFixedRate( + mergeQueries::mergeFlushAll, intervalMs, intervalMs, TimeUnit.MILLISECONDS); } @Override public void stop() { try { - executor.shutdown(); - executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); - if (gcsLoadExecutor != null) { - try { - logger.info("Attempting to shut down GCS Load Executor."); - gcsLoadExecutor.shutdown(); - gcsLoadExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - logger.warn("Could not shut down GCS Load Executor within {}s.", - EXECUTOR_SHUTDOWN_TIMEOUT_SEC); - } + if (upsertDelete) { + mergeBatches.intermediateTables().forEach(table -> { + logger.debug("Deleting intermediate table {}", table); + getBigQuery().delete(table); + }); } - } catch (InterruptedException ex) { - logger.warn("{} active threads are still executing tasks {}s after shutdown is signaled.", - executor.getActiveCount(), EXECUTOR_SHUTDOWN_TIMEOUT_SEC); } finally { - logger.trace("task.stop()"); + try { + executor.shutdown(); + executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + if (loadExecutor != null) { + try { + logger.info("Attempting to shut down load executor."); + loadExecutor.shutdown(); + loadExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + logger.warn("Could not shut down load executor within {}s.", + EXECUTOR_SHUTDOWN_TIMEOUT_SEC); + } + } + } catch (InterruptedException ex) { + logger.warn("{} active threads are still executing tasks {}s after shutdown is signaled.", + executor.getActiveCount(), EXECUTOR_SHUTDOWN_TIMEOUT_SEC); + } finally { + logger.trace("task.stop()"); + } } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java new file mode 100644 index 000000000..df9272af7 --- /dev/null +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java @@ -0,0 +1,336 @@ +package com.wepay.kafka.connect.bigquery; + +/* + * Copyright 2016 WePay, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableId; +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; +import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MergeQueries { + public static final String INTERMEDIATE_TABLE_KEY_FIELD_NAME = "key"; + public static final String INTERMEDIATE_TABLE_VALUE_FIELD_NAME = "value"; + public static final String INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME = "partitionTime"; + public static final String INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD = "batchNumber"; + + private static final Logger logger = LoggerFactory.getLogger(MergeQueries.class); + + private final String keyFieldName; + private final boolean usePartitionDecorator; + private final boolean upsertEnabled; + private final boolean deleteEnabled; + private final MergeBatches mergeBatches; + private final ExecutorService executor; + private final BigQuery bigQuery; + private final SchemaManager schemaManager; + private final SinkTaskContext context; + + public MergeQueries(BigQuerySinkTaskConfig config, + MergeBatches mergeBatches, + ExecutorService executor, + BigQuery bigQuery, + SchemaManager schemaManager, + SinkTaskContext context) { + this.keyFieldName = config.getKafkaKeyFieldName().orElseThrow(() -> + new ConnectException("Kafka key field must be configured when upsert/delete is enabled") + ); + this.usePartitionDecorator = config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG); + this.upsertEnabled = config.getBoolean(config.UPSERT_ENABLED_CONFIG); + this.deleteEnabled = config.getBoolean(config.DELETE_ENABLED_CONFIG); + + this.mergeBatches = mergeBatches; + this.executor = executor; + this.bigQuery = bigQuery; + this.schemaManager = schemaManager; + this.context = context; + } + + public void mergeFlushAll() { + logger.debug("Triggering merge flush for all tables"); + mergeBatches.intermediateTables().forEach(this::mergeFlush); + } + + public void mergeFlush(TableId intermediateTable) { + final TableId destinationTable = mergeBatches.destinationTableFor(intermediateTable); + final int batchNumber = mergeBatches.incrementBatch(intermediateTable); + logger.trace("Triggering merge flush from intermediate table {} to destination table {} for batch {}", + intermediateTable, destinationTable, batchNumber); + + executor.submit(() -> { + // If there are rows to flush in this batch, flush them + if (mergeBatches.prepareToFlush(intermediateTable, batchNumber)) { + try { + logger.debug("Running merge query on batch {} from intermediate table {}", + batchNumber, intermediateTable); + String mergeFlushQuery = mergeFlushQuery(intermediateTable, destinationTable, batchNumber); + logger.trace(mergeFlushQuery); + bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery)); + logger.trace("Merge from intermediate table {} to destination table {} completed", + intermediateTable, destinationTable); + } catch (Throwable t) { + logger.warn("Failed on merge flush from intermediate table {} to destination table {}", + 
intermediateTable, destinationTable, t); + throw new ConnectException( + String.format("Failed to perform merge flush from intermediate table %s to destination table %s", + intermediateTable, + destinationTable), + t); + } + + logger.debug("Recording flush success for batch {} from {}", + batchNumber, intermediateTable); + mergeBatches.recordSuccessfulFlush(intermediateTable, batchNumber); + + // Commit those offsets ASAP + context.requestCommit(); + + logger.info("Completed merge flush of batch {} from {} to {}", + batchNumber, intermediateTable, destinationTable); + } + + // After, regardless of whether we flushed or not, clean up old batches from the intermediate + // table. Some rows may be several batches old but still in the table if they were in the + // streaming buffer during the last purge. + try { + logger.trace("Clearing batches from {} on back from intermediate table {}", batchNumber, intermediateTable); + String tableClearQuery = clearBatchQuery(intermediateTable, batchNumber); + logger.trace(tableClearQuery); + bigQuery.query(QueryJobConfiguration.of(tableClearQuery)); + } catch (Throwable t) { + logger.error("Failed to clear old batches from intermediate table {}", intermediateTable, t); + throw new ConnectException( + String.format("Failed to clear old batches from intermediate table %s", + intermediateTable), + t); + } + }); + } + + /* + + upsert+delete: + + MERGE `<dataset>`.`<destinationTable>` + USING ( + SELECT * FROM ( + SELECT ARRAY_AGG( + x ORDER BY partitionTime DESC LIMIT 1 + )[OFFSET(0)] src + FROM `<dataset>`.`<intermediateTable>` x + WHERE batchNumber=<batchNumber> + GROUP BY key.<field>[, key.<field>...] + ) + ) + ON `<destinationTable>`.<keyField>=`src`.key + WHEN MATCHED AND `src`.value IS NOT NULL + THEN UPDATE SET <valueField>=`src`.value.<field>[, <valueField>=`src`.value.<field>...] + WHEN MATCHED AND `src`.value IS NULL + THEN DELETE + WHEN NOT MATCHED AND `src`.value IS NOT NULL + THEN INSERT (<keyField>, _PARTITIONTIME, <valueField>[, <valueField>...]) + VALUES ( + `src`.key, + CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), + `src`.value.<field>[, `src`.value.<field>...] + ); + + + delete only: + + MERGE `<dataset>`.`<destinationTable>` + USING ( + SELECT * FROM ( + SELECT ARRAY_AGG( + x ORDER BY partitionTime DESC LIMIT 1 + )[OFFSET(0)] src + FROM `<dataset>`.`<intermediateTable>` x + WHERE batchNumber=<batchNumber> + GROUP BY key.<field>[, key.<field>...] + ) + ) + ON `<destinationTable>`.<keyField>=`src`.key AND `src`.value IS NULL + WHEN MATCHED + THEN DELETE + WHEN NOT MATCHED + THEN INSERT (<keyField>, _PARTITIONTIME, <valueField>[, <valueField>...]) + VALUES ( + `src`.key, + CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), + `src`.value.<field>[, `src`.value.<field>...] + ); + + + upsert only: + + MERGE `<dataset>`.`<destinationTable>` + USING ( + SELECT * FROM ( + SELECT ARRAY_AGG( + x ORDER BY partitionTime DESC LIMIT 1 + )[OFFSET(0)] src + FROM `<dataset>`.`<intermediateTable>` x + WHERE batchNumber=<batchNumber> + GROUP BY key.<field>[, key.<field>...] + ) + ) + ON `<destinationTable>`.<keyField>=`src`.key + WHEN MATCHED + THEN UPDATE SET <valueField>=`src`.value.<field>[, <valueField>=`src`.value.<field>...] + WHEN NOT MATCHED + THEN INSERT (<keyField>, _PARTITIONTIME, <valueField>[, <valueField>...]) + VALUES ( + `src`.key, + CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), + `src`.value.<field>[, `src`.value.<field>...] + ); + + */ + private String mergeFlushQuery(TableId intermediateTable, TableId destinationTable, int batchNumber) { + Schema intermediateSchema = schemaManager.cachedSchema(intermediateTable); + + String srcKey = INTERMEDIATE_TABLE_KEY_FIELD_NAME; + + List<String> keyFields = listFields( + intermediateSchema.getFields().get(keyFieldName).getSubFields(), + srcKey + "." + ); + List<String> dstValueFields = intermediateSchema.getFields().get(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).getSubFields() + .stream() + .map(Field::getName) + .collect(Collectors.toList()); + + List<String> srcValueFields = dstValueFields.stream() + .map(field -> "`src`." + INTERMEDIATE_TABLE_VALUE_FIELD_NAME + "." 
+ field) + .collect(Collectors.toList()); + List updateValues = dstValueFields.stream() + .map(field -> field + "=`src`." + INTERMEDIATE_TABLE_VALUE_FIELD_NAME + "." + field) + .collect(Collectors.toList()); + + String partitionTimeField = usePartitionDecorator ? "_PARTITIONTIME, " : ""; + String partitionTimeValue = usePartitionDecorator + ? "CAST(CAST(DATE(`src`." + INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME + ") AS DATE) AS TIMESTAMP), " + : ""; + + String dst = destinationTable.getTable(); + + StringBuilder keysMatch = new StringBuilder("`").append(dst).append("`.").append(keyFieldName).append("=`src`.").append(srcKey); + + StringBuilder mergeOpening = new StringBuilder("MERGE `").append(destinationTable.getDataset()).append("`.`").append(destinationTable.getTable()).append("` ") + .append("USING (") + .append("SELECT * FROM (") + .append("SELECT ARRAY_AGG(") + .append("x ORDER BY ").append(INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME).append(" DESC LIMIT 1") + .append(")[OFFSET(0)] src ") + .append("FROM `").append(intermediateTable.getDataset()).append("`.`").append(intermediateTable.getTable()).append("` x ") + .append("WHERE ").append(INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD).append("=").append(batchNumber).append(" ") + .append("GROUP BY ").append(String.join(", ", keyFields)) + .append(")") + .append(") "); + + StringBuilder insertClause = new StringBuilder("THEN INSERT (") + .append(keyFieldName).append(", ") + .append(partitionTimeField) + .append(String.join(", ", dstValueFields)) + .append(") ") + .append("VALUES (") + .append("`src`.").append(srcKey).append(", ") + .append(partitionTimeValue) + .append(String.join(", ", srcValueFields)) + .append(")"); + + StringBuilder updateClause = new StringBuilder("THEN UPDATE SET ") + .append(String.join(", ", updateValues)); + + StringBuilder valueIs = new StringBuilder("`src`.").append(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).append(" IS "); + + if (upsertEnabled && deleteEnabled) { + // Delete rows with null values, and upsert all others + return mergeOpening + .append("ON ").append(keysMatch).append(" ") + .append("WHEN MATCHED AND ").append(valueIs).append("NOT NULL ") + .append(updateClause).append(" ") + .append("WHEN MATCHED AND ").append(valueIs).append("NULL ") + .append("THEN DELETE ") + .append("WHEN NOT MATCHED AND ").append(valueIs).append("NOT NULL ") + .append(insertClause) + .append(";") + .toString(); + } else if (deleteEnabled) { + // Delete rows with null values, and insert all others + return mergeOpening + .append("ON ").append(keysMatch).append(" ") + .append("AND ").append(valueIs).append("NULL ") + .append("WHEN MATCHED ") + .append("THEN DELETE ") + .append("WHEN NOT MATCHED ") + .append(insertClause) + .append(";") + .toString(); + } else if (upsertEnabled) { + // Assume all rows have non-null values and upsert them all + return mergeOpening + .append("ON ").append(keysMatch).append(" ") + .append("WHEN MATCHED") + .append(updateClause).append(" ") + .append("WHEN NOT MATCHED") + .append(insertClause) + .append(";") + .toString(); + } else { + throw new IllegalStateException("At least one of upsert or delete must be enabled for merge flushing to occur."); + } + } + + // DELETE FROM `` WHERE batchNumber <= AND _PARTITIONTIME IS NOT NULL; + private static String clearBatchQuery(TableId intermediateTable, int batchNumber) { + return new StringBuilder("DELETE FROM `").append(intermediateTable.getDataset()).append("`.`").append(intermediateTable.getTable()).append("` ") + .append("WHERE ") + 
.append(INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD).append(" <= ").append(batchNumber).append(" ") + // Use this clause to filter out rows that are still in the streaming buffer, which should + // not be subjected to UPDATE or DELETE operations or the query will FAIL + .append("AND _PARTITIONTIME IS NOT NULL") + .append(";") + .toString(); + } + + private static List listFields(FieldList keyFields, String prefix) { + return keyFields.stream() + .flatMap(field -> { + String fieldName = prefix + field.getName(); + FieldList subFields = field.getSubFields(); + if (subFields == null) { + return Stream.of(fieldName); + } + return listFields(subFields, fieldName + ".").stream(); + }).collect(Collectors.toList()); + } +} diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java index e10839b65..b0968759b 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java @@ -30,11 +30,14 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Class for managing Schemas of BigQuery tables (creating and updating). */ public class SchemaManager { + private static final Logger logger = LoggerFactory.getLogger(SchemaManager.class); private final SchemaRetriever schemaRetriever; @@ -46,6 +49,10 @@ public class SchemaManager { private final Optional kafkaDataFieldName; private final Optional timestampPartitionFieldName; private final Optional> clusteringFieldName; + private final boolean intermediateTables; + private final ConcurrentMap tableCreateLocks; + private final ConcurrentMap tableUpdateLocks; + private final ConcurrentMap schemaCache; /** * @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a @@ -55,9 +62,14 @@ public class SchemaManager { * @param allowNewBQFields If set to true, allows new fields to be added to BigQuery Schema. * @param allowBQRequiredFieldRelaxation If set to true, allows changing field mode from REQUIRED to NULLABLE * @param kafkaKeyFieldName The name of kafka key field to be used in BigQuery. - * If set to null, Kafka Key Field will not be included in BigQuery. + * If set to null, Kafka Key Field will not be included in BigQuery. * @param kafkaDataFieldName The name of kafka data field to be used in BigQuery. * If set to null, Kafka Data Field will not be included in BigQuery. + * @param timestampPartitionFieldName The name of the field to use for column-based time + * partitioning in BigQuery. + * If set to null, ingestion time-based partitioning will be + * used instead. 
+ * @param clusteringFieldName */ public SchemaManager( SchemaRetriever schemaRetriever, @@ -69,6 +81,36 @@ public SchemaManager( Optional kafkaDataFieldName, Optional timestampPartitionFieldName, Optional> clusteringFieldName) { + this( + schemaRetriever, + schemaConverter, + bigQuery, + allowNewBQFields, + allowBQRequiredFieldRelaxation, + kafkaKeyFieldName, + kafkaDataFieldName, + timestampPartitionFieldName, + clusteringFieldName, + false, + new ConcurrentHashMap<>(), + new ConcurrentHashMap<>(), + new ConcurrentHashMap<>()); + } + + private SchemaManager( + SchemaRetriever schemaRetriever, + SchemaConverter schemaConverter, + BigQuery bigQuery, + boolean allowNewBQFields, + boolean allowBQRequiredFieldRelaxation, + Optional kafkaKeyFieldName, + Optional kafkaDataFieldName, + Optional timestampPartitionFieldName, + Optional> clusteringFieldName, + boolean intermediateTables, + ConcurrentMap tableCreateLocks, + ConcurrentMap tableUpdateLocks, + ConcurrentMap schemaCache) { this.schemaRetriever = schemaRetriever; this.schemaConverter = schemaConverter; this.bigQuery = bigQuery; @@ -78,6 +120,59 @@ public SchemaManager( this.kafkaDataFieldName = kafkaDataFieldName; this.timestampPartitionFieldName = timestampPartitionFieldName; this.clusteringFieldName = clusteringFieldName; + this.intermediateTables = intermediateTables; + this.tableCreateLocks = tableCreateLocks; + this.tableUpdateLocks = tableUpdateLocks; + this.schemaCache = schemaCache; + } + + public SchemaManager forIntermediateTables() { + return new SchemaManager( + schemaRetriever, + schemaConverter, + bigQuery, + allowNewBQFields, + allowBQRequiredFieldRelaxation, + kafkaKeyFieldName, + kafkaDataFieldName, + timestampPartitionFieldName, + clusteringFieldName, + true, + tableCreateLocks, + tableUpdateLocks, + schemaCache + ); + } + + /** + * Fetch the most recent schema for the given table, assuming it has been created and/or updated + * over the lifetime of this schema manager. + * @param table the table to fetch the schema for; may be null + * @return the latest schema for that table; may be null if the table does not exist or has not + * been created or updated by this schema manager + */ + public com.google.cloud.bigquery.Schema cachedSchema(TableId table) { + return schemaCache.get(table); + } + + /** + * Create a new table in BigQuery, if it doesn't already exist. Otherwise, update the existing + * table to use the most-current schema. + * @param table The BigQuery table to create, + * @param records The sink records used to determine the schema. + */ + public void createOrUpdateTable(TableId table, Set records) { + synchronized (lock(tableCreateLocks, table)) { + if (bigQuery.getTable(table) == null) { + logger.debug("{} doesn't exist; creating instead of updating", table(table)); + createTable(table, records); + return; + } + } + + // Table already existed; attempt to update instead + logger.debug("{} already exists; updating instead of creating", table(table)); + updateSchema(table, records); } /** @@ -86,8 +181,27 @@ public SchemaManager( * @param records The sink records used to determine the schema. 
*/ public void createTable(TableId table, Set records) { + synchronized (lock(tableCreateLocks, table)) { + if (schemaCache.containsKey(table)) { + // Table already exists; noop + logger.debug("Skipping create of {} as it should already exist or appear very soon", table(table)); + return; + } TableInfo tableInfo = getTableInfo(table, records); - bigQuery.create(tableInfo); + logger.info("Attempting to create {} with schema {}", + table(table), tableInfo.getDefinition().getSchema()); + try { + bigQuery.create(tableInfo); + logger.debug("Successfully created {}", table(table)); + schemaCache.put(table, tableInfo.getDefinition().getSchema()); + } catch (BigQueryException e) { + if (e.getCode() == 409) { + logger.debug("Failed to create {} as it already exists (possibly created by another task)", table(table)); + com.google.cloud.bigquery.Schema schema = bigQuery.getTable(table).getDefinition().getSchema(); + schemaCache.put(table, schema); + } + } + } } /** @@ -96,10 +210,25 @@ public void createTable(TableId table, Set records) { * @param records The sink records used to update the schema. */ public void updateSchema(TableId table, Set records) { - TableInfo tableInfo = getTableInfo(table, records); - logger.info("Attempting to update table `{}` with schema {}", - table, tableInfo.getDefinition().getSchema()); - bigQuery.update(tableInfo); + synchronized (tableUpdateLocks.computeIfAbsent(table, t -> new Object())) { + TableInfo tableInfo = getTableInfo(table, records); + + if (!schemaCache.containsKey(table)) { + logger.debug("Reading schema for {}", table(table)); + schemaCache.put(table, bigQuery.getTable(table).getDefinition().getSchema()); + } + + if (!schemaCache.get(table).equals(tableInfo.getDefinition().getSchema())) { + logger.info("Attempting to update {} with schema {}", + table(table), tableInfo.getDefinition().getSchema()); + bigQuery.update(tableInfo); + logger.debug("Successfully updated {}", table(table)); + schemaCache.put(table, tableInfo.getDefinition().getSchema()); + } else { + logger.debug("Skipping update of {} since current schema should be compatible", table(table)); + } + } + } /** @@ -211,46 +340,119 @@ private String getUnionizedTableDescription(Set records) { // package private for testing. 
TableInfo constructTableInfo(TableId table, com.google.cloud.bigquery.Schema bigQuerySchema, String tableDescription) { - TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY); - if (timestampPartitionFieldName.isPresent()) { - timePartitioning = timePartitioning.toBuilder().setField(timestampPartitionFieldName.get()).build(); - } - StandardTableDefinition.Builder builder = StandardTableDefinition.newBuilder() - .setSchema(bigQuerySchema) - .setTimePartitioning(timePartitioning); - - if (timestampPartitionFieldName.isPresent() && clusteringFieldName.isPresent()) { - Clustering clustering = Clustering.newBuilder() - .setFields(clusteringFieldName.get()) - .build(); - builder.setClustering(clustering); + .setSchema(bigQuerySchema); + + if (intermediateTables) { + // Shameful hack: make the table ingestion time-partitioned here so that the _PARTITIONTIME + // pseudocolumn can be queried to filter out rows that are still in the streaming buffer + builder.setTimePartitioning(TimePartitioning.of(Type.DAY)); + } else { + TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY); + if (timestampPartitionFieldName.isPresent()) { + timePartitioning = timePartitioning.toBuilder().setField(timestampPartitionFieldName.get()).build(); + } + + builder.setTimePartitioning(timePartitioning); + + if (timestampPartitionFieldName.isPresent() && clusteringFieldName.isPresent()) { + Clustering clustering = Clustering.newBuilder() + .setFields(clusteringFieldName.get()) + .build(); + builder.setClustering(clustering); + } } StandardTableDefinition tableDefinition = builder.build(); TableInfo.Builder tableInfoBuilder = TableInfo.newBuilder(table, tableDefinition); - if (tableDescription != null) { + if (intermediateTables) { + tableInfoBuilder.setDescription("Temporary table"); + } else if (tableDescription != null) { tableInfoBuilder.setDescription(tableDescription); } + return tableInfoBuilder.build(); } private com.google.cloud.bigquery.Schema getBigQuerySchema(Schema kafkaKeySchema, Schema kafkaValueSchema) { - List allFields = new ArrayList<> (); com.google.cloud.bigquery.Schema valueSchema = schemaConverter.convertSchema(kafkaValueSchema); - allFields.addAll(valueSchema.getFields()); - if (kafkaKeyFieldName.isPresent()) { - com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema); - Field kafkaKeyField = Field.newBuilder(kafkaKeyFieldName.get(), LegacySQLTypeName.RECORD, keySchema.getFields()) - .setMode(Field.Mode.NULLABLE).build(); - allFields.add(kafkaKeyField); + + List schemaFields = intermediateTables + ? 
getIntermediateSchemaFields(valueSchema, kafkaKeySchema) + : getRegularSchemaFields(valueSchema, kafkaKeySchema); + + return com.google.cloud.bigquery.Schema.of(schemaFields); + } + + private List getIntermediateSchemaFields(com.google.cloud.bigquery.Schema valueSchema, Schema kafkaKeySchema) { + if (kafkaKeySchema == null) { + throw new BigQueryConnectException(String.format( + "Cannot create intermediate table without specifying a value for '%s'", + BigQuerySinkConfig.KAFKA_KEY_FIELD_NAME_CONFIG + )); } + + List result = new ArrayList<>(); + + List valueFields = new ArrayList<>(valueSchema.getFields()); if (kafkaDataFieldName.isPresent()) { - Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName.get()); - allFields.add(kafkaDataField); + Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName.get()); + valueFields.add(kafkaDataField); } - return com.google.cloud.bigquery.Schema.of(allFields); + + // Wrap the sink record value (and possibly also its Kafka data) in a struct in order to support deletes + Field wrappedValueField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, LegacySQLTypeName.RECORD, valueFields.toArray(new Field[0])) + .setMode(Field.Mode.NULLABLE) + .build(); + result.add(wrappedValueField); + + com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema); + Field kafkaKeyField = Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, LegacySQLTypeName.RECORD, keySchema.getFields()) + .setMode(Field.Mode.REQUIRED).build(); + result.add(kafkaKeyField); + + Field partitionTimeField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, LegacySQLTypeName.TIMESTAMP) + .setMode(Field.Mode.NULLABLE) + .build(); + result.add(partitionTimeField); + + Field batchNumberField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD, LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.REQUIRED) + .build(); + result.add(batchNumberField); + + return result; + } + + private List getRegularSchemaFields(com.google.cloud.bigquery.Schema valueSchema, Schema kafkaKeySchema) { + List result = new ArrayList<>(valueSchema.getFields()); + + if (kafkaDataFieldName.isPresent()) { + Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName.get()); + result.add(kafkaDataField); + } + + if (kafkaKeyFieldName.isPresent()) { + com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema); + Field kafkaKeyField = Field.newBuilder(kafkaKeyFieldName.get(), LegacySQLTypeName.RECORD, keySchema.getFields()) + .setMode(Field.Mode.NULLABLE).build(); + result.add(kafkaKeyField); + } + + return result; + } + + private String table(TableId table) { + return (intermediateTables ? "intermediate " : "") + + "table " + + table; } + private Object lock(ConcurrentMap locks, TableId table) { + return locks.computeIfAbsent(table, t -> new Object()); + } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index e1ab964c8..8cfc9c272 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -51,7 +51,6 @@ * Base class for connector and task configs; contains properties shared between the two of them. 
*/ public class BigQuerySinkConfig extends AbstractConfig { - private static final ConfigDef config; private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class); // Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33 @@ -239,8 +238,61 @@ public class BigQuerySinkConfig extends AbstractConfig { private static final String ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_DOC = "If true, fields in BigQuery Schema can be changed from REQUIRED to NULLABLE"; - static { - config = new ConfigDef() + public static final String UPSERT_ENABLED_CONFIG = "upsertEnabled"; + private static final ConfigDef.Type UPSERT_ENABLED_TYPE = ConfigDef.Type.BOOLEAN; + public static final boolean UPSERT_ENABLED_DEFAULT = false; + private static final ConfigDef.Importance UPSERT_ENABLED_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String UPSERT_ENABLED_DOC = + "Enable upsert functionality on the connector through the use of record keys, intermediate " + + "tables, and periodic merge flushes. Row-matching will be performed based on the contents " + + "of record keys."; + + public static final String DELETE_ENABLED_CONFIG = "deleteEnabled"; + private static final ConfigDef.Type DELETE_ENABLED_TYPE = ConfigDef.Type.BOOLEAN; + public static final boolean DELETE_ENABLED_DEFAULT = false; + private static final ConfigDef.Importance DELETE_ENABLED_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String DELETE_ENABLED_DOC = + "Enable delete functionality on the connector through the use of record keys, intermediate " + + "tables, and periodic merge flushes. A delete will be performed when a record with a null " + + "value (i.e., a tombstone record) is read."; + + public static final String INTERMEDIATE_TABLE_SUFFIX_CONFIG = "intermediateTableSuffix"; + private static final ConfigDef.Type INTERMEDIATE_TABLE_SUFFIX_TYPE = ConfigDef.Type.STRING; + public static final String INTERMEDIATE_TABLE_SUFFIX_DEFAULT = "tmp"; + private static final ConfigDef.Validator INTERMEDIATE_TABLE_SUFFIX_VALIDATOR = new ConfigDef.NonEmptyString(); + private static final ConfigDef.Importance INTERMEDIATE_TTABLE_SUFFIX_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String INTERMEDIATE_TABLE_SUFFIX_DOC = + "A suffix that will be appended to the names of destination tables to create the names for " + + "the corresponding intermediate tables. Multiple intermediate tables may be created for a " + + "single destination table, but their names will always start with the name of the " + + "destination table, followed by this suffix, and possibly followed by an additional " + + "suffix."; + + public static final String MERGE_INTERVAL_MS_CONFIG = "mergeIntervalMs"; + private static final ConfigDef.Type MERGE_INTERVAL_MS_TYPE = ConfigDef.Type.LONG; + public static final long MERGE_INTERVAL_MS_DEFAULT = 60_000L; + private static final ConfigDef.Validator MERGE_INTERVAL_MS_VALIDATOR = ConfigDef.Range.atLeast(-1); + private static final ConfigDef.Importance MERGE_INTERVAL_MS_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String MERGE_INTERVAL_MS_DOC = + "How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. 
Can be " + + "set to -1 to disable periodic flushing."; + + public static final String MERGE_RECORDS_THRESHOLD_CONFIG = "mergeRecordsThreshold"; + private static final ConfigDef.Type MERGE_RECORDS_THRESHOLD_TYPE = ConfigDef.Type.LONG; + public static final long MERGE_RECORDS_THRESHOLD_DEFAULT = -1; + private static final ConfigDef.Validator MERGE_RECORDS_THRESHOLD_VALIDATOR = ConfigDef.Range.atLeast(-1); + private static final ConfigDef.Importance MERGE_RECORDS_THRESHOLD_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String MERGE_RECORDS_THRESHOLD_DOC = + "How many records to write to an intermediate table before performing a merge flush, if " + + "upsert/delete is enabled. Can be set to -1 to disable record count-based flushing."; + + /** + * Return a ConfigDef object used to define this config's fields. + * + * @return A ConfigDef object used to define this config's fields. + */ + public static ConfigDef getConfig() { + return new ConfigDef() .define( TOPICS_CONFIG, TOPICS_TYPE, @@ -382,25 +434,85 @@ public class BigQuerySinkConfig extends AbstractConfig { ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_DEFAULT, ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_IMPORTANCE, ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_DOC + ).define( + UPSERT_ENABLED_CONFIG, + UPSERT_ENABLED_TYPE, + UPSERT_ENABLED_DEFAULT, + UPSERT_ENABLED_IMPORTANCE, + UPSERT_ENABLED_DOC + ).define( + DELETE_ENABLED_CONFIG, + DELETE_ENABLED_TYPE, + DELETE_ENABLED_DEFAULT, + DELETE_ENABLED_IMPORTANCE, + DELETE_ENABLED_DOC + ).define( + INTERMEDIATE_TABLE_SUFFIX_CONFIG, + INTERMEDIATE_TABLE_SUFFIX_TYPE, + INTERMEDIATE_TABLE_SUFFIX_DEFAULT, + INTERMEDIATE_TABLE_SUFFIX_VALIDATOR, + INTERMEDIATE_TTABLE_SUFFIX_IMPORTANCE, + INTERMEDIATE_TABLE_SUFFIX_DOC + ).define( + MERGE_INTERVAL_MS_CONFIG, + MERGE_INTERVAL_MS_TYPE, + MERGE_INTERVAL_MS_DEFAULT, + MERGE_INTERVAL_MS_VALIDATOR, + MERGE_INTERVAL_MS_IMPORTANCE, + MERGE_INTERVAL_MS_DOC + ).define( + MERGE_RECORDS_THRESHOLD_CONFIG, + MERGE_RECORDS_THRESHOLD_TYPE, + MERGE_RECORDS_THRESHOLD_DEFAULT, + MERGE_RECORDS_THRESHOLD_VALIDATOR, + MERGE_RECORDS_THRESHOLD_IMPORTANCE, + MERGE_RECORDS_THRESHOLD_DOC ); } - /** - * Throw an exception if the passed-in properties do not constitute a valid sink. - * @param props sink configuration properties - */ - public static void validate(Map props) { - final boolean hasTopicsConfig = hasTopicsConfig(props); - final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); - - if (hasTopicsConfig && hasTopicsRegexConfig) { - throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + - " are mutually exclusive options, but both are set."); - } - - if (!hasTopicsConfig && !hasTopicsRegexConfig) { - throw new ConfigException("Must configure one of " + - TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG); - } + + /** + * Throw an exception if the passed-in properties do not constitute a valid sink. 
+ * @param props sink configuration properties + */ + public static void validate(Map props) { + final boolean hasTopicsConfig = hasTopicsConfig(props); + final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); + + if (hasTopicsConfig && hasTopicsRegexConfig) { + throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + + " are mutually exclusive options, but both are set."); + } + + if (!hasTopicsConfig && !hasTopicsRegexConfig) { + throw new ConfigException("Must configure one of " + + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG); + } + + if (upsertDeleteEnabled(props)) { + String mergeIntervalStr = Optional.ofNullable(props.get(MERGE_INTERVAL_MS_CONFIG)) + .map(String::trim) + .orElse(Long.toString(MERGE_INTERVAL_MS_DEFAULT)); + String mergeRecordsThresholdStr = Optional.ofNullable(props.get(MERGE_RECORDS_THRESHOLD_CONFIG)) + .map(String::trim) + .orElse(Long.toString(MERGE_RECORDS_THRESHOLD_DEFAULT)); + if ("-1".equals(mergeIntervalStr) && "-1".equals(mergeRecordsThresholdStr)) { + throw new ConfigException(MERGE_INTERVAL_MS_CONFIG + " and " + + MERGE_RECORDS_THRESHOLD_CONFIG + " cannot both be -1"); + } + + if ("0".equals(mergeIntervalStr)) { + throw new ConfigException(MERGE_INTERVAL_MS_CONFIG, mergeIntervalStr, "cannot be zero"); + } + if ("0".equals(mergeRecordsThresholdStr)) { + throw new ConfigException(MERGE_RECORDS_THRESHOLD_CONFIG, mergeRecordsThresholdStr, "cannot be zero"); + } + + String kafkaKeyFieldStr = props.get(KAFKA_KEY_FIELD_NAME_CONFIG); + if (kafkaKeyFieldStr == null || kafkaKeyFieldStr.trim().isEmpty()) { + throw new ConfigException(KAFKA_KEY_FIELD_NAME_CONFIG + " must be specified when " + + UPSERT_ENABLED_CONFIG + " and/or " + DELETE_ENABLED_CONFIG + " are set to true"); + } + } } public static boolean hasTopicsConfig(Map props) { @@ -413,6 +525,13 @@ public static boolean hasTopicsRegexConfig(Map props) { return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); } + public static boolean upsertDeleteEnabled(Map props) { + String upsertStr = props.get(UPSERT_ENABLED_CONFIG); + String deleteStr = props.get(DELETE_ENABLED_CONFIG); + return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr) + || Boolean.TRUE.toString().equalsIgnoreCase(deleteStr); + } + /** * Returns the keyfile */ @@ -516,6 +635,10 @@ public Optional getKafkaDataFieldName() { return Optional.ofNullable(getString(KAFKA_DATA_FIELD_NAME_CONFIG)); } + public boolean isUpsertDeleteEnabled() { + return getBoolean(UPSERT_ENABLED_CONFIG) || getBoolean(DELETE_ENABLED_CONFIG); + } + /** * Verifies that a bucket is specified if GCS batch loading is enabled. * @throws ConfigException Exception thrown if no bucket is specified and batch loading is on. @@ -550,22 +673,13 @@ private void checkBigQuerySchemaUpdateConfigs() { } } - /** - * Return the ConfigDef object used to define this config's fields. - * - * @return The ConfigDef object used to define this config's fields. 
- */ - public static ConfigDef getConfig() { - return config; - } - protected BigQuerySinkConfig(ConfigDef config, Map properties) { super(config, properties); verifyBucketSpecified(); } public BigQuerySinkConfig(Map properties) { - super(config, properties); + super(getConfig(), properties); verifyBucketSpecified(); checkAutoCreateTables(); checkBigQuerySchemaUpdateConfigs(); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java index 9b395700b..1ba973dc7 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java @@ -55,7 +55,7 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { "The maximum size (or -1 for no maximum size) of the worker queue for bigQuery write " + "requests before all topics are paused. This is a soft limit; the size of the queue can " + "go over this before topics are paused. All topics will be resumed once a flush is " - + "requested or the size of the queue drops under half of the maximum size."; + + "triggered or the size of the queue drops under half of the maximum size."; public static final String BIGQUERY_RETRY_CONFIG = "bigQueryRetry"; private static final ConfigDef.Type BIGQUERY_RETRY_TYPE = ConfigDef.Type.INT; @@ -120,6 +120,11 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC = "List of fields on which data should be clustered by in BigQuery, separated by commas"; + public static final String TASK_ID_CONFIG = "taskId"; + private static final ConfigDef.Type TASK_ID_TYPE = ConfigDef.Type.INT; + public static final ConfigDef.Importance TASK_ID_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String TASK_ID_DOC = "A unique for each task created by the connector"; + static { config = BigQuerySinkConfig.getConfig() .define( @@ -174,6 +179,11 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { BIGQUERY_CLUSTERING_FIELD_NAMES_DEFAULT, BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE, BIGQUERY_CLUSTERING_FIELD_NAMES_DOC + ).define( + TASK_ID_CONFIG, + TASK_ID_TYPE, + TASK_ID_IMPORTANCE, + TASK_ID_DOC ); } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java index 835935bd8..816e74b4e 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java @@ -47,12 +47,12 @@ */ public class BigQueryRecordConverter implements RecordConverter> { - private static final Set BASIC_TYPES = new HashSet( + private static final Set> BASIC_TYPES = new HashSet<>( Arrays.asList( Boolean.class, Character.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, String.class) ); - private boolean shouldConvertSpecialDouble; + private final boolean shouldConvertSpecialDouble; static { // force registration @@ -72,6 +72,7 @@ public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial) { * @param recordType The type of the record to convert, either value or key. * @return The result BigQuery row content. 
*/ + @SuppressWarnings("unchecked") public Map convertRecord(SinkRecord record, KafkaSchemaRecordType recordType) { Schema kafkaConnectSchema = recordType == KafkaSchemaRecordType.KEY ? record.keySchema() : record.valueSchema(); Object kafkaConnectStruct = recordType == KafkaSchemaRecordType.KEY ? record.key() : record.value(); @@ -89,6 +90,7 @@ public Map convertRecord(SinkRecord record, KafkaSchemaRecordTyp return convertStruct(kafkaConnectStruct, kafkaConnectSchema); } + @SuppressWarnings("unchecked") private Object convertSchemalessRecord(Object value) { if (value == null) { return null; @@ -103,10 +105,9 @@ private Object convertSchemalessRecord(Object value) { return convertBytes(value); } if (value instanceof List) { - return - ((List) value).stream().map( - v -> convertSchemalessRecord(v) - ).collect(Collectors.toList()); + return ((List) value).stream() + .map(this::convertSchemalessRecord) + .collect(Collectors.toList()); } if (value instanceof Map) { return @@ -128,7 +129,6 @@ private Object convertSchemalessRecord(Object value) { " found in schemaless record data. Can't convert record to bigQuery format"); } - @SuppressWarnings("unchecked") private Object convertObject(Object kafkaConnectObject, Schema kafkaConnectSchema) { if (kafkaConnectObject == null) { if (kafkaConnectSchema.isOptional()) { @@ -152,22 +152,16 @@ private Object convertObject(Object kafkaConnectObject, Schema kafkaConnectSchem return convertStruct(kafkaConnectObject, kafkaConnectSchema); case BYTES: return convertBytes(kafkaConnectObject); - case BOOLEAN: - return (Boolean) kafkaConnectObject; - case FLOAT32: - return (Float) kafkaConnectObject; case FLOAT64: return convertDouble((Double)kafkaConnectObject); + case BOOLEAN: + case FLOAT32: case INT8: - return (Byte) kafkaConnectObject; case INT16: - return (Short) kafkaConnectObject; case INT32: - return (Integer) kafkaConnectObject; case INT64: - return (Long) kafkaConnectObject; case STRING: - return (String) kafkaConnectObject; + return kafkaConnectObject; default: throw new ConversionConnectException("Unrecognized schema type: " + kafkaConnectSchemaType); } @@ -214,7 +208,7 @@ private List> convertMap(Object kafkaConnectObject, Schema kafkaConnectValueSchema = kafkaConnectSchema.valueSchema(); List> bigQueryEntryList = new ArrayList<>(); Map kafkaConnectMap = (Map) kafkaConnectObject; - for (Map.Entry kafkaConnectMapEntry : kafkaConnectMap.entrySet()) { + for (Map.Entry kafkaConnectMapEntry : kafkaConnectMap.entrySet()) { Map bigQueryEntry = new HashMap<>(); Object bigQueryKey = convertObject( kafkaConnectMapEntry.getKey(), diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverter.java index 175d6f884..a48c3d2eb 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverter.java @@ -97,6 +97,7 @@ public BigQuerySchemaConverter(boolean allFieldsNullable) { * existing one. 
*/ public com.google.cloud.bigquery.Schema convertSchema(Schema kafkaConnectSchema) { + // TODO: Permit non-struct keys if (kafkaConnectSchema.type() != Schema.Type.STRUCT) { throw new ConversionConnectException("Top-level Kafka Connect schema must be of type 'struct'"); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/FieldNameSanitizer.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/FieldNameSanitizer.java index 09aeb70c2..a55399331 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/FieldNameSanitizer.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/FieldNameSanitizer.java @@ -20,6 +20,7 @@ public static String sanitizeName(String name) { // letters, numbers, and underscores. // Note: a.b and a/b will have the same value after sanitization which will cause Duplicate key // Exception. + @SuppressWarnings("unchecked") public static Map replaceInvalidKeys(Map map) { return map.entrySet().stream().collect(Collectors.toMap( (entry) -> sanitizeName(entry.getKey()), diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java index 96cd2144b..4af836142 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java @@ -18,11 +18,20 @@ */ import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.TableId; +import com.wepay.kafka.connect.bigquery.MergeQueries; import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType; +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder; import com.wepay.kafka.connect.bigquery.convert.RecordConverter; +import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -30,36 +39,104 @@ * A class for converting a {@link SinkRecord SinkRecord} to {@link InsertAllRequest.RowToInsert BigQuery row} */ public class SinkRecordConverter { + private static final Logger logger = LoggerFactory.getLogger(SinkRecordConverter.class); + + private final BigQuerySinkTaskConfig config; + private final MergeBatches mergeBatches; + private final MergeQueries mergeQueries; + private final RecordConverter> recordConverter; - private final boolean sanitizeFieldName; - private final Optional kafkaKeyFieldName; - private final Optional kafkaDataFieldName; - - public SinkRecordConverter(RecordConverter> recordConverter, boolean sanitizeFieldName, Optional kafkaKeyFieldName, Optional kafkaDataFieldName) { - this.recordConverter = recordConverter; - this.sanitizeFieldName = sanitizeFieldName; - this.kafkaKeyFieldName = kafkaKeyFieldName; - this.kafkaDataFieldName = kafkaDataFieldName; + private final long mergeRecordsThreshold; + private final boolean useMessageTimeDatePartitioning; + private final boolean usePartitionDecorator; + + + public SinkRecordConverter(BigQuerySinkTaskConfig config, + MergeBatches mergeBatches, MergeQueries mergeQueries) { + this.config = config; + 
this.mergeBatches = mergeBatches; + this.mergeQueries = mergeQueries; + + this.recordConverter = config.getRecordConverter(); + this.mergeRecordsThreshold = config.getLong(config.MERGE_RECORDS_THRESHOLD_CONFIG); + this.useMessageTimeDatePartitioning = + config.getBoolean(config.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG); + this.usePartitionDecorator = + config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG); + + } + + public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record, TableId table) { + Map convertedRecord = config.isUpsertDeleteEnabled() + ? getUpsertDeleteRow(record, table) + : getRegularRow(record); + + Map result = config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG) + ? FieldNameSanitizer.replaceInvalidKeys(convertedRecord) + : convertedRecord; + + return InsertAllRequest.RowToInsert.of(getRowId(record), result); } - public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record) { - Map convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); - if (kafkaKeyFieldName.isPresent()) { - convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY)); + private Map getUpsertDeleteRow(SinkRecord record, TableId table) { + // Unconditionally allow tombstone records if delete is enabled. + Map convertedValue = config.getBoolean(config.DELETE_ENABLED_CONFIG) && record.value() == null + ? null + : recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); + + Map convertedKey = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY); + + if (convertedValue != null) { + config.getKafkaDataFieldName().ifPresent( + fieldName -> convertedValue.put(fieldName, KafkaDataBuilder.buildKafkaDataRecord(record)) + ); } - if (kafkaDataFieldName.isPresent()) { - convertedRecord.put(kafkaDataFieldName.get(), KafkaDataBuilder.buildKafkaDataRecord(record)) + + Map result = new HashMap<>(); + long totalBatchSize = mergeBatches.addToBatch(record, table, result); + if (mergeRecordsThreshold != -1 && totalBatchSize >= mergeRecordsThreshold) { + logger.debug("Triggering merge flush for table {} since the size of its current batch has " + + "exceeded the configured threshold of {}", + table, mergeRecordsThreshold); + mergeQueries.mergeFlush(table); } - if (sanitizeFieldName) { - convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord); + + result.put(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, convertedKey); + result.put(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, convertedValue); + if (usePartitionDecorator && useMessageTimeDatePartitioning) { + if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) { + throw new ConnectException( + "Message has no timestamp type, cannot use message timestamp to partition."); + } + result.put(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, record.timestamp()); + } else { + // Provide a value for this column even if it's not used for partitioning in the destination + // table, so that it can be used to deduplicate rows during merge flushes + result.put(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, System.currentTimeMillis() / 1000); } - return InsertAllRequest.RowToInsert.of(getRowId(record), convertedRecord); + + return result; + } + + private Map getRegularRow(SinkRecord record) { + Map result = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); + + config.getKafkaDataFieldName().ifPresent( + fieldName -> result.put(fieldName, KafkaDataBuilder.buildKafkaDataRecord(record)) + ); + +
config.getKafkaKeyFieldName().ifPresent(fieldName -> { + Map keyData = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY); + result.put(fieldName, keyData); + }); + + return result; } private String getRowId(SinkRecord record) { return String.format("%s-%d-%d", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset()); + record.topic(), + record.kafkaPartition(), + record.kafkaOffset()); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java index 45d12e1dd..d876bc2d8 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java @@ -47,7 +47,7 @@ public class GCSBatchTableWriter implements Runnable { private final String bucketName; private final String blobName; - private SortedMap rows; + private final SortedMap rows; private final GCSToBQWriter writer; /** @@ -90,7 +90,7 @@ public static class Builder implements TableWriterBuilder { private String blobName; private final TableId tableId; - private SortedMap rows; + private final SortedMap rows; private final SinkRecordConverter recordConverter; private final GCSToBQWriter writer; @@ -119,19 +119,12 @@ public Builder(GCSToBQWriter writer, this.writer = writer; } - public Builder setBlobName(String blobName) { - this.blobName = blobName; - return this; - } - - /** - * Adds a record to the builder. - * @param record the row to add - */ - public void addRow(SinkRecord record) { - rows.put(record, recordConverter.getRecordRow(record)); + @Override + public void addRow(SinkRecord record, TableId table) { + rows.put(record, recordConverter.getRecordRow(record, table)); } + @Override public GCSBatchTableWriter build() { return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName); } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java index 637ac2ea4..9e128cdaa 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java @@ -91,11 +91,21 @@ public void awaitCurrentTasks() throws InterruptedException, BigQueryConnectExce execute(new CountDownRunnable(countDownLatch)); } countDownLatch.await(); + maybeThrowEncounteredErrors(); + } + + /** + * Immediately throw an exception if any unrecoverable errors were encountered by any of the write + * tasks. + * + * @throws BigQueryConnectException if any of the tasks failed. 
+ */ + public void maybeThrowEncounteredErrors() { if (encounteredErrors.size() > 0) { String errorString = createErrorString(encounteredErrors); encounteredErrors.clear(); throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: " - + errorString + "; See logs for more detail"); + + errorString + "; See logs for more detail"); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java new file mode 100644 index 000000000..92558a251 --- /dev/null +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java @@ -0,0 +1,332 @@ +package com.wepay.kafka.connect.bigquery.write.batch; + +/* + * Copyright 2016 WePay, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.TableId; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; +import com.wepay.kafka.connect.bigquery.MergeQueries; +import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class MergeBatches { + private static final Logger logger = LoggerFactory.getLogger(MergeBatches.class); + private static final long STREAMING_BUFFER_AVAILABILITY_WAIT_MS = 10_000L; + + private final String intermediateTableSuffix; + private final BiMap intermediateToDestinationTables; + private final ConcurrentMap batchNumbers; + private final ConcurrentMap> batches; + private final Map offsets; + + public MergeBatches(String intermediateTableSuffix) { + this.intermediateTableSuffix = intermediateTableSuffix; + + this.intermediateToDestinationTables = Maps.synchronizedBiMap(HashBiMap.create()); + this.batchNumbers = new ConcurrentHashMap<>(); + this.batches = new ConcurrentHashMap<>(); + this.offsets = new HashMap<>(); + } + + /** + * Get the latest safe-to-commit offsets for every topic partition that has had at least one + * record make its way to a destination table. 
+ * @return the offsets map which can be used in + * {@link org.apache.kafka.connect.sink.SinkTask#preCommit(Map)}; never null + */ + public Map latestOffsets() { + synchronized (offsets) { + return offsets.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> new OffsetAndMetadata(entry.getValue()) + )); + } + } + + /** + * @return a thread-safe map from intermediate tables to destination tables; never null + */ + public Map intermediateToDestinationTables() { + return intermediateToDestinationTables; + } + + /** + * @return a collection of all currently-in-use intermediate tables; never null + */ + public Collection intermediateTables() { + return intermediateToDestinationTables.keySet(); + } + + /** + * Get the intermediate table for a given destination table, computing a new one if necessary + * @param destinationTable the destination table to fetch an intermediate table for + * @return the {@link TableId} of the intermediate table; never null + */ + public TableId intermediateTableFor(TableId destinationTable) { + return intermediateToDestinationTables.inverse() + .computeIfAbsent(destinationTable, this::newIntermediateTable); + } + + private TableId newIntermediateTable(TableId destinationTable) { + String tableName = FieldNameSanitizer.sanitizeName( + destinationTable.getTable() + intermediateTableSuffix + ); + TableId result = TableId.of( + destinationTable.getDataset(), + tableName + ); + + batchNumbers.put(result, new AtomicInteger()); + batches.put(result, new ConcurrentHashMap<>()); + + return result; } + + public TableId destinationTableFor(TableId intermediateTable) { + return intermediateToDestinationTables.get(intermediateTable); + } + + /** + * Find a batch number for the record, insert that number into the converted value, record the + * offset for that record, and return the total size of that batch. + * @param record the record for the batch + * @param intermediateTable the intermediate table the record will be streamed into + * @param convertedRecord the converted record that will be passed to the BigQuery client + * @return the total number of records in the batch that this record is added to + */ + public long addToBatch(SinkRecord record, TableId intermediateTable, Map convertedRecord) { + AtomicInteger batchCount = batchNumbers.get(intermediateTable); + // Synchronize here to ensure that the batch number isn't bumped in the middle of this method. + // On its own, that wouldn't be such a bad thing, but since a merge flush is supposed to + // immediately follow that bump, it might cause some trouble if we want to add this row to the + // batch but a merge flush on that batch has already started. This way, either the batch number + // is bumped before we add the row to the batch (in which case, the row is added to the fresh + // batch), or the row is added to the batch before preparation for the flush takes place and it + // is safely counted and tracked there. 
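// Editorial illustration (not part of the patch): the interleaving the synchronization below rules out looks like this:
//   writer thread: reads batchNumber 3 from batchCount
//   flush thread:  incrementBatch() bumps batchCount to 4 and starts merge-flushing batch 3
//   writer thread: adds its row to batch 3 after that batch's flush has already begun
// Holding the batchCount monitor in both addToBatch and incrementBatch means the row is counted
// either entirely before or entirely after the bump, never in between.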
+ synchronized (batchCount) { + int batchNumber = batchCount.get(); + convertedRecord.put(MergeQueries.INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD, batchNumber); + + Batch batch = batches.get(intermediateTable).computeIfAbsent(batchNumber, n -> new Batch()); + batch.recordOffsetFor(record); + + long pendingBatchSize = batch.increment(); + logger.trace("Added record to batch {} for intermediate table {}; {} rows are currently pending", + batchNumber, intermediateTable, pendingBatchSize); + return batch.total(); + } + } + + /** + * Record a successful write of a list of rows to the given intermediate table, and decrease the + * pending record counts for every applicable batch accordingly. + * @param intermediateTable the intermediate table + * @param rows the rows + */ + public void onRowWrites(TableId intermediateTable, Collection rows) { + Map rowsByBatch = rows.stream().collect(Collectors.groupingBy( + row -> (Integer) row.getContent().get(MergeQueries.INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD), + Collectors.counting() + )); + + rowsByBatch.forEach((batchNumber, batchSize) -> { + Batch batch = batch(intermediateTable, batchNumber); + synchronized (batch) { + long remainder = batch.recordWrites(batchSize); + batch.notifyAll(); + logger.trace("Notified merge flush executor of successful write of {} rows " + + "for batch {} for intermediate table {}; {} rows remaining", + batchSize, batchNumber, intermediateTable, remainder); + } + }); + } + + /** + * Increment the batch number for the given table, and return the old batch number. + * @param intermediateTable the table whose batch number should be incremented + * @return the batch number for the table, pre-increment + */ + public int incrementBatch(TableId intermediateTable) { + AtomicInteger batchCount = batchNumbers.get(intermediateTable); + // See addToBatch for an explanation of the synchronization here + synchronized (batchCount) { + return batchCount.getAndIncrement(); + } + } + + /** + * Prepare to merge the batch for the given table, by ensuring that all prior batches for that + * table have completed and that all rows for the batch itself have been written. 
+ * @param intermediateTable the table for the batch + * @param batchNumber the batch number to prepare to flush + * @return whether a flush is necessary (will be false if no rows were present in the given batch) + */ + public boolean prepareToFlush(TableId intermediateTable, int batchNumber) { + final ConcurrentMap allBatchesForTable = batches.get(intermediateTable); + if (batchNumber != 0) { + final int priorBatchNumber = batchNumber - 1; + synchronized (allBatchesForTable) { + logger.debug("Ensuring batch {} is completed for intermediate table {} before flushing batch {}", + priorBatchNumber, intermediateTable, batchNumber); + while (allBatchesForTable.containsKey(priorBatchNumber)) { + try { + allBatchesForTable.wait(); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for batch {} to complete for intermediate table {}", + batchNumber, intermediateTable); + throw new ConnectException(String.format( + "Interrupted while waiting for batch %d to complete for intermediate table %s", + batchNumber, intermediateTable + )); + } + } + } + } else { + logger.debug("Flushing first batch for intermediate table {}", intermediateTable); + } + + final Batch currentBatch = allBatchesForTable.get(batchNumber); + if (currentBatch == null) { + logger.trace("No rows to write in batch {} for intermediate table {}", batchNumber, intermediateTable); + return false; + } + + synchronized (currentBatch) { + logger.debug("{} rows currently remaining for batch {} for intermediate table {}", + currentBatch.pending(), batchNumber, intermediateTable); + while (currentBatch.pending() != 0) { + logger.trace("Waiting for all rows for batch {} from intermediate table {} to be written before flushing; {} remaining", + batchNumber, intermediateTable, currentBatch.pending()); + try { + currentBatch.wait(); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for all rows for batch {} from intermediate table {} to be written", + batchNumber, intermediateTable); + throw new ConnectException(String.format( + "Interrupted while waiting for all rows for batch %d from intermediate table %s to be written", + batchNumber, intermediateTable + )); + } + } + } + + try { + logger.trace( + "Waiting {} milliseconds before running merge query on batch {} from intermediate table {} " + + "in order to ensure that all rows are available in the streaming buffer", + STREAMING_BUFFER_AVAILABILITY_WAIT_MS, batchNumber, intermediateTable); + Thread.sleep(STREAMING_BUFFER_AVAILABILITY_WAIT_MS); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting before merge flushing batch {} for intermediate table {}", + batchNumber, intermediateTable); + throw new ConnectException(String.format( + "Interrupted while waiting before merge flushing batch %d for intermediate table %s", + batchNumber, intermediateTable + )); + } + return true; + } + + /** + * Record a successful merge flush of all of the rows for the given batch in the intermediate + * table, alert any waiting merge flushes that are predicated on the completion of this merge + * flush, and mark the offsets for all of those rows as safe to commit.
+ * @param intermediateTable the source of the merge flush + * @param batchNumber the batch for the merge flush + */ + public void recordSuccessfulFlush(TableId intermediateTable, int batchNumber) { + logger.trace("Successfully merge flushed batch {} for intermediate table {}", + batchNumber, intermediateTable); + final ConcurrentMap allBatchesForTable = batches.get(intermediateTable); + Batch batch = allBatchesForTable.remove(batchNumber); + + synchronized (allBatchesForTable) { + allBatchesForTable.notifyAll(); + } + + synchronized (offsets) { + offsets.putAll(batch.offsets()); + } + } + + private Batch batch(TableId intermediateTable, int batchNumber) { + return batches.get(intermediateTable).get(batchNumber); + } + + private static class Batch { + private final AtomicLong pending; + private final AtomicLong total; + private final Map offsets; + + public Batch() { + this.total = new AtomicLong(); + this.pending = new AtomicLong(); + this.offsets = new HashMap<>(); + } + + public long pending() { + return pending.get(); + } + + public long total() { + return total.get(); + } + + public Map offsets() { + return offsets; + } + + public void recordOffsetFor(SinkRecord record) { + offsets.put( + new TopicPartition(record.topic(), record.kafkaPartition()), + // Use the offset of the record plus one here since that'll be the offset that we'll + // resume at if/when this record is the last-committed record and then the task is + // restarted + record.kafkaOffset() + 1); + } + + /** + * Increment the total and pending number of records, and return the number of pending records + * @return the number of pending records for this batch + */ + public long increment() { + total.incrementAndGet(); + return pending.incrementAndGet(); + } + + public long recordWrites(long numRows) { + return pending.addAndGet(-numRows); + } + } +} diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java index dee011142..e8747f851 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java @@ -21,6 +21,7 @@ import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; +import com.google.cloud.bigquery.TableId; import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; import com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter; @@ -31,11 +32,14 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.Objects; +import java.util.function.Consumer; /** * Simple Table Writer that attempts to write all the rows it is given at once. @@ -50,18 +54,23 @@ public class TableWriter implements Runnable { private final BigQueryWriter writer; private final PartitionedTableId table; private final SortedMap rows; + private final Consumer> onFinish; /** * @param writer the {@link BigQueryWriter} to use. * @param table the BigQuery table to write to. * @param rows the rows to write. 
+ * @param onFinish a callback to invoke after all rows have been written successfully, which is + * called with all the rows written by the writer */ public TableWriter(BigQueryWriter writer, PartitionedTableId table, - SortedMap rows) { + SortedMap rows, + Consumer> onFinish) { this.writer = writer; this.table = table; this.rows = rows; + this.onFinish = onFinish; } @Override @@ -106,6 +115,7 @@ public void run() { logger.debug(logMessage, rows.size(), successCount, failureCount); } + onFinish.accept(rows.values()); } private static int getNewBatchSize(int currentBatchSize) { @@ -152,6 +162,7 @@ public static class Builder implements TableWriterBuilder { private SortedMap rows; private SinkRecordConverter recordConverter; + private Consumer> onFinish; /** * @param writer the BigQueryWriter to use @@ -165,22 +176,31 @@ public Builder(BigQueryWriter writer, PartitionedTableId table, SinkRecordConver this.rows = new TreeMap<>(Comparator.comparing(SinkRecord::kafkaPartition) .thenComparing(SinkRecord::kafkaOffset)); this.recordConverter = recordConverter; + + this.onFinish = null; } - /** - * Add a record to the builder. - * @param record the row to add - */ - public void addRow(SinkRecord record) { - rows.put(record, recordConverter.getRecordRow(record)); + @Override + public void addRow(SinkRecord record, TableId table) { + rows.put(record, recordConverter.getRecordRow(record, table)); } /** - * Create a {@link TableWriter} from this builder. - * @return a TableWriter containing the given writer, table, topic, and all added rows. + * Specify a callback to be invoked after all rows have been written. The callback will be + * invoked with the full list of rows written by this table writer. + * @param onFinish the callback to invoke; may not be null + * @throws IllegalStateException if invoked more than once on a single builder instance */ + public void onFinish(Consumer> onFinish) { + if (this.onFinish != null) { + throw new IllegalStateException("Cannot overwrite existing finish callback"); + } + this.onFinish = Objects.requireNonNull(onFinish, "Finish callback cannot be null"); + } + + @Override public TableWriter build() { - return new TableWriter(writer, table, rows); + return new TableWriter(writer, table, rows, onFinish != null ? onFinish : n -> { }); } } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java index 556ae98e3..cbd9ba8b8 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java @@ -18,6 +18,7 @@ */ +import com.google.cloud.bigquery.TableId; import org.apache.kafka.connect.sink.SinkRecord; /** @@ -28,8 +29,9 @@ public interface TableWriterBuilder { /** * Add a record to the builder. * @param sinkRecord the row to add. + * @param table the table the row will be written to. */ - void addRow(SinkRecord sinkRecord); + void addRow(SinkRecord sinkRecord, TableId table); /** * Create a {@link TableWriter} from this builder. 
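Editorial note: the snippet below is an illustrative sketch, not part of this patch, showing how a caller might drive the revised TableWriterBuilder contract (addRow now takes the target TableId, and TableWriter.Builder gains an onFinish callback). The surrounding method and the fields bigQueryWriter, sinkRecordConverter, mergeBatches, and executor are assumptions for the example, as is the idea that partitionedTableId wraps the same intermediate table the rows are converted for.

// Assumes imports of java.util.Collection, com.google.cloud.bigquery.TableId, SinkRecord,
// PartitionedTableId, and TableWriter from the packages touched by this patch.
private void writeBatch(Collection<SinkRecord> records, TableId destinationTable,
                        PartitionedTableId partitionedTableId) {
  // In upsert/delete mode, rows are routed through an intermediate table managed by MergeBatches.
  TableId intermediateTable = mergeBatches.intermediateTableFor(destinationTable);
  TableWriter.Builder builder =
      new TableWriter.Builder(bigQueryWriter, partitionedTableId, sinkRecordConverter);
  for (SinkRecord record : records) {
    // Each row is converted against the table it will be streamed into, so batch numbers and
    // offsets can be tracked per intermediate table.
    builder.addRow(record, intermediateTable);
  }
  // Once the writer has written every row, report them back so MergeBatches can decrement the
  // pending counts of the batches those rows belong to.
  builder.onFinish(rows -> mergeBatches.onRowWrites(intermediateTable, rows));
  executor.execute(builder.build());
}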
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 10c30babc..e3b970459 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -47,7 +47,7 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter { private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class); // The maximum number of retries we will attempt to write rows after creating a table or updating a BQ table schema. - private static final int RETRY_LIMIT = 5; + private static final int RETRY_LIMIT = 10; // Wait for about 30s between each retry since both creating table and updating schema take up to 2~3 minutes to take effect. private static final int RETRY_WAIT_TIME = 30000; @@ -60,6 +60,7 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter { * @param schemaManager Used to update BigQuery tables. * @param retry How many retries to make in the event of a 500/503 error. * @param retryWait How long to wait in between retries. + * @param autoCreateTables Whether tables should be automatically created */ public AdaptiveBigQueryWriter(BigQuery bigQuery, SchemaManager schemaManager, @@ -149,7 +150,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) { return new HashMap<>(); } - private void attemptSchemaUpdate(PartitionedTableId tableId, Set records) { + protected void attemptSchemaUpdate(PartitionedTableId tableId, Set records) { try { schemaManager.updateSchema(tableId.getBaseTableId(), records); } catch (BigQueryException exception) { @@ -158,9 +159,8 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, Set rec } } - private void attemptTableCreate(TableId tableId, Set records) { + protected void attemptTableCreate(TableId tableId, Set records) { try { - logger.info("Table {} does not exist, auto-creating table", tableId); schemaManager.createTable(tableId, records); } catch (BigQueryException exception) { throw new BigQueryConnectException( diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/UpsertDeleteBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/UpsertDeleteBigQueryWriter.java new file mode 100644 index 000000000..20e30226d --- /dev/null +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/UpsertDeleteBigQueryWriter.java @@ -0,0 +1,94 @@ +package com.wepay.kafka.connect.bigquery.write.row; + +/* + * Copyright 2016 WePay, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.TableId; +import com.wepay.kafka.connect.bigquery.SchemaManager; +import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; +import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; + +public class UpsertDeleteBigQueryWriter extends AdaptiveBigQueryWriter { + + private final SchemaManager schemaManager; + private final boolean autoCreateTables; + private final Map intermediateToDestinationTables; + + /** + * @param bigQuery Used to send write requests to BigQuery. + * @param schemaManager Used to update BigQuery tables. + * @param retry How many retries to make in the event of a 500/503 error. + * @param retryWait How long to wait in between retries. + * @param autoCreateTables Whether destination tables should be automatically created + * @param intermediateToDestinationTables A mapping used to determine the destination table for + * given intermediate tables; used for create/update + * operations in order to propagate them to the destination + * table + */ + public UpsertDeleteBigQueryWriter(BigQuery bigQuery, + SchemaManager schemaManager, + int retry, + long retryWait, + boolean autoCreateTables, + Map intermediateToDestinationTables) { + // Hardcode autoCreateTables to true in the superclass so that intermediate tables will be + // automatically created + // The super class will handle all of the logic for writing to, creating, and updating + // intermediate tables; this class will handle logic for creating/updating the destination table + super(bigQuery, schemaManager.forIntermediateTables(), retry, retryWait, true); + this.schemaManager = schemaManager; + this.autoCreateTables = autoCreateTables; + this.intermediateToDestinationTables = intermediateToDestinationTables; + } + + @Override + protected void attemptSchemaUpdate(PartitionedTableId tableId, Set records) { + // Update the intermediate table here... + super.attemptSchemaUpdate(tableId, records); + try { + // ... and update the destination table here + schemaManager.updateSchema(intermediateToDestinationTables.get(tableId.getBaseTableId()), records); + } catch (BigQueryException exception) { + throw new BigQueryConnectException( + "Failed to update destination table schema for: " + tableId.getBaseTableId(), exception); + } + } + + @Override + protected void attemptTableCreate(TableId tableId, Set records) { + // Create the intermediate table here... + super.attemptTableCreate(tableId, records); + if (autoCreateTables) { + try { + // ... 
and create or update the destination table here, if it doesn't already exist and auto + // table creation is enabled + schemaManager.createOrUpdateTable(intermediateToDestinationTables.get(tableId), records); + } catch (BigQueryException exception) { + throw new BigQueryConnectException( + "Failed to create table " + tableId, exception); + } + } + } +} diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java index 1da4bafe1..026e989ba 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java @@ -38,6 +38,7 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; import org.apache.kafka.common.config.ConfigException; @@ -102,6 +103,7 @@ public void testTaskConfigs() { List> taskConfigs = testConnector.taskConfigs(i); assertEquals(i, taskConfigs.size()); for (int j = 0; j < i; j++) { + expectedProperties.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, Integer.toString(j)); assertEquals( "Connector properties should match task configs", expectedProperties, @@ -127,7 +129,7 @@ public void testTaskConfigs() { @Test public void testConfig() { - assertEquals(BigQuerySinkConfig.getConfig(), new BigQuerySinkConnector().config()); + assertNotNull(new BigQuerySinkConnector().config()); } // Make sure that a config exception is properly translated into a SinkConfigConnectException diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 358a3c8ab..b24e7f40a 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -314,7 +314,7 @@ public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() { TimestampType.NO_TIMESTAMP_TYPE, null))); } - // It's important that the buffer be completely wiped after a call to flush, since any execption + // It's important that the buffer be completely wiped after a call to flush, since any exception // thrown during flush causes Kafka Connect to not commit the offsets for any records sent to the // task since the last flush @Test diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java index 8e7b139af..aec3ed471 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java @@ -29,6 +29,7 @@ public Map getProperties() { Map properties = super.getProperties(); properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); + properties.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, "4"); return properties; } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfigTest.java 
b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfigTest.java index 6e916fc0a..e76fc192d 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfigTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfigTest.java @@ -18,9 +18,7 @@ */ -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import com.wepay.kafka.connect.bigquery.SinkPropertiesFactory; @@ -31,7 +29,6 @@ import org.junit.Before; import org.junit.Test; -import java.util.HashMap; import java.util.Map; public class BigQuerySinkConfigTest {