[GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder location #4069

Merged

Changes from 6 commits
@@ -17,20 +17,14 @@

package org.apache.gobblin.data.management.copy.hive;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import javax.annotation.Nonnull;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -43,12 +37,18 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import javax.annotation.Nonnull;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.ConfigClientUtils;
@@ -80,6 +80,9 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
public static final String DEFAULT_TABLE_PATTERN = "*";
public static final String TABLE_FILTER = HIVE_DATASET_PREFIX + ".tableFilter";

// Property used to filter tables by their underlying HDFS folder location, expressed as a regex
public static final String TABLE_FOLDER_ALLOWLIST_FILTER = HIVE_DATASET_PREFIX + ".tableFolderAllowlistFilter";
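
For illustration, a job might enable this filter as in the snippet below (a minimal sketch; the path regex is hypothetical):

Properties properties = new Properties();
// Hypothetical allowlist: keep only tables whose underlying HDFS location
// falls under /data/warehouse
properties.setProperty(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/data/warehouse/.*");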

/*
* By setting the prefix, only config keys with this prefix will be used to build a HiveDataset.
* By passing scoped configurations the same config keys can be used in different contexts.
@@ -118,6 +121,8 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
protected final Function<Table, String> configStoreDatasetUriBuilder;
protected final Optional<Predicate<Table>> tableFilter;

protected final Optional<String> tableFolderAllowlistRegex;

protected final String datasetConfigPrefix;
protected final ConfigClient configClient;
private final Config jobConfig;
@@ -194,6 +199,8 @@ protected HiveDatasetFinder(FileSystem fs, Properties properties, HiveMetastoreC
} else {
this.tableFilter = Optional.absent();
}
this.tableFolderAllowlistRegex = properties.containsKey(TABLE_FOLDER_ALLOWLIST_FILTER) ?
    Optional.of(properties.getProperty(TABLE_FOLDER_ALLOWLIST_FILTER)) : Optional.absent();
}

protected static HiveMetastoreClientPool createClientPool(Properties properties) throws IOException {
@@ -262,7 +269,10 @@ protected HiveDataset computeNext() {

try (AutoReturnableObject<IMetaStoreClient> client = HiveDatasetFinder.this.clientPool.getClient()) {
Table table = client.get().getTable(dbAndTable.getDb(), dbAndTable.getTable());
if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
if ((tableFilter.isPresent() && !tableFilter.get().apply(table))
|| !shouldAllowTableLocation(tableFolderAllowlistRegex, table)) {
log.info("Ignoring table {} as its underlying location {} does not pass allowlist regex {}", dbAndTable,
table.getSd().getLocation(), tableFolderAllowlistRegex);
continue;
}

@@ -294,6 +304,12 @@ protected HiveDataset computeNext() {
};
}

protected static boolean shouldAllowTableLocation(Optional<String> regex, Table table) {
if (!regex.isPresent()) {
return true;
}
return Pattern.compile(regex.get()).matcher(table.getSd().getLocation()).matches();
Contributor:
Pattern.compile(regex.get()) is called every time shouldAllowTableLocation executes when the regex is present. Compiling the regex on every call can be inefficient; we can compile the pattern once and store it, e.g.:
private static Pattern compiledAllowlistPattern = regex.map(Pattern::compile).orElse(null);

Contributor Author:
I compiled it as a class field instead, good callout.
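
A minimal sketch of that class-field approach, using Guava's Optional as the surrounding code does (class and field names here are illustrative, not necessarily the merged code):

import java.util.regex.Pattern;

import com.google.common.base.Optional;

import org.apache.hadoop.hive.metastore.api.Table;

// Illustrative only: compile the allowlist regex once at construction time
// rather than on every shouldAllowTableLocation() call.
class AllowlistSketch {
  private final Optional<Pattern> tableFolderAllowlistPattern;

  AllowlistSketch(Optional<String> regex) {
    // Guava's Optional.transform(...) is the analogue of java.util.Optional.map(...)
    this.tableFolderAllowlistPattern = regex.transform(Pattern::compile);
  }

  boolean shouldAllowTableLocation(Table table) {
    if (!tableFolderAllowlistPattern.isPresent()) {
      return true; // no allowlist configured: allow every table
    }
    return tableFolderAllowlistPattern.get().matcher(table.getSd().getLocation()).matches();
  }
}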

}

/**
* @deprecated Use {@link #createHiveDataset(Table, Config)} instead
@@ -215,6 +215,32 @@ public void testDatasetConfig() throws Exception {

}

@Test
public void testHiveTableFolderFilter() throws Exception {
Contributor:
This can be renamed to testHiveTableFolderAllowlistFilter.
List<HiveDatasetFinder.DbAndTable> dbAndTables = Lists.newArrayList();
dbAndTables.add(new HiveDatasetFinder.DbAndTable("db1", "table1"));
HiveMetastoreClientPool pool = getTestPool(dbAndTables);

Properties properties = new Properties();
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
// Try a regex with multiple groups
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "(/tmp/|a).*");

HiveDatasetFinder finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
List<HiveDataset> datasets = Lists.newArrayList(finder.getDatasetsIterator());

Assert.assertEquals(datasets.size(), 1);

properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
// The table located at /tmp/test should be filtered
Contributor:
It would be helpful to call out that the dataset table is created at path /tmp/test on line 221, since another assertion above uses a different regex that doesn't filter the table.
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/a/b");

finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
datasets = Lists.newArrayList(finder.getDatasetsIterator());

Assert.assertEquals(datasets.size(), 0);
Contributor:
We can also add a test for the case where the regex is empty or null.
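
A sketch of that extra case, reusing the fixtures above (hypothetical; not part of this change):

// Hypothetical addition: with no allowlist property set at all,
// every table should be allowed through.
properties.remove(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER);
finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
datasets = Lists.newArrayList(finder.getDatasetsIterator());
Assert.assertEquals(datasets.size(), 1);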

}

private HiveMetastoreClientPool getTestPool(List<HiveDatasetFinder.DbAndTable> dbAndTables) throws Exception {

SetMultimap<String, String> entities = HashMultimap.create();