Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-264: Implement upsert/delete #283

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,41 @@ adjusting flags given to the Avro Console Producer and tweaking the config setti

## Integration Testing the Connector

There is a legacy Docker-based integration test for the connector, and newer integration tests that
programmatically instantiate an embedded Connect cluster.

### Embedded integration tests

Currently these tests only verify the connector's upsert/delete feature. They should eventually
replace all of the existing Docker-based tests.

#### Configuring the tests

You must supply the following environment variables in order to run the tests:

- `$KCBQ_TEST_PROJECT`: The name of the BigQuery project to use for the test
- `$KCBQ_TEST_DATASET`: The name of the BigQuery dataset to use for the test
- `$KCBQ_TEST_KEYFILE`: The key (either file or raw contents) used to authenticate with BigQuery
during the test

Additionally, the `$KCBQ_TEST_KEYSOURCE` variable can be supplied to specify whether the value of
`$KCBQ_TEST_KEYFILE` are a path to a key file (if set to `FILE`) or the raw contents of a key file
(if set to `JSON`). The default is `FILE`.

#### Running the Integration Tests

```bash
./gradlew embeddedIntegrationTest
```

### Docker-based tests

> **NOTE**: You must have [Docker] installed and running on your machine in order to run integration
tests for the connector.

This all takes place in the `kcbq-connector` directory.

### How Integration Testing Works
#### How Integration Testing Works

Integration tests run by creating [Docker] instances for [Zookeeper], [Kafka], [Schema Registry],
and the BigQuery Connector itself, then verifying the results using a [JUnit] test.
Expand All @@ -148,7 +177,7 @@ The project and dataset they write to, as well as the specific JSON key file the
specified by command-line flag, environment variable, or configuration file — the exact details of
each can be found by running the integration test script with the `-?` flag.

### Data Corruption Concerns
#### Data Corruption Concerns

In order to ensure the validity of each test, any table that will be written to in the course of
integration testing is preemptively deleted before the connector is run. This will only be an issue
Expand All @@ -161,7 +190,7 @@ tests will corrupt any existing data that is already on your machine, and there
free up any of your ports that might currently be in use by real instances of the programs that are
faked in the process of testing.

### Running the Integration Tests
#### Running the Integration Tests

Running the series of integration tests is easy:

Expand All @@ -176,7 +205,7 @@ the `--help` flag.
> **NOTE:** You must have a recent version of [boot2docker], [Docker Machine], [Docker], etc.
installed. Older versions will hang when cleaning containers, and linking doesn't work properly.

### Adding New Integration Tests
#### Adding New Integration Tests

Adding an integration test is a little more involved, and consists of two major steps: specifying
Avro data to be sent to Kafka, and specifying via JUnit test how to verify that such data made
Expand Down
28 changes: 27 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ project.ext {
ioConfluentVersion = '5.5.0'
junitVersion = '4.12'
kafkaVersion = '2.5.0'
kafkaScalaVersion = '2.12' // For integration testing only
mockitoVersion = '3.2.4'
slf4jVersion = '1.6.1'
}
Expand Down Expand Up @@ -153,6 +154,26 @@ project(':kcbq-connector') {
}
}

test {
useJUnit {
// Exclude embedded integration tests from normal testing since they require BigQuery
// credentials and can take a while
excludeCategories 'org.apache.kafka.test.IntegrationTest'
}
}

task embeddedIntegrationTest(type: Test) {
useJUnit {
includeCategories 'org.apache.kafka.test.IntegrationTest'
}

// Enable logging for integration tests
testLogging {
outputs.upToDateWhen {false}
showStandardStreams = true
}
}

task integrationTestPrep() {
dependsOn 'integrationTestTablePrep'
dependsOn 'integrationTestBucketPrep'
Expand Down Expand Up @@ -226,7 +247,12 @@ project(':kcbq-connector') {
"junit:junit:$junitVersion",
"org.mockito:mockito-core:$mockitoVersion",
"org.mockito:mockito-inline:$mockitoVersion",
"org.apache.kafka:connect-api:$kafkaVersion"
"org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion",
"org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion:test",
"org.apache.kafka:kafka-clients:$kafkaVersion:test",
"org.apache.kafka:connect-api:$kafkaVersion",
"org.apache.kafka:connect-runtime:$kafkaVersion",
"org.apache.kafka:connect-runtime:$kafkaVersion:test",
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface SchemaRetriever {
* {@link org.apache.kafka.connect.sink.SinkConnector#start(Map)} method.
* @param properties The configuration settings of the connector.
*/
public void configure(Map<String, String> properties);
void configure(Map<String, String> properties);

/**
* Retrieve the most current schema for the given topic.
Expand All @@ -25,13 +25,30 @@ public interface SchemaRetriever {
* @param schemaType The type of kafka schema, either "value" or "key".
* @return The Schema for the given table.
*/
public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType);
Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType);

/**
* Set the last seen schema for a given topic
* Set the last seen schema for a given topic.
* @param table The table that will be created.
* @param topic The topic to retrieve a schema for.
* @param schema The last seen Kafka Connect Schema
*/
public void setLastSeenSchema(TableId table, String topic, Schema schema);
void setLastSeenSchema(TableId table, String topic, Schema schema);

/**
* Set the last seen schema for a given topic and record type.
* In order to preserve backwards compatibility, will invoke
* {@link #setLastSeenSchema(TableId, String, Schema)} by default if the schema is for a record
* value, and otherwise be a no-op.
* @param table The table that will be created.
* @param topic The topic to retrieve a schema for.
* @param schema The last seen Kafka Connect Schema.
* @param schemaType The type of the schema (key or value).
* @since 1.7.0
*/
default void setLastSeenSchema(TableId table, String topic, Schema schema, KafkaSchemaRecordType schemaType) {
if (KafkaSchemaRecordType.VALUE.equals(schemaType)) {
setLastSeenSchema(table, topic, schema, KafkaSchemaRecordType.VALUE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;

import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
Expand Down Expand Up @@ -143,12 +144,12 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
logger.trace("connector.taskConfigs()");
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
// Copy configProperties so that tasks can't interfere with each others' configurations
HashMap<String, String> taskConfig = new HashMap<>(configProperties);
if (i == 0 && !config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).isEmpty()) {
// if batch loading is enabled, configure first task to do the GCS -> BQ loading
taskConfig.put(GCS_BQ_TASK_CONFIG_KEY, "true");
}
taskConfig.put(BigQuerySinkTaskConfig.TASK_ID_CONFIG, Integer.toString(i));
taskConfigs.add(taskConfig);
}
return taskConfigs;
Expand Down
Loading