Skip to content

Commit

Permalink
Revert "Fixed bug where unique results of null values failed to return (
Browse files Browse the repository at this point in the history
#2354)"

This reverts commit 15f08a3.
  • Loading branch information
ivakegg committed Jul 9, 2024
1 parent 52baa09 commit b0a8044
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;

import org.apache.commons.lang.StringUtils;

Expand All @@ -30,7 +28,7 @@
*/
public class UniqueFields implements Serializable, Cloneable {

private final TreeMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private final SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private boolean mostRecent = false;

/**
Expand Down Expand Up @@ -215,21 +213,21 @@ public void replace(String field, String replacement) {
}

/**
* Return the fields within this {@link UniqueFields}. Modifications to this set will modify the fields in this {@link UniqueFields}.
* Return a copy of the fields within this {@link UniqueFields}. Modifications to this set will not modify the fields in this {@link UniqueFields}.
*
* @return a copy of the fields
*/
public NavigableSet<String> getFields() {
return fieldMap.keySet();
public Set<String> getFields() {
return Sets.newHashSet(fieldMap.keySet());
}

/**
* Return the underlying field-granularity map from this {@link UniqueFields}.
*
* @return the field map
*/
public TreeMultimap<String,UniqueGranularity> getFieldMap() {
return fieldMap;
public Multimap<String,UniqueGranularity> getFieldMap() {
return TreeMultimap.create(fieldMap);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ private void addConfigBasedTransformers() throws QueryException {
if (getConfig().getUniqueFields() != null && !getConfig().getUniqueFields().isEmpty()) {
DocumentTransform alreadyExists = ((DocumentTransformer) this.transformerInstance).containsTransform(UniqueTransform.class);
if (alreadyExists != null) {
((UniqueTransform) alreadyExists).updateConfig(getConfig().getUniqueFields());
((UniqueTransform) alreadyExists).updateConfig(getConfig().getUniqueFields(), getQueryModel());
} else {
try {
// @formatter:off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface DocumentTransform extends Function<Map.Entry<Key,Document>,Map.
class DefaultDocumentTransform implements DocumentTransform {
protected Query settings;
protected MarkingFunctions markingFunctions;
protected long queryExecutionForPageStartTime = System.currentTimeMillis();
protected long queryExecutionForPageStartTime;

@Override
public void initialize(Query settings, MarkingFunctions markingFunctions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import datawave.query.common.grouping.GroupingUtils;
import datawave.query.common.grouping.Groups;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
import datawave.query.model.QueryModel;

/**
* GroupingTransform mimics GROUP BY with a COUNT in SQL. For the given fields, this transform will group into unique combinations of values and assign a count
Expand Down Expand Up @@ -90,10 +91,6 @@ public Entry<Key,Document> apply(@Nullable Entry<Key,Document> keyDocumentEntry)
return keyDocumentEntry;
}

if (keyDocumentEntry.getValue().isIntermediateResult()) {
return keyDocumentEntry;
}

keys.add(keyDocumentEntry.getKey());
log.trace("{} get list key counts for: {}", "web-server", keyDocumentEntry);
DocumentGrouper.group(keyDocumentEntry, groupFields, groups);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -40,8 +39,10 @@
import datawave.query.iterator.ivarator.IvaratorCacheDirConfig;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
import datawave.query.model.QueryModel;
import datawave.query.tables.ShardQueryLogic;
import datawave.query.util.sortedset.ByteArrayComparator;
import datawave.query.util.sortedset.FileByteDocumentSortedSet;
import datawave.query.util.sortedset.FileKeySortedSet;
import datawave.query.util.sortedset.FileKeyValueSortedSet;
import datawave.query.util.sortedset.FileSortedSet;
import datawave.query.util.sortedset.HdfsBackedSortedSet;
Expand All @@ -58,6 +59,7 @@ public class UniqueTransform extends DocumentTransform.DefaultDocumentTransform

private BloomFilter<byte[]> bloom;
private UniqueFields uniqueFields = new UniqueFields();
private Multimap<String,String> modelMapping;
private HdfsBackedSortedSet<Entry<byte[],Document>> set;
private HdfsBackedSortedSet<Entry<Key,Document>> returnSet;
private Iterator<Entry<Key,Document>> setIterator;
Expand Down Expand Up @@ -87,13 +89,40 @@ public UniqueTransform(UniqueFields uniqueFields, long queryExecutionForPageTime
}
}

/*
 * Create a new {@link UniqueTransform} that will use a bloom filter to return only those results that are unique per the uniqueFields. Special uniqueness can
* be requested for date/time fields (@see UniqueFields). The logic will be used to get a query model to include the reverse mappings in the unique field
* set
*
 * @param logic The query logic from which to pull the query model
*
* @param uniqueFields The unique fields
*
 * @param queryExecutionForPageTimeout If this timeout has passed since the last result was returned, then an "intermediate" result is returned
* denoting we are still looking for the next unique result.
*/
public UniqueTransform(ShardQueryLogic logic, UniqueFields uniqueFields, long queryExecutionForPageTimeout) {
this(uniqueFields, queryExecutionForPageTimeout);
QueryModel model = logic.getQueryModel();
if (model != null) {
modelMapping = HashMultimap.create();
// reverse the reverse query mapping which will give us a mapping from the final field name to the original field name(s)
for (Map.Entry<String,String> entry : model.getReverseQueryMapping().entrySet()) {
modelMapping.put(entry.getValue(), entry.getKey());
}
}
setModelMappings(model);
}

/**
* Update the configuration of this transform. If the configuration is actually changing, then the bloom filter will be reset as well.
*
* @param uniqueFields
* The new set of unique fields.
* @param model
* The query model
*/
public void updateConfig(UniqueFields uniqueFields) {
public void updateConfig(UniqueFields uniqueFields, QueryModel model) {
// only reset the bloom filter if changing the field set
if (!this.uniqueFields.equals(uniqueFields)) {
this.uniqueFields = uniqueFields.clone();
Expand All @@ -103,6 +132,23 @@ public void updateConfig(UniqueFields uniqueFields) {
log.trace("unique fields: " + this.uniqueFields.getFields());
}
}
setModelMappings(model);
}

/**
* Set the query model from which the reverse query mappings are pulled.
*
* @param model
* The query model
*/
private void setModelMappings(QueryModel model) {
if (model != null) {
modelMapping = HashMultimap.create();
// reverse the reverse query mapping which will give us a mapping from the final field name to the original field name(s)
for (Map.Entry<String,String> entry : model.getReverseQueryMapping().entrySet()) {
modelMapping.put(entry.getValue(), entry.getKey());
}
}
}

/**
Expand Down Expand Up @@ -131,10 +177,6 @@ public Entry<Key,Document> apply(@Nullable Entry<Key,Document> keyDocumentEntry)
return keyDocumentEntry;
}

if (keyDocumentEntry.getValue().isIntermediateResult()) {
return keyDocumentEntry;
}

try {
if (set != null) {
byte[] signature = getBytes(keyDocumentEntry.getValue());
Expand Down Expand Up @@ -238,48 +280,50 @@ byte[] getBytes(Document document) throws IOException {
* if we failed to generate the byte array
*/
private void outputSortedFieldValues(Document document, DataOutputStream output) throws IOException {
Multimap<String,String> values = HashMultimap.create();
int count = 0;
String lastField = "";
List<String> values = new ArrayList<>();
for (String documentField : new TreeSet<>(document.getDictionary().keySet())) {
String field = getUniqueField(documentField);
if (field != null) {
if (!field.equals(lastField)) {
count = dumpValues(count, lastField, values, output);
lastField = field;
}
addValues(field, document.get(documentField), values);
}
}
// Always dump the fields in the same order (uniqueFields.getFields is a sorted collection)
for (String field : uniqueFields.getFields()) {
dumpValues(field, values.get(field), output);
}
dumpValues(count, lastField, values, output);
output.flush();
}

/**
* Dump a list of values, sorted, to the data output stream
*
* @param count
* value count
* @param field
* a field
* @param values
* the list of values
* @param output
* the output stream
* @return The next field count
* @throws IOException
* for issues with read/write
*/
private void dumpValues(String field, Collection<String> values, DataOutputStream output) throws IOException {
String separator = "f-" + field + ":";
private int dumpValues(int count, String field, List<String> values, DataOutputStream output) throws IOException {
if (!values.isEmpty()) {
List<String> valueList = new ArrayList<>(values);
// always output values in sorted order.
Collections.sort(valueList);
for (String value : valueList) {
Collections.sort(values);
String separator = "f-" + field + '/' + (count++) + ":";
for (String value : values) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
}
} else {
// dump at least a header for empty value sets to ensure we have some bytes to check against
// in the bloom filter.
output.writeUTF(separator);
values.clear();
}
return count;
}

/**
Expand All @@ -290,16 +334,16 @@ private void dumpValues(String field, Collection<String> values, DataOutputStrea
* @param attribute
* The attribute
* @param values
* The map of values to be updated
* The list of values to be updated
*/
private void addValues(final String field, Attribute<?> attribute, Multimap<String,String> values) {
private void addValues(final String field, Attribute<?> attribute, List<String> values) {
if (attribute instanceof Attributes) {
// @formatter:off
((Attributes) attribute).getAttributes().stream()
.forEach(a -> addValues(field, a, values));
// @formatter:on
} else {
values.put(field, uniqueFields.transformValue(field, String.valueOf(attribute.getData())));
values.add(uniqueFields.transformValue(field, String.valueOf(attribute.getData())));
}
}

Expand Down Expand Up @@ -332,7 +376,8 @@ private String getFieldWithoutGrouping(String field) {
}

/**
* Return whether or not the provided document field is considered a case-insensitive match for the provided field
* Return whether or not the provided document field is considered a case-insensitive match for the provided field, applying reverse model mappings if
* configured.
*
* @param baseField
* The base field
Expand All @@ -341,7 +386,9 @@ private String getFieldWithoutGrouping(String field) {
* @return true if matching
*/
private boolean isMatchingField(String baseField, String field) {
return baseField.equalsIgnoreCase(field);
baseField = baseField.toUpperCase();
field = field.toUpperCase();
return field.equals(baseField) || (modelMapping != null && modelMapping.get(field).contains(baseField));
}

/**
Expand Down Expand Up @@ -526,6 +573,10 @@ public Builder withQueryExecutionForPageTimeout(long timeout) {
public UniqueTransform build() throws IOException {
UniqueTransform transform = new UniqueTransform(uniqueFields, queryExecutionForPageTimeout);

if (model != null) {
transform.setModelMappings(model);
}

if (transform.uniqueFields.isMostRecent()) {
// @formatter:off
// noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
Expand All @@ -26,7 +25,6 @@
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.data.Key;
import org.apache.commons.collections.keyvalue.UnmodifiableMapEntry;
import org.apache.commons.collections4.Transformer;
import org.apache.commons.collections4.iterators.TransformIterator;
import org.apache.commons.lang.RandomStringUtils;
Expand All @@ -37,7 +35,6 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.common.primitives.Longs;

Expand Down Expand Up @@ -484,35 +481,6 @@ public void testUniquenessWithTwoGroupsAndPartialGroups() {
assertOrderedFieldValues();
}

@Test
public void testFinalDocIgnored() {
SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
fieldMap.put("FIELD", UniqueGranularity.ALL);
UniqueFields fields = new UniqueFields(fieldMap);
UniqueTransform transform = new UniqueTransform(fields, 10000000L);
Key key = new Key("shard", "dt\u0000uid", FinalDocumentTrackingIterator.MARKER_TEXT.toString());
Document doc = new Document();
Map.Entry<Key,Document> entry = new UnmodifiableMapEntry(key, doc);
for (int i = 0; i < 10; i++) {
assertTrue(entry == transform.apply(entry));
}
}

@Test
public void testIntermediateIgnored() {
SortedSetMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
fieldMap.put("FIELD", UniqueGranularity.ALL);
UniqueFields fields = new UniqueFields(fieldMap);
UniqueTransform transform = new UniqueTransform(fields, 10000000L);
Key key = new Key("shard", "dt\u0000uid");
Document doc = new Document();
doc.setIntermediateResult(true);
Map.Entry<Key,Document> entry = new UnmodifiableMapEntry(key, doc);
for (int i = 0; i < 10; i++) {
assertTrue(entry == transform.apply(entry));
}
}

protected void assertUniqueDocuments() {
List<Document> actual = getUniqueDocumentsWithUpdateConfigCalls(inputDocuments);
Collections.sort(expectedUniqueDocuments);
Expand Down Expand Up @@ -574,7 +542,7 @@ protected UniqueTransform getUniqueTransform() {
}

protected void updateUniqueTransform(UniqueTransform uniqueTransform) {
uniqueTransform.updateConfig(uniqueFields);
uniqueTransform.updateConfig(uniqueFields, null);
}

protected InputDocumentBuilder givenInputDocument() {
Expand Down Expand Up @@ -668,16 +636,13 @@ public byte[] build() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(bytes);
int count = 0;
for (String field : fieldValues.keySet()) {
String separator = "f-" + field + ":";
if (fieldValues.isEmpty()) {
String separator = "f-" + field + '/' + (count++) + ":";
for (String value : fieldValues.get(field)) {
output.writeUTF(separator);
} else {
for (String value : fieldValues.get(field)) {
output.writeUTF(separator);
output.writeUTF(value);
separator = ",";
}
output.writeUTF(value);
separator = ",";
}
}
output.flush();
Expand Down

0 comments on commit b0a8044

Please sign in to comment.