Skip to content

Commit

Permalink
Merge branch 'integration' into task/remove-query-data-query-plan-dep…
Browse files Browse the repository at this point in the history
…recations
  • Loading branch information
apmoriarty committed Jun 27, 2024
2 parents 9f78b39 + c2f5550 commit 6a72a47
Show file tree
Hide file tree
Showing 16 changed files with 582 additions and 562 deletions.
2 changes: 0 additions & 2 deletions properties/default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,6 @@ hierarchy.field.options=

# BaseEventQuery (beq) thresholds
beq.baseIteratorPriority=100
beq.eventPerDayThreshold=40000
beq.shardsPerDayThreshold=20
# max number of terms BEFORE all expansions (calculated based on how much the initial parser can handle before hitting a stack overflow: between 3500 and 3750)
beq.initialMaxTermThreshold=2000
# max number of terms AFTER all expansions (calculated based on how much the initial parser can handle before hitting a stack overflow: between 3500 and 3750)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ public String getQueryIteratorClass() {
return queryIteratorClass;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getMaxShardsPerDayThreshold() {
return maxShardsPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setMaxShardsPerDayThreshold(int maxShardsPerDayThreshold) {
this.maxShardsPerDayThreshold = maxShardsPerDayThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,18 +1291,22 @@ public void setUnevaluatedFields(Collection<String> unevaluatedFields) {
}
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getEventPerDayThreshold() {
return eventPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setEventPerDayThreshold(int eventPerDayThreshold) {
this.eventPerDayThreshold = eventPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getShardsPerDayThreshold() {
return shardsPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setShardsPerDayThreshold(int shardsPerDayThreshold) {
this.shardsPerDayThreshold = shardsPerDayThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@ public ScannerStream visit(ASTEQNode node, Object data) {

if (limitScanners) {
// Setup the CreateUidsIterator
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery(),
config.getShardsPerDayThreshold());
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery());

uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass);
uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(collapseUids));
Expand All @@ -581,8 +580,7 @@ public ScannerStream visit(ASTEQNode node, Object data) {

} else {
// Setup so this is a pass-through
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery(),
config.getShardsPerDayThreshold());
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery());

uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass);
uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
Expand All @@ -20,8 +19,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.annotation.Nullable;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
Expand All @@ -33,11 +30,9 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Queues;
Expand All @@ -55,28 +50,26 @@
/**
* Purpose: Extends Scanner session so that we can modify how we build our subsequent ranges. Breaking this out cleans up the code. May require implementation
* specific details if you are using custom iterators, as we are reinitializing a seek
*
* <p>
* Design: Extends Scanner session and only overrides the buildNextRange.
*
* <p>
* The {@link datawave.query.index.lookup.RangeStream} configures the iterator running against the global index.
*
* <p>
* Typically the iterator is a {@link datawave.query.index.lookup.CreateUidsIterator} or variant.
*
* <p>
* The iterator returns a tuple of shard - {@link IndexInfo} object pairs, each shard representing and day or shard range.
*
* <p>
* Results from the iterator are put onto the currentQueue and then flushed into the resultQueue. Under certain circumstances these results may be modified
* final to the prior flush into the resultQueue.
*
* <p>
* The RangeStreamScanner supports "seeking" the global index iterator. Because the RangeStreamScanner supports a {@link PeekingIterator} some implementation
* details are not immediately obvious. For more information, see {@link #seek(String)}.
*/
public class RangeStreamScanner extends ScannerSession implements Callable<RangeStreamScanner> {

private static final int MAX_MEDIAN = 20;
private static final Logger log = Logger.getLogger(RangeStreamScanner.class);
private int shardsPerDayThreshold = Integer.MAX_VALUE;
// simply compare the strings. no need for a date formatter
protected static final int dateCfLength = 8;
protected static final int DATE_CF_LENGTH = 8;
protected boolean seenUnexpectedKey = false;
protected ArrayDeque<Result> currentQueue;

Expand Down Expand Up @@ -144,13 +137,13 @@ public RangeStreamScanner setScannerFactory(ScannerFactory factory) {

/**
* Override this for your specific implementation.
*
* <p>
* In this specific implementation our row key will be the term, the column family will be the field name, and the column family will be the shard,so we
* should have the following as our last key
*
* <p>
* bar FOO:20130101_0
*
* so we should append a null so that we we don't skip shards. similarly, an assumption is made of the key structure within this class.
* <p>
* so we should append a null so that we don't skip shards. similarly, an assumption is made of the key structure within this class.
*
* @param lastKey
* the last key
Expand Down Expand Up @@ -435,9 +428,6 @@ protected int scannerInvariant(final Iterator<Result> iter) {
PeekingIterator<Result> kvIter = new PeekingIterator<>(iter);

int retrievalCount = 0;

Result myEntry;

String currentDay = null;

if (null != prevDay) {
Expand All @@ -452,13 +442,11 @@ protected int scannerInvariant(final Iterator<Result> iter) {
return 0;
}
}
// produces stats for us, so we don't have to!
DescriptiveStatistics stats = new DescriptiveStatistics();

writeLock.lock();
try {
while (kvIter.hasNext()) {
Result currentKeyValue = kvIter.peek();
Result<?> currentKeyValue = kvIter.peek();

// become a pass-through if we've seen an unexpected key.
if (seenUnexpectedKey) {
Expand All @@ -475,7 +463,6 @@ protected int scannerInvariant(final Iterator<Result> iter) {
currentDay = getDay(currentKeyValue.getKey());

currentQueue.add(trimTrailingUnderscore(currentKeyValue));

lastSeenKey = kvIter.next().getKey();
} else {
String nextKeysDay = getDay(currentKeyValue.getKey());
Expand All @@ -490,22 +477,7 @@ protected int scannerInvariant(final Iterator<Result> iter) {
log.trace("adding count of " + info.count());
}

stats.addValue(info.count());

if (currentQueue.size() <= shardsPerDayThreshold || stats.getPercentile(50) < MAX_MEDIAN) {

if (log.isTraceEnabled()) {
log.trace("adding our stats are " + stats.getPercentile(50) + " on " + currentQueue.size());
}

currentQueue.add(trimTrailingUnderscore(currentKeyValue));

} else {
if (log.isTraceEnabled()) {
log.trace("breaking because our stats are " + stats.getPercentile(50) + " on " + currentQueue.size());
}
break;
}
currentQueue.add(trimTrailingUnderscore(currentKeyValue));
lastSeenKey = kvIter.next().getKey();
} else {

Expand All @@ -522,36 +494,8 @@ protected int scannerInvariant(final Iterator<Result> iter) {
}
}

if (currentQueue.size() >= shardsPerDayThreshold && stats.getPercentile(50) > MAX_MEDIAN) {

Result top = currentQueue.poll();

Key topKey = top.getKey();
if (log.isTraceEnabled())
log.trace(topKey + " for " + currentDay + " exceeds limit of " + shardsPerDayThreshold + " with " + currentQueue.size());
Key newKey = new Key(topKey.getRow(), topKey.getColumnFamily(), new Text(currentDay), topKey.getColumnVisibility(), topKey.getTimestamp());
retrievalCount += dequeue();

Value newValue = writeInfoToValue();

myEntry = new Result(top.getContext(), newKey, newValue);
lastSeenKey = newKey;

try {
if (!resultQueue.offer(myEntry, 1, TimeUnit.SECONDS)) {
if (log.isTraceEnabled()) {
log.trace("could not add day! converting " + myEntry + " to " + prevDay);
}
prevDay = myEntry;
}
} catch (InterruptedException exception) {
prevDay = myEntry;
}

currentQueue.clear();

} else {
retrievalCount += dequeue();
}
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -673,8 +617,8 @@ protected boolean flushNeeded() {
protected String getDay(final Key key) {
String myDay = null;
byte[] cq = key.getColumnQualifierData().getBackingArray();
if (cq.length >= dateCfLength) {
myDay = new String(cq, 0, dateCfLength);
if (cq.length >= DATE_CF_LENGTH) {
myDay = new String(cq, 0, DATE_CF_LENGTH);
if (log.isTraceEnabled()) {
log.trace("Day is " + myDay + " for " + key);
}
Expand All @@ -700,11 +644,6 @@ public static String shardFromKey(final Key key) {
}
}

public RangeStreamScanner setShardsPerDayThreshold(int shardsPerDayThreshold) {
this.shardsPerDayThreshold = shardsPerDayThreshold;
return this;
}

@Override
public RangeStreamScanner call() throws Exception {
findTop();
Expand Down Expand Up @@ -795,14 +734,11 @@ else if (baseScanner instanceof RfileScanner)
if (baseScanner instanceof Scanner)
((Scanner) baseScanner).setRange(currentRange);

Iterator<Result> iter = Iterators.transform(baseScanner.iterator(), new Function<Entry<Key,Value>,Result>() {
@Override
public Result apply(@Nullable Entry<Key,Value> input) {
if (input == null) {
return null;
}
return new Result(input.getKey(), input.getValue());
Iterator<Result> iter = Iterators.transform(baseScanner.iterator(), input -> {
if (input == null) {
return null;
}
return new Result<>(input.getKey(), input.getValue());
});
// do not continue if we've reached the end of the corpus

Expand Down Expand Up @@ -867,8 +803,8 @@ private boolean isBeyondRange(Key lastSeenKey, Key endKey) {
log.trace(cf + " " + endCf);
}

if (dateCfLength == cf.length()) {
endCf = endCf.substring(0, dateCfLength);
if (DATE_CF_LENGTH == cf.length()) {
endCf = endCf.substring(0, DATE_CF_LENGTH);
if (cf.compareTo(endCf) >= 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public RangeStreamScanner newRangeScanner(final String tableName, final Set<Auth
}

public RangeStreamScanner newRangeScanner(String tableName, Set<Authorizations> auths, Query query, int shardsPerDayThreshold) throws Exception {
return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setShardsPerDayThreshold(shardsPerDayThreshold).setScannerFactory(this);
return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setScannerFactory(this);
}

public boolean close(ScannerBase bs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1601,18 +1601,22 @@ public void setHitList(boolean hitList) {
getConfig().setHitList(hitList);
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getEventPerDayThreshold() {
return getConfig().getEventPerDayThreshold();
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setEventPerDayThreshold(int eventPerDayThreshold) {
getConfig().setEventPerDayThreshold(eventPerDayThreshold);
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getShardsPerDayThreshold() {
return getConfig().getShardsPerDayThreshold();
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setShardsPerDayThreshold(int shardsPerDayThreshold) {
getConfig().setShardsPerDayThreshold(shardsPerDayThreshold);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ public void testEventPerDay() throws Exception {
String dtFilter = CityEntry.generic.getDataType();
qOptions.put(QueryParameters.DATATYPE_FILTER_SET, dtFilter);

this.logic.setEventPerDayThreshold(1);

String query = CityField.STATE.name() + EQ_OP + "'missouri'";
String expect = "(" + query + ")" + AND_OP + BaseRawData.EVENT_DATATYPE + EQ_OP + "'" + CityEntry.generic.getDataType() + "'";
runTest(query, expect, qOptions);
Expand All @@ -146,9 +144,6 @@ public void testEventAndShardsPerDay() throws Exception {
String dtFilter = CityEntry.generic.getDataType();
qOptions.put(QueryParameters.DATATYPE_FILTER_SET, dtFilter);

this.logic.setEventPerDayThreshold(1);
this.logic.setShardsPerDayThreshold(1);

String query = CityField.STATE.name() + EQ_OP + "'missouri'";
String expect = "(" + query + ")" + AND_OP + BaseRawData.EVENT_DATATYPE + EQ_OP + "'" + CityEntry.generic.getDataType() + "'";
runTest(query, expect, qOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ public void testFieldOpField() throws Exception {
@Test
public void testEventThreshold() throws Exception {
log.info("------ testEventThreshold ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a'";
String query = Constants.ANY_FIELD + phrase;
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -138,8 +136,6 @@ public void testEventThreshold() throws Exception {
@Test(expected = InvalidQueryException.class)
public void testFieldIgnoreParam1() throws Exception {
log.info("------ testFieldIgnoreParam1 ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a'" + "&& FOO == bar2";
String query = Constants.ANY_FIELD + phrase + "&& FOO == bar2";
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -155,8 +151,6 @@ public void testFieldIgnoreParam1() throws Exception {
@Test
public void testFieldIgnoreParam2() throws Exception {
log.info("------ testFieldIgnoreParam2 ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a'" + "&& FOO == bar2";
String query = Constants.ANY_FIELD + phrase + "&& FOO == bar2";
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -172,8 +166,6 @@ public void testFieldIgnoreParam2() throws Exception {
@Test
public void testFieldIgnoreParam3() throws Exception {
log.info("------ testFieldIgnoreParam3 ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a' && STATE == 'sta'";
String query = Constants.ANY_FIELD + phrase + "&& STATE == 'sta'";
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -188,8 +180,6 @@ public void testFieldIgnoreParam3() throws Exception {
@Test
public void testShardThreshold() throws Exception {
log.info("------ testShardThreshold ------");
// setting shards per day does not alter results -
this.logic.setShardsPerDayThreshold(1);
String phrase = RE_OP + "'.*a'";
String query = Constants.ANY_FIELD + phrase;
String expect = this.dataManager.convertAnyField(phrase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void setupTest() throws ParseException {
config.setDatatypeFilter(Collections.singleton("datatype"));
config.setQueryFieldsDatatypes(fieldToDataType);
config.setIndexedFields(fieldToDataType);
config.setShardsPerDayThreshold(2);
}

@AfterClass
Expand Down
Loading

0 comments on commit 6a72a47

Please sign in to comment.