diff --git a/contrib/datawave-quickstart/bin/env.sh b/contrib/datawave-quickstart/bin/env.sh
index 1ffcfb5d430..80225cc72a3 100644
--- a/contrib/datawave-quickstart/bin/env.sh
+++ b/contrib/datawave-quickstart/bin/env.sh
@@ -62,3 +62,5 @@ fi
 # Order of registration is important, as it reflects install and startup order within global wrapper
 # functions such as 'allInstall', 'allStart', etc. Likewise, 'allStop' and 'allUninstall' perform
 # actions in reverse order of registration. See bin/common.sh for more info
+
+
diff --git a/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java b/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java
index 2dbfdf655b4..946d43860c7 100644
--- a/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java
+++ b/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java
@@ -494,6 +494,11 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
      */
     private boolean sortQueryByCounts = false;
 
+    /**
+     * Controls whether query IDs are logged at the tserver level via {@link datawave.query.iterator.QueryLogIterator}.
+     */
+    private boolean tserverLoggingActive = true;
+
     /**
      * Default constructor
      */
@@ -720,6 +725,7 @@ public ShardQueryConfiguration(ShardQueryConfiguration other) {
         this.setUseTermCounts(other.getUseTermCounts());
         this.setSortQueryBeforeGlobalIndex(other.isSortQueryBeforeGlobalIndex());
         this.setSortQueryByCounts(other.isSortQueryByCounts());
+        this.setTserverLoggingActive(other.isTserverLoggingActive());
     }
 
     /**
@@ -2740,6 +2746,15 @@ public void setSortQueryByCounts(boolean sortQueryByCounts) {
         this.sortQueryByCounts = sortQueryByCounts;
     }
 
+    public boolean isTserverLoggingActive() {
+        return this.tserverLoggingActive;
+    }
+
+    public void setTserverLoggingActive(boolean tserverLoggingActive) {
+        this.tserverLoggingActive = tserverLoggingActive;
+    }
+
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -2947,7 +2962,8 @@ public boolean equals(Object o) {
                         getUseFieldCounts() == that.getUseFieldCounts() &&
                         getUseTermCounts() == that.getUseTermCounts() &&
                         isSortQueryBeforeGlobalIndex() == that.isSortQueryBeforeGlobalIndex() &&
-                        isSortQueryByCounts() == that.isSortQueryByCounts();
+                        isSortQueryByCounts() == that.isSortQueryByCounts() &&
+                        isTserverLoggingActive() == that.isTserverLoggingActive();
         // @formatter:on
     }
 
@@ -3152,7 +3168,8 @@ public int hashCode() {
                         getUseFieldCounts(),
                         getUseTermCounts(),
                         isSortQueryBeforeGlobalIndex(),
-                        isSortQueryByCounts());
+                        isSortQueryByCounts(),
+                        isTserverLoggingActive());
         // @formatter:on
     }
 
diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java
new file mode 100644
index 00000000000..32078c2d29c
--- /dev/null
+++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java
@@ -0,0 +1,146 @@
+package datawave.query.iterator;
+
+import static datawave.query.iterator.QueryOptions.QUERY_ID;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.log4j.Logger;
+
+public class QueryLogIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
+
+    private static final Logger log = Logger.getLogger(QueryLogIterator.class);
+
+    private static final String CLASS_NAME = QueryLogIterator.class.getSimpleName();
+    private String queryID;
+    private SortedKeyValueIterator<Key,Value> source;
+    private IteratorEnvironment myEnvironment;
+
+    public QueryLogIterator() {}
+
+    public QueryLogIterator(QueryLogIterator other, IteratorEnvironment env) {
+        this.myEnvironment = other.myEnvironment;
+        this.queryID = other.queryID;
+        this.source = other.source.deepCopy(env); // give the copy its own source iterator
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+
+        try {
+            logStartOf("init()");
+            this.queryID = options.get(QUERY_ID);
+            this.source = source;
+            this.myEnvironment = env;
+        } finally {
+            logEndOf("init()");
+        }
+    }
+
+    private void logStartOf(String methodName) {
+        if (log.isInfoEnabled()) {
+            log.info(CLASS_NAME + " " + methodName + " Started QueryID: " + this.queryID);
+        }
+    }
+
+    private void logEndOf(String methodName) {
+        if (log.isInfoEnabled()) {
+            log.info(CLASS_NAME + " " + methodName + " Ended QueryID: " + this.queryID);
+        }
+    }
+
+    @Override
+    public boolean hasTop() {
+
+        boolean result;
+
+        try {
+            logStartOf("hasTop()");
+            result = source.hasTop();
+        } finally {
+            logEndOf("hasTop()");
+        }
+        return result;
+    }
+
+    @Override
+    public void next() throws IOException {
+        try {
+            logStartOf("next()");
+            source.next();
+        } finally {
+            logEndOf("next()");
+        }
+    }
+
+    @Override
+    public Key getTopKey() {
+        Key k;
+        try {
+            logStartOf("getTopKey()");
+            k = source.getTopKey();
+        } finally {
+            logEndOf("getTopKey()");
+        }
+        return k;
+    }
+
+    @Override
+    public Value getTopValue() {
+        Value v;
+        try {
+            logStartOf("getTopValue()");
+            v = source.getTopValue();
+        } finally {
+            logEndOf("getTopValue()");
+        }
+        return v;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment iteratorEnvironment) {
+
+        QueryLogIterator copy;
+
+        try {
+            logStartOf("deepCopy()");
+            copy = new QueryLogIterator(this, iteratorEnvironment);
+        } finally {
+            logEndOf("deepCopy()");
+        }
+        return copy;
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> collection, boolean b) throws IOException {
+
+        try {
+            logStartOf("seek()");
+            this.source.seek(range, collection, b);
+        } finally {
+            logEndOf("seek()");
+        }
+    }
+
+    @Override
+    public IteratorOptions describeOptions() {
+        Map<String,String> options = new HashMap<>();
+        options.put(QUERY_ID, "The QueryID to be logged as methods are invoked");
+
+        return new IteratorOptions(getClass().getSimpleName(), "An iterator used to log the QueryID", options, null);
+    }
+
+    @Override
+    public boolean validateOptions(Map<String,String> options) {
+        return options.containsKey(QUERY_ID);
+    }
+}
diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java
index 0f108147785..40ee086683f 100644
--- a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java
+++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -39,12 +38,7 @@
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
@@ -283,6 +277,12 @@ public class QueryOptions implements OptionDescriber {
 
     public static final String FIELD_COUNTS = "field.counts";
     public static final String TERM_COUNTS = "term.counts";
+
+    /**
+     * Controls whether a query's ID is logged on the tserver using {@link QueryLogIterator}
+     */
+    public static final String TSERVER_LOGGING_ACTIVE = "tserver.logging.active";
+
     protected Map<String,String> options;
 
     protected String scanId;
@@ -450,6 +450,9 @@ public class QueryOptions implements OptionDescriber {
     private CountMap termCounts;
     private CountMapSerDe mapSerDe;
 
+    // Controls whether query IDs are logged at the tserver level via QueryLogIterator.
+    private boolean tserverLoggingActive = false;
+
     public void deepCopy(QueryOptions other) {
         this.options = other.options;
         this.query = other.query;
@@ -562,6 +565,7 @@ public void deepCopy(QueryOptions other) {
 
         this.fieldCounts = other.fieldCounts;
         this.termCounts = other.termCounts;
+        this.tserverLoggingActive = other.tserverLoggingActive;
     }
 
     public String getQuery() {
@@ -1286,6 +1290,7 @@ public IteratorOptions describeOptions() {
         options.put(TERM_FREQUENCY_AGGREGATION_THRESHOLD_MS, "TermFrequency aggregations that exceed this threshold are logged as a warning");
         options.put(FIELD_COUNTS, "Map of field counts from the global index");
         options.put(TERM_COUNTS, "Map of term counts from the global index");
+        options.put(TSERVER_LOGGING_ACTIVE, "Whether the query ID will be logged on the tserver while the query runs");
 
         return new IteratorOptions(getClass().getSimpleName(), "Runs a query against the DATAWAVE tables", options, null);
     }
@@ -1782,6 +1787,10 @@ public boolean validateOptions(Map<String,String> options) {
             }
         }
 
+        if (options.containsKey(TSERVER_LOGGING_ACTIVE)) {
+            this.tserverLoggingActive = Boolean.parseBoolean(options.get(TSERVER_LOGGING_ACTIVE));
+        }
+
         return true;
     }
 
@@ -2267,6 +2276,14 @@ public void setTfAggregationThresholdMs(int tfAggregationThresholdMs) {
         this.tfAggregationThresholdMs = tfAggregationThresholdMs;
     }
 
+    public boolean isTserverLoggingActive() {
+        return this.tserverLoggingActive;
+    }
+
+    public void setTserverLoggingActive(boolean tserverLoggingActive) {
+        this.tserverLoggingActive = tserverLoggingActive;
+    }
+
     /**
      * Get an {@link Equality}
      *
diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java b/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java
index 3b71812d88e..b8d149927a9 100644
--- a/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java
+++ b/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java
@@ -1,13 +1,20 @@
 package datawave.query.iterator;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Container for iterator options, plus a source iterator and its environment
diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java
index 3c7fd8ce575..e7cbc394214 100644
--- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java
+++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java
@@ -90,6 +90,7 @@
 import datawave.query.index.lookup.RangeStream;
 import datawave.query.iterator.CloseableListIterable;
 import datawave.query.iterator.QueryIterator;
+import datawave.query.iterator.QueryLogIterator;
 import datawave.query.iterator.QueryOptions;
 import datawave.query.iterator.ivarator.IvaratorCacheDirConfig;
 import datawave.query.iterator.logic.IndexIterator;
@@ -254,6 +255,7 @@ public class DefaultQueryPlanner extends QueryPlanner implements Cloneable {
     private List<NodeTransformRule> transformRules = Lists.newArrayList();
 
     protected Class<? extends SortedKeyValueIterator<Key,Value>> queryIteratorClazz = QueryIterator.class;
+    protected Class<? extends SortedKeyValueIterator<Key,Value>> queryLogIteratorClazz = QueryLogIterator.class;
 
     protected String plannedScript = null;
 
@@ -272,6 +274,7 @@ public class DefaultQueryPlanner extends QueryPlanner implements Cloneable {
     protected ExecutorService builderThread = null;
 
     protected Future<IteratorSetting> settingFuture = null;
+    protected Future<IteratorSetting> settingFutureLog = null;
 
     protected long maxRangeWaitMillis = 125;
 
@@ -330,6 +333,7 @@ protected DefaultQueryPlanner(DefaultQueryPlanner other) {
         setDisableExpandIndexFunction(other.disableExpandIndexFunction);
         rules.addAll(other.rules);
         queryIteratorClazz = other.queryIteratorClazz;
+        queryLogIteratorClazz = other.queryLogIteratorClazz;
         setMetadataHelper(other.getMetadataHelper());
         setDateIndexHelper(other.getDateIndexHelper());
         setCompressOptionMappings(other.getCompressOptionMappings());
@@ -407,11 +411,17 @@ public CloseableIterable<QueryData> process(GenericQueryConfiguration genericCon
     protected CloseableIterable<QueryData> process(ScannerFactory scannerFactory, MetadataHelper metadataHelper, DateIndexHelper dateIndexHelper,
                     ShardQueryConfiguration config, String query, Query settings) throws DatawaveQueryException {
         settingFuture = null;
+        settingFutureLog = null;
 
         IteratorSetting cfg = null;
+        IteratorSetting logCfg = null;
 
         if (preloadOptions) {
             cfg = getQueryIterator(metadataHelper, config, settings, "", false, true);
+            // Create and add the QueryLogIterator only if tserver logging is active.
+            if (config.isTserverLoggingActive()) {
+                logCfg = getQueryLogIterator(metadataHelper, config, settings, "", false, true);
+            }
         }
 
         try {
@@ -469,9 +479,17 @@ protected CloseableIterable<QueryData> process(ScannerFactory scannerFactory, Me
                 cfg = getQueryIterator(metadataHelper, config, settings, "", false, false);
             }
             configureIterator(config, cfg, newQueryString, isFullTable);
+
+            // Create and add the QueryLogIterator only if tserver logging is active.
+            if (config.isTserverLoggingActive()) {
+                while (null == logCfg) {
+                    logCfg = getQueryLogIterator(metadataHelper, config, settings, "", false, false);
+                }
+                configureIterator(config, logCfg, newQueryString, isFullTable);
+            }
         }
 
-        final QueryData queryData = new QueryData().withQuery(newQueryString).withSettings(Lists.newArrayList(cfg));
+        final QueryData queryData = new QueryData().withQuery(newQueryString).withSettings(logCfg == null ? Lists.newArrayList(cfg) : Lists.newArrayList(cfg, logCfg));
 
         stopwatch.stop();
 
@@ -545,6 +563,7 @@ private void configureIterator(ShardQueryConfiguration config, IteratorSetting c
         addOption(cfg, QueryOptions.FULL_TABLE_SCAN_ONLY, Boolean.toString(isFullTable), false);
         addOption(cfg, QueryOptions.TRACK_SIZES, Boolean.toString(config.isTrackSizes()), false);
         addOption(cfg, QueryOptions.ACTIVE_QUERY_LOG_NAME, config.getActiveQueryLogName(), false);
+        addOption(cfg, QueryOptions.TSERVER_LOGGING_ACTIVE, Boolean.toString(config.isTserverLoggingActive()), true);
         // Set the start and end dates
         configureTypeMappings(config, cfg, metadataHelper, getCompressOptionMappings());
     }
@@ -2144,6 +2163,7 @@ protected Future<IteratorSetting> loadQueryIterator(final MetadataHelper metadat
                     final String queryString, final Boolean isFullTable, boolean isPreload) throws DatawaveQueryException {
         return builderThread.submit(() -> {
 
+            // VersioningIterator is typically set at 20 on the table
             IteratorSetting cfg = new IteratorSetting(config.getBaseIteratorPriority() + 40, "query", getQueryIteratorClass());
 
@@ -2237,6 +2257,23 @@ protected Future<IteratorSetting> loadQueryIterator(final MetadataHelper metadat
         });
     }
 
+    protected Future<IteratorSetting> loadQueryLogIterator(final MetadataHelper metadataHelper, final ShardQueryConfiguration config,
+                    final Query settings, final String queryString, final Boolean isFullTable, boolean isPreload) throws DatawaveQueryException {
+        return builderThread.submit(() -> {
+
+            // VersioningIterator is typically set at 20 on the table
+            IteratorSetting cfg = new IteratorSetting(config.getBaseIteratorPriority() + 50, "queryLog", getQueryLogIteratorClass());
+
+            if (config.isTserverLoggingActive()) {
+                log.debug("Adding QueryLogIterator options for query " + settings.getId());
+                addOption(cfg, QueryOptions.TSERVER_LOGGING_ACTIVE, Boolean.toString(config.isTserverLoggingActive()), false);
+                addOption(cfg, QueryOptions.QUERY_ID, settings.getId().toString(), false);
+            }
+
+            return cfg;
+        });
+    }
+
     /**
      * Load indexed, index only, and composite fields into the query iterator
      *
@@ -2359,6 +2396,39 @@ protected IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQ
         return null;
     }
 
+    /**
+     * Get the loaded {@link IteratorSetting} for the {@link QueryLogIterator}
+     *
+     * @param metadataHelper
+     *            the {@link MetadataHelper}
+     * @param config
+     *            the {@link ShardQueryConfiguration}
+     * @param settings
+     *            the {@link Query}
+     * @param queryString
+     *            the raw query string
+     * @param isFullTable
+     *            a flag indicating if this is a full table scan
+     * @param isPreload
+     *            a flag indicating the iterator is being loaded prior to planning
+     * @return a loaded {@link IteratorSetting}
+     * @throws DatawaveQueryException
+     *             if something goes wrong
+     */
+    protected IteratorSetting getQueryLogIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, Query settings, String queryString,
+                    Boolean isFullTable, boolean isPreload) throws DatawaveQueryException {
+        if (null == settingFutureLog)
+            settingFutureLog = loadQueryLogIterator(metadataHelper, config, settings, queryString, isFullTable, isPreload);
+        if (settingFutureLog.isDone())
+            try {
+                return settingFutureLog.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e.getCause());
+            }
+        else
+            return null;
+    }
+
     public static void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper, boolean compressMappings)
                     throws DatawaveQueryException {
         configureTypeMappings(config, cfg, metadataHelper, compressMappings, false);
@@ -3061,6 +3131,14 @@ public void setQueryIteratorClass(Class<? extends SortedKeyValueIterator<Key,Value>> clazz) {
         queryIteratorClazz = clazz;
     }
 
+    public Class<? extends SortedKeyValueIterator<Key,Value>> getQueryLogIteratorClass() {
+        return queryLogIteratorClazz;
+    }
+
+    public void setQueryLogIteratorClass(Class<? extends SortedKeyValueIterator<Key,Value>> clazz) {
+        queryLogIteratorClazz = clazz;
+    }
+
     public void setPreloadOptions(boolean preloadOptions) {
         this.preloadOptions = preloadOptions;
     }
diff --git a/warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java b/warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java
new file mode 100644
index 00000000000..e59874f85fb
--- /dev/null
+++ b/warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java
@@ -0,0 +1,119 @@
+package datawave.query.iterator;
+
+import datawave.accumulo.inmemory.InMemoryAccumuloClient;
+import datawave.accumulo.inmemory.InMemoryInstance;
+import datawave.security.util.ScannerHelper;
+import org.apache.accumulo.core.client.*;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.*;
+
+
+/**
+ * Unit test for {@link QueryLogIterator}.
+ */
+public class QueryLogIteratorTest {
+
+    private static final String TABLE_NAME = "testTable";
+    private static final String[] AUTHS = {"FOO", "BAR", "COB"};
+    private final Value BLANK_VALUE = new Value(new byte[0]);
+    private final Set<Authorizations> AUTHS_SET = Collections.singleton(new Authorizations(AUTHS));
+
+    private AccumuloClient accumuloClient;
+
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        File dir = new File(Objects.requireNonNull(ClassLoader.getSystemClassLoader().getResource(".")).toURI());
+        File targetDir = dir.getParentFile();
+        System.setProperty("hadoop.home.dir", targetDir.getAbsolutePath());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        accumuloClient = new InMemoryAccumuloClient("root", new InMemoryInstance(QueryLogIteratorTest.class.toString()));
+        if (!accumuloClient.tableOperations().exists(TABLE_NAME)) {
+            accumuloClient.tableOperations().create(TABLE_NAME);
+        }
+
+        BatchWriterConfig config = new BatchWriterConfig();
+        config.setMaxMemory(1000L);
+        try (BatchWriter writer = accumuloClient.createBatchWriter(TABLE_NAME, config)) {
+            Mutation mutation = new Mutation("fieldA");
+            mutation.put("boo", "a", BLANK_VALUE);
+            mutation.put("boo", "b", BLANK_VALUE);
+            mutation.put("foo", "a", BLANK_VALUE);
+            mutation.put("foo", "c", BLANK_VALUE);
+            writer.addMutation(mutation);
+            writer.flush();
+        } catch (MutationsRejectedException | TableNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        accumuloClient.tableOperations().deleteRows(TABLE_NAME, null, null);
+    }
+
+    @Test
+    public void testLogging() throws TableNotFoundException {
+        Logger.getRootLogger().setLevel(Level.INFO);
+
+        Scanner scanner = ScannerHelper.createScanner(accumuloClient, TABLE_NAME, AUTHS_SET);
+        scanner.setRange(new Range());
+
+        IteratorSetting iteratorSetting = new IteratorSetting(10, QueryLogIterator.class);
+        iteratorSetting.addOption(QueryOptions.QUERY_ID, "12345");
+        scanner.addScanIterator(iteratorSetting);
+
+        for (Map.Entry<Key,Value> entry : scanner) {
+            // Do nothing here, we are examining logs only.
+        }
+
+        // Logs will print to console with a start/end for each method in the iterator.
+    }
+
+    /* @Test
+    public void test() {
+        TestAppender testAppender = new TestAppender();
+
+        Logger.getRootLogger().addAppender(testAppender);
+        ClassUnderTest.logMessage();
+        LoggingEvent loggingEvent = testAppender.events.get(0);
+        // assert equals 1 because log level is info, change it to debug and
+        // the test will fail
+        assertTrue("Unexpected empty log", testAppender.events.size() == 1);
+        assertEquals("Unexpected log level", Level.INFO, loggingEvent.getLevel());
+        assertEquals("Unexpected log message",
+                        loggingEvent.getMessage().toString(),
+                        "Hello Test");
+    }
+
+    public static class TestAppender extends AppenderSkeleton {
+        public List<LoggingEvent> events = new ArrayList<>();
+        public void close() {}
+        public boolean requiresLayout() {return false;}
+        @Override
+        protected void append(LoggingEvent event) {
+            events.add(event);
+        }
+    }
+*/
+}
+
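A minimal usage sketch follows (assumed client-side code, not part of the change set above); it relies only on the ShardQueryConfiguration setter and the planner wiring introduced in this diff.

    // Hypothetical example: toggling tserver-level query ID logging for a query.
    ShardQueryConfiguration config = new ShardQueryConfiguration();
    config.setTserverLoggingActive(true); // defaults to true in this diff

    // When DefaultQueryPlanner.process(...) runs with this config, it builds a second
    // IteratorSetting for QueryLogIterator at baseIteratorPriority + 50, carrying the
    // tserver.logging.active flag and the query ID as iterator options.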