From dd6a584c63aedfdc38a75095fb75cd73a64dbb10 Mon Sep 17 00:00:00 2001 From: Christopher Gross cogross Date: Mon, 1 Apr 2024 12:11:05 +0000 Subject: [PATCH] fix major issues from sonarqube --- .../SecureEventSequenceFileInputFormat.java | 1 + .../data/tokenize/TokenizationHelper.java | 3 ++ .../reader/AbstractEventRecordReader.java | 3 ++ .../ingest/mapreduce/EventMapper.java | 9 +++++- .../handler/facet/HashTableFunction.java | 5 +++- .../ContentIndexingColumnBasedHandler.java | 5 +++- ...ndedContentIndexingColumnBasedHandler.java | 6 +++- .../job/BulkIngestMapFileLoader.java | 5 ++++ .../ingest/mapreduce/job/IngestJob.java | 1 + .../job/metrics/AggregatingMetricsStore.java | 3 ++ .../job/metrics/PassThroughMetricsStore.java | 3 ++ .../BulkIngestKeyAggregatingReducer.java | 3 ++ .../reduce/BulkIngestKeyDedupeCombiner.java | 3 ++ .../writer/AbstractChainedContextWriter.java | 3 ++ .../job/writer/TableCachingContextWriter.java | 3 ++ .../datawave/ingest/util/KillJobByRegex.java | 6 ++++ .../java/datawave/ingest/util/ThreadUtil.java | 1 + .../java/datawave/util/flag/FlagMaker.java | 6 +++- .../ingest/csv/mr/input/CSVReaderBase.java | 1 + .../wikipedia/WikipediaDataTypeHandler.java | 4 +++ .../metrics/mapreduce/MetricsIngester.java | 3 ++ .../metrics/mapreduce/util/JobSetupUtil.java | 3 ++ ...DatawaveFieldIndexCachingIteratorJexl.java | 4 +++ .../DatawaveFieldIndexRegexIteratorJexl.java | 2 -- .../mr/bulk/MultiRfileInputformat.java | 5 +++- .../java/datawave/mr/bulk/RecordIterator.java | 10 ++++++- .../java/datawave/mr/bulk/RfileScanner.java | 5 +++- .../mr/bulk/RfileSplitInputFormat.java | 1 + .../AncestorIndexBuildingVisitor.java | 2 +- .../query/ancestor/AncestorIndexIterator.java | 2 -- .../query/cardinality/CardinalityRecord.java | 10 +++++-- .../query/composite/CompositeRange.java | 28 ++++--------------- .../query/discovery/DiscoveryLogic.java | 8 ++++-- .../edge/DefaultExtendedEdgeQueryLogic.java | 2 +- .../datawave/query/function/LogTiming.java | 4 ++- .../lookup/ConcurrentScannerInitializer.java | 6 +++- .../datawave/query/iterator/QueryOptions.java | 4 ++- .../query/iterator/SourceManager.java | 5 ++-- .../query/iterator/filter/LoadDateFilter.java | 4 +-- .../iterator/pipeline/PipelineIterator.java | 3 ++ .../query/jexl/DatawaveArithmetic.java | 4 ++- .../EvaluationPhaseFilterFunctions.java | 4 ++- .../query/jexl/lookups/AsyncIndexLookup.java | 7 ++++- .../query/jexl/lookups/RegexIndexLookup.java | 6 ++-- .../ShardIndexQueryTableStaticMethods.java | 3 -- .../CustomWildcardQueryNodeProcessor.java | 6 ++-- .../metrics/ShardTableQueryMetricHandler.java | 7 ++--- .../query/planner/DefaultQueryPlanner.java | 5 +++- .../query/planner/IndexQueryPlanner.java | 5 +++- .../planner/ThreadedRangeBundlerIterator.java | 6 ++++ .../ChainableEventDataQueryFilter.java | 2 +- .../query/scheduler/PushdownFunction.java | 1 + .../query/tables/BatchScannerSession.java | 1 + .../query/tables/RangeStreamScanner.java | 8 +++++- .../datawave/query/tables/ScannerSession.java | 2 ++ .../query/tables/async/SpeculativeScan.java | 1 + .../results/cached/CachedResultsBean.java | 4 +++ .../CreateQuerySessionIDFilter.java | 4 +-- .../common/cache/SharedCacheCoordinator.java | 6 ++++ .../webservice/common/health/HealthBean.java | 1 + .../common/remote/RemoteHttpService.java | 2 ++ .../query-war/src/main/webapp/index.html | 2 +- .../datawave/webservice/mr/MapReduceBean.java | 3 ++ .../map/BulkResultsFileOutputMapper.java | 3 ++ .../map/BulkResultsTableOutputMapper.java | 3 ++ .../logic/composite/CompositeQueryLogic.java | 2 ++ .../CompositeQueryLogicResultsIterator.java | 1 + .../query/metric/QueryMetricHolder.java | 2 +- .../query/metric/QueryMetricMessage.java | 2 +- .../query/metric/QueryMetricsWriter.java | 13 +++++++-- .../query/runner/QueryExecutorBean.java | 1 + .../webservice/query/runner/RunningQuery.java | 6 ++++ .../security/cache/AccumuloCacheStore.java | 13 +++++++-- .../web-root/src/main/webapp/index.html | 5 ++-- 74 files changed, 247 insertions(+), 79 deletions(-) diff --git a/warehouse/assemble/webservice/src/main/java/datawave/webservice/mr/input/SecureEventSequenceFileInputFormat.java b/warehouse/assemble/webservice/src/main/java/datawave/webservice/mr/input/SecureEventSequenceFileInputFormat.java index 9b7de3928a5..5eed3c3c8cc 100644 --- a/warehouse/assemble/webservice/src/main/java/datawave/webservice/mr/input/SecureEventSequenceFileInputFormat.java +++ b/warehouse/assemble/webservice/src/main/java/datawave/webservice/mr/input/SecureEventSequenceFileInputFormat.java @@ -17,6 +17,7 @@ public RecordReader createRecordReader(InputSplit split, T try { reader.initialize(split, context); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Error initializing SecureEventSequenceFileRecordReader", e); } return reader; diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java index 931b2efa40e..3ecf92f76fb 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java @@ -1,6 +1,7 @@ package datawave.ingest.data.tokenize; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; @@ -25,6 +26,7 @@ public static class HeartBeatThread extends Thread { public static final long INTERVAL = 500; // half second resolution public static volatile int counter = 0; + public static long lastRun; static { @@ -41,6 +43,7 @@ public void run() { try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java index 52e0c50e864..069fe1343f0 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java @@ -175,6 +175,9 @@ public RawRecordContainer getEvent() { try { event.setRawRecordNumber(getCurrentKey().get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to get current key", e); } catch (Exception e) { throw new RuntimeException("Unable to get current key", e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index edce34ef644..63baa7b8be7 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -253,6 +253,9 @@ public void setup(Context context) throws IOException, InterruptedException { try { contextWriter = contextWriterClass.getDeclaredConstructor().newInstance(); contextWriter.setup(filterConf, filterConf.getBoolean(CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS, false)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } @@ -480,7 +483,11 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt NDC.push(origFiles.iterator().next()); reprocessedNDCPush = true; } - + } catch (InterruptedException e) { + contextWriter.rollback(); + Thread.currentThread().interrupt(); + log.error("Failed to clean event from error table. Terminating map", e); + throw new IOException("Failed to clean event from error table, Terminating map", e); } catch (Exception e) { contextWriter.rollback(); log.error("Failed to clean event from error table. Terminating map", e); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java index 14d376fc24c..e8b3f758d06 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java @@ -107,7 +107,10 @@ public Collection apply(@Nullable Collection getShardNamesAndValues(Raw if (indexField || reverseIndexField) { try { tokenizeField(analyzer, nci, indexField, reverseIndexField, reporter); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); } catch (Exception ex) { throw new RuntimeException(ex); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java index 8141959ae98..5ddf3cceab8 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java @@ -229,7 +229,10 @@ public void close(TaskAttemptContext context) { this.docWriterService.shutdown(); this.docWriterService.awaitTermination(1, TimeUnit.MINUTES); this.docWriter.close(); - } catch (InterruptedException | MutationsRejectedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Unable to terminate document writing service!", e); + } catch (MutationsRejectedException e) { log.error("Unable to terminate document writing service!", e); } } @@ -669,6 +672,7 @@ public void run() { try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index 45d720019a7..02e7d849981 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -512,6 +512,7 @@ public void run() { try { Thread.sleep(FAILURE_SLEEP_TIME); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); log.warn("Interrupted while sleeping.", ie); } } @@ -822,6 +823,7 @@ public void bringMapFilesOnline(Path mapFilesDir) throws IOException, AccumuloEx if (e == null) e = importTask.getException(); } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); // this task was interrupted, wait for the others and then terminate if (e == null) e = interrupted; @@ -844,6 +846,7 @@ public void bringMapFilesOnline(Path mapFilesDir) throws IOException, AccumuloEx if (e == null) e = importTask.getException(); } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); // this task was interrupted, wait for the others and then terminate if (e == null) e = interrupted; @@ -1238,6 +1241,7 @@ public void markSourceFilesLoaded(Path jobDirectory) throws IOException { } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (null != e.getCause()) throw new IOException(e.getCause().getMessage()); else @@ -1320,6 +1324,7 @@ private void sleep() { System.gc(); Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.warn("Interrupted while sleeping.", e); } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java index 064d970a901..895177531cb 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java @@ -416,6 +416,7 @@ public int run(String[] args) throws Exception { try { Thread.sleep(3000); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // do nothing } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java index 6ef1b98e2fa..bafd7bb7465 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java @@ -60,6 +60,9 @@ public void flush(ConcurrentMap counts) { byte[] countAsBytes = TextUtil.toUtf8(String.valueOf(entry.getValue().get())); Key key = KeyConverter.fromString(entry.getKey()); contextWriter.write(new BulkIngestKey(table, key), new Value(countAsBytes), context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Could not flush metrics, dropping them", e); } catch (Exception e) { logger.error("Could not flush metrics, dropping them", e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java index 033c35cbf2b..514154c356e 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java @@ -47,6 +47,9 @@ public void increase(String key, long count) { Value v = (count == 1L && ONE != null) ? ONE : new Value(String.valueOf(count).getBytes(ENCODING)); Key k = KeyConverter.fromString(key); contextWriter.write(new BulkIngestKey(table, k), v, context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Could not write metrics to the context writer, dropping them...", e); } catch (Exception e) { logger.error("Could not write metrics to the context writer, dropping them...", e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java index 1eed6e5f53b..2a7b0ccf036 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java @@ -65,6 +65,9 @@ protected void setupContextWriter(Configuration conf) throws IOException { try { setContextWriter(contextWriterClass.newInstance()); contextWriter.setup(conf, conf.getBoolean(CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS, false)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java index ae64849a43f..c552a7b724b 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java @@ -56,6 +56,9 @@ protected void setupContextWriter(Configuration conf) throws IOException { try { setContextWriter(contextWriterClass.getDeclaredConstructor().newInstance()); contextWriter.setup(conf, conf.getBoolean(CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS, false)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java index 842c028da10..09e617ed009 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java @@ -44,6 +44,9 @@ public void setup(Configuration conf, boolean outputTableCounters) throws IOExce try { contextWriter = contextWriterClass.newInstance(); contextWriter.setup(conf, outputTableCounters); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + getChainedContextWriterOption(), e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + getChainedContextWriterOption(), e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java index 4fdecb7b6ac..7fcb794fe24 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java @@ -90,6 +90,9 @@ public void setup(Configuration conf, boolean outputTableCounters) throws IOExce try { contextWriter = contextWriterClass.getDeclaredConstructor().newInstance(); contextWriter.setup(conf, outputTableCounters); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java b/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java index f48bb7eaff1..ba0cd7e1984 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java @@ -38,6 +38,11 @@ public void run() { Job job = null; try { job = cluster.getJob(js.getJobID()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("Error getting job: " + js.getJobID() + " Reason: " + e.getMessage()); + JOB_FAILED_COUNT.getAndIncrement(); + return; } catch (Exception e) { System.out.println("Error getting job: " + js.getJobID() + " Reason: " + e.getMessage()); JOB_FAILED_COUNT.getAndIncrement(); @@ -82,6 +87,7 @@ public static void main(String[] args) throws IOException, InterruptedException JOB_KILLER_SVC.shutdown(); // signal shutdown JOB_KILLER_SVC.awaitTermination(1, TimeUnit.MINUTES); // allow processes to stop } catch (InterruptedException e) { + Thread.currentThread().interrupt(); JOB_KILLER_SVC.shutdownNow(); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java b/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java index 2205ab79a29..07522a69270 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java @@ -29,6 +29,7 @@ public static boolean shutdownAndWait(ThreadPoolExecutor executor, long timeToWa executor.awaitTermination(timeToWait, unit); return true; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); logger.warn("Closed thread pool but not all threads completed successfully."); return false; } diff --git a/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java b/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java index 66926763ee0..bd28da85514 100644 --- a/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java +++ b/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java @@ -226,6 +226,7 @@ public void run() { } catch (Exception ex) { log.error("An unexpected exception occurred. Exiting", ex); running = false; + Thread.currentThread().interrupt(); } } } finally { @@ -575,7 +576,10 @@ private boolean processResults(List> futures, Collection getSplits(JobContext job) throws IOException { List inputSplits = Lists.newArrayList(); try { inputSplits = computeSplitPoints(job, tableName, ranges); - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException | InterruptedException e) { + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java index 11315035581..c114f1da1eb 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java @@ -347,6 +347,8 @@ protected synchronized void initialize(Configuration conf, boolean initRanges) t } catch (ExecutionException e) { close(); throw new RuntimeException(e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } catch (Exception e) { } @@ -562,6 +564,7 @@ protected synchronized void fail(Throwable t) { try { Thread.sleep(failureSleep); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); try { close(); } catch (IOException e1) { @@ -590,6 +593,11 @@ protected synchronized void fail(Throwable t) { seekLastSeen(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (!callClosed.get() && !Thread.interrupted()) { + fail(e); + } } catch (Exception e) { if (!callClosed.get() && !Thread.interrupted()) fail(e); @@ -853,7 +861,7 @@ public synchronized void setBlockFile(Reader reader) throws InterruptedException this.reader = reader; } - public FSDataInputStream getInputStream() { + public synchronized FSDataInputStream getInputStream() { return inputStream; } diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java index e0bccbb81c8..981742cecc2 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java @@ -203,7 +203,10 @@ public Iterator> iterator() { log.trace("KV iterator is " + (kv == null ? "null" : "not null")); } } while (null == kv); - + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + IOUtils.cleanupWithLogger(null, this); + throw new RuntimeException(e); } catch (Exception e) { IOUtils.cleanupWithLogger(null, this); throw new RuntimeException(e); diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java index 637c3ea12d2..17f700ef1fe 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java @@ -58,6 +58,7 @@ public List getSplits(JobContext job) throws IOException { try { split.add(frSplit); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } diff --git a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java index 23b01f6c087..8569b925e11 100644 --- a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java +++ b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java @@ -251,7 +251,7 @@ private String getTLDId(Key key) { String uid = getUid(key); // if the uid is not empty - if (!uid.isEmpty()) { + if (uid != null && !uid.isEmpty()) { uid = TLD.parseRootPointerFromId(uid); } diff --git a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java index 453a56b01ac..6e3f796ef83 100644 --- a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java @@ -33,8 +33,6 @@ protected AncestorIndexIterator(Builder builder) { @Override protected Range buildIndexRange(Range r) { Key start = r.getStartKey(); - Key end = r.getEndKey(); - String endCf = (end == null || end.getColumnFamily() == null ? "" : end.getColumnFamily().toString()); String startCf = (start == null || start.getColumnFamily() == null ? "" : start.getColumnFamily().toString()); // if the start key is inclusive, and contains a datatype/0UID, then move back to the top level ancestor diff --git a/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java b/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java index 2b9d971d84c..0e8067858a8 100644 --- a/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java +++ b/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java @@ -30,6 +30,8 @@ public class CardinalityRecord implements Serializable { private HashMultimap cardinalityMap = HashMultimap.create(); private static Logger log = Logger.getLogger(CardinalityRecord.class); + private static final Object LOCK = new Object(); + public enum DateType { DOCUMENT, CURRENT } @@ -236,7 +238,7 @@ public static void writeToDisk(final CardinalityRecord cardinalityRecord, final if (!cardinalityRecord.getCardinalityMap().isEmpty()) { ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(() -> { - synchronized (file) { + synchronized (LOCK) { ObjectOutputStream oos = null; try { FileOutputStream fos = new FileOutputStream(file); @@ -246,7 +248,7 @@ public static void writeToDisk(final CardinalityRecord cardinalityRecord, final log.error(e.getMessage(), e); } finally { IOUtils.closeQuietly(oos); - file.notify(); + file.notifyAll(); } } }); @@ -273,6 +275,10 @@ public void merge(File file) { CardinalityRecord cardinalityRecord = CardinalityRecord.readFromDisk(file); + if (cardinalityRecord == null) { + throw new IllegalArgumentException("ResultsCardinalityRecords have different resultCardinalityValueFields"); + } + int intersectionSize = Sets.intersection(this.resultCardinalityValueFields, cardinalityRecord.resultCardinalityValueFields).size(); if (intersectionSize != this.resultCardinalityValueFields.size() || intersectionSize != cardinalityRecord.resultCardinalityValueFields.size()) { throw new IllegalArgumentException("ResultsCardinalityRecords have different resultCardinalityValueFields"); diff --git a/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java b/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java index 27ce96858a3..ff4873513da 100644 --- a/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java +++ b/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java @@ -320,10 +320,10 @@ else if (node instanceof ASTEQNode) public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + ((expressionListLowerBound == null) ? 0 : expressionListLowerBound.hashCode()); - result = prime * result + ((jexlNodeListLowerBound == null) ? 0 : jexlNodeListLowerBound.hashCode()); - result = prime * result + ((expressionListUpperBound == null) ? 0 : expressionListUpperBound.hashCode()); - result = prime * result + ((jexlNodeListUpperBound == null) ? 0 : jexlNodeListUpperBound.hashCode()); + result = prime * result + expressionListLowerBound.hashCode(); + result = prime * result + jexlNodeListLowerBound.hashCode(); + result = prime * result + expressionListUpperBound.hashCode(); + result = prime * result + jexlNodeListUpperBound.hashCode(); return result; } @@ -338,25 +338,7 @@ public boolean equals(Object obj) { if (!super.equals(obj)) return false; CompositeRange other = (CompositeRange) obj; - if (expressionListLowerBound == null) { - if (other.expressionListLowerBound != null) - return false; - } else if (!expressionListLowerBound.equals(other.expressionListLowerBound)) - return false; - if (jexlNodeListLowerBound == null) { - if (other.jexlNodeListLowerBound != null) - return false; - } else if (!jexlNodeListLowerBound.equals(other.jexlNodeListLowerBound)) - return false; - if (expressionListUpperBound == null) { - if (other.expressionListUpperBound != null) - return false; - } else if (!expressionListUpperBound.equals(other.expressionListUpperBound)) - return false; - if (jexlNodeListUpperBound == null) { - if (other.jexlNodeListUpperBound != null) - return false; - } else if (!jexlNodeListUpperBound.equals(other.jexlNodeListUpperBound)) + if (!jexlNodeListUpperBound.equals(other.jexlNodeListUpperBound)) return false; return true; } diff --git a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java index 59c2ba05d98..88b71c9595f 100644 --- a/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/discovery/DiscoveryLogic.java @@ -219,13 +219,17 @@ public void setupQuery(GenericQueryConfiguration genericConfig) throws QueryExce if (!forward.isEmpty()) { BatchScanner bs = configureBatchScannerForDiscovery(config, scannerFactory, config.getIndexTableName(), forward, familiesToSeek, config.getLiterals(), config.getPatterns(), config.getRanges(), false); - iterators.add(transformScanner(bs)); + if (bs != null) { + iterators.add(transformScanner(bs)); + } } Collection reverse = seekRanges.getValue1(); if (!reverse.isEmpty()) { BatchScanner bs = configureBatchScannerForDiscovery(config, scannerFactory, config.getReverseIndexTableName(), reverse, familiesToSeek, config.getLiterals(), config.getPatterns(), config.getRanges(), true); - iterators.add(transformScanner(bs)); + if (bs != null) { + iterators.add(transformScanner(bs)); + } } config.setSeparateCountsByColVis(separateCountsByColVis); diff --git a/warehouse/query-core/src/main/java/datawave/query/edge/DefaultExtendedEdgeQueryLogic.java b/warehouse/query-core/src/main/java/datawave/query/edge/DefaultExtendedEdgeQueryLogic.java index 4f5d31cc023..069211a0fca 100644 --- a/warehouse/query-core/src/main/java/datawave/query/edge/DefaultExtendedEdgeQueryLogic.java +++ b/warehouse/query-core/src/main/java/datawave/query/edge/DefaultExtendedEdgeQueryLogic.java @@ -126,7 +126,7 @@ public void setupQuery(GenericQueryConfiguration configuration) throws Exception } } - if ((null == normalizedQuery || normalizedQuery.equals("")) && qData.getRanges().size() < 1) { + if (normalizedQuery.equals("") && qData.getRanges().size() < 1) { throw new IllegalStateException("Query string is empty after initial processing, no ranges or filters can be generated to execute."); } diff --git a/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java b/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java index eed51322de1..a2d450cf1cb 100644 --- a/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java +++ b/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java @@ -24,6 +24,8 @@ public class LogTiming implements Function,Entry initializeScannerStreams(List initializeScannerStreams(List options; protected String scanId; @@ -2150,7 +2152,7 @@ public void setStatsdHostAndPort(String statsdHostAndPort) { public QueryStatsDClient getStatsdClient() { if (statsdHostAndPort != null && queryId != null) { if (statsdClient == null) { - synchronized (queryId) { + synchronized (LOCK) { if (statsdClient == null) { setStatsdClient(new QueryStatsDClient(queryId, getStatsdHost(statsdHostAndPort), getStatsdPort(statsdHostAndPort), getStatsdMaxQueueSize())); diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java b/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java index e1293ba5c10..cfda35fa919 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -27,7 +28,7 @@ public class SourceManager implements SortedKeyValueIterator { private static final Logger log = Logger.getLogger(SourceManager.class); protected volatile int sources = 0; - protected volatile int deepCopiesCalled = 0; + protected AtomicInteger deepCopiesCalled = new AtomicInteger(0); protected long initialSize = 0; protected long createdSize = 0; @@ -230,7 +231,7 @@ public void seek(Range range, Collection columnFamilies, boolean i * translate to realistic performance benefits. */ public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - deepCopiesCalled++; + deepCopiesCalled.incrementAndGet(); if (initialSize > 0) { Queue queue = sourceQueue; SourceManager nextSource = null; diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java b/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java index 06afc382ac3..b85e16fcf56 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java @@ -56,7 +56,7 @@ public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { * @throws IOException * for issues with read/write */ - private void initOptions(Map options) throws IOException { + private void initializeOptions(Map options) throws IOException { String e = options.get(ColumnRangeIterator.RANGE_NAME); if (e == null) { @@ -68,7 +68,7 @@ private void initOptions(Map options) throws IOException { @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws java.io.IOException { super.init(source, options, env); - initOptions(options); + initializeOptions(options); } /** diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java index 01214d848e4..0c20e80a5c3 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java @@ -151,6 +151,9 @@ private Entry getNext(boolean remove) { return next; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } // cancel out existing executions cancel(); diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java b/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java index 5c668e88816..7988fad30cf 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java @@ -26,6 +26,8 @@ public abstract class DatawaveArithmetic extends JexlArithmetic { private static final Logger log = Logger.getLogger(DatawaveArithmetic.class); + private static final Object LOCK = new Object(); + /** * Default to being lenient so we don't have to add "null" for every field in the query that doesn't exist in the document */ @@ -317,7 +319,7 @@ public static boolean matchesFst(Object object, FST fst) throws IOException { final IntsRefBuilder irBuilder = new IntsRefBuilder(); Util.toUTF16(object.toString(), irBuilder); final IntsRef ints = irBuilder.get(); - synchronized (fst) { + synchronized (LOCK) { return Util.get(fst, ints) != null; } } diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java b/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java index ce38f809b38..4a4a13118d9 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java @@ -53,6 +53,8 @@ public class EvaluationPhaseFilterFunctions { */ public static final String CASE_INSENSITIVE = ".*\\(\\?[idmsux]*-[dmsux]*i[idmsux]*\\).*"; + public static final Object LOCK = new Object(); + private static final Logger log = Logger.getLogger(EvaluationPhaseFilterFunctions.class); public static boolean occurrence(Iterable fieldValues, String operator, int count) { @@ -1549,7 +1551,7 @@ public static long getNextTime(long time, int granularity) { * if the value failed to be parsed using the supplied format */ public static long getTime(Object value, DateFormat format) throws ParseException { - synchronized (format) { + synchronized (LOCK) { return format.parse(ValueTuple.getStringValue(value)).getTime(); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java index e6e261cd45d..2789d6d3b63 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java @@ -47,6 +47,7 @@ protected void timedScanWait(Future future, CountDownLatch startedLatch try { startedLatch.await(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new UnsupportedOperationException("Interrupted while waiting for IndexLookup to start", e); } } else { @@ -79,7 +80,10 @@ protected void timedScanWait(Future future, CountDownLatch startedLatch break; } - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } catch (TimeoutException e) { future.cancel(true); @@ -87,6 +91,7 @@ protected void timedScanWait(Future future, CountDownLatch startedLatch try { stoppedLatch.await(); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); log.error("Interrupted waiting for canceled AsyncIndexLookup to complete."); throw new RuntimeException(ex); } diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java index 6276bed3b8c..7c3d2fe743b 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java @@ -45,6 +45,8 @@ * An asynchronous index lookup which looks up concrete values for the specified regex term. */ public class RegexIndexLookup extends AsyncIndexLookup { + + private static final Object LOCK = new Object(); private static final Logger log = ThreadConfigurableLogger.getLogger(RegexIndexLookup.class); protected MetadataHelper helper; @@ -307,7 +309,7 @@ protected Callable createTimedCallable(final Iterator> String field = holder.toString(); // synchronize access to fieldsToValues - synchronized (indexLookupMap) { + synchronized (LOCK) { // We are only returning a mapping of field value to field name, no need to // determine cardinality and such at this point. indexLookupMap.put(field, term); @@ -330,7 +332,7 @@ protected Callable createTimedCallable(final Iterator> log.debug("Failed or Timed out " + e); } // synchronize access to fieldsToValues - synchronized (indexLookupMap) { + synchronized (LOCK) { // Only if not doing an unfielded lookup should we mark all fields as having an exceeded threshold if (!unfieldedLookup) { for (String field : fields) { diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java index dc2db8df405..ab09653044e 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java @@ -539,9 +539,6 @@ public static final void configureGlobalIndexDataTypeFilter(ShardQueryConfigurat } IteratorSetting cfg = configureGlobalIndexDataTypeFilter(config, dataTypes); - if (cfg == null) { - return; - } bs.addScanIterator(cfg); } diff --git a/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java b/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java index 91b33bd9f1b..1bdc75765a8 100644 --- a/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java +++ b/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java @@ -39,7 +39,7 @@ protected QueryNode postProcessNode(QueryNode node) throws QueryNodeException { // Code below simulates the old lucene parser behavior for wildcards - if (isWildcard(text)) { + if (checkIfWildcard(text)) { if (isPrefixWildcard(text)) { return new PrefixWildcardQueryNode(fqn.getField(), text, fqn.getBegin(), fqn.getEnd()); } else { @@ -51,7 +51,7 @@ protected QueryNode postProcessNode(QueryNode node) throws QueryNodeException { return node; } - private boolean isWildcard(CharSequence text) { + private boolean checkIfWildcard(CharSequence text) { if (text == null || text.length() <= 0) return false; @@ -67,7 +67,7 @@ private boolean isWildcard(CharSequence text) { } private boolean isPrefixWildcard(CharSequence text) { - if (text == null || text.length() <= 0 || !isWildcard(text)) + if (text == null || text.length() <= 0 || !checkIfWildcard(text)) return false; // Validate last character is a '*' and was not escaped diff --git a/warehouse/query-core/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java b/warehouse/query-core/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java index 791c85f7802..204f1eba09a 100644 --- a/warehouse/query-core/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java +++ b/warehouse/query-core/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java @@ -599,8 +599,6 @@ public BaseQueryMetric toMetric(EventBase event) { m.setPlan(fieldValue); } else if (fieldName.equals("QUERY_LOGIC")) { m.setQueryLogic(fieldValue); - } else if (fieldName.equals("QUERY_ID")) { - m.setQueryId(fieldValue); } else if (fieldName.equals("BEGIN_DATE")) { try { Date d = sdf_date_time1.parse(fieldValue); @@ -701,8 +699,6 @@ public BaseQueryMetric toMetric(EventBase event) { m.addVersion(BaseQueryMetric.DATAWAVE, fieldValue); } else if (fieldName.startsWith("VERSION.")) { m.addVersion(fieldName.substring(8), fieldValue); - } else if (fieldName.equals("YIELD_COUNT")) { - m.setYieldCount(Long.parseLong(fieldValue)); } else if (fieldName.equals("LOGIN_TIME")) { m.setLoginTime(Long.parseLong(fieldValue)); } else { @@ -820,6 +816,9 @@ public void reload() { try { this.recordWriter.close(null); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error(e.getMessage(), e); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java index e8ea5e91cff..65988a88b03 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java @@ -2362,7 +2362,10 @@ protected IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQ if (settingFuture.isDone()) try { return settingFuture.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e.getCause()); + } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); } else diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java index 513f1cd9a7f..9ec5061e3c2 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java @@ -44,7 +44,10 @@ public IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQuer if (null == cfg) { try { cfg = settingFuture.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java b/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java index 82d68f16b59..e84410b9095 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java @@ -198,6 +198,9 @@ public boolean hasNext() { break; } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error("Exception in ThreadedRangeBundlerIterator", e); throw new RuntimeException(e); } @@ -391,6 +394,9 @@ public void run() { } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } finally { rangeConsumer.stop(); diff --git a/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java b/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java index c6caed26a97..463eecd3ba2 100644 --- a/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java +++ b/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java @@ -105,7 +105,7 @@ private Range updateRange(Range current, Range candidate) { } // test the end key - if (result == null || result.getEndKey() == null || result.getEndKey().compareTo(candidate.getEndKey()) >= 0) { + if (result.getEndKey() == null || result.getEndKey().compareTo(candidate.getEndKey()) >= 0) { result = new Range(result.getStartKey(), result.isStartKeyInclusive(), candidate.getEndKey(), result.isEndKeyInclusive() && candidate.isEndKeyInclusive()); } diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java index 6d74daa80b5..9e1b28b19d2 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java @@ -216,6 +216,7 @@ else if (ctx.getTableState(tableId) == TableState.OFFLINE) try { Thread.sleep(100); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } else { diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java index fd3b2aff8e1..763bce095c4 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java @@ -649,6 +649,7 @@ protected void shutdownServices() { log.error("Executor did not fully shutdown"); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we were interrupted while waiting } } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java b/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java index a35949efa6d..2a57043f274 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java @@ -375,6 +375,7 @@ public boolean hasNext() { currentEntry = resultQueue.poll(getPollTime(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error(e); throw new RuntimeException(e); } @@ -407,7 +408,10 @@ private void submitTask() { Future future = myExecutor.submit(this); try { future.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } @@ -446,6 +450,7 @@ protected int scannerInvariant(final Iterator> iter) { } prevDay = null; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return 0; } } @@ -541,6 +546,7 @@ protected int scannerInvariant(final Iterator> iter) { prevDay = myEntry; } } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); prevDay = myEntry; } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java index 7741a43d1d5..22ca8b98973 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java @@ -321,6 +321,7 @@ public boolean hasNext() { currentEntry = resultQueue.poll(getPollTime(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.trace("hasNext" + isRunning() + " interrupted"); log.error("Interrupted before finding next", e); throw new RuntimeException(e); @@ -553,6 +554,7 @@ protected int scannerInvariant(final Iterator> iter) { try { accepted = resultQueue.offer(myEntry, 200, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // keep trying } } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java b/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java index 4e3720576e8..cd97ceaafeb 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java @@ -205,6 +205,7 @@ public void onSuccess(Scan result) { } catch (InterruptedException e) { close(); + Thread.currentThread().interrupt(); throw new RuntimeException(e); } finally { writeControl.unlock(); diff --git a/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java b/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java index 6246258eca2..a5d3709965c 100644 --- a/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java +++ b/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java @@ -632,6 +632,9 @@ else if (maxValueLength > maxLength) { response.addException(e.getBottomQueryException()); throw new NoResultsException(e, response.getResult()); } catch (QueryCanceledQueryException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.info("Query " + queryId + " canceled on request"); if (crq != null) { crq.getMetric().setLifecycle(QueryMetric.Lifecycle.CANCELLED); @@ -2363,6 +2366,7 @@ public CachedRunningQuery retrieve(String id, String owner) throws IOException { outReadThread.join(); errReadThread.join(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Thread interrupted waiting for import to finish, killing import process"); process.destroy(); } finally { diff --git a/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java b/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java index cf4d4d7ddfe..8e14cd7868b 100644 --- a/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java +++ b/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java @@ -43,7 +43,7 @@ public void filter(ContainerRequestContext request, ContainerResponseContext res // If we're sending an error response, then there's no need to set a cookie since // there's no query "session" to stick to this server. setCookie = false; - QUERY_ID.set(null); + QUERY_ID.remove(); break; default: @@ -51,7 +51,7 @@ public void filter(ContainerRequestContext request, ContainerResponseContext res log.error(method.getResourceClass() + "." + method.getMethod().getName() + " did not set QUERY_ID threadlocal."); } else { id = QUERY_ID.get(); - QUERY_ID.set(null); + QUERY_ID.remove(); } break; } diff --git a/web-services/common/src/main/java/datawave/webservice/common/cache/SharedCacheCoordinator.java b/web-services/common/src/main/java/datawave/webservice/common/cache/SharedCacheCoordinator.java index 363da12ef16..9eb226701f0 100644 --- a/web-services/common/src/main/java/datawave/webservice/common/cache/SharedCacheCoordinator.java +++ b/web-services/common/src/main/java/datawave/webservice/common/cache/SharedCacheCoordinator.java @@ -740,11 +740,17 @@ protected void reapEvictions() { String recursiveDeletePath = ZKPaths.makePath(curatorClient.getNamespace(), path); ZKUtil.deleteRecursive(curatorClient.getZookeeperClient().getZooKeeper(), recursiveDeletePath); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.trace("Problem deleting " + path + " (this may be ok): " + e.getMessage(), e); } } } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.warn("Error cleaning up eviction notices: " + e.getMessage(), e); } } diff --git a/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java b/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java index e7d993fee68..aaf8f832fde 100644 --- a/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java +++ b/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java @@ -187,6 +187,7 @@ public Response waitForQueryCompletion(@QueryParam("timeoutMinutes") @DefaultVal try { Thread.sleep(queryCompletionWaitIntervalMillis); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.warn("Interrupted while waiting for queries to complete."); } } diff --git a/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java b/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java index 24814d6ab8d..0c9ba5ee72e 100644 --- a/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java +++ b/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java @@ -208,6 +208,7 @@ protected void shutdown() { try { Thread.sleep(1000L); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; } totalWait = System.currentTimeMillis() - waitStart; @@ -583,6 +584,7 @@ public boolean retryRequest(IOException exception, int executionCount, HttpConte try { Thread.sleep(unavailableRetryDelay); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Ignore -- we'll just end up retrying a little too fast } } diff --git a/web-services/examples/query-war/src/main/webapp/index.html b/web-services/examples/query-war/src/main/webapp/index.html index 25247f89bcb..db205c3f054 100644 --- a/web-services/examples/query-war/src/main/webapp/index.html +++ b/web-services/examples/query-war/src/main/webapp/index.html @@ -1,6 +1,6 @@ - + DataWave Web Service Examples diff --git a/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java b/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java index c5c0d21f288..58ba0a87bd5 100644 --- a/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java +++ b/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java @@ -507,6 +507,9 @@ public GenericResponse submit(@FormParam("jobName") String jobName, @For try { j.submit(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } QueryException qe = new QueryException(DatawaveErrorCode.MAPREDUCE_JOB_START_ERROR, e); log.error(qe.getMessage(), qe); response.addException(qe.getBottomQueryException()); diff --git a/web-services/map-reduce/src/main/java/datawave/webservice/mr/bulkresults/map/BulkResultsFileOutputMapper.java b/web-services/map-reduce/src/main/java/datawave/webservice/mr/bulkresults/map/BulkResultsFileOutputMapper.java index 08e6d834c21..592838e7309 100644 --- a/web-services/map-reduce/src/main/java/datawave/webservice/mr/bulkresults/map/BulkResultsFileOutputMapper.java +++ b/web-services/map-reduce/src/main/java/datawave/webservice/mr/bulkresults/map/BulkResultsFileOutputMapper.java @@ -132,6 +132,9 @@ protected void map(Key key, Value value, org.apache.hadoop.mapreduce.Mapper getMetricsFromQueue(int batchSize, long maxLaten metricHolderList.add(holder); } } catch (InterruptedException e) { - + Thread.currentThread().interrupt(); } } if (metricHolderList.size() > 0 || blockingQueue.size() > 0) { @@ -312,6 +316,9 @@ public void run() { } } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error(e.getMessage(), e); } } diff --git a/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java b/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java index 0e35675483e..85820659299 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java +++ b/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java @@ -1274,6 +1274,7 @@ public VoidResponse reset(@Required("id") @PathParam("id") String id) { CreateQuerySessionIDFilter.QUERY_ID.set(id); return response; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (query != null) { query.getMetric().setLifecycle(QueryMetric.Lifecycle.CANCELLED); } diff --git a/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java b/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java index f34e5f0df98..f91345af1f2 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java +++ b/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java @@ -251,6 +251,7 @@ private Object getResultsThread() { try { Thread.sleep(1); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } @@ -272,6 +273,9 @@ private Object getResultsThread() { } } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (settings.getUncaughtExceptionHandler() != null) { settings.getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e); } else { @@ -315,6 +319,7 @@ private boolean hasNext(long pageStartTime) throws TimeoutException { try { hasNext.wait(timeout); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // if we got interrupted, then just return false return false; } @@ -358,6 +363,7 @@ private Object getNext(long pageStartTime) throws TimeoutException { try { gotNext.wait(timeout); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // if we got interrupted, then just return null return null; } diff --git a/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java b/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java index b5a7fbd9fc3..080274816fc 100644 --- a/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java +++ b/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java @@ -191,7 +191,10 @@ public boolean delete(Object key) { } catch (MutationsRejectedException e) { throw new PersistenceException("Unable to write cache value to Accumulo", e); } - } catch (IOException | InterruptedException e) { + } catch (IOException e) { + throw new PersistenceException("Unable to serialize key: " + key, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new PersistenceException("Unable to serialize key: " + key, e); } } @@ -209,7 +212,10 @@ public MarshalledEntry _load(Object key, boolean loadValue, boolean loadMet scanner.setRange(new Range(new Text(keyBytes))); } catch (TableNotFoundException e) { throw new PersistenceException(e); - } catch (IOException | InterruptedException e) { + } catch (IOException e) { + throw new PersistenceException("Unable to serialize key " + key, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new PersistenceException("Unable to serialize key " + key, e); } @@ -296,6 +302,9 @@ public void process(KeyFilter filter, CacheLoaderTask task, Exec task.processEntry(marshalledEntry, taskContext); } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new PersistenceException(e); } } diff --git a/web-services/web-root/src/main/webapp/index.html b/web-services/web-root/src/main/webapp/index.html index 8be6325e0bb..0147d934b51 100644 --- a/web-services/web-root/src/main/webapp/index.html +++ b/web-services/web-root/src/main/webapp/index.html @@ -1,8 +1,9 @@ - + + DataWave

This is DataWave.

- \ No newline at end of file +