From aab6f6bd724886ea4ab93b95ca9985d6bf9f4b34 Mon Sep 17 00:00:00 2001 From: Seth Smucker Date: Thu, 21 Mar 2024 12:17:27 -0400 Subject: [PATCH] Add TServer logging of query IDs Added the query option tserver.logging.active to enable logging of query IDs to the TServer logs. Created the class QueryLogIterator to handle this. Logs the start and end of each method called. Fixes #2305 --- contrib/datawave-quickstart/bin/env.sh | 2 + .../query/config/ShardQueryConfiguration.java | 21 ++- .../query/iterator/QueryLogIterator.java | 145 ++++++++++++++++++ .../datawave/query/iterator/QueryOptions.java | 29 +++- .../query/iterator/SourcedOptions.java | 7 + .../query/planner/DefaultQueryPlanner.java | 80 +++++++++- .../query/iterator/QueryLogIteratorTest.java | 119 ++++++++++++++ 7 files changed, 394 insertions(+), 9 deletions(-) create mode 100644 warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java diff --git a/contrib/datawave-quickstart/bin/env.sh b/contrib/datawave-quickstart/bin/env.sh index 1ffcfb5d430..80225cc72a3 100644 --- a/contrib/datawave-quickstart/bin/env.sh +++ b/contrib/datawave-quickstart/bin/env.sh @@ -62,3 +62,5 @@ fi # Order of registration is important, as it reflects install and startup order within global wrapper # functions such as 'allInstall', 'allStart', etc. Likewise, 'allStop' and 'allUninstall' perform # actions in reverse order of registration. 
See bin/common.sh for more info + + diff --git a/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java b/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java index 2dbfdf655b4..946d43860c7 100644 --- a/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java +++ b/warehouse/query-core/src/main/java/datawave/query/config/ShardQueryConfiguration.java @@ -494,6 +494,11 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement */ private boolean sortQueryByCounts = false; + /** + * Controls whether query IDs are logged on the tserver level via {@link datawave.query.iterator.QueryLogIterator}. + */ + private boolean tserverLoggingActive = true; + /** * Default constructor */ @@ -720,6 +725,7 @@ public ShardQueryConfiguration(ShardQueryConfiguration other) { this.setUseTermCounts(other.getUseTermCounts()); this.setSortQueryBeforeGlobalIndex(other.isSortQueryBeforeGlobalIndex()); this.setSortQueryByCounts(other.isSortQueryByCounts()); + this.setTserverLoggingActive(other.isTserverLoggingActive()); } /** @@ -2740,6 +2746,15 @@ public void setSortQueryByCounts(boolean sortQueryByCounts) { this.sortQueryByCounts = sortQueryByCounts; } + public boolean isTserverLoggingActive() { + return this.tserverLoggingActive; + } + + public void setTserverLoggingActive(boolean tserverLoggingActive) { + this.tserverLoggingActive = tserverLoggingActive; + } + + @Override public boolean equals(Object o) { if (this == o) @@ -2947,7 +2962,8 @@ public boolean equals(Object o) { getUseFieldCounts() == that.getUseFieldCounts() && getUseTermCounts() == that.getUseTermCounts() && isSortQueryBeforeGlobalIndex() == that.isSortQueryBeforeGlobalIndex() && - isSortQueryByCounts() == that.isSortQueryByCounts(); + isSortQueryByCounts() == that.isSortQueryByCounts() && + isTserverLoggingActive() == that.isTserverLoggingActive(); // @formatter:on } @@ -3152,7 +3168,8 @@ public int 
hashCode() { getUseFieldCounts(), getUseTermCounts(), isSortQueryBeforeGlobalIndex(), - isSortQueryByCounts()); + isSortQueryByCounts(), + isTserverLoggingActive()); // @formatter:on } diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java new file mode 100644 index 00000000000..32078c2d29c --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryLogIterator.java @@ -0,0 +1,145 @@ +package datawave.query.iterator; + +import static datawave.query.iterator.QueryOptions.QUERY_ID; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.OptionDescriber; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.log4j.Logger; + +public class QueryLogIterator implements SortedKeyValueIterator, OptionDescriber { + + private static final Logger log = Logger.getLogger(QueryLogIterator.class); + + private static final String CLASS_NAME = QueryLogIterator.class.getSimpleName(); + private String queryID; + private SortedKeyValueIterator source; + private IteratorEnvironment myEnvironment; + + public QueryLogIterator() {} + + public QueryLogIterator(QueryLogIterator other, IteratorEnvironment env) { + this.myEnvironment = other.myEnvironment; + this.queryID = other.queryID; + } + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + + try { + this.queryID = options.get(QUERY_ID); + this.source = source; + this.myEnvironment = env; + logStartOf("init()"); + } finally { + logEndOf("init()"); + } + 
} + + private void logStartOf(String methodName) { + if (log.isInfoEnabled()) { + log.info(CLASS_NAME + " " + methodName + " Started QueryID: " + this.queryID); + } + } + + private void logEndOf(String methodName) { + if (log.isInfoEnabled()) { + log.info(CLASS_NAME + " " + methodName + " Ended QueryID: " + this.queryID); + } + } + + @Override + public boolean hasTop() { + + boolean result; + + try { + logStartOf("hasTop()"); + result = source.hasTop(); + } finally { + logEndOf("hasTop()"); + } + return result; + } + + @Override + public void next() throws IOException { + try { + logStartOf("next()"); + source.next(); + } finally { + logEndOf("next()"); + } + } + + @Override + public Key getTopKey() { + Key k; + try { + logStartOf("getTopKey()"); + k = source.getTopKey(); + } finally { + logEndOf("getTopKey()"); + } + return k; + } + + @Override + public Value getTopValue() { + Value v; + try { + logStartOf("getTopValue()"); + v = source.getTopValue(); + } finally { + logEndOf("getTopValue()"); + } + return v; + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment iteratorEnvironment) { + + QueryLogIterator copy; + + try { + logStartOf("deepCopy()"); + copy = new QueryLogIterator(this, this.myEnvironment); + } finally { + logEndOf("deepCopy()"); + } + return copy; + } + + @Override + public void seek(Range range, Collection collection, boolean b) throws IOException { + + try { + logStartOf("seek()"); + this.source.seek(range, collection, b); + } finally { + logEndOf("seek()"); + } + } + + @Override + public IteratorOptions describeOptions() { + Map options = new HashMap<>(); + options.put(QUERY_ID, "The QueryID to be logged as methods are invoked"); + + return new IteratorOptions(getClass().getSimpleName(), "An iterator used to log the QueryID", options, null); + } + + @Override + public boolean validateOptions(Map options) { + return options.containsKey(QUERY_ID); + } +} diff --git 
a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java index 0f108147785..40ee086683f 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeSet; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -39,12 +38,7 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.CharMatcher; import com.google.common.base.Function; import com.google.common.base.Predicate; @@ -283,6 +277,12 @@ public class QueryOptions implements OptionDescriber { public static final String FIELD_COUNTS = "field.counts"; public static final String TERM_COUNTS = "term.counts"; + + /** + * Controls whether a query's ID is logged on the tserver using {@link QueryLogIterator} + */ + public static final String TSERVER_LOGGING_ACTIVE = "tserver.logging.active"; + protected Map options; protected String scanId; @@ -450,6 +450,9 @@ public class QueryOptions implements OptionDescriber { private CountMap termCounts; private CountMapSerDe mapSerDe; + // Controls whether query IDs are logged on the tserver level via QueryLogIterator. 
+ private boolean tserverLoggingActive = false; + public void deepCopy(QueryOptions other) { this.options = other.options; this.query = other.query; @@ -562,6 +565,7 @@ public void deepCopy(QueryOptions other) { this.fieldCounts = other.fieldCounts; this.termCounts = other.termCounts; + } public String getQuery() { @@ -1286,6 +1290,7 @@ public IteratorOptions describeOptions() { options.put(TERM_FREQUENCY_AGGREGATION_THRESHOLD_MS, "TermFrequency aggregations that exceed this threshold are logged as a warning"); options.put(FIELD_COUNTS, "Map of field counts from the global index"); options.put(TERM_COUNTS, "Map of term counts from the global index"); + options.put(TSERVER_LOGGING_ACTIVE, "Whether the queryID will be logged during queries"); return new IteratorOptions(getClass().getSimpleName(), "Runs a query against the DATAWAVE tables", options, null); } @@ -1782,6 +1787,10 @@ public boolean validateOptions(Map options) { } } + if (options.containsKey(TSERVER_LOGGING_ACTIVE)) { + this.tserverLoggingActive = Boolean.parseBoolean(options.get(TSERVER_LOGGING_ACTIVE)); + } + return true; } @@ -2267,6 +2276,14 @@ public void setTfAggregationThresholdMs(int tfAggregationThresholdMs) { this.tfAggregationThresholdMs = tfAggregationThresholdMs; } + public boolean isTserverLoggingActive() { + return this.tserverLoggingActive; + } + + public void setTserverLoggingActive(boolean tserverLoggingActive) { + this.tserverLoggingActive = tserverLoggingActive; + } + /** * Get an {@link Equality} * diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java b/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java index 3b71812d88e..b8d149927a9 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/SourcedOptions.java @@ -1,13 +1,20 @@ package datawave.query.iterator; +import java.io.IOException; +import java.util.Collection; 
import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.OptionDescriber; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** * Container for iterator options, plus a source iterator and its environment diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java index 3c7fd8ce575..e7cbc394214 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java @@ -90,6 +90,7 @@ import datawave.query.index.lookup.RangeStream; import datawave.query.iterator.CloseableListIterable; import datawave.query.iterator.QueryIterator; +import datawave.query.iterator.QueryLogIterator; import datawave.query.iterator.QueryOptions; import datawave.query.iterator.ivarator.IvaratorCacheDirConfig; import datawave.query.iterator.logic.IndexIterator; @@ -254,6 +255,7 @@ public class DefaultQueryPlanner extends QueryPlanner implements Cloneable { private List transformRules = Lists.newArrayList(); protected Class> queryIteratorClazz = QueryIterator.class; + protected Class> QueryLogIteratorClazz = QueryLogIterator.class; protected String plannedScript = null; @@ -272,6 +274,7 @@ public class DefaultQueryPlanner extends QueryPlanner implements Cloneable { protected ExecutorService builderThread = null; protected Future settingFuture = null; + protected Future settingFutureLog = null; protected long maxRangeWaitMillis = 125; @@ -330,6 
+333,7 @@ protected DefaultQueryPlanner(DefaultQueryPlanner other) { setDisableExpandIndexFunction(other.disableExpandIndexFunction); rules.addAll(other.rules); queryIteratorClazz = other.queryIteratorClazz; + QueryLogIteratorClazz = other.QueryLogIteratorClazz; setMetadataHelper(other.getMetadataHelper()); setDateIndexHelper(other.getDateIndexHelper()); setCompressOptionMappings(other.getCompressOptionMappings()); @@ -407,11 +411,17 @@ public CloseableIterable process(GenericQueryConfiguration genericCon protected CloseableIterable process(ScannerFactory scannerFactory, MetadataHelper metadataHelper, DateIndexHelper dateIndexHelper, ShardQueryConfiguration config, String query, Query settings) throws DatawaveQueryException { settingFuture = null; + settingFutureLog = null; IteratorSetting cfg = null; + IteratorSetting logCfg = null; if (preloadOptions) { cfg = getQueryIterator(metadataHelper, config, settings, "", false, true); + // Create and add the QueryLogIterator only if active Tserver logging is true. + if (config.isTserverLoggingActive()) { + logCfg = getQueryLogIterator(metadataHelper, config, settings, "", false, true); + } } try { @@ -469,9 +479,17 @@ protected CloseableIterable process(ScannerFactory scannerFactory, Me cfg = getQueryIterator(metadataHelper, config, settings, "", false, false); } configureIterator(config, cfg, newQueryString, isFullTable); + + // Create and add the QueryLogIterator only if active Tserver logging is true. 
+ if (config.isTserverLoggingActive()) { + while (null == logCfg) { + logCfg = getQueryLogIterator(metadataHelper, config, settings, "", false, false); + } + configureIterator(config, logCfg, newQueryString, isFullTable); + } } - final QueryData queryData = new QueryData().withQuery(newQueryString).withSettings(Lists.newArrayList(cfg)); + final QueryData queryData = new QueryData().withQuery(newQueryString).withSettings(Lists.newArrayList(cfg, logCfg)); stopwatch.stop(); @@ -545,6 +563,7 @@ private void configureIterator(ShardQueryConfiguration config, IteratorSetting c addOption(cfg, QueryOptions.FULL_TABLE_SCAN_ONLY, Boolean.toString(isFullTable), false); addOption(cfg, QueryOptions.TRACK_SIZES, Boolean.toString(config.isTrackSizes()), false); addOption(cfg, QueryOptions.ACTIVE_QUERY_LOG_NAME, config.getActiveQueryLogName(), false); + addOption(cfg, QueryOptions.TSERVER_LOGGING_ACTIVE, Boolean.toString(config.isTserverLoggingActive()), true); // Set the start and end dates configureTypeMappings(config, cfg, metadataHelper, getCompressOptionMappings()); } @@ -2144,6 +2163,7 @@ protected Future loadQueryIterator(final MetadataHelper metadat final String queryString, final Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { return builderThread.submit(() -> { + // VersioningIterator is typically set at 20 on the table IteratorSetting cfg = new IteratorSetting(config.getBaseIteratorPriority() + 40, "query", getQueryIteratorClass()); @@ -2237,6 +2257,23 @@ protected Future loadQueryIterator(final MetadataHelper metadat }); } + protected Future loadQueryLogIterator(final MetadataHelper metadataHelper, final ShardQueryConfiguration config, + final Query settings, final String queryString, final Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { + return builderThread.submit(() -> { + + // VersioningIterator is typically set at 20 on the table + IteratorSetting cfg = new IteratorSetting(config.getBaseIteratorPriority() + 50, 
"smuckerLog", getQueryLogIteratorClass()); + + if (config.isTserverLoggingActive()) { + System.out.println("INFO: " + Boolean.toString(config.isTserverLoggingActive()) + settings.getId().toString()); + addOption(cfg, QueryOptions.TSERVER_LOGGING_ACTIVE, Boolean.toString(config.isTserverLoggingActive()), false); + addOption(cfg, QueryOptions.QUERY_ID, settings.getId().toString(), false); + } + + return cfg; + }); + } + /** * Load indexed, index only, and composite fields into the query iterator * @@ -2359,6 +2396,39 @@ protected IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQ return null; } + /** + * Get the loaded {@link IteratorSetting} + * + * @param metadataHelper + * the {@link MetadataHelper} + * @param config + * the {@link ShardQueryConfiguration} + * @param settings + * the {@link Query} + * @param queryString + * the raw query string + * @param isFullTable + * a flag indicating if this is a full table scan + * @param isPreload + * a flag indicating the iterator is being loaded prior to planning + * @return a loaded {@link IteratorSetting} + * @throws DatawaveQueryException + * if something goes wrong + */ + protected IteratorSetting getQueryLogIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, Query settings, String queryString, + Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { + if (null == settingFutureLog) + settingFutureLog = loadQueryLogIterator(metadataHelper, config, settings, queryString, isFullTable, isPreload); + if (settingFutureLog.isDone()) + try { + return settingFutureLog.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + else + return null; + } + public static void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper, boolean compressMappings) throws DatawaveQueryException { configureTypeMappings(config, cfg, metadataHelper, compressMappings, false); @@ 
-3061,6 +3131,14 @@ public void setQueryIteratorClass(Class> getQueryLogIteratorClass() { + return QueryLogIteratorClazz; + } + + public void setQueryLogIteratorClass(Class> clazz) { + QueryLogIteratorClazz = clazz; + } + public void setPreloadOptions(boolean preloadOptions) { this.preloadOptions = preloadOptions; } diff --git a/warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java b/warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java new file mode 100644 index 00000000000..e59874f85fb --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/iterator/QueryLogIteratorTest.java @@ -0,0 +1,119 @@ +package datawave.query.iterator; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.security.util.ScannerHelper; +import org.apache.accumulo.core.client.*; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.util.*; + + +/** + * Unit test for {@link QueryLogIterator}. 
+ */ +public class QueryLogIteratorTest { + + private static final String TABLE_NAME = "testTable"; + private static final String[] AUTHS = {"FOO", "BAR", "COB"}; + private final Value BLANK_VALUE = new Value(new byte[0]); + private final Set AUTHS_SET = Collections.singleton(new Authorizations(AUTHS)); + + private AccumuloClient accumuloClient; + + + @BeforeClass + public static void beforeClass() throws Exception { + File dir = new File(Objects.requireNonNull(ClassLoader.getSystemClassLoader().getResource(".")).toURI()); + File targetDir = dir.getParentFile(); + System.setProperty("hadoop.home.dir", targetDir.getAbsolutePath()); + } + + @Before + public void setUp() throws Exception { + accumuloClient = new InMemoryAccumuloClient("root", new InMemoryInstance(QueryLogIteratorTest.class.toString())); + if (!accumuloClient.tableOperations().exists(TABLE_NAME)) { + accumuloClient.tableOperations().create(TABLE_NAME); + } + + BatchWriterConfig config = new BatchWriterConfig(); + config.setMaxMemory(1000L); + try (BatchWriter writer = accumuloClient.createBatchWriter(TABLE_NAME, config)) { + Mutation mutation = new Mutation("fieldA"); + mutation.put("boo", "a", BLANK_VALUE); + mutation.put("boo", "b", BLANK_VALUE); + mutation.put("foo", "a", BLANK_VALUE); + mutation.put("foo", "c", BLANK_VALUE); + writer.addMutation(mutation); + writer.flush(); + } catch (MutationsRejectedException | TableNotFoundException e) { + throw new RuntimeException(e); + } + } + + @After + public void tearDown() throws Exception { + accumuloClient.tableOperations().deleteRows(TABLE_NAME, null, null); + } + + @Test + public void testLogging() throws TableNotFoundException { + Logger.getRootLogger().setLevel(Level.INFO); + + Scanner scanner = ScannerHelper.createScanner(accumuloClient, TABLE_NAME, AUTHS_SET); + scanner.setRange(new Range()); + + IteratorSetting iteratorSetting = new IteratorSetting(10, QueryLogIterator.class); + iteratorSetting.addOption(QueryOptions.QUERY_ID, "12345"); + 
scanner.addScanIterator(iteratorSetting); + + for (Map.Entry entry : scanner) { + // Do nothing here, we are examining logs only. + } + + // Logs will print to console with a start/end for each method in the iterator. + } + + /* @Test + public void test() { + TestAppender testAppender = new TestAppender(); + + Logger.getRootLogger().addAppender(testAppender); + ClassUnderTest.logMessage(); + LoggingEvent loggingEvent = testAppender.events.get(0); + //assert equals 1 because log level is info, change it to debug and + //the test will fail + assertTrue("Unexpected empty log",testAppender.events.size()==1); + assertEquals("Unexpected log level",Level.INFO,loggingEvent.getLevel()); + assertEquals("Unexpected log message" + ,loggingEvent.getMessage().toString() + ,"Hello Test"); + } + + public static class TestAppender extends AppenderSkeleton { + public List events = new ArrayList(); + public void close() {} + public boolean requiresLayout() {return false;} + @Override + protected void append(LoggingEvent event) { + events.add(event); + } + } +*/ +} + +