Skip to content

Commit

Permalink
GH-264: Update existing unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
C0urante committed Sep 9, 2020
1 parent e77c405 commit c8509b5
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
* Base class for connector and task configs; contains properties shared between the two of them.
*/
public class BigQuerySinkConfig extends AbstractConfig {
private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class);

// Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33
public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG;
private static final ConfigDef.Type TOPICS_TYPE = ConfigDef.Type.LIST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
* Class for task-specific configuration properties.
*/
public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
private static final ConfigDef config;
private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkTaskConfig.class);

public static final String THREAD_POOL_SIZE_CONFIG = "threadPoolSize";
Expand Down Expand Up @@ -125,8 +124,13 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
public static final ConfigDef.Importance TASK_ID_IMPORTANCE = ConfigDef.Importance.LOW;
private static final String TASK_ID_DOC = "A unique for each task created by the connector";

static {
config = BigQuerySinkConfig.getConfig()
/**
* Return a ConfigDef object used to define this config's fields.
*
* @return A ConfigDef object used to define this config's fields.
*/
public static ConfigDef getConfig() {
return BigQuerySinkConfig.getConfig()
.define(
THREAD_POOL_SIZE_CONFIG,
THREAD_POOL_SIZE_TYPE,
Expand Down Expand Up @@ -251,15 +255,11 @@ private void checkClusteringConfigs() {
}
}

public static ConfigDef getConfig() {
return config;
}

/**
* @param properties A Map detailing configuration properties and their respective values.
*/
public BigQuerySinkTaskConfig(Map<String, String> properties) {
super(config, properties);
super(getConfig(), properties);
checkSchemaUpdates();
checkPartitionConfigs();
checkClusteringConfigs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;

import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ private Map<String,String> makeProperties(String bigqueryRetry,
Map<String, String> properties = propertiesFactory.getProperties();
properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_CONFIG, bigqueryRetry);
properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_WAIT_CONFIG, bigqueryRetryWait);
properties.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, "6");
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
properties.put(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG, dataset);
return properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ private Map<String,String> makeProperties(String bigqueryRetry,
Map<String, String> properties = propertiesFactory.getProperties();
properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_CONFIG, bigqueryRetry);
properties.put(BigQuerySinkTaskConfig.BIGQUERY_RETRY_WAIT_CONFIG, bigqueryRetryWait);
properties.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, "9");
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
properties.put(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG, dataset);
// gcs config
Expand Down

0 comments on commit c8509b5

Please sign in to comment.