From c8509b50ba145fcdbe31e75297504afec68c2e81 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Tue, 16 Jun 2020 13:23:11 -0700 Subject: [PATCH] GH-264: Update existing unit tests --- .../bigquery/config/BigQuerySinkConfig.java | 2 -- .../bigquery/config/BigQuerySinkTaskConfig.java | 16 ++++++++-------- .../connect/bigquery/SinkPropertiesFactory.java | 1 + .../bigquery/write/row/BigQueryWriterTest.java | 1 + .../bigquery/write/row/GCSToBQWriterTest.java | 1 + 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index 8cfc9c272..949305965 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -51,8 +51,6 @@ * Base class for connector and task configs; contains properties shared between the two of them. */ public class BigQuerySinkConfig extends AbstractConfig { - private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class); - // Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33 public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG; private static final ConfigDef.Type TOPICS_TYPE = ConfigDef.Type.LIST; diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java index 1ba973dc7..e9e35bc3b 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java @@ -32,7 +32,6 @@ * Class for task-specific configuration properties. */ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { - private static final ConfigDef config; private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkTaskConfig.class); public static final String THREAD_POOL_SIZE_CONFIG = "threadPoolSize"; @@ -125,8 +124,13 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { public static final ConfigDef.Importance TASK_ID_IMPORTANCE = ConfigDef.Importance.LOW; private static final String TASK_ID_DOC = "A unique for each task created by the connector"; - static { - config = BigQuerySinkConfig.getConfig() + /** + * Return a ConfigDef object used to define this config's fields. + * + * @return A ConfigDef object used to define this config's fields. + */ + public static ConfigDef getConfig() { + return BigQuerySinkConfig.getConfig() .define( THREAD_POOL_SIZE_CONFIG, THREAD_POOL_SIZE_TYPE, @@ -251,15 +255,11 @@ private void checkClusteringConfigs() { } } - public static ConfigDef getConfig() { - return config; - } - /** * @param properties A Map detailing configuration properties and their respective values. */ public BigQuerySinkTaskConfig(Map properties) { - super(config, properties); + super(getConfig(), properties); checkSchemaUpdates(); checkPartitionConfigs(); checkClusteringConfigs(); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java index 9e20cc73a..9198bb8c2 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java @@ -19,6 +19,7 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import java.util.HashMap; import java.util.Map; diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java index d1d4d9477..d1ab8b77b 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java @@ -298,6 +298,7 @@ private Map makeProperties(String bigqueryRetry, Map properties = propertiesFactory.getProperties(); properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_CONFIG, bigqueryRetry); properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_WAIT_CONFIG, bigqueryRetryWait); + properties.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, "6"); properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic); properties.put(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG, dataset); return properties; diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java index 435328b55..bce4ff9bf 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java @@ -163,6 +163,7 @@ private Map makeProperties(String bigqueryRetry, Map properties = propertiesFactory.getProperties(); properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_CONFIG, bigqueryRetry); properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_WAIT_CONFIG, bigqueryRetryWait); + properties.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, "9"); properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic); properties.put(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG, dataset); // gcs config