Skip to content

Commit

Permalink
[GOBBLIN-1927] Add topic validation support in KafkaSource, and add T…
Browse files Browse the repository at this point in the history
…opicNameValidator (#3793)

* * Add generic topic validation support
* Add the first validator TopicNameValidator into the validator chain, as a refactor of existing codes

* Refine to address comments

* Refine

---------

Co-authored-by: Tao Qin <[email protected]>
  • Loading branch information
wsarecv and Tao Qin committed Oct 18, 2023
1 parent cea7cda commit 6266a12
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
Expand Down Expand Up @@ -218,7 +219,7 @@ public List<WorkUnit> getWorkunits(SourceState state) {

this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

List<KafkaTopic> topics = getFilteredTopics(state);
List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), state);
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

for (String topic : this.topicsToProcess) {
Expand Down Expand Up @@ -802,6 +803,7 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets
protected List<KafkaTopic> getFilteredTopics(SourceState state) {
List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
// TODO: replace this with TopicNameValidator in the config once TopicValidators is rolled out.
if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true)) {
blacklist.add(Pattern.compile(".*\\..*"));
}
Expand All @@ -815,6 +817,13 @@ public void shutdown(SourceState state) {
state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount);
}

/**
* Return topics that pass all the topic validators.
*/
protected List<KafkaTopic> getValidTopics(List<KafkaTopic> topics, SourceState state) {
return new TopicValidators(state).validate(topics);
}

/**
* This class contains startOffset, earliestOffset and latestOffset for a Kafka partition.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;

/**
* A topic validator that validates the topic name
*/
public class TopicNameValidator extends TopicValidatorBase {
private static final String DOT = ".";

public TopicNameValidator(State state) {
super(state);
}

/**
* Check if a topic name is valid, current rules are:
* 1. must not contain "."
* @param topic the topic to be validated
* @return true if the topic name is valid (aka. doesn't contain ".")
*/
@Override
public boolean validate(KafkaTopic topic) throws Exception {
return !topic.getName().contains(DOT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;

/**
* The base class of a topic validator
*/
public abstract class TopicValidatorBase {
protected State state;

public TopicValidatorBase(State sourceState) {
this.state = sourceState;
}

public abstract boolean validate(KafkaTopic topic) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source.extractor.extract.kafka.validator;

import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;


/**
* The TopicValidators contains a list of {@link TopicValidatorBase} that validate topics.
* To enable it, add below settings in the config:
* gobblin.kafka.topicValidators=validator1_class_name,validator2_class_name...
*/
@Slf4j
public class TopicValidators {
public static final String VALIDATOR_CLASSES_KEY = "gobblin.kafka.topicValidators";

private static long DEFAULTL_TIMEOUT = 10L;

private static TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MINUTES;

private final List<TopicValidatorBase> validators = new ArrayList<>();

private final State state;

public TopicValidators(State state) {
this.state = state;
for (String validatorClassName : state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) {
try {
this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validatorClassName,
state));
} catch (Exception e) {
log.error("Failed to create topic validator: {}, due to {}", validatorClassName, e);
}
}
}

/**
* Validate topics with all the internal validators. The default timeout is set to 1 hour.
* Note:
* 1. the validations for every topic run in parallel.
* 2. when timeout happens, un-validated topics are still treated as "valid".
* @param topics the topics to be validated
* @return the topics that pass all the validators
*/
public List<KafkaTopic> validate(List<KafkaTopic> topics) {
return validate(topics, DEFAULTL_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
}

/**
* Validate topics with all the internal validators.
* Note:
* 1. the validations for every topic run in parallel.
* 2. when timeout happens, un-validated topics are still treated as "valid".
* @param topics the topics to be validated
* @param timeout the timeout for the validation
* @param timeoutUnit the time unit for the timeout
* @return the topics that pass all the validators
*/
public List<KafkaTopic> validate(List<KafkaTopic> topics, long timeout, TimeUnit timeoutUnit) {
int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);

// Tasks running in the thread pool will have the same access control and class loader settings as current thread
ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newPrivilegedThreadFactory(
Optional.of(log)));

List<Future<Boolean>> results = new ArrayList<>();
Stopwatch stopwatch = Stopwatch.createStarted();
for (KafkaTopic topic : topics) {
results.add(threadPool.submit(() -> validate(topic)));
}
ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(log), timeout, timeoutUnit);
log.info(String.format("Validate %d topics in %d seconds", topics.size(), stopwatch.elapsed(TimeUnit.SECONDS)));

List<KafkaTopic> validTopics = new ArrayList<>();
for (int i = 0; i < results.size(); ++i) {
try {
if (results.get(i).get()) {
validTopics.add(topics.get(i));
}
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to validate topic: {}, treat it as a valid topic", topics.get(i));
validTopics.add(topics.get(i));
}
}
return validTopics;
}

/**
* Validates a single topic with all the internal validators
*/
private boolean validate(KafkaTopic topic) throws Exception {
log.info("Validating topic {} in thread: {}", topic, Thread.currentThread().getName());
for (TopicValidatorBase validator : this.validators) {
if (!validator.validate(topic)) {
log.warn("KafkaTopic: {} doesn't pass the validator: {}", topic, validator.getClass().getName());
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -56,6 +59,36 @@ public void testGetFilteredTopics() {
Assert.assertEquals(new TestKafkaSource(testKafkaClient).getFilteredTopics(state), toKafkaTopicList(allTopics.subList(0, 3)));
}

@Test
public void testTopicValidators() {
TestKafkaClient testKafkaClient = new TestKafkaClient();
List<String> allTopics = Arrays.asList(
"Topic1", "topic-v2", "topic3", // allowed
"topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", //period topics
"not-allowed-topic");
testKafkaClient.testTopics = allTopics;
KafkaSource kafkaSource = new TestKafkaSource(testKafkaClient);

SourceState state = new SourceState();
state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
List<KafkaTopic> topicsToValidate = kafkaSource.getFilteredTopics(state);

// Test without TopicValidators in the state
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state),
toKafkaTopicList(allTopics.subList(0, 6))));

// Test empty TopicValidators in the state
state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, "");
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state),
toKafkaTopicList(allTopics.subList(0, 6))));

// Test TopicValidators with TopicNameValidator in the state
state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, TopicNameValidator.class.getName());
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state),
toKafkaTopicList(allTopics.subList(0, 3))));
}

public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) {
return topicNames.stream().map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit 6266a12

Please sign in to comment.