Skip to content

Commit

Permalink
[GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder location (#4069)
Browse files Browse the repository at this point in the history

* Add regex filter for table based on location
  • Loading branch information
Will-Lo authored Oct 22, 2024
1 parent e3baf91 commit 916d570
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@

package org.apache.gobblin.data.management.copy.hive;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import javax.annotation.Nonnull;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
Expand All @@ -43,12 +37,18 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import javax.annotation.Nonnull;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.ConfigClientUtils;
Expand Down Expand Up @@ -80,6 +80,9 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
public static final String DEFAULT_TABLE_PATTERN = "*";
public static final String TABLE_FILTER = HIVE_DATASET_PREFIX + ".tableFilter";

// Property used to filter tables only physically within a folder, represented by a regex
public static final String TABLE_FOLDER_ALLOWLIST_FILTER = HIVE_DATASET_PREFIX + ".tableFolderAllowlistFilter";

/*
* By setting the prefix, only config keys with this prefix will be used to build a HiveDataset.
* By passing scoped configurations the same config keys can be used in different contexts.
Expand Down Expand Up @@ -118,6 +121,8 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
protected final Function<Table, String> configStoreDatasetUriBuilder;
protected final Optional<Predicate<Table>> tableFilter;

protected final Optional<Pattern> tableFolderAllowlistRegex;

protected final String datasetConfigPrefix;
protected final ConfigClient configClient;
private final Config jobConfig;
Expand Down Expand Up @@ -194,6 +199,8 @@ protected HiveDatasetFinder(FileSystem fs, Properties properties, HiveMetastoreC
} else {
this.tableFilter = Optional.absent();
}
this.tableFolderAllowlistRegex = properties.containsKey(TABLE_FOLDER_ALLOWLIST_FILTER) ?
Optional.of(Pattern.compile(properties.getProperty(TABLE_FOLDER_ALLOWLIST_FILTER))): Optional.absent();
}

protected static HiveMetastoreClientPool createClientPool(Properties properties) throws IOException {
Expand Down Expand Up @@ -262,7 +269,10 @@ protected HiveDataset computeNext() {

try (AutoReturnableObject<IMetaStoreClient> client = HiveDatasetFinder.this.clientPool.getClient()) {
Table table = client.get().getTable(dbAndTable.getDb(), dbAndTable.getTable());
if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
if ((tableFilter.isPresent() && !tableFilter.get().apply(table))
|| !shouldAllowTableLocation(tableFolderAllowlistRegex, table)) {
log.info("Ignoring table {} as its underlying location {} does not pass allowlist regex {}", dbAndTable,
table.getSd().getLocation(), tableFolderAllowlistRegex.get());
continue;
}

Expand Down Expand Up @@ -294,6 +304,12 @@ protected HiveDataset computeNext() {
};
}

/**
 * Checks a table's physical storage location against an optional allowlist pattern.
 *
 * @param regex optional pattern that the table's storage-descriptor location must fully match
 * @param table the Hive table whose location is being checked
 * @return {@code true} when no pattern is configured, or when the location matches the pattern
 */
protected static boolean shouldAllowTableLocation(Optional<Pattern> regex, Table table) {
  // An absent pattern means the allowlist is disabled, so every location is accepted.
  return !regex.isPresent() || regex.get().matcher(table.getSd().getLocation()).matches();
}

/**
* @deprecated Use {@link #createHiveDataset(Table, Config)} instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,51 @@ public void testDatasetConfig() throws Exception {

}

/**
 * Verifies {@code HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER}: a table is only
 * returned when its underlying storage location matches the configured regex, and it
 * passes through unfiltered when no regex is configured at all.
 */
@Test
public void testHiveTableFolderAllowlistFilter() throws Exception {
  List<HiveDatasetFinder.DbAndTable> dbAndTables = Lists.newArrayList();
  dbAndTables.add(new HiveDatasetFinder.DbAndTable("db1", "table1"));
  // This table is created on /tmp/test
  HiveMetastoreClientPool pool = getTestPool(dbAndTables);

  Properties properties = new Properties();
  properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
  // An alternation regex whose first branch matches the table's /tmp/test location
  properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "(/tmp/|a).*");

  HiveDatasetFinder finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
  List<HiveDataset> datasets = Lists.newArrayList(finder.getDatasetsIterator());

  Assert.assertEquals(datasets.size(), 1);

  // A regex that does not match /tmp/test should filter the table out
  properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/a/b");

  finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
  datasets = Lists.newArrayList(finder.getDatasetsIterator());

  Assert.assertEquals(datasets.size(), 0);

  // An empty regex matches no non-empty location, so the table is filtered out
  properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "");

  finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
  datasets = Lists.newArrayList(finder.getDatasetsIterator());

  Assert.assertEquals(datasets.size(), 0);

  // With no regex configured the allowlist is disabled and the table passes through.
  // The property must be removed explicitly: Properties entries persist across cases,
  // so without this remove() the previous empty-string filter would still apply and
  // the "no config" path would never actually be exercised.
  properties.remove(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER);

  finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
  datasets = Lists.newArrayList(finder.getDatasetsIterator());

  Assert.assertEquals(datasets.size(), 1);
}

private HiveMetastoreClientPool getTestPool(List<HiveDatasetFinder.DbAndTable> dbAndTables) throws Exception {

SetMultimap<String, String> entities = HashMultimap.create();
Expand Down

0 comments on commit 916d570

Please sign in to comment.