diff --git a/README.md b/README.md index 6bbfc4a9a..657cfe19f 100644 --- a/README.md +++ b/README.md @@ -131,12 +131,41 @@ adjusting flags given to the Avro Console Producer and tweaking the config setti ## Integration Testing the Connector +There is a legacy Docker-based integration test for the connector, and newer integration tests that +programmatically instantiate an embedded Connect cluster. + +### Embedded integration tests + +Currently these tests only verify the connector's upsert/delete feature. They should eventually +replace all of the existing Docker-based tests. + +#### Configuring the tests + +You must supply the following environment variables in order to run the tests: + +- `$KCBQ_TEST_PROJECT`: The name of the BigQuery project to use for the test +- `$KCBQ_TEST_DATASET`: The name of the BigQuery dataset to use for the test +- `$KCBQ_TEST_KEYFILE`: The key (either file or raw contents) used to authenticate with BigQuery +during the test + +Additionally, the `$KCBQ_TEST_KEYSOURCE` variable can be supplied to specify whether the value of +`$KCBQ_TEST_KEYFILE` is a path to a key file (if set to `FILE`) or the raw contents of a key file +(if set to `JSON`). The default is `FILE`. + +#### Running the Integration Tests + +```bash +./gradlew embeddedIntegrationTest +``` + +### Docker-based tests + > **NOTE**: You must have [Docker] installed and running on your machine in order to run integration tests for the connector. This all takes place in the `kcbq-connector` directory. -### How Integration Testing Works +#### How Integration Testing Works Integration tests run by creating [Docker] instances for [Zookeeper], [Kafka], [Schema Registry], and the BigQuery Connector itself, then verifying the results using a [JUnit] test. @@ -148,7 +177,7 @@ The project and dataset they write to, as well as the specific JSON key file the specified by command-line flag, environment variable, or configuration file — the exact details of each can be found by running the integration test script with the `-?` flag. -### Data Corruption Concerns +#### Data Corruption Concerns In order to ensure the validity of each test, any table that will be written to in the course of integration testing is preemptively deleted before the connector is run. This will only be an issue @@ -161,7 +190,7 @@ tests will corrupt any existing data that is already on your machine, and there free up any of your ports that might currently be in use by real instances of the programs that are faked in the process of testing. -### Running the Integration Tests +#### Running the Integration Tests Running the series of integration tests is easy: @@ -176,7 +205,7 @@ the `--help` flag. > **NOTE:** You must have a recent version of [boot2docker], [Docker Machine], [Docker], etc. installed. Older versions will hang when cleaning containers, and linking doesn't work properly.
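To make the use of these variables concrete, below is a minimal sketch (in Java, the language of the test suite) of how an embedded test can turn them into a BigQuery client. It mirrors the `BaseConnectorIT.newBigQuery()` helper added later in this diff; reading the variables with `System.getenv`, the `EmbeddedTestCredentials` class name, and the `env` helper are illustrative assumptions, while the `FILE` default for the key source follows the documentation above.

```java
import com.google.cloud.bigquery.BigQuery;
import com.wepay.kafka.connect.bigquery.BigQueryHelper;

// Hypothetical helper showing how the documented KCBQ_TEST_* variables feed the embedded tests.
public class EmbeddedTestCredentials {
  private static final String PROJECT_ENV_VAR = "KCBQ_TEST_PROJECT";
  private static final String DATASET_ENV_VAR = "KCBQ_TEST_DATASET";
  private static final String KEYFILE_ENV_VAR = "KCBQ_TEST_KEYFILE";
  private static final String KEYSOURCE_ENV_VAR = "KCBQ_TEST_KEYSOURCE";

  // Assumption: values are read directly from the process environment.
  private static String env(String name, String defaultValue) {
    String value = System.getenv(name);
    return value != null ? value : defaultValue;
  }

  /** The dataset that test tables are created in. */
  public static String dataset() {
    return env(DATASET_ENV_VAR, null);
  }

  /** Builds a client the same way BaseConnectorIT#newBigQuery does later in this diff. */
  public static BigQuery connect() {
    return new BigQueryHelper()
        .setKeySource(env(KEYSOURCE_ENV_VAR, "FILE")) // FILE is the documented default
        .connect(env(PROJECT_ENV_VAR, null), env(KEYFILE_ENV_VAR, null));
  }
}
```

A test can then call `EmbeddedTestCredentials.connect()` once and reuse the client when asserting on the rows the connector has written.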
-### Adding New Integration Tests +#### Adding New Integration Tests Adding an integration test is a little more involved, and consists of two major steps: specifying Avro data to be sent to Kafka, and specifying via JUnit test how to verify that such data made diff --git a/build.gradle b/build.gradle index 1652f5af4..13bfe3e49 100644 --- a/build.gradle +++ b/build.gradle @@ -28,6 +28,7 @@ project.ext { ioConfluentVersion = '5.5.0' junitVersion = '4.12' kafkaVersion = '2.5.0' + kafkaScalaVersion = '2.12' // For integration testing only mockitoVersion = '3.2.4' slf4jVersion = '1.6.1' } @@ -153,6 +154,26 @@ project(':kcbq-connector') { } } + test { + useJUnit { + // Exclude embedded integration tests from normal testing since they require BigQuery + // credentials and can take a while + excludeCategories 'org.apache.kafka.test.IntegrationTest' + } + } + + task embeddedIntegrationTest(type: Test) { + useJUnit { + includeCategories 'org.apache.kafka.test.IntegrationTest' + } + + // Enable logging for integration tests + testLogging { + outputs.upToDateWhen {false} + showStandardStreams = true + } + } + task integrationTestPrep() { dependsOn 'integrationTestTablePrep' dependsOn 'integrationTestBucketPrep' @@ -226,7 +247,12 @@ project(':kcbq-connector') { "junit:junit:$junitVersion", "org.mockito:mockito-core:$mockitoVersion", "org.mockito:mockito-inline:$mockitoVersion", - "org.apache.kafka:connect-api:$kafkaVersion" + "org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion", + "org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion:test", + "org.apache.kafka:kafka-clients:$kafkaVersion:test", + "org.apache.kafka:connect-api:$kafkaVersion", + "org.apache.kafka:connect-runtime:$kafkaVersion", + "org.apache.kafka:connect-runtime:$kafkaVersion:test", ) } diff --git a/kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java b/kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java index 704ec5828..b21a36932 100644 --- a/kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java +++ b/kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java @@ -30,5 +30,4 @@ public interface SchemaRetriever { * @return The value Schema for the given record. */ Schema retrieveValueSchema(SinkRecord record); - } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index ca30f382c..f29ec470a 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -69,6 +70,8 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable; + /** * A {@link SinkTask} used to translate Kafka Connect {@link SinkRecord SinkRecords} into BigQuery * {@link RowToInsert RowToInserts} and subsequently write them to BigQuery. 
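The Gradle test split above only takes effect when integration test classes are tagged with the Kafka `IntegrationTest` JUnit category, which is exactly what the `BaseConnectorIT` class added later in this diff does. Below is a minimal sketch of such a test class; the class and method names are hypothetical, and a real test would extend `BaseConnectorIT` and exercise the connector against BigQuery.

```java
import org.apache.kafka.test.IntegrationTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// Classes tagged with this category are excluded from the regular `test` task and
// picked up only by the `embeddedIntegrationTest` task, per the
// excludeCategories/includeCategories settings in build.gradle above.
@Category(IntegrationTest.class)
public class ExampleEmbeddedIT { // hypothetical name

  @Test
  public void shouldRunOnlyUnderEmbeddedIntegrationTest() {
    // A real test would start an embedded Connect cluster, produce records,
    // and verify the rows that the connector writes to BigQuery.
  }
}
```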
@@ -131,6 +134,11 @@ public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, @Override public void flush(Map offsets) { + if (upsertDelete) { + throw new ConnectException("This connector cannot perform upsert/delete on older versions of " + + "the Connect framework; please upgrade to version 0.10.2.0 or later"); + } + try { executor.awaitCurrentTasks(); } catch (InterruptedException err) { @@ -459,33 +467,36 @@ private void maybeStartMergeFlushTask() { @Override public void stop() { + maybeStopExecutor(loadExecutor, "load executor"); + maybeStopExecutor(executor, "table write executor"); + if (upsertDelete) { + mergeBatches.intermediateTables().forEach(table -> { + logger.debug("Deleting {}", intTable(table)); + getBigQuery().delete(table); + }); + } + + logger.trace("task.stop()"); + } + + private void maybeStopExecutor(ExecutorService executor, String executorName) { + if (executor == null) { + return; + } + try { if (upsertDelete) { - mergeBatches.intermediateTables().forEach(table -> { - logger.debug("Deleting intermediate table {}", table); - getBigQuery().delete(table); - }); - } - } finally { - try { + logger.trace("Forcibly shutting down {}", executorName); + executor.shutdownNow(); + } else { + logger.trace("Requesting shutdown for {}", executorName); executor.shutdown(); - executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); - if (loadExecutor != null) { - try { - logger.info("Attempting to shut down load executor."); - loadExecutor.shutdown(); - loadExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - logger.warn("Could not shut down load executor within {}s.", - EXECUTOR_SHUTDOWN_TIMEOUT_SEC); - } - } - } catch (InterruptedException ex) { - logger.warn("{} active threads are still executing tasks {}s after shutdown is signaled.", - executor.getActiveCount(), EXECUTOR_SHUTDOWN_TIMEOUT_SEC); - } finally { - logger.trace("task.stop()"); } + logger.trace("Awaiting termination of {}", executorName); + executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + logger.trace("Shut down {} successfully", executorName); + } catch (Exception e) { + logger.warn("Failed to shut down {}", executorName, e); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java index 168e3e1b9..6612c1409 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java @@ -26,6 +26,7 @@ import com.google.cloud.bigquery.TableId; import com.google.common.annotations.VisibleForTesting; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; +import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException; import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor; import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; import org.apache.kafka.connect.errors.ConnectException; @@ -37,9 +38,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.destTable; +import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable; + public class MergeQueries { public static final String INTERMEDIATE_TABLE_KEY_FIELD_NAME = "key"; public static final String INTERMEDIATE_TABLE_VALUE_FIELD_NAME = "value"; + public 
static final String INTERMEDIATE_TABLE_ITERATION_FIELD_NAME = "i"; public static final String INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME = "partitionTime"; public static final String INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD = "batchNumber"; @@ -105,16 +110,16 @@ public void mergeFlushAll() { public void mergeFlush(TableId intermediateTable) { final TableId destinationTable = mergeBatches.destinationTableFor(intermediateTable); final int batchNumber = mergeBatches.incrementBatch(intermediateTable); - logger.trace("Triggering merge flush from intermediate table {} to destination table {} for batch {}", - intermediateTable, destinationTable, batchNumber); + logger.trace("Triggering merge flush from {} to {} for batch {}", + intTable(intermediateTable), destTable(destinationTable), batchNumber); executor.execute(() -> { try { mergeFlush(intermediateTable, destinationTable, batchNumber); } catch (InterruptedException e) { - throw new ConnectException(String.format( + throw new ExpectedInterruptException(String.format( "Interrupted while performing merge flush of batch %d from %s to %s", - batchNumber, intermediateTable, destinationTable)); + batchNumber, intTable(intermediateTable), destTable(destinationTable))); } }); } @@ -124,211 +129,312 @@ private void mergeFlush( ) throws InterruptedException{ // If there are rows to flush in this batch, flush them if (mergeBatches.prepareToFlush(intermediateTable, batchNumber)) { - logger.debug("Running merge query on batch {} from intermediate table {}", - batchNumber, intermediateTable); + logger.debug("Running merge query on batch {} from {}", + batchNumber, intTable(intermediateTable)); String mergeFlushQuery = mergeFlushQuery(intermediateTable, destinationTable, batchNumber); logger.trace(mergeFlushQuery); bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery)); - logger.trace("Merge from intermediate table {} to destination table {} completed", - intermediateTable, destinationTable); + logger.trace("Merge from {} to {} completed", + intTable(intermediateTable), destTable(destinationTable)); logger.debug("Recording flush success for batch {} from {}", - batchNumber, intermediateTable); + batchNumber, intTable(intermediateTable)); mergeBatches.recordSuccessfulFlush(intermediateTable, batchNumber); // Commit those offsets ASAP context.requestCommit(); logger.info("Completed merge flush of batch {} from {} to {}", - batchNumber, intermediateTable, destinationTable); + batchNumber, intTable(intermediateTable), destTable(destinationTable)); } // After, regardless of whether we flushed or not, clean up old batches from the intermediate // table. Some rows may be several batches old but still in the table if they were in the // streaming buffer during the last purge. - logger.trace("Clearing batches from {} on back from intermediate table {}", batchNumber, intermediateTable); + logger.trace("Clearing batches from {} on back from {}", batchNumber, intTable(intermediateTable)); String batchClearQuery = batchClearQuery(intermediateTable, batchNumber); logger.trace(batchClearQuery); bigQuery.query(QueryJobConfiguration.of(batchClearQuery)); } - /* + @VisibleForTesting + String mergeFlushQuery(TableId intermediateTable, TableId destinationTable, int batchNumber) { + Schema intermediateSchema = schemaManager.cachedSchema(intermediateTable); - upsert+delete: - - MERGE ``.`` - USING ( - SELECT * FROM ( - SELECT ARRAY_AGG( - x ORDER BY partitionTime DESC LIMIT 1 - )[OFFSET(0)] src - FROM ``.`` x - WHERE batchNumber= - GROUP BY key.[, key....] 
- ) - ) - ON ``.=`src`.key - WHEN MATCHED AND `src`.value IS NOT NULL - THEN UPDATE SET =`src`.value.[, =`src`.value....] - WHEN MATCHED AND `src`.value IS NULL - THEN DELETE - WHEN NOT MATCHED AND `src`.value IS NOT NULL - THEN INSERT (, _PARTITIONTIME, [, ]) - VALUES ( - `src`.key, - CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), - `src`.value.[, `src`.value....] - ); + if (upsertEnabled && deleteEnabled) { + return upsertDeleteMergeFlushQuery(intermediateTable, destinationTable, batchNumber, intermediateSchema); + } else if (upsertEnabled) { + return upsertMergeFlushQuery(intermediateTable, destinationTable, batchNumber, intermediateSchema); + } else if (deleteEnabled) { + return deleteMergeFlushQuery(intermediateTable, destinationTable, batchNumber, intermediateSchema); + } else { + throw new IllegalStateException("At least one of upsert or delete must be enabled for merge flushing to occur."); + } + } + /* + MERGE ``.`` + USING ( + SELECT * FROM ( + SELECT ARRAY_AGG( + x ORDER BY i DESC LIMIT 1 + )[OFFSET(0)] src + FROM ``.`` x + WHERE batchNumber= + GROUP BY key.[, key....] + ) + ) + ON ``.=src.key + WHEN MATCHED AND src.value IS NOT NULL + THEN UPDATE SET =src.value.[, =src.value....] + WHEN MATCHED AND src.value IS NULL + THEN DELETE + WHEN NOT MATCHED AND src.value IS NOT NULL + THEN INSERT (, [_PARTITIONTIME, ][, ]) + VALUES ( + src.key, + [CAST(CAST(DATE(src.partitionTime) AS DATE) AS TIMESTAMP),] + src.value.[, src.value....] + ); + */ + private String upsertDeleteMergeFlushQuery( + TableId intermediateTable, TableId destinationTable, int batchNumber, Schema intermediateSchema + ) { + List keyFields = listFields( + intermediateSchema.getFields().get(INTERMEDIATE_TABLE_KEY_FIELD_NAME).getSubFields(), + INTERMEDIATE_TABLE_KEY_FIELD_NAME + "." + ); - delete only: + List valueColumns = valueColumns(intermediateSchema); + + final String key = INTERMEDIATE_TABLE_KEY_FIELD_NAME; + final String i = INTERMEDIATE_TABLE_ITERATION_FIELD_NAME; + final String value = INTERMEDIATE_TABLE_VALUE_FIELD_NAME; + final String batch = INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD; + + return "MERGE " + table(destinationTable) + " " + + "USING (" + + "SELECT * FROM (" + + "SELECT ARRAY_AGG(" + + "x ORDER BY " + i + " DESC LIMIT 1" + + ")[OFFSET(0)] src " + + "FROM " + table(intermediateTable) + " x " + + "WHERE " + batch + "=" + batchNumber + " " + + "GROUP BY " + String.join(", ", keyFields) + + ")" + + ") " + + "ON `" + destinationTable.getTable() + "`." + keyFieldName + "=src." + key + " " + + "WHEN MATCHED AND src." + value + " IS NOT NULL " + + "THEN UPDATE SET " + valueColumns.stream().map(col -> col + "=src." + value + "." + col).collect(Collectors.joining(", ")) + " " + + "WHEN MATCHED AND src." + value + " IS NULL " + + "THEN DELETE " + + "WHEN NOT MATCHED AND src." + value + " IS NOT NULL " + + "THEN INSERT (" + + keyFieldName + ", " + + partitionTimePseudoColumn() + + String.join(", ", valueColumns) + ") " + + "VALUES (" + + "src." + key + ", " + + partitionTimeValue() + + valueColumns.stream().map(col -> "src." + value + "." + col).collect(Collectors.joining(", ")) + + ");"; + } - MERGE ``.`` - USING ( - SELECT * FROM ( - SELECT ARRAY_AGG( - x ORDER BY partitionTime DESC LIMIT 1 - )[OFFSET(0)] src - FROM ``.`` x - WHERE batchNumber= - GROUP BY key.[, key....] + /* + MERGE ``.`` + USING ( + SELECT * FROM ( + SELECT ARRAY_AGG( + x ORDER BY i DESC LIMIT 1 + )[OFFSET(0)] src + FROM ``.`` x + WHERE batchNumber= + GROUP BY key.[, key....] 
+ ) ) - ) - ON ``.=`src`.key AND `src`.value IS NULL - WHEN MATCHED - THEN DELETE - WHEN NOT MATCHED - THEN INSERT (, _PARTITIONTIME, [, ]) - VALUES ( - `src`.key, - CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), - `src`.value.[, `src`.value....] - ); - + ON ``.=src.key + WHEN MATCHED + THEN UPDATE SET =src.value.[, ...] + WHEN NOT MATCHED + THEN INSERT ([, src.value....] + ); + */ + private String upsertMergeFlushQuery( + TableId intermediateTable, TableId destinationTable, int batchNumber, Schema intermediateSchema + ) { + List keyFields = listFields( + intermediateSchema.getFields().get(INTERMEDIATE_TABLE_KEY_FIELD_NAME).getSubFields(), + INTERMEDIATE_TABLE_KEY_FIELD_NAME + "." + ); - upsert only: + List valueColumns = valueColumns(intermediateSchema); + + final String key = INTERMEDIATE_TABLE_KEY_FIELD_NAME; + final String i = INTERMEDIATE_TABLE_ITERATION_FIELD_NAME; + final String value = INTERMEDIATE_TABLE_VALUE_FIELD_NAME; + final String batch = INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD; + + return "MERGE " + table(destinationTable) + " " + + "USING (" + + "SELECT * FROM (" + + "SELECT ARRAY_AGG(" + + "x ORDER BY " + i + " DESC LIMIT 1" + + ")[OFFSET(0)] src " + + "FROM " + table(intermediateTable) + " x " + + "WHERE " + batch + "=" + batchNumber + " " + + "GROUP BY " + String.join(", ", keyFields) + + ")" + + ") " + + "ON `" + destinationTable.getTable() + "`." + keyFieldName + "=src." + key + " " + + "WHEN MATCHED " + + "THEN UPDATE SET " + valueColumns.stream().map(col -> col + "=src." + value + "." + col).collect(Collectors.joining(", ")) + " " + + "WHEN NOT MATCHED " + + "THEN INSERT (" + + keyFieldName + ", " + + partitionTimePseudoColumn() + + String.join(", ", valueColumns) + ") " + + "VALUES (" + + "src." + key + ", " + + partitionTimeValue() + + valueColumns.stream().map(col -> "src." + value + "." + col).collect(Collectors.joining(", ")) + + ");"; + } - MERGE ``.`` - USING ( - SELECT * FROM ( - SELECT ARRAY_AGG( - x ORDER BY partitionTime DESC LIMIT 1 - )[OFFSET(0)] src - FROM ``.`` x - WHERE batchNumber= - GROUP BY key.[, key....] - ) - ) - ON ``.=`src`.key - WHEN MATCHED - THEN UPDATE SET =`src`.value.[, ...] - WHEN NOT MATCHED - THEN INSERT (`.`` + USING ( + SELECT batch.key AS key, [partitionTime, ]value + FROM ( + SELECT src.i, src.key FROM ( + SELECT ARRAY_AGG( + x ORDER BY i DESC LIMIT 1 + )[OFFSET(0)] src + FROM ( + SELECT * FROM ``.`` + WHERE batchNumber= + ) x + WHERE x.value IS NULL + GROUP BY key.[, key....])) AS deletes + RIGHT JOIN ( + SELECT * FROM ``.` + ) AS batch + USING (key) + WHERE deletes.i IS NULL OR batch.i >= deletes.i + ORDER BY batch.i ASC) AS src + ON ``.=src.key AND src.value IS NULL + WHEN MATCHED + THEN DELETE + WHEN NOT MATCHED AND src.value IS NOT NULL + THEN INSERT (, [_PARTITIONTIME, ][, ]) VALUES ( - `src`.key, - CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), - `src`.value.[, `src`.value....] + src.key, + [CAST(CAST(DATE(src.partitionTime) AS DATE) AS TIMESTAMP),] + src.value.[, src.value....] ); - */ - @VisibleForTesting - String mergeFlushQuery(TableId intermediateTable, TableId destinationTable, int batchNumber) { - Schema intermediateSchema = schemaManager.cachedSchema(intermediateTable); - - String srcKey = INTERMEDIATE_TABLE_KEY_FIELD_NAME; - + private String deleteMergeFlushQuery( + TableId intermediateTable, TableId destinationTable, int batchNumber, Schema intermediateSchema + ) { List keyFields = listFields( - intermediateSchema.getFields().get(srcKey).getSubFields(), - srcKey + "." 
+ intermediateSchema.getFields().get(INTERMEDIATE_TABLE_KEY_FIELD_NAME).getSubFields(), + INTERMEDIATE_TABLE_KEY_FIELD_NAME + "." ); - List dstValueFields = intermediateSchema.getFields().get(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).getSubFields() + + List valueColumns = valueColumns(intermediateSchema); + + final String key = INTERMEDIATE_TABLE_KEY_FIELD_NAME; + final String i = INTERMEDIATE_TABLE_ITERATION_FIELD_NAME; + final String value = INTERMEDIATE_TABLE_VALUE_FIELD_NAME; + final String batch = INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD; + + return "MERGE " + table(destinationTable) + " " + + "USING (" + + "SELECT batch." + key + " AS " + key + ", " + partitionTimeColumn() + value + " " + + "FROM (" + + "SELECT src." + i + ", src." + key + " FROM (" + + "SELECT ARRAY_AGG(" + + "x ORDER BY " + i + " DESC LIMIT 1" + + ")[OFFSET(0)] src " + + "FROM (" + + "SELECT * FROM " + table(intermediateTable) + " " + + "WHERE " + batch + "=" + batchNumber + + ") x " + + "WHERE x." + value + " IS NULL " + + "GROUP BY " + String.join(", ", keyFields) + ")) AS deletes " + + "RIGHT JOIN (" + + "SELECT * FROM " + table(intermediateTable) + " " + + "WHERE " + batch + "=" + batchNumber + + ") AS batch " + + "USING (" + key + ") " + + "WHERE deletes." + i + " IS NULL OR batch." + i + " >= deletes." + i + " " + + "ORDER BY batch." + i + " ASC) AS src " + + "ON `" + destinationTable.getTable() + "`." + keyFieldName + "=src." + key + " AND src." + value + " IS NULL " + + "WHEN MATCHED " + + "THEN DELETE " + + "WHEN NOT MATCHED AND src." + value + " IS NOT NULL " + + "THEN INSERT (" + + keyFieldName + ", " + + partitionTimePseudoColumn() + + String.join(", ", valueColumns) + ") " + + "VALUES (" + + "src." + key + ", " + + partitionTimeValue() + + valueColumns.stream().map(col -> "src." + value + "." + col).collect(Collectors.joining(", ")) + + ");"; + } + + private String table(TableId tableId) { + return String.format("`%s`.`%s`", tableId.getDataset(), tableId.getTable()); + } + + private List valueColumns(Schema intermediateTableSchema) { + return intermediateTableSchema.getFields().get(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).getSubFields() .stream() .map(Field::getName) .collect(Collectors.toList()); + } - List srcValueFields = dstValueFields.stream() - .map(field -> "`src`." + INTERMEDIATE_TABLE_VALUE_FIELD_NAME + "." + field) - .collect(Collectors.toList()); - List updateValues = dstValueFields.stream() - .map(field -> field + "=`src`." + INTERMEDIATE_TABLE_VALUE_FIELD_NAME + "." + field) - .collect(Collectors.toList()); + private String partitionTimePseudoColumn() { + return insertPartitionTime ? "_PARTITIONTIME, " : ""; + } - String partitionTimeField = insertPartitionTime ? "_PARTITIONTIME, " : ""; - String partitionTimeValue = insertPartitionTime - ? "CAST(CAST(DATE(`src`." + INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME + ") AS DATE) AS TIMESTAMP), " + private String partitionTimeValue() { + return insertPartitionTime + ? "CAST(CAST(DATE(src." 
+ INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME + ") AS DATE) AS TIMESTAMP), " : ""; + } - String dst = destinationTable.getTable(); - - StringBuilder keysMatch = new StringBuilder("`").append(dst).append("`.").append(keyFieldName).append("=`src`.").append(srcKey); - - StringBuilder mergeOpening = new StringBuilder("MERGE `").append(destinationTable.getDataset()).append("`.`").append(destinationTable.getTable()).append("` ") - .append("USING (") - .append("SELECT * FROM (") - .append("SELECT ARRAY_AGG(") - .append("x ORDER BY ").append(INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME).append(" DESC LIMIT 1") - .append(")[OFFSET(0)] src ") - .append("FROM `").append(intermediateTable.getDataset()).append("`.`").append(intermediateTable.getTable()).append("` x ") - .append("WHERE ").append(INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD).append("=").append(batchNumber).append(" ") - .append("GROUP BY ").append(String.join(", ", keyFields)) - .append(")") - .append(") "); - - StringBuilder insertClause = new StringBuilder("THEN INSERT (") - .append(keyFieldName).append(", ") - .append(partitionTimeField) - .append(String.join(", ", dstValueFields)) - .append(") ") - .append("VALUES (") - .append("`src`.").append(srcKey).append(", ") - .append(partitionTimeValue) - .append(String.join(", ", srcValueFields)) - .append(")"); - - StringBuilder updateClause = new StringBuilder("THEN UPDATE SET ") - .append(String.join(", ", updateValues)); - - StringBuilder valueIs = new StringBuilder("`src`.").append(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).append(" IS "); - - if (upsertEnabled && deleteEnabled) { - // Delete rows with null values, and upsert all others - return mergeOpening - .append("ON ").append(keysMatch).append(" ") - .append("WHEN MATCHED AND ").append(valueIs).append("NOT NULL ") - .append(updateClause).append(" ") - .append("WHEN MATCHED AND ").append(valueIs).append("NULL ") - .append("THEN DELETE ") - .append("WHEN NOT MATCHED AND ").append(valueIs).append("NOT NULL ") - .append(insertClause) - .append(";") - .toString(); - } else if (deleteEnabled) { - // Delete rows with null values, and insert all others - return mergeOpening - .append("ON ").append(keysMatch).append(" ") - .append("AND ").append(valueIs).append("NULL ") - .append("WHEN MATCHED ") - .append("THEN DELETE ") - .append("WHEN NOT MATCHED ") - .append(insertClause) - .append(";") - .toString(); - } else if (upsertEnabled) { - // Assume all rows have non-null values and upsert them all - return mergeOpening - .append("ON ").append(keysMatch).append(" ") - .append("WHEN MATCHED ") - .append(updateClause).append(" ") - .append("WHEN NOT MATCHED ") - .append(insertClause) - .append(";") - .toString(); - } else { - throw new IllegalStateException("At least one of upsert or delete must be enabled for merge flushing to occur."); - } + private String partitionTimeColumn() { + return insertPartitionTime + ? 
INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME + ", " + : ""; } - // DELETE FROM `` WHERE batchNumber <= AND _PARTITIONTIME IS NOT NULL; + // DELETE FROM ``.`` WHERE batchNumber <= AND _PARTITIONTIME IS NOT NULL; @VisibleForTesting static String batchClearQuery(TableId intermediateTable, int batchNumber) { return new StringBuilder("DELETE FROM `").append(intermediateTable.getDataset()).append("`.`").append(intermediateTable.getTable()).append("` ") diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java index 78e47f37d..129de9a09 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java @@ -17,17 +17,19 @@ import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder; import com.wepay.kafka.connect.bigquery.convert.SchemaConverter; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; - +import com.wepay.kafka.connect.bigquery.utils.TableNameUtils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.function.Function; import java.util.stream.Collectors; import java.util.concurrent.ConcurrentHashMap; @@ -249,8 +251,7 @@ private TableInfo getTableInfo(TableId table, Set records) { } catch (BigQueryConnectException exception) { throw new BigQueryConnectException("Failed to unionize schemas of records for the table " + table, exception); } - TableInfo tableInfo = constructTableInfo(table, schema, tableDescription); - return tableInfo; + return constructTableInfo(table, schema, tableDescription); } /** @@ -294,14 +295,8 @@ private com.google.cloud.bigquery.Schema getUnionizedSchema(List firstSchemaFields = firstSchema - .getFields() - .stream() - .collect(Collectors.toMap(Field::getName, Function.identity())); - Map secondSchemaFields = secondSchema - .getFields() - .stream() - .collect(Collectors.toMap(Field::getName, Function.identity())); + Map firstSchemaFields = schemaFields(firstSchema); + Map secondSchemaFields = schemaFields(secondSchema); for (Map.Entry entry : secondSchemaFields.entrySet()) { if (!firstSchemaFields.containsKey(entry.getKey())) { if (allowNewBQFields && (entry.getValue().getMode().equals(Field.Mode.NULLABLE) @@ -340,6 +335,18 @@ private String getUnionizedTableDescription(Set records) { return tableDescription; } + /** + * Returns a dictionary providing lookup of each field in the schema by name. The ordering of the + * fields in the schema is preserved in the returned map. + * @param schema The BigQuery schema + * @return A map allowing lookup of schema fields by name + */ + private Map schemaFields(com.google.cloud.bigquery.Schema schema) { + Map result = new LinkedHashMap<>(); + schema.getFields().forEach(field -> result.put(field.getName(), field)); + return result; + } + // package private for testing. 
TableInfo constructTableInfo(TableId table, com.google.cloud.bigquery.Schema bigQuerySchema, String tableDescription) { StandardTableDefinition.Builder builder = StandardTableDefinition.newBuilder() @@ -416,6 +423,12 @@ private List getIntermediateSchemaFields(com.google.cloud.bigquery.Schema .build(); result.add(kafkaKeyField); + Field iterationField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_ITERATION_FIELD_NAME, LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.REQUIRED) + .build(); + result.add(iterationField); + Field partitionTimeField = Field .newBuilder(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, LegacySQLTypeName.TIMESTAMP) .setMode(Field.Mode.NULLABLE) @@ -450,9 +463,9 @@ private List getRegularSchemaFields(com.google.cloud.bigquery.Schema valu } private String table(TableId table) { - return (intermediateTables ? "intermediate " : "") - + "table " - + table; + return intermediateTables + ? TableNameUtils.intTable(table) + : TableNameUtils.table(table); } private com.google.cloud.bigquery.Schema readTableSchema(TableId table) { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/ExpectedInterruptException.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/ExpectedInterruptException.java new file mode 100644 index 000000000..093e704d0 --- /dev/null +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/ExpectedInterruptException.java @@ -0,0 +1,28 @@ +package com.wepay.kafka.connect.bigquery.exception; + +/* + * Copyright 2016 WePay, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import org.apache.kafka.connect.errors.ConnectException; + +public class ExpectedInterruptException extends ConnectException { + + public ExpectedInterruptException(String message) { + super(message); + } +} diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java index 0cd9ac389..7e45dee40 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java @@ -106,6 +106,7 @@ private Map getUpsertDeleteRow(SinkRecord record, TableId table) result.put(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, convertedKey); result.put(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, convertedValue); + result.put(MergeQueries.INTERMEDIATE_TABLE_ITERATION_FIELD_NAME, totalBatchSize); if (usePartitionDecorator && useMessageTimeDatePartitioning) { if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) { throw new ConnectException( diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/TableNameUtils.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/TableNameUtils.java new file mode 100644 index 000000000..ed52c9b83 --- /dev/null +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/TableNameUtils.java @@ -0,0 +1,36 @@ +package com.wepay.kafka.connect.bigquery.utils; + +/* + * Copyright 2016 WePay, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import com.google.cloud.bigquery.TableId; + +public class TableNameUtils { + + public static String table(TableId table) { + return String.format("table `%s`.`%s`", table.getDataset(), table.getTable()); + } + + public static String intTable(TableId table) { + return "intermediate " + table(table); + } + + public static String destTable(TableId table) { + return "destination " + table(table); + } +} diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java index 9e128cdaa..9494e036b 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java @@ -20,15 +20,16 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; +import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -64,7 +65,8 @@ public KCBQThreadPoolExecutor(BigQuerySinkTaskConfig config, protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); - if (throwable != null) { + // Skip interrupted exceptions, as they are thrown by design on task shutdown + if (throwable != null && !(throwable instanceof ExpectedInterruptException)) { logger.error("Task failed with {} error: {}", throwable.getClass().getName(), throwable.getMessage()); @@ -110,10 +112,8 @@ public void maybeThrowEncounteredErrors() { } private static String createErrorString(Collection errors) { - List exceptionTypeStrings = new ArrayList<>(errors.size()); - exceptionTypeStrings.addAll(errors.stream() - .map(throwable -> throwable.getClass().getName()) - .collect(Collectors.toList())); - return String.join(", ", exceptionTypeStrings); + return errors.stream() + .map(Objects::toString) + .collect(Collectors.joining(", ")); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java index 8a95ab317..bae1effd0 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java @@ -25,6 +25,7 @@ import com.google.common.collect.HashBiMap; import com.google.common.collect.Maps; import com.wepay.kafka.connect.bigquery.MergeQueries; +import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException; import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -42,6 +43,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static 
com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable; + public class MergeBatches { private static final Logger logger = LoggerFactory.getLogger(MergeBatches.class); private static final long STREAMING_BUFFER_AVAILABILITY_WAIT_MS = 10_000L; @@ -156,8 +159,8 @@ public long addToBatch(SinkRecord record, TableId intermediateTable, Map allBatchesForTable = batches.get(intermediateTable); Batch batch = allBatchesForTable.remove(batchNumber); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java index e8747f851..26a65e6ba 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java @@ -23,6 +23,7 @@ import com.google.cloud.bigquery.TableId; import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter; +import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; import com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter; @@ -102,7 +103,7 @@ public void run() { } } } catch (InterruptedException err) { - throw new ConnectException("Thread interrupted while writing to BigQuery.", err); + throw new ExpectedInterruptException("Thread interrupted while writing to BigQuery."); } // Common case is 1 successful call and 0 failed calls: diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index e3b970459..0fce65174 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -28,6 +28,7 @@ import com.wepay.kafka.connect.bigquery.SchemaManager; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; +import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; import org.apache.kafka.connect.sink.SinkRecord; @@ -130,6 +131,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) { writeResponse = bigQuery.insertAll(request); } catch (BigQueryException exception) { // no-op, we want to keep retrying the insert + logger.trace("insertion failed", exception); } } else { return writeResponse.getInsertErrors(); @@ -143,7 +145,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) { try { Thread.sleep(RETRY_WAIT_TIME); } catch (InterruptedException e) { - // no-op, we want to keep retrying the insert + throw new ExpectedInterruptException("Interrupted while waiting to retry write"); } } logger.debug("table insertion completed successfully"); @@ -178,6 +180,7 @@ protected void attemptTableCreate(TableId tableId, Set records) { * This is why we can't have nice things, Google. 
*/ private boolean onlyContainsInvalidSchemaErrors(Map> errors) { + logger.trace("write response contained errors: \n{}", errors); boolean invalidSchemaError = false; for (List errorList : errors.values()) { for (BigQueryError error : errorList) { diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 51646dcc3..4192089f9 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; @@ -40,6 +41,7 @@ import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.storage.Storage; +import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType; import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; @@ -150,7 +152,9 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() { testTask.initialize(sinkTaskContext); testTask.start(properties); - testTask.put(Collections.singletonList(spoofSinkRecord(topic))); + SinkRecord spoofedRecord = + spoofSinkRecord(topic, "k", "key", "v", "value", TimestampType.NO_TIMESTAMP_TYPE, null); + testTask.put(Collections.singletonList(spoofedRecord)); testTask.flush(Collections.emptyMap()); verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class)); } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java index 9675df8c2..bba3add2f 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java @@ -98,26 +98,25 @@ private static Schema constructIntermediateTable() { public void testUpsertQueryWithPartitionTime() { String expectedQuery = "MERGE " + table(DESTINATION_TABLE) + " " - + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY i DESC LIMIT 1)[OFFSET(0)] src " + "FROM " + table(INTERMEDIATE_TABLE) + " x " + "WHERE batchNumber=" + BATCH_NUMBER + " " + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " - + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " + + "ON `" + DESTINATION_TABLE.getTable() + "`." 
+ KEY + "=src.key " + "WHEN MATCHED " - + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " + + "THEN UPDATE SET f1=src.value.f1, f2=src.value.f2, f3=src.value.f3, f4=src.value.f4 " + "WHEN NOT MATCHED " - + "THEN INSERT (" + + "THEN INSERT (" + KEY + ", " - + "_PARTITIONTIME, " + + "_PARTITIONTIME, " + "f1, f2, f3, f4) " - + "VALUES (" - + "`src`.key, " - + "CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), " - + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + "VALUES (" + + "src.key, " + + "CAST(CAST(DATE(src.partitionTime) AS DATE) AS TIMESTAMP), " + + "src.value.f1, src.value.f2, src.value.f3, src.value.f4" + ");"; String actualQuery = mergeQueries(true, true, false) .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); assertEquals(expectedQuery, actualQuery); } @@ -125,24 +124,23 @@ public void testUpsertQueryWithPartitionTime() { public void testUpsertQueryWithoutPartitionTime() { String expectedQuery = "MERGE " + table(DESTINATION_TABLE) + " " - + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY i DESC LIMIT 1)[OFFSET(0)] src " + "FROM " + table(INTERMEDIATE_TABLE) + " x " + "WHERE batchNumber=" + BATCH_NUMBER + " " + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " - + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " + + "ON `" + DESTINATION_TABLE.getTable() + "`." + KEY + "=src.key " + "WHEN MATCHED " - + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " + + "THEN UPDATE SET f1=src.value.f1, f2=src.value.f2, f3=src.value.f3, f4=src.value.f4 " + "WHEN NOT MATCHED " + "THEN INSERT (" + KEY + ", " + "f1, f2, f3, f4) " + "VALUES (" - + "`src`.key, " - + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + "src.key, " + + "src.value.f1, src.value.f2, src.value.f3, src.value.f4" + ");"; String actualQuery = mergeQueries(false, true, false) .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); assertEquals(expectedQuery, actualQuery); } @@ -150,26 +148,41 @@ public void testUpsertQueryWithoutPartitionTime() { public void testDeleteQueryWithPartitionTime() { String expectedQuery = "MERGE " + table(DESTINATION_TABLE) + " " - + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " - + "FROM " + table(INTERMEDIATE_TABLE) + " x " - + "WHERE batchNumber=" + BATCH_NUMBER + " " - + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " - + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key AND `src`.value IS NULL " - + "WHEN MATCHED " - + "THEN DELETE " - + "WHEN NOT MATCHED " - + "THEN INSERT (" - + KEY + ", " - + "_PARTITIONTIME, " - + "f1, f2, f3, f4) " - + "VALUES (" - + "`src`.key, " - + "CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), " - + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + "USING (" + + "SELECT batch.key AS key, partitionTime, value " + + "FROM (" + + "SELECT src.i, src.key FROM (" + + "SELECT ARRAY_AGG(" + + "x ORDER BY i DESC LIMIT 1" + + ")[OFFSET(0)] src " + + "FROM (" + + "SELECT * FROM " + table(INTERMEDIATE_TABLE) + " " + + "WHERE batchNumber=" + BATCH_NUMBER + + ") x " + + "WHERE x.value IS NULL " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) AS deletes " + + 
"RIGHT JOIN (" + + "SELECT * FROM " + table(INTERMEDIATE_TABLE) + " " + + "WHERE batchNumber=" + BATCH_NUMBER + + ") AS batch " + + "USING (key) " + + "WHERE deletes.i IS NULL OR batch.i >= deletes.i " + + "ORDER BY batch.i ASC) AS src " + + "ON `" + DESTINATION_TABLE.getTable() + "`." + KEY + "=src.key AND src.value IS NULL " + + "WHEN MATCHED " + + "THEN DELETE " + + "WHEN NOT MATCHED AND src.value IS NOT NULL " + + "THEN INSERT (" + + KEY + ", " + + "_PARTITIONTIME, " + + "f1, f2, f3, f4) " + + "VALUES (" + + "src.key, " + + "CAST(CAST(DATE(src.partitionTime) AS DATE) AS TIMESTAMP), " + + "src.value.f1, src.value.f2, src.value.f3, src.value.f4" + ");"; String actualQuery = mergeQueries(true, false, true) .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); assertEquals(expectedQuery, actualQuery); } @@ -177,24 +190,39 @@ public void testDeleteQueryWithPartitionTime() { public void testDeleteQueryWithoutPartitionTime() { String expectedQuery = "MERGE " + table(DESTINATION_TABLE) + " " - + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " - + "FROM " + table(INTERMEDIATE_TABLE) + " x " - + "WHERE batchNumber=" + BATCH_NUMBER + " " - + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " - + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key AND `src`.value IS NULL " - + "WHEN MATCHED " - + "THEN DELETE " - + "WHEN NOT MATCHED " - + "THEN INSERT (" - + KEY + ", " - + "f1, f2, f3, f4) " - + "VALUES (" - + "`src`.key, " - + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + "USING (" + + "SELECT batch.key AS key, value " + + "FROM (" + + "SELECT src.i, src.key FROM (" + + "SELECT ARRAY_AGG(" + + "x ORDER BY i DESC LIMIT 1" + + ")[OFFSET(0)] src " + + "FROM (" + + "SELECT * FROM " + table(INTERMEDIATE_TABLE) + " " + + "WHERE batchNumber=" + BATCH_NUMBER + + ") x " + + "WHERE x.value IS NULL " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) AS deletes " + + "RIGHT JOIN (" + + "SELECT * FROM " + table(INTERMEDIATE_TABLE) + " " + + "WHERE batchNumber=" + BATCH_NUMBER + + ") AS batch " + + "USING (key) " + + "WHERE deletes.i IS NULL OR batch.i >= deletes.i " + + "ORDER BY batch.i ASC) AS src " + + "ON `" + DESTINATION_TABLE.getTable() + "`." 
+ KEY + "=src.key AND src.value IS NULL " + + "WHEN MATCHED " + + "THEN DELETE " + + "WHEN NOT MATCHED AND src.value IS NOT NULL " + + "THEN INSERT (" + + KEY + ", " + + "f1, f2, f3, f4) " + + "VALUES (" + + "src.key, " + + "src.value.f1, src.value.f2, src.value.f3, src.value.f4" + ");"; String actualQuery = mergeQueries(false, false, true) .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); assertEquals(expectedQuery, actualQuery); } @@ -202,28 +230,27 @@ public void testDeleteQueryWithoutPartitionTime() { public void testUpsertDeleteQueryWithPartitionTime() { String expectedQuery = "MERGE " + table(DESTINATION_TABLE) + " " - + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " - + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY i DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + "WHERE batchNumber=" + BATCH_NUMBER + " " - + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " - + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " - + "WHEN MATCHED AND `src`.value IS NOT NULL " - + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " - + "WHEN MATCHED AND `src`.value IS NULL " - + "THEN DELETE " - + "WHEN NOT MATCHED AND `src`.value IS NOT NULL " - + "THEN INSERT (" - + KEY + ", " - + "_PARTITIONTIME, " - + "f1, f2, f3, f4) " - + "VALUES (" - + "`src`.key, " - + "CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), " - + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`." + KEY + "=src.key " + + "WHEN MATCHED AND src.value IS NOT NULL " + + "THEN UPDATE SET f1=src.value.f1, f2=src.value.f2, f3=src.value.f3, f4=src.value.f4 " + + "WHEN MATCHED AND src.value IS NULL " + + "THEN DELETE " + + "WHEN NOT MATCHED AND src.value IS NOT NULL " + + "THEN INSERT (" + + KEY + ", " + + "_PARTITIONTIME, " + + "f1, f2, f3, f4) " + + "VALUES (" + + "src.key, " + + "CAST(CAST(DATE(src.partitionTime) AS DATE) AS TIMESTAMP), " + + "src.value.f1, src.value.f2, src.value.f3, src.value.f4" + ");"; String actualQuery = mergeQueries(true, true, true) .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); assertEquals(expectedQuery, actualQuery); } @@ -231,38 +258,35 @@ public void testUpsertDeleteQueryWithPartitionTime() { public void testUpsertDeleteQueryWithoutPartitionTime() { String expectedQuery = "MERGE " + table(DESTINATION_TABLE) + " " - + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY i DESC LIMIT 1)[OFFSET(0)] src " + "FROM " + table(INTERMEDIATE_TABLE) + " x " + "WHERE batchNumber=" + BATCH_NUMBER + " " + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " - + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " - + "WHEN MATCHED AND `src`.value IS NOT NULL " - + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " - + "WHEN MATCHED AND `src`.value IS NULL " + + "ON `" + DESTINATION_TABLE.getTable() + "`." 
+ KEY + "=src.key " + + "WHEN MATCHED AND src.value IS NOT NULL " + + "THEN UPDATE SET f1=src.value.f1, f2=src.value.f2, f3=src.value.f3, f4=src.value.f4 " + + "WHEN MATCHED AND src.value IS NULL " + "THEN DELETE " - + "WHEN NOT MATCHED AND `src`.value IS NOT NULL " + + "WHEN NOT MATCHED AND src.value IS NOT NULL " + "THEN INSERT (" + KEY + ", " + "f1, f2, f3, f4) " + "VALUES (" - + "`src`.key, " - + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + "src.key, " + + "src.value.f1, src.value.f2, src.value.f3, src.value.f4" + ");"; String actualQuery = mergeQueries(false, true, true) .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); assertEquals(expectedQuery, actualQuery); } @Test public void testBatchClearQuery() { - String expectedQuery = + String expectedQuery = "DELETE FROM " + table(INTERMEDIATE_TABLE) + " WHERE batchNumber <= " + BATCH_NUMBER + " AND _PARTITIONTIME IS NOT NULL;"; // No difference in batch clearing between upsert, delete, and both, or with or without partition time - String actualQuery = mergeQueries(false, false, false) - .batchClearQuery(INTERMEDIATE_TABLE, BATCH_NUMBER); - System.out.println(actualQuery); + String actualQuery = MergeQueries.batchClearQuery(INTERMEDIATE_TABLE, BATCH_NUMBER); assertEquals(expectedQuery, actualQuery); } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/BaseConnectorIT.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/BaseConnectorIT.java new file mode 100644 index 000000000..980ec295b --- /dev/null +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/BaseConnectorIT.java @@ -0,0 +1,352 @@ +package com.wepay.kafka.connect.bigquery.integration; + +/* + * Copyright 2016 WePay, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableResult; +import com.wepay.kafka.connect.bigquery.BigQueryHelper; +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.runtime.AbstractStatus; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.cloud.bigquery.LegacySQLTypeName.BOOLEAN; +import static com.google.cloud.bigquery.LegacySQLTypeName.BYTES; +import static com.google.cloud.bigquery.LegacySQLTypeName.DATE; +import static com.google.cloud.bigquery.LegacySQLTypeName.FLOAT; +import static com.google.cloud.bigquery.LegacySQLTypeName.INTEGER; +import static com.google.cloud.bigquery.LegacySQLTypeName.STRING; +import static com.google.cloud.bigquery.LegacySQLTypeName.TIMESTAMP; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.Assert.assertTrue; + +@Category(IntegrationTest.class) +public abstract class BaseConnectorIT { + private static final Logger logger = LoggerFactory.getLogger(BaseConnectorIT.class); + + private static final String KEYFILE_ENV_VAR = "KCBQ_TEST_KEYFILE"; + private static final String PROJECT_ENV_VAR = "KCBQ_TEST_PROJECT"; + private static final String DATASET_ENV_VAR = "KCBQ_TEST_DATASET"; + private static final String KEYSOURCE_ENV_VAR = "KCBQ_TEST_KEYSOURCE"; + + protected static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10); + protected static final long COMMIT_MAX_DURATION_MS = TimeUnit.MINUTES.toMillis(5); + protected static final long OFFSETS_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); + protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60); + + protected EmbeddedConnectCluster connect; + private Admin kafkaAdminClient; + + protected void startConnect() { + Map workerProps = new HashMap<>(); + workerProps.put( + WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, Long.toString(OFFSET_COMMIT_INTERVAL_MS)); + // Allow per-connector consumer configuration for throughput 
+    workerProps.put(
+        WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
+
+    connect = new EmbeddedConnectCluster.Builder()
+        .name("kcbq-connect-cluster")
+        .workerProps(workerProps)
+        .build();
+
+    // start the clusters
+    connect.start();
+
+    kafkaAdminClient = connect.kafka().createAdminClient();
+
+    // the exception handler installed by the embedded zookeeper instance is noisy and unnecessary
+    Thread.setDefaultUncaughtExceptionHandler((t, e) -> { });
+  }
+
+  protected void stopConnect() {
+    if (kafkaAdminClient != null) {
+      kafkaAdminClient.close();
+      kafkaAdminClient = null;
+    }
+
+    // stop all Connect, Kafka and Zk threads.
+    if (connect != null) {
+      connect.stop();
+      connect = null;
+    }
+  }
+
+  protected Map<String, String> baseConnectorProps(int tasksMax) {
+    Map<String, String> result = new HashMap<>();
+
+    result.put(CONNECTOR_CLASS_CONFIG, "BigQuerySinkConnector");
+    result.put(TASKS_MAX_CONFIG, Integer.toString(tasksMax));
+
+    result.put(BigQuerySinkConfig.PROJECT_CONFIG, project());
+    result.put(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG, dataset());
+    result.put(BigQuerySinkConfig.KEYFILE_CONFIG, keyFile());
+    result.put(BigQuerySinkConfig.KEY_SOURCE_CONFIG, keySource());
+
+    return result;
+  }
+
+  protected BigQuery newBigQuery() {
+    return new BigQueryHelper()
+        .setKeySource(keySource())
+        .connect(project(), keyFile());
+  }
+
+  protected void clearPriorTable(BigQuery bigQuery, String table) {
+    boolean deleted = bigQuery.delete(TableId.of(dataset(), table));
+    if (deleted) {
+      logger.info("Deleted existing test table `{}`.`{}`", dataset(), table);
+    }
+  }
+
+  protected void waitForCommittedRecords(
+      String connector, String topic, long numRecords, int numTasks
+  ) throws InterruptedException {
+    waitForCommittedRecords(connector, topic, numRecords, numTasks, COMMIT_MAX_DURATION_MS);
+  }
+
+  protected void waitForCommittedRecords(
+      String connector, String topic, long numRecords, int numTasks, long timeoutMs) throws InterruptedException {
+    waitForCondition(
+        () -> {
+          long totalCommittedRecords = totalCommittedRecords(connector, topic);
+          if (totalCommittedRecords >= numRecords) {
+            return true;
+          } else {
+            // Check to make sure the connector is still running. If not, fail fast
+            assertTrue(
+                "Connector or one of its tasks failed during testing",
+                assertConnectorAndTasksRunning(connector, numTasks).orElse(false));
+            logger.debug("Connector has only committed {} records for topic {} so far; {} expected",
+                totalCommittedRecords, topic, numRecords);
+            // Sleep here so as not to spam Kafka with list-offsets requests
+            Thread.sleep(OFFSET_COMMIT_INTERVAL_MS / 2);
+            return false;
+          }
+        },
+        timeoutMs,
+        "Either the connector failed, or the message commit duration expired without all expected messages committed");
+  }
+
+  protected synchronized long totalCommittedRecords(String connector, String topic) throws TimeoutException, ExecutionException, InterruptedException {
+    // See https://github.com/apache/kafka/blob/f7c38d83c727310f4b0678886ba410ae2fae9379/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
+    // for how the consumer group ID is constructed for sink connectors
+    Map<TopicPartition, OffsetAndMetadata> offsets = kafkaAdminClient
+        .listConsumerGroupOffsets("connect-" + connector)
+        .partitionsToOffsetAndMetadata()
+        .get(OFFSETS_READ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+    logger.trace("Connector {} has so far committed offsets {}", connector, offsets);
+
+    return offsets.entrySet().stream()
+        .filter(entry -> topic.equals(entry.getKey().topic()))
+        .mapToLong(entry -> entry.getValue().offset())
+        .sum();
+  }
+
+  /**
+   * Read all rows from the given table.
+   * @param bigQuery used to connect to BigQuery
+   * @param tableName the table to read
+   * @param sortColumn a column to sort rows by (can use dot notation to refer to nested fields)
+   * @return a list of all rows from the table, sorted by the given sort column
+   */
+  protected List<List<Object>> readAllRows(
+      BigQuery bigQuery, String tableName, String sortColumn) throws InterruptedException {
+
+    Table table = bigQuery.getTable(dataset(), tableName);
+    Schema schema = table.getDefinition().getSchema();
+
+    TableResult tableResult = bigQuery.query(QueryJobConfiguration.of(String.format(
+        "SELECT * FROM `%s`.`%s` ORDER BY %s ASC",
+        dataset(),
+        tableName,
+        sortColumn
+    )));
+
+    return StreamSupport.stream(tableResult.iterateAll().spliterator(), false)
+        .map(fieldValues -> convertRow(schema.getFields(), fieldValues))
+        .collect(Collectors.toList());
+  }
+
+  private static List<Byte> boxByteArray(byte[] bytes) {
+    Byte[] result = new Byte[bytes.length];
+    for (int i = 0; i < bytes.length; i++) {
+      result[i] = bytes[i];
+    }
+    return Arrays.asList(result);
+  }
+
+  private Object convertField(Field fieldSchema, FieldValue field) {
+    if (field.isNull()) {
+      return null;
+    }
+    switch (field.getAttribute()) {
+      case PRIMITIVE:
+        if (fieldSchema.getType().equals(BOOLEAN)) {
+          return field.getBooleanValue();
+        } else if (fieldSchema.getType().equals(BYTES)) {
+          // Do this in order for assertEquals() to work when this is an element of two compared
+          // lists
+          return boxByteArray(field.getBytesValue());
+        } else if (fieldSchema.getType().equals(DATE)) {
+          DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+          return LocalDate.parse(field.getStringValue(), dateFormatter)
+              .atStartOfDay(ZoneOffset.UTC)
+              .toInstant()
+              .toEpochMilli();
+        } else if (fieldSchema.getType().equals(FLOAT)) {
+          return field.getDoubleValue();
+        } else if (fieldSchema.getType().equals(INTEGER)) {
+          return field.getLongValue();
+        } else if (fieldSchema.getType().equals(STRING)) {
+          return field.getStringValue();
+        } else if (fieldSchema.getType().equals(TIMESTAMP)) {
+          return field.getTimestampValue();
+        } else {
+          throw new RuntimeException("Cannot convert primitive field type "
+              + fieldSchema.getType());
+        }
+      case REPEATED:
+        List<Object> result = new ArrayList<>();
+        for (FieldValue arrayField : field.getRepeatedValue()) {
+          result.add(convertField(fieldSchema, arrayField));
+        }
+        return result;
+      case RECORD:
+        List<Field> recordSchemas = fieldSchema.getSubFields();
+        List<FieldValue> recordFields = field.getRecordValue();
+        return convertRow(recordSchemas, recordFields);
+      default:
+        throw new RuntimeException("Unknown field attribute: " + field.getAttribute());
+    }
+  }
+
+  private List<Object> convertRow(List<Field> rowSchema, List<FieldValue> row) {
+    List<Object> result = new ArrayList<>();
+    assert (rowSchema.size() == row.size());
+
+    for (int i = 0; i < rowSchema.size(); i++) {
+      result.add(convertField(rowSchema.get(i), row.get(i)));
+    }
+
+    return result;
+  }
+
+  /**
+   * Wait up to {@link #CONNECTOR_STARTUP_DURATION_MS maximum time limit} for the connector with the given
+   * name to start the specified number of tasks.
+   *
+   * @param name the name of the connector
+   * @param numTasks the minimum number of tasks that are expected
+   * @return the time this method discovered the connector has started, in milliseconds past epoch
+   * @throws InterruptedException if this was interrupted
+   */
+  protected long waitForConnectorToStart(String name, int numTasks) throws InterruptedException {
+    TestUtils.waitForCondition(
+        () -> assertConnectorAndTasksRunning(name, numTasks).orElse(false),
+        CONNECTOR_STARTUP_DURATION_MS,
+        "Connector tasks did not start in time."
+    );
+    return System.currentTimeMillis();
+  }
+
+  /**
+   * Confirm that a connector with at least the given number of tasks is running.
+   *
+   * @param connectorName the connector
+   * @param numTasks the minimum number of tasks
+   * @return true if the connector and tasks are in RUNNING state; false otherwise
+   */
+  protected Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
+    try {
+      ConnectorStateInfo info = connect.connectorStatus(connectorName);
+      boolean result = info != null
+          && info.tasks().size() >= numTasks
+          && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+          && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+      return Optional.of(result);
+    } catch (Exception e) {
+      logger.error("Could not check connector state info.", e);
+      return Optional.empty();
+    }
+  }
+
+  private String readEnvVar(String var) {
+    String result = System.getenv(var);
+    if (result == null) {
+      throw new IllegalStateException(String.format(
+          "Environment variable '%s' must be supplied to run integration tests",
+          var));
+    }
+    return result;
+  }
+
+  private String readEnvVar(String var, String defaultVal) {
+    return System.getenv().getOrDefault(var, defaultVal);
+  }
+
+  protected String keyFile() {
+    return readEnvVar(KEYFILE_ENV_VAR);
+  }
+
+  protected String project() {
+    return readEnvVar(PROJECT_ENV_VAR);
+  }
+
+  protected String dataset() {
+    return readEnvVar(DATASET_ENV_VAR);
+  }
+
+  protected String keySource() {
+    return readEnvVar(KEYSOURCE_ENV_VAR, BigQuerySinkConfig.KEY_SOURCE_DEFAULT);
+  }
+}
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/UpsertDeleteBigQuerySinkConnectorIT.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/UpsertDeleteBigQuerySinkConnectorIT.java
new file mode 100644
index 000000000..67c30a3f2
--- /dev/null
+++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/UpsertDeleteBigQuerySinkConnectorIT.java
@@ -0,0 +1,408 @@
+package com.wepay.kafka.connect.bigquery.integration;
+
+/*
+ * Copyright 2016 WePay, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import com.google.cloud.bigquery.BigQuery;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
+import com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.junit.Assert.assertEquals;
+
+@Category(IntegrationTest.class)
+public class UpsertDeleteBigQuerySinkConnectorIT extends BaseConnectorIT {
+
+  private static final Logger logger = LoggerFactory.getLogger(UpsertDeleteBigQuerySinkConnectorIT.class);
+
+  private static final String CONNECTOR_NAME = "kcbq-sink-connector";
+  private static final long NUM_RECORDS_PRODUCED = 20;
+  private static final int TASKS_MAX = 3;
+  private static final String KAFKA_FIELD_NAME = "kafkaKey";
+
+  private BigQuery bigQuery;
+
+  @Before
+  public void setup() {
+    bigQuery = newBigQuery();
+    startConnect();
+  }
+
+  @After
+  public void close() {
+    bigQuery = null;
+    stopConnect();
+  }
+
+  private Map<String, String> upsertDeleteProps(
+      boolean upsert,
+      boolean delete,
+      long mergeRecordsThreshold) {
+    if (!upsert && !delete) {
+      throw new IllegalArgumentException("At least one of upsert or delete must be enabled");
+    }
+
+    Map<String, String> result = new HashMap<>();
+
+    // use the JSON converter with schemas enabled
+    result.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+    result.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+
+    if (upsert) {
+      result.put(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG, "true");
+    }
+    if (delete) {
+      result.put(BigQuerySinkConfig.DELETE_ENABLED_CONFIG, "true");
+    }
+
+    // Hardcode merge flushes to just use number of records for now, as it's more deterministic and
+    // faster to test
+    result.put(BigQuerySinkConfig.MERGE_INTERVAL_MS_CONFIG, "-1");
+    result.put(BigQuerySinkConfig.MERGE_RECORDS_THRESHOLD_CONFIG, Long.toString(mergeRecordsThreshold));
+
+    result.put(BigQuerySinkConfig.KAFKA_KEY_FIELD_NAME_CONFIG, KAFKA_FIELD_NAME);
+
+    return result;
+  }
+
+  @Test
+  public void testUpsert() throws Throwable {
+    // create topic in Kafka
+    final String topic = "test-upsert";
+    // Make sure each task gets to read from at least one partition
+    connect.kafka().createTopic(topic, TASKS_MAX);
+
+    final String table = "test_upsert";
+    clearPriorTable(bigQuery, table);
+
+    // setup props for the sink connector
+    Map<String, String> props = baseConnectorProps(TASKS_MAX);
+    props.put(SinkConnectorConfig.TOPICS_CONFIG, topic);
+
+    props.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
+    props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
+    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true");
+
+    // Enable only upsert and not delete, and merge flush every other record
+    props.putAll(upsertDeleteProps(true, false, 2));
+
+    // start a sink connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
+
+    // Instantiate the converters we'll use to send records to the connector
+    Converter keyConverter = converter(true);
+    Converter valueConverter = converter(false);
+
+    // Send records to Kafka
+    for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+      // Each pair of records will share a key. Only the second record of each pair should be
+      // present in the table at the end of the test
+      String kafkaKey = key(keyConverter, topic, i / 2);
+      String kafkaValue = value(valueConverter, topic, i, false);
+      logger.debug("Sending message with key '{}' and value '{}' to topic '{}'", kafkaKey, kafkaValue, topic);
+      connect.kafka().produce(topic, kafkaKey, kafkaValue);
+    }
+
+    // wait for tasks to write to BigQuery and commit offsets for their records
+    waitForCommittedRecords(CONNECTOR_NAME, topic, NUM_RECORDS_PRODUCED, TASKS_MAX);
+
+    List<List<Object>> allRows = readAllRows(bigQuery, table, KAFKA_FIELD_NAME + ".k1");
+    List<List<Object>> expectedRows = LongStream.range(0, NUM_RECORDS_PRODUCED / 2)
+        .mapToObj(i -> Arrays.asList(
+            "another string",
+            (i - 1) % 3 == 0,
+            (i * 2 + 1) / 0.69,
+            Collections.singletonList(i)))
+        .collect(Collectors.toList());
+    assertEquals(expectedRows, allRows);
+  }
+
+  @Test
+  public void testDelete() throws Throwable {
+    // create topic in Kafka
+    final String topic = "test-delete";
+    // Make sure each task gets to read from at least one partition
+    connect.kafka().createTopic(topic, TASKS_MAX);
+
+    final String table = "test_delete";
+    clearPriorTable(bigQuery, table);
+
+    // setup props for the sink connector
+    Map<String, String> props = baseConnectorProps(TASKS_MAX);
+    props.put(SinkConnectorConfig.TOPICS_CONFIG, topic);
+
+    props.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
+    props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
+    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true");
+
+    // Enable only delete and not upsert, and merge flush every other record
+    props.putAll(upsertDeleteProps(false, true, 2));
+
+    // start a sink connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
+
+    // Instantiate the converters we'll use to send records to the connector
+    Converter keyConverter = converter(true);
+    Converter valueConverter = converter(false);
+
+    // Send records to Kafka
+    for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+      // Each pair of records will share a key. Because upsert is not enabled, no deduplication will take place
+      // and, unless a tombstone is written for that key, both will be inserted
+      String kafkaKey = key(keyConverter, topic, i / 2);
+      // Every fourth record will be a tombstone, so every record pair with an odd-numbered key will be dropped
+      String kafkaValue = value(valueConverter, topic, i, i % 4 == 3);
+      logger.debug("Sending message with key '{}' and value '{}' to topic '{}'", kafkaKey, kafkaValue, topic);
+      connect.kafka().produce(topic, kafkaKey, kafkaValue);
+    }
+
+    // wait for tasks to write to BigQuery and commit offsets for their records
+    waitForCommittedRecords(CONNECTOR_NAME, topic, NUM_RECORDS_PRODUCED, TASKS_MAX);
+
+    // Since we have multiple rows per key, order by key and the f3 field (which should be
+    // monotonically increasing in insertion order)
+    List<List<Object>> allRows = readAllRows(bigQuery, table, KAFKA_FIELD_NAME + ".k1, f3");
+    List<List<Object>> expectedRows = LongStream.range(0, NUM_RECORDS_PRODUCED)
+        .filter(i -> i % 4 < 2)
+        .mapToObj(i -> Arrays.asList(
+            i % 4 == 0 ? "a string" : "another string",
+            i % 3 == 0,
+            i / 0.69,
+            Collections.singletonList(i * 2 / 4)))
+        .collect(Collectors.toList());
+    assertEquals(expectedRows, allRows);
+  }
+
+  @Test
+  public void testUpsertDelete() throws Throwable {
+    // create topic in Kafka
+    final String topic = "test-upsert-delete";
+    // Make sure each task gets to read from at least one partition
+    connect.kafka().createTopic(topic, TASKS_MAX);
+
+    final String table = "test_upsert_delete";
+    clearPriorTable(bigQuery, table);
+
+    // setup props for the sink connector
+    Map<String, String> props = baseConnectorProps(TASKS_MAX);
+    props.put(SinkConnectorConfig.TOPICS_CONFIG, topic);
+
+    props.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
+    props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
+    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true");
+
+    // Enable upsert and delete, and merge flush every other record
+    props.putAll(upsertDeleteProps(true, true, 2));
+
+    // start a sink connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
+
+    // Instantiate the converters we'll use to send records to the connector
+    Converter keyConverter = converter(true);
+    Converter valueConverter = converter(false);
+
+    // Send records to Kafka
+    for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+      // Each pair of records will share a key. Only the second record of each pair should be
+      // present in the table at the end of the test
+      String kafkaKey = key(keyConverter, topic, i / 2);
+      // Every fourth record will be a tombstone, so every record pair with an odd-numbered key will be dropped
+      String kafkaValue = value(valueConverter, topic, i, i % 4 == 3);
+      logger.debug("Sending message with key '{}' and value '{}' to topic '{}'", kafkaKey, kafkaValue, topic);
+      connect.kafka().produce(topic, kafkaKey, kafkaValue);
+    }
+
+    // wait for tasks to write to BigQuery and commit offsets for their records
+    waitForCommittedRecords(CONNECTOR_NAME, topic, NUM_RECORDS_PRODUCED, TASKS_MAX);
+
+    // Since we have multiple rows per key, order by key and the f3 field (which should be
+    // monotonically increasing in insertion order)
+    List<List<Object>> allRows = readAllRows(bigQuery, table, KAFKA_FIELD_NAME + ".k1, f3");
+    List<List<Object>> expectedRows = LongStream.range(0, NUM_RECORDS_PRODUCED)
+        .filter(i -> i % 4 == 1)
+        .mapToObj(i -> Arrays.asList(
+            "another string",
+            i % 3 == 0,
+            i / 0.69,
+            Collections.singletonList(i * 2 / 4)))
+        .collect(Collectors.toList());
+    assertEquals(expectedRows, allRows);
+  }
+
+  @Test
+  @Ignore("Skipped during regular testing; comment-out annotation to run")
+  public void testUpsertDeleteHighThroughput() throws Throwable {
+    final long numRecords = 1_000_000L;
+    final int numPartitions = 10;
+    final int tasksMax = 1;
+
+    // create topic in Kafka
+    final String topic = "test-upsert-delete-throughput";
+    connect.kafka().createTopic(topic, numPartitions);
+
+    final String table = "test_upsert_delete_throughput";
+    clearPriorTable(bigQuery, table);
+
+    // Instantiate the converters we'll use to send records to the connector
+    Converter keyConverter = converter(true);
+    Converter valueConverter = converter(false);
+
+    // Send records to Kafka. Pre-populate Kafka before starting the connector as we want to measure
+    // the connector's throughput cleanly
+    logger.info("Pre-populating Kafka with test data");
+    for (int i = 0; i < numRecords; i++) {
+      if (i % 10000 == 0) {
+        logger.info("{} records produced so far", i);
+      }
+      // Each pair of records will share a key. Only the second record of each pair should be
+      // present in the table at the end of the test
+      String kafkaKey = key(keyConverter, topic, i / 2);
+      // Every fourth record will be a tombstone, so every record pair with an odd-numbered key will
+      // be dropped
+      String kafkaValue = value(valueConverter, topic, i, i % 4 == 3);
+      connect.kafka().produce(topic, kafkaKey, kafkaValue);
+    }
+
+    // setup props for the sink connector
+    // use a single task
+    Map<String, String> props = baseConnectorProps(tasksMax);
+    props.put(SinkConnectorConfig.TOPICS_CONFIG, topic);
+    // Allow for at most 10,000 records per call to poll
+    props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX
+            + ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        "10000");
+    // Try to get at least 1 MB per partition with each request
+    props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX
+            + ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
+        Integer.toString(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES * numPartitions));
+    // Wait up to one second for each batch to reach the requested size
+    props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX
+            + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
+        "1000"
+    );
+
+    props.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
+    props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
+    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true");
+
+    // Enable upsert and delete, and schedule ten total flushes
+    props.putAll(upsertDeleteProps(true, true, numRecords / 10));
+
+    logger.info("Pre-population complete; creating connector");
+    long start = System.currentTimeMillis();
+    // start a sink connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, tasksMax);
+
+    // wait for tasks to write to BigQuery and commit offsets for their records
+    waitForCommittedRecords(CONNECTOR_NAME, topic, numRecords, tasksMax, TimeUnit.MINUTES.toMillis(10));
+    long time = System.currentTimeMillis() - start;
+    logger.info("All records have been read and committed by the connector; "
+        + "total time from start to finish: {} seconds", time / 1000.0);
+
+    // Since we have multiple rows per key, order by key and the f3 field (which should be
+    // monotonically increasing in insertion order)
+    List<List<Object>> allRows = readAllRows(bigQuery, table, KAFKA_FIELD_NAME + ".k1, f3");
+    List<List<Object>> expectedRows = LongStream.range(0, numRecords)
+        .filter(i -> i % 4 == 1)
+        .mapToObj(i -> Arrays.asList(
+            "another string",
+            i % 3 == 0,
+            i / 0.69,
+            Collections.singletonList(i * 2 / 4)))
+        .collect(Collectors.toList());
+    assertEquals(expectedRows, allRows);
+  }
+
+  private Converter converter(boolean isKey) {
+    Map<String, Object> props = new HashMap<>();
+    props.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
+    Converter result = new JsonConverter();
+    result.configure(props, isKey);
+    return result;
+  }
+
+  private String key(Converter converter, String topic, long iteration) {
+    final Schema schema = SchemaBuilder.struct()
+        .field("k1", Schema.INT64_SCHEMA)
+        .build();
+
+    final Struct struct = new Struct(schema)
+        .put("k1", iteration);
+
+    return new String(converter.fromConnectData(topic, schema, struct));
+  }
+
+  private String value(Converter converter, String topic, long iteration, boolean tombstone) {
+    final Schema schema = SchemaBuilder.struct()
+        .optional()
+        .field("f1", Schema.STRING_SCHEMA)
+        .field("f2", Schema.BOOLEAN_SCHEMA)
+        .field("f3", Schema.FLOAT64_SCHEMA)
+        .build();
+
+    if (tombstone) {
+      return new String(converter.fromConnectData(topic, schema, null));
+    }
+
+    final Struct struct = new Struct(schema)
+        .put("f1", iteration % 2 == 0 ? "a string" : "another string")
+        .put("f2", iteration % 3 == 0)
+        .put("f3", iteration / 0.69);
+
+    return new String(converter.fromConnectData(topic, schema, struct));
+  }
+}
diff --git a/kcbq-connector/src/test/resources/log4j.properties b/kcbq-connector/src/test/resources/log4j.properties
new file mode 100644
index 000000000..60a108fb4
--- /dev/null
+++ b/kcbq-connector/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+log4j.rootLogger=INFO, stdout
+
+# Send the logs to the console.
+#
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
+log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
+log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
+
+# These are used in the log4j properties file that ships by default with Connect
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.reflections=ERROR
+
+# We see a lot of WARN-level messages from this class when a table is created by the connector and
+# then written to shortly after. No need for that much noise during routine tests
+log4j.logger.com.wepay.kafka.connect.bigquery.write.batch.TableWriter=ERROR
+# Logs a message at INFO on every http request
+log4j.logger.org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster=WARN
\ No newline at end of file