From 5901eb066b1c2594068609e86e24657e1dae1093 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 17 Jun 2020 14:50:00 -0700 Subject: [PATCH] GH-264: Address some review comments, add unit tests --- .../connect/bigquery/BigQuerySinkTask.java | 7 +- .../kafka/connect/bigquery/MergeQueries.java | 141 +++++---- .../kafka/connect/bigquery/SchemaManager.java | 3 +- .../bigquery/config/BigQuerySinkConfig.java | 104 ++++--- .../bigquery/utils/SinkRecordConverter.java | 7 +- .../bigquery/write/batch/MergeBatches.java | 19 +- .../bigquery/BigQuerySinkTaskTest.java | 157 ++++++++-- .../connect/bigquery/MergeQueriesTest.java | 272 ++++++++++++++++++ 8 files changed, 569 insertions(+), 141 deletions(-) create mode 100644 kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 96679b5f3..ca30f382c 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -90,7 +90,6 @@ public class BigQuerySinkTask extends SinkTask { private boolean upsertDelete; private MergeBatches mergeBatches; private MergeQueries mergeQueries; - private long mergeRecordsThreshold; private TopicPartitionManager topicPartitionManager; @@ -218,7 +217,7 @@ public void put(Collection records) { Map tableWriterBuilders = new HashMap<>(); for (SinkRecord record : records) { - if (record.value() != null) { + if (record.value() != null || config.getBoolean(config.DELETE_ENABLED_CONFIG)) { PartitionedTableId table = getRecordTable(record); if (!tableWriterBuilders.containsKey(table)) { TableWriterBuilder tableWriterBuilder; @@ -400,12 +399,10 @@ public void start(Map properties) { Instant.now().toEpochMilli() ); mergeBatches = new MergeBatches(intermediateTableSuffix); - mergeRecordsThreshold = config.getLong(config.MERGE_RECORDS_THRESHOLD_CONFIG); } bigQueryWriter = getBigQueryWriter(); gcsToBQWriter = getGcsWriter(); - recordConverter = getConverter(config); executor = new KCBQThreadPoolExecutor(config, new LinkedBlockingQueue<>()); topicPartitionManager = new TopicPartitionManager(); useMessageTimeDatePartitioning = @@ -421,6 +418,8 @@ public void start(Map properties) { new MergeQueries(config, mergeBatches, executor, getBigQuery(), getSchemaManager(), context); maybeStartMergeFlushTask(); } + + recordConverter = getConverter(config); } private void startGCSToBQLoadTask() { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java index df9272af7..168e3e1b9 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java @@ -24,7 +24,9 @@ import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableId; +import com.google.common.annotations.VisibleForTesting; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; +import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor; import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkTaskContext; @@ -32,7 +34,6 
@@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,28 +46,50 @@ public class MergeQueries { private static final Logger logger = LoggerFactory.getLogger(MergeQueries.class); private final String keyFieldName; - private final boolean usePartitionDecorator; + private final boolean insertPartitionTime; private final boolean upsertEnabled; private final boolean deleteEnabled; private final MergeBatches mergeBatches; - private final ExecutorService executor; + private final KCBQThreadPoolExecutor executor; private final BigQuery bigQuery; private final SchemaManager schemaManager; private final SinkTaskContext context; public MergeQueries(BigQuerySinkTaskConfig config, MergeBatches mergeBatches, - ExecutorService executor, + KCBQThreadPoolExecutor executor, BigQuery bigQuery, SchemaManager schemaManager, SinkTaskContext context) { - this.keyFieldName = config.getKafkaKeyFieldName().orElseThrow(() -> - new ConnectException("Kafka key field must be configured when upsert/delete is enabled") + this( + config.getKafkaKeyFieldName().orElseThrow(() -> + new ConnectException("Kafka key field must be configured when upsert/delete is enabled") + ), + config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG), + config.getBoolean(config.UPSERT_ENABLED_CONFIG), + config.getBoolean(config.DELETE_ENABLED_CONFIG), + mergeBatches, + executor, + bigQuery, + schemaManager, + context ); - this.usePartitionDecorator = config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG); - this.upsertEnabled = config.getBoolean(config.UPSERT_ENABLED_CONFIG); - this.deleteEnabled = config.getBoolean(config.DELETE_ENABLED_CONFIG); + } + @VisibleForTesting + MergeQueries(String keyFieldName, + boolean insertPartitionTime, + boolean upsertEnabled, + boolean deleteEnabled, + MergeBatches mergeBatches, + KCBQThreadPoolExecutor executor, + BigQuery bigQuery, + SchemaManager schemaManager, + SinkTaskContext context) { + this.keyFieldName = keyFieldName; + this.insertPartitionTime = insertPartitionTime; + this.upsertEnabled = upsertEnabled; + this.deleteEnabled = deleteEnabled; this.mergeBatches = mergeBatches; this.executor = executor; this.bigQuery = bigQuery; @@ -85,56 +108,50 @@ public void mergeFlush(TableId intermediateTable) { logger.trace("Triggering merge flush from intermediate table {} to destination table {} for batch {}", intermediateTable, destinationTable, batchNumber); - executor.submit(() -> { - // If there are rows to flush in this batch, flush them - if (mergeBatches.prepareToFlush(intermediateTable, batchNumber)) { - try { - logger.debug("Running merge query on batch {} from intermediate table {}", - batchNumber, intermediateTable); - String mergeFlushQuery = mergeFlushQuery(intermediateTable, destinationTable, batchNumber); - logger.trace(mergeFlushQuery); - bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery)); - logger.trace("Merge from intermediate table {} to destination table {} completed", - intermediateTable, destinationTable); - } catch (Throwable t) { - logger.warn("Failed on merge flush from intermediate table {} to destination table {}", - intermediateTable, destinationTable, t); - throw new ConnectException( - String.format("Failed to perform merge flush from intermediate table %s to destination table %s", - intermediateTable, - destinationTable), - t); - } - - logger.debug("Recording flush success for batch {} from {}", - batchNumber, intermediateTable); - 
mergeBatches.recordSuccessfulFlush(intermediateTable, batchNumber); - - // Commit those offsets ASAP - context.requestCommit(); - - logger.info("Completed merge flush of batch {} from {} to {}", - batchNumber, intermediateTable, destinationTable); - } - - // After, regardless of whether we flushed or not, clean up old batches from the intermediate - // table. Some rows may be several batches old but still in the table if they were in the - // streaming buffer during the last purge. + executor.execute(() -> { try { - logger.trace("Clearing batches from {} on back from intermediate table {}", batchNumber, intermediateTable); - String tableClearQuery = clearBatchQuery(intermediateTable, batchNumber); - logger.trace(tableClearQuery); - bigQuery.query(QueryJobConfiguration.of(tableClearQuery)); - } catch (Throwable t) { - logger.error("Failed to clear old batches from intermediate table {}", intermediateTable, t); - throw new ConnectException( - String.format("Failed to clear old batches from intermediate table %s", - intermediateTable), - t); + mergeFlush(intermediateTable, destinationTable, batchNumber); + } catch (InterruptedException e) { + throw new ConnectException(String.format( + "Interrupted while performing merge flush of batch %d from %s to %s", + batchNumber, intermediateTable, destinationTable)); } }); } + private void mergeFlush( + TableId intermediateTable, TableId destinationTable, int batchNumber + ) throws InterruptedException{ + // If there are rows to flush in this batch, flush them + if (mergeBatches.prepareToFlush(intermediateTable, batchNumber)) { + logger.debug("Running merge query on batch {} from intermediate table {}", + batchNumber, intermediateTable); + String mergeFlushQuery = mergeFlushQuery(intermediateTable, destinationTable, batchNumber); + logger.trace(mergeFlushQuery); + bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery)); + logger.trace("Merge from intermediate table {} to destination table {} completed", + intermediateTable, destinationTable); + + logger.debug("Recording flush success for batch {} from {}", + batchNumber, intermediateTable); + mergeBatches.recordSuccessfulFlush(intermediateTable, batchNumber); + + // Commit those offsets ASAP + context.requestCommit(); + + logger.info("Completed merge flush of batch {} from {} to {}", + batchNumber, intermediateTable, destinationTable); + } + + // After, regardless of whether we flushed or not, clean up old batches from the intermediate + // table. Some rows may be several batches old but still in the table if they were in the + // streaming buffer during the last purge. + logger.trace("Clearing batches from {} on back from intermediate table {}", batchNumber, intermediateTable); + String batchClearQuery = batchClearQuery(intermediateTable, batchNumber); + logger.trace(batchClearQuery); + bigQuery.query(QueryJobConfiguration.of(batchClearQuery)); + } + /* upsert+delete: @@ -214,13 +231,14 @@ THEN INSERT ( keyFields = listFields( - intermediateSchema.getFields().get(keyFieldName).getSubFields(), + intermediateSchema.getFields().get(srcKey).getSubFields(), srcKey + "." ); List dstValueFields = intermediateSchema.getFields().get(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).getSubFields() @@ -235,8 +253,8 @@ private String mergeFlushQuery(TableId intermediateTable, TableId destinationTab .map(field -> field + "=`src`." + INTERMEDIATE_TABLE_VALUE_FIELD_NAME + "." + field) .collect(Collectors.toList()); - String partitionTimeField = usePartitionDecorator ? 
"_PARTITIONTIME, " : ""; - String partitionTimeValue = usePartitionDecorator + String partitionTimeField = insertPartitionTime ? "_PARTITIONTIME, " : ""; + String partitionTimeValue = insertPartitionTime ? "CAST(CAST(DATE(`src`." + INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME + ") AS DATE) AS TIMESTAMP), " : ""; @@ -299,9 +317,9 @@ private String mergeFlushQuery(TableId intermediateTable, TableId destinationTab // Assume all rows have non-null values and upsert them all return mergeOpening .append("ON ").append(keysMatch).append(" ") - .append("WHEN MATCHED") + .append("WHEN MATCHED ") .append(updateClause).append(" ") - .append("WHEN NOT MATCHED") + .append("WHEN NOT MATCHED ") .append(insertClause) .append(";") .toString(); @@ -311,7 +329,8 @@ private String mergeFlushQuery(TableId intermediateTable, TableId destinationTab } // DELETE FROM `` WHERE batchNumber <= AND _PARTITIONTIME IS NOT NULL; - private static String clearBatchQuery(TableId intermediateTable, int batchNumber) { + @VisibleForTesting + static String batchClearQuery(TableId intermediateTable, int batchNumber) { return new StringBuilder("DELETE FROM `").append(intermediateTable.getDataset()).append("`.`").append(intermediateTable.getTable()).append("` ") .append("WHERE ") .append(INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD).append(" <= ").append(batchNumber).append(" ") diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java index b0968759b..fb49450aa 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java @@ -410,7 +410,8 @@ private List getIntermediateSchemaFields(com.google.cloud.bigquery.Schema com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema); Field kafkaKeyField = Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, LegacySQLTypeName.RECORD, keySchema.getFields()) - .setMode(Field.Mode.REQUIRED).build(); + .setMode(Field.Mode.REQUIRED) + .build(); result.add(kafkaKeyField); Field partitionTimeField = Field diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index 949305965..0a2dc8775 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -45,6 +45,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.Optional; /** @@ -473,62 +476,71 @@ public static ConfigDef getConfig() { * @param props sink configuration properties */ public static void validate(Map props) { - final boolean hasTopicsConfig = hasTopicsConfig(props); - final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); + final boolean hasTopicsConfig = hasTopicsConfig(props); + final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); - if (hasTopicsConfig && hasTopicsRegexConfig) { - throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + - " are mutually exclusive options, but both are set."); + if (hasTopicsConfig && hasTopicsRegexConfig) { + throw new ConfigException(TOPICS_CONFIG + " and " + 
TOPICS_REGEX_CONFIG + + " are mutually exclusive options, but both are set."); + } + + if (!hasTopicsConfig && !hasTopicsRegexConfig) { + throw new ConfigException("Must configure one of " + + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG); + } + + if (upsertDeleteEnabled(props)) { + if (gcsBatchLoadingEnabled(props)) { + throw new ConfigException("Cannot enable both upsert/delete and GCS batch loading"); } - if (!hasTopicsConfig && !hasTopicsRegexConfig) { - throw new ConfigException("Must configure one of " + - TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG); + String mergeIntervalStr = Optional.ofNullable(props.get(MERGE_INTERVAL_MS_CONFIG)) + .map(String::trim) + .orElse(Long.toString(MERGE_INTERVAL_MS_DEFAULT)); + String mergeRecordsThresholdStr = Optional.ofNullable(props.get(MERGE_RECORDS_THRESHOLD_CONFIG)) + .map(String::trim) + .orElse(Long.toString(MERGE_RECORDS_THRESHOLD_DEFAULT)); + if ("-1".equals(mergeIntervalStr) && "-1".equals(mergeRecordsThresholdStr)) { + throw new ConfigException(MERGE_INTERVAL_MS_CONFIG + " and " + + MERGE_RECORDS_THRESHOLD_CONFIG + " cannot both be -1"); } - if (upsertDeleteEnabled(props)) { - String mergeIntervalStr = Optional.ofNullable(props.get(MERGE_INTERVAL_MS_CONFIG)) - .map(String::trim) - .orElse(Long.toString(MERGE_INTERVAL_MS_DEFAULT)); - String mergeRecordsThresholdStr = Optional.ofNullable(props.get(MERGE_RECORDS_THRESHOLD_CONFIG)) - .map(String::trim) - .orElse(Long.toString(MERGE_RECORDS_THRESHOLD_DEFAULT)); - if ("-1".equals(mergeIntervalStr) && "-1".equals(mergeRecordsThresholdStr)) { - throw new ConfigException(MERGE_INTERVAL_MS_CONFIG + " and " - + MERGE_RECORDS_THRESHOLD_CONFIG + " cannot both be -1"); - } - - if ("0".equals(mergeIntervalStr)) { - throw new ConfigException(MERGE_INTERVAL_MS_CONFIG, mergeIntervalStr, "cannot be zero"); - } - if ("0".equals(mergeRecordsThresholdStr)) { - throw new ConfigException(MERGE_RECORDS_THRESHOLD_CONFIG, mergeRecordsThresholdStr, "cannot be zero"); - } - - String kafkaKeyFieldStr = props.get(KAFKA_KEY_FIELD_NAME_CONFIG); - if (kafkaKeyFieldStr == null || kafkaKeyFieldStr.trim().isEmpty()) { - throw new ConfigException(KAFKA_KEY_FIELD_NAME_CONFIG + " must be specified when " - + UPSERT_ENABLED_CONFIG + " and/or " + DELETE_ENABLED_CONFIG + " are set to true"); - } + if ("0".equals(mergeIntervalStr)) { + throw new ConfigException(MERGE_INTERVAL_MS_CONFIG, mergeIntervalStr, "cannot be zero"); + } + if ("0".equals(mergeRecordsThresholdStr)) { + throw new ConfigException(MERGE_RECORDS_THRESHOLD_CONFIG, mergeRecordsThresholdStr, "cannot be zero"); } - } - public static boolean hasTopicsConfig(Map props) { - String topicsStr = props.get(TOPICS_CONFIG); - return topicsStr != null && !topicsStr.trim().isEmpty(); + String kafkaKeyFieldStr = props.get(KAFKA_KEY_FIELD_NAME_CONFIG); + if (kafkaKeyFieldStr == null || kafkaKeyFieldStr.trim().isEmpty()) { + throw new ConfigException(KAFKA_KEY_FIELD_NAME_CONFIG + " must be specified when " + + UPSERT_ENABLED_CONFIG + " and/or " + DELETE_ENABLED_CONFIG + " are set to true"); + } } + } - public static boolean hasTopicsRegexConfig(Map props) { - String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG); - return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); - } + public static boolean hasTopicsConfig(Map props) { + String topicsStr = props.get(TOPICS_CONFIG); + return topicsStr != null && !topicsStr.trim().isEmpty(); + } - public static boolean upsertDeleteEnabled(Map props) { - String upsertStr = props.get(UPSERT_ENABLED_CONFIG); - String deleteStr = 
props.get(DELETE_ENABLED_CONFIG); - return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr) - || Boolean.TRUE.toString().equalsIgnoreCase(deleteStr); - } + public static boolean hasTopicsRegexConfig(Map props) { + String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG); + return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); + } + + public static boolean upsertDeleteEnabled(Map props) { + String upsertStr = props.get(UPSERT_ENABLED_CONFIG); + String deleteStr = props.get(DELETE_ENABLED_CONFIG); + return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr) + || Boolean.TRUE.toString().equalsIgnoreCase(deleteStr); + } + + public static boolean gcsBatchLoadingEnabled(Map props) { + String batchLoadStr = props.get(ENABLE_BATCH_CONFIG); + return batchLoadStr != null && !batchLoadStr.isEmpty(); + } /** * Returns the keyfile diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java index 4af836142..0cd9ac389 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java @@ -84,8 +84,6 @@ private Map getUpsertDeleteRow(SinkRecord record, TableId table) ? null : recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); - Map convertedKey = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY); - if (convertedValue != null) { config.getKafkaDataFieldName().ifPresent( fieldName -> convertedValue.put(fieldName, KafkaDataBuilder.buildKafkaDataRecord(record)) @@ -101,6 +99,11 @@ private Map getUpsertDeleteRow(SinkRecord record, TableId table) mergeQueries.mergeFlush(table); } + Map convertedKey = recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY); + if (convertedKey == null) { + throw new ConnectException("Record keys must be non-null when upsert/delete is enabled"); + } + result.put(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, convertedKey); result.put(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, convertedValue); if (usePartitionDecorator && useMessageTimeDatePartitioning) { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java index 92558a251..28eabc3a3 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java @@ -20,6 +20,7 @@ import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.TableId; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Maps; @@ -45,12 +46,24 @@ public class MergeBatches { private static final Logger logger = LoggerFactory.getLogger(MergeBatches.class); private static final long STREAMING_BUFFER_AVAILABILITY_WAIT_MS = 10_000L; + private static long streamingBufferAvailabilityWaitMs = STREAMING_BUFFER_AVAILABILITY_WAIT_MS; + private final String intermediateTableSuffix; private final BiMap intermediateToDestinationTables; private final ConcurrentMap batchNumbers; private final ConcurrentMap> batches; private final Map offsets; + @VisibleForTesting + public static void setStreamingBufferAvailabilityWait(long 
waitMs) { + streamingBufferAvailabilityWaitMs = waitMs; + } + + @VisibleForTesting + public static void resetStreamingBufferAvailabilityWait() { + streamingBufferAvailabilityWaitMs = STREAMING_BUFFER_AVAILABILITY_WAIT_MS; + } + public MergeBatches(String intermediateTableSuffix) { this.intermediateTableSuffix = intermediateTableSuffix; @@ -243,10 +256,10 @@ public boolean prepareToFlush(TableId intermediateTable, int batchNumber) { try { logger.trace( - "Waiting {} seconds before running merge query on batch {} from intermediate table {} " + "Waiting {}ms before running merge query on batch {} from intermediate table {} " + "in order to ensure that all rows are available in the streaming buffer", - STREAMING_BUFFER_AVAILABILITY_WAIT_MS, batchNumber, intermediateTable); - Thread.sleep(STREAMING_BUFFER_AVAILABILITY_WAIT_MS); + streamingBufferAvailabilityWaitMs, batchNumber, intermediateTable); + Thread.sleep(streamingBufferAvailabilityWaitMs); } catch (InterruptedException e) { logger.warn("Interrupted while waiting before merge flushing batch {} for intermediate table {}", batchNumber, intermediateTable); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index b24e7f40a..51646dcc3 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; @@ -31,10 +32,12 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.storage.Storage; import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; @@ -42,6 +45,7 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; +import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.TimestampType; @@ -52,24 +56,42 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Captor; +import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class BigQuerySinkTaskTest { private static SinkTaskPropertiesFactory propertiesFactory; + private static AtomicLong spoofedRecordOffset = new AtomicLong(); + 
@BeforeClass public static void initializePropertiesFactory() { propertiesFactory = new SinkTaskPropertiesFactory(); } + @Before + public void setUp() { + MergeBatches.setStreamingBufferAvailabilityWait(0); + spoofedRecordOffset.set(0); + } + + @After + public void cleanUp() { + MergeBatches.resetStreamingBufferAvailabilityWait(); + } + @Test public void testSimplePut() { final String topic = "test-topic"; @@ -172,8 +194,6 @@ public void testEmptyRecordPut() { testTask.put(Collections.singletonList(emptyRecord)); } - @Captor ArgumentCaptor captor; - @Test public void testPutWhenPartitioningOnMessageTime() { final String topic = "test-topic"; @@ -314,6 +334,76 @@ public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() { TimestampType.NO_TIMESTAMP_TYPE, null))); } + @Test + public void testPutWithUpsertDelete() throws Exception { + final String topic = "test-topic"; + final String key = "kafkaKey"; + final String value = "recordValue"; + + Map properties = propertiesFactory.getProperties(); + properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic); + properties.put(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG, "true"); + properties.put(BigQuerySinkConfig.DELETE_ENABLED_CONFIG, "true"); + properties.put(BigQuerySinkConfig.MERGE_INTERVAL_MS_CONFIG, "-1"); + properties.put(BigQuerySinkConfig.MERGE_RECORDS_THRESHOLD_CONFIG, "2"); + properties.put(BigQuerySinkConfig.KAFKA_KEY_FIELD_NAME_CONFIG, key); + + BigQuery bigQuery = mock(BigQuery.class); + Storage storage = mock(Storage.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + + InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); + when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); + when(insertAllResponse.hasErrors()).thenReturn(false); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + Field keyField = Field.of(key, LegacySQLTypeName.STRING); + Field valueField = Field.of(value, LegacySQLTypeName.STRING); + com.google.cloud.bigquery.Schema intermediateSchema = com.google.cloud.bigquery.Schema.of( + Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD, LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.REQUIRED) + .build(), + Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, LegacySQLTypeName.TIMESTAMP) + .setMode(Field.Mode.NULLABLE) + .build(), + Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, LegacySQLTypeName.RECORD, keyField) + .setMode(Field.Mode.REQUIRED) + .build(), + Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, LegacySQLTypeName.RECORD, valueField) + .build() + ); + when(schemaManager.cachedSchema(any())).thenReturn(intermediateSchema); + + CountDownLatch executedMerges = new CountDownLatch(2); + CountDownLatch executedBatchClears = new CountDownLatch(2); + + when(bigQuery.query(any(QueryJobConfiguration.class))).then(invocation -> { + String query = invocation.getArgument(0, QueryJobConfiguration.class).getQuery(); + if (query.startsWith("MERGE")) { + executedMerges.countDown(); + } else if (query.startsWith("DELETE")) { + executedBatchClears.countDown(); + } + return null; + }); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + + // Insert a few regular records and one tombstone record + testTask.put(Arrays.asList( + spoofSinkRecord(topic, key, "4761", "value", "message text", 
TimestampType.NO_TIMESTAMP_TYPE, null), + spoofSinkRecord(topic, key, "489", "value", "other message text", TimestampType.NO_TIMESTAMP_TYPE, null), + spoofSinkRecord(topic, key, "28980", "value", "more message text", TimestampType.NO_TIMESTAMP_TYPE, null), + spoofSinkRecord(topic, key, "4761", null, null, TimestampType.NO_TIMESTAMP_TYPE, null) + )); + + assertTrue("Merge queries should be executed", executedMerges.await(5, TimeUnit.SECONDS)); + assertTrue("Batch clears should be executed", executedBatchClears.await(1, TimeUnit.SECONDS)); + } + // It's important that the buffer be completely wiped after a call to flush, since any exception // thrown during flush causes Kafka Connect to not commit the offsets for any records sent to the // task since the last flush @@ -582,38 +672,57 @@ public void testStop() { } /** - * Utility method for spoofing InsertAllRequests that should be sent to a BigQuery object. - * @param table The table to write to. - * @param rows The rows to write. - * @return The spoofed InsertAllRequest. + * Utility method for spoofing SinkRecords that should be passed to SinkTask.put() + * @param topic The topic of the record. + * @param keyField The field name for the record key; may be null. + * @param key The content of the record key; may be null. + * @param valueField The field name for the record value; may be null + * @param value The content of the record value; may be null + * @param timestampType The type of timestamp embedded in the message + * @param timestamp The timestamp in milliseconds + * @return The spoofed SinkRecord. */ - public static InsertAllRequest buildExpectedInsertAllRequest( - TableId table, - InsertAllRequest.RowToInsert... rows) { - return InsertAllRequest.newBuilder(table, rows) - .setIgnoreUnknownValues(false) - .setSkipInvalidRows(false) - .build(); + public static SinkRecord spoofSinkRecord(String topic, String keyField, String key, + String valueField, String value, + TimestampType timestampType, Long timestamp) { + Schema basicKeySchema = null; + Struct basicKey = null; + if (keyField != null) { + basicKeySchema = SchemaBuilder + .struct() + .field(keyField, Schema.STRING_SCHEMA) + .build(); + basicKey = new Struct(basicKeySchema); + basicKey.put(keyField, key); + } + + Schema basicValueSchema = null; + Struct basicValue = null; + if (valueField != null) { + basicValueSchema = SchemaBuilder + .struct() + .field(valueField, Schema.STRING_SCHEMA) + .build(); + basicValue = new Struct(basicValueSchema); + basicValue.put(valueField, value); + } + + return new SinkRecord(topic, 0, basicKeySchema, basicKey, + basicValueSchema, basicValue, spoofedRecordOffset.getAndIncrement(), timestamp, timestampType); } /** * Utility method for spoofing SinkRecords that should be passed to SinkTask.put() * @param topic The topic of the record. - * @param value The content of the record. + * @param field The field name for the record value. + * @param value The content of the record value. * @param timestampType The type of timestamp embedded in the message * @param timestamp The timestamp in milliseconds * @return The spoofed SinkRecord. 
*/ public static SinkRecord spoofSinkRecord(String topic, String field, String value, TimestampType timestampType, Long timestamp) { - Schema basicRowSchema = SchemaBuilder - .struct() - .field(field, Schema.STRING_SCHEMA) - .build(); - Struct basicRowValue = new Struct(basicRowSchema); - basicRowValue.put(field, value); - return new SinkRecord(topic, 0, null, null, - basicRowSchema, basicRowValue, 0, timestamp, timestampType); + return spoofSinkRecord(topic, null, null, field, value, timestampType, timestamp); } /** diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java new file mode 100644 index 000000000..9675df8c2 --- /dev/null +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java @@ -0,0 +1,272 @@ +package com.wepay.kafka.connect.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableId; +import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor; +import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class MergeQueriesTest { + + private static final String KEY = "kafkaKey"; + + private static final int BATCH_NUMBER = 42; + private static final TableId DESTINATION_TABLE = TableId.of("ds1", "t"); + private static final TableId INTERMEDIATE_TABLE = TableId.of("ds1", "t_tmp_6_uuid_epoch"); + private static final Schema INTERMEDIATE_TABLE_SCHEMA = constructIntermediateTable(); + + @Mock private MergeBatches mergeBatches; + @Mock private KCBQThreadPoolExecutor executor; + @Mock private BigQuery bigQuery; + @Mock private SchemaManager schemaManager; + @Mock private SinkTaskContext context; + + @Before + public void setUp() { + when(schemaManager.cachedSchema(INTERMEDIATE_TABLE)).thenReturn(INTERMEDIATE_TABLE_SCHEMA); + } + + private MergeQueries mergeQueries(boolean insertPartitionTime, boolean upsert, boolean delete) { + return new MergeQueries( + KEY, insertPartitionTime, upsert, delete, mergeBatches, executor, bigQuery, schemaManager, context + ); + } + + private static Schema constructIntermediateTable() { + List fields = new ArrayList<>(); + + List valueFields = Arrays.asList( + Field.of("f1", LegacySQLTypeName.STRING), + Field.of("f2", LegacySQLTypeName.RECORD, + Field.of("nested_f1", LegacySQLTypeName.INTEGER) + ), + Field.of("f3", LegacySQLTypeName.BOOLEAN), + Field.of("f4", LegacySQLTypeName.BYTES) + ); + Field wrappedValueField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, LegacySQLTypeName.RECORD, valueFields.toArray(new Field[0])) + .setMode(Field.Mode.NULLABLE) + .build(); + fields.add(wrappedValueField); + + List keyFields = Arrays.asList( + Field.of("k1", LegacySQLTypeName.STRING), + Field.of("k2", LegacySQLTypeName.RECORD, + Field.of("nested_k1", LegacySQLTypeName.RECORD, + Field.of("doubly_nested_k", LegacySQLTypeName.BOOLEAN) + ), + Field.of("nested_k2", 
LegacySQLTypeName.INTEGER) + ) + ); + Field kafkaKeyField = Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, LegacySQLTypeName.RECORD, keyFields.toArray(new Field[0])) + .setMode(Field.Mode.REQUIRED) + .build(); + fields.add(kafkaKeyField); + + Field partitionTimeField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME, LegacySQLTypeName.TIMESTAMP) + .setMode(Field.Mode.NULLABLE) + .build(); + fields.add(partitionTimeField); + + Field batchNumberField = Field + .newBuilder(MergeQueries.INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD, LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.REQUIRED) + .build(); + fields.add(batchNumberField); + + return Schema.of(fields); + } + + @Test + public void testUpsertQueryWithPartitionTime() { + String expectedQuery = + "MERGE " + table(DESTINATION_TABLE) + " " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "WHERE batchNumber=" + BATCH_NUMBER + " " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " + + "WHEN MATCHED " + + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " + + "WHEN NOT MATCHED " + + "THEN INSERT (" + + KEY + ", " + + "_PARTITIONTIME, " + + "f1, f2, f3, f4) " + + "VALUES (" + + "`src`.key, " + + "CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), " + + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + ");"; + String actualQuery = mergeQueries(true, true, false) + .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testUpsertQueryWithoutPartitionTime() { + String expectedQuery = + "MERGE " + table(DESTINATION_TABLE) + " " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "WHERE batchNumber=" + BATCH_NUMBER + " " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " + + "WHEN MATCHED " + + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " + + "WHEN NOT MATCHED " + + "THEN INSERT (" + + KEY + ", " + + "f1, f2, f3, f4) " + + "VALUES (" + + "`src`.key, " + + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + ");"; + String actualQuery = mergeQueries(false, true, false) + .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testDeleteQueryWithPartitionTime() { + String expectedQuery = + "MERGE " + table(DESTINATION_TABLE) + " " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "WHERE batchNumber=" + BATCH_NUMBER + " " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key AND `src`.value IS NULL " + + "WHEN MATCHED " + + "THEN DELETE " + + "WHEN NOT MATCHED " + + "THEN INSERT (" + + KEY + ", " + + "_PARTITIONTIME, " + + "f1, f2, f3, f4) " + + "VALUES (" + + "`src`.key, " + + "CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), " + + "`src`.value.f1, 
`src`.value.f2, `src`.value.f3, `src`.value.f4" + + ");"; + String actualQuery = mergeQueries(true, false, true) + .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testDeleteQueryWithoutPartitionTime() { + String expectedQuery = + "MERGE " + table(DESTINATION_TABLE) + " " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "WHERE batchNumber=" + BATCH_NUMBER + " " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key AND `src`.value IS NULL " + + "WHEN MATCHED " + + "THEN DELETE " + + "WHEN NOT MATCHED " + + "THEN INSERT (" + + KEY + ", " + + "f1, f2, f3, f4) " + + "VALUES (" + + "`src`.key, " + + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + ");"; + String actualQuery = mergeQueries(false, false, true) + .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testUpsertDeleteQueryWithPartitionTime() { + String expectedQuery = + "MERGE " + table(DESTINATION_TABLE) + " " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "WHERE batchNumber=" + BATCH_NUMBER + " " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " + + "WHEN MATCHED AND `src`.value IS NOT NULL " + + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " + + "WHEN MATCHED AND `src`.value IS NULL " + + "THEN DELETE " + + "WHEN NOT MATCHED AND `src`.value IS NOT NULL " + + "THEN INSERT (" + + KEY + ", " + + "_PARTITIONTIME, " + + "f1, f2, f3, f4) " + + "VALUES (" + + "`src`.key, " + + "CAST(CAST(DATE(`src`.partitionTime) AS DATE) AS TIMESTAMP), " + + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + ");"; + String actualQuery = mergeQueries(true, true, true) + .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testUpsertDeleteQueryWithoutPartitionTime() { + String expectedQuery = + "MERGE " + table(DESTINATION_TABLE) + " " + + "USING (SELECT * FROM (SELECT ARRAY_AGG(x ORDER BY partitionTime DESC LIMIT 1)[OFFSET(0)] src " + + "FROM " + table(INTERMEDIATE_TABLE) + " x " + + "WHERE batchNumber=" + BATCH_NUMBER + " " + + "GROUP BY key.k1, key.k2.nested_k1.doubly_nested_k, key.k2.nested_k2)) " + + "ON `" + DESTINATION_TABLE.getTable() + "`.kafkaKey=`src`.key " + + "WHEN MATCHED AND `src`.value IS NOT NULL " + + "THEN UPDATE SET f1=`src`.value.f1, f2=`src`.value.f2, f3=`src`.value.f3, f4=`src`.value.f4 " + + "WHEN MATCHED AND `src`.value IS NULL " + + "THEN DELETE " + + "WHEN NOT MATCHED AND `src`.value IS NOT NULL " + + "THEN INSERT (" + + KEY + ", " + + "f1, f2, f3, f4) " + + "VALUES (" + + "`src`.key, " + + "`src`.value.f1, `src`.value.f2, `src`.value.f3, `src`.value.f4" + + ");"; String actualQuery = mergeQueries(false, true, true) + .mergeFlushQuery(INTERMEDIATE_TABLE, DESTINATION_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public 
void testBatchClearQuery() { + String expectedQuery = + "DELETE FROM " + table(INTERMEDIATE_TABLE) + + " WHERE batchNumber <= " + BATCH_NUMBER + + " AND _PARTITIONTIME IS NOT NULL;"; + // No difference in batch clearing between upsert, delete, and both, or with or without partition time + String actualQuery = mergeQueries(false, false, false) + .batchClearQuery(INTERMEDIATE_TABLE, BATCH_NUMBER); + System.out.println(actualQuery); + assertEquals(expectedQuery, actualQuery); + } + + private String table(TableId table) { + return String.format("`%s`.`%s`", table.getDataset(), table.getTable()); + } +}
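
---

Note (not part of the patch): as a quick illustration of the new upsert/delete validation path added to `BigQuerySinkConfig.validate()` above, here is a minimal sketch showing the intended rejection of a configuration that enables both upsert and GCS batch loading. The class name and property values are hypothetical, and it assumes `ENABLE_BATCH_CONFIG` and the other config constants are publicly accessible on `BigQuerySinkConfig`, with `validate()` taking a `Map<String, String>` as in the connector's config class.

```java
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import org.apache.kafka.common.config.ConfigException;

import java.util.HashMap;
import java.util.Map;

// Illustrative only; exercises the validation added in this patch under the
// assumptions stated above.
public class UpsertDeleteValidationSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(BigQuerySinkConfig.TOPICS_CONFIG, "test-topic");
    props.put(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG, "true");
    props.put(BigQuerySinkConfig.KAFKA_KEY_FIELD_NAME_CONFIG, "kafkaKey");
    // GCS batch loading is configured per topic; any non-empty value now
    // conflicts with upsert/delete per the new gcsBatchLoadingEnabled() check.
    props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, "test-topic");

    try {
      BigQuerySinkConfig.validate(props);
      System.out.println("Unexpected: configuration was accepted");
    } catch (ConfigException e) {
      // Expected: "Cannot enable both upsert/delete and GCS batch loading"
      System.out.println("Rejected as expected: " + e.getMessage());
    }
  }
}
```

With GCS batch loading left unset, the same properties pass this check and validation proceeds to the merge-interval, merge-threshold, and Kafka-key-field requirements shown in the patch.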