Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove shardPerDay and eventPerDay thresholds #2447

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions properties/default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,6 @@ hierarchy.field.options=

# BaseEventQuery (beq) thresholds
beq.baseIteratorPriority=100
beq.eventPerDayThreshold=40000
beq.shardsPerDayThreshold=20
# max number of terms BEFORE all expansions (calculated based on how much the initial parser can handle before hitting a stack overflow: between 3500 and 3750)
beq.initialMaxTermThreshold=2000
# max number of terms AFTER all expansions (calculated based on how much the initial parser can handle before hitting a stack overflow: between 3500 and 3750)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ public String getQueryIteratorClass() {
return queryIteratorClass;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getMaxShardsPerDayThreshold() {
return maxShardsPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setMaxShardsPerDayThreshold(int maxShardsPerDayThreshold) {
this.maxShardsPerDayThreshold = maxShardsPerDayThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,18 +1291,22 @@ public void setUnevaluatedFields(Collection<String> unevaluatedFields) {
}
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getEventPerDayThreshold() {
return eventPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setEventPerDayThreshold(int eventPerDayThreshold) {
this.eventPerDayThreshold = eventPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getShardsPerDayThreshold() {
return shardsPerDayThreshold;
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setShardsPerDayThreshold(int shardsPerDayThreshold) {
this.shardsPerDayThreshold = shardsPerDayThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@ public ScannerStream visit(ASTEQNode node, Object data) {

if (limitScanners) {
// Setup the CreateUidsIterator
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery(),
config.getShardsPerDayThreshold());
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery());

uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass);
uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(collapseUids));
Expand All @@ -581,8 +580,7 @@ public ScannerStream visit(ASTEQNode node, Object data) {

} else {
// Setup so this is a pass-through
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery(),
config.getShardsPerDayThreshold());
scannerSession = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery());

uidSetting = new IteratorSetting(stackStart++, createUidsIteratorClass);
uidSetting.addOption(CreateUidsIterator.COLLAPSE_UIDS, Boolean.toString(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
Expand All @@ -20,8 +19,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.annotation.Nullable;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
Expand All @@ -33,11 +30,9 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Queues;
Expand All @@ -55,28 +50,26 @@
/**
* Purpose: Extends Scanner session so that we can modify how we build our subsequent ranges. Breaking this out cleans up the code. May require implementation
* specific details if you are using custom iterators, as we are reinitializing a seek
*
* <p>
* Design: Extends Scanner session and only overrides the buildNextRange.
*
* <p>
* The {@link datawave.query.index.lookup.RangeStream} configures the iterator running against the global index.
*
* <p>
* Typically the iterator is a {@link datawave.query.index.lookup.CreateUidsIterator} or variant.
*
* <p>
* The iterator returns a tuple of shard - {@link IndexInfo} object pairs, each shard representing and day or shard range.
*
* <p>
* Results from the iterator are put onto the currentQueue and then flushed into the resultQueue. Under certain circumstances these results may be modified
* final to the prior flush into the resultQueue.
*
* <p>
* The RangeStreamScanner supports "seeking" the global index iterator. Because the RangeStreamScanner supports a {@link PeekingIterator} some implementation
* details are not immediately obvious. For more information, see {@link #seek(String)}.
*/
public class RangeStreamScanner extends ScannerSession implements Callable<RangeStreamScanner> {

private static final int MAX_MEDIAN = 20;
private static final Logger log = Logger.getLogger(RangeStreamScanner.class);
private int shardsPerDayThreshold = Integer.MAX_VALUE;
// simply compare the strings. no need for a date formatter
protected static final int dateCfLength = 8;
protected static final int DATE_CF_LENGTH = 8;
protected boolean seenUnexpectedKey = false;
protected ArrayDeque<Result> currentQueue;

Expand Down Expand Up @@ -144,13 +137,13 @@ public RangeStreamScanner setScannerFactory(ScannerFactory factory) {

/**
* Override this for your specific implementation.
*
* <p>
* In this specific implementation our row key will be the term, the column family will be the field name, and the column family will be the shard,so we
* should have the following as our last key
*
* <p>
* bar FOO:20130101_0
*
* so we should append a null so that we we don't skip shards. similarly, an assumption is made of the key structure within this class.
* <p>
* so we should append a null so that we don't skip shards. similarly, an assumption is made of the key structure within this class.
*
* @param lastKey
* the last key
Expand Down Expand Up @@ -435,9 +428,6 @@ protected int scannerInvariant(final Iterator<Result> iter) {
PeekingIterator<Result> kvIter = new PeekingIterator<>(iter);

int retrievalCount = 0;

Result myEntry;

String currentDay = null;

if (null != prevDay) {
Expand All @@ -452,13 +442,11 @@ protected int scannerInvariant(final Iterator<Result> iter) {
return 0;
}
}
// produces stats for us, so we don't have to!
DescriptiveStatistics stats = new DescriptiveStatistics();

writeLock.lock();
try {
while (kvIter.hasNext()) {
Result currentKeyValue = kvIter.peek();
Result<?> currentKeyValue = kvIter.peek();

// become a pass-through if we've seen an unexpected key.
if (seenUnexpectedKey) {
Expand All @@ -475,7 +463,6 @@ protected int scannerInvariant(final Iterator<Result> iter) {
currentDay = getDay(currentKeyValue.getKey());

currentQueue.add(trimTrailingUnderscore(currentKeyValue));

lastSeenKey = kvIter.next().getKey();
} else {
String nextKeysDay = getDay(currentKeyValue.getKey());
Expand All @@ -490,22 +477,7 @@ protected int scannerInvariant(final Iterator<Result> iter) {
log.trace("adding count of " + info.count());
}

stats.addValue(info.count());

if (currentQueue.size() <= shardsPerDayThreshold || stats.getPercentile(50) < MAX_MEDIAN) {

if (log.isTraceEnabled()) {
log.trace("adding our stats are " + stats.getPercentile(50) + " on " + currentQueue.size());
}

currentQueue.add(trimTrailingUnderscore(currentKeyValue));

} else {
if (log.isTraceEnabled()) {
log.trace("breaking because our stats are " + stats.getPercentile(50) + " on " + currentQueue.size());
}
break;
}
currentQueue.add(trimTrailingUnderscore(currentKeyValue));
lastSeenKey = kvIter.next().getKey();
} else {

Expand All @@ -522,36 +494,8 @@ protected int scannerInvariant(final Iterator<Result> iter) {
}
}

if (currentQueue.size() >= shardsPerDayThreshold && stats.getPercentile(50) > MAX_MEDIAN) {

Result top = currentQueue.poll();

Key topKey = top.getKey();
if (log.isTraceEnabled())
log.trace(topKey + " for " + currentDay + " exceeds limit of " + shardsPerDayThreshold + " with " + currentQueue.size());
Key newKey = new Key(topKey.getRow(), topKey.getColumnFamily(), new Text(currentDay), topKey.getColumnVisibility(), topKey.getTimestamp());
retrievalCount += dequeue();

Value newValue = writeInfoToValue();

myEntry = new Result(top.getContext(), newKey, newValue);
lastSeenKey = newKey;

try {
if (!resultQueue.offer(myEntry, 1, TimeUnit.SECONDS)) {
if (log.isTraceEnabled()) {
log.trace("could not add day! converting " + myEntry + " to " + prevDay);
}
prevDay = myEntry;
}
} catch (InterruptedException exception) {
prevDay = myEntry;
}

currentQueue.clear();

} else {
retrievalCount += dequeue();
}
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -673,8 +617,8 @@ protected boolean flushNeeded() {
protected String getDay(final Key key) {
String myDay = null;
byte[] cq = key.getColumnQualifierData().getBackingArray();
if (cq.length >= dateCfLength) {
myDay = new String(cq, 0, dateCfLength);
if (cq.length >= DATE_CF_LENGTH) {
myDay = new String(cq, 0, DATE_CF_LENGTH);
if (log.isTraceEnabled()) {
log.trace("Day is " + myDay + " for " + key);
}
Expand All @@ -700,11 +644,6 @@ public static String shardFromKey(final Key key) {
}
}

public RangeStreamScanner setShardsPerDayThreshold(int shardsPerDayThreshold) {
this.shardsPerDayThreshold = shardsPerDayThreshold;
return this;
}

@Override
public RangeStreamScanner call() throws Exception {
findTop();
Expand Down Expand Up @@ -795,14 +734,11 @@ else if (baseScanner instanceof RfileScanner)
if (baseScanner instanceof Scanner)
((Scanner) baseScanner).setRange(currentRange);

Iterator<Result> iter = Iterators.transform(baseScanner.iterator(), new Function<Entry<Key,Value>,Result>() {
@Override
public Result apply(@Nullable Entry<Key,Value> input) {
if (input == null) {
return null;
}
return new Result(input.getKey(), input.getValue());
Iterator<Result> iter = Iterators.transform(baseScanner.iterator(), input -> {
if (input == null) {
return null;
}
return new Result<>(input.getKey(), input.getValue());
});
// do not continue if we've reached the end of the corpus

Expand Down Expand Up @@ -867,8 +803,8 @@ private boolean isBeyondRange(Key lastSeenKey, Key endKey) {
log.trace(cf + " " + endCf);
}

if (dateCfLength == cf.length()) {
endCf = endCf.substring(0, dateCfLength);
if (DATE_CF_LENGTH == cf.length()) {
endCf = endCf.substring(0, DATE_CF_LENGTH);
if (cf.compareTo(endCf) >= 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public RangeStreamScanner newRangeScanner(final String tableName, final Set<Auth
}

public RangeStreamScanner newRangeScanner(String tableName, Set<Authorizations> auths, Query query, int shardsPerDayThreshold) throws Exception {
return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setShardsPerDayThreshold(shardsPerDayThreshold).setScannerFactory(this);
return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setScannerFactory(this);
}

public boolean close(ScannerBase bs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1601,18 +1601,22 @@ public void setHitList(boolean hitList) {
getConfig().setHitList(hitList);
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getEventPerDayThreshold() {
return getConfig().getEventPerDayThreshold();
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setEventPerDayThreshold(int eventPerDayThreshold) {
getConfig().setEventPerDayThreshold(eventPerDayThreshold);
}

@Deprecated(since = "7.1.0", forRemoval = true)
public int getShardsPerDayThreshold() {
return getConfig().getShardsPerDayThreshold();
}

@Deprecated(since = "7.1.0", forRemoval = true)
public void setShardsPerDayThreshold(int shardsPerDayThreshold) {
getConfig().setShardsPerDayThreshold(shardsPerDayThreshold);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ public void testEventPerDay() throws Exception {
String dtFilter = CityEntry.generic.getDataType();
qOptions.put(QueryParameters.DATATYPE_FILTER_SET, dtFilter);

this.logic.setEventPerDayThreshold(1);

String query = CityField.STATE.name() + EQ_OP + "'missouri'";
String expect = "(" + query + ")" + AND_OP + BaseRawData.EVENT_DATATYPE + EQ_OP + "'" + CityEntry.generic.getDataType() + "'";
runTest(query, expect, qOptions);
Expand All @@ -146,9 +144,6 @@ public void testEventAndShardsPerDay() throws Exception {
String dtFilter = CityEntry.generic.getDataType();
qOptions.put(QueryParameters.DATATYPE_FILTER_SET, dtFilter);

this.logic.setEventPerDayThreshold(1);
this.logic.setShardsPerDayThreshold(1);

String query = CityField.STATE.name() + EQ_OP + "'missouri'";
String expect = "(" + query + ")" + AND_OP + BaseRawData.EVENT_DATATYPE + EQ_OP + "'" + CityEntry.generic.getDataType() + "'";
runTest(query, expect, qOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ public void testFieldOpField() throws Exception {
@Test
public void testEventThreshold() throws Exception {
log.info("------ testEventThreshold ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a'";
String query = Constants.ANY_FIELD + phrase;
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -138,8 +136,6 @@ public void testEventThreshold() throws Exception {
@Test(expected = InvalidQueryException.class)
public void testFieldIgnoreParam1() throws Exception {
log.info("------ testFieldIgnoreParam1 ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a'" + "&& FOO == bar2";
String query = Constants.ANY_FIELD + phrase + "&& FOO == bar2";
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -155,8 +151,6 @@ public void testFieldIgnoreParam1() throws Exception {
@Test
public void testFieldIgnoreParam2() throws Exception {
log.info("------ testFieldIgnoreParam2 ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a'" + "&& FOO == bar2";
String query = Constants.ANY_FIELD + phrase + "&& FOO == bar2";
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -172,8 +166,6 @@ public void testFieldIgnoreParam2() throws Exception {
@Test
public void testFieldIgnoreParam3() throws Exception {
log.info("------ testFieldIgnoreParam3 ------");
// setting event per day does not alter results
this.logic.setEventPerDayThreshold(1);
String phrase = RE_OP + "'.*a' && STATE == 'sta'";
String query = Constants.ANY_FIELD + phrase + "&& STATE == 'sta'";
String expect = this.dataManager.convertAnyField(phrase);
Expand All @@ -188,8 +180,6 @@ public void testFieldIgnoreParam3() throws Exception {
@Test
public void testShardThreshold() throws Exception {
log.info("------ testShardThreshold ------");
// setting shards per day does not alter results -
this.logic.setShardsPerDayThreshold(1);
String phrase = RE_OP + "'.*a'";
String query = Constants.ANY_FIELD + phrase;
String expect = this.dataManager.convertAnyField(phrase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void setupTest() throws ParseException {
config.setDatatypeFilter(Collections.singleton("datatype"));
config.setQueryFieldsDatatypes(fieldToDataType);
config.setIndexedFields(fieldToDataType);
config.setShardsPerDayThreshold(2);
}

@AfterClass
Expand Down
Loading
Loading