Skip to content

Commit

Permalink
Add TServer logging of query IDs
Browse files Browse the repository at this point in the history
Added the query option tserver.logging.active to enable
logging of query IDs to the TServer logs. Created the
class QueryLogIterator to handle this. Logs the
start and end of each method called.

Fixes NationalSecurityAgency#2305
  • Loading branch information
SethSmucker committed Jun 5, 2024
1 parent c5a0f3a commit aab6f6b
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 9 deletions.
2 changes: 2 additions & 0 deletions contrib/datawave-quickstart/bin/env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ fi
# Order of registration is important, as it reflects install and startup order within global wrapper
# functions such as 'allInstall', 'allStart', etc. Likewise, 'allStop' and 'allUninstall' perform
# actions in reverse order of registration. See bin/common.sh for more info


Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
*/
private boolean sortQueryByCounts = false;

/**
* Controls whether query IDs are logged on the tserver level via {@link datawave.query.iterator.QueryLogIterator}.
*/
private boolean tserverLoggingActive = true;

/**
* Default constructor
*/
Expand Down Expand Up @@ -720,6 +725,7 @@ public ShardQueryConfiguration(ShardQueryConfiguration other) {
this.setUseTermCounts(other.getUseTermCounts());
this.setSortQueryBeforeGlobalIndex(other.isSortQueryBeforeGlobalIndex());
this.setSortQueryByCounts(other.isSortQueryByCounts());
this.setTserverLoggingActive(other.isTserverLoggingActive());
}

/**
Expand Down Expand Up @@ -2740,6 +2746,15 @@ public void setSortQueryByCounts(boolean sortQueryByCounts) {
this.sortQueryByCounts = sortQueryByCounts;
}

public boolean isTserverLoggingActive() {
return this.tserverLoggingActive;
}

public void setTserverLoggingActive(boolean tserverLoggingActive) {
this.tserverLoggingActive = tserverLoggingActive;
}


@Override
public boolean equals(Object o) {
if (this == o)
Expand Down Expand Up @@ -2947,7 +2962,8 @@ public boolean equals(Object o) {
getUseFieldCounts() == that.getUseFieldCounts() &&
getUseTermCounts() == that.getUseTermCounts() &&
isSortQueryBeforeGlobalIndex() == that.isSortQueryBeforeGlobalIndex() &&
isSortQueryByCounts() == that.isSortQueryByCounts();
isSortQueryByCounts() == that.isSortQueryByCounts() &&
isTserverLoggingActive() == that.isTserverLoggingActive();
// @formatter:on
}

Expand Down Expand Up @@ -3152,7 +3168,8 @@ public int hashCode() {
getUseFieldCounts(),
getUseTermCounts(),
isSortQueryBeforeGlobalIndex(),
isSortQueryByCounts());
isSortQueryByCounts(),
isTserverLoggingActive());
// @formatter:on
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package datawave.query.iterator;

import static datawave.query.iterator.QueryOptions.QUERY_ID;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.log4j.Logger;

public class QueryLogIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {

private static final Logger log = Logger.getLogger(QueryLogIterator.class);

private static final String CLASS_NAME = QueryLogIterator.class.getSimpleName();
private String queryID;
private SortedKeyValueIterator<Key,Value> source;
private IteratorEnvironment myEnvironment;

public QueryLogIterator() {}

public QueryLogIterator(QueryLogIterator other, IteratorEnvironment env) {
this.myEnvironment = other.myEnvironment;
this.queryID = other.queryID;
}

@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {

try {
this.queryID = options.get(QUERY_ID);
this.source = source;
this.myEnvironment = env;
logStartOf("init()");
} finally {
logEndOf("init()");
}
}

private void logStartOf(String methodName) {
if (log.isInfoEnabled()) {
log.info(CLASS_NAME + " " + methodName + " Started QueryID: " + this.queryID);
}
}

private void logEndOf(String methodName) {
if (log.isInfoEnabled()) {
log.info(CLASS_NAME + " " + methodName + " Ended QueryID: " + this.queryID);
}
}

@Override
public boolean hasTop() {

boolean result;

try {
logStartOf("hasTop()");
result = source.hasTop();
} finally {
logEndOf("hasTop()");
}
return result;
}

@Override
public void next() throws IOException {
try {
logStartOf("next()");
source.next();
} finally {
logEndOf("next()");
}
}

@Override
public Key getTopKey() {
Key k;
try {
logStartOf("getTopKey()");
k = source.getTopKey();
} finally {
logEndOf("getTopKey()");
}
return k;
}

@Override
public Value getTopValue() {
Value v;
try {
logStartOf("getTopValue()");
v = source.getTopValue();
} finally {
logEndOf("getTopValue()");
}
return v;
}

@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment iteratorEnvironment) {

QueryLogIterator copy;

try {
logStartOf("deepCopy()");
copy = new QueryLogIterator(this, this.myEnvironment);
} finally {
logEndOf("deepCopy()");
}
return copy;
}

@Override
public void seek(Range range, Collection<ByteSequence> collection, boolean b) throws IOException {

try {
logStartOf("seek()");
this.source.seek(range, collection, b);
} finally {
logEndOf("seek()");
}
}

@Override
public IteratorOptions describeOptions() {
Map<String,String> options = new HashMap<>();
options.put(QUERY_ID, "The QueryID to be logged as methods are invoked");

return new IteratorOptions(getClass().getSimpleName(), "An iterator used to log the QueryID", options, null);
}

@Override
public boolean validateOptions(Map<String,String> options) {
return options.containsKey(QUERY_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand All @@ -39,12 +38,7 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
Expand Down Expand Up @@ -283,6 +277,12 @@ public class QueryOptions implements OptionDescriber {
public static final String FIELD_COUNTS = "field.counts";
public static final String TERM_COUNTS = "term.counts";


/**
* Controls whether a query's ID is logged on the tserver using {@link QueryLogIterator}
*/
public static final String TSERVER_LOGGING_ACTIVE = "tserver.logging.active";

