Commit 5901eb0
GH-264: Address some review comments, add unit tests
C0urante committed Sep 10, 2020
1 parent c8509b5 commit 5901eb0
Showing 8 changed files with 569 additions and 141 deletions.
@@ -90,7 +90,6 @@ public class BigQuerySinkTask extends SinkTask {
private boolean upsertDelete;
private MergeBatches mergeBatches;
private MergeQueries mergeQueries;
private long mergeRecordsThreshold;

private TopicPartitionManager topicPartitionManager;

@@ -218,7 +217,7 @@ public void put(Collection<SinkRecord> records) {
Map<PartitionedTableId, TableWriterBuilder> tableWriterBuilders = new HashMap<>();

for (SinkRecord record : records) {
if (record.value() != null) {
if (record.value() != null || config.getBoolean(config.DELETE_ENABLED_CONFIG)) {
PartitionedTableId table = getRecordTable(record);
if (!tableWriterBuilders.containsKey(table)) {
TableWriterBuilder tableWriterBuilder;
@@ -400,12 +399,10 @@ public void start(Map<String, String> properties) {
Instant.now().toEpochMilli()
);
mergeBatches = new MergeBatches(intermediateTableSuffix);
mergeRecordsThreshold = config.getLong(config.MERGE_RECORDS_THRESHOLD_CONFIG);
}

bigQueryWriter = getBigQueryWriter();
gcsToBQWriter = getGcsWriter();
recordConverter = getConverter(config);
executor = new KCBQThreadPoolExecutor(config, new LinkedBlockingQueue<>());
topicPartitionManager = new TopicPartitionManager();
useMessageTimeDatePartitioning =
@@ -421,6 +418,8 @@ public void start(Map<String, String> properties) {
new MergeQueries(config, mergeBatches, executor, getBigQuery(), getSchemaManager(), context);
maybeStartMergeFlushTask();
}

recordConverter = getConverter(config);
}

private void startGCSToBQLoadTask() {
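With delete support enabled, the reworked condition in put() above no longer drops null-valued records (Kafka tombstones). The following is an illustrative sketch, not part of the commit, that isolates that guard with a plain boolean standing in for the config.getBoolean(config.DELETE_ENABLED_CONFIG) lookup:

import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneGuardSketch {

  // Mirrors the new condition in BigQuerySinkTask#put: write the record if it has a
  // value, or unconditionally when delete handling is enabled.
  static boolean shouldWrite(SinkRecord record, boolean deleteEnabled) {
    return record.value() != null || deleteEnabled;
  }

  public static void main(String[] args) {
    // A tombstone: key present, value (and value schema) null.
    SinkRecord tombstone = new SinkRecord("topic", 0, null, "key", null, null, 0L);
    System.out.println(shouldWrite(tombstone, true));  // true  -> routed on for deletion
    System.out.println(shouldWrite(tombstone, false)); // false -> skipped, as before
  }
}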
@@ -24,15 +24,16 @@
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.google.common.annotations.VisibleForTesting;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor;
import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -45,28 +46,50 @@ public class MergeQueries {
private static final Logger logger = LoggerFactory.getLogger(MergeQueries.class);

private final String keyFieldName;
private final boolean usePartitionDecorator;
private final boolean insertPartitionTime;
private final boolean upsertEnabled;
private final boolean deleteEnabled;
private final MergeBatches mergeBatches;
private final ExecutorService executor;
private final KCBQThreadPoolExecutor executor;
private final BigQuery bigQuery;
private final SchemaManager schemaManager;
private final SinkTaskContext context;

public MergeQueries(BigQuerySinkTaskConfig config,
MergeBatches mergeBatches,
ExecutorService executor,
KCBQThreadPoolExecutor executor,
BigQuery bigQuery,
SchemaManager schemaManager,
SinkTaskContext context) {
this.keyFieldName = config.getKafkaKeyFieldName().orElseThrow(() ->
new ConnectException("Kafka key field must be configured when upsert/delete is enabled")
this(
config.getKafkaKeyFieldName().orElseThrow(() ->
new ConnectException("Kafka key field must be configured when upsert/delete is enabled")
),
config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG),
config.getBoolean(config.UPSERT_ENABLED_CONFIG),
config.getBoolean(config.DELETE_ENABLED_CONFIG),
mergeBatches,
executor,
bigQuery,
schemaManager,
context
);
this.usePartitionDecorator = config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG);
this.upsertEnabled = config.getBoolean(config.UPSERT_ENABLED_CONFIG);
this.deleteEnabled = config.getBoolean(config.DELETE_ENABLED_CONFIG);
}

@VisibleForTesting
MergeQueries(String keyFieldName,
boolean insertPartitionTime,
boolean upsertEnabled,
boolean deleteEnabled,
MergeBatches mergeBatches,
KCBQThreadPoolExecutor executor,
BigQuery bigQuery,
SchemaManager schemaManager,
SinkTaskContext context) {
this.keyFieldName = keyFieldName;
this.insertPartitionTime = insertPartitionTime;
this.upsertEnabled = upsertEnabled;
this.deleteEnabled = deleteEnabled;
this.mergeBatches = mergeBatches;
this.executor = executor;
this.bigQuery = bigQuery;
@@ -85,56 +108,50 @@ public void mergeFlush(TableId intermediateTable) {
logger.trace("Triggering merge flush from intermediate table {} to destination table {} for batch {}",
intermediateTable, destinationTable, batchNumber);

executor.submit(() -> {
// If there are rows to flush in this batch, flush them
if (mergeBatches.prepareToFlush(intermediateTable, batchNumber)) {
try {
logger.debug("Running merge query on batch {} from intermediate table {}",
batchNumber, intermediateTable);
String mergeFlushQuery = mergeFlushQuery(intermediateTable, destinationTable, batchNumber);
logger.trace(mergeFlushQuery);
bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery));
logger.trace("Merge from intermediate table {} to destination table {} completed",
intermediateTable, destinationTable);
} catch (Throwable t) {
logger.warn("Failed on merge flush from intermediate table {} to destination table {}",
intermediateTable, destinationTable, t);
throw new ConnectException(
String.format("Failed to perform merge flush from intermediate table %s to destination table %s",
intermediateTable,
destinationTable),
t);
}

logger.debug("Recording flush success for batch {} from {}",
batchNumber, intermediateTable);
mergeBatches.recordSuccessfulFlush(intermediateTable, batchNumber);

// Commit those offsets ASAP
context.requestCommit();

logger.info("Completed merge flush of batch {} from {} to {}",
batchNumber, intermediateTable, destinationTable);
}

// After, regardless of whether we flushed or not, clean up old batches from the intermediate
// table. Some rows may be several batches old but still in the table if they were in the
// streaming buffer during the last purge.
executor.execute(() -> {
try {
logger.trace("Clearing batches from {} on back from intermediate table {}", batchNumber, intermediateTable);
String tableClearQuery = clearBatchQuery(intermediateTable, batchNumber);
logger.trace(tableClearQuery);
bigQuery.query(QueryJobConfiguration.of(tableClearQuery));
} catch (Throwable t) {
logger.error("Failed to clear old batches from intermediate table {}", intermediateTable, t);
throw new ConnectException(
String.format("Failed to clear old batches from intermediate table %s",
intermediateTable),
t);
mergeFlush(intermediateTable, destinationTable, batchNumber);
} catch (InterruptedException e) {
throw new ConnectException(String.format(
"Interrupted while performing merge flush of batch %d from %s to %s",
batchNumber, intermediateTable, destinationTable));
}
});
}

private void mergeFlush(
TableId intermediateTable, TableId destinationTable, int batchNumber
) throws InterruptedException {
// If there are rows to flush in this batch, flush them
if (mergeBatches.prepareToFlush(intermediateTable, batchNumber)) {
logger.debug("Running merge query on batch {} from intermediate table {}",
batchNumber, intermediateTable);
String mergeFlushQuery = mergeFlushQuery(intermediateTable, destinationTable, batchNumber);
logger.trace(mergeFlushQuery);
bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery));
logger.trace("Merge from intermediate table {} to destination table {} completed",
intermediateTable, destinationTable);

logger.debug("Recording flush success for batch {} from {}",
batchNumber, intermediateTable);
mergeBatches.recordSuccessfulFlush(intermediateTable, batchNumber);

// Commit those offsets ASAP
context.requestCommit();

logger.info("Completed merge flush of batch {} from {} to {}",
batchNumber, intermediateTable, destinationTable);
}

// After, regardless of whether we flushed or not, clean up old batches from the intermediate
// table. Some rows may be several batches old but still in the table if they were in the
// streaming buffer during the last purge.
logger.trace("Clearing batches from {} on back from intermediate table {}", batchNumber, intermediateTable);
String batchClearQuery = batchClearQuery(intermediateTable, batchNumber);
logger.trace(batchClearQuery);
bigQuery.query(QueryJobConfiguration.of(batchClearQuery));
}

/*
upsert+delete:
@@ -214,13 +231,14 @@ THEN INSERT (<keyField, _PARTITIONTIME, <valueField[, <valueField])
);
*/
private String mergeFlushQuery(TableId intermediateTable, TableId destinationTable, int batchNumber) {
@VisibleForTesting
String mergeFlushQuery(TableId intermediateTable, TableId destinationTable, int batchNumber) {
Schema intermediateSchema = schemaManager.cachedSchema(intermediateTable);

String srcKey = INTERMEDIATE_TABLE_KEY_FIELD_NAME;

List<String> keyFields = listFields(
intermediateSchema.getFields().get(keyFieldName).getSubFields(),
intermediateSchema.getFields().get(srcKey).getSubFields(),
srcKey + "."
);
List<String> dstValueFields = intermediateSchema.getFields().get(INTERMEDIATE_TABLE_VALUE_FIELD_NAME).getSubFields()
@@ -235,8 +253,8 @@ private String mergeFlushQuery(TableId intermediateTable, TableId destinationTab
.map(field -> field + "=`src`." + INTERMEDIATE_TABLE_VALUE_FIELD_NAME + "." + field)
.collect(Collectors.toList());

String partitionTimeField = usePartitionDecorator ? "_PARTITIONTIME, " : "";
String partitionTimeValue = usePartitionDecorator
String partitionTimeField = insertPartitionTime ? "_PARTITIONTIME, " : "";
String partitionTimeValue = insertPartitionTime
? "CAST(CAST(DATE(`src`." + INTERMEDIATE_TABLE_PARTITION_TIME_FIELD_NAME + ") AS DATE) AS TIMESTAMP), "
: "";

@@ -299,9 +317,9 @@ private String mergeFlushQuery(TableId intermediateTable, TableId destinationTab
// Assume all rows have non-null values and upsert them all
return mergeOpening
.append("ON ").append(keysMatch).append(" ")
.append("WHEN MATCHED")
.append("WHEN MATCHED ")
.append(updateClause).append(" ")
.append("WHEN NOT MATCHED")
.append("WHEN NOT MATCHED ")
.append(insertClause)
.append(";")
.toString();
@@ -311,7 +329,8 @@ private String mergeFlushQuery(TableId intermediateTable, TableId destinationTab
}

// DELETE FROM `<intermediateTable>` WHERE batchNumber <= <batchNumber> AND _PARTITIONTIME IS NOT NULL;
private static String clearBatchQuery(TableId intermediateTable, int batchNumber) {
@VisibleForTesting
static String batchClearQuery(TableId intermediateTable, int batchNumber) {
return new StringBuilder("DELETE FROM `").append(intermediateTable.getDataset()).append("`.`").append(intermediateTable.getTable()).append("` ")
.append("WHERE ")
.append(INTERMEDIATE_TABLE_BATCH_NUMBER_FIELD).append(" <= ").append(batchNumber).append(" ")
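Because batchClearQuery (renamed from clearBatchQuery) and mergeFlushQuery are now package-visible and marked @VisibleForTesting, the generated SQL can be asserted on directly. A rough unit-test sketch, assuming JUnit 4, a test class placed in the same package as MergeQueries, and only the query fragments visible in this diff (the exact whitespace and the batch-number column name may differ in the real query):

import static org.junit.Assert.assertTrue;

import com.google.cloud.bigquery.TableId;
import org.junit.Test;

public class MergeQueriesBatchClearSketchTest {

  @Test
  public void testBatchClearQueryShape() {
    // Hypothetical dataset and intermediate table names, for illustration only.
    TableId intermediateTable = TableId.of("myDataset", "myTopic_tmp_0");

    String query = MergeQueries.batchClearQuery(intermediateTable, 42);

    // Per the comment above batchClearQuery:
    // DELETE FROM `<intermediateTable>` WHERE batchNumber <= <batchNumber> AND _PARTITIONTIME IS NOT NULL;
    assertTrue(query.startsWith("DELETE FROM `myDataset`.`myTopic_tmp_0`"));
    assertTrue(query.contains("<= 42"));
    assertTrue(query.contains("_PARTITIONTIME IS NOT NULL"));
  }
}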
@@ -410,7 +410,8 @@ private List<Field> getIntermediateSchemaFields(com.google.cloud.bigquery.Schema

com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema);
Field kafkaKeyField = Field.newBuilder(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, LegacySQLTypeName.RECORD, keySchema.getFields())
.setMode(Field.Mode.REQUIRED).build();
.setMode(Field.Mode.REQUIRED)
.build();
result.add(kafkaKeyField);

Field partitionTimeField = Field
@@ -45,6 +45,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Optional;

/**
@@ -473,62 +476,71 @@ public static ConfigDef getConfig() {
* @param props sink configuration properties
*/
public static void validate(Map<String, String> props) {
final boolean hasTopicsConfig = hasTopicsConfig(props);
final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
final boolean hasTopicsConfig = hasTopicsConfig(props);
final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);

if (hasTopicsConfig && hasTopicsRegexConfig) {
throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG +
" are mutually exclusive options, but both are set.");
if (hasTopicsConfig && hasTopicsRegexConfig) {
throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG +
" are mutually exclusive options, but both are set.");
}

if (!hasTopicsConfig && !hasTopicsRegexConfig) {
throw new ConfigException("Must configure one of " +
TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG);
}

if (upsertDeleteEnabled(props)) {
if (gcsBatchLoadingEnabled(props)) {
throw new ConfigException("Cannot enable both upsert/delete and GCS batch loading");
}

if (!hasTopicsConfig && !hasTopicsRegexConfig) {
throw new ConfigException("Must configure one of " +
TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG);
String mergeIntervalStr = Optional.ofNullable(props.get(MERGE_INTERVAL_MS_CONFIG))
.map(String::trim)
.orElse(Long.toString(MERGE_INTERVAL_MS_DEFAULT));
String mergeRecordsThresholdStr = Optional.ofNullable(props.get(MERGE_RECORDS_THRESHOLD_CONFIG))
.map(String::trim)
.orElse(Long.toString(MERGE_RECORDS_THRESHOLD_DEFAULT));
if ("-1".equals(mergeIntervalStr) && "-1".equals(mergeRecordsThresholdStr)) {
throw new ConfigException(MERGE_INTERVAL_MS_CONFIG + " and "
+ MERGE_RECORDS_THRESHOLD_CONFIG + " cannot both be -1");
}

if (upsertDeleteEnabled(props)) {
String mergeIntervalStr = Optional.ofNullable(props.get(MERGE_INTERVAL_MS_CONFIG))
.map(String::trim)
.orElse(Long.toString(MERGE_INTERVAL_MS_DEFAULT));
String mergeRecordsThresholdStr = Optional.ofNullable(props.get(MERGE_RECORDS_THRESHOLD_CONFIG))
.map(String::trim)
.orElse(Long.toString(MERGE_RECORDS_THRESHOLD_DEFAULT));
if ("-1".equals(mergeIntervalStr) && "-1".equals(mergeRecordsThresholdStr)) {
throw new ConfigException(MERGE_INTERVAL_MS_CONFIG + " and "
+ MERGE_RECORDS_THRESHOLD_CONFIG + " cannot both be -1");
}

if ("0".equals(mergeIntervalStr)) {
throw new ConfigException(MERGE_INTERVAL_MS_CONFIG, mergeIntervalStr, "cannot be zero");
}
if ("0".equals(mergeRecordsThresholdStr)) {
throw new ConfigException(MERGE_RECORDS_THRESHOLD_CONFIG, mergeRecordsThresholdStr, "cannot be zero");
}

String kafkaKeyFieldStr = props.get(KAFKA_KEY_FIELD_NAME_CONFIG);
if (kafkaKeyFieldStr == null || kafkaKeyFieldStr.trim().isEmpty()) {
throw new ConfigException(KAFKA_KEY_FIELD_NAME_CONFIG + " must be specified when "
+ UPSERT_ENABLED_CONFIG + " and/or " + DELETE_ENABLED_CONFIG + " are set to true");
}
if ("0".equals(mergeIntervalStr)) {
throw new ConfigException(MERGE_INTERVAL_MS_CONFIG, mergeIntervalStr, "cannot be zero");
}
if ("0".equals(mergeRecordsThresholdStr)) {
throw new ConfigException(MERGE_RECORDS_THRESHOLD_CONFIG, mergeRecordsThresholdStr, "cannot be zero");
}
}

public static boolean hasTopicsConfig(Map<String, String> props) {
String topicsStr = props.get(TOPICS_CONFIG);
return topicsStr != null && !topicsStr.trim().isEmpty();
String kafkaKeyFieldStr = props.get(KAFKA_KEY_FIELD_NAME_CONFIG);
if (kafkaKeyFieldStr == null || kafkaKeyFieldStr.trim().isEmpty()) {
throw new ConfigException(KAFKA_KEY_FIELD_NAME_CONFIG + " must be specified when "
+ UPSERT_ENABLED_CONFIG + " and/or " + DELETE_ENABLED_CONFIG + " are set to true");
}
}
}

public static boolean hasTopicsRegexConfig(Map<String, String> props) {
String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
}
public static boolean hasTopicsConfig(Map<String, String> props) {
String topicsStr = props.get(TOPICS_CONFIG);
return topicsStr != null && !topicsStr.trim().isEmpty();
}

public static boolean upsertDeleteEnabled(Map<String, String> props) {
String upsertStr = props.get(UPSERT_ENABLED_CONFIG);
String deleteStr = props.get(DELETE_ENABLED_CONFIG);
return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr)
|| Boolean.TRUE.toString().equalsIgnoreCase(deleteStr);
}
public static boolean hasTopicsRegexConfig(Map<String, String> props) {
String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
}

public static boolean upsertDeleteEnabled(Map<String, String> props) {
String upsertStr = props.get(UPSERT_ENABLED_CONFIG);
String deleteStr = props.get(DELETE_ENABLED_CONFIG);
return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr)
|| Boolean.TRUE.toString().equalsIgnoreCase(deleteStr);
}

public static boolean gcsBatchLoadingEnabled(Map<String, String> props) {
String batchLoadStr = props.get(ENABLE_BATCH_CONFIG);
return batchLoadStr != null && !batchLoadStr.isEmpty();
}

/**
* Returns the keyfile
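The new validation rules for upsert/delete mode are also straightforward to unit test, since validate() is static and works on raw properties. A hedged sketch, assuming JUnit 4, that the class shown here is BigQuerySinkConfig, and that the config constants referenced in the diff are publicly accessible on it (none of which is confirmed by the visible hunks):

import static org.junit.Assert.fail;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.junit.Test;

public class UpsertDeleteValidationSketchTest {

  @Test
  public void testBothMergeTriggersDisabledFails() {
    Map<String, String> props = new HashMap<>();
    // Minimal upsert-enabled configuration; values here are illustrative.
    props.put(BigQuerySinkConfig.TOPICS_CONFIG, "someTopic");
    props.put(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG, "true");
    props.put(BigQuerySinkConfig.KAFKA_KEY_FIELD_NAME_CONFIG, "kafkaKey");
    // Disabling both merge triggers should now be rejected.
    props.put(BigQuerySinkConfig.MERGE_INTERVAL_MS_CONFIG, "-1");
    props.put(BigQuerySinkConfig.MERGE_RECORDS_THRESHOLD_CONFIG, "-1");

    try {
      BigQuerySinkConfig.validate(props);
      fail("Expected a ConfigException when both merge triggers are disabled");
    } catch (ConfigException e) {
      // Expected: mergeIntervalMs and mergeRecordsThreshold cannot both be -1.
    }
  }
}

The assertion deliberately avoids matching the exact error message, since that string is an implementation detail of the validation code above.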