From 0f342201b57bb3670f59fef87bad807608ab1c74 Mon Sep 17 00:00:00 2001 From: Moriarty <22225248+apmoriarty@users.noreply.github.com> Date: Wed, 12 Jun 2024 10:24:19 -0400 Subject: [PATCH] Add support for setting scan consistency level on a query logic (#2386) * Add support for setting scan consistency level on a query logic Additional updates Update a few instances that still relied on old constructors Only update scanner factory hints and consistency if set on logic config * merge request feedback * clarify names are per-table --- .../GenericQueryConfiguration.java | 25 ++ .../core/query/logic/BaseQueryLogic.java | 16 ++ .../query/tables/BatchScannerSession.java | 4 + .../datawave/query/tables/ScannerFactory.java | 163 +++++++++--- .../query/tables/ShardIndexQueryTable.java | 9 +- .../query/tables/ShardQueryLogic.java | 8 + .../tables/content/ContentQueryLogic.java | 5 +- .../query/tables/edge/EdgeQueryLogic.java | 3 +- .../shard/FieldIndexCountQueryLogic.java | 3 +- .../ssdeep/SSDeepSimilarityQueryLogic.java | 3 +- .../query/ScanConsistencyQueryTest.java | 121 +++++++++ .../config/ShardQueryConfigurationTest.java | 5 + .../index/lookup/RangeStreamQueryTest.java | 3 +- .../query/index/lookup/RangeStreamTest.java | 73 +++--- .../query/index/lookup/RangeStreamTestX.java | 6 +- .../UnfieldedIndexExpansionVisitorTest.java | 8 +- .../query/tables/RangeStreamScannerTest.java | 7 +- .../query/tables/ScannerFactoryTest.java | 232 ++++++++++++++++++ .../AbstractFunctionalQuery.java | 2 +- .../configuration/TestBaseQueryLogic.java | 2 + 20 files changed, 614 insertions(+), 84 deletions(-) create mode 100644 warehouse/query-core/src/test/java/datawave/query/ScanConsistencyQueryTest.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java diff --git a/core/query/src/main/java/datawave/core/query/configuration/GenericQueryConfiguration.java b/core/query/src/main/java/datawave/core/query/configuration/GenericQueryConfiguration.java index a8dd9cfbd6e..594716baa79 100644 --- a/core/query/src/main/java/datawave/core/query/configuration/GenericQueryConfiguration.java +++ b/core/query/src/main/java/datawave/core/query/configuration/GenericQueryConfiguration.java @@ -5,13 +5,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.security.Authorizations; import com.google.common.collect.Iterators; @@ -70,6 +73,10 @@ public class GenericQueryConfiguration implements Serializable { // Whether or not this query emits every result or performs some kind of result reduction protected boolean reduceResults = false; + // either IMMEDIATE or EVENTUAL + private Map tableConsistencyLevels = new HashMap<>(); + private Map> tableHints = new HashMap<>(); + /** * Empty default constructor */ @@ -103,6 +110,8 @@ public GenericQueryConfiguration(GenericQueryConfiguration genericConfig) { this.setQueryString(genericConfig.getQueryString()); this.setTableName(genericConfig.getTableName()); this.setReduceResults(genericConfig.isReduceResults()); + this.setTableConsistencyLevels(genericConfig.getTableConsistencyLevels()); + this.setTableHints(genericConfig.getTableHints()); } public Collection getQueries() { @@ -269,6 +278,22 @@ public void setAccumuloPassword(String password) { this.accumuloPassword = EnvProvider.resolve(password); } + public Map getTableConsistencyLevels() { + return tableConsistencyLevels; + } + + public void setTableConsistencyLevels(Map tableConsistencyLevels) { + this.tableConsistencyLevels = tableConsistencyLevels; + } + + public Map> getTableHints() { + return tableHints; + } + + public void setTableHints(Map> tableHints) { + this.tableHints = tableHints; + } + /** * Checks for non-null, sane values for the configured values * diff --git a/core/query/src/main/java/datawave/core/query/logic/BaseQueryLogic.java b/core/query/src/main/java/datawave/core/query/logic/BaseQueryLogic.java index 652d02536bb..f90c7d44701 100644 --- a/core/query/src/main/java/datawave/core/query/logic/BaseQueryLogic.java +++ b/core/query/src/main/java/datawave/core/query/logic/BaseQueryLogic.java @@ -445,4 +445,20 @@ public void setClientConfig(AccumuloClientConfiguration clientConfig) { public AccumuloClientConfiguration getClientConfig() { return clientConfig; } + + public Map getTableConsistencyLevels() { + return getConfig().getTableConsistencyLevels(); + } + + public void setTableConsistencyLevels(Map consistencyLevels) { + getConfig().setTableConsistencyLevels(consistencyLevels); + } + + public Map> getTableHints() { + return getConfig().getTableHints(); + } + + public void setTableHints(Map> hints) { + getConfig().setTableHints(hints); + } } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java index b38883be416..c00e0decb3e 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.clientImpl.ScannerOptions; import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; @@ -524,7 +525,10 @@ protected void submitScan(Scan scan, boolean increment) { */ public BatchScannerSession setOptions(SessionOptions options) { return this; + } + public void setConsistencyLevel(ScannerBase.ConsistencyLevel consistencyLevel) { + this.options.setConsistencyLevel(consistencyLevel); } /** diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java index 034f1c890a4..b12970e808a 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerFactory.java @@ -3,7 +3,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,10 +35,12 @@ public class ScannerFactory { + private static final int DEFAULT_MAX_THREADS = 100; protected int maxQueue = 1000; + protected final Set instances = Collections.synchronizedSet(new HashSet<>()); protected final Set sessionInstances = Collections.synchronizedSet(new HashSet<>()); - protected final AccumuloClient cxn; + protected AccumuloClient client; // using an AtomicBoolean to give us a separate monitor for synchronization protected final AtomicBoolean open = new AtomicBoolean(true); @@ -45,43 +49,97 @@ public class ScannerFactory { protected ResourceQueue scanQueue = null; protected ShardQueryConfiguration config = null; - private static final Logger log = Logger.getLogger(ScannerFactory.class); - - public ScannerFactory(GenericQueryConfiguration queryConfiguration) { + protected Map consistencyByTable = new HashMap<>(); + protected Map> hintsByTable = new HashMap<>(); - this.cxn = queryConfiguration.getClient(); - log.debug("Created scanner factory " + System.identityHashCode(this) + " is wrapped ? " + (cxn instanceof WrappedConnector)); + private static final Logger log = Logger.getLogger(ScannerFactory.class); - if (queryConfiguration instanceof ShardQueryConfiguration) { - this.config = ((ShardQueryConfiguration) queryConfiguration); - this.maxQueue = this.config.getMaxScannerBatchSize(); - this.settings = this.config.getQuery(); - this.accrueStats = this.config.getAccrueStats(); - try { - scanQueue = new ResourceQueue(this.config.getNumQueryThreads(), this.cxn); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + /** + * Preferred constructor, builds scanner factory from configs + * + * @param config + * a {@link GenericQueryConfiguration} + */ + public ScannerFactory(GenericQueryConfiguration config) { + updateConfigs(config); } + /** + * Constructor that accepts a prebuilt AccumuloClient + * + * @param client + * an {@link AccumuloClient} + */ public ScannerFactory(AccumuloClient client) { - this(client, 100); + this(client, DEFAULT_MAX_THREADS); } + /** + * Constructor that accepts a prebuild AccumuloClient and limits the internal result queue to the provided value + * + * @param client + * an {@link AccumuloClient} + * @param queueSize + * the internal result queue size + */ public ScannerFactory(AccumuloClient client, int queueSize) { try { - this.cxn = client; + this.client = client; this.scanQueue = new ResourceQueue(queueSize, client); } catch (Exception e) { throw new RuntimeException(e); } } + /** + * Method that allows a ScannerFactory to be updated by a config after initialization + * + * @param genericConfig + * a {@link GenericQueryConfiguration} + */ + public void updateConfigs(GenericQueryConfiguration genericConfig) { + + this.client = genericConfig.getClient(); + + Map consistencyLevels = genericConfig.getTableConsistencyLevels(); + if (consistencyLevels != null && !consistencyLevels.isEmpty()) { + this.consistencyByTable = genericConfig.getTableConsistencyLevels(); + } + + Map> hints = genericConfig.getTableHints(); + if (hints != null && !hints.isEmpty()) { + this.hintsByTable = genericConfig.getTableHints(); + } + + int numThreads = DEFAULT_MAX_THREADS; + if (genericConfig instanceof ShardQueryConfiguration) { + ShardQueryConfiguration config = (ShardQueryConfiguration) genericConfig; + + this.settings = config.getQuery(); + this.accrueStats = config.getAccrueStats(); + this.maxQueue = config.getMaxScannerBatchSize(); + this.config = config; + + numThreads = config.getNumQueryThreads(); + } + + try { + this.scanQueue = new ResourceQueue(numThreads, this.client); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (log.isDebugEnabled()) { + log.debug("Created ScannerFactory " + System.identityHashCode(this) + " is wrapped ? " + (client instanceof WrappedConnector)); + } + } + public Scanner newSingleScanner(String tableName, Set auths, Query query) throws TableNotFoundException { if (open.get()) { - Scanner bs = QueryScannerHelper.createScannerWithoutInfo(cxn, tableName, auths, query); + Scanner bs = QueryScannerHelper.createScannerWithoutInfo(client, tableName, auths, query); + applyConfigs(bs, tableName); + log.debug("Created scanner " + System.identityHashCode(bs)); if (log.isTraceEnabled()) { log.trace("Adding instance " + bs.hashCode()); @@ -103,7 +161,9 @@ public Scanner newSingleScanner(String tableName, Set auths, Que public BatchScanner newScanner(String tableName, Set auths, int threads, Query query) throws TableNotFoundException { if (open.get()) { - BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query); + BatchScanner bs = QueryScannerHelper.createBatchScanner(client, tableName, auths, threads, query); + applyConfigs(bs, tableName); + log.debug("Created scanner " + System.identityHashCode(bs)); if (log.isTraceEnabled()) { log.trace("Adding instance " + bs.hashCode()); @@ -124,7 +184,9 @@ public BatchScanner newScanner(String tableName, Set auths, int public BatchScanner newScanner(String tableName, Set auths, int threads, Query query, boolean reportErrors) throws TableNotFoundException { if (open.get()) { - BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query, reportErrors); + BatchScanner bs = QueryScannerHelper.createBatchScanner(client, tableName, auths, threads, query, reportErrors); + applyConfigs(bs, tableName); + log.debug("Created scanner " + System.identityHashCode(bs)); if (log.isTraceEnabled()) { log.trace("Adding instance " + bs.hashCode()); @@ -166,7 +228,6 @@ public BatchScanner newScanner(String tableName, Query query) throws TableNotFou * if there are issues */ public BatchScannerSession newQueryScanner(final String tableName, final Set auths, Query settings) throws Exception { - return newLimitedScanner(BatchScannerSession.class, tableName, auths, settings).setThreads(scanQueue.getCapacity()); } @@ -202,7 +263,7 @@ public T newLimitedScanner(Class wrapper, final St stats = new ScanSessionStats(); } - T session = null; + T session; if (wrapper == ScannerSession.class) { session = (T) new ScannerSession(tableName, auths, scanQueue, maxQueue, settings).applyStats(stats); } else { @@ -210,6 +271,8 @@ public T newLimitedScanner(Class wrapper, final St .newInstance(new ScannerSession(tableName, auths, scanQueue, maxQueue, settings).applyStats(stats)); } + applyConfigs(session, tableName); + log.debug("Created session " + System.identityHashCode(session)); if (log.isTraceEnabled()) { log.trace("Adding instance " + session.hashCode()); @@ -316,24 +379,24 @@ public ScannerBase newRfileScanner(String tableName, Set auths, if (open.get()) { Configuration conf = new Configuration(); - AccumuloClient con = cxn; - - Properties clientProps = con.properties(); + Properties clientProps = client.properties(); final String instanceName = clientProps.getProperty(ClientProperty.INSTANCE_NAME.getKey()); final String zookeepers = clientProps.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey()); AccumuloHelper.setInstanceName(conf, instanceName); - AccumuloHelper.setUsername(conf, con.whoami()); + AccumuloHelper.setUsername(conf, client.whoami()); AccumuloHelper.setZooKeepers(conf, zookeepers); BulkInputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); AccumuloHelper.setPassword(conf, config.getAccumuloPassword().getBytes()); - BulkInputFormat.setMemoryInput(conf, con.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next()); + BulkInputFormat.setMemoryInput(conf, client.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next()); conf.set(MultiRfileInputformat.CACHE_METADATA, "true"); - ScannerBase baseScanner = new RfileScanner(con, conf, tableName, auths, 1); + ScannerBase baseScanner = new RfileScanner(client, conf, tableName, auths, 1); + + applyConfigs(baseScanner, tableName); synchronized (open) { if (open.get()) { @@ -348,4 +411,44 @@ public ScannerBase newRfileScanner(String tableName, Set auths, throw new IllegalStateException("Factory has been locked. No new scanners can be created."); } } + + /** + * Apply table-specific scanner configs to the provided scanner base object + * + * @param scannerBase + * a {@link ScannerBase} + * @param tableName + * the table + */ + protected void applyConfigs(ScannerBase scannerBase, String tableName) { + if (consistencyByTable != null && consistencyByTable.containsKey(tableName)) { + scannerBase.setConsistencyLevel(consistencyByTable.get(tableName)); + } + + if (hintsByTable != null && hintsByTable.containsKey(tableName)) { + scannerBase.setExecutionHints(hintsByTable.get(tableName)); + } + } + + /** + * Apply table-specific scanner configs to the provided scanner session + * + * @param scannerSession + * the {@link ScannerSession} + * @param tableName + * the table + */ + protected void applyConfigs(ScannerSession scannerSession, String tableName) { + SessionOptions options = scannerSession.getOptions(); + + if (consistencyByTable != null && consistencyByTable.containsKey(tableName)) { + options.setConsistencyLevel(consistencyByTable.get(tableName)); + } + + if (hintsByTable != null && hintsByTable.containsKey(tableName)) { + options.setExecutionHints(hintsByTable.get(tableName)); + } + + scannerSession.setOptions(options); + } } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ShardIndexQueryTable.java b/warehouse/query-core/src/main/java/datawave/query/tables/ShardIndexQueryTable.java index d10c7d978de..380447822d7 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ShardIndexQueryTable.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ShardIndexQueryTable.java @@ -183,9 +183,11 @@ public void setReverseIndexTableName(String reverseIndexTableName) { @Override public GenericQueryConfiguration initialize(AccumuloClient client, Query settings, Set auths) throws Exception { - this.config = new ShardIndexQueryConfiguration(this, settings); - this.scannerFactory = new ScannerFactory(client); - MetadataHelper metadataHelper = initializeMetadataHelper(client, getConfig().getMetadataTableName(), auths); + ShardIndexQueryConfiguration config = new ShardIndexQueryConfiguration(this, settings); + config.setClient(client); + this.scannerFactory = new ScannerFactory(config); + + MetadataHelper metadataHelper = initializeMetadataHelper(client, config.getMetadataTableName(), auths); if (StringUtils.isEmpty(settings.getQuery())) { throw new IllegalArgumentException("Query cannot be null"); @@ -424,7 +426,6 @@ public void setupQuery(AccumuloClient client, GenericQueryConfiguration baseConf baseConfig.setQueries(checkpoint.getQueries()); config.setClient(client); - scannerFactory = new ScannerFactory(client); MetadataHelper metadataHelper = initializeMetadataHelper(client, config.getMetadataTableName(), config.getAuthorizations()); config.setQueryModel(metadataHelper.getQueryModel(config.getModelTableName(), config.getModelName(), null)); diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java index 13451949d05..4ce5f1c1b8e 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ShardQueryLogic.java @@ -260,6 +260,14 @@ public static BatchScanner createBatchScanner(ShardQueryConfiguration config, Sc bs.addScanIterator(cfg); } + if (config.getTableConsistencyLevels().containsKey(config.getTableName())) { + bs.setConsistencyLevel(config.getTableConsistencyLevels().get(config.getTableName())); + } + + if (config.getTableHints().containsKey(config.getTableName())) { + bs.setExecutionHints(config.getTableHints().get(config.getTableName())); + } + return bs; } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/content/ContentQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/tables/content/ContentQueryLogic.java index 797e240d061..26d48b3a32b 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/content/ContentQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/content/ContentQueryLogic.java @@ -101,11 +101,12 @@ public void close() { @Override public GenericQueryConfiguration initialize(final AccumuloClient client, final Query settings, final Set auths) throws Exception { // Initialize the config and scanner factory - config = new ContentQueryConfiguration(this, settings); - this.scannerFactory = new ScannerFactory(client); + final ContentQueryConfiguration config = new ContentQueryConfiguration(this, settings); config.setClient(client); config.setAuthorizations(auths); + this.scannerFactory = new ScannerFactory(config); + // Re-assign the view name if specified via params Parameter p = settings.findParameter(QueryParameters.CONTENT_VIEW_NAME); if (null != p && !StringUtils.isEmpty(p.getParameterValue())) { diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/edge/EdgeQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/tables/edge/EdgeQueryLogic.java index f161e326574..432d4eff4e7 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/edge/EdgeQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/edge/EdgeQueryLogic.java @@ -149,9 +149,10 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting } else { config.setQueryString(queryString); } + config.setBeginDate(settings.getBeginDate()); config.setEndDate(settings.getEndDate()); - scannerFactory = new ScannerFactory(client); + scannerFactory = new ScannerFactory(config); prefilterValues = null; EdgeQueryConfiguration.dateType dateFilterType = config.getDateRangeType(); diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/shard/FieldIndexCountQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/tables/shard/FieldIndexCountQueryLogic.java index 4602dcf3c55..0720f412b09 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/shard/FieldIndexCountQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/shard/FieldIndexCountQueryLogic.java @@ -69,7 +69,6 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting logger.trace("initialize"); } - this.scannerFactory = new ScannerFactory(client); MetadataHelper metadataHelper = prepareMetadataHelper(client, this.getMetadataTableName(), auths); String modelName = this.getModelName(); String modelTableName = this.getModelTableName(); @@ -97,6 +96,8 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting config.setClient(client); config.setAuthorizations(auths); + this.scannerFactory = new ScannerFactory(config); + // the following throw IllegalArgumentExceptions if validation fails. parseQuery(config, settings); configDate(config, settings); diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepSimilarityQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepSimilarityQueryLogic.java index eb40a1f4d16..ccc849baa35 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepSimilarityQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepSimilarityQueryLogic.java @@ -59,12 +59,11 @@ public SSDeepSimilarityQueryConfiguration getConfig() { @Override public GenericQueryConfiguration initialize(AccumuloClient accumuloClient, Query settings, Set auths) throws Exception { - this.scannerFactory = new ScannerFactory(accumuloClient); - final SSDeepSimilarityQueryConfiguration config = getConfig(); config.setQuery(settings); config.setClient(accumuloClient); config.setAuthorizations(auths); + this.scannerFactory = new ScannerFactory(config); setupRanges(settings, config); return config; } diff --git a/warehouse/query-core/src/test/java/datawave/query/ScanConsistencyQueryTest.java b/warehouse/query-core/src/test/java/datawave/query/ScanConsistencyQueryTest.java new file mode 100644 index 00000000000..dda9526009a --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/ScanConsistencyQueryTest.java @@ -0,0 +1,121 @@ +package datawave.query; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import com.google.common.collect.Sets; + +import datawave.core.query.configuration.GenericQueryConfiguration; +import datawave.core.query.logic.BaseQueryLogic; +import datawave.helpers.PrintUtility; +import datawave.microservice.query.QueryImpl; +import datawave.query.testframework.AbstractFunctionalQuery; +import datawave.query.testframework.AccumuloSetup; +import datawave.query.testframework.CitiesDataType; +import datawave.query.testframework.CitiesDataType.CityEntry; +import datawave.query.testframework.CitiesDataType.CityField; +import datawave.query.testframework.DataTypeHadoopConfig; +import datawave.query.testframework.FieldConfig; +import datawave.query.testframework.FileType; +import datawave.query.testframework.GenericCityFields; +import datawave.query.testframework.QueryLogicTestHarness; +import datawave.util.TableName; + +/** + * Tests for setting and persisting of scan consistency level + */ +public class ScanConsistencyQueryTest extends AbstractFunctionalQuery { + + @ClassRule + public static AccumuloSetup accumuloSetup = new AccumuloSetup(); + + private static final Logger log = Logger.getLogger(ScanConsistencyQueryTest.class); + + @BeforeClass + public static void filterSetup() throws Exception { + Collection dataTypes = new ArrayList<>(); + FieldConfig generic = new GenericCityFields(); + generic.addIndexField(CityField.NUM.name()); + dataTypes.add(new CitiesDataType(CityEntry.generic, generic)); + + accumuloSetup.setData(FileType.CSV, dataTypes); + Logger.getLogger(PrintUtility.class).setLevel(Level.DEBUG); + client = accumuloSetup.loadTables(log); + } + + public ScanConsistencyQueryTest() { + super(CitiesDataType.getManager()); + } + + @Test + public void testSimpleQuery() throws Exception { + + // for now just assert the consistency level on the final batch scanner + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put(TableName.SHARD, ScannerBase.ConsistencyLevel.EVENTUAL); + logic.setTableConsistencyLevels(consistencyLevels); + + String query = "CITY == 'london'"; + Set expected = Sets.newHashSet("ldn-usa-mi-10", "ldn-uk-7", "ldn-fra-lle-11", "ldn-usa-oh-8", "ldn-usa-mo-8"); + runTest(query, expected); + } + + @Override + protected void runTestQuery(Collection expected, String queryStr, Date startDate, Date endDate, Map options, + List checkers, Set authSet) throws Exception { + + if (authSet == null || authSet.isEmpty()) { + authSet = this.authSet; + } + + QueryImpl q = new QueryImpl(); + q.setBeginDate(startDate); + q.setEndDate(endDate); + q.setQuery(queryStr); + q.setParameters(options); + + q.setId(UUID.randomUUID()); + q.setPagesize(Integer.MAX_VALUE); + q.setQueryAuthorizations(auths.toString()); + + GenericQueryConfiguration config = this.logic.initialize(client, q, authSet); + config.setTableConsistencyLevels(Collections.singletonMap(TableName.SHARD, ScannerBase.ConsistencyLevel.IMMEDIATE)); + + this.logic.setupQuery(config); + + assertScannerConsistency(this.logic); + } + + public void assertScannerConsistency(BaseQueryLogic> logic) { + Map expected = Collections.singletonMap(TableName.SHARD, ScannerBase.ConsistencyLevel.IMMEDIATE); + assertEquals(expected, logic.getConfig().getTableConsistencyLevels()); + assertEquals(expected, logic.getTableConsistencyLevels()); + } + + // ============================================ + // implemented abstract methods + protected void testInit() { + this.auths = CitiesDataType.getTestAuths(); + this.documentKey = CityField.EVENT_ID.name(); + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/config/ShardQueryConfigurationTest.java b/warehouse/query-core/src/test/java/datawave/query/config/ShardQueryConfigurationTest.java index f1f22b30725..895a2cbae9f 100644 --- a/warehouse/query-core/src/test/java/datawave/query/config/ShardQueryConfigurationTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/config/ShardQueryConfigurationTest.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.function.Predicate; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.security.Authorizations; import org.junit.Assert; import org.junit.Before; @@ -580,6 +581,10 @@ public void setUp() throws Exception { updatedValues.put("sortQueryBeforeGlobalIndex", true); defaultValues.put("sortQueryByCounts", false); updatedValues.put("sortQueryByCounts", true); + defaultValues.put("tableConsistencyLevels", Collections.emptyMap()); + updatedValues.put("tableConsistencyLevels", Collections.singletonMap(TableName.SHARD, ScannerBase.ConsistencyLevel.EVENTUAL)); + defaultValues.put("tableHints", Collections.emptyMap()); + updatedValues.put("tableHints", Collections.emptyMap()); } private Query createQuery(String query) { diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java index 36d36a95d45..3f25e103f88 100644 --- a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamQueryTest.java @@ -383,7 +383,8 @@ private void test(String query, String expected, QUERY_CONTEXT queryContext, TER // Run a standard limited-scanner range stream. count++; - rangeStream = new RangeStream(config, new ScannerFactory(config.getClient(), 1), helper); + ScannerFactory scannerFactory = new ScannerFactory(config); + rangeStream = new RangeStream(config, scannerFactory, helper); rangeStream.setLimitScanners(true); script = JexlASTHelper.parseAndFlattenJexlQuery(query); diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java index 637018c0053..b2c59176dc9 100644 --- a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTest.java @@ -51,6 +51,7 @@ import datawave.query.jexl.visitors.JexlStringBuildingVisitor; import datawave.query.planner.QueryPlan; import datawave.query.tables.ScannerFactory; +import datawave.query.util.MetadataHelper; import datawave.query.util.MockMetadataHelper; import datawave.query.util.Tuple2; import datawave.test.JexlNodeAssert; @@ -443,7 +444,7 @@ public void testTheSimplestOfQueries() throws Exception { helper.setIndexedFields(dataTypes.keySet()); Set expectedRanges = Sets.newHashSet(makeTestRange("20190314", "datatype1\u0000234"), makeTestRange("20190314", "datatype1\u0000345")); - for (QueryPlan queryPlan : new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script)) { + for (QueryPlan queryPlan : getRangeStream(helper).streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range from expected ranges: " + range.toString(), expectedRanges.remove(range)); } @@ -470,7 +471,7 @@ public void testShardAndDaysHint() throws Exception { helper.setIndexedFields(dataTypes.keySet()); Set expectedRanges = Sets.newHashSet(makeShardedRange("20190314_1")); - for (QueryPlan queryPlan : new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script)) { + for (QueryPlan queryPlan : getRangeStream(helper).streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges, expectedRanges.remove(range)); } @@ -499,7 +500,7 @@ public void testShardAndDaysHints(String originalQuery) throws Exception { expectedRanges.add(makeShardedRange(shard)); } - for (QueryPlan queryPlan : new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script)) { + for (QueryPlan queryPlan : getRangeStream(helper).streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges, expectedRanges.remove(range)); } @@ -558,7 +559,7 @@ public void testShardAndDaysHints6() throws Exception { Set expectedRanges = Sets.newHashSet(makeShardedRange("20190314_1")); - for (QueryPlan queryPlan : new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script)) { + for (QueryPlan queryPlan : getRangeStream(helper).streamPlans(script)) { // verify the query plan dropped no terms JexlNode queryTree = JexlASTHelper.parseJexlQuery(queryPlan.getQueryString()); JexlNode expectedTree = JexlASTHelper.parseJexlQuery( @@ -596,7 +597,7 @@ public void testOrBothIndexed() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Range range3 = makeTestRange("20190314", "datatype1\u0000123"); Set expectedRanges = Sets.newHashSet(range1, range2, range3); - for (QueryPlan queryPlan : new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script)) { + for (QueryPlan queryPlan : getRangeStream(helper).streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), expectedRanges.remove(range)); @@ -628,7 +629,7 @@ public void testNestedOr() throws Exception { Range range3 = makeTestRange("20190314", "datatype1\u0000123"); Set expectedRanges = Sets.newHashSet(range1, range2, range3); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient(), 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { @@ -663,7 +664,7 @@ public void testBothIndexedPrune() throws Exception { Range range4 = makeTestRange("20190414_1", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2, range3, range4); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient(), 1), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -692,7 +693,7 @@ public void testOrOneFieldIndexed() throws Exception { helper.setIndexedFields(dataTypes.keySet()); helper.addFields(ImmutableSet.of("TACO")); - assertFalse(new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script).iterator().hasNext()); + assertFalse(getRangeStream(helper).streamPlans(script).iterator().hasNext()); } @Test @@ -713,7 +714,7 @@ public void testOrNoFieldIndexed() throws Exception { MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - assertFalse(new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script).iterator().hasNext()); + assertFalse(getRangeStream(helper).streamPlans(script).iterator().hasNext()); } @Test @@ -734,7 +735,7 @@ public void testAndTwoFieldsIndexed() throws Exception { MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - assertFalse(new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script).iterator().hasNext()); + assertFalse(getRangeStream(helper).streamPlans(script).iterator().hasNext()); } @Test @@ -760,7 +761,7 @@ public void testAndOneFieldIndexed() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper); + RangeStream rangeStream = getRangeStream(helper); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -788,7 +789,7 @@ public void testAndNoFieldIndexed() throws Exception { MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - assertFalse(new RangeStream(config, new ScannerFactory(config.getClient()), helper).streamPlans(script).iterator().hasNext()); + assertFalse(getRangeStream(helper).streamPlans(script).iterator().hasNext()); } @Test @@ -813,7 +814,7 @@ public void testNonIndexedNumeric() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper); + RangeStream rangeStream = getRangeStream(helper); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -840,7 +841,7 @@ public void testGuardAgainstVeryOddlyFormedJexlQueriesLikeFoo() throws Exception MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.UNINDEXED, rangeStream.context()); assertEquals(Collections.emptyIterator(), rangeStream.iterator()); @@ -870,7 +871,7 @@ public void testPrune() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper); + RangeStream rangeStream = getRangeStream(helper); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { assertEquals("FOO == 'bag'", JexlStringBuildingVisitor.buildQuery(queryPlan.getQueryTree())); for (Range range : queryPlan.getRanges()) { @@ -903,7 +904,7 @@ public void testUnIndexedNumericAsString() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper); + RangeStream rangeStream = getRangeStream(helper); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -934,7 +935,7 @@ public void testNegatedIndexWithResults() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -965,7 +966,7 @@ public void testNegatedIndexWithNoResults() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -996,7 +997,7 @@ public void testNegatedNonIndexed() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -1027,7 +1028,7 @@ public void testNegatedIndexWithResultsStandaloneNot() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -1058,7 +1059,7 @@ public void testNegatedIndexWithNoResultsStandaloneNot() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); for (QueryPlan queryPlan : rangeStream.streamPlans(script)) { for (Range range : queryPlan.getRanges()) { assertTrue("Tried to remove unexpected range " + range.toString() + " from expected ranges: " + expectedRanges.toString(), @@ -1096,7 +1097,7 @@ public void testIntersectionOfTwoUnions() throws Exception { Range range2 = makeTestRange("20190314", "datatype1\u0000345"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.VARIABLE, rangeStream.context()); @@ -1131,7 +1132,7 @@ public void testIntersectionOfTwoUnionsAllIndexed() throws Exception { MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); rangeStream.streamPlans(script); // streamPlans(script) to populate the StreamContext. assertEquals(IndexStream.StreamContext.ABSENT, rangeStream.context()); @@ -1155,7 +1156,7 @@ public void testNonExistentFieldInOr() throws Exception { MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); rangeStream.streamPlans(script); // streamPlans(script) to populate the StreamContext. assertFalse(rangeStream.iterator().hasNext()); @@ -1181,7 +1182,7 @@ public void testNonExistentFieldInAnd() throws Exception { MockMetadataHelper helper = new MockMetadataHelper(); helper.setIndexedFields(dataTypes.keySet()); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient()), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); rangeStream.streamPlans(script); // streamPlans(script) to populate the StreamContext. assertFalse(rangeStream.iterator().hasNext()); @@ -1216,7 +1217,7 @@ public void testDropTwoPredicates() throws Exception { expectedRanges.add(makeShardedRange(shard)); } - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(config.getClient(), 1), helper).setLimitScanners(true); + RangeStream rangeStream = getRangeStream(helper).setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); // streamPlans(script) to populate the StreamContext. assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1267,7 +1268,8 @@ public void testIntersection_HighAndLowCardinality_withSeek() throws Exception { Range range3 = makeTestRange("20190315_49", "datatype1\u0000a.b.c"); Set expectedRanges = Sets.newHashSet(range1, range2, range3); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + ScannerFactory scannerFactory = new ScannerFactory(config); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1309,7 +1311,7 @@ public void testIntersection_NestedUnionOfHighCardinalityTerm_withSeek() throws Range range3 = makeTestRange("20190315_49", "datatype1\u0000a.b.c"); Set expectedRanges = Sets.newHashSet(range1, range2, range3); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1353,7 +1355,7 @@ public void testIntersection_NestedUnionOfLowCardinalityTerm_withSeek() throws E Range range6 = makeTestRange("20190317_1", "datatype1\u0000a.b.c"); Set expectedRanges = Sets.newHashSet(range1, range2, range3, range4, range5, range6); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1400,7 +1402,7 @@ public void testUnion_HighCardWithNestedIntersectionOfLowCardTerms_withSeek() th expectedRanges.add(makeTestRange("20190315_33", "datatype1\u0000a.b.c")); expectedRanges.add(makeTestRange("20190317_1", "datatype1\u0000a.b.c")); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1444,7 +1446,7 @@ public void testUnion_OfTwoNestedIntersections_LeftLowCardTerms_withSeek() throw } } - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1489,7 +1491,7 @@ public void testIntersection_OfTwoNestedUnions_LeftLowCardTerms_withSeek() throw Range range6 = makeTestRange("20190317_1", "datatype1\u0000a.b.c"); Set expectedRanges = Sets.newHashSet(range1, range2, range3, range4, range5, range6); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1531,7 +1533,7 @@ public void testIntersection_ofDayRangesAndShardRange() throws Exception { Range range2 = makeShardedRange("20190315_51"); Set expectedRanges = Sets.newHashSet(range1, range2); - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + RangeStream rangeStream = getRangeStream(helper); rangeStream.setLimitScanners(true); CloseableIterable queryPlans = rangeStream.streamPlans(script); assertEquals(IndexStream.StreamContext.PRESENT, rangeStream.context()); @@ -1543,4 +1545,9 @@ public void testIntersection_ofDayRangesAndShardRange() throws Exception { } assertTrue("Expected ranges not found in query plan: " + expectedRanges, expectedRanges.isEmpty()); } + + private RangeStream getRangeStream(MetadataHelper helper) { + ScannerFactory scannerFactory = new ScannerFactory(config); + return new RangeStream(config, scannerFactory, helper); + } } diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java index bd7241b8c6e..8062325300d 100644 --- a/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/RangeStreamTestX.java @@ -375,6 +375,7 @@ private static Value buildValueForDay() { public void setupTest() { config = new ShardQueryConfiguration(); config.setShardsPerDayThreshold(20); + config.setClient(client); } // A && B @@ -3294,12 +3295,13 @@ private void runTest(String query, List expectedRanges, List expe helper.setIndexedFields(dataTypes.keySet()); // Run a standard limited-scanner range stream. - RangeStream rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + ScannerFactory scannerFactory = new ScannerFactory(config); + RangeStream rangeStream = new RangeStream(config, scannerFactory, helper); rangeStream.setLimitScanners(true); runTest(rangeStream, script, expectedRanges, expectedQueries); // Run a default range stream. - rangeStream = new RangeStream(config, new ScannerFactory(client, 1), helper); + rangeStream = new RangeStream(config, scannerFactory, helper); rangeStream.setLimitScanners(false); runTest(rangeStream, script, expectedRanges, expectedQueries); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/visitors/UnfieldedIndexExpansionVisitorTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/visitors/UnfieldedIndexExpansionVisitorTest.java index 5a89fecd576..c3f3c7eaf2c 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/visitors/UnfieldedIndexExpansionVisitorTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/jexl/visitors/UnfieldedIndexExpansionVisitorTest.java @@ -219,7 +219,7 @@ public void testSingleTermMatchesWithExpansionFields() { String expected = "FIELD1 == 'burrito' || FIELD2 == 'burrito'"; ASTJexlScript script = JexlASTHelper.parseJexlQuery(query); - ScannerFactory scannerFactory = new ScannerFactory(config.getClient()); + ScannerFactory scannerFactory = new ScannerFactory(config); metadataHelper.addExpansionFields(ImmutableSet.of("FIELD1", "FIELD2")); ASTJexlScript fixed = UnfieldedIndexExpansionVisitor.expandUnfielded(config, scannerFactory, metadataHelper, script); @@ -243,7 +243,7 @@ public void testSingleTermMatchesWithMisMatchedExpansionFields() { String expected = "_NOFIELD_ == 'burrito'"; ASTJexlScript script = JexlASTHelper.parseJexlQuery(query); - ScannerFactory scannerFactory = new ScannerFactory(config.getClient()); + ScannerFactory scannerFactory = new ScannerFactory(config); metadataHelper.addExpansionFields(ImmutableSet.of("FOOBAR")); ASTJexlScript fixed = UnfieldedIndexExpansionVisitor.expandUnfielded(config, scannerFactory, metadataHelper, script); @@ -369,7 +369,7 @@ public void testNegatedLeadingRegexExpansionRestrictedByExpansionFields() { String expected = "(_ANYFIELD_ !~ '.*?ly' && FIELD8 != 'fearfully' && FIELD9 != 'wonderfully')"; ASTJexlScript script = JexlASTHelper.parseJexlQuery(query); - ScannerFactory scannerFactory = new ScannerFactory(config.getClient()); + ScannerFactory scannerFactory = new ScannerFactory(config); metadataHelper.addExpansionFields(ImmutableSet.of("FIELD8", "FIELD9")); ASTJexlScript fixed = UnfieldedIndexExpansionVisitor.expandUnfielded(config, scannerFactory, metadataHelper, script); @@ -428,7 +428,7 @@ public void test(String query, String expected, ShardQueryConfiguration config) // throw exception to assert failure cases, such as CannotExpandUnfieldedTermFatalException public void test(String query, String expected, ShardQueryConfiguration config, MockMetadataHelper metadataHelper) throws Exception { ASTJexlScript script = JexlASTHelper.parseJexlQuery(query); - ScannerFactory scannerFactory = new ScannerFactory(config.getClient()); + ScannerFactory scannerFactory = new ScannerFactory(config); ASTJexlScript fixed = UnfieldedIndexExpansionVisitor.expandUnfielded(config, scannerFactory, metadataHelper, script); // assert and validate diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/RangeStreamScannerTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/RangeStreamScannerTest.java index b16f60393b9..a935f250460 100644 --- a/warehouse/query-core/src/test/java/datawave/query/tables/RangeStreamScannerTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/tables/RangeStreamScannerTest.java @@ -105,8 +105,6 @@ public static void beforeClass() throws Exception { client = new InMemoryAccumuloClient("", instance); client.tableOperations().create(SHARD_INDEX); - scannerFactory = new ScannerFactory(client, 1); - BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(1024L).setMaxLatency(1, TimeUnit.SECONDS).setMaxWriteThreads(1); BatchWriter bw = client.createBatchWriter(SHARD_INDEX, bwConfig); @@ -169,6 +167,9 @@ public static void beforeClass() throws Exception { config.setQueryFieldsDatatypes(dataTypes); config.setIndexedFields(dataTypes); + + config.setClient(client); + scannerFactory = new ScannerFactory(config); } // Helper method largely copied from RangeStream class. @@ -368,7 +369,7 @@ public void testExceedShardsPerDayThresholdAndDocumentsPerShardThreshold() throw @Test public void testGetDay() throws Exception { // Build RangeStreamScanner - ScannerFactory scanners = new ScannerFactory(client, 1); + ScannerFactory scanners = new ScannerFactory(config); RangeStreamScanner rangeStreamScanner = scanners.newRangeScanner(config.getIndexTableName(), config.getAuthorizations(), config.getQuery(), config.getShardsPerDayThreshold()); diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java new file mode 100644 index 00000000000..499773360b9 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/ScannerFactoryTest.java @@ -0,0 +1,232 @@ +package datawave.query.tables; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.core.security.Authorizations; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.microservice.query.Query; +import datawave.microservice.query.QueryImpl; +import datawave.query.config.ShardQueryConfiguration; +import datawave.util.TableName; + +class ScannerFactoryTest { + + private static final InMemoryInstance instance = new InMemoryInstance(ScannerFactoryTest.class.getSimpleName()); + private static ScannerFactory scannerFactory; + private static final ShardQueryConfiguration config = new ShardQueryConfiguration(); + + @BeforeAll + public static void before() throws Exception { + AccumuloClient client = new MyAccumuloClient("", instance); + config.setClient(client); + scannerFactory = new ScannerFactory(config); + + client.tableOperations().create(TableName.SHARD); + client.instanceOperations().setProperty("accumulo.instance.name", "required-for-tests"); + } + + @BeforeEach + public void setup() { + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put(TableName.SHARD, ScannerBase.ConsistencyLevel.IMMEDIATE); + config.setTableConsistencyLevels(consistencyLevels); + scannerFactory.updateConfigs(config); + } + + @Test + void testSingleScannerWithTableNameAuthsQuery() throws TableNotFoundException { + Scanner scanner = scannerFactory.newSingleScanner(TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newSingleScanner(TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testSingleScannerWithTableNameAuthsThreadsQuery() throws TableNotFoundException { + BatchScanner scanner = scannerFactory.newScanner(TableName.SHARD, getAuths(), 1, getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newScanner(TableName.SHARD, getAuths(), 1, getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testSingleScannerWithTableNameAuthsThreadsQueryReportErrors() throws TableNotFoundException { + BatchScanner scanner = scannerFactory.newScanner(TableName.SHARD, getAuths(), 1, getQuery(), true); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newScanner(TableName.SHARD, getAuths(), 1, getQuery(), true); + assertEventualConsistency(scanner); + } + + @Test + void testScannerWithAuthsQuery() throws TableNotFoundException { + BatchScanner scanner = scannerFactory.newScanner(TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newScanner(TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testScannerWithQuery() throws TableNotFoundException { + BatchScanner scanner = scannerFactory.newScanner(TableName.SHARD, getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newScanner(TableName.SHARD, getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testQueryScannerWithAuthsQuery() throws Exception { + BatchScannerSession scanner = scannerFactory.newQueryScanner(TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newQueryScanner(TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testLimitedScannerAsAnyFieldScanner() throws Exception { + AnyFieldScanner scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newLimitedScanner(AnyFieldScanner.class, TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testLimitedScannerAsRangeStreamScanner() throws Exception { + RangeStreamScanner scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newLimitedScanner(RangeStreamScanner.class, TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testLimitedScannerAsBatchScannerSession() throws Exception { + BatchScannerSession scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newLimitedScanner(BatchScannerSession.class, TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testRangeScannerWithAuthsQuery() throws Exception { + RangeStreamScanner scanner = scannerFactory.newRangeScanner(TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newRangeScanner(TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + @Test + void testRangeScannerWithAuthsQueryThreshold() throws Exception { + RangeStreamScanner scanner = scannerFactory.newRangeScanner(TableName.SHARD, getAuths(), getQuery(), 123); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newRangeScanner(TableName.SHARD, getAuths(), getQuery(), 123); + assertEventualConsistency(scanner); + } + + @Test + void testRFileScanner() { + ScannerBase scanner = scannerFactory.newRfileScanner(TableName.SHARD, getAuths(), getQuery()); + assertImmediateConsistency(scanner); + + setEventualConsistency(); + scanner = scannerFactory.newRfileScanner(TableName.SHARD, getAuths(), getQuery()); + assertEventualConsistency(scanner); + } + + private void setEventualConsistency() { + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put(TableName.SHARD, ScannerBase.ConsistencyLevel.EVENTUAL); + config.setTableConsistencyLevels(consistencyLevels); + scannerFactory.updateConfigs(config); + } + + private void assertEventualConsistency(Scanner scanner) { + assertEquals(ScannerBase.ConsistencyLevel.EVENTUAL, scanner.getConsistencyLevel()); + } + + private void assertEventualConsistency(ScannerBase scanner) { + assertEquals(ScannerBase.ConsistencyLevel.EVENTUAL, scanner.getConsistencyLevel()); + } + + private void assertEventualConsistency(ScannerSession session) { + assertEquals(ScannerBase.ConsistencyLevel.EVENTUAL, session.getOptions().getConsistencyLevel()); + } + + private void assertImmediateConsistency(Scanner scanner) { + assertEquals(ScannerBase.ConsistencyLevel.IMMEDIATE, scanner.getConsistencyLevel()); + } + + private void assertImmediateConsistency(ScannerBase scanner) { + assertEquals(ScannerBase.ConsistencyLevel.IMMEDIATE, scanner.getConsistencyLevel()); + } + + private void assertImmediateConsistency(ScannerSession session) { + assertEquals(ScannerBase.ConsistencyLevel.IMMEDIATE, session.getOptions().getConsistencyLevel()); + } + + private Query getQuery() { + return new QueryImpl(); + } + + private Set getAuths() { + return Collections.singleton(new Authorizations()); + } + + /** + * The RFileScanner factory method requires two client properties to be set. The InMemoryAccumuloClient does not support setting these properties, or any + * properties so to fully test the ScannerFactory we must override the {@link InMemoryAccumuloClient#properties()} method. + */ + static class MyAccumuloClient extends InMemoryAccumuloClient { + + public MyAccumuloClient(String username, InMemoryInstance instance) throws AccumuloSecurityException { + super(username, instance); + } + + @Override + public Properties properties() { + Properties props = new Properties(); + props.put(ClientProperty.INSTANCE_NAME.getKey(), "for-testing"); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), "for-testing"); + return props; + } + } + +} diff --git a/warehouse/query-core/src/test/java/datawave/query/testframework/AbstractFunctionalQuery.java b/warehouse/query-core/src/test/java/datawave/query/testframework/AbstractFunctionalQuery.java index 46a2c1a8384..f14173ff0fb 100644 --- a/warehouse/query-core/src/test/java/datawave/query/testframework/AbstractFunctionalQuery.java +++ b/warehouse/query-core/src/test/java/datawave/query/testframework/AbstractFunctionalQuery.java @@ -154,7 +154,7 @@ public enum TestCities { protected QueryLogicTestHarness testHarness; protected DatawavePrincipal principal; - private final Set authSet = new HashSet<>(); + protected final Set authSet = new HashSet<>(); protected AbstractFunctionalQuery(final RawDataManager mgr) { this.dataManager = mgr; diff --git a/web-services/query/src/test/java/datawave/webservice/query/configuration/TestBaseQueryLogic.java b/web-services/query/src/test/java/datawave/webservice/query/configuration/TestBaseQueryLogic.java index e903fbdd148..d806dae0316 100644 --- a/web-services/query/src/test/java/datawave/webservice/query/configuration/TestBaseQueryLogic.java +++ b/web-services/query/src/test/java/datawave/webservice/query/configuration/TestBaseQueryLogic.java @@ -82,6 +82,8 @@ public void testConstructor_Copy() throws Exception { expect(config.getClient()).andReturn(null).anyTimes(); expect(config.getQueries()).andReturn(Collections.emptyList()).anyTimes(); expect(config.getQueriesIter()).andReturn(Collections.emptyIterator()).anyTimes(); + expect(config.getTableConsistencyLevels()).andReturn(Collections.emptyMap()).anyTimes(); + expect(config.getTableHints()).andReturn(Collections.emptyMap()).anyTimes(); expect(this.copy.getConfig()).andReturn(config).anyTimes(); // Run the test