Most recent unique feature (#1991)
* Refactored the SortedSet implementations to be RewritableSortedSet
* Created the key/value and byte[]/Document sorted set mechanisms
* Refactored the RFile input and output streams to allow for extensions
* Added RewritableSortedSet and other sorted set tests
* Integrated the most recent unique feature into the query iterator
* Added a test for the most recent unique feature
* Updated BufferedFileBackedSortedSet and HdfsBackedSortedSet to use the builder pattern for construction
* Updated the BufferedFileBackedSortedSet builder so that extending classes don't have to overload methods just to call the super implementation
* Updated the unique transform builder so it can be built without depending on the query logic or query iterator
* Enforced that HDFS configs are set when using the most recent unique feature
* Added a test case to ensure the most recent flag works with the unique field parameter or function, with both uppercase and lowercase field names
* Ensured fields are uppercased
* Updated test cases to deal with uppercased field names
* Ensured group fields are uppercased as well
* Updated the most recent transform to return documents in document-key order
ivakegg committed Apr 4, 2024
1 parent fb7480c commit a4587af
Showing 59 changed files with 3,409 additions and 1,092 deletions.
@@ -1297,8 +1297,11 @@ protected void setupRowBasedHdfsBackedSet(String row) throws IOException {
this.createdRowDir = false;
}

this.set = new HdfsBackedSortedSet<>(null, hdfsBackedSetBufferSize, ivaratorCacheDirs, row, maxOpenFiles, numRetries, persistOptions,
new FileKeySortedSet.Factory());
// noinspection unchecked
this.set = (HdfsBackedSortedSet<Key>) HdfsBackedSortedSet.builder().withBufferPersistThreshold(hdfsBackedSetBufferSize)
.withIvaratorCacheDirs(ivaratorCacheDirs).withUniqueSubPath(row).withMaxOpenFiles(maxOpenFiles).withNumRetries(numRetries)
.withPersistOptions(persistOptions).withSetFactory(new FileKeySortedSet.Factory()).build();

this.threadSafeSet = Collections.synchronizedSortedSet(this.set);
this.currentRow = row;
this.setControl.takeOwnership(row, this);
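The two-line constructor call becomes a builder chain here. Per the commit notes, the builder hierarchy is designed so extending classes don't have to overload every with* method just to call the super implementation; the usual Java idiom for that is a self-typed (recursive generic) builder. A minimal sketch of the idiom, with illustrative names rather than the actual DataWave signatures:

    // Minimal sketch of a self-typed builder hierarchy (hypothetical names).
    class BaseSortedSetBuilder<B extends BaseSortedSetBuilder<B>> {
        protected int bufferPersistThreshold;
        protected int maxOpenFiles;

        @SuppressWarnings("unchecked")
        protected B self() {
            return (B) this; // safe as long as each subclass binds B to itself
        }

        public B withBufferPersistThreshold(int threshold) {
            this.bufferPersistThreshold = threshold;
            return self(); // chaining preserves the subclass type
        }

        public B withMaxOpenFiles(int maxOpenFiles) {
            this.maxOpenFiles = maxOpenFiles;
            return self();
        }
    }

    // A subclass adds only its own options and inherits the rest unchanged.
    class HdfsBackedBuilder extends BaseSortedSetBuilder<HdfsBackedBuilder> {
        private String uniqueSubPath;

        public HdfsBackedBuilder withUniqueSubPath(String subPath) {
            this.uniqueSubPath = subPath;
            return self();
        }
    }

With this shape, a chain like new HdfsBackedBuilder().withMaxOpenFiles(100).withUniqueSubPath(row) compiles without casts, which is presumably why the diff above needs its one unchecked cast only at the build() call site, where the element type Key is introduced.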
@@ -24,8 +24,10 @@ public class FileSystemCache {

public FileSystemCache(String hdfsSiteConfigs) throws MalformedURLException {
conf = new Configuration();
for (String url : org.apache.commons.lang.StringUtils.split(hdfsSiteConfigs, ',')) {
conf.addResource(new URL(url));
if (hdfsSiteConfigs != null) {
for (String url : org.apache.commons.lang.StringUtils.split(hdfsSiteConfigs, ',')) {
conf.addResource(new URL(url));
}
}
}

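The new null guard matters because org.apache.commons.lang.StringUtils.split(null, ',') returns null, so the old enhanced-for loop would throw a NullPointerException whenever no HDFS site configs were supplied. With the guard, construction falls back to a plain default Hadoop Configuration:

    // Hypothetical usage: no site configs supplied. Previously an NPE;
    // now the cache simply uses an unaugmented default Configuration.
    FileSystemCache cache = new FileSystemCache(null);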
@@ -179,6 +179,7 @@ public class QueryParameters {

public static final String GROUP_FIELDS_BATCH_SIZE = "group.fields.batch.size";
public static final String UNIQUE_FIELDS = "unique.fields";
public static final String MOST_RECENT_UNIQUE = "most.recent.unique";

/**
* Used to specify fields which are excluded from QueryModel expansion
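The new most.recent.unique parameter rides alongside unique.fields, whose value is parsed by UniqueFields.from(String) in the format produced by UniqueFields.toString(). A hypothetical round trip (the field names and granularities below are made up for illustration):

    // Hypothetical: parse a unique.fields parameter value, then apply the
    // most.recent.unique flag that arrived as a separate parameter.
    UniqueFields uniqueFields = UniqueFields.from("DEATH_DATE[DAY],UUID[ALL]");
    uniqueFields.setMostRecent(Boolean.parseBoolean("true"));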
@@ -12,6 +12,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
@@ -25,9 +26,10 @@
* captured as a parameter string using {@link UniqueFields#toString()}, and transformed back into a {@link UniqueFields} instance via
* {@link UniqueFields#from(String)}.
*/
public class UniqueFields implements Serializable {
public class UniqueFields implements Serializable, Cloneable {

private Multimap<String,UniqueGranularity> fieldMap;
private final SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private boolean mostRecent = false;

/**
* Returns a new {@link UniqueFields} parsed from this string. The provided string is expected to have the format returned by
@@ -128,24 +130,19 @@ private static UniqueGranularity parseGranularity(String granularity) {
}

/**
* Return a copy of the given {@link UniqueFields}.
* Return a clone of this class
*
* @param other
* the other instance to copy
* @return the copy
*/
public static UniqueFields copyOf(UniqueFields other) {
if (other == null) {
return null;
}
UniqueFields uniqueFields = new UniqueFields();
uniqueFields.fieldMap = TreeMultimap.create(other.fieldMap);
return uniqueFields;
@Override
public UniqueFields clone() {
UniqueFields newFields = new UniqueFields();
newFields.fieldMap.putAll(this.fieldMap);
newFields.mostRecent = this.mostRecent;
return newFields;
}

public UniqueFields() {
fieldMap = TreeMultimap.create();
}
public UniqueFields() {}

/**
* Create a new {@link UniqueFields} with the provided map as the underlying field map.
@@ -154,7 +151,24 @@ public UniqueFields() {
* the field map to use
*/
public UniqueFields(SortedSetMultimap<String,UniqueGranularity> fieldMap) {
this.fieldMap = fieldMap;
putAll(fieldMap);
}

/**
* Clear out the field map
*/
public UniqueFields clear() {
this.fieldMap.clear();
return this;
}

/**
* Set the field map
*
* @param fields
*            the field map to set
* @return this UniqueFields
*/
public UniqueFields set(Multimap<String,UniqueGranularity> fields) {
return clear().putAll(fields);
}

/**
@@ -165,8 +179,9 @@ public UniqueFields(SortedSetMultimap<String,UniqueGranularity> fieldMap) {
* @param uniqueGranularity
* the granularity
*/
public void put(String field, UniqueGranularity uniqueGranularity) {
fieldMap.put(field, uniqueGranularity);
public UniqueFields put(String field, UniqueGranularity uniqueGranularity) {
fieldMap.put(JexlASTHelper.deconstructIdentifier(field).toUpperCase(), uniqueGranularity);
return this;
}

/**
@@ -175,10 +190,13 @@ public void put(String field, UniqueGranularity uniqueGranularity) {
* @param fieldMap
* the field map to add entries from
*/
public void putAll(Multimap<String,UniqueGranularity> fieldMap) {
public UniqueFields putAll(Multimap<String,UniqueGranularity> fieldMap) {
if (fieldMap != null) {
this.fieldMap.putAll(fieldMap);
for (String field : fieldMap.keySet()) {
this.fieldMap.putAll(JexlASTHelper.deconstructIdentifier(field).toUpperCase(), fieldMap.get(field));
}
}
return this;
}

/**
@@ -209,23 +227,7 @@ public Set<String> getFields() {
* @return the field map
*/
public Multimap<String,UniqueGranularity> getFieldMap() {
return fieldMap;
}

/**
* Replace any identifier fields with their deconstructed version.
*/
public void deconstructIdentifierFields() {
Multimap<String,UniqueGranularity> newFieldMap = TreeMultimap.create();
for (String field : fieldMap.keySet()) {
String newField = JexlASTHelper.deconstructIdentifier(field);
if (newField.equals(field)) {
newFieldMap.putAll(field, fieldMap.get(field));
} else {
newFieldMap.putAll(newField, fieldMap.get(field));
}
}
this.fieldMap = newFieldMap;
return TreeMultimap.create(fieldMap);
}

/**
@@ -238,12 +240,11 @@ public void remapFields(Multimap<String,String> model) {
Multimap<String,UniqueGranularity> newFieldMap = TreeMultimap.create(fieldMap);
for (String field : fieldMap.keySet()) {
Collection<UniqueGranularity> granularities = fieldMap.get(field);
field = field.toUpperCase();
if (model.containsKey(field)) {
model.get(field).forEach((newField) -> newFieldMap.putAll(newField, granularities));
}
}
this.fieldMap = newFieldMap;
set(newFieldMap);
}

/**
@@ -326,6 +327,15 @@ public String toString() {
return sb.toString();
}

public boolean isMostRecent() {
return mostRecent;
}

public UniqueFields setMostRecent(boolean mostRecent) {
this.mostRecent = mostRecent;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -335,12 +345,12 @@ public boolean equals(Object o) {
return false;
}
UniqueFields that = (UniqueFields) o;
return Objects.equals(fieldMap, that.fieldMap);
return Objects.equals(fieldMap, that.fieldMap) && mostRecent == that.mostRecent;
}

@Override
public int hashCode() {
return Objects.hash(fieldMap);
return Objects.hash(fieldMap, mostRecent);
}

}
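Taken together, these changes make UniqueFields a fluent, self-normalizing type: put/putAll upper-case fields and deconstruct identifiers on insert (replacing the removed deconstructIdentifierFields() pass), getFieldMap() now hands out a defensive copy, and clone() carries the new mostRecent flag. A short usage sketch, assuming granularity constants from the existing UniqueGranularity enum:

    // Fluent construction; keys are normalized to uppercase, and "$"
    // identifier prefixes are deconstructed on the way in.
    UniqueFields uniqueFields = new UniqueFields()
                    .put("$death_date", UniqueGranularity.TRUNCATE_TEMPORAL_TO_DAY)
                    .put("uuid", UniqueGranularity.ALL)
                    .setMostRecent(true);

    UniqueFields copy = uniqueFields.clone();
    assert copy.equals(uniqueFields);               // fieldMap and mostRecent both compared
    assert copy.getFields().contains("DEATH_DATE"); // normalized key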
@@ -314,7 +314,7 @@ public void deconstructIdentifiers() {

// Return a copy of the given set with all identifiers deconstructed.
private Set<String> deconstructIdentifiers(Set<String> set) {
return set.stream().map(JexlASTHelper::deconstructIdentifier).collect(Collectors.toSet());
return set.stream().map(JexlASTHelper::deconstructIdentifier).map(String::toUpperCase).collect(Collectors.toSet());
}

/**
@@ -345,6 +345,8 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private List<IvaratorCacheDirConfig> ivaratorCacheDirConfigs = Collections.emptyList();
private String ivaratorFstHdfsBaseURIs = null;
private int ivaratorCacheBufferSize = 10000;

private int uniqueCacheBufferSize = 100;
private long ivaratorCacheScanPersistThreshold = 100000L;
private long ivaratorCacheScanTimeout = 1000L * 60 * 60;
private int maxFieldIndexRangeSplit = 11;
@@ -380,6 +382,7 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private int groupFieldsBatchSize;
private boolean accrueStats = false;
private UniqueFields uniqueFields = new UniqueFields();
private boolean mostRecentUnique = false;
private boolean cacheModel = false;
/**
* should the sizes of documents be tracked for this query
@@ -665,7 +668,8 @@ public ShardQueryConfiguration(ShardQueryConfiguration other) {
this.setCompositeFilterFunctionsEnabled(other.isCompositeFilterFunctionsEnabled());
this.setGroupFieldsBatchSize(other.getGroupFieldsBatchSize());
this.setAccrueStats(other.getAccrueStats());
this.setUniqueFields(UniqueFields.copyOf(other.getUniqueFields()));
this.setUniqueFields(other.getUniqueFields());
this.setUniqueCacheBufferSize(other.getUniqueCacheBufferSize());
this.setCacheModel(other.getCacheModel());
this.setTrackSizes(other.isTrackSizes());
this.setContentFieldNames(null == other.getContentFieldNames() ? null : Lists.newArrayList(other.getContentFieldNames()));
@@ -1414,6 +1418,14 @@ public void setIvaratorFstHdfsBaseURIs(String ivaratorFstHdfsBaseURIs) {
this.ivaratorFstHdfsBaseURIs = ivaratorFstHdfsBaseURIs;
}

public int getUniqueCacheBufferSize() {
return uniqueCacheBufferSize;
}

public void setUniqueCacheBufferSize(int uniqueCacheBufferSize) {
this.uniqueCacheBufferSize = uniqueCacheBufferSize;
}

public int getIvaratorCacheBufferSize() {
return ivaratorCacheBufferSize;
}
@@ -1774,11 +1786,7 @@ public UniqueFields getUniqueFields() {
}

public void setUniqueFields(UniqueFields uniqueFields) {
this.uniqueFields = uniqueFields;
// If unique fields are present, make sure they are deconstructed by this point.
if (uniqueFields != null) {
uniqueFields.deconstructIdentifierFields();
}
this.uniqueFields = uniqueFields.clone();
}

public boolean isHitList() {
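Cloning inside the setter is a defensive-copy choice: the configuration no longer aliases the caller's object, and the old eager deconstructIdentifierFields() call is obsolete because normalization now happens in put/putAll. One trade-off worth noting: unlike the removed null-tolerant copyOf, clone() here assumes a non-null argument. A hypothetical illustration of the aliasing this prevents (config below stands for a ShardQueryConfiguration instance):

    // Hypothetical: mutating the caller's UniqueFields after the set no longer
    // leaks into the configuration, because the setter stored a clone.
    UniqueFields fields = new UniqueFields().put("UUID", UniqueGranularity.ALL);
    config.setUniqueFields(fields);                  // config holds a clone
    fields.put("DEATH_DATE", UniqueGranularity.ALL); // caller-side mutation
    assert !config.getUniqueFields().getFields().contains("DEATH_DATE");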
@@ -12,6 +12,7 @@
import datawave.webservice.query.cache.ResultsPage;
import datawave.webservice.query.dashboard.DashboardFields;
import datawave.webservice.query.dashboard.DashboardSummary;
import datawave.webservice.query.exception.QueryException;
import datawave.webservice.query.logic.QueryLogicTransformer;
import datawave.webservice.query.logic.ResponseEnricher;
import datawave.webservice.query.result.event.EventBase;
@@ -459,8 +459,7 @@ else if (documentRange != null && (!this.isContainsIndexOnlyTerms() && this.getT
// now apply the unique iterator if requested
UniqueTransform uniquify = getUniqueTransform();
if (uniquify != null) {
// pipelineDocuments = uniquify;
pipelineDocuments = Iterators.filter(pipelineDocuments, uniquify.getUniquePredicate());
pipelineDocuments = uniquify.getIterator(pipelineDocuments);
}

// apply the grouping iterator if requested and if the batch size is greater than zero
@@ -1541,11 +1540,23 @@ public Comparator<Object> getValueComparator(Tuple3<Key,Document,Map<String,Obje
return new ValueComparator(from.second().getMetadata());
}

protected UniqueTransform getUniqueTransform() {
protected UniqueTransform getUniqueTransform() throws IOException {
if (uniqueTransform == null && getUniqueFields() != null && !getUniqueFields().isEmpty()) {
synchronized (getUniqueFields()) {
if (uniqueTransform == null) {
uniqueTransform = new UniqueTransform(getUniqueFields(), getResultTimeout());
// @formatter:off
uniqueTransform = new UniqueTransform.Builder()
.withUniqueFields(getUniqueFields())
.withQueryExecutionForPageTimeout(getResultTimeout())
.withBufferPersistThreshold(getUniqueCacheBufferSize())
.withIvaratorCacheDirConfigs(getIvaratorCacheDirConfigs())
.withHdfsSiteConfigURLs(getHdfsSiteConfigURLs())
.withSubDirectory(getQueryId() + "-" + getScanId())
.withMaxOpenFiles(getIvaratorMaxOpenFiles())
.withNumRetries(getIvaratorNumRetries())
.withPersistOptions(getIvaratorPersistOptions())
.build();
// @formatter:on
}
}
}
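The switch from a simple Iterators.filter predicate to uniquify.getIterator(...) is what makes "most recent" possible: a predicate can only drop duplicates as they stream by, whereas returning the most recent document per unique key requires buffering candidates (here into an HDFS-backed sorted set, per the builder options above) and emitting the winners afterwards. Conceptually the reduction looks like the following deliberately simplified, in-memory sketch, not the actual HDFS-backed implementation:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.ToLongFunction;

    // Simplified illustration of most-recent-unique semantics; the real
    // transform buffers into an HDFS-backed sorted set instead of a HashMap.
    final class MostRecentSketch {
        static <D> Collection<D> mostRecentUnique(Iterable<D> docs,
                        Function<D,String> uniqueKey, ToLongFunction<D> timestamp) {
            Map<String,D> latest = new HashMap<>();
            for (D doc : docs) {
                // keep whichever candidate has the later timestamp per unique key
                latest.merge(uniqueKey.apply(doc), doc,
                                (a, b) -> timestamp.applyAsLong(a) >= timestamp.applyAsLong(b) ? a : b);
            }
            return latest.values();
        }
    }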
@@ -153,6 +153,9 @@ public class QueryOptions implements OptionDescriber {
public static final String GROUP_FIELDS = "group.fields";
public static final String GROUP_FIELDS_BATCH_SIZE = "group.fields.batch.size";
public static final String UNIQUE_FIELDS = "unique.fields";
public static final String MOST_RECENT_UNIQUE = "most.recent.unique";
public static final String UNIQUE_CACHE_BUFFER_SIZE = "unique.cache.buffer.size";

public static final String HITS_ONLY = "hits.only";
public static final String HIT_LIST = "hit.list";
public static final String START_TIME = "start.time";
@@ -313,6 +316,7 @@ public class QueryOptions implements OptionDescriber {
protected GroupFields groupFields = new GroupFields();
protected int groupFieldsBatchSize = Integer.MAX_VALUE;
protected UniqueFields uniqueFields = new UniqueFields();
protected int uniqueCacheBufferSize = 100;

protected Set<String> hitsOnlySet = new HashSet<>();

@@ -506,6 +510,7 @@ public void deepCopy(QueryOptions other) {
this.ivaratorCacheDirConfigs = (other.ivaratorCacheDirConfigs == null) ? null : new ArrayList<>(other.ivaratorCacheDirConfigs);
this.hdfsSiteConfigURLs = other.hdfsSiteConfigURLs;
this.ivaratorCacheBufferSize = other.ivaratorCacheBufferSize;
this.uniqueCacheBufferSize = other.uniqueCacheBufferSize;
this.ivaratorCacheScanPersistThreshold = other.ivaratorCacheScanPersistThreshold;
this.ivaratorCacheScanTimeout = other.ivaratorCacheScanTimeout;
this.hdfsFileCompressionCodec = other.hdfsFileCompressionCodec;
@@ -958,6 +963,14 @@ public void setIvaratorCacheBufferSize(int ivaratorCacheBufferSize) {
this.ivaratorCacheBufferSize = ivaratorCacheBufferSize;
}

public int getUniqueCacheBufferSize() {
return uniqueCacheBufferSize;
}

public void setUniqueCacheBufferSize(int uniqueCacheBufferSize) {
this.uniqueCacheBufferSize = uniqueCacheBufferSize;
}

public long getIvaratorCacheScanPersistThreshold() {
return ivaratorCacheScanPersistThreshold;
}
@@ -1103,7 +1116,7 @@ public UniqueFields getUniqueFields() {
}

public void setUniqueFields(UniqueFields uniqueFields) {
this.uniqueFields = uniqueFields;
this.uniqueFields = uniqueFields.clone();
}

public Set<String> getHitsOnlySet() {
@@ -1582,6 +1595,12 @@ public boolean validateOptions(Map<String,String> options) {

if (options.containsKey(UNIQUE_FIELDS)) {
this.setUniqueFields(UniqueFields.from(options.get(UNIQUE_FIELDS)));
if (options.containsKey(MOST_RECENT_UNIQUE)) {
this.getUniqueFields().setMostRecent(Boolean.valueOf(options.get(MOST_RECENT_UNIQUE)));
if (options.containsKey(UNIQUE_CACHE_BUFFER_SIZE)) {
this.setUniqueCacheBufferSize(Integer.parseInt(options.get(UNIQUE_CACHE_BUFFER_SIZE)));
}
}
}

if (options.containsKey(HIT_LIST)) {
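Note the nesting in validateOptions: most.recent.unique is only consulted when unique.fields is present, and unique.cache.buffer.size only when the most-recent flag is also set, so neither knob can arrive without something to act on. A hypothetical option map for the iterator (keys are the constants defined above; values are illustrative):

    // Hypothetical iterator options for a most-recent-unique query.
    Map<String,String> options = new HashMap<>();
    options.put(QueryOptions.UNIQUE_FIELDS, "DEATH_DATE[DAY],UUID[ALL]");
    options.put(QueryOptions.MOST_RECENT_UNIQUE, "true");
    options.put(QueryOptions.UNIQUE_CACHE_BUFFER_SIZE, "200"); // read only because the flag above is set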
@@ -24,8 +24,8 @@ public class FinalDocumentTrackingIterator implements Iterator<Entry<Key,Documen
private boolean statsEntryReturned;
private Key lastKey = null;

private static final Text MARKER_TEXT = new Text("\u2735FinalDocument\u2735");
private static final ByteSequence MARKER_SEQUENCE = new ArrayByteSequence(MARKER_TEXT.getBytes(), 0, MARKER_TEXT.getLength());
public static final Text MARKER_TEXT = new Text("\u2735FinalDocument\u2735");
public static final ByteSequence MARKER_SEQUENCE = new ArrayByteSequence(MARKER_TEXT.getBytes(), 0, MARKER_TEXT.getLength());

private final Range seekRange;
private final QuerySpanCollector querySpanCollector;
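Widening MARKER_TEXT and MARKER_SEQUENCE from private to public presumably lets the new unique transform recognize final-document bookkeeping keys and pass them through rather than buffering them as results. A hypothetical check of that shape (the real predicate in UniqueTransform may differ):

    // Hypothetical: does this key's column qualifier end with the
    // FinalDocumentTrackingIterator marker?
    static boolean isFinalDocumentKey(Key key) {
        ByteSequence cq = key.getColumnQualifierData();
        int markerLen = FinalDocumentTrackingIterator.MARKER_SEQUENCE.length();
        return cq.length() >= markerLen
                        && cq.subSequence(cq.length() - markerLen, cq.length())
                                        .compareTo(FinalDocumentTrackingIterator.MARKER_SEQUENCE) == 0;
    }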