CC-8804: Integration tests for upsert/delete (#16)
* GH-264: Add embedded integration test for upsert/delete

Also fixes a bug in the schema retriever logic where key schemas were not being
reported to schema retrievers, and improves shutdown logic so that tasks can
stop gracefully when requested by the framework.

* GH-264: Clean up shutdown logic, make logs easier to read

* GH-264: Retain prior shutdown behavior when upsert/delete is not enabled

* GH-264: Refactor merge query construction logic

* GH-264: Fix infinite recursion bug in SchemaRetriever interface
C0urante committed Sep 10, 2020
1 parent d8aae10 commit 57fb6cf
Showing 18 changed files with 1,395 additions and 331 deletions.
37 changes: 33 additions & 4 deletions README.md
@@ -131,12 +131,41 @@ adjusting flags given to the Avro Console Producer and tweaking the config setti

## Integration Testing the Connector

There is a legacy Docker-based integration test for the connector, and newer integration tests that
programmatically instantiate an embedded Connect cluster.

### Embedded integration tests

Currently these tests only verify the connector's upsert/delete feature. They should eventually
replace all of the existing Docker-based tests.
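
For orientation, here is a minimal sketch of what such a test can look like, using the `EmbeddedConnectCluster` utility from Kafka's `connect-runtime` test artifact (one of the test dependencies added in `build.gradle`). The test class, topic name, connector name, and configuration entries below are illustrative placeholders, not the repository's actual test code:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(IntegrationTest.class) // run by `./gradlew embeddedIntegrationTest`, skipped by `test`
public class UpsertDeleteSketchIT {

  private EmbeddedConnectCluster connect;

  @Before
  public void setup() throws Exception {
    // Start an in-process Kafka broker and Connect worker
    connect = new EmbeddedConnectCluster.Builder()
        .name("kcbq-embedded-connect")
        .numWorkers(1)
        .build();
    connect.start();
    connect.kafka().createTopic("kcbq-test-topic", 1);
  }

  @After
  public void teardown() {
    connect.stop();
  }

  @Test
  public void testUpsertDelete() {
    // Connector class and config keys are placeholders; converter and credential
    // settings are omitted for brevity
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector");
    props.put("topics", "kcbq-test-topic");
    props.put("project", System.getenv("KCBQ_TEST_PROJECT"));
    connect.configureConnector("kcbq-sink", props);

    // Keyed records drive upserts; a record with a null value represents a delete
    connect.kafka().produce("kcbq-test-topic", "key1", "{\"f1\": \"value1\"}");
    connect.kafka().produce("kcbq-test-topic", "key1", null);

    // ... wait for the connector to flush, then query BigQuery and assert on the rows ...
  }
}
```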

#### Configuring the tests

You must supply the following environment variables in order to run the tests:

- `$KCBQ_TEST_PROJECT`: The name of the BigQuery project to use for the test
- `$KCBQ_TEST_DATASET`: The name of the BigQuery dataset to use for the test
- `$KCBQ_TEST_KEYFILE`: The key (either file or raw contents) used to authenticate with BigQuery
during the test

Additionally, the `$KCBQ_TEST_KEYSOURCE` variable can be supplied to specify whether the value of
`$KCBQ_TEST_KEYFILE` is a path to a key file (if set to `FILE`) or the raw contents of a key file
(if set to `JSON`). The default is `FILE`.
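
For illustration, the snippet below resolves these variables the way a test harness might; the class name and defaulting logic are hypothetical, not the repository's actual test code:

```java
import java.util.Optional;

// Illustrative only: shows the intended semantics of the KCBQ_TEST_* variables
public class KcbqTestEnv {
  public static void main(String[] args) {
    String project = System.getenv("KCBQ_TEST_PROJECT");
    String dataset = System.getenv("KCBQ_TEST_DATASET");
    String keyfile = System.getenv("KCBQ_TEST_KEYFILE");
    // FILE (the default): KCBQ_TEST_KEYFILE is a path to a key file on disk
    // JSON: KCBQ_TEST_KEYFILE holds the raw JSON contents of the key
    String keySource = Optional.ofNullable(System.getenv("KCBQ_TEST_KEYSOURCE")).orElse("FILE");
    System.out.printf("project=%s, dataset=%s, key source=%s, keyfile provided=%b%n",
        project, dataset, keySource, keyfile != null);
  }
}
```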

#### Running the Integration Tests

```bash
./gradlew embeddedIntegrationTest
```

### Docker-based tests

> **NOTE**: You must have [Docker] installed and running on your machine in order to run integration
tests for the connector.

This all takes place in the `kcbq-connector` directory.

#### How Integration Testing Works

Integration tests run by creating [Docker] instances for [Zookeeper], [Kafka], [Schema Registry],
and the BigQuery Connector itself, then verifying the results using a [JUnit] test.
@@ -148,7 +177,7 @@ The project and dataset they write to, as well as the specific JSON key file the
specified by command-line flag, environment variable, or configuration file — the exact details of
each can be found by running the integration test script with the `-?` flag.

#### Data Corruption Concerns

In order to ensure the validity of each test, any table that will be written to in the course of
integration testing is preemptively deleted before the connector is run. This will only be an issue
@@ -161,7 +190,7 @@ tests will corrupt any existing data that is already on your machine, and there
free up any of your ports that might currently be in use by real instances of the programs that are
faked in the process of testing.

#### Running the Integration Tests

Running the series of integration tests is easy:

@@ -176,7 +205,7 @@ the `--help` flag.
> **NOTE:** You must have a recent version of [boot2docker], [Docker Machine], [Docker], etc.
installed. Older versions will hang when cleaning containers, and linking doesn't work properly.

#### Adding New Integration Tests

Adding an integration test is a little more involved, and consists of two major steps: specifying
Avro data to be sent to Kafka, and specifying via JUnit test how to verify that such data made
28 changes: 27 additions & 1 deletion build.gradle
@@ -28,6 +28,7 @@ project.ext {
ioConfluentVersion = '5.5.0'
junitVersion = '4.12'
kafkaVersion = '2.5.0'
kafkaScalaVersion = '2.12' // For integration testing only
mockitoVersion = '3.2.4'
slf4jVersion = '1.6.1'
}
@@ -153,6 +154,26 @@ project(':kcbq-connector') {
}
}

test {
useJUnit {
// Exclude embedded integration tests from normal testing since they require BigQuery
// credentials and can take a while
excludeCategories 'org.apache.kafka.test.IntegrationTest'
}
}

task embeddedIntegrationTest(type: Test) {
useJUnit {
includeCategories 'org.apache.kafka.test.IntegrationTest'
}

// Enable logging for integration tests
testLogging {
outputs.upToDateWhen {false}
showStandardStreams = true
}
}
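
// Note: test classes annotated with @Category(org.apache.kafka.test.IntegrationTest.class) are
// excluded from the regular 'test' task above and run only via 'embeddedIntegrationTest'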

task integrationTestPrep() {
dependsOn 'integrationTestTablePrep'
dependsOn 'integrationTestBucketPrep'
@@ -226,7 +247,12 @@ project(':kcbq-connector') {
"junit:junit:$junitVersion",
"org.mockito:mockito-core:$mockitoVersion",
"org.mockito:mockito-inline:$mockitoVersion",
"org.apache.kafka:connect-api:$kafkaVersion"
"org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion",
"org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion:test",
"org.apache.kafka:kafka-clients:$kafkaVersion:test",
"org.apache.kafka:connect-api:$kafkaVersion",
"org.apache.kafka:connect-runtime:$kafkaVersion",
"org.apache.kafka:connect-runtime:$kafkaVersion:test",
)
}

SchemaRetriever.java
@@ -30,5 +30,4 @@ public interface SchemaRetriever {
* @return The value Schema for the given record.
*/
Schema retrieveValueSchema(SinkRecord record);

}
BigQuerySinkTask.java
@@ -62,13 +62,16 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable;

/**
* A {@link SinkTask} used to translate Kafka Connect {@link SinkRecord SinkRecords} into BigQuery
* {@link RowToInsert RowToInserts} and subsequently write them to BigQuery.
@@ -131,6 +134,11 @@ public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever,

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (upsertDelete) {
throw new ConnectException("This connector cannot perform upsert/delete on older versions of "
+ "the Connect framework; please upgrade to version 0.10.2.0 or later");
}

try {
executor.awaitCurrentTasks();
} catch (InterruptedException err) {
@@ -459,33 +467,36 @@ private void maybeStartMergeFlushTask() {

@Override
public void stop() {
  maybeStopExecutor(loadExecutor, "load executor");
  maybeStopExecutor(executor, "table write executor");
  if (upsertDelete) {
    mergeBatches.intermediateTables().forEach(table -> {
      logger.debug("Deleting {}", intTable(table));
      getBigQuery().delete(table);
    });
  }

  logger.trace("task.stop()");
}

private void maybeStopExecutor(ExecutorService executor, String executorName) {
  if (executor == null) {
    return;
  }

  try {
    if (upsertDelete) {
      logger.trace("Forcibly shutting down {}", executorName);
      executor.shutdownNow();
    } else {
      logger.trace("Requesting shutdown for {}", executorName);
      executor.shutdown();
    }
    logger.trace("Awaiting termination of {}", executorName);
    executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
    logger.trace("Shut down {} successfully", executorName);
  } catch (Exception e) {
    logger.warn("Failed to shut down {}", executorName, e);
  }
}