protected Map<String,String> options;

protected String scanId;
Expand Down Expand Up @@ -450,6 +450,9 @@ public class QueryOptions implements OptionDescriber {
private CountMap termCounts;
private CountMapSerDe mapSerDe;

// Controls whether query IDs are logged on the tserver level via QueryLogIterator.
private boolean tserverLoggingActive = false;

public void deepCopy(QueryOptions other) {
this.options = other.options;
this.query = other.query;
Expand Down Expand Up @@ -562,6 +565,7 @@ public void deepCopy(QueryOptions other) {

this.fieldCounts = other.fieldCounts;
this.termCounts = other.termCounts;

}

public String getQuery() {
Expand Down Expand Up @@ -1286,6 +1290,7 @@ public IteratorOptions describeOptions() {
options.put(TERM_FREQUENCY_AGGREGATION_THRESHOLD_MS, "TermFrequency aggregations that exceed this threshold are logged as a warning");
options.put(FIELD_COUNTS, "Map of field counts from the global index");
options.put(TERM_COUNTS, "Map of term counts from the global index");
options.put(TSERVER_LOGGING_ACTIVE, "Whether the queryID will be logged during queries");
return new IteratorOptions(getClass().getSimpleName(), "Runs a query against the DATAWAVE tables", options, null);
}

Expand Down Expand Up @@ -1782,6 +1787,10 @@ public boolean validateOptions(Map<String,String> options) {
}
}

if (options.containsKey(TSERVER_LOGGING_ACTIVE)) {
this.tserverLoggingActive = Boolean.parseBoolean(options.get(TSERVER_LOGGING_ACTIVE));
}

return true;
}

Expand Down Expand Up @@ -2267,6 +2276,14 @@ public void setTfAggregationThresholdMs(int tfAggregationThresholdMs) {
this.tfAggregationThresholdMs = tfAggregationThresholdMs;
}

public boolean isTserverLoggingActive() {
return this.tserverLoggingActive;
}

public void setTserverLoggingActive(boolean tserverLoggingActive) {
this.tserverLoggingActive = tserverLoggingActive;
}

/**
* Get an {@link Equality}
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package datawave.query.iterator;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
* Container for iterator options, plus a source iterator and its environment
Expand Down
Loading

0 comments on commit aab6f6b

Please sign in to comment.